In [22]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, Input
from tensorflow.keras.utils import Sequence, to_categorical
from tensorflow.keras.preprocessing.image import load_img, img_to_array

DAMAGE_TYPES = ['front', 'back', 'left', 'right']

class DamageDataGenerator(Sequence):
    def __init__(self, base_path, batch_size=8, input_shape=(224,224,3)):
        self.base_path = base_path
        self.batch_size = batch_size
        self.input_shape = input_shape
        self.T0_path = os.path.join(base_path, "T0")
        self.T1_path = os.path.join(base_path, "T1")
        self.damage_types = DAMAGE_TYPES
        self.file_paths = []
        for damage_type in self.damage_types:
            T0_files = os.listdir(os.path.join(self.T0_path, damage_type))
            T1_files = os.listdir(os.path.join(self.T1_path, damage_type))
            for file in T0_files:
                if file in T1_files:
                    self.file_paths.append((os.path.join(self.T0_path, damage_type, file), os.path.join(self.T1_path, damage_type, file)))

    def __len__(self):
        return int(np.ceil(len(self.file_paths) / self.batch_size))

    def __getitem__(self, idx):
        batch_files = self.file_paths[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_T0 = np.zeros((len(batch_files), *self.input_shape), dtype=np.float32)
        batch_T1 = np.zeros((len(batch_files), *self.input_shape), dtype=np.float32)
        batch_labels = np.zeros((len(batch_files),), dtype=np.int32)
        for i, (T0_file, T1_file) in enumerate(batch_files):
            T0_img = load_img(T0_file, target_size=self.input_shape[:2])
            T1_img = load_img(T1_file, target_size=self.input_shape[:2])
            batch_T0[i] = img_to_array(T0_img) / 255.0
            batch_T1[i] = img_to_array(T1_img) / 255.0
            damage_type = os.path.basename(os.path.dirname(T0_file))
            batch_labels[i] = self.damage_types.index(damage_type)
        batch_anomaly = (np.abs(np.mean(batch_T1, axis=(1,2,3)) - np.mean(batch_T0, axis=(1,2,3))) > 0.05).astype(np.float32)
        batch_damage_type_onehot = to_categorical(batch_labels, num_classes=len(self.damage_types))
        return {"input_T0": batch_T0, "input_T1": batch_T1}, {"anomaly_detected": batch_anomaly, "damage_type": batch_damage_type_onehot}

def create_dual_input_model(input_shape=(224,224,3), num_damage_types=4):
    def branch():
        inp = Input(shape=input_shape)
        x = layers.Conv2D(16, (3,3), activation='relu')(inp)
        x = layers.MaxPooling2D((2,2))(x)
        x = layers.Conv2D(32, (3,3), activation='relu')(x)
        x = layers.MaxPooling2D((2,2))(x)
        x = layers.Conv2D(64, (3,3), activation='relu')(x)
        x = layers.MaxPooling2D((2,2))(x)
        x = layers.Flatten()(x)
        x = layers.Dense(128, activation='relu')(x)
        return models.Model(inp, x)

    input_T0 = Input(shape=input_shape, name="input_T0")
    input_T1 = Input(shape=input_shape, name="input_T1")

    extractor = branch()
    feat_T0 = extractor(input_T0)
    feat_T1 = extractor(input_T1)

    x = layers.Subtract()([feat_T1, feat_T0])
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dense(32, activation='relu')(x)

    anomaly_output = layers.Dense(1, activation='sigmoid', name="anomaly_detected")(x)
    damage_output = layers.Dense(num_damage_types, activation='softmax', name="damage_type")(x)

    return models.Model(inputs={"input_T0": input_T0, "input_T1": input_T1}, outputs={"anomaly_detected": anomaly_output, "damage_type": damage_output})

def main():
    base_path = "dataset"  
    batch_size = 8
    input_shape = (224, 224, 3)
    epochs = 8

    train_generator = DamageDataGenerator(base_path, batch_size=batch_size, input_shape=input_shape)

    model = create_dual_input_model(input_shape=input_shape, num_damage_types=len(DAMAGE_TYPES))
    model.compile(
        optimizer='adam',
        loss={
            "anomaly_detected": "binary_crossentropy",
            "damage_type": "categorical_crossentropy"
        },
        metrics={
            "anomaly_detected": "accuracy",
            "damage_type": "accuracy"
        }
    )

    model.fit(train_generator, epochs=epochs)

    model.save("damage_comparison_model.h5")
    print("✅ Modèle sauvegardé")

    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    tflite_model = converter.convert()
    with open("damage_comparison_model.tflite", "wb") as f:
        f.write(tflite_model)
    print("✅ Export TFLite terminé !")

if __name__ == "__main__":
    main()

Epoch 1/8


  self._warn_if_super_not_called()


[1m342/342[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m714s[0m 2s/step - anomaly_detected_accuracy: 1.0000 - anomaly_detected_loss: 0.6526 - damage_type_accuracy: 0.3491 - damage_type_loss: 1.3748 - loss: 2.0274
Epoch 2/8
[1m342/342[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m613s[0m 2s/step - anomaly_detected_accuracy: 1.0000 - anomaly_detected_loss: 0.5111 - damage_type_accuracy: 0.4437 - damage_type_loss: 1.3254 - loss: 1.8364
Epoch 3/8
[1m342/342[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m837s[0m 2s/step - anomaly_detected_accuracy: 1.0000 - anomaly_detected_loss: 0.4032 - damage_type_accuracy: 0.3995 - damage_type_loss: 1.3052 - loss: 1.7084
Epoch 4/8
[1m342/342[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4648s[0m 14s/step - anomaly_detected_accuracy: 1.0000 - anomaly_detected_loss: 0.3211 - damage_type_accuracy: 0.4678 - damage_type_loss: 1.2746 - loss: 1.5957
Epoch 5/8
[1m342/342[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m734s[0m 2s/step - anomaly_dete



✅ Modèle sauvegardé
INFO:tensorflow:Assets written to: C:\Users\user\AppData\Local\Temp\tmpwi7dvqlo\assets


INFO:tensorflow:Assets written to: C:\Users\user\AppData\Local\Temp\tmpwi7dvqlo\assets


Saved artifact at 'C:\Users\user\AppData\Local\Temp\tmpwi7dvqlo'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): List[TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name='input_T0'), TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name='input_T1')]
Output Type:
  Dict[['anomaly_detected', TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)], ['damage_type', TensorSpec(shape=(None, 4), dtype=tf.float32, name=None)]]
Captures:
  2457242639760: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2457242640144: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2457242640336: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2457242641296: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2457242641488: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2457242642064: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2457242642256: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2457242642832: TensorSpec(shape