In [None]:
import tensorflow as tf
import os
import matplotlib.pyplot as plt
import numpy as np
import datetime
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
from keras.saving import register_keras_serializable

# Configure TensorFlow through environment variables.
# NOTE(review): these are assigned after `import tensorflow` has already run
# in this cell — confirm TensorFlow still honours them at this point.
os.environ.update({
    'TF_FORCE_GPU_ALLOW_GROWTH': 'true',
    'TF_CPP_MIN_LOG_LEVEL': '0',  # Enable all logs
})

# Use 8 threads both within a single op and across independent ops.
for set_thread_count in (
    tf.config.threading.set_intra_op_parallelism_threads,
    tf.config.threading.set_inter_op_parallelism_threads,
):
    set_thread_count(8)

# Enable mixed precision globally (float16 compute, float32 variables).
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

# Check whether a GPU is available; the list is queried once and reused below.
gpus = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpus))

# Turn on dynamic memory growth for every visible GPU (no-op without GPUs).
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as e:
    # Report the error instead of aborting the notebook.
    print(e)

print("TensorFlow version:", tf.__version__)
print("Compute devices:", tf.config.list_physical_devices())

In [None]:
# Fetch the Fashion MNIST train/test splits through the Keras dataset helper.
fashion_mnist = tf.keras.datasets.fashion_mnist
train_split, test_split = fashion_mnist.load_data()
train_images, train_labels = train_split
test_images, test_labels = test_split

def preprocess_data(images, labels):
    """Scale pixel values to [0, 1] floats and append a trailing channel axis.

    Args:
        images: Image tensor; cast to float32 and divided by 255.
        labels: Labels, passed through untouched.

    Returns:
        Tuple ``(images, labels)`` where ``images`` has one extra last
        dimension of size 1.
    """
    scaled = tf.cast(images, tf.float32) / 255.0
    return tf.expand_dims(scaled, -1), labels

def augment(image, label):
    """Apply random flip, brightness and contrast jitter to an image tensor.

    Args:
        image: Image tensor; cast to float32 before augmentation.
        label: Passed through unchanged (may be ``None``).

    Returns:
        Tuple ``(augmented_image, label)``.
    """
    augmented = tf.image.random_flip_left_right(tf.cast(image, tf.float32))
    augmented = tf.image.random_brightness(augmented, max_delta=0.1)
    augmented = tf.image.random_contrast(augmented, lower=0.9, upper=1.1)
    return augmented, label

# Input pipelines built with tf.data.
BATCH_SIZE = 512
AUTOTUNE = tf.data.AUTOTUNE

# Training: preprocess -> cache -> shuffle -> batch -> augment (per batch) -> prefetch.
train_ds = (
    tf.data.Dataset.from_tensor_slices((train_images, train_labels))
    .map(preprocess_data, num_parallel_calls=AUTOTUNE)
    .cache()
    .shuffle(60000)
    .batch(BATCH_SIZE)
    .map(augment, num_parallel_calls=AUTOTUNE)
    .prefetch(AUTOTUNE)
)

# Evaluation: preprocess -> batch -> prefetch; no shuffling, so the order
# matches `test_images` / `test_labels`.
test_ds = (
    tf.data.Dataset.from_tensor_slices((test_images, test_labels))
    .map(preprocess_data, num_parallel_calls=AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(AUTOTUNE)
)

In [None]:
# Build a more complex model using Functional API
def _conv_block(x, filters, pool=True):
    """Apply Conv2D(3x3, same, ReLU, L2) then BatchNorm, optionally 2x2 max pooling."""
    x = tf.keras.layers.Conv2D(
        filters, (3, 3), activation='relu', padding='same',
        kernel_regularizer=tf.keras.regularizers.l2(0.01))(x)
    x = tf.keras.layers.BatchNormalization()(x)
    if pool:
        x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(x)
    return x


def create_model():
    """Build the CNN classifier for 28x28x1 inputs.

    Architecture: three convolutional blocks (64, 128, 256 filters; the first
    two followed by max pooling), then Flatten -> Dropout(0.5) ->
    Dense(128, ReLU, L2) -> BatchNorm -> Dense(10).

    Returns:
        An uncompiled ``tf.keras.Model`` whose 10 outputs are raw logits
        (no softmax), emitted as float32.
    """
    inputs = tf.keras.Input(shape=(28, 28, 1))

    # Layers are created in the same order as before so auto-generated layer
    # names are unchanged.
    x = _conv_block(inputs, 64)
    x = _conv_block(x, 128)
    x = _conv_block(x, 256, pool=False)

    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dropout(0.5)(x)
    x = tf.keras.layers.Dense(128, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01))(x)
    x = tf.keras.layers.BatchNormalization()(x)

    # A global 'mixed_float16' policy is set earlier in this file. Pin the
    # output layer to float32 so the logits fed to the loss and to downstream
    # argmax/softmax are not limited to float16 precision.
    outputs = tf.keras.layers.Dense(10, dtype='float32')(x)
    return tf.keras.Model(inputs=inputs, outputs=outputs)

@register_keras_serializable()
class AdjustableLearningRateSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Exponential-decay schedule whose base rate can be rescaled at runtime.

    The rate returned for a step is
    ``current_learning_rate * decay_rate ** (step / decay_steps)``.
    ``current_learning_rate`` is a ``tf.Variable`` initialised to
    ``initial_learning_rate``; callers may reassign it while training.
    """

    def __init__(self, initial_learning_rate, decay_steps, decay_rate):
        """Store the float32 hyper-parameters and create the mutable base rate."""
        self.initial_learning_rate = tf.cast(initial_learning_rate, tf.float32)
        self.decay_steps = tf.cast(decay_steps, tf.float32)
        self.decay_rate = tf.cast(decay_rate, tf.float32)
        # Mutable base rate, kept separate from the configured initial value.
        self.current_learning_rate = tf.Variable(self.initial_learning_rate, dtype=tf.float32)

    def __call__(self, step):
        """Return the learning rate for optimizer iteration ``step``."""
        exponent = tf.cast(step, tf.float32) / self.decay_steps
        decay = tf.math.pow(self.decay_rate, exponent)
        return self.current_learning_rate * decay

    def adjust_learning_rate(self, factor):
        """Multiply the base rate in place by ``factor``."""
        scaled = self.current_learning_rate * tf.cast(factor, tf.float32)
        self.current_learning_rate.assign(scaled)

    def get_config(self):
        """Return the constructor arguments as plain Python floats.

        The current (possibly adjusted) base rate is not part of the config.
        """
        config = {}
        config["initial_learning_rate"] = float(self.initial_learning_rate.numpy())
        config["decay_steps"] = float(self.decay_steps.numpy())
        config["decay_rate"] = float(self.decay_rate.numpy())
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild a schedule from the dict produced by ``get_config``."""
        return cls(**config)

# Create the model.
model = create_model()

# Define the learning-rate schedule.
initial_learning_rate = 0.001
lr_schedule = AdjustableLearningRateSchedule(
    initial_learning_rate=initial_learning_rate,
    decay_steps=100000.0,
    decay_rate=0.96,
)

# Create the optimizer, driven by the schedule above.
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# Compile the model; it outputs raw logits, hence from_logits=True.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer=optimizer, loss=loss_fn, metrics=['accuracy'])

In [None]:
# Custom callback to adjust learning rate
class AdjustLearningRateCallback(tf.keras.callbacks.Callback):
    """Shrink a schedule's base learning rate when a monitored metric plateaus.

    After ``patience`` consecutive epochs in which the monitored value did not
    drop below the best value seen so far, the schedule's
    ``current_learning_rate`` variable is multiplied by ``factor`` and clamped
    from below at ``min_lr``. The metric is treated as lower-is-better.

    Args:
        schedule: Object exposing a ``current_learning_rate`` ``tf.Variable``.
        monitor: Key looked up in the epoch ``logs`` dict.
        factor: Multiplier applied to the base rate on each reduction.
        patience: Number of non-improving epochs before a reduction.
        min_lr: Lower bound for the base rate.
    """

    def __init__(self, schedule, monitor='val_loss', factor=0.2, patience=3, min_lr=1e-6):
        super().__init__()
        self.schedule = schedule
        self.monitor = monitor
        self.factor = tf.cast(factor, tf.float32)
        self.patience = patience
        self.min_lr = tf.cast(min_lr, tf.float32)
        self.wait = 0
        self.best = float('inf')

    def on_train_begin(self, logs=None):
        """Reset plateau tracking so the callback can be reused across fit() calls."""
        self.wait = 0
        self.best = float('inf')

    def on_epoch_end(self, epoch, logs=None):
        """Update plateau tracking and reduce the base rate when patience runs out."""
        # `logs` may be None, and the monitored key may be missing (for
        # example when no validation data was supplied); comparing None with
        # a float would raise a TypeError.
        current = (logs or {}).get(self.monitor)
        if current is None:
            print(f"\nAdjustLearningRateCallback: metric '{self.monitor}' "
                  "not found in logs; skipping learning rate adjustment")
            return

        if current < self.best:
            self.best = current
            self.wait = 0
            return

        self.wait += 1
        if self.wait < self.patience:
            return
        self.wait = 0

        base_lr = self.schedule.current_learning_rate
        # Already at (or below) the floor: nothing to reduce, so do not
        # announce a reduction that does not happen.
        if float(base_lr.numpy()) <= float(self.min_lr.numpy()):
            return

        new_lr = tf.maximum(base_lr * self.factor, self.min_lr)
        base_lr.assign(new_lr)
        print(f'\nEpoch {epoch+1}: reducing learning rate to {new_lr.numpy():.6f}')

# Callbacks: each one is named so its role in training is explicit.
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

# Stop after 10 epochs without val_accuracy improvement; keep the best weights.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy', patience=10, restore_best_weights=True, mode='max')
# Persist only the weights of the best epoch by val_accuracy.
best_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    'best_model.weights.h5', monitor='val_accuracy',
    save_best_only=True, save_weights_only=True, mode='max')
# TensorBoard logs with per-epoch histograms.
tensorboard = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)
# Plateau-driven reduction of the schedule's base learning rate.
lr_adjuster = AdjustLearningRateCallback(lr_schedule)

callbacks = [early_stopping, best_checkpoint, tensorboard, lr_adjuster]

# Train the model.
# NOTE(review): the test split doubles as validation data here, so early
# stopping and checkpoint selection are driven by the test set.
EPOCHS = 100

fit_kwargs = dict(
    epochs=EPOCHS,
    validation_data=test_ds,
    callbacks=callbacks,
    verbose=1,
)
history = model.fit(train_ds, **fit_kwargs)

In [None]:
# Pull the per-epoch curves out of the training history.
curves = history.history
train_loss, train_accuracy = curves['loss'], curves['accuracy']
val_loss, val_accuracy = curves['val_loss'], curves['val_accuracy']

# Restore the checkpointed best weights before evaluating.
model.load_weights('best_model.weights.h5')

# Evaluate the model
test_loss, test_acc = model.evaluate(test_ds, verbose=2)
print(f'\nTest accuracy: {test_acc:.4f}')

# Plot and save training history: one panel per metric, training vs validation.
panels = [
    ('Model Accuracy', 'Accuracy',
     train_accuracy, 'Training Accuracy', val_accuracy, 'Validation Accuracy'),
    ('Model Loss', 'Loss',
     train_loss, 'Training Loss', val_loss, 'Validation Loss'),
]

plt.figure(figsize=(12, 4))
for position, (title, y_label, train_curve, train_legend, val_curve, val_legend) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(train_curve, label=train_legend)
    plt.plot(val_curve, label=val_legend)
    plt.title(title)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()

plt.tight_layout()
plt.savefig('training_history.png')
plt.show()

print("Training history plot saved as 'training_history.png'")
print(f"Training stopped at epoch: {len(train_accuracy)}")
print(f"Best validation accuracy: {max(val_accuracy):.4f}")

In [None]:
# Test-time augmentation: average the model output over several randomly
# augmented copies of the same input.
@tf.function
def tta_predict(model, image, num_augmentations=10):
    """Return the mean model output over randomly augmented copies of ``image``.

    Args:
        model: Callable model, invoked with ``training=False``.
        image: Input tensor passed to ``augment`` and then to ``model``.
            # NOTE(review): callers in this file pass a batch of one
            # preprocessed image — confirm before using other shapes.
        num_augmentations: Number of augmented forward passes to average.

    Returns:
        float32 tensor: the element-wise mean of the ``num_augmentations``
        model outputs (raw model outputs are averaged as-is; no softmax is
        applied here).
    """
    # One slot per augmented forward pass.
    predictions = tf.TensorArray(tf.float32, size=num_augmentations)
    for i in tf.range(num_augmentations):
        # `augment` is the training-time augmentation; the label slot is unused.
        augmented_image, _ = augment(image, None)
        pred = model(augmented_image, training=False)
        # The TensorArray is declared float32, so cast the model output to match.
        pred = tf.cast(pred, tf.float32)  # Cast to float32
        predictions = predictions.write(i, pred)
    # Stack along a new leading axis and average it away.
    return tf.reduce_mean(predictions.stack(), axis=0)

# Apply test-time augmentation to a few test samples
num_samples = 5

# Fetch the first test batch once; previously the dataset was re-iterated on
# every loop pass. `test_ds` is built without shuffling, so row i of this
# batch corresponds to test_labels[i].
first_batch_images = test_ds.take(1).get_single_element()[0]

tta_results = []
for i in range(num_samples):
    # Keep the leading batch axis (shape [1, ...]) expected by the model.
    sample = first_batch_images[i:i+1]
    tta_pred = tta_predict(model, sample)
    tta_results.append(tf.argmax(tta_pred, axis=-1).numpy()[0])

# Messages and label slice follow `num_samples` instead of a hard-coded 5.
print(f"Test-Time Augmentation predictions for first {num_samples} samples:", tta_results)
print(f"Actual labels for first {num_samples} samples:", test_labels[:num_samples])

In [None]:
# Human-readable Fashion MNIST labels, indexed by integer class id.
def get_class_names():
    """Return the ten class names as a new list, ordered by class id."""
    labels = (
        'T-shirt/top',
        'Trouser',
        'Pullover',
        'Dress',
        'Coat',
        'Sandal',
        'Shirt',
        'Sneaker',
        'Bag',
        'Ankle boot',
    )
    return list(labels)

class_names = get_class_names()

# Draw one image with its predicted / true class as the x-label.
def plot_image(i, predictions_array, true_label, img):
    """Show image ``i`` on the current axes, captioned with the prediction.

    Args:
        i: Index into ``true_label`` and ``img``.
        predictions_array: Per-class scores for this single image.
            # NOTE(review): the caption prints 100 * max(score) as a
            # percentage, so this is only meaningful for probabilities.
        true_label: Indexable collection of integer labels.
        img: Indexable collection of images; each is reshaped to 28x28.

    The caption is blue when the arg-max class equals the true label and red
    otherwise.
    """
    expected, picture = true_label[i], img[i]

    plt.grid(False)
    plt.xticks([])
    plt.yticks([])
    plt.imshow(picture.reshape((28,28)), cmap=plt.cm.binary)

    predicted_label = np.argmax(predictions_array)
    color = 'blue' if predicted_label == expected else 'red'

    caption = "{} {:2.0f}% ({})".format(
        class_names[predicted_label],
        100*np.max(predictions_array),
        class_names[expected],
    )
    plt.xlabel(caption, color=color)

# Draw the per-class score bars for one prediction.
def plot_value_array(i, predictions_array, true_label):
    """Bar-plot the 10 class scores on the current axes.

    Args:
        i: Index into ``true_label``.
        predictions_array: Ten per-class scores; the y-axis is fixed to
            [0, 1].  # NOTE(review): assumes probabilities — confirm caller.
        true_label: Indexable collection of integer labels.

    The arg-max bar is coloured red first and the true-label bar blue second,
    so a correct prediction ends up blue.
    """
    expected = true_label[i]
    class_ids = range(10)

    plt.grid(False)
    plt.xticks(class_ids)
    plt.yticks([])
    bars = plt.bar(class_ids, predictions_array, color="#777777")
    plt.ylim([0, 1])

    bars[np.argmax(predictions_array)].set_color('red')
    bars[expected].set_color('blue')

# Plot a grid of predictions
num_rows = 5
num_cols = 3
num_images = num_rows*num_cols

# Run inference once for all displayed images; previously the dataset was
# re-iterated and model.predict called twice per image. `test_ds` is not
# shuffled, so row i lines up with test_images[i] / test_labels[i].
grid_inputs = test_ds.take(1).get_single_element()[0][:num_images]
grid_logits = model.predict(grid_inputs)

# The model outputs raw logits (it is compiled with from_logits=True), while
# the plotting helpers print a percentage and clamp the y-axis to [0, 1].
# Convert to probabilities here; cast first so softmax runs in float32.
grid_probabilities = tf.nn.softmax(tf.cast(grid_logits, tf.float32), axis=-1).numpy()

plt.figure(figsize=(2*2*num_cols, 2*num_rows))
for i in range(num_images):
    # Left cell: the image with its caption; right cell: the score bars.
    plt.subplot(num_rows, 2*num_cols, 2*i+1)
    plot_image(i, grid_probabilities[i], test_labels, test_images)
    plt.subplot(num_rows, 2*num_cols, 2*i+2)
    plot_value_array(i, grid_probabilities[i], test_labels)
plt.tight_layout()
plt.savefig('prediction_grid.png')
plt.show()

print("Prediction grid saved as 'prediction_grid.png'")

In [None]:
# Confusion Matrix
predictions = model.predict(test_ds)
y_pred = np.argmax(predictions, axis=1)
# Pin the label set so row/column k always corresponds to class_names[k],
# even if some class never appears in the labels or predictions.
cm = confusion_matrix(test_labels, y_pred, labels=np.arange(len(class_names)))

plt.figure(figsize=(10,10))
# Label both axes with class names (instead of bare integer ticks) so the
# figure matches the classification report below.
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.savefig('confusion_matrix.png')
plt.show()

print("Confusion matrix saved as 'confusion_matrix.png'")

# Print classification report
print("\nClassification Report:")
print(classification_report(test_labels, y_pred, target_names=class_names))

# Save the model summary. The encoding is explicit so the write does not
# depend on the platform's default text encoding.
with open('model_summary.txt', 'w', encoding='utf-8') as f:
    model.summary(print_fn=lambda x: f.write(x + '\n'))

print("Model summary saved as 'model_summary.txt'")

print("\nTraining and evaluation complete!")

# Clear the session to free up resources
tf.keras.backend.clear_session()