## **Introduction**
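In this notebook, we build and train a neural network for classifying blood cells using transfer learning on a pretrained ConvNeXt backbone (ConvNeXt-XLarge with ImageNet weights): a new classification head is trained first, and part of the backbone is then fine-tuned.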

This project demonstrates the application of deep learning to medical imaging and classification tasks.


## 🚀 Setting Up the Environment: Installing Packages and Connecting to Google Drive


In [None]:
!pip install tensorflow==2.17.0 keras==3.4.1 tensorflow-decision-forests==1.10.0 tensorflow-text==2.17.0 tf-keras==2.17.0 keras_cv

Collecting tensorflow==2.17.0
  Downloading tensorflow-2.17.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.2 kB)
Collecting keras==3.4.1
  Downloading keras-3.4.1-py3-none-any.whl.metadata (5.8 kB)
Collecting tensorflow-decision-forests==1.10.0
  Downloading tensorflow_decision_forests-1.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.0 kB)
Collecting tensorflow-text==2.17.0
  Downloading tensorflow_text-2.17.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.8 kB)
Collecting keras_cv
  Downloading keras_cv-0.9.0-py3-none-any.whl.metadata (12 kB)
Collecting wurlitzer (from tensorflow-decision-forests==1.10.0)
  Downloading wurlitzer-3.1.1-py3-none-any.whl.metadata (2.5 kB)
Collecting ydf (from tensorflow-decision-forests==1.10.0)
  Downloading ydf-0.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.2 kB)
Collecting keras-core (from keras_cv)
  Downloading keras_core-0.1.7-py3-none-any.w

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
import keras_cv

import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split

# Single source of truth for the seed of this run (earlier experiments used 42).
SEED = 11

# Seed both NumPy and TensorFlow so shuffling and weight initialisation are repeatable.
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [None]:
# Mount Google Drive at /gdrive and make the project folder the working directory,
# so that the relative file names used by later cells resolve inside it.
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive/My Drive/[2024-2025] AN2DL/Homework 1/Base_3

Mounted at /gdrive
/gdrive/My Drive/[2024-2025] AN2DL/Homework 1/Base_3


## ⏳ Load the Data

In [None]:
# Load the dataset.
# Training data comes from the pre-augmented file (alternatives tried earlier:
# 'training_set.npz' and 'dataset_augmented_10k.npz').
# The `with` block closes the underlying .npz archive once the arrays are read.
with np.load('dataset_augmented_default.npz') as data:
  x_train = data['images']
  y_train = data['labels']

# The original, non-augmented set is used as validation / evaluation data.
# NOTE(review): the augmented file appears to have been generated from this same
# training_set.npz, so validation metrics may be optimistic — confirm.
with np.load('training_set.npz') as data_original:
  x_test = data_original['images']
  y_test = data_original['labels']

In [None]:
import hashlib

# Get the hash of a certain image
def image_hash(image):
  """Return the SHA-256 hex digest of the image's raw bytes (`image.tobytes()`)."""
  digest = hashlib.sha256()
  digest.update(image.tobytes())
  return digest.hexdigest()

# Report the number of training images and labels before any duplicate removal.
print(f"Initial length is {len(x_train)} and {len(y_train)}")

# Remove all duplicates
def remove_duplicates(x_train, y_train, hash_fn=None):
  """Drop every sample whose image occurs more than once in `x_train`.

  All copies of a repeated image are removed, including its first occurrence;
  this is not a "keep one of each" de-duplication. Images are compared through
  the key returned by `hash_fn`.

  Args:
    x_train: Sequence/array of images, indexed along the first axis.
    y_train: Labels aligned with `x_train` along the first axis.
    hash_fn: Optional callable mapping one image to a hashable key.
      Defaults to `image_hash` (SHA-256 of the raw bytes).

  Returns:
    Tuple `(x, y)` of NumPy arrays holding only the samples whose image is
    unique. The per-sample shape and dtype are preserved even when no sample
    survives.

  Raises:
    ValueError: If `x_train` and `y_train` have different lengths.
  """
  if hash_fn is None:
    hash_fn = image_hash

  x_train = np.asarray(x_train)
  y_train = np.asarray(y_train)
  if len(x_train) != len(y_train):
    raise ValueError(
        f"x_train and y_train must have the same length, got {len(x_train)} and {len(y_train)}"
    )

  # Maps each key to the index where it was first seen, so that the first
  # occurrence can be discarded too once a repeat shows up.
  first_seen = {}
  keep = np.ones(len(x_train), dtype=bool)

  for i, image in enumerate(x_train):
    key = hash_fn(image)
    if key in first_seen:
      keep[first_seen[key]] = False
      keep[i] = False
    else:
      first_seen[key] = i

  # Boolean-mask indexing returns new arrays and keeps the trailing dimensions.
  return x_train[keep], y_train[keep]

# Only the original (validation) set is de-duplicated; the augmented training
# set is deliberately left as loaded.
x_test, y_test = remove_duplicates(x_test, y_test)

# Print length and shape of every split.
for split in (x_train, y_train, x_test, y_test):
  print(f"Len: {len(split)}\n Shape: {split.shape}\n")

Initial length is 20776 and 20776
Len: 20776
 Shape: (20776, 96, 96, 3)

Len: 20776
 Shape: (20776, 8)

Len: 11943
 Shape: (11943, 96, 96, 3)

Len: 11943
 Shape: (11943, 1)



In [None]:
from keras import layers

"""# Define a basic set of transformations 1
data_augmentation = keras_cv.layers.Augmenter(
  layers = [
    layers.RandomFlip("horizontal_and_vertical"),
    layers.RandomRotation(0.2),
    layers.RandomZoom(0.1),
    layers.RandomTranslation(0.1, 0.1),
    layers.RandomContrast(0.2),
  ]
)

# Extract unique classes and their counts
unique, counts = np.unique(y_train, return_counts=True)
class_counts = dict(zip(unique, counts))
print("Original distribution:", class_counts)

# Maximum number of samples for a class
max_samples = max(class_counts.values())

def make_balanced_with_augmented(x_train, y_train):

  # New balanced dataset
  x_balanced = []
  y_balanced = []

  # Oversampling for each class
  for cls in class_counts:
    # Filter samples for the current class
    x_class = x_train[np.where(y_train == cls)[0]]
    y_class = y_train[y_train == cls]

    # Calculate the number of samples to add
    samples_to_add = max_samples - len(x_class)

    if samples_to_add > 0:
      # Augment the samples to add
      augmented_images = []
      for _ in range(samples_to_add):
        augmented_image = data_augmentation(x_class[np.random.randint(len(x_class))][np.newaxis, ...])
        augmented_images.append(augmented_image.numpy()[0])

      # Add the original and augmented samples to the balanced dataset
      x_balanced.append(np.concatenate([x_class, np.array(augmented_images)]))
      y_balanced.append(np.concatenate([y_class, np.full(samples_to_add, cls)]))
    else:
      # Add the original samples to the balanced dataset (if no oversampling is needed)
      x_balanced.append(x_class)
      y_balanced.append(y_class)

  # Concatenate the balanced dataset
  x_train = np.concatenate(x_balanced)
  y_train = np.concatenate(y_balanced)

  return x_train, y_train

x_train, y_train = make_balanced_with_augmented(x_train, y_train)

# Check the new distribution
unique_balanced, counts_balanced = np.unique(y_train, return_counts=True)
class_counts_balanced = dict(zip(unique_balanced, counts_balanced))
print("Balanced distribution of classes:", class_counts_balanced)
print(f"New total number of sample is {len(x_train)}")"""

'# Define a basic set of transformations 1\ndata_augmentation = keras_cv.layers.Augmenter(\n  layers = [\n    layers.RandomFlip("horizontal_and_vertical"),\n    layers.RandomRotation(0.2),\n    layers.RandomZoom(0.1),\n    layers.RandomTranslation(0.1, 0.1),\n    layers.RandomContrast(0.2),\n  ]\n)\n\n# Extract unique classes and their counts\nunique, counts = np.unique(y_train, return_counts=True)\nclass_counts = dict(zip(unique, counts))\nprint("Original distribution:", class_counts)\n\n# Maximum number of samples for a class\nmax_samples = max(class_counts.values())\n\ndef make_balanced_with_augmented(x_train, y_train):\n\n  # New balanced dataset\n  x_balanced = []\n  y_balanced = []\n\n  # Oversampling for each class\n  for cls in class_counts:\n    # Filter samples for the current class\n    x_class = x_train[np.where(y_train == cls)[0]]\n    y_class = y_train[y_train == cls]\n\n    # Calculate the number of samples to add\n    samples_to_add = max_samples - len(x_class)\n\n    i

In [None]:
# Convert the test labels to one-hot encoding. The training labels loaded from
# the augmented file already have 8 columns, so only y_test needs converting.
NUM_CLASSES = 8

# Guard on the label shape so that re-running this cell does not one-hot
# encode labels that have already been converted.
if y_test.ndim == 1 or y_test.shape[-1] == 1:
  y_test = tf.keras.utils.to_categorical(y_test, num_classes=NUM_CLASSES)

print(f"Len: {len(x_train)}\n Shape: {x_train.shape}\n")
print(f"Len: {len(y_train)}\n Shape: {y_train.shape}\n")
print(f"Len: {len(x_test)}\n Shape: {x_test.shape}\n")
print(f"Len: {len(y_test)}\n Shape: {y_test.shape}\n")

Len: 20776
 Shape: (20776, 96, 96, 3)

Len: 20776
 Shape: (20776, 8)

Len: 11943
 Shape: (11943, 96, 96, 3)

Len: 11943
 Shape: (11943, 8)



In [None]:
# split train in training and test set
#x_train, x_test, y_train, y_test = train_test_split(x_train, y_train, test_size=0.1, random_state=42)
#x_train, x_test, y_train, y_test = train_test_split(x_train, y_train, test_size=0.1, random_state=99)
### x_train, x_test, y_train, y_test = train_test_split(x_train, y_train, test_size=0.1, random_state=11)

# Preprocessing

In [None]:
# Function to get a set of layers for augmentation 2
"""def augment(images, labels):
  augmenter = keras_cv.layers.Augmenter(
    layers = [
      keras_cv.layers.RandomFlip(
          mode="horizontal_and_vertical"
      ),
      keras_cv.layers.RandomRotation(
          factor=0.2,
          fill_mode='nearest'
      ),
      keras_cv.layers.MixUp(
          alpha=0.5,
      ),
      keras_cv.layers.CutMix(
          alpha=0.5
      ),
    ]
  )

  inputs = {"images": images, "labels": labels}
  output = augmenter(inputs)
  return output["images"], output["labels"]"""

'def augment(images, labels):\n  augmenter = keras_cv.layers.Augmenter(\n    layers = [\n      keras_cv.layers.RandomFlip(\n          mode="horizontal_and_vertical"\n      ),\n      keras_cv.layers.RandomRotation(\n          factor=0.2,\n          fill_mode=\'nearest\'\n      ),\n      keras_cv.layers.MixUp(\n          alpha=0.5,\n      ),\n      keras_cv.layers.CutMix(\n          alpha=0.5\n      ),\n    ]\n  )\n\n  inputs = {"images": images, "labels": labels}\n  output = augmenter(inputs)\n  return output["images"], output["labels"]'

In [None]:
# This function returns random (image, label) pairs
"""def sample_random_images(dataset, sample_size=5):
    random_samples = dataset.shuffle(buffer_size=1000).take(sample_size)
    images = []
    labels = []
    for image, label in random_samples:
        images.append(image.numpy())
        labels.append(label.numpy())
    return tf.convert_to_tensor(images), tf.convert_to_tensor(labels)

# Convert the previously constructed data into tf.data.Dataset
x_train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))"""

'def sample_random_images(dataset, sample_size=5):\n    random_samples = dataset.shuffle(buffer_size=1000).take(sample_size)\n    images = []\n    labels = []\n    for image, label in random_samples:\n        images.append(image.numpy())\n        labels.append(label.numpy())\n    return tf.convert_to_tensor(images), tf.convert_to_tensor(labels)\n\n# Convert the previously constructed data into tf.data.Dataset\nx_train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))'

In [None]:
"""# We take 4000 random images and augment them
sample_size = 4000
random_x, random_y = sample_random_images(x_train_dataset, sample_size)
print(f"{len(random_x)} {len(random_y)}")
aug_images, aug_labels = augment(random_x, random_y)

# We concatenate these 4000 new augmented pictures to the originale dataset
x_train = tf.concat([x_train, aug_images], axis=0)
y_train = tf.concat([y_train, aug_labels], axis=0)

print(x_train.shape)
print(y_train.shape)

del random_x
del random_y"""

'# We take 4000 random images and augment them\nsample_size = 4000\nrandom_x, random_y = sample_random_images(x_train_dataset, sample_size)\nprint(f"{len(random_x)} {len(random_y)}")\naug_images, aug_labels = augment(random_x, random_y)\n\n# We concatenate these 4000 new augmented pictures to the originale dataset\nx_train = tf.concat([x_train, aug_images], axis=0)\ny_train = tf.concat([y_train, aug_labels], axis=0)\n\nprint(x_train.shape)\nprint(y_train.shape)\n\ndel random_x\ndel random_y'

In [None]:
"""def apply_augmentation(image, is_randaugment):
    if is_randaugment:
        return apply_randaugment(image)
    else:
        return apply_augmix(image)

def apply_augmix(image):
    return augmix(image)

def apply_randaugment(image):
    return randaugmenter(image)

randaugmenter = tf.keras.Sequential(
    [
        keras_cv.layers.RandAugment([0, 255], 3, 0.5), # it was 0.4 instead of 0.5
    ]
)

augmix = tf.keras.Sequential(
    [
        keras_cv.layers.AugMix([0, 255], 0.4), # no number was set for severity, so it was 0.3
    ]
)

batch_size = 256
half_size = len(x_train) // 2

# We apply the RandAugment augmentation to the first half of the dataset
dataset_randaugment = tf.data.Dataset.from_tensor_slices(x_train[:half_size])
dataset_randaugment = dataset_randaugment.map(lambda x: apply_augmentation(x, is_randaugment=True), num_parallel_calls=tf.data.AUTOTUNE)

# We apply the AugMix augmentation to the second half of the dataset
dataset_augmix = tf.data.Dataset.from_tensor_slices(x_train[half_size:])
dataset_augmix = dataset_augmix.map(lambda x: apply_augmentation(x, is_randaugment=False), num_parallel_calls=tf.data.AUTOTUNE)

dataset = dataset_randaugment.concatenate(dataset_augmix)
dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

x_train = tf.concat([batch for batch in dataset], axis=0)

del x_train_dataset"""

'def apply_augmentation(image, is_randaugment):\n    if is_randaugment:\n        return apply_randaugment(image)\n    else:\n        return apply_augmix(image)\n\ndef apply_augmix(image):\n    return augmix(image)\n\ndef apply_randaugment(image):\n    return randaugmenter(image)\n\nrandaugmenter = tf.keras.Sequential(\n    [\n        keras_cv.layers.RandAugment([0, 255], 3, 0.5), # it was 0.4 instead of 0.5\n    ]\n)\n\naugmix = tf.keras.Sequential(\n    [\n        keras_cv.layers.AugMix([0, 255], 0.4), # no number was set for severity, so it was 0.3\n    ]\n)\n\nbatch_size = 256\nhalf_size = len(x_train) // 2\n\n# We apply the RandAugment augmentation to the first half of the dataset\ndataset_randaugment = tf.data.Dataset.from_tensor_slices(x_train[:half_size])\ndataset_randaugment = dataset_randaugment.map(lambda x: apply_augmentation(x, is_randaugment=True), num_parallel_calls=tf.data.AUTOTUNE)\n\n# We apply the AugMix augmentation to the second half of the dataset\ndataset_augmix =

In [None]:
"""# We define a new set of augmentations 3
augmenter = keras_cv.layers.Augmenter(
  layers = [
    keras_cv.layers.RandAugment([0, 255], 3, 0.5),
    keras_cv.layers.AugMix([0, 255], 0.4),

    keras_cv.layers.RandomCutout(0.075, 0.075, fill_mode="gaussian_noise"),
    keras_cv.layers.RandomCutout(0.075, 0.075, fill_mode="gaussian_noise"),
    keras_cv.layers.RandomCutout(0.075, 0.075, fill_mode="gaussian_noise"),
    keras_cv.layers.RandomCutout(0.075, 0.075, fill_mode="gaussian_noise"),
    keras_cv.layers.RandomCutout(0.075, 0.075, fill_mode="gaussian_noise"),
  ]
)

# And again we apply this new augmenter to other 7000 images
sample_size = 7000
random_x, random_y = sample_random_images(dataset, sample_size)
#del dataset
random_y = tf.cast(random_y, dtype=tf.float32)
aug_images = augmenter(random_x)

x_train = tf.concat([x_train, aug_images], axis=0)
y_train = tf.concat([y_train, random_y], axis=0)

x_train = x_train.numpy()
y_train = y_train.numpy()

######### print(x_train.shape)
######### print(aug_images.shape)
######### print(random_y.shape)

x_train=tf.cast(x_train, dtype=tf.float32)

np.savez_compressed('dataset_augmented_default.npz', images=x_train, labels=y_train)

print(x_train.shape)"""

'# We define a new set of augmentations 3\naugmenter = keras_cv.layers.Augmenter(\n  layers = [\n    keras_cv.layers.RandAugment([0, 255], 3, 0.5),\n    keras_cv.layers.AugMix([0, 255], 0.4),\n\n    keras_cv.layers.RandomCutout(0.075, 0.075, fill_mode="gaussian_noise"),\n    keras_cv.layers.RandomCutout(0.075, 0.075, fill_mode="gaussian_noise"),\n    keras_cv.layers.RandomCutout(0.075, 0.075, fill_mode="gaussian_noise"),\n    keras_cv.layers.RandomCutout(0.075, 0.075, fill_mode="gaussian_noise"),\n    keras_cv.layers.RandomCutout(0.075, 0.075, fill_mode="gaussian_noise"),\n  ]\n)\n\n# And again we apply this new augmenter to other 7000 images\nsample_size = 7000\nrandom_x, random_y = sample_random_images(dataset, sample_size)\n#del dataset\nrandom_y = tf.cast(random_y, dtype=tf.float32)\naug_images = augmenter(random_x)\n\nx_train = tf.concat([x_train, aug_images], axis=0)\ny_train = tf.concat([y_train, random_y], axis=0)\n\nx_train = x_train.numpy()\ny_train = y_train.numpy()\n\n#####

In [None]:
import matplotlib.pyplot as plt

"""# We show an image with the latest augmentation (3)
plt.imshow(x_train[-4] / 255.0)
print('Class', aug_labels[1])"""

"# We show an image with the latest augmentation (3)\nplt.imshow(x_train[-4] / 255.0)\nprint('Class', aug_labels[1])"

In [None]:
import numpy as np
import tensorflow as tf
import keras_cv
from tensorflow import keras as tfk
from tensorflow.keras import layers, models, applications
from tensorflow.keras.callbacks import LambdaCallback
import random
import keras
from keras.saving import register_keras_serializable
from sklearn.utils.class_weight import compute_class_weight

def spatial_attention(input_tensor):
    """
    Apply a spatial attention gate to a feature map.

    The input is summarised along the channel axis by its mean and its maximum.
    The two maps are concatenated and passed through a 7x7 convolution with a
    sigmoid activation to obtain a single-channel attention map, and the input
    is multiplied by that map.

    Args:
        input_tensor (tf.Tensor): Input feature map, shape (batch, height, width, channels).

    Returns:
        tf.Tensor: Output tensor after applying spatial attention.
    """
    # Channel-wise mean and max, each kept as a single-channel map.
    channel_mean = tfk.layers.Lambda(lambda x: tf.reduce_mean(x, axis=-1, keepdims=True))(input_tensor)
    channel_max = tfk.layers.Lambda(lambda x: tf.reduce_max(x, axis=-1, keepdims=True))(input_tensor)

    # Stack the two summaries: (batch, height, width, 2).
    pooled = tfk.layers.Concatenate(axis=-1)([channel_mean, channel_max])

    # One sigmoid-activated filter yields the attention map: (batch, height, width, 1).
    gate = tfk.layers.Conv2D(1, kernel_size=7, padding='same', activation='sigmoid')(pooled)

    # Scale the input by the attention map, keeping the input's shape.
    return tfk.layers.Multiply()([input_tensor, gate])

In [None]:
# These were initially inside the get_augmentation_layer method

""""""

class MyModel:
    """
    Transfer-learning image classifier built on a pretrained ConvNeXt-XLarge backbone.

    The wrapped Keras model takes 96x96x3 images, applies random augmentation
    layers, runs the backbone, re-weights its channels with a
    squeeze-and-excitation block and classifies into 8 classes through a small
    dense head. The backbone and head are created, compiled and fitted inside a
    `tf.distribute.MirroredStrategy` scope.

    Attributes:
        strategy: The `tf.distribute.MirroredStrategy` used for building and training.
        neural_network: The wrapped Keras model.
        model_name_pretrained: Name of the backbone layer inside `neural_network`.
    """

    # Default destination of `save()`.
    DEFAULT_SAVE_PATH = '/gdrive/MyDrive/[2024-2025] AN2DL/Homework 1/Base_3/weights.keras'

    def __init__(self):
        """
        Create the distribution strategy and build the (uncompiled) network.
        """
        self.strategy = tf.distribute.MirroredStrategy()
        self.neural_network = self.create_model()

    def get_augmentation_layer(self):
        """
        Return a `Sequential` stack of random augmentation layers.

        The stack applies rotation, zoom, flips, translation, brightness,
        contrast, Gaussian blur and Gaussian noise, in that order.
        """
        return tf.keras.Sequential([

            # Random rotation
            keras.layers.RandomRotation(0.5, fill_mode='reflect'),

            # Random zoom in height
            keras.layers.RandomZoom(height_factor=(-0.2, 0.7), fill_mode='nearest'),

            # Zoom in width only (height factor fixed at 0), then flips and translation
            keras.layers.RandomZoom(height_factor=(0.0, 0.0), width_factor=(-0.3, 0.3), fill_mode='nearest'),
            keras.layers.RandomFlip(mode="horizontal"),
            keras.layers.RandomFlip(mode="vertical"),
            keras.layers.RandomTranslation(height_factor=(-0.2, 0.2), width_factor=(-0.2, 0.2)),

            # Photometric augmentations
            keras.layers.RandomBrightness(0.3),
            #keras_cv.layers.RandomHue(0.3, [0,255]),
            keras_cv.layers.RandomContrast([0, 255], 0.3),
            keras_cv.layers.RandomGaussianBlur(2, 2),
            #keras_cv.layers.RandomCutout(0.3, 0.3,"gaussian_noise"),

            # Adding Gaussian noise
            # NOTE(review): RandomContrast is given the range [0, 255]; if pixel values
            # really span 0-255, a stddev of 0.07 is almost imperceptible — confirm.
            keras.layers.GaussianNoise(0.07)

        ])

    def create_model(self):
        """
        Build and return the uncompiled Keras model.

        Also sets `self.model_name_pretrained`, the layer name under which the
        backbone is later retrieved from the model with `get_layer`.
        """
        # Data augmentation layers, applied directly to the model input
        data_augmentation = self.get_augmentation_layer()

        with self.strategy.scope():

            # Pretrained backbone without its classification top
            # (ConvNeXtBase was the alternative tried earlier).
            model_pretrained = tfk.applications.ConvNeXtXLarge(
                input_shape=(96, 96, 3),
                include_top=False,
                weights='imagenet',
                pooling=None
            )
            # Read the name from the model instead of repeating the literal.
            self.model_name_pretrained = model_pretrained.name

            print("number of layers:")
            print(len(model_pretrained.layers))

            # Assemble the model
            inputs = tfk.Input(shape=(96, 96, 3), name='input_layer')
            x = data_augmentation(inputs)
            x = model_pretrained(x)

            # Batch normalization after the pretrained backbone
            x = tfk.layers.BatchNormalization()(x)

            # Squeeze-and-Excitation block. The expand layer is sized from the real
            # channel count, so the gate always matches `x` even when the count is
            # not a multiple of 16.
            channels = x.shape[-1]
            se = tfk.layers.GlobalAveragePooling2D()(x)  # (batch_size, channels)
            se = tfk.layers.Dense(max(channels // 16, 1), activation='relu')(se)  # Compress
            se = tfk.layers.Dense(channels, activation='sigmoid')(se)  # Expand
            x = tfk.layers.Multiply()([x, tfk.layers.Reshape((1, 1, -1))(se)])  # Scale the channels

            # Global pooling to collapse the spatial dimensions
            x = tfk.layers.GlobalAveragePooling2D()(x)

            # Fully connected layers with batch normalization and Leaky ReLU.
            # Keras 3 names the slope argument `negative_slope` (`alpha` is deprecated).
            x = tfk.layers.Dense(512)(x)
            x = tfk.layers.BatchNormalization()(x)
            x = tfk.layers.LeakyReLU(negative_slope=0.1)(x)
            x = tfk.layers.Dropout(0.3)(x)

            x = tfk.layers.Dense(256)(x)
            x = tfk.layers.BatchNormalization()(x)
            x = tfk.layers.LeakyReLU(negative_slope=0.1)(x)
            x = tfk.layers.Dropout(0.3)(x)

            outputs = tfk.layers.Dense(8, activation='softmax', name='output_layer')(x)

            model = tfk.Model(inputs=inputs, outputs=outputs, name='model')

            return model

    def _backbone(self):
        """Return the pretrained backbone layer of the current network."""
        return self.neural_network.get_layer(self.model_name_pretrained)

    def _compile(self, learning_rate):
        """Compile the network with categorical cross-entropy, Lion and accuracy."""
        self.neural_network.compile(
            loss=tfk.losses.CategoricalCrossentropy(),
            optimizer=tfk.optimizers.Lion(learning_rate=learning_rate),
            metrics=['accuracy']
        )

    def _fit(self, X_train, y_train, X_test, y_test, epochs, batch_size):
        """Fit with the shared callbacks and return the Keras `History` object."""
        # Save a checkpoint named after the epoch at the end of every 5th epoch.
        save_every_5 = LambdaCallback(
            on_epoch_end=lambda epoch, logs:
            self.neural_network.save(f'model_epoch_{epoch + 1}.keras') if (epoch + 1) % 5 == 0 else None
        )
        # Stop after 8 epochs without validation-accuracy improvement and
        # restore the best weights seen.
        early_stopping = tfk.callbacks.EarlyStopping(
            monitor='val_accuracy',
            mode='max',
            patience=8,
            restore_best_weights=True
        )

        return self.neural_network.fit(
            X_train,
            y_train,
            batch_size=batch_size,
            epochs=epochs,
            shuffle=True,
            validation_data=(X_test, y_test),
            callbacks=[save_every_5, early_stopping]
        )

    def train_transfer_learning(self, X_train, y_train, X_test, y_test, epochs=10, batch_size=32,
                                learning_rate=1e-3):
        """
        Train the model with the whole pretrained backbone frozen.

        Args:
            X_train, y_train: Training images and one-hot labels.
            X_test, y_test: Data passed to `fit` as `validation_data`.
            epochs: Maximum number of epochs.
            batch_size: Batch size.
            learning_rate: Learning rate of the Lion optimizer.

        Returns:
            The Keras `History` object returned by `fit`.
        """
        with self.strategy.scope():  # Ensure training happens inside strategy scope

            backbone = self._backbone()
            backbone.trainable = False

            # Freeze every backbone layer explicitly as well.
            for layer in backbone.layers:
                layer.trainable = False

            # Recompile (required after changing which layers are trainable)
            self._compile(learning_rate)

            return self._fit(X_train, y_train, X_test, y_test, epochs, batch_size)

    def train_fine_tuning(self, X_train, y_train, X_test, y_test, epochs=10, batch_size=32, fine_tune_from=50,
                          learning_rate=1e-3):
        """
        Unfreeze selected backbone layers and train again.

        Only `Conv2D` / `DepthwiseConv2D` backbone layers whose index is strictly
        greater than `fine_tune_from` are made trainable; every other backbone
        layer stays frozen.

        Args:
            X_train, y_train: Training images and one-hot labels.
            X_test, y_test: Data passed to `fit` as `validation_data`.
            epochs: Maximum number of epochs.
            batch_size: Batch size.
            fine_tune_from: Backbone layer index after which convolutions are unfrozen.
            learning_rate: Learning rate of the Lion optimizer. The default equals the
                transfer-learning rate; pass a smaller value (e.g. 1e-4) for gentler
                fine-tuning.

        Returns:
            The Keras `History` object returned by `fit`.
        """
        with self.strategy.scope():  # Ensure training happens inside strategy scope

            backbone = self._backbone()
            backbone.trainable = True

            conv_types = (tf.keras.layers.Conv2D, tf.keras.layers.DepthwiseConv2D)
            unfrozen = 0
            for i, layer in enumerate(backbone.layers):
                layer.trainable = i > fine_tune_from and isinstance(layer, conv_types)
                unfrozen += int(layer.trainable)
            print(f"Unfrozen backbone layers: {unfrozen}")

            # Recompile (required after changing which layers are trainable)
            self._compile(learning_rate)

            return self._fit(X_train, y_train, X_test, y_test, epochs, batch_size)

    def test(self, X_test, y_test):
        """
        Evaluate the model on `X_test` / `y_test` and print the accuracy.
        """
        test_loss, test_acc = self.neural_network.evaluate(X_test, y_test)
        print(f'Test accuracy: {test_acc}')

    def load(self, path):
        """
        Replace the current network with the model stored at `path` and recompile it.

        The model is loaded and compiled inside the strategy scope, with a Lion
        optimizer using default settings.
        """
        with self.strategy.scope():
            self.neural_network = tfk.models.load_model(path)
            self.neural_network.compile(
                loss=tfk.losses.CategoricalCrossentropy(),
                optimizer=tfk.optimizers.Lion(),  # Create optimizer inside the scope
                metrics=['accuracy']
            )

    def save(self, path=None):
        """
        Save the whole network, augmentation layers included.

        Args:
            path: Destination file. Defaults to `DEFAULT_SAVE_PATH`.
        """
        if path is None:
            path = self.DEFAULT_SAVE_PATH
        self.neural_network.save(path)

    def predict(self, X):
        """
        Return the predicted class index (argmax over axis 1) for each sample in `X`.
        """
        preds = self.neural_network.predict(X)
        preds = np.argmax(preds, axis=1)
        return preds

## 🛠️ Train and Save the Model

In [None]:
# Build the wrapper; its constructor creates the distribution strategy and a fresh network.
model = MyModel()

# Replace the freshly built network with a checkpoint saved by an earlier run,
# loading it inside the wrapper's distribution strategy scope.
model_path = "model_epoch_15.keras"
with model.strategy.scope():
    loaded_network = tf.keras.models.load_model(model_path)
model.neural_network = loaded_network

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/convnext/convnext_xlarge_notop.h5
[1m1393257616/1393257616[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 0us/step
number of layers:
259




In [None]:
import matplotlib.pyplot as plt
import keras_cv

"""data_augmentation = model.get_augmentation_layer()

# We apply the first set of augmentations to a random picture
rotated_image = data_augmentation(x_train[0])
print(y_train[6001])

# Show the original image as well as the rotated one
plt.figure(figsize=(10, 5))

# Show the rotated image
plt.subplot(1, 2, 1)
plt.imshow(x_train[0] / 255.0)
plt.title("Immagine Ruotata")
plt.axis('off')

'''
# Mostra l'immagine ruotata
plt.subplot(1, 2, 2)
plt.imshow(rotated_image / 255.0)
plt.title("Immagine Ruotata")
plt.axis('off')'''

plt.show()"""

'data_augmentation = model.get_augmentation_layer()\n\n# We apply the first set of augmentations to a random picture\nrotated_image = data_augmentation(x_train[0])\nprint(y_train[6001])\n\n# Show the original image as well as the rotated one\nplt.figure(figsize=(10, 5))\n\n# Show the rotated image\nplt.subplot(1, 2, 1)\nplt.imshow(x_train[0] / 255.0)\nplt.title("Immagine Ruotata")\nplt.axis(\'off\')\n\n\'\'\'\n# Mostra l\'immagine ruotata\nplt.subplot(1, 2, 2)\nplt.imshow(rotated_image / 255.0)\nplt.title("Immagine Ruotata")\nplt.axis(\'off\')\'\'\'\n\nplt.show()'

In [None]:
#model.load('/kaggle/working/model_epoch_10.keras')

In [None]:
#x_test_augmented = model.get_augmentation_layer()(x_test)

In [None]:
# Check the dtype of the training images before fitting.
print(x_train.dtype)

float32


In [None]:
#model.train_transfer_learning(x_train, y_train, x_test, y_test, 50, 512)
#model.train_transfer_learning(x_train, y_train, x_test, y_test, 1, 512)

In [None]:
#model.neural_network.save('weights_xl_1.keras')

In [None]:
# Print length and shape of every split right before training.
for arr in (x_train, y_train, x_test, y_test):
  print(f"Len: {len(arr)}\n Shape: {arr.shape}\n")

Len: 20776
 Shape: (20776, 96, 96, 3)

Len: 20776
 Shape: (20776, 8)

Len: 11943
 Shape: (11943, 96, 96, 3)

Len: 11943
 Shape: (11943, 8)



In [None]:
# Fine-tune with keyword arguments so each number is self-describing
# (an earlier run used batch_size=512).
model.train_fine_tuning(
    x_train, y_train, x_test, y_test,
    epochs=50, batch_size=128, fine_tune_from=150,
)

True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
Epoch 1/50
[1m163/163[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2281s[0m 14s/step - accuracy: 0.8302 - loss: 0.5675 - val_accuracy: 0.9637 - val_loss: 0.1075
Epoch 2/50
[1m163/163[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2236s[0m 14s/step - accuracy: 0.8290 - loss: 0.5741 - val_accuracy: 0.9687 - val_loss: 0.0897
Epoch 3/50
[1m163/163[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2259s[0m 14s/step - accuracy: 0.8294 - loss: 0.5703 - val_accuracy: 0.9664 - val_loss: 0.0970
Epoch 4/50
[1m163/163[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2231s[0m 14s/step - accuracy: 0.8224 - loss: 0.5857 - val_accuracy: 0.9673 - val_loss: 0.0969
Epoch 5/50
[1m163/163[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2491s[0m 15s/step - accuracy: 0.8295 - loss: 0.5644 - val_accuracy: 0.9627 - val_loss: 0.1073
Epoch 6/50
[1m163/163[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2230s[0m 14s/step - accuracy:

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/IPython/core/interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-29-4a6438a9a9f0>", line 2, in <cell line: 2>
    model.train_fine_tuning(x_train, y_train, x_test, y_test, 50, 128, 150)
  File "<ipython-input-20-22a9cb61a0d8>", line 181, in train_fine_tuning
    history = self.neural_network.fit(
  File "/usr/local/lib/python3.10/dist-packages/keras/src/utils/traceback_utils.py", line 117, in error_handler
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/keras/src/backend/tensorflow/trainer.py", line 318, in fit
    logs = self.train_function(iterator)
  File "/usr/local/lib/python3.10/dist-packages/tensorflow/python/util/traceback_utils.py", line 150, in error_handler
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/tensorflow/python/eager/polymorphic_function/polymorphic_fun

TypeError: object of type 'NoneType' has no len()

In [None]:
# Persist the network after fine-tuning; this file name is the one added to the submission zip below.
model.neural_network.save('weights_xl_2.keras')

In [None]:
"""model2 = model.neural_network
model2.compile(
  loss=tfk.losses.CategoricalCrossentropy(),
  optimizer=tfk.optimizers.Lion(learning_rate=1e-4),
  metrics=['accuracy']
)

model2.save('weights_squeeze.keras')"""

In [None]:
# Evaluate on the held-out set.
# Cast the test images to float32, the dtype of the training images.
# Fix: the cast used to read from x_train, which replaced the test images with
# the training images while y_test still held the test labels.
x_test = tf.cast(x_test, dtype=tf.float32)

# Quick check that both label sets use the same encoding.
print(f"{y_train[0]} end")
print(f"{y_test[0]}")

model.test(x_test, y_test)

In [None]:
# Predicted class indices for the training images (used by the confusion matrix cell below).
preds = model.predict(x_train)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
# Ground-truth class indices (argmax of the training labels) versus the predictions.
y_true = np.argmax(y_train, axis=1)
y_pred = preds
# Compute the confusion matrix
cm = confusion_matrix(y_true, y_pred)
# Display the confusion matrix using a blue colour map
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap=plt.cm.Blues)
# Title the current axes and render the figure
plt.title("Confusion Matrix")
plt.show()

In [None]:
#model.neural_network.save('/gdrive/MyDrive/weights.keras')

## 📊 Prepare Your Submission

To prepare your submission, create a `.zip` file that includes all the necessary code to run your model. It **must** include a `model.py` file with the following class:

```python
# file: model.py
class Model:
    def __init__(self):
        """Initialize the internal state of the model."""

    def predict(self, X):
        """Return a numpy array with the labels corresponding to the input X."""
```

The next cell shows an example implementation of the `model.py` file, which includes loading model weights from the `weights.keras` file and conducting predictions on provided input data. The `.zip` file is created and downloaded in the last notebook cell.

❗ Feel free to modify the method implementations to better fit your specific requirements, but please ensure that the class name and method interfaces remain unchanged.

In [None]:
%%writefile '/gdrive/My Drive/[2024-2025] AN2DL/Homework 1/Base_3/model.py'

import numpy as np

import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl

class Model:
  """Inference wrapper with the interface required by the submission format."""

  def __init__(self):
    """Load the trained network from 'weights.keras' in the working directory."""
    # NOTE(review): load_model is called with its default arguments here; `test`
    # relies on `evaluate`, so check it before switching to compile=False.
    self.neural_network = tfk.models.load_model('weights.keras')

  def test(self, X_test, y_test):
    """Evaluate the network on `X_test` / `y_test` and print the accuracy."""
    test_loss, test_acc = self.neural_network.evaluate(X_test, y_test)
    print(f'Test accuracy: {test_acc}')

  def predict(self, X):
    """Return a numpy array with the labels corresponding to the input X.

    A 2-D network output is reduced to class indices with an argmax over
    axis 1. Any other output shape is returned unchanged instead of
    falling through and returning None.
    """
    preds = self.neural_network.predict(X)
    if len(preds.shape) == 2:
      preds = np.argmax(preds, axis=1)
    return preds

In [None]:
#model3 = Model2()
#model3.test(X_test2, y_test2)

In [None]:
from datetime import datetime

import zipfile

# Weights file to ship. model.py loads 'weights.keras', so the file is stored
# under that name inside the archive whatever it is called on disk.
# ('weights_xl_1.keras' is the file used for the "_NOFN" submission variant.)
weights_file = 'weights_xl_2.keras'
filename2 = f'submission_{datetime.now().strftime("%y%m%d_%H%M%S")}_FN.zip'

with zipfile.ZipFile(filename2, 'w', compression=zipfile.ZIP_DEFLATED) as archive:
  archive.write('model.py')
  archive.write(weights_file, arcname='weights.keras')

# Trigger the browser download of the archive from Colab.
from google.colab import files
files.download(filename2)