In [None]:
from tensorflow.keras import models, layers, optimizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import KFold
from sklearn.metrics import f1_score
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import Add
from tensorflow.keras import backend as K
from transformers import ViTImageProcessor, TFAutoModel

# Locations of the competition images and the annotation tables.
train_image_dir = '/kaggle/input/visual-taxonomy/train_images'
test_image_dir = '/kaggle/input/visual-taxonomy/test_images'
df_train = pd.read_csv('/kaggle/input/visual-taxonomy/train.csv')
df_test = pd.read_csv('/kaggle/input/visual-taxonomy/test.csv')

# Attribute columns to predict, keyed by product category.
categories_attributes = {
    'Kurtis': [
        'attr_1', 'attr_2', 'attr_3',
        'attr_4', 'attr_5', 'attr_6',
        'attr_7', 'attr_8', 'attr_9',
    ],
}

# Fitted LabelEncoder objects, stored as label_encoders[category][attribute]
# by the training loop.
label_encoders = dict()

# Custom F1 Score Metric
def f1_metric(y_true, y_pred):
    """Macro-averaged F1 score of one classification head.

    The score is computed on exactly the tensors passed in, so when Keras
    calls this as a metric it is a per-batch value.

    Args:
        y_true: Ground-truth labels. Either integer class ids (shape
            ``(batch,)`` or ``(batch, 1)``, as used with
            ``sparse_categorical_crossentropy`` / ``binary_crossentropy``)
            or one-hot rows with the same trailing size as ``y_pred``.
        y_pred: Model output. Shape ``(batch, 1)`` for a sigmoid head or
            ``(batch, num_classes)`` for a softmax head.

    Returns:
        Scalar float tensor: mean F1 over the classes that occur in either
        the labels or the predictions (0 when there are none).
    """
    y_true = tf.convert_to_tensor(y_true)
    y_pred = tf.convert_to_tensor(y_pred)

    # The head type is decided by the prediction width, not by y_true:
    # sparse labels arrive as a single column for both binary and
    # multi-class heads, so y_true's shape cannot tell them apart.
    num_outputs = y_pred.shape[-1]
    if num_outputs == 1:
        num_classes = 2
        pred_ids = tf.cast(tf.round(tf.reshape(y_pred, [-1])), tf.int32)
    else:
        num_classes = num_outputs
        pred_ids = tf.argmax(y_pred, axis=-1, output_type=tf.int32)

    labels_are_one_hot = (
        num_outputs > 1
        and y_true.shape.rank == y_pred.shape.rank
        and y_true.shape[-1] == num_outputs
    )
    if labels_are_one_hot:
        true_ids = tf.argmax(y_true, axis=-1, output_type=tf.int32)
    else:
        true_ids = tf.cast(tf.reshape(y_true, [-1]), tf.int32)

    true_1h = tf.one_hot(true_ids, num_classes, dtype=tf.float32)
    pred_1h = tf.one_hot(pred_ids, num_classes, dtype=tf.float32)

    # Per-class confusion counts (one entry per class).
    true_positives = tf.reduce_sum(true_1h * pred_1h, axis=0)
    false_positives = tf.reduce_sum((1.0 - true_1h) * pred_1h, axis=0)
    false_negatives = tf.reduce_sum(true_1h * (1.0 - pred_1h), axis=0)

    precision = true_positives / (true_positives + false_positives + K.epsilon())
    recall = true_positives / (true_positives + false_negatives + K.epsilon())
    per_class_f1 = 2 * (precision * recall) / (precision + recall + K.epsilon())

    # Classes that appear in neither labels nor predictions would contribute
    # a meaningless 0 and drag the average down, so they are left out.
    present = tf.cast(
        (true_positives + false_positives + false_negatives) > 0, tf.float32
    )
    return tf.math.divide_no_nan(
        tf.reduce_sum(per_class_f1 * present), tf.reduce_sum(present)
    )

# Load the pre-trained VGG16 convolutional base used for feature extraction

# VGG16 model class and its matching input preprocessing
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input

# Pretrained VGG16 (ImageNet weights) without its classifier head, taking
# 224x224 RGB inputs; only used through .predict() to produce features.
vgg_model = VGG16(
    input_shape=(224, 224, 3),
    include_top=False,
    weights='imagenet',
)

# Feature extraction function using VGG
def extract_features_vgg(image_files, image_dir, feature_model, target_size=(224, 224), batch_size=256):
    """Run images through ``feature_model`` and return flattened features.

    Images are loaded and processed ``batch_size`` at a time so that only one
    chunk of decoded images is held in memory, instead of the whole set.

    Args:
        image_files: Table with a ``'filename'`` column (e.g. a DataFrame).
        image_dir: Directory that contains the files named in ``image_files``.
        feature_model: Object with a Keras-style ``predict(array)`` method.
        target_size: ``(height, width)`` each image is resized to on load.
        batch_size: Number of images loaded and predicted per chunk.

    Returns:
        2-D array with one row per filename (in input order) holding the
        flattened model output. For an empty ``image_files`` the result has
        shape ``(0, 0)``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    filenames = list(image_files['filename'])
    feature_chunks = []
    for start in range(0, len(filenames), batch_size):
        batch = []
        for filename in filenames[start:start + batch_size]:
            img = tf.keras.preprocessing.image.load_img(f"{image_dir}/{filename}", target_size=target_size)
            img = tf.keras.preprocessing.image.img_to_array(img)
            batch.append(preprocess_input(img))  # Preprocess as VGG expects
        chunk = feature_model.predict(np.array(batch))
        # Flatten everything after the batch axis into one feature vector.
        feature_chunks.append(chunk.reshape(chunk.shape[0], -1))

    if not feature_chunks:
        # Nothing to predict on; the model is never called in this case.
        return np.empty((0, 0), dtype=np.float32)
    return np.concatenate(feature_chunks, axis=0)


# Define the model for multi-output classification
from tensorflow.keras import layers, models, optimizers

# Define the model for multi-output classification with custom layers per attribute
def create_custom_multi_output_model(input_shape, num_classes_dict):
    """Build and compile a network with one dense branch per attribute.

    Every branch reads the same input tensor and ends in an output layer
    named ``output_<attr>``: a single sigmoid unit when the attribute has
    exactly 2 classes, otherwise a softmax over its classes.

    Args:
        input_shape: Shape of one input sample, passed to ``layers.Input``.
        num_classes_dict: Mapping from attribute name to its number of
            classes. Must contain ``'attr_1'`` through ``'attr_9'``.

    Returns:
        A compiled Keras model whose outputs are ``output_attr_1`` ...
        ``output_attr_9`` in that order. Losses and the F1 metric are
        attached by output name, so the key order of ``num_classes_dict``
        does not matter.

    Raises:
        KeyError: If ``num_classes_dict`` lacks one of the nine attributes.
    """
    def _head(branch, attr):
        """Attach the output layer for ``attr`` on top of ``branch``."""
        n_classes = num_classes_dict[attr]
        if n_classes == 2:
            return layers.Dense(1, activation='sigmoid', name=f'output_{attr}')(branch)
        return layers.Dense(n_classes, activation='softmax', name=f'output_{attr}')(branch)

    inputs = layers.Input(shape=input_shape)

    outputs = {}

    # NOTE(review): several branches below stack Dense layers with
    # activation=None and no nonlinearity in between; the architecture is
    # kept exactly as it was.

    # Custom Layers for attr_1
    attr1_branch = layers.Dense(256, activation=None)(inputs)
    attr1_branch = layers.BatchNormalization()(attr1_branch)
    attr1_branch = layers.Dropout(0.6)(attr1_branch)
    attr1_branch = layers.Dense(128, activation=None)(attr1_branch)
    attr1_branch = layers.BatchNormalization()(attr1_branch)
    attr1_branch = layers.Dropout(0.5)(attr1_branch)
    outputs['attr_1'] = _head(attr1_branch, 'attr_1')

    # Custom Layers for attr_2
    attr2_branch = layers.Dense(256, activation=None)(inputs)
    attr2_branch = layers.Dropout(0.4)(attr2_branch)
    attr2_branch = layers.Dense(64, activation=None)(attr2_branch)
    attr2_branch = layers.BatchNormalization()(attr2_branch)
    attr2_branch = layers.Dropout(0.3)(attr2_branch)
    outputs['attr_2'] = _head(attr2_branch, 'attr_2')

    # Custom Layers for attr_3 (main path plus a projected skip connection)
    attr3_branch = layers.Dense(256, activation=None)(inputs)
    attr3_branch = layers.BatchNormalization()(attr3_branch)
    attr3_branch = layers.Dropout(0.3)(attr3_branch)
    attr3_branch = layers.Dense(128, activation=None)(attr3_branch)
    attr3_branch = layers.BatchNormalization()(attr3_branch)
    attr3_branch = layers.Dropout(0.2)(attr3_branch)
    # Project the input to 128 units so it can be added to the main path.
    residual = layers.Dense(128, activation=None)(inputs)
    residual = layers.BatchNormalization()(residual)
    attr3_branch = layers.Add()([attr3_branch, residual])
    attr3_branch = layers.ReLU()(attr3_branch)
    outputs['attr_3'] = _head(attr3_branch, 'attr_3')

    # Custom Layers for attr_4
    attr4_branch = layers.Dense(256, activation=None)(inputs)
    attr4_branch = layers.ReLU()(attr4_branch)
    attr4_branch = layers.Dropout(0.3)(attr4_branch)
    attr4_branch = layers.Dense(128, activation=None)(attr4_branch)
    attr4_branch = layers.Dropout(0.2)(attr4_branch)
    outputs['attr_4'] = _head(attr4_branch, 'attr_4')

    # Custom Layers for attr_5
    attr5_branch = layers.Dense(256, activation=None)(inputs)
    attr5_branch = layers.BatchNormalization()(attr5_branch)
    attr5_branch = layers.ReLU()(attr5_branch)
    attr5_branch = layers.Dropout(0.3)(attr5_branch)
    attr5_branch = layers.Dense(64, activation=None)(attr5_branch)
    attr5_branch = layers.Dropout(0.2)(attr5_branch)
    outputs['attr_5'] = _head(attr5_branch, 'attr_5')

    # Custom Layers for attr_6
    attr6_branch = layers.Dense(256, activation=None)(inputs)
    attr6_branch = layers.Dropout(0.3)(attr6_branch)
    attr6_branch = layers.Dense(64, activation=None)(attr6_branch)
    attr6_branch = layers.ReLU()(attr6_branch)
    attr6_branch = layers.Dropout(0.2)(attr6_branch)
    outputs['attr_6'] = _head(attr6_branch, 'attr_6')

    # Custom Layers for attr_7
    attr7_branch = layers.Dense(256, activation=None)(inputs)
    attr7_branch = layers.Dropout(0.3)(attr7_branch)
    attr7_branch = layers.Dense(64, activation=None)(attr7_branch)
    attr7_branch = layers.ReLU()(attr7_branch)
    attr7_branch = layers.Dropout(0.2)(attr7_branch)
    outputs['attr_7'] = _head(attr7_branch, 'attr_7')

    # Custom Layers for attr_8
    attr8_branch = layers.Dense(256, activation=None)(inputs)
    attr8_branch = layers.BatchNormalization()(attr8_branch)
    attr8_branch = layers.Dropout(0.3)(attr8_branch)
    attr8_branch = layers.Dense(128, activation=None)(attr8_branch)
    attr8_branch = layers.BatchNormalization()(attr8_branch)
    attr8_branch = layers.Dropout(0.3)(attr8_branch)
    attr8_branch = layers.Dense(64, activation=None)(attr8_branch)
    attr8_branch = layers.Dropout(0.2)(attr8_branch)
    outputs['attr_8'] = _head(attr8_branch, 'attr_8')

    # Custom Layers for attr_9
    attr9_branch = layers.Dense(256, activation=None)(inputs)
    attr9_branch = layers.Dropout(0.3)(attr9_branch)
    attr9_branch = layers.Dense(64, activation=None)(attr9_branch)
    attr9_branch = layers.ReLU()(attr9_branch)
    attr9_branch = layers.Dropout(0.2)(attr9_branch)
    outputs['attr_9'] = _head(attr9_branch, 'attr_9')

    # Build and compile the model
    model = models.Model(inputs=inputs, outputs=list(outputs.values()))

    # Key losses and metrics by output layer name rather than by position:
    # a positional list built from num_classes_dict.values() would pair the
    # wrong loss with an output whenever that dict is ordered differently
    # from the branches above or carries extra attributes.
    losses = {}
    for attr in outputs:
        if num_classes_dict[attr] == 2:
            losses[f'output_{attr}'] = 'binary_crossentropy'
        else:
            losses[f'output_{attr}'] = 'sparse_categorical_crossentropy'
    metrics = {output_name: f1_metric for output_name in losses}

    model.compile(
        optimizer=optimizers.Adam(learning_rate=0.0001),
        loss=losses,
        metrics=metrics
    )

    return model


# Training and prediction
for category, attributes in categories_attributes.items():
    # Copy the filtered rows so the column assignments below act on an
    # independent frame rather than on a slice of df_train.
    df_category = df_train[df_train['Category'] == category].copy()
    df_category['id'] = df_category['id'].astype(str)
    # Image files are named after the id, zero-padded to 6 characters.
    df_category['filename'] = df_category['id'].str.zfill(6) + '.jpg'

    # Keep only rows where every attribute is labelled, then encode each
    # attribute's labels as integers.
    df_attr = df_category[['filename'] + attributes].dropna().copy()
    num_classes_dict = {}
    label_encoders[category] = {}

    for attr in attributes:
        le = LabelEncoder()
        df_attr[attr] = le.fit_transform(df_attr[attr])
        label_encoders[category][attr] = le
        num_classes_dict[attr] = len(le.classes_)

    # The VGG features of an image do not depend on the fold, so extract
    # them once here instead of once per fold. Rows are processed in slices
    # to bound how many decoded images are in memory at a time.
    extraction_chunk = 512
    all_features = np.concatenate([
        extract_features_vgg(
            df_attr.iloc[start:start + extraction_chunk][['filename']],
            train_image_dir,
            vgg_model,
        )
        for start in range(0, len(df_attr), extraction_chunk)
    ])

    # Set up K-Fold Cross-Validation
    kf = KFold(n_splits=3, shuffle=True, random_state=42)
    # Start below any reachable score so that a model is always saved, even
    # if every fold scores 0.
    best_f1_across_folds = -np.inf
    best_model_path = f'best_multi_output_model_{category}.h5'

    for fold, (train_idx, val_idx) in enumerate(kf.split(df_attr)):
        print(f"Training fold {fold+1} for {category}")

        train_fold = df_attr.iloc[train_idx]
        val_fold = df_attr.iloc[val_idx]

        # kf.split yields positional indices, which line up with the rows of
        # all_features because both follow df_attr's row order.
        train_features = all_features[train_idx]
        val_features = all_features[val_idx]

        # Prepare labels as a dictionary of targets keyed by output layer name
        train_labels = {f'output_{attr}': train_fold[attr].values for attr in attributes}
        val_labels = {f'output_{attr}': val_fold[attr].values for attr in attributes}

        # Create and train a fresh model for this fold
        multi_output_model = create_custom_multi_output_model(train_features.shape[1:], num_classes_dict)
        multi_output_model.fit(train_features, train_labels, epochs=10, batch_size=16, validation_data=(val_features, val_labels))

        # Score the fold with scikit-learn's macro F1 on the validation split
        val_preds = multi_output_model.predict(val_features)
        fold_f1_scores = []
        for i, attr in enumerate(attributes):
            true_labels = val_labels[f'output_{attr}']
            if num_classes_dict[attr] == 2:
                # Sigmoid head: threshold and flatten (n, 1) -> (n,).
                pred_labels = np.round(val_preds[i]).astype(int).ravel()
            else:
                pred_labels = val_preds[i].argmax(axis=1)
            fold_f1_scores.append(f1_score(true_labels, pred_labels, average='macro'))

        # Calculate the mean F1-score for this fold
        fold_f1 = np.mean(fold_f1_scores)
        print(f"Mean F1-score for fold {fold+1}: {fold_f1}")

        if fold_f1 > best_f1_across_folds:
            best_f1_across_folds = fold_f1
            multi_output_model.save(best_model_path)
            print(f"New best model for {category} with F1-score: {best_f1_across_folds}")

    print(f"Best F1-score across all folds for {category}: {best_f1_across_folds}")

# ---- Prediction on Test Set ----
all_predictions = []
for category, attributes in categories_attributes.items():
    # Copy so the new columns are added to an independent frame rather than
    # to a slice of df_test.
    df_test_category = df_test[df_test['Category'] == category].copy()
    df_test_category['id'] = df_test_category['id'].astype(str)
    df_test_category['filename'] = df_test_category['id'].str.zfill(6) + '.jpg'

    # Derive the path from this category instead of reusing whatever value
    # the training loop left behind in its last iteration.
    best_model_path = f'best_multi_output_model_{category}.h5'
    # compile=False: the saved model was compiled with the custom f1_metric
    # function, which load_model cannot resolve on its own; compilation is
    # not needed for inference.
    multi_output_model = models.load_model(best_model_path, compile=False)

    # Extract features for the test set
    test_features = extract_features_vgg(df_test_category[['filename']], test_image_dir, vgg_model)

    # Predict for each attribute and map class ids back to the original labels
    test_preds = multi_output_model.predict(test_features)
    for i, attr in enumerate(attributes):
        encoder = label_encoders[category][attr]
        if len(encoder.classes_) == 2:
            # Sigmoid head: threshold and flatten (n, 1) -> (n,).
            pred_labels = np.round(test_preds[i]).astype(int).ravel()
        else:
            pred_labels = test_preds[i].argmax(axis=1)
        df_test_category[f'predicted_{attr}'] = encoder.inverse_transform(pred_labels)

    all_predictions.append(df_test_category[['id'] + [f'predicted_{attr}' for attr in attributes]])

# Concatenate all predictions and save to CSV
df_predictions = pd.concat(all_predictions)
df_predictions.to_csv('multi_output_predictions.csv', index=False)

print("Test predictions saved to 'multi_output_predictions.csv'")


In [None]:
from tensorflow.keras import models, layers, optimizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import KFold
from sklearn.metrics import f1_score
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import Add
from tensorflow.keras import backend as K
from transformers import ViTImageProcessor, TFAutoModel

# Locations of the competition images and the annotation tables.
train_image_dir = '/kaggle/input/visual-taxonomy/train_images'
test_image_dir = '/kaggle/input/visual-taxonomy/test_images'
df_train = pd.read_csv('/kaggle/input/visual-taxonomy/train.csv')
df_test = pd.read_csv('/kaggle/input/visual-taxonomy/test.csv')

# Attribute columns to predict, keyed by product category.
categories_attributes = {
    'Mens Tshirt': [
        'attr_1', 'attr_2', 'attr_3',
        'attr_4', 'attr_5',
    ],
}

# Fitted LabelEncoder objects, stored as label_encoders[category][attribute]
# by the training loop.
label_encoders = dict()

# Custom F1 Score Metric
def f1_metric(y_true, y_pred):
    """Macro-averaged F1 score of one classification head.

    The score is computed on exactly the tensors passed in, so when Keras
    calls this as a metric it is a per-batch value.

    Args:
        y_true: Ground-truth labels. Either integer class ids (shape
            ``(batch,)`` or ``(batch, 1)``, as used with
            ``sparse_categorical_crossentropy`` / ``binary_crossentropy``)
            or one-hot rows with the same trailing size as ``y_pred``.
        y_pred: Model output. Shape ``(batch, 1)`` for a sigmoid head or
            ``(batch, num_classes)`` for a softmax head.

    Returns:
        Scalar float tensor: mean F1 over the classes that occur in either
        the labels or the predictions (0 when there are none).
    """
    y_true = tf.convert_to_tensor(y_true)
    y_pred = tf.convert_to_tensor(y_pred)

    # The head type is decided by the prediction width, not by y_true:
    # sparse labels arrive as a single column for both binary and
    # multi-class heads, so y_true's shape cannot tell them apart.
    num_outputs = y_pred.shape[-1]
    if num_outputs == 1:
        num_classes = 2
        pred_ids = tf.cast(tf.round(tf.reshape(y_pred, [-1])), tf.int32)
    else:
        num_classes = num_outputs
        pred_ids = tf.argmax(y_pred, axis=-1, output_type=tf.int32)

    labels_are_one_hot = (
        num_outputs > 1
        and y_true.shape.rank == y_pred.shape.rank
        and y_true.shape[-1] == num_outputs
    )
    if labels_are_one_hot:
        true_ids = tf.argmax(y_true, axis=-1, output_type=tf.int32)
    else:
        true_ids = tf.cast(tf.reshape(y_true, [-1]), tf.int32)

    true_1h = tf.one_hot(true_ids, num_classes, dtype=tf.float32)
    pred_1h = tf.one_hot(pred_ids, num_classes, dtype=tf.float32)

    # Per-class confusion counts (one entry per class).
    true_positives = tf.reduce_sum(true_1h * pred_1h, axis=0)
    false_positives = tf.reduce_sum((1.0 - true_1h) * pred_1h, axis=0)
    false_negatives = tf.reduce_sum(true_1h * (1.0 - pred_1h), axis=0)

    precision = true_positives / (true_positives + false_positives + K.epsilon())
    recall = true_positives / (true_positives + false_negatives + K.epsilon())
    per_class_f1 = 2 * (precision * recall) / (precision + recall + K.epsilon())

    # Classes that appear in neither labels nor predictions would contribute
    # a meaningless 0 and drag the average down, so they are left out.
    present = tf.cast(
        (true_positives + false_positives + false_negatives) > 0, tf.float32
    )
    return tf.math.divide_no_nan(
        tf.reduce_sum(per_class_f1 * present), tf.reduce_sum(present)
    )

# Load the pre-trained VGG16 convolutional base used for feature extraction

# VGG16 model class and its matching input preprocessing
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input

# Pretrained VGG16 (ImageNet weights) without its classifier head, taking
# 224x224 RGB inputs; only used through .predict() to produce features.
vgg_model = VGG16(
    input_shape=(224, 224, 3),
    include_top=False,
    weights='imagenet',
)

# Feature extraction function using VGG
def extract_features_vgg(image_files, image_dir, feature_model, target_size=(224, 224), batch_size=256):
    """Run images through ``feature_model`` and return flattened features.

    Images are loaded and processed ``batch_size`` at a time so that only one
    chunk of decoded images is held in memory, instead of the whole set.

    Args:
        image_files: Table with a ``'filename'`` column (e.g. a DataFrame).
        image_dir: Directory that contains the files named in ``image_files``.
        feature_model: Object with a Keras-style ``predict(array)`` method.
        target_size: ``(height, width)`` each image is resized to on load.
        batch_size: Number of images loaded and predicted per chunk.

    Returns:
        2-D array with one row per filename (in input order) holding the
        flattened model output. For an empty ``image_files`` the result has
        shape ``(0, 0)``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    filenames = list(image_files['filename'])
    feature_chunks = []
    for start in range(0, len(filenames), batch_size):
        batch = []
        for filename in filenames[start:start + batch_size]:
            img = tf.keras.preprocessing.image.load_img(f"{image_dir}/{filename}", target_size=target_size)
            img = tf.keras.preprocessing.image.img_to_array(img)
            batch.append(preprocess_input(img))  # Preprocess as VGG expects
        chunk = feature_model.predict(np.array(batch))
        # Flatten everything after the batch axis into one feature vector.
        feature_chunks.append(chunk.reshape(chunk.shape[0], -1))

    if not feature_chunks:
        # Nothing to predict on; the model is never called in this case.
        return np.empty((0, 0), dtype=np.float32)
    return np.concatenate(feature_chunks, axis=0)


# Define the model for multi-output classification
from tensorflow.keras import layers, models, optimizers

# Define the model for multi-output classification with custom layers per attribute
def create_custom_multi_output_model(input_shape, num_classes_dict):
    """Build and compile a network with one dense branch per attribute.

    Every branch reads the same input tensor and ends in an output layer
    named ``output_<attr>``: a single sigmoid unit when the attribute has
    exactly 2 classes, otherwise a softmax over its classes.

    Args:
        input_shape: Shape of one input sample, passed to ``layers.Input``.
        num_classes_dict: Mapping from attribute name to its number of
            classes. Must contain ``'attr_1'`` through ``'attr_5'``.

    Returns:
        A compiled Keras model whose outputs are ``output_attr_1`` ...
        ``output_attr_5`` in that order. Losses and the F1 metric are
        attached by output name, so the key order of ``num_classes_dict``
        does not matter.

    Raises:
        KeyError: If ``num_classes_dict`` lacks one of the five attributes.
    """
    def _head(branch, attr):
        """Attach the output layer for ``attr`` on top of ``branch``."""
        n_classes = num_classes_dict[attr]
        if n_classes == 2:
            return layers.Dense(1, activation='sigmoid', name=f'output_{attr}')(branch)
        return layers.Dense(n_classes, activation='softmax', name=f'output_{attr}')(branch)

    inputs = layers.Input(shape=input_shape)

    outputs = {}

    # Custom Layers for attr_1
    # The first layer of this branch must read from `inputs`: the earlier
    # 256-unit layer that used to define attr1_branch was disabled, which
    # left the variable unassigned at this point.
    attr1_branch = layers.Dense(128, activation=None)(inputs)
    attr1_branch = layers.BatchNormalization()(attr1_branch)
    attr1_branch = layers.ReLU()(attr1_branch)
    attr1_branch = layers.Dense(64, activation=None)(attr1_branch)
    attr1_branch = layers.BatchNormalization()(attr1_branch)
    attr1_branch = layers.Dropout(0.5)(attr1_branch)
    outputs['attr_1'] = _head(attr1_branch, 'attr_1')

    # Custom Layers for attr_2
    attr2_branch = layers.Dense(256, activation=None)(inputs)
    attr2_branch = layers.BatchNormalization()(attr2_branch)
    attr2_branch = layers.Dropout(0.5)(attr2_branch)
    outputs['attr_2'] = _head(attr2_branch, 'attr_2')

    # Custom Layers for attr_3
    attr3_branch = layers.Dense(256, activation=None)(inputs)
    attr3_branch = layers.Dropout(0.5)(attr3_branch)
    attr3_branch = layers.Dense(128, activation=None)(attr3_branch)
    attr3_branch = layers.BatchNormalization()(attr3_branch)
    attr3_branch = layers.ReLU()(attr3_branch)
    attr3_branch = layers.Dropout(0.5)(attr3_branch)
    outputs['attr_3'] = _head(attr3_branch, 'attr_3')

    # Custom Layers for attr_4
    attr4_branch = layers.Dense(256, activation=None)(inputs)
    attr4_branch = layers.BatchNormalization()(attr4_branch)
    attr4_branch = layers.ReLU()(attr4_branch)
    attr4_branch = layers.Dropout(0.3)(attr4_branch)
    outputs['attr_4'] = _head(attr4_branch, 'attr_4')

    # Custom Layers for attr_5
    attr5_branch = layers.Dense(256, activation=None)(inputs)
    attr5_branch = layers.BatchNormalization()(attr5_branch)
    attr5_branch = layers.ReLU()(attr5_branch)
    attr5_branch = layers.Dense(128, activation=None)(attr5_branch)
    attr5_branch = layers.BatchNormalization()(attr5_branch)
    outputs['attr_5'] = _head(attr5_branch, 'attr_5')

    # Build and compile the model
    model = models.Model(inputs=inputs, outputs=list(outputs.values()))

    # Key losses and metrics by output layer name rather than by position:
    # a positional list built from num_classes_dict.values() would pair the
    # wrong loss with an output whenever that dict is ordered differently
    # from the branches above or carries extra attributes.
    losses = {}
    for attr in outputs:
        if num_classes_dict[attr] == 2:
            losses[f'output_{attr}'] = 'binary_crossentropy'
        else:
            losses[f'output_{attr}'] = 'sparse_categorical_crossentropy'
    metrics = {output_name: f1_metric for output_name in losses}

    model.compile(
        optimizer=optimizers.Adam(learning_rate=0.0001),
        loss=losses,
        metrics=metrics
    )

    return model


# Training and prediction
for category, attributes in categories_attributes.items():
    # Copy the filtered rows so the column assignments below act on an
    # independent frame rather than on a slice of df_train.
    df_category = df_train[df_train['Category'] == category].copy()
    df_category['id'] = df_category['id'].astype(str)
    # Image files are named after the id, zero-padded to 6 characters.
    df_category['filename'] = df_category['id'].str.zfill(6) + '.jpg'

    # Keep only rows where every attribute is labelled, then encode each
    # attribute's labels as integers.
    df_attr = df_category[['filename'] + attributes].dropna().copy()
    num_classes_dict = {}
    label_encoders[category] = {}

    for attr in attributes:
        le = LabelEncoder()
        df_attr[attr] = le.fit_transform(df_attr[attr])
        label_encoders[category][attr] = le
        num_classes_dict[attr] = len(le.classes_)

    # The VGG features of an image do not depend on the fold, so extract
    # them once here instead of once per fold. Rows are processed in slices
    # to bound how many decoded images are in memory at a time.
    extraction_chunk = 512
    all_features = np.concatenate([
        extract_features_vgg(
            df_attr.iloc[start:start + extraction_chunk][['filename']],
            train_image_dir,
            vgg_model,
        )
        for start in range(0, len(df_attr), extraction_chunk)
    ])

    # Set up K-Fold Cross-Validation
    kf = KFold(n_splits=3, shuffle=True, random_state=42)
    # Start below any reachable score so that a model is always saved, even
    # if every fold scores 0.
    best_f1_across_folds = -np.inf
    best_model_path = f'best_multi_output_model_{category}.h5'

    for fold, (train_idx, val_idx) in enumerate(kf.split(df_attr)):
        print(f"Training fold {fold+1} for {category}")

        train_fold = df_attr.iloc[train_idx]
        val_fold = df_attr.iloc[val_idx]

        # kf.split yields positional indices, which line up with the rows of
        # all_features because both follow df_attr's row order.
        train_features = all_features[train_idx]
        val_features = all_features[val_idx]

        # Prepare labels as a dictionary of targets keyed by output layer name
        train_labels = {f'output_{attr}': train_fold[attr].values for attr in attributes}
        val_labels = {f'output_{attr}': val_fold[attr].values for attr in attributes}

        # Create and train a fresh model for this fold
        multi_output_model = create_custom_multi_output_model(train_features.shape[1:], num_classes_dict)
        multi_output_model.fit(train_features, train_labels, epochs=10, batch_size=16, validation_data=(val_features, val_labels))

        # Score the fold with scikit-learn's macro F1 on the validation split
        val_preds = multi_output_model.predict(val_features)
        fold_f1_scores = []
        for i, attr in enumerate(attributes):
            true_labels = val_labels[f'output_{attr}']
            if num_classes_dict[attr] == 2:
                # Sigmoid head: threshold and flatten (n, 1) -> (n,).
                pred_labels = np.round(val_preds[i]).astype(int).ravel()
            else:
                pred_labels = val_preds[i].argmax(axis=1)
            fold_f1_scores.append(f1_score(true_labels, pred_labels, average='macro'))

        # Calculate the mean F1-score for this fold
        fold_f1 = np.mean(fold_f1_scores)
        print(f"Mean F1-score for fold {fold+1}: {fold_f1}")

        if fold_f1 > best_f1_across_folds:
            best_f1_across_folds = fold_f1
            multi_output_model.save(best_model_path)
            print(f"New best model for {category} with F1-score: {best_f1_across_folds}")

    print(f"Best F1-score across all folds for {category}: {best_f1_across_folds}")

# ---- Prediction on Test Set ----
all_predictions = []
for category, attributes in categories_attributes.items():
    # Copy so the new columns are added to an independent frame rather than
    # to a slice of df_test.
    df_test_category = df_test[df_test['Category'] == category].copy()
    df_test_category['id'] = df_test_category['id'].astype(str)
    df_test_category['filename'] = df_test_category['id'].str.zfill(6) + '.jpg'

    # Derive the path from this category instead of reusing whatever value
    # the training loop left behind in its last iteration.
    best_model_path = f'best_multi_output_model_{category}.h5'
    # compile=False: the saved model was compiled with the custom f1_metric
    # function, which load_model cannot resolve on its own; compilation is
    # not needed for inference.
    multi_output_model = models.load_model(best_model_path, compile=False)

    # Extract features for the test set
    test_features = extract_features_vgg(df_test_category[['filename']], test_image_dir, vgg_model)

    # Predict for each attribute and map class ids back to the original labels
    test_preds = multi_output_model.predict(test_features)
    for i, attr in enumerate(attributes):
        encoder = label_encoders[category][attr]
        if len(encoder.classes_) == 2:
            # Sigmoid head: threshold and flatten (n, 1) -> (n,).
            pred_labels = np.round(test_preds[i]).astype(int).ravel()
        else:
            pred_labels = test_preds[i].argmax(axis=1)
        df_test_category[f'predicted_{attr}'] = encoder.inverse_transform(pred_labels)

    all_predictions.append(df_test_category[['id'] + [f'predicted_{attr}' for attr in attributes]])

# Concatenate all predictions and save to CSV
df_predictions = pd.concat(all_predictions)
df_predictions.to_csv('multi_output_predictions.csv', index=False)

print("Test predictions saved to 'multi_output_predictions.csv'")


In [None]:
from tensorflow.keras import models, layers, optimizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import KFold
from sklearn.metrics import f1_score
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import Add
from tensorflow.keras import backend as K
from transformers import ViTImageProcessor, TFAutoModel

# Locations of the competition images and the annotation tables.
train_image_dir = '/kaggle/input/visual-taxonomy/train_images'
test_image_dir = '/kaggle/input/visual-taxonomy/test_images'
df_train = pd.read_csv('/kaggle/input/visual-taxonomy/train.csv')
df_test = pd.read_csv('/kaggle/input/visual-taxonomy/test.csv')

# Attribute columns to predict, keyed by product category.
categories_attributes = {
    'Sarees': [
        'attr_1', 'attr_2', 'attr_3', 'attr_4', 'attr_5',
        'attr_6', 'attr_7', 'attr_8', 'attr_9', 'attr_10',
    ],
}

# Fitted LabelEncoder objects, meant to be stored as
# label_encoders[category][attribute].
label_encoders = dict()

# Custom F1 Score Metric
def f1_metric(y_true, y_pred):
    """Macro-averaged F1 score of one classification head.

    The score is computed on exactly the tensors passed in, so when Keras
    calls this as a metric it is a per-batch value.

    Args:
        y_true: Ground-truth labels. Either integer class ids (shape
            ``(batch,)`` or ``(batch, 1)``) or one-hot rows with the same
            trailing size as ``y_pred``.
        y_pred: Model output. Shape ``(batch, 1)`` for a sigmoid head or
            ``(batch, num_classes)`` for a softmax head.

    Returns:
        Scalar float tensor: mean F1 over the classes that occur in either
        the labels or the predictions (0 when there are none).
    """
    y_true = tf.convert_to_tensor(y_true)
    y_pred = tf.convert_to_tensor(y_pred)

    # The head type is decided by the prediction width, not by y_true:
    # sparse labels arrive as a single column for both binary and
    # multi-class heads, so y_true's shape cannot tell them apart.
    num_outputs = y_pred.shape[-1]
    if num_outputs == 1:
        num_classes = 2
        pred_ids = tf.cast(tf.round(tf.reshape(y_pred, [-1])), tf.int32)
    else:
        num_classes = num_outputs
        pred_ids = tf.argmax(y_pred, axis=-1, output_type=tf.int32)

    labels_are_one_hot = (
        num_outputs > 1
        and y_true.shape.rank == y_pred.shape.rank
        and y_true.shape[-1] == num_outputs
    )
    if labels_are_one_hot:
        true_ids = tf.argmax(y_true, axis=-1, output_type=tf.int32)
    else:
        true_ids = tf.cast(tf.reshape(y_true, [-1]), tf.int32)

    true_1h = tf.one_hot(true_ids, num_classes, dtype=tf.float32)
    pred_1h = tf.one_hot(pred_ids, num_classes, dtype=tf.float32)

    # Per-class confusion counts (one entry per class).
    true_positives = tf.reduce_sum(true_1h * pred_1h, axis=0)
    false_positives = tf.reduce_sum((1.0 - true_1h) * pred_1h, axis=0)
    false_negatives = tf.reduce_sum(true_1h * (1.0 - pred_1h), axis=0)

    precision = true_positives / (true_positives + false_positives + K.epsilon())
    recall = true_positives / (true_positives + false_negatives + K.epsilon())
    per_class_f1 = 2 * (precision * recall) / (precision + recall + K.epsilon())

    # Classes that appear in neither labels nor predictions would contribute
    # a meaningless 0 and drag the average down, so they are left out.
    present = tf.cast(
        (true_positives + false_positives + false_negatives) > 0, tf.float32
    )
    return tf.math.divide_no_nan(
        tf.reduce_sum(per_class_f1 * present), tf.reduce_sum(present)
    )

# Load the pre-trained VGG16 convolutional base used for feature extraction

# VGG16 model class and its matching input preprocessing
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input

# Pretrained VGG16 (ImageNet weights) without its classifier head, taking
# 224x224 RGB inputs.
vgg_model = VGG16(
    input_shape=(224, 224, 3),
    include_top=False,
    weights='imagenet',
)

# Feature extraction function using VGG
def extract_features_vgg(image_files, image_dir, feature_model, target_size=(224, 224), batch_size=256):
    """Run images through ``feature_model`` and return flattened features.

    Images are loaded and processed ``batch_size`` at a time so that only one
    chunk of decoded images is held in memory, instead of the whole set.

    Args:
        image_files: Table with a ``'filename'`` column (e.g. a DataFrame).
        image_dir: Directory that contains the files named in ``image_files``.
        feature_model: Object with a Keras-style ``predict(array)`` method.
        target_size: ``(height, width)`` each image is resized to on load.
        batch_size: Number of images loaded and predicted per chunk.

    Returns:
        2-D array with one row per filename (in input order) holding the
        flattened model output. For an empty ``image_files`` the result has
        shape ``(0, 0)``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    filenames = list(image_files['filename'])
    feature_chunks = []
    for start in range(0, len(filenames), batch_size):
        batch = []
        for filename in filenames[start:start + batch_size]:
            img = tf.keras.preprocessing.image.load_img(f"{image_dir}/{filename}", target_size=target_size)
            img = tf.keras.preprocessing.image.img_to_array(img)
            batch.append(preprocess_input(img))  # Preprocess as VGG expects
        chunk = feature_model.predict(np.array(batch))
        # Flatten everything after the batch axis into one feature vector.
        feature_chunks.append(chunk.reshape(chunk.shape[0], -1))

    if not feature_chunks:
        # Nothing to predict on; the model is never called in this case.
        return np.empty((0, 0), dtype=np.float32)
    return np.concatenate(feature_chunks, axis=0)


# Define the model for multi-output classification
from tensorflow.keras import layers, models, optimizers

# Define the model for multi-output classification with custom layers per attribute
def _attribute_hidden_layers(attr):
    """Return freshly created hidden layers for the branch of ``attr``.

    Attributes without a dedicated recipe fall back to the common
    Dense(256) -> BatchNorm -> ReLU -> Dropout(0.5) stack.
    """
    dense = layers.Dense
    norm = layers.BatchNormalization
    relu = layers.ReLU
    drop = layers.Dropout

    # Recipes are callables so layers are only instantiated for the requested
    # attribute and never shared between branches.
    recipes = {
        'attr_1': lambda: [dense(128, activation=None), norm(), relu(), drop(0.5)],
        'attr_2': lambda: [dense(256, activation=None), norm(), relu(), drop(0.5)],
        'attr_3': lambda: [
            dense(256, activation=None), drop(0.5),
            dense(128, activation=None), norm(), relu(), drop(0.5),
        ],
        'attr_4': lambda: [dense(256, activation=None), norm(), relu(), drop(0.3)],
        'attr_5': lambda: [dense(256, activation=None), norm(), relu(), dense(128, activation=None)],
        'attr_6': lambda: [dense(256, activation=None), norm(), relu(), drop(0.5)],
        'attr_7': lambda: [dense(256, activation=None), norm(), relu(), drop(0.5)],
        'attr_8': lambda: [dense(256, activation=None), norm()],
        'attr_9': lambda: [dense(256, activation=None), norm(), drop(0.5)],
        'attr_10': lambda: [dense(256, activation=None), norm(), drop(0.5)],
    }
    build = recipes.get(attr, recipes['attr_2'])
    return build()


def create_custom_multi_output_model(input_shape, num_classes_dict):
    """Build and compile a multi-output classifier with one branch per attribute.

    A branch is created for every key of ``num_classes_dict`` (in the dict's
    iteration order), all reading from the same input. Attributes that are not
    in the dict get no output, so categories with fewer attributes work.

    Args:
        input_shape: Shape of a single input sample (without the batch axis),
            forwarded to ``layers.Input``.
        num_classes_dict: Mapping of attribute name to its number of classes.
            Two classes produce a single sigmoid unit trained with
            ``binary_crossentropy``; any other count produces a softmax layer
            trained with ``sparse_categorical_crossentropy``.

    Returns:
        The compiled model. Output layers are named ``output_<attr>`` and the
        model's outputs follow the order of ``num_classes_dict``.
    """
    inputs = layers.Input(shape=input_shape)

    outputs = []
    losses = {}
    metrics = {}

    for attr, num_classes in num_classes_dict.items():
        branch = inputs
        for layer in _attribute_hidden_layers(attr):
            branch = layer(branch)

        output_name = f'output_{attr}'
        if num_classes == 2:
            head = layers.Dense(1, activation='sigmoid', name=output_name)
            losses[output_name] = 'binary_crossentropy'
        else:
            head = layers.Dense(num_classes, activation='softmax', name=output_name)
            losses[output_name] = 'sparse_categorical_crossentropy'

        outputs.append(head(branch))
        metrics[output_name] = f1_metric

    # Build and compile the model. Loss and metrics are keyed by output layer
    # name so each head is paired with its own loss regardless of ordering.
    model = models.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=optimizers.Adam(learning_rate=0.0001),
        loss=losses,
        metrics=metrics,
    )

    return model


# Training: for every category, fit one multi-output model per CV fold and keep
# the fold model with the best mean macro F1 on its validation split.
for category, attributes in categories_attributes.items():
    # .copy() yields an independent frame, so the column assignments below do
    # not write into a filtered slice of df_train.
    df_category = df_train[df_train['Category'] == category].copy()
    df_category['id'] = df_category['id'].astype(str)
    df_category['filename'] = df_category['id'].apply(lambda x: x.zfill(6) + '.jpg')

    # Prepare label encoders for each attribute (rows with a missing attribute
    # value are dropped first)
    df_attr = df_category[['filename'] + attributes].dropna().copy()
    num_classes_dict = {}
    label_encoders[category] = {}

    for attr in attributes:
        le = LabelEncoder()
        df_attr[attr] = le.fit_transform(df_attr[attr])
        label_encoders[category][attr] = le
        num_classes_dict[attr] = len(le.classes_)

    # Backbone features depend only on the image, not on the fold split, so
    # they are extracted once and sliced per fold below.
    all_features = extract_features_vgg(df_attr[['filename']], train_image_dir, vgg_model)

    # Set up K-Fold Cross-Validation
    kf = KFold(n_splits=3, shuffle=True, random_state=42)
    # Start below any reachable score so the first fold always saves a model.
    best_f1_across_folds = float('-inf')
    best_model_path = f'best_multi_output_model_{category}.h5'

    for fold, (train_idx, val_idx) in enumerate(kf.split(df_attr)):
        print(f"Training fold {fold+1} for {category}")

        train_fold = df_attr.iloc[train_idx]
        val_fold = df_attr.iloc[val_idx]

        # KFold yields positional indices, which match the row order of
        # all_features.
        train_features = all_features[train_idx]
        val_features = all_features[val_idx]

        # Prepare labels as a dictionary of targets keyed by output layer name
        train_labels = {f'output_{attr}': train_fold[attr].values for attr in attributes}
        val_labels = {f'output_{attr}': val_fold[attr].values for attr in attributes}

        # Create and train the model
        multi_output_model = create_custom_multi_output_model(train_features.shape[1:], num_classes_dict)
        multi_output_model.fit(train_features, train_labels, epochs=10, batch_size=16, validation_data=(val_features, val_labels))

        # Predict on validation data
        val_preds = multi_output_model.predict(val_features)
        if not isinstance(val_preds, (list, tuple)):
            # A single-output model may return a bare array instead of a list.
            val_preds = [val_preds]

        fold_f1_scores = []
        for i, attr in enumerate(attributes):
            true_labels = val_labels[f'output_{attr}']
            if num_classes_dict[attr] == 2:
                # Sigmoid output of shape (n, 1): threshold and flatten to (n,)
                pred_labels = np.round(val_preds[i]).astype(int).ravel()
            else:
                pred_labels = val_preds[i].argmax(axis=1)
            fold_f1_scores.append(f1_score(true_labels, pred_labels, average='macro'))

        # Calculate the mean F1-score for this fold
        fold_f1 = np.mean(fold_f1_scores)
        print(f"Mean F1-score for fold {fold+1}: {fold_f1}")

        if fold_f1 > best_f1_across_folds:
            best_f1_across_folds = fold_f1
            multi_output_model.save(best_model_path)
            print(f"New best model for {category} with F1-score: {best_f1_across_folds}")

    print(f"Best F1-score across all folds for {category}: {best_f1_across_folds}")

# ---- Prediction on Test Set ----
all_predictions = []
for category, attributes in categories_attributes.items():
    # .copy() yields an independent frame, so the column assignments below do
    # not write into a filtered slice of df_test.
    df_test_category = df_test[df_test['Category'] == category].copy()
    if df_test_category.empty:
        # No test rows for this category; nothing to predict.
        continue
    df_test_category['id'] = df_test_category['id'].astype(str)
    df_test_category['filename'] = df_test_category['id'].apply(lambda x: x.zfill(6) + '.jpg')

    # Load the best model for this category. The path is rebuilt from the
    # category instead of reusing whatever the last training iteration left
    # behind. compile=False skips restoring the training configuration, which
    # references the custom f1_metric and is not needed for inference.
    best_model_path = f'best_multi_output_model_{category}.h5'
    multi_output_model = models.load_model(best_model_path, compile=False)

    # Extract features for the test set
    test_features = extract_features_vgg(df_test_category[['filename']], test_image_dir, vgg_model)

    # Predict for each attribute
    test_preds = multi_output_model.predict(test_features)
    if not isinstance(test_preds, (list, tuple)):
        # A single-output model may return a bare array instead of a list.
        test_preds = [test_preds]

    for i, attr in enumerate(attributes):
        encoder = label_encoders[category][attr]
        # The class count comes from this category's own encoder.
        if len(encoder.classes_) == 2:
            # Sigmoid output of shape (n, 1): threshold and flatten to (n,)
            pred_labels = np.round(test_preds[i]).astype(int).ravel()
        else:
            pred_labels = test_preds[i].argmax(axis=1)
        df_test_category[f'predicted_{attr}'] = encoder.inverse_transform(pred_labels)

    all_predictions.append(df_test_category[['id'] + [f'predicted_{attr}' for attr in attributes]])

# Concatenate all predictions and save to CSV
df_predictions = pd.concat(all_predictions)
df_predictions.to_csv('multi_output_predictions.csv', index=False)

print("Test predictions saved to 'multi_output_predictions.csv'")
