# title notebook


In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
import os

In [2]:
! sysctl -n hw.ncpu

8


In [3]:

tf.config.threading.set_intra_op_parallelism_threads(2)
tf.config.threading.set_inter_op_parallelism_threads(2)


In [4]:
import tensorflow as tf

class OverclassAwareLoss(tf.keras.losses.Loss):
    """
    Combines standard class-level binary crossentropy with an overclass-level binary crossentropy loss.

    Args:
        overclass_dict (dict): Mapping from class index to overclass index.
        lambda_penalty (float): Trade-off between class-level and overclass-level loss.
    """

    def __init__(self, overclass_dict, lambda_penalty=0.0,from_logits=False, name="overclass_aware_loss"):
        super().__init__(name=name)
        self.overclass_dict = overclass_dict
        self._validate_overclass_dict()

        self.num_classes = len(overclass_dict)
        self.n_overclasses = max(overclass_dict.values()) + 1
        self.class_to_overclass_matrix = self._build_projection_matrix()
        self.lambda_penalty = lambda_penalty
        self.from_logits = from_logits
        
    def _validate_overclass_dict(self):
        if not all(isinstance(k, int) and isinstance(v, int) for k, v in self.overclass_dict.items()):
            raise ValueError("overclass_dict must map class indices (int) to overclass indices (int).")

    def _build_projection_matrix(self):
        """
        Builds a binary projection matrix that maps each class to its corresponding overclass.

        Returns:
            tf.Tensor: A 2D tensor of shape (num_classes, n_overclasses) where each row corresponds
                    to a class and each column to an overclass. The entry at (i, j) is 1.0 if class i
                    belongs to overclass j, and 0.0 otherwise.

        How it works:
            - Initializes a zero matrix of shape (num_classes, n_overclasses).
            - Uses self.overclass_dict, which maps class indices to overclass indices, to determine
            which positions in the matrix should be set to 1.0.
            - The positions are updated using tf.tensor_scatter_nd_update.

        Notes:
            You must pass an array because tf.tensor_scatter_nd_update() does not support automatic broadcasting of a scalar across multiple indices.
        
        Example:
            If self.overclass_dict = {0: 2, 1: 0, 2: 1} and there are 3 classes and 3 overclasses,
            the resulting matrix will be:

                [[0.0, 0.0, 1.0],
                [1.0, 0.0, 0.0],
                [0.0, 1.0, 0.0]]
        """
        indices = [[cls, overcls] for cls, overcls in self.overclass_dict.items()]
        updates = tf.ones(len(indices), dtype=tf.float32)
        shape = (self.num_classes, self.n_overclasses)
        return tf.tensor_scatter_nd_update(tf.zeros(shape, dtype=tf.float32), indices, updates)
    
    
    def _compute_overclass_loss(self, target, output):
        target_overclass = tf.matmul(target, self.class_to_overclass_matrix)
        output_overclass = tf.matmul(output, self.class_to_overclass_matrix)
        # numerically stabilize the output
        output_overclass = tf.clip_by_value(output_overclass, tf.keras.backend.epsilon(), 1. - tf.keras.backend.epsilon())
        if self.from_logits:
            output = tf.nn.sigmoid(output)
        
        loss = tf.keras.losses.binary_crossentropy(target_overclass, output_overclass)
        return tf.reduce_mean(loss)
    
    def call(self, target, output):
        target = tf.cast(target, tf.float32)
        output = tf.cast(output, tf.float32)
        
        if self.from_logits:
            output = tf.nn.sigmoid(output)

        class_loss = tf.keras.losses.binary_crossentropy(target, output)
        overclass_loss = self._compute_overclass_loss(target, output)

        return self.lambda_penalty * overclass_loss + (1 - self.lambda_penalty) * class_loss

In [5]:
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
import numpy as np
import tensorflow as tf
df = pd.read_csv('../data/rare_species 1/metadata.csv')
df_classes = df[["family", "phylum"]]
df_pddict = df_classes.drop_duplicates()

# assign a number to each class
df_pddict['class_index'] = range(len(df_pddict))
df_pddict['overclass_index'] = df_pddict['phylum'].astype('category').cat.codes
df_pddict['overclass_index'] = df_pddict['overclass_index'].astype(int)
# create a dictionary mapping from class index to overclass index
overclass_dict = df_pddict.set_index('class_index')['overclass_index'].to_dict()
overclass_dict

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_pddict['class_index'] = range(len(df_pddict))
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_pddict['overclass_index'] = df_pddict['phylum'].astype('category').cat.codes
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_pddict['overclass_index'] = df_pddict['overclass_index'].astype(int)


{0: 4,
 1: 1,
 2: 1,
 3: 1,
 4: 1,
 5: 0,
 6: 1,
 7: 0,
 8: 1,
 9: 1,
 10: 1,
 11: 1,
 12: 1,
 13: 1,
 14: 1,
 15: 1,
 16: 1,
 17: 1,
 18: 1,
 19: 1,
 20: 1,
 21: 1,
 22: 1,
 23: 1,
 24: 1,
 25: 1,
 26: 0,
 27: 1,
 28: 1,
 29: 1,
 30: 1,
 31: 1,
 32: 1,
 33: 3,
 34: 1,
 35: 1,
 36: 1,
 37: 2,
 38: 1,
 39: 1,
 40: 1,
 41: 1,
 42: 1,
 43: 2,
 44: 1,
 45: 1,
 46: 1,
 47: 1,
 48: 0,
 49: 2,
 50: 1,
 51: 1,
 52: 1,
 53: 1,
 54: 1,
 55: 1,
 56: 1,
 57: 1,
 58: 1,
 59: 1,
 60: 1,
 61: 2,
 62: 1,
 63: 1,
 64: 1,
 65: 1,
 66: 1,
 67: 0,
 68: 1,
 69: 1,
 70: 1,
 71: 1,
 72: 1,
 73: 1,
 74: 1,
 75: 1,
 76: 1,
 77: 1,
 78: 1,
 79: 2,
 80: 1,
 81: 1,
 82: 1,
 83: 1,
 84: 1,
 85: 0,
 86: 1,
 87: 1,
 88: 0,
 89: 1,
 90: 0,
 91: 2,
 92: 1,
 93: 1,
 94: 1,
 95: 1,
 96: 4,
 97: 1,
 98: 1,
 99: 1,
 100: 1,
 101: 1,
 102: 1,
 103: 1,
 104: 1,
 105: 1,
 106: 2,
 107: 2,
 108: 1,
 109: 1,
 110: 1,
 111: 1,
 112: 1,
 113: 0,
 114: 1,
 115: 1,
 116: 1,
 117: 1,
 118: 2,
 119: 1,
 120: 2,
 121: 1,
 122: 1,
 12

In [6]:
# Global definitions
base_folder = os.getcwd()
print("Base folder:", base_folder)
parent_folder = os.path.dirname(base_folder)
print("Parent folder:", parent_folder)
data_folder = os.path.join(parent_folder, "data")
print("Data folder:", data_folder)
data_prep_folder = os.path.join(parent_folder, "data_preprocessing")
print("Data prep folder:", data_prep_folder)
# print the directory contents
print("Directory contents:")    
# run 'main.py' in the data_prep_folder
os.chdir(data_prep_folder)
#os.system('python3 main.py')
from config import DEST_DIR


Base folder: /Users/leonardodicaterina/Desktop/NovaIMS/DL/notebooks
Parent folder: /Users/leonardodicaterina/Desktop/NovaIMS/DL
Data folder: /Users/leonardodicaterina/Desktop/NovaIMS/DL/data
Data prep folder: /Users/leonardodicaterina/Desktop/NovaIMS/DL/data_preprocessing
Directory contents:


In [7]:
data_folder_rearranged = os.path.join(data_folder, "rearranged")
# print the directory contents
os.chdir(data_folder_rearranged)
print("Directory contents:")    
os.system('ls')

Directory contents:
[34mfold_0[m[m
[34mfold_1[m[m
[34mfold_2[m[m
[34mfold_3[m[m
[34mfold_4[m[m
[34mtest[m[m


0

In [8]:
import keras

test_folder = os.path.join(data_folder_rearranged, "fold_4")

folds = []
for i in range(5):
    folds.append(os.path.join(data_folder_rearranged, f"fold_{i}"))



test_ds = keras.utils.image_dataset_from_directory(
    directory=test_folder,
    labels='inferred',
    label_mode='categorical',
    batch_size=32,
    image_size=(256, 256))

train_folds = []
for i in range(5):
    fold_i = keras.utils.image_dataset_from_directory(
        directory=folds[i],
        labels='inferred',
        label_mode='categorical',
        batch_size=32,
        image_size=(256, 256))
    train_folds.append(fold_i)

Found 1917 files belonging to 202 classes.
Found 1918 files belonging to 202 classes.
Found 1917 files belonging to 202 classes.
Found 1917 files belonging to 202 classes.
Found 1917 files belonging to 202 classes.
Found 1917 files belonging to 202 classes.


# Import vgg16

In [9]:
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Flatten, Dense, Dropout, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam

NUM_CLASSES = len(train_folds[0].class_names)

In [None]:
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
# print the model summary
base_model.summary()

In [11]:


# Load VGG16 without the top classification layers

# Freeze the convolutional base
for layer in base_model.layers:
    layer.trainable = False  # You can change this to fine-tune later layers

# 3. Add custom head
x = base_model.output
x = GlobalAveragePooling2D()(x)       # or Flatten()
x = Dense(512, activation='relu')(x)
x = Dropout(0.5)(x)
x = Dense(128, activation='relu')(x)
output = Dense(NUM_CLASSES, activation='softmax')(x)  # use 'sigmoid' for binary/multi-label
# delete the variable x
del x

# Create the final model
model = Model(inputs=base_model.input, outputs=output)

# Compile the model
model.compile(optimizer=Adam(learning_rate=1e-4), loss= 'categorical_crossentropy', metrics=['accuracy'])

for layer in base_model.layers[:3]:
    layer.trainable = False
    
for layer in base_model.layers[3:]:
    layer.trainable = True


model.summary()
model_fold = [model] * 5


In [None]:
histories = []
for i in range(len(train_folds)):
    print(f"\nFold {i+1}")

    val_ds = train_folds[i]

    # Concatenate all folds except the i-th
    train_ds = None
    for j, ds in enumerate(train_folds):
        if j != i:
            print(type(train_ds))
            train_ds = ds if train_ds is None else train_ds.concatenate(ds)
            
    
    model_to_train = model_fold[i]
    # Train the model
    history = model_to_train.fit(
        train_ds,
        validation_data=val_ds,
        epochs=10,
        batch_size=2,
        callbacks=[
            tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)
        ]
    )
    histories.append(history)
    # Save the model
    model_to_train.save(f"model_to_fold_{i}.h5")
    


Fold 1
<class 'NoneType'>
<class 'tensorflow.python.data.ops.prefetch_op._PrefetchDataset'>
<class 'tensorflow.python.data.ops.concatenate_op._ConcatenateDataset'>
<class 'tensorflow.python.data.ops.concatenate_op._ConcatenateDataset'>
Epoch 1/10
[1m240/240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6171s[0m 26s/step - accuracy: 0.0134 - loss: 5.4103 - val_accuracy: 0.0235 - val_loss: 5.1668
Epoch 2/10
[1m240/240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5764s[0m 24s/step - accuracy: 0.0267 - loss: 5.1478 - val_accuracy: 0.0401 - val_loss: 5.0947
Epoch 3/10
[1m240/240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5767s[0m 24s/step - accuracy: 0.0269 - loss: 5.0892 - val_accuracy: 0.0297 - val_loss: 5.0406
Epoch 4/10
[1m240/240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5687s[0m 24s/step - accuracy: 0.0336 - loss: 5.0101 - val_accuracy: 0.0396 - val_loss: 4.9382
Epoch 5/10
[1m240/240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5912s[0m 25s/step - accuracy: 0.




Fold 2
<class 'NoneType'>
<class 'tensorflow.python.data.ops.prefetch_op._PrefetchDataset'>
<class 'tensorflow.python.data.ops.concatenate_op._ConcatenateDataset'>
<class 'tensorflow.python.data.ops.concatenate_op._ConcatenateDataset'>
Epoch 1/10
[1m240/240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5682s[0m 24s/step - accuracy: 0.1129 - loss: 4.5316 - val_accuracy: 0.1273 - val_loss: 4.3719
Epoch 2/10
[1m240/240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5639s[0m 24s/step - accuracy: 0.1250 - loss: 4.4365 - val_accuracy: 0.1309 - val_loss: 4.2676
Epoch 3/10
[1m240/240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5645s[0m 24s/step - accuracy: 0.1302 - loss: 4.3343 - val_accuracy: 0.1508 - val_loss: 4.2402
Epoch 4/10
[1m240/240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5644s[0m 24s/step - accuracy: 0.1449 - loss: 4.2042 - val_accuracy: 0.1455 - val_loss: 4.1090
Epoch 5/10
[1m240/240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5656s[0m 24s/step - accuracy: 0.

# evaluate the model

In [None]:
# load the model
model = tf.keras.models.load_model("model_to_fold_0.h5")

# Evaluate the model on the test set
test_loss, test_accuracy = model.evaluate(test_ds)
print(f"Test accuracy: {test_accuracy:.4f}")

# plot the confusion matrix
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import numpy as np


# Get the true labels and predicted labels
y_true = np.concatenate([y for x, y in test_ds], axis=0)
y_pred = model.predict(test_ds)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true_classes = np.argmax(y_true, axis=1)
# Compute the confusion matrix
cm = confusion_matrix(y_true_classes, y_pred_classes)
# Plot the confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=test_ds.class_names, yticklabels=test_ds.class_names)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()




In [None]:
# go back on the parent folder
os.chdir(parent_folder)
# print the directory contents
from keras_custom.Metrics import MulticlassDistanceQuantile

