In [None]:
import os
import shutil
import random
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import tensorflow as tf
from pathlib import Path
from tensorflow.keras import layers, models
from tensorflow.keras.applications import InceptionV3, VGG16, ResNet50
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import confusion_matrix, classification_report

# --- 1. Global Configuration ---
# Raw image folders, one per weather class (class name -> source folder).
source_dirs = {
    weather: f"./data/{weather}"
    for weather in ("sunny", "cloudy", "rainy", "snowy", "foggy")
}

# Destination roots of the generated train / validation / test splits.
base_train_dir = Path("./data/weather_train")
base_valid_dir = Path("./data/weather_validation")
base_test_dir = Path("./data/weather_test")

# Output locations for trained models and per-run reports/plots.
SAVED_MODELS_DIR = Path("./saved_models")
RESULTS_DIR = Path("./results")

# Fraction of each class assigned to every split.
train_pct = 0.70
validation_pct = 0.15
test_pct = 0.15

# Image size fed to the networks (height, width), generator batch size and
# how many train/evaluate cycles the main block performs.
IMG_SIZE = (150, 150)
BATCH_SIZE = 128
NUM_TRAINING_RUNS = 10

# --- 2. Setup Functions ---
def setup_data_directories():
    """(Onetime) Cleans and creates the directory structure for train/val/test splits.

    Each of the three split roots (``base_train_dir``, ``base_valid_dir``,
    ``base_test_dir``) is deleted if it exists and recreated with one empty
    sub-folder per key of ``source_dirs``. The files of every class source
    folder are then shuffled and copied into the splits: the first
    ``train_pct`` share goes to train, the next ``validation_pct`` share to
    validation and everything that remains to test.

    A source folder that is missing is reported and skipped, which leaves the
    corresponding class folders empty in all three splits.
    """
    print("--- Setting up data directories ---")
    for base_dir in (base_train_dir, base_valid_dir, base_test_dir):
        if base_dir.exists():
            shutil.rmtree(base_dir)
        for weather_type in source_dirs:
            (base_dir / weather_type).mkdir(parents=True, exist_ok=True)

    for weather_type, source_path_str in source_dirs.items():
        source_path = Path(source_path_str)
        if not source_path.is_dir():
            # Make the skip visible: an empty class folder otherwise only
            # shows up much later as a missing class during training.
            print(f"Warning: source directory '{source_path}' not found; skipping '{weather_type}'.")
            continue

        # Directory listing order is filesystem-dependent; sorting first means
        # the shuffle (and therefore the split) is repeatable whenever the
        # caller seeds the `random` module.
        all_images = sorted(entry.name for entry in source_path.iterdir() if entry.is_file())
        random.shuffle(all_images)

        total_images = len(all_images)
        train_end = int(total_images * train_pct)
        valid_end = train_end + int(total_images * validation_pct)
        # Truncation leftovers fall into the test slice, as does test_pct.
        splits = (
            (base_train_dir, all_images[:train_end]),
            (base_valid_dir, all_images[train_end:valid_end]),
            (base_test_dir, all_images[valid_end:]),
        )
        for dest_root, image_names in splits:
            for image in image_names:
                shutil.copyfile(source_path / image, dest_root / weather_type / image)
    print("Data preparation complete.\n")

def create_data_generators():
    """Build the directory iterators for the train, validation and test splits.

    Every split gets its own ``ImageDataGenerator`` that only rescales pixel
    values by 1/255. All iterators share the target size, batch size,
    categorical labels and seed 63; the test iterator additionally disables
    shuffling (presumably so predictions line up with ``.classes``).

    Returns:
        Tuple ``(train_generator, valid_generator, test_generator)``.
    """
    shared_flow_args = {
        "target_size": IMG_SIZE,
        "batch_size": BATCH_SIZE,
        "class_mode": "categorical",
        "seed": 63,
    }

    def new_rescaler():
        # One independent generator object per split, as before.
        return ImageDataGenerator(rescale=1.0 / 255)

    train_generator = new_rescaler().flow_from_directory(
        directory=base_train_dir, **shared_flow_args
    )
    valid_generator = new_rescaler().flow_from_directory(
        directory=base_valid_dir, **shared_flow_args
    )
    test_generator = new_rescaler().flow_from_directory(
        directory=base_test_dir, shuffle=False, **shared_flow_args
    )
    return train_generator, valid_generator, test_generator

# --- 3. Model Building Functions ---
def create_custom_cnn(input_shape, num_classes):
    """Assemble the from-scratch convolutional classifier (uncompiled).

    The network is four 3x3 ReLU convolution stages (32, 64, 128 and 128
    filters), each followed by 2x2 max pooling, then a flatten, a 512-unit
    ReLU dense layer and a softmax output.

    Args:
        input_shape: Shape handed to the first convolution via ``input_shape``.
        num_classes: Number of units of the softmax output layer.

    Returns:
        The ``Sequential`` model.
    """
    # Only the first convolution carries the input shape.
    stack = [
        layers.Conv2D(32, (3, 3), activation="relu", input_shape=input_shape),
        layers.MaxPooling2D((2, 2)),
    ]
    for filters in (64, 128, 128):
        stack.append(layers.Conv2D(filters, (3, 3), activation="relu"))
        stack.append(layers.MaxPooling2D((2, 2)))

    # Classification head.
    stack.append(layers.Flatten())
    stack.append(layers.Dense(512, activation="relu"))
    stack.append(layers.Dense(num_classes, activation="softmax"))
    return models.Sequential(stack)

def create_transfer_model(base_model_fn, input_shape, num_classes):
    """Assemble a transfer-learning classifier on a frozen base (uncompiled).

    Args:
        base_model_fn: Callable invoked with ``weights="imagenet"``,
            ``include_top=False`` and ``input_shape`` to create the base.
        input_shape: Input shape forwarded to ``base_model_fn``.
        num_classes: Number of units of the softmax output layer.

    Returns:
        A ``Sequential`` model: the non-trainable base followed by flatten,
        50% dropout, a 512-unit ReLU dense layer and the softmax output.
    """
    backbone = base_model_fn(
        include_top=False,
        weights="imagenet",
        input_shape=input_shape,
    )
    # Freeze the base so only the new head is trainable.
    backbone.trainable = False

    head = [
        layers.Flatten(),
        layers.Dropout(0.5),
        layers.Dense(512, activation="relu"),
        layers.Dense(num_classes, activation="softmax"),
    ]
    return models.Sequential([backbone, *head])

# --- 4. Plotting and Evaluation Functions ---
def save_history_plot(history, title, save_path):
    """Plots training/validation accuracy and loss, then saves the figure.

    Args:
        history: Object whose ``history`` attribute is a dict of per-epoch
            metric lists. It must contain ``loss``/``val_loss`` and either
            ``acc``/``val_acc`` or ``accuracy``/``val_accuracy``.
        title: Text used as the figure's super-title.
        save_path: Destination passed to ``Figure.savefig``.

    Raises:
        KeyError: If the required metric keys are missing.
    """
    logged = history.history
    # The key follows the metric name given at compile time; accept the
    # long spelling too instead of failing on models compiled differently.
    acc_key = "acc" if "acc" in logged else "accuracy"
    acc, val_acc = logged[acc_key], logged[f"val_{acc_key}"]
    loss, val_loss = logged["loss"], logged["val_loss"]
    epochs = range(1, len(acc) + 1)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    try:
        fig.suptitle(title, fontsize=16)
        panels = (
            (ax1, acc, val_acc, "acc", "accuracy"),
            (ax2, loss, val_loss, "loss", "loss"),
        )
        for ax, train_values, valid_values, short_name, long_name in panels:
            ax.plot(epochs, train_values, "bo", label=f"Training {short_name}")
            ax.plot(epochs, valid_values, "b", label=f"Validation {short_name}")
            ax.set_title(f"Training and validation {long_name}")
            ax.legend()
        # Save through the figure object rather than pyplot's "current figure".
        fig.savefig(save_path)
    finally:
        # Always release the figure, even when plotting or saving fails.
        plt.close(fig)

def evaluate_and_save_results(model, test_generator, run_dir):
    """Evaluates a single model, saves its individual reports, and returns its predictions.

    Writes ``classification_report.txt`` and ``confusion_matrix.png`` into
    ``run_dir``; the report header and plot title use ``run_dir.parent.name``
    and ``run_dir.name``.

    Args:
        model: Object with ``predict(test_generator, verbose=0)`` returning
            per-class scores (argmax is taken over axis 1).
        test_generator: Iterator exposing ``classes`` (true class indices) and
            ``class_indices`` (class name -> index). It is assumed to yield
            samples in the same order as ``classes`` (i.e. unshuffled).
        run_dir: ``pathlib.Path`` of an existing output directory.

    Returns:
        Array of predicted class indices, one per test sample.
    """
    print(f"--- Generating individual report and saving results to: {run_dir} ---")
    y_pred_probs = model.predict(test_generator, verbose=0)
    y_pred = np.argmax(y_pred_probs, axis=1)
    y_true = test_generator.classes

    # Order the names by their index explicitly and pass the index list to
    # scikit-learn, so rows/columns always match the names even when a class
    # never appears in y_true or y_pred.
    class_indices = test_generator.class_indices
    class_labels = sorted(class_indices, key=class_indices.get)
    label_ids = [class_indices[name] for name in class_labels]

    report = classification_report(y_true, y_pred, labels=label_ids, target_names=class_labels)
    report_path = run_dir / "classification_report.txt"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(f"Classification Report for {run_dir.parent.name} - {run_dir.name}\n" + "=" * 50 + "\n" + report)
    print(f"Individual classification report saved to {report_path}")

    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    cm_path = run_dir / "confusion_matrix.png"
    fig, ax = plt.subplots(figsize=(10, 8))
    try:
        sns.heatmap(cm, ax=ax, annot=True, fmt="d", cmap="Blues", xticklabels=class_labels, yticklabels=class_labels)
        ax.set_title(f"Confusion Matrix - {run_dir.parent.name} ({run_dir.name})")
        ax.set_ylabel("True Label")
        ax.set_xlabel("Predicted Label")
        fig.savefig(cm_path)
    finally:
        # Release the figure even if drawing or saving fails.
        plt.close(fig)
    print(f"Individual confusion matrix plot saved to {cm_path}")
    return y_pred

def create_combined_confusion_matrix(predictions_dict, y_true, class_labels, run_number, save_dir):
    """Creates a single plot holding one confusion matrix per model.

    The matrices are laid out two per row; four models give the original 2x2
    grid at 16x14 inches. Grid cells without a model are hidden. The figure is
    saved as ``Combined_Matrix_Run_<run_number>.png`` inside ``save_dir``.

    Args:
        predictions_dict: Mapping of model name -> predicted class indices.
        y_true: True class indices, aligned with every prediction array.
        class_labels: Class names used as tick labels; assumed to be ordered
            by class index (position i names class i).
        run_number: Run identifier used in the title and file name.
        save_dir: ``pathlib.Path`` of an existing output directory.
    """
    print("\n" + "-" * 50 + f"\nGenerating Combined Confusion Matrix for Run {run_number}\n" + "-" * 50)
    n_cols = 2
    n_models = len(predictions_dict)
    # Ceiling division; keep at least one row so an empty dict still plots.
    n_rows = max(1, -(-n_models // n_cols))
    # Fixing the label set keeps every matrix the same size as the tick
    # labels, even if a class is absent from y_true and a model's output.
    label_ids = list(range(len(class_labels)))

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(16, 7 * n_rows), squeeze=False)
    try:
        axes = axes.flatten()
        fig.suptitle(f"Model Comparison: Confusion Matrices (Run {run_number})", fontsize=20)
        for ax, (model_name, y_pred) in zip(axes, predictions_dict.items()):
            cm = confusion_matrix(y_true, y_pred, labels=label_ids)
            sns.heatmap(cm, ax=ax, annot=True, fmt="d", cmap="Blues", xticklabels=class_labels, yticklabels=class_labels)
            ax.set_title(model_name)
            ax.set_ylabel("True Label")
            ax.set_xlabel("Predicted Label")
        for unused_ax in axes[n_models:]:
            unused_ax.set_visible(False)
        save_path = save_dir / f"Combined_Matrix_Run_{run_number}.png"
        # Leave the top 4% of the canvas for the super-title.
        fig.tight_layout(rect=[0, 0, 1, 0.96])
        fig.savefig(save_path)
    finally:
        # Release the figure even if drawing or saving fails.
        plt.close(fig)
    print(f"Combined confusion matrix plot saved to {save_path}")

# --- 5. Main Execution Block ---
if __name__ == "__main__":
    # One-time step, deliberately left disabled: it deletes and rebuilds the
    # train/validation/test folders from the raw class folders.
    # setup_data_directories()
    SAVED_MODELS_DIR.mkdir(parents=True, exist_ok=True)
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    train_gen, valid_gen, test_gen = create_data_generators()
    num_classes = len(train_gen.class_indices)
    input_shape = IMG_SIZE + (3,)  # append the channel dimension
    y_true_labels = test_gen.classes
    class_names = list(test_gen.class_indices.keys())

    # Every "builder" is a zero-argument callable returning an uncompiled
    # model, so the training loop does not need to special-case any entry.
    models_to_train = [
        {"name": "Custom_CNN", "builder": lambda: create_custom_cnn(input_shape, num_classes), "epochs": 50, "optimizer": "rmsprop"},
        {"name": "InceptionV3", "builder": lambda: create_transfer_model(InceptionV3, input_shape, num_classes), "epochs": 50, "optimizer": "rmsprop"},
        {"name": "VGG16", "builder": lambda: create_transfer_model(VGG16, input_shape, num_classes), "epochs": 50, "optimizer": "rmsprop"},
        {"name": "ResNet50", "builder": lambda: create_transfer_model(ResNet50, input_shape, num_classes), "epochs": 50, "optimizer": "rmsprop"},
    ]

    for run_number in range(1, NUM_TRAINING_RUNS + 1):
        run_predictions = {}  # model name -> predicted class indices for this run
        print("\n" + "#" * 70 + f"\n### PROCESSING RUN {run_number}/{NUM_TRAINING_RUNS}\n" + "#" * 70)

        for config in models_to_train:
            model_name = config["name"]
            # NOTE(review): the model path has no run number, so every run
            # after the first (and the first one too if a file already exists)
            # resumes from the previously saved model instead of training
            # from scratch. Confirm that cumulative training is intended.
            model_path = SAVED_MODELS_DIR / f"{model_name}.keras"
            run_dir = RESULTS_DIR / model_name / f"Run_{run_number}"
            print("\n" + "=" * 60 + f"\n=== Training {model_name} for Run {run_number} ===\n" + "=" * 60)
            run_dir.mkdir(parents=True, exist_ok=True)

            # Many models are created/loaded in this loop; drop the global
            # Keras state left by the previous one so memory does not pile up.
            tf.keras.backend.clear_session()

            if model_path.exists():
                print(f"Loading existing model from: {model_path}")
                model = models.load_model(model_path)
            else:
                print("Creating a new model...")
                model = config["builder"]()
                model.compile(loss="categorical_crossentropy", optimizer=config["optimizer"], metrics=["acc"])

            early_stopping = EarlyStopping(monitor="val_loss", patience=6, restore_best_weights=True, verbose=1)
            history = model.fit(train_gen, epochs=config["epochs"], validation_data=valid_gen, callbacks=[early_stopping])
            print(f"Saving model to: {model_path}")
            model.save(model_path)

            history_plot_path = run_dir / "training_history.png"
            save_history_plot(history, f"{model_name} - Run {run_number}", history_plot_path)
            print(f"Training history plot saved to {history_plot_path}")

            print("\n--- Performing quick test evaluation ---")
            eval_test = model.evaluate(test_gen, verbose=0)
            # Index 1 is the first compiled metric ("acc" for models compiled
            # above; assumed to be the same for models loaded from disk).
            print(f"--> Test Accuracy: {eval_test[1] * 100:.2f}%")

            y_pred = evaluate_and_save_results(model, test_gen, run_dir)
            run_predictions[model_name] = y_pred

        create_combined_confusion_matrix(run_predictions, y_true_labels, class_names, run_number, RESULTS_DIR)

    print("\n\nAll training and evaluation cycles complete.")

Found 12623 images belonging to 5 classes.
Found 2705 images belonging to 5 classes.
Found 2710 images belonging to 5 classes.

######################################################################
### PROCESSING RUN 1/10
######################################################################

=== Training Custom_CNN for Run 1 ===
Loading existing model from: saved_models\Custom_CNN.keras


  self._warn_if_super_not_called()


Epoch 1/50
[1m99/99[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m89s[0m 890ms/step - acc: 0.8142 - loss: 0.4893 - val_acc: 0.7135 - val_loss: 0.7942
Epoch 2/50
[1m99/99[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m87s[0m 883ms/step - acc: 0.8352 - loss: 0.4272 - val_acc: 0.7287 - val_loss: 0.8053
Epoch 3/50
[1m99/99[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m84s[0m 844ms/step - acc: 0.8703 - loss: 0.3487 - val_acc: 0.7213 - val_loss: 0.8383
Epoch 4/50
[1m78/99[0m [32m━━━━━━━━━━━━━━━[0m[37m━━━━━[0m [1m16s[0m 792ms/step - acc: 0.9070 - loss: 0.2693