In [None]:
# !pip install torch torchvision
# !pip install yolov5

In [None]:
# from google.colab import drive
# drive.mount('/content/drive', force_remount=True)

In [None]:
import os

import cv2
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# Splitting for training model

In [None]:
def preprocess_image(img_path, target_size=(224, 224)):  # default matches the (224, 224, 3) Keras image inputs
    """Read an image from disk with OpenCV and resize it to ``target_size``.

    Pixel values are returned unscaled: the original author disabled the
    division by 255 because it did not work with YOLOv5.

    Args:
        img_path: Path of the image file to read.
        target_size: Size tuple forwarded to ``cv2.resize``.

    Returns:
        The resized image as returned by ``cv2.resize``.

    Raises:
        ValueError: If the file cannot be decoded. ``cv2.imread`` reports that
            by returning ``None`` instead of raising.
    """
    img = cv2.imread(img_path)
    if img is None:
        # Without this check the failure only shows up inside cv2.resize as an
        # assertion error that does not mention which file was at fault.
        raise ValueError(f"Failed to load image at {img_path}")
    return cv2.resize(img, target_size)


# Google drive (if colab)
# base_folder = '/content/drive/My Drive/ConsumerCompass/data'

base_folder = 'data 2' #insert your path

# Class labels: name -> column index of the one-hot label vector.
# The keys double as the sub-folder names under `base_folder`, and consecutive
# indices (0/1, 2/3, ...) are the opposing pairs scored together by the
# evaluation cells further down.
label_map = {'healthy': 0, 'unhealthy': 1, 'glamorous': 2, 'drab': 3, 'rugged': 4, 'gentle': 5, 'fun': 6, 'dull': 7,
             'innovation': 8, 'conservatism': 9, 'premuim': 10, 'accessibility': 11, 'minimalism': 12, 'detail': 13, 'safety': 14, 'risk': 15}

# One folder per class, derived from label_map so the two cannot drift apart.
folders = [os.path.join(base_folder, folder_name) for folder_name in label_map]

all_images = []
all_labels = []

# Load every file of every class folder together with its one-hot label
for folder in folders:
    label_index = label_map[os.path.basename(folder)]
    print(folder)
    # os.listdir returns entries in arbitrary order; sorting fixes the sample
    # order so the seeded splits below are reproducible.
    for img_file in sorted(os.listdir(folder)):
        img_path = os.path.join(folder, img_file)
        if os.path.isfile(img_path):
            img = preprocess_image(img_path)
            label_vector = [0] * len(label_map)  # one slot per class
            label_vector[label_index] = 1
            all_images.append(img)
            all_labels.append(label_vector)

all_images = np.array(all_images)
all_labels = np.array(all_labels)

# 70 % train; the remaining 30 % is halved into validation and test (15 % each).
train_images, test_images, train_labels, test_labels = train_test_split(all_images, all_labels, test_size=0.3, random_state=42)
val_images, test_images, val_labels, test_labels = train_test_split(test_images, test_labels, test_size=0.5, random_state=42)

In [None]:
# def preprocess_image(img_path, target_size=(224, 224)):
#     img = cv2.imread(img_path)
#     if img is None:
#         print(f"Failed to load image at {img_path}")
#         return None
#     img = cv2.resize(img, target_size)
#     return img

# def load_and_preprocess_images(image_folder):
#     processed_images = []
#     image_names = []  # List for storing the file names
#     image_files = os.listdir(image_folder)
    
#     for img_file in image_files:
#         img_path = os.path.join(image_folder, img_file)
#         if os.path.isfile(img_path):
#             print(f"Processing image: {img_file}")  # Print the file name before processing
#             img = preprocess_image(img_path)
#             if img is not None:
#                 processed_images.append(img)
#                 image_names.append(img_file)  # Store the file name
#             else:
#                 print(f"Skipping file: {img_path}")  # Report that the file was skipped

#     return np.array(processed_images), image_names

# data_3_folder = '/Users/andrewshatalov/Downloads/Telegram Desktop/data 3'
# df_real, image_names = load_and_preprocess_images(data_3_folder)

In [None]:
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Lambda, Dense, Dropout, Flatten, Concatenate, Conv2D, MaxPooling2D, BatchNormalization
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Sequential
import torch

# Download the pretrained YOLOv5 "yolov5s" model through torch.hub.
# If this does not work, try downloading it from the YOLOv5 GitHub repository via a direct link.
model_yolo = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)

In [None]:
import cv2

# Function for YOLO object detection
def detect_objects_yolo(images, model):
    """Run ``model`` on every image separately and collect the results.

    Args:
        images: Iterable of images. NumPy arrays are copied before being
            handed to the model; any other input type is passed through as is.
        model: Callable invoked as ``model(image)``.

    Returns:
        list: One model result per image, in input order.
    """
    results = []
    for img in images:
        # Give the detector its own copy of the pixels.
        # NOTE(review): YOLOv5 result objects appear to keep a reference to the
        # array they were given, and `render()` draws the boxes onto it in
        # place. Without the copy, boxes would be stamped onto the dataset
        # arrays that are later used for training. Confirm against the
        # installed yolov5 version.
        model_input = img.copy() if isinstance(img, np.ndarray) else img
        results.append(model(model_input))
    return results

# Run detection on the train, test and validation splits (evaluated in that order)
train_results, test_results, val_results = [
    detect_objects_yolo(split_images, model_yolo)
    for split_images in (train_images, test_images, val_images)
]

In [None]:
# df_real_results = detect_objects_yolo(df_real, model_yolo)

In [None]:
from IPython.display import display, Image
import io

def display_detection_results(images, results, num):
    """Show the rendered detections for the first ``num`` image/result pairs.

    Args:
        images: Sequence of images; only used to bound how many pairs are shown.
        results: Sequence of detection results exposing ``render()``.
        num: Maximum number of pairs to display.

    Raises:
        ValueError: If a rendered image cannot be encoded as JPEG.
    """
    for _image, detection in zip(images[:num], results[:num]):
        # First element of what render() returns: the image with boxes drawn on it
        annotated = detection.render()[0]

        # Jupyter displays encoded bytes, so convert the image to JPEG first
        encoded_ok, jpeg_buffer = cv2.imencode(".jpg", annotated)
        if encoded_ok:
            jpeg_bytes = io.BytesIO(jpeg_buffer).getvalue()
            display(Image(data=jpeg_bytes))
        else:
            raise ValueError("Failed to encode image")

# Preview the rendered detections for the first 5 training images.
display_detection_results(train_images, train_results, 5)

In [None]:
# Function that builds the arrays fed to the Keras models
def prepare_model_inputs(images, results, image_size=(224, 224), max_objects=10):
    """Build the image array and the per-image object-info array.

    For every image the first ``max_objects`` rows and first five columns of
    ``detection.xyxy[0]`` are taken. Columns 0/2 are divided by the image
    width, columns 1/3 by the image height, and the rows are zero-padded up to
    ``max_objects``.

    NOTE(review): in YOLOv5's ``xyxy`` layout column 4 is the confidence and
    column 5 the class id, while comments elsewhere in this notebook describe
    the fifth feature as ``class_id``. Confirm which one is intended.

    Args:
        images: Iterable of image arrays (height, width, channels).
        results: Iterable of detection results aligned with ``images``.
        image_size: Size tuple forwarded to ``cv2.resize``.
        max_objects: Number of object rows kept per image.

    Returns:
        tuple: ``(prepared_images, object_infos)`` as float32 arrays, with
        ``object_infos`` shaped ``(n_images, max_objects, 5)``.
    """
    prepared_images = []
    object_infos = []

    for img, detection in zip(images, results):
        prepared_images.append(cv2.resize(img, image_size))

        detected = detection.xyxy[0][:max_objects, :5]
        num_detected = detected.shape[0]

        # Zero-initialised so images with fewer detections keep all-zero padding rows.
        objects_info = np.zeros((max_objects, 5), dtype=np.float32)
        if num_detected > 0:
            # Assigning into the fresh buffer copies the values. On CPU,
            # `.cpu().numpy()` shares memory with the result tensor, so
            # normalising that array in place would corrupt `detection.xyxy`
            # and make a second call return twice-divided coordinates.
            objects_info[:num_detected] = detected.cpu().numpy()
            # Boxes are expressed in the original image's pixels, so normalise
            # by the original width/height rather than by `image_size`.
            objects_info[:num_detected, [0, 2]] /= img.shape[1]
            objects_info[:num_detected, [1, 3]] /= img.shape[0]

        object_infos.append(objects_info)

    prepared_images = np.array(prepared_images, dtype=np.float32)
    object_infos = np.array(object_infos, dtype=np.float32)

    return prepared_images, object_infos

In [None]:
# Build the (image array, object-info array) pair for each split.
train_images_prepared, train_object_infos = prepare_model_inputs(
    images=train_images, results=train_results
)
test_images_prepared, test_object_infos = prepare_model_inputs(
    images=test_images, results=test_results
)
val_images_prepared, val_object_infos = prepare_model_inputs(
    images=val_images, results=val_results
)

In [None]:
# df_real_prepared, df_real_infos = prepare_model_inputs(df_real, df_real_results)

# MODEL 1

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Dense, Dropout, Flatten, Concatenate, Conv2D, MaxPooling2D, BatchNormalization
from tensorflow.keras.layers import GlobalAveragePooling1D

# Base model: AlexNet-like convolutional stack followed by two 4096-unit dense layers
conv_layers = [
    Conv2D(96, (11, 11), strides=(4, 4), activation='relu', input_shape=(224, 224, 3)),
    MaxPooling2D((3, 3), strides=(2, 2)),
    BatchNormalization(),
    Conv2D(256, (5, 5), padding='same', activation='relu'),
    MaxPooling2D((3, 3), strides=(2, 2)),
    BatchNormalization(),
    Conv2D(384, (3, 3), padding='same', activation='relu'),
    Conv2D(384, (3, 3), padding='same', activation='relu'),
    Conv2D(256, (3, 3), padding='same', activation='relu'),
    MaxPooling2D((3, 3), strides=(2, 2)),
]
dense_layers = [
    Flatten(),
    Dense(4096, activation='relu'),
    Dropout(0.5),
    Dense(4096, activation='relu'),
    Dropout(0.5),
]
base_model = Sequential(conv_layers + dense_layers)

# Two inputs: the image itself and a variable-length list of 5-value object rows
image_input = Input(shape=(224, 224, 3))
object_info_input = Input(shape=(None, 5))  # Assuming this is [x_min, y_min, x_max, y_max, class_id]

# Image branch
features = base_model(image_input)

# Object branch: average the object rows into one 5-value vector per image
object_info_pooled = GlobalAveragePooling1D()(object_info_input)

# Merge both branches
combined_features = Concatenate()([features, object_info_pooled])

# Classification head: two Dense(4096) + Dropout stages, then 16 sigmoid outputs
classification_output = combined_features
for _ in range(2):
    classification_output = Dense(4096, activation='relu')(classification_output)
    classification_output = Dropout(0.5)(classification_output)
final_output = Dense(16, activation='sigmoid')(classification_output)

# Assemble, compile and summarise the model
model1 = Model(inputs=[image_input, object_info_input], outputs=final_output)
model1.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model1.summary()

In [None]:
# Train on the arrays produced by prepare_model_inputs: the same float32
# representation that the predict/evaluation cells feed to the model.
history = model1.fit(
    [train_images_prepared, train_object_infos], train_labels,
    validation_data=([val_images_prepared, val_object_infos], val_labels),
    epochs=10,
    batch_size=32
)

In [None]:
from tensorflow.keras.metrics import BinaryAccuracy

# Predict on the held-out test split
predictions = model1.predict([test_images_prepared, test_object_infos])

# Names of the opposing class pairs, in label-column order
class_pairs = [
    'healthy-unhealthy', 'glamorous-drab', 'rugged-gentle', 'fun-dull',
    'innovation-conservatism', 'premuim-accessibility', 'minimalism-detail', 'safety-risk',
]

# Binary accuracy per pair: label columns (0, 1), (2, 3), ... are scored together
accuracy_metrics = []
for pair_start in range(0, predictions.shape[1], 2):
    pair_columns = slice(pair_start, pair_start + 2)
    metric = BinaryAccuracy()
    metric.update_state(test_labels[:, pair_columns], predictions[:, pair_columns])
    accuracy_metrics.append(metric.result().numpy())

# Report the accuracy of every single pair
for pair_name, acc in zip(class_pairs, accuracy_metrics):
    print(f"Accuracy for {pair_name}: {acc}")

# Report the mean over all pairs
average_accuracy = np.mean(accuracy_metrics)
print(f"Average Accuracy across all class pairs: {average_accuracy}")

# MODEL 2

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Dense, Dropout, Flatten, Concatenate, Conv2D, MaxPooling2D, BatchNormalization, GlobalAveragePooling2D, GlobalAveragePooling1D, LSTM, Bidirectional
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.regularizers import l2

# ImageNet-pretrained MobileNetV2 backbone without its classification head
base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

image_input = Input(shape=(224, 224, 3))

# The images reach the model as raw 0-255 pixels (normalisation is disabled in
# preprocess_image), but the pretrained MobileNetV2 weights expect inputs scaled
# to [-1, 1]. Rescale inside the model so that training and inference agree.
# NOTE(review): the images come from cv2.imread (BGR channel order) while the
# ImageNet weights were trained on RGB - confirm whether a channel flip is wanted.
x = tf.keras.layers.Rescaling(1.0 / 127.5, offset=-1.0)(image_input)
x = base_model(x)
x = GlobalAveragePooling2D()(x)
x = Dense(1024, activation='relu')(x)
x = Dropout(0.5)(x)
x = BatchNormalization()(x)
x = Dense(1024, activation='relu')(x)
x = Dropout(0.5)(x)

object_info_input = Input(shape=(None, 5))  # Assuming this is [x_min, y_min, x_max, y_max, class_id]

# Object branch: summarise the sequence of object rows with a bidirectional LSTM
object_info_processed = Bidirectional(LSTM(256))(object_info_input)
object_info_processed = Dense(512, activation='relu', kernel_regularizer=l2(0.01))(object_info_processed)
object_info_processed = Dropout(0.5)(object_info_processed)

# Merge the image and object branches
combined_features = Concatenate()([x, object_info_processed])
combined_features = Dense(1024, activation='relu')(combined_features)
combined_features = Dropout(0.5)(combined_features)
combined_features = BatchNormalization()(combined_features)

# One sigmoid output per class
final_output = Dense(16, activation='sigmoid')(combined_features)

model2 = Model(inputs=[image_input, object_info_input], outputs=final_output)

model2.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

model2.summary()

In [None]:
# Train on the arrays produced by prepare_model_inputs: the same float32
# representation that the predict/evaluation cells feed to the models.
history = model2.fit(
    [train_images_prepared, train_object_infos], train_labels,
    validation_data=([val_images_prepared, val_object_infos], val_labels),
    epochs=10,
    batch_size=32
)

# MODEL 3

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import (
    Input, Dense, Dropout, Flatten, Concatenate, Conv2D, MaxPooling2D, BatchNormalization,
    GlobalAveragePooling1D, Add, Activation
)

# Residual block used by the model 3 backbone
def residual_block(x, filters, kernel_size=3, stride=1, conv_shortcut=True):
    """Apply two conv + batch-norm stages and add a skip connection.

    Args:
        x: Input tensor.
        filters: Number of filters for every convolution in the block.
        kernel_size: Kernel size of the two main-path convolutions.
        stride: Stride of the first main-path convolution and of the
            shortcut convolution.
        conv_shortcut: If true, the skip path is a 1x1 convolution followed by
            batch normalisation; otherwise the input is added unchanged.

    Returns:
        The ReLU-activated sum of the skip path and the main path.
    """
    block_input = x

    # Skip path
    if conv_shortcut:
        skip = Conv2D(filters, (1, 1), strides=stride)(block_input)
        skip = BatchNormalization()(skip)
    else:
        skip = block_input

    # Main path: conv -> BN -> ReLU -> conv -> BN
    main = Conv2D(filters, kernel_size, strides=stride, padding='same')(block_input)
    main = BatchNormalization()(main)
    main = Activation('relu')(main)
    main = Conv2D(filters, kernel_size, padding='same')(main)
    main = BatchNormalization()(main)

    merged = Add()([skip, main])
    return Activation('relu')(merged)

# Two inputs: the image and a variable-length list of 5-value object rows
image_input = Input(shape=(224, 224, 3))
object_info_input = Input(shape=(None, 5))  # Assuming [x_min, y_min, x_max, y_max, class_id]

# Stem: 7x7 strided convolution, max pooling, batch normalisation
x = Conv2D(64, (7, 7), strides=(2, 2), padding='same', activation='relu')(image_input)
x = MaxPooling2D((3, 3), strides=(2, 2))(x)
x = BatchNormalization()(x)

# Four residual blocks given as (filters, stride); only the third one downsamples
for block_filters, block_stride in ((64, 1), (64, 1), (128, 2), (128, 1)):
    x = residual_block(x, block_filters, stride=block_stride)

# Dense layers on top of the flattened feature map: two Dense(4096) + Dropout stages
x = Flatten()(x)
for _ in range(2):
    x = Dense(4096, activation='relu')(x)
    x = Dropout(0.5)(x)

# Object branch: average the object rows into one 5-value vector per image
object_info_pooled = GlobalAveragePooling1D()(object_info_input)

# Merge both branches
combined_features = Concatenate()([x, object_info_pooled])

# Classification head with one sigmoid output per class
classification_output = Dense(4096, activation='relu')(combined_features)
classification_output = Dropout(0.5)(classification_output)
final_output = Dense(16, activation='sigmoid')(classification_output)

# Assemble, compile and summarise the model
model3 = Model(inputs=[image_input, object_info_input], outputs=final_output)
model3.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model3.summary()

In [None]:
# Train on the arrays produced by prepare_model_inputs: the same float32
# representation that the predict/evaluation cells feed to the models.
history = model3.fit(
    [train_images_prepared, train_object_infos], train_labels,
    validation_data=([val_images_prepared, val_object_infos], val_labels),
    epochs=10,  # Number of training epochs
    batch_size=32  # Batch size
)

In [None]:
# Inference on the real (unlabelled) images.
try:
    real_inputs = [df_real_prepared, df_real_infos]
except NameError as exc:
    # Both arrays are only assigned in the real-data cells, which are currently
    # commented out. Fail with a message that says what to run first.
    raise NameError(
        "df_real_prepared / df_real_infos are not defined: un-comment and run the "
        "real-data loading, detection and preparation cells before this one"
    ) from exc
real_predictions = model3.predict(real_inputs)

In [None]:
# Column order follows the label indices used for the one-hot label vectors.
column_names = [name for name, index in sorted(label_map.items(), key=lambda x: x[1])]
predictions_df = pd.DataFrame(real_predictions, columns=column_names)

# For every opposing pair keep only the higher-scoring class (1) and zero the
# other one. Ties go to the first class of the pair, as idxmax did before.
for pair in class_pairs:
    class_1, class_2 = pair.split('-')
    first_wins = predictions_df[class_1] >= predictions_df[class_2]
    predictions_df[class_1] = first_wins.astype('int64')
    predictions_df[class_2] = (~first_wins).astype('int64')

# NOTE(review): `image_names` is only assigned in the commented-out real-data
# loading cell; that cell must be re-enabled for this line to work.
predictions_df['image_name'] = image_names
# Fix the misspelled label only in the exported table; label_map keeps 'premuim'.
predictions_df = predictions_df.rename(columns={'premuim': 'premium'})

print(predictions_df.head())

In [None]:
# Show the binarised prediction table built in the previous cell.
predictions_df

In [None]:
# def determine_company(image_name):
#     num = int(image_name.split('_')[1].split('.')[0])
    
#     if num == 0:
#         return 'Nike'
#     elif 1 <= num <= 6:
#         return 'Netflix'
#     elif 7 <= num <= 15:
#         return 'Yandex_Travel'
#     elif 16 <= num <= 35:
#         return 'Nike'
#     elif 36 <= num <= 78:
#         return 'Netflix'
#     else:
#         return 'Unknown'

# predictions_df['company_name'] = predictions_df['image_name'].apply(determine_company)


In [None]:
# predictions_df

In [None]:
# predictions_df.to_csv('real_predictions3.csv', index=False)

In [None]:
from tensorflow.keras.metrics import BinaryAccuracy

# Predict on the held-out test split
predictions = model3.predict([test_images_prepared, test_object_infos])

# Names of the opposing class pairs, in label-column order
class_pairs = [
    'healthy-unhealthy', 'glamorous-drab', 'rugged-gentle', 'fun-dull',
    'innovation-conservatism', 'premuim-accessibility', 'minimalism-detail', 'safety-risk',
]

# Binary accuracy per pair: label columns (0, 1), (2, 3), ... are scored together
accuracy_metrics = []
for pair_start in range(0, predictions.shape[1], 2):
    pair_columns = slice(pair_start, pair_start + 2)
    metric = BinaryAccuracy()
    metric.update_state(test_labels[:, pair_columns], predictions[:, pair_columns])
    accuracy_metrics.append(metric.result().numpy())

# Report the accuracy of every single pair
for pair_name, acc in zip(class_pairs, accuracy_metrics):
    print(f"Accuracy for {pair_name}: {acc}")

# Report the mean over all pairs
average_accuracy = np.mean(accuracy_metrics)
print(f"Average Accuracy across all class pairs: {average_accuracy}")

In [None]:
import tensorflow as tf

def calculate_tp_fp_tn_fn_per_pair(model, test_images, test_object_infos, test_labels):
    """Count TP / FP / TN / FN for each pair of adjacent label columns.

    Predictions are thresholded at 0.5. For the pair made of columns ``i`` and
    ``i + 1`` only the samples whose true label lies in that pair are counted,
    and the counts cover both columns of the pair together.

    Args:
        model: Object exposing ``predict([images, object_infos])``.
        test_images: Image input forwarded to ``model.predict``.
        test_object_infos: Object-info input forwarded to ``model.predict``.
        test_labels: Array-like of 0/1 labels, shape ``(n_samples, n_classes)``.

    Returns:
        list[dict]: One ``{'TP', 'FP', 'TN', 'FN'}`` dict per pair, with the
        counts stored as ``numpy.float32`` scalars.
    """
    test_labels = np.asarray(test_labels)
    predictions = model.predict([test_images, test_object_infos])
    predicted_positive = np.asarray(predictions) > 0.5

    results = []

    for i in range(0, test_labels.shape[1], 2):
        # Keep only the samples whose true label belongs to this pair
        relevant_indices = (test_labels[:, i] + test_labels[:, i + 1]) > 0
        actual = test_labels[relevant_indices, i:i + 2].astype(bool)
        predicted = predicted_positive[relevant_indices, i:i + 2]

        # Plain boolean counting; no stateful Keras metric objects are needed
        outcome_masks = {
            'TP': actual & predicted,
            'FP': ~actual & predicted,
            'TN': ~actual & ~predicted,
            'FN': actual & ~predicted,
        }
        results.append({name: np.float32(mask.sum()) for name, mask in outcome_masks.items()})

    return results

# Confusion counts of model 3 on the test split, one dict per class pair
results = calculate_tp_fp_tn_fn_per_pair(
    model=model3,
    test_images=test_images_prepared,
    test_object_infos=test_object_infos,
    test_labels=test_labels,
)

# Names of the opposing class pairs, in label-column order
class_pairs = [
    'healthy-unhealthy', 'glamorous-drab', 'rugged-gentle', 'fun-dull',
    'innovation-conservatism', 'premuim-accessibility', 'minimalism-detail', 'safety-risk',
]

In [None]:
# Precision per class pair
def calculate_precision(results):
    """Compute ``TP / (TP + FP)`` for every entry of ``results``.

    Args:
        results: Iterable of dicts with at least the keys ``'TP'`` and ``'FP'``.

    Returns:
        list: One precision per entry; ``0`` when an entry has no predicted
        positives (``TP + FP == 0``).
    """
    def _precision(counts):
        predicted_positive = counts['TP'] + counts['FP']
        # Guard against division by zero when nothing was predicted positive
        return counts['TP'] / predicted_positive if predicted_positive != 0 else 0

    return [_precision(counts) for counts in results]

# Precision of every class pair
precisions = calculate_precision(results)

# Names of the opposing class pairs, in label-column order
class_pairs = [
    'healthy-unhealthy', 'glamorous-drab', 'rugged-gentle', 'fun-dull',
    'innovation-conservatism', 'premuim-accessibility', 'minimalism-detail', 'safety-risk',
]

# Report the precision of every pair
for pair_name, pair_precision in zip(class_pairs, precisions):
    print(f"{pair_name}: Precision={pair_precision:.4f}")

# Report the mean over all pairs
average_precision = np.mean(precisions)
print(f"Average Precision across all class pairs: {average_precision:.4f}")


In [None]:
model3.save('model3.h5')  # Save the trained model to disk in HDF5 format

# Model ensemble 

In [None]:
# predictions1 = (model1.predict([test_images_prepared, test_object_infos]) > 0.5).astype(int)
# predictions2 = (model2.predict([test_images_prepared, test_object_infos]) > 0.5).astype(int)
# predictions3 = (model3.predict([test_images_prepared, test_object_infos]) > 0.5).astype(int)


# # Stack predictions along a new axis to form a matrix [n_samples, n_models]
# predictions_stack = np.stack([predictions1, predictions2, predictions3], axis=1)

# # Majority voting: Count the number of times 1 appears and decide the class based on the majority
# majority_vote = np.apply_along_axis(lambda x: 1 if np.sum(x) >= 2 else 0, axis=1, arr=predictions_stack)

# from tensorflow.keras.metrics import BinaryAccuracy

# # Assume test_labels is your ground truth labels array of the same shape as predictions [n_samples, n_classes]
# accuracy_metrics = []
# class_pairs = ['healthy-unhealthy', 'glamorous-drab', 'rugged-gentle', 'fun-dull',
#                'innovation-conservatism', 'premium-accessibility', 'minimalism-detail', 'safety-risk']

# # Iterate over each pair (assuming each pair is composed of two classes)
# for i in range(0, majority_vote.shape[1], 2):
#     metric = BinaryAccuracy()
#     metric.update_state(test_labels[:, i:i+2], majority_vote[:, i:i+2])
#     accuracy_metrics.append(metric.result().numpy())

# # Print the results for each class pair
# for i, acc in enumerate(accuracy_metrics):
#     print(f"Accuracy for {class_pairs[i]}: {acc}")

# average_accuracy = np.mean(accuracy_metrics)
# print(f"Average Accuracy across all class pairs: {average_accuracy}")