In [None]:
# Import des bibliothèques nécessaires
import tensorflow as tf
from tensorflow.keras import initializers, layers, models, optimizers, callbacks
from tensorflow.keras import backend as K
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.datasets import mnist
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import os
import numpy as np
import matplotlib.pyplot as plt
import zipfile

In [None]:
class Length(layers.Layer):
    """
    Calcule la longueur des vecteurs.
    Utilisé pour produire un tenseur de même forme que `y_true` dans `margin_loss`.

    Entrée :
        - inputs : Tenseur de forme [dim_1, ..., dim_{n-1}, dim_n].
    Sortie :
        - Tenseur de forme [dim_1, ..., dim_{n-1}].
    """
    def call(self, inputs, **kwargs):
        return K.sqrt(K.sum(K.square(inputs), -1))  # Longueur euclidienne des vecteurs

    def compute_output_shape(self, input_shape):
        return input_shape[:-1]  # La sortie a la même forme que l'entrée sauf la dernière dimension


In [None]:
class Mask(layers.Layer):
    """
    Mask un tenseur avec la forme = [None, num_capsule, dim_vector] en masquant soit la capsule avec la longueur maximale,
    soit en utilisant un masque d'entrée additionnel. Toutes les autres capsules sont mises à zéro.
    """

    def __init__(self, **kwargs):
        super(Mask, self).__init__(**kwargs)

    def call(self, inputs, **kwargs):
        if isinstance(inputs, list):  # true label est fourni avec la forme = [None, n_classes], c'est-à-dire un code one-hot.
            assert len(inputs) == 2
            inputs, mask = inputs
        else:  # si aucun label n'est fourni, on masque par la capsule de longueur maximale. Utilisé principalement pour la prédiction
            # calcul des longueurs des capsules
            x = K.sqrt(K.sum(K.square(inputs), -1))
            # génère le masque qui est un code one-hot.
            # mask.shape = [None, n_classes] = [None, num_capsule]
            mask = tf.one_hot(indices=tf.argmax(x, 1), depth=tf.shape(inputs)[1])

        # inputs.shape=[None, num_capsule, dim_capsule]
        # mask.shape=[None, num_capsule]
        # masked.shape=[None, num_capsule * dim_capsule]
        masked = layers.Flatten()(inputs * K.expand_dims(mask, -1))
        return masked

    def compute_output_shape(self, input_shape):
        if isinstance(input_shape[0], tuple):  # true label fourni
            return (None, input_shape[0][1] * input_shape[0][2])
        else:  # pas de true label fourni
            return (None, input_shape[1] * input_shape[2])

    def get_config(self):
        config = super(Mask, self).get_config()
        return config



In [None]:
def squash(vectors, axis=-1):
    """
    Fonction d'activation non linéaire utilisée dans les capsules.
    Elle rapproche la longueur d'un grand vecteur de 1 et celle d'un petit vecteur de 0.

    :param vectors: Vecteurs à transformer.
    :param axis: Axe selon lequel appliquer la transformation.
    :return: Tenseur de même forme que l'entrée.
    """
    s_squared_norm = K.sum(K.square(vectors), axis, keepdims=True)  # Norme au carré
    scale = s_squared_norm / (1 + s_squared_norm) / K.sqrt(s_squared_norm + K.epsilon())  # Facteur d'échelle
    return scale * vectors

In [None]:
def PrimaryCap(inputs, dim_vector, n_channels, kernel_size, strides, padding):
    """
    Applique Conv2D `n_channels` fois et concatène toutes les capsules.

    :param inputs: Tenseur 4D, de forme [None, largeur, hauteur, canaux].
    :param dim_vector: Dimension des vecteurs de sortie.
    :param n_channels: Nombre de types de capsules.
    :return: Tenseur de sortie, forme [None, num_capsule, dim_vector].
    """
    output = layers.Conv2D(filters=dim_vector * n_channels, kernel_size=kernel_size, strides=strides, padding=padding,
                           name='primarycap_conv2d')(inputs)
    return output

In [None]:
def margin_loss(labels, raw_logits, margin=0.4, downweight=0.5):
    """
    Fonction de perte Margin pour Capsule Network.

    Args:
        labels: Tensor, les labels réels en encoding one-hot (shape: [batch_size, num_classes]).
        raw_logits: Tensor, prédictions du modèle dans l'intervalle [0, 1].
        margin: Scalar, la marge après soustraction de 0.5 des logits (par défaut 0.4).
        downweight: Scalar, facteur d'atténuation pour le coût des valeurs négatives.

    Returns:
        Tensor: La perte totale calculée pour chaque exemple dans le batch.
    """
    logits = raw_logits - 0.5  # Centrage des logits autour de 0 avec une marge de 0.4.

    # Coût pour les valeurs positives
    positive_cost = labels * tf.cast(tf.less(logits, margin), tf.float32) * tf.square(logits - margin)

    # Coût pour les valeurs négatives
    negative_cost = (1 - labels) * tf.cast(tf.greater(logits, -margin), tf.float32) * tf.square(logits + margin)

    # Combinaison des deux composantes de la perte
    loss = 0.5 * positive_cost + downweight * 0.5 * negative_cost

    # Moyenne des pertes sur le batch
    return tf.reduce_mean(tf.reduce_sum(loss, axis=1))

In [None]:
def combined_loss(y_true, y_pred_caps, y_pred_decoder):
    # Perte pour les capsules (Margin Loss)
    caps_loss = margin_loss(y_true, y_pred_caps)

    # Perte de reconstruction (MSE)
    recon_loss = tf.keras.losses.mean_squared_error(y_true, y_pred_decoder)
    recon_loss = tf.reduce_mean(recon_loss)

    # Ponderation des deux pertes
    alpha = 0.0005  # Coefficient pour ajuster l'importance de la reconstruction
    total_loss = caps_loss + alpha * recon_loss

    return total_loss


In [None]:

# Data Preprocessing
def load_dataset(data_dir, img_size, batch_size):
    # Création des générateurs de données avec ImageDataGenerator
    datagen = ImageDataGenerator(
        rescale=1.0 / 255,  # Normalisation des pixels
        validation_split=0.2,  # Séparation de la validation
    )

    # Création du générateur de données pour l'entraînement
    train_generator = datagen.flow_from_directory(
        data_dir,
        target_size=img_size,
        batch_size=batch_size,
        class_mode="categorical",  # Sortie catégorique
        subset="training",  # Subset d'entraînement
    )

    # Création du générateur de données pour la validation
    val_generator = datagen.flow_from_directory(
        data_dir,
        target_size=img_size,
        batch_size=batch_size,
        class_mode="categorical",  # Sortie catégorique
        subset="validation",  # Subset de validation
    )


    # Retourner les données sous forme de tuple
    return train_generator, val_generator

In [None]:
def unzip_file(zip_path, dest_path):
    """
    Décompresse un fichier ZIP dans le répertoire de destination spécifié.

    :param zip_path: Le chemin du fichier ZIP à décompresser
    :param dest_path: Le chemin du répertoire de destination pour l'extraction
    """
    # Vérifier si le fichier ZIP existe
    if not os.path.exists(zip_path):
        print(f"Le fichier {zip_path} n'existe pas.")
        return

    # Créer le répertoire de destination s'il n'existe pas
    if not os.path.exists(dest_path):
        os.makedirs(dest_path)

    # Décompresser le fichier ZIP
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(dest_path)

    print(f"Le fichier ZIP a été décompressé dans {dest_path}")

In [None]:
class CapsuleLayer(layers.Layer):
    def __init__(self, num_capsules, dim_vector, num_routing=3, **kwargs):
        super(CapsuleLayer, self).__init__(**kwargs)
        self.num_capsules = num_capsules
        self.dim_vector = dim_vector
        self.num_routing = num_routing

    def build(self, input_shape):
        self.input_num_capsules = input_shape[1]  # Nombre de capsules d'entrée
        self.input_dim_vector = input_shape[2]  # Dimension de chaque vecteur de capsule d'entrée

        # Matrice de poids W pour transformer les capsules d'entrée
        self.W = self.add_weight(
            shape=[self.input_num_capsules, self.input_dim_vector,self.num_capsules*self.dim_vector],
            #[input_dim, input_atoms, output_dim * output_atoms]
            initializer='glorot_uniform',
            trainable=True,
            name='W'
            #constraint=tf.keras.constraints.MinMaxNorm(min_value=1.0, max_value=2.0)
        )
        # Initialisation des biais
        self.bias = self.add_weight(
            shape=(self.num_capsules,self.dim_vector),
            initializer='zeros',
            trainable=True,
            name='bias'
        )
        #biases = variables.bias_variable([output_dim, output_atoms])
    def call(self, inputs):

        # Étape 1 : Calcul des votes
        # Étendre les dimensions d'entrée et multiplier par les poids
        input_tiled = tf.expand_dims(inputs, -1)  # Ajoute une dimension à la fin
        input_tiled = tf.tile(input_tiled, [1, 1, 1, self.num_capsules*self.dim_vector])  # Répéter pour chaque capsule de sortie
        votes = tf.reduce_sum(input_tiled * self.W , axis=2)  # Multiplier et réduire sur input_atoms
        votes_reshaped = tf.reshape(votes, [-1, self.input_num_capsules, self.num_capsules, self.dim_vector])
        input_shape = tf.shape(inputs)
        logit_shape = tf.stack([input_shape[0], self.input_num_capsules, self.num_capsules])
        # 🚀 Implémentation directe du routage dynamique
        batch_size, input_dim, output_dim, output_atoms = tf.shape(votes_reshaped)[0], tf.shape(votes_reshaped)[1], tf.shape(votes_reshaped)[2], tf.shape(votes_reshaped)[3]
        b_ij = tf.zeros([batch_size, input_dim, output_dim])
        # Itérations pour mise à jour des connexions
        for i in range(self.num_routing):
          c_ij = tf.nn.softmax(b_ij, axis=2)
          s_j = tf.reduce_sum(c_ij[..., tf.newaxis] * votes_reshaped, axis=1) + self.bias
          v_j = squash(s_j)
          if i < self.num_routing - 1:
            delta_b_ij = tf.reduce_sum(votes_reshaped * v_j[:, tf.newaxis, :, :], axis=-1)
            b_ij += delta_b_ij
          return v_j

    @staticmethod
    def squash(vectors, axis=-1):
        """
        Fonction squash pour normaliser les vecteurs des capsules.
        """
        s_squared_norm = tf.reduce_sum(tf.square(vectors), axis=axis, keepdims=True)  # Norme au carré
        scale = s_squared_norm / (1 + s_squared_norm) / tf.sqrt(s_squared_norm + 1e-7)  # Normalisation
        return scale * vectors

In [None]:
# Définition de l'architecture du réseau
def CapsNet(input_shape, n_class, num_routing):
    x = layers.Input(shape=input_shape)

    conv1 = layers.Conv2D(filters=256, kernel_size=9, strides=1, padding='valid', activation='relu', name='conv1')(x)
    primarycaps = layers.Conv2D(32 * 8, kernel_size=9, strides=2, padding='valid', activation='relu')(conv1)
    # Calculer la forme correcte de reshape
    #primarycaps_shape = primarycaps.shape
    #num_capsules = primarycaps_shape[1] * primarycaps_shape[2]  # Calcul du nombre de capsules
    # Reshaper à (num_capsules, dim_vector) qui devrait être (32*6, 8), soit (192, 8)
    primarycaps = layers.Reshape(target_shape=[24*24*32, 8])(primarycaps)
    #primarycaps = layers.Lambda(lambda z: tf.sqrt(tf.reduce_sum(tf.square(z), -1)))(primarycaps)
    #print(primarycaps.shape)
    # Convolutional Layers
    """
    primarycaps = layers.Conv2D(64, (3, 3), activation="relu", padding="valid")(x)
    primarycaps = layers.Conv2D(128, (3, 3), activation="relu", padding="valid")(primarycaps)
    primarycaps = layers.Conv2D(256, (3, 3), activation="relu", padding="valid")(primarycaps)
    # Reshape to Flattened Capsules
    primarycaps = layers.Reshape((-1, 256))(primarycaps)
    """
    # Appliquer squash avec Lambda et spécifier output_shape
    primarycaps = layers.Lambda(squash, name='primarycap_squash')(primarycaps)
    print('squach',primarycaps.shape)
    # Capsule Layer
    digitcaps = CapsuleLayer(num_capsules=n_class, dim_vector=16, num_routing=num_routing, name='digitcaps')(primarycaps)
    # Compute the norm of capsule outputs
    outputs = layers.Lambda(lambda z: tf.norm(z, axis=-1))(digitcaps)
    # Apply softmax activation for classification
    #outputs = layers.Activation('softmax')(outputs)
    model = models.Model(x, outputs)


    return model

In [None]:
# Entraînement du modèle
def train(model, data, args):
    """
    Entraîne le modèle pour la première fois sans utiliser les callbacks supplémentaires.
    """
    (x_train, y_train), (x_test, y_test) = data

    # Compiler le modèle
    model.compile(optimizer=optimizers.Adam(learning_rate=args.lr),
              loss='categorical_crossentropy',
              metrics=['accuracy'],) # Provide metrics for both outputs

    # Entraîner le modèle sans callbacks
    # Change the model.fit call to:
    model.fit(x_train, y_train,
          batch_size=args.batch_size, epochs=args.epochs,
          validation_data=(x_test, y_test))

    # Sauvegarder les poids du modèle après l'entraînement
    model.save_weights(os.path.join(args.save_dir, 'trained_model.weights.h5'))
    print('Modèle entraîné et sauvegardé.')

In [None]:
# Entraînement du modèle sur GPU
def train_GPU(model, data, args):
    """
    Entraîne le modèle pour la première fois sans utiliser les callbacks supplémentaires.
    """
    (x_train, y_train), (x_test, y_test) = data

    # Compile the model
    model.compile(
        optimizer=optimizers.Adam(learning_rate=args.lr),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    # Spécifiez l'entraînement sur GPU
    with tf.device('/GPU:0'):
        model.fit(
            x_train,
            y_train,
            batch_size=args.batch_size,
            epochs=args.epochs,
            validation_data=(x_test, y_test)
        )

    # Sauvegarder les poids du modèle après l'entraînement
    model.save_weights(os.path.join(args.save_dir, 'trained_model.weights.h5'))
    print('Modèle entraîné et sauvegardé.')


In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Exemple d'utilisation
zip_path = '/content/drive/MyDrive/EuroSAT_RGB.zip'  # Remplacez par votre chemin ZIP
dest_path = '/content/EuroSAT_RGB'  # Remplacez par votre chemin de destination

unzip_file(zip_path, dest_path)

In [None]:
# Paths et paramètres
#data_dir = '/content/drive/MyDrive/EuroSAT_RGB'
data_dir = '/content/EuroSAT_RGB/EuroSAT_RGB'
img_size = (64, 64)  # Taille de l'image
batch_size = 32

In [None]:
# Exécution principale
class Args:
  batch_size = 32
  epochs = 100
  lam_recon = 0.392
  num_routing = 3
  shift_fraction = 0.1
  debug = 0
  save_dir = '/content/results'
  weights = None
  lr = 0.01
args = Args()

In [None]:
train_generator, val_generator = load_dataset(data_dir, img_size, batch_size)

In [None]:
import tensorflow as tf

# Vérifie la disponibilité des périphériques GPU
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

In [None]:
model = CapsNet(input_shape=[64, 64, 3], n_class=10, num_routing=args.num_routing)
model.summary()
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
              loss=margin_loss,
              metrics=['accuracy'])

In [None]:
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
history = model.fit(train_generator,
          batch_size=args.batch_size, epochs=10,
          validation_data=val_generator,callbacks=[callback])

In [None]:
# Affiche l'historique des pertes et des précisions
print("Loss History:", history.history['loss'])
print("Validation Loss History:", history.history['val_loss'])
print("Accuracy History:", history.history['accuracy'])
print("Validation Accuracy History:", history.history['val_accuracy'])

In [None]:
import matplotlib.pyplot as plt

# Affichez la courbe de perte
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.legend()
plt.title('Loss over epochs')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.show()

# Affichez la courbe de précision
if 'accuracy' in history.history:
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.legend()
    plt.title('Accuracy over epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.show()


In [None]:
# Obtenez les prédictions du modèle
predictions = model.predict(val_generator)

# Pour un problème de classification, convertissez les prédictions en classes
predicted_classes = np.argmax(predictions, axis=1)

# Obtenez les vraies étiquettes
true_classes = val_generator.classes
class_labels = list(val_generator.class_indices.keys())

# Afficher quelques prédictions avec les images
plt.figure(figsize=(12, 12))
for i in range(9):
    plt.subplot(3, 3, i + 1)
    img, label = val_generator[i]
    plt.imshow(img[0])
    true_label = class_labels[true_classes[i]]
    predicted_label = class_labels[predicted_classes[i]]
    plt.title(f"True: {true_label}\nPred: {predicted_label}")
    plt.axis('off')
plt.show()


In [None]:
# Sauvegarder les poids du modèle après l'entraînement
model.save_weights(os.path.join('/content/drive/MyDrive/weights', 'trained_model.weights.h5'))
print('Modèle entraîné et sauvegardé.')