In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import random
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from torchvision.utils import make_grid

In [None]:
# Reproducibility: seed Python, NumPy and PyTorch (CPU + every CUDA device) identically
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
torch.cuda.manual_seed_all(42)
# Ask cuDNN for deterministic kernels and switch off its auto-tuner
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Dataset root and its three split sub-folders
base_dir = '../data/DAWN_processed'
train_dir, val_dir, test_dir = (
    os.path.join(base_dir, split_name) for split_name in ('train', 'val', 'test')
)

# Weather-condition class names used for reports and plot labels.
# NOTE(review): torchvision's ImageFolder numbers classes by sorted folder name; if the
# folders are named like these entries, label 2 would be 'Sand' and label 3 'Snow'.
# Confirm this order against the printed class_to_idx mapping.
classes = ['Fog', 'Rain', 'Snow', 'Sand']

# Hyper-parameters
img_height = img_width = 224   # input resolution fed to the network
batch_size = 32
epochs = 20                    # Increased epochs for better training
learning_rate = 0.0001
weight_decay = 1e-4            # L2 regularization

# Prefer the GPU when one is visible to PyTorch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:

# Data transformations
# Training pipeline: resize, light geometric/colour augmentation, tensor conversion,
# ImageNet mean/std normalisation, then random erasing on the normalised tensor.
train_transforms = transforms.Compose([
    transforms.Resize((img_height, img_width)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    transforms.RandomErasing(p=0.2, scale=(0.02, 0.25), ratio=(0.3, 3.3))  # Added RandomErasing for robust training.
])

# Validation/test pipeline: deterministic resize + the same normalisation, no augmentation.
val_test_transforms = transforms.Compose([
    transforms.Resize((img_height, img_width)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Load datasets (one sub-folder per class inside each split directory)
train_dataset = datasets.ImageFolder(train_dir, transform=train_transforms)
val_dataset = datasets.ImageFolder(val_dir, transform=val_test_transforms)
test_dataset = datasets.ImageFolder(test_dir, transform=val_test_transforms)

# ImageFolder assigns integer labels by sorted class-folder name, which need not match
# the order of a hand-written list. Take the names from the dataset so that classes[i]
# always names label i in the confusion matrix, reports and prediction plots.
classes = train_dataset.classes

# Create data loaders (only the training loader shuffles)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=False)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=False)

print(f"Classes: {train_dataset.classes}")
print(f"Class to idx mapping: {train_dataset.class_to_idx}")
print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Test samples: {len(test_dataset)}")



In [None]:
# CNN model: ImageNet-pretrained ResNet18 with a custom classification head
def create_pretrained_model(num_classes=4):
    """Build a ResNet18 initialised with 'IMAGENET1K_V1' weights and a new head.

    Every parameter tensor except the last four yielded by ``model.parameters()``
    is frozen, and the original ``fc`` layer is replaced by
    Dropout -> Linear(512) -> ReLU -> BatchNorm1d -> Dropout -> Linear(num_classes).

    Args:
        num_classes: Number of output logits of the final linear layer.

    Returns:
        The modified ``torchvision`` ResNet18 module (not yet moved to a device).
    """
    backbone = models.resnet18(weights='IMAGENET1K_V1')

    # Freeze all but the trailing four parameter tensors of the pretrained network.
    # NOTE(review): two of those four presumably belong to the original `fc`, which is
    # replaced just below - confirm which backbone tensors are meant to stay trainable.
    pretrained_params = list(backbone.parameters())
    for frozen_param in pretrained_params[:-4]:
        frozen_param.requires_grad = False

    # New head; its layers are freshly created and therefore trainable.
    feature_dim = backbone.fc.in_features
    head_layers = [
        nn.Dropout(0.5),
        nn.Linear(feature_dim, 512),
        nn.ReLU(inplace=True),
        nn.BatchNorm1d(512),
        nn.Dropout(0.3),
        nn.Linear(512, num_classes),
    ]
    backbone.fc = nn.Sequential(*head_layers)
    return backbone


# Build the network and place it on the selected device
model = create_pretrained_model(num_classes=len(classes))
model = model.to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(
    model.parameters(),
    lr=learning_rate,
    weight_decay=weight_decay,  # Added weight decay.
)
# Scale the learning rate by 0.1 when the value passed to scheduler.step() has stopped
# decreasing (mode='min') for `patience` epochs.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,
    patience=3,  # increased patience
    verbose=True,
)



In [None]:
# Training function: optimise, validate, step the LR scheduler and keep the best weights
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs):
    """Train ``model`` and restore the weights with the lowest validation loss.

    Each epoch runs one pass over ``train_loader``, evaluates on ``val_loader`` with
    ``evaluate_model``, feeds the validation loss to ``scheduler.step`` and records
    the metrics. When training ends, the best-validation weights are loaded back
    into ``model``.

    Args:
        model: Network to optimise; batches are moved to the global ``device``.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer already bound to the model's parameters.
        scheduler: Object whose ``step(val_loss)`` is called once per epoch.
        num_epochs: Number of epochs to run.

    Returns:
        ``(model, history)`` where ``history`` maps 'train_loss', 'val_loss',
        'train_acc' and 'val_acc' to per-epoch lists (accuracies in percent).
    """
    history = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': []}
    best_val_loss = float('inf')
    best_model_state = None

    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0

        # evaluate_model() switches the model to eval mode, so training behaviour
        # (dropout, batch-norm statistics) has to be re-enabled every epoch.
        model.train()
        train_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs} [Train]")
        for inputs, labels in train_bar:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Weight the batch loss by batch size so the epoch figure is a per-sample mean.
            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            train_bar.set_postfix({'loss': loss.item(), 'acc': 100 * correct / total})

        # Normalise by the samples actually seen; this equals len(dataset) for a plain
        # loader and stays correct when batches are dropped or the loader is empty.
        train_loss = running_loss / total if total else 0.0
        train_acc = 100 * correct / total if total else 0.0

        val_loss, val_acc = evaluate_model(model, val_loader, criterion)
        scheduler.step(val_loss)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # state_dict() returns references to the live tensors and dict.copy() is
            # shallow, so later optimizer steps would overwrite the "best" weights.
            # Clone every tensor to take a real snapshot.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }

        print(f'Epoch {epoch + 1}/{num_epochs}: '
              f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%, '
              f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')

        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['train_acc'].append(train_acc)
        history['val_acc'].append(val_acc)

    # Explicit None check: a snapshot exists as soon as one epoch improved on +inf.
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
        print(f"Loaded best model with validation loss: {best_val_loss:.4f}")

    return model, history



In [None]:
# Evaluation function: average loss and accuracy over a loader, without gradients
def evaluate_model(model, data_loader, criterion=None):
    """Compute average loss and accuracy of ``model`` over ``data_loader``.

    The model is put in eval mode and left there; callers that continue training
    must call ``model.train()`` again.

    Args:
        model: Network to evaluate; batches are moved to the global ``device``.
        data_loader: Any iterable yielding ``(inputs, labels)`` batches.
        criterion: Optional loss callable; when ``None`` no loss is computed.

    Returns:
        ``(avg_loss, acc)``: the per-sample mean loss (0 when ``criterion`` is
        ``None``) and the accuracy in percent. ``(0.0, 0.0)`` for an empty loader.
    """
    model.eval()
    # Compare against None explicitly rather than relying on the module's truthiness.
    compute_loss = criterion is not None
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)

            if compute_loss:
                loss = criterion(outputs, labels)
                # Weight by batch size so a smaller last batch is averaged correctly.
                running_loss += loss.item() * inputs.size(0)

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # Nothing was evaluated: report zeros instead of dividing by zero.
    if total == 0:
        return 0.0, 0.0

    # Divide by the samples actually seen; equals len(dataset) for a plain loader but
    # also works for samplers, dropped batches and iterables without a `.dataset`.
    avg_loss = running_loss / total if compute_loss else 0
    acc = 100 * correct / total

    return avg_loss, acc


# Plot function: accuracy and loss curves side by side
def plot_training_history(history):
    """Draw train/validation accuracy and loss curves and save them to disk.

    Args:
        history: Mapping with the keys 'train_acc', 'val_acc', 'train_loss' and
            'val_loss', each holding one value per epoch.

    Side effects:
        Writes 'training_history.png' to the working directory and shows the figure.
    """
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    # (train key, validation key, title, y-axis label, legend position) per panel
    panels = (
        ('train_acc', 'val_acc', 'Model accuracy', 'Accuracy (%)', 'lower right'),
        ('train_loss', 'val_loss', 'Model loss', 'Loss', 'upper right'),
    )

    for axis, (train_key, val_key, title, y_label, legend_loc) in zip(axes, panels):
        axis.plot(history[train_key])
        axis.plot(history[val_key])
        axis.set_title(title)
        axis.set_ylabel(y_label)
        axis.set_xlabel('Epoch')
        axis.legend(['Train', 'Validation'], loc=legend_loc)

    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.show()


# Confusion matrix function
def plot_confusion_matrix(model, data_loader, classes):
    """Predict over ``data_loader``, then plot a confusion matrix and print a report.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        data_loader: Iterable of ``(inputs, labels)`` batches.
        classes: Class names, where ``classes[i]`` names integer label ``i``.

    Returns:
        ``(y_true, y_pred)``: lists of the true and predicted labels.

    Side effects:
        Writes 'confusion_matrix.png', shows the figure and prints the
        classification report.
    """
    y_true = []
    y_pred = []

    model.eval()
    with torch.no_grad():
        for inputs, labels in tqdm(data_loader, desc="Generating predictions"):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)

            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

    # Fix the label set explicitly. Otherwise a class that appears in neither y_true
    # nor y_pred shrinks the matrix, shifts the tick labels onto the wrong rows and
    # makes classification_report reject `target_names`.
    label_ids = list(range(len(classes)))

    # Generate confusion matrix
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)

    # Plot confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=classes,
                yticklabels=classes)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.savefig('confusion_matrix.png')
    plt.show()

    # Print classification report
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, labels=label_ids, target_names=classes))

    return y_true, y_pred



In [None]:
# Visualize predictions function
def visualize_predictions(model, data_loader, classes, num_images=12):
    """Show images from the first batch with their true and predicted labels.

    Args:
        model: Network used for prediction; it is switched to eval mode.
        data_loader: Loader whose first batch supplies the images.
        classes: Class names, where ``classes[i]`` names integer label ``i``.
        num_images: Maximum number of images to draw. Fewer are drawn when the
            first batch is smaller.

    Side effects:
        Writes 'prediction_examples.png' and shows the figure.
    """
    model.eval()

    # Get a batch of images (only the first batch is used)
    images, labels = next(iter(data_loader))
    # The batch may hold fewer samples than requested; draw what is available
    # instead of indexing past the end.
    num_shown = min(num_images, len(images))
    images, labels = images[:num_shown], labels[:num_shown]

    # Make predictions
    with torch.no_grad():
        outputs = model(images.to(device))
        _, predicted = torch.max(outputs, 1)
        predicted = predicted.cpu()

    # Size the grid from the image count (4 columns) rather than a fixed 3x4 layout,
    # so more than 12 images still fit. 12 images keep the original 15x10 figure.
    cols = 4
    rows = max(1, (num_shown + cols - 1) // cols)
    fig = plt.figure(figsize=(15, 10 * rows / 3))

    # Constants that invert the mean/std normalisation for display
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    for i in range(num_shown):
        ax = fig.add_subplot(rows, cols, i + 1)

        # Channels-first tensor -> channels-last array, then un-normalize and clip
        img = images[i].cpu().numpy().transpose((1, 2, 0))
        img = np.clip(std * img + mean, 0, 1)
        ax.imshow(img)

        # Title in green for a correct prediction, red otherwise
        true_idx = int(labels[i])
        pred_idx = int(predicted[i])
        true_label = classes[true_idx]
        pred_label = classes[pred_idx]
        title_color = 'green' if true_idx == pred_idx else 'red'
        ax.set_title(f"True: {true_label}\nPred: {pred_label}", color=title_color)
        # Hide the axes of this subplot explicitly rather than the "current" axes
        ax.axis('off')

    plt.tight_layout()
    plt.savefig('prediction_examples.png')
    plt.show()



In [None]:
# Function to predict a single image
def predict_image(model, image_path, classes, transform=None):
    """Classify one image file and return the prediction with class probabilities.

    Args:
        model: Network used for prediction; it is switched to eval mode.
        image_path: Path of the image to classify.
        classes: Class names, where ``classes[i]`` names output index ``i``.
        transform: Preprocessing applied to the PIL image; defaults to the
            module-level ``val_test_transforms``.

    Returns:
        Dict with 'class' (predicted name), 'confidence' (its softmax probability)
        and 'all_probabilities' (class name -> probability).
    """
    if transform is None:
        transform = val_test_transforms

    # Load and preprocess the image
    from PIL import Image
    # The context manager closes the file handle as soon as the RGB copy exists,
    # instead of leaving that to garbage collection.
    with Image.open(image_path) as raw_image:
        image = raw_image.convert('RGB')
    image_tensor = transform(image).unsqueeze(0).to(device)

    # Predict
    model.eval()
    with torch.no_grad():
        outputs = model(image_tensor)
        probabilities = F.softmax(outputs, dim=1)[0]
        _, predicted_idx = torch.max(probabilities, 0)

    # Convert once to plain Python values for the result dict
    best_idx = predicted_idx.item()
    prob_values = probabilities.tolist()

    return {
        'class': classes[best_idx],
        'confidence': prob_values[best_idx],
        'all_probabilities': {classes[i]: prob for i, prob in enumerate(prob_values)}
    }


# Function to load the saved model for later use
def load_model(model_path, num_classes=4):
    """Rebuild the network and load weights from a checkpoint file.

    The checkpoint must be a mapping with the keys 'model_state_dict' and 'classes'.

    Args:
        model_path: Path of the checkpoint file.
        num_classes: Output size passed to ``create_pretrained_model``; it has to
            match the architecture the checkpoint was saved from.

    Returns:
        ``(model, classes)``: the model on the global ``device`` in eval mode, and
        the value stored under 'classes' in the checkpoint.
    """
    # map_location remaps stored tensors onto the active device, so a checkpoint
    # written on a GPU machine can still be loaded on a CPU-only one.
    # NOTE: torch.load unpickles the file - only load checkpoints from trusted sources.
    checkpoint = torch.load(model_path, map_location=device)

    # Create a new model instance
    model = create_pretrained_model(num_classes=num_classes)

    # Load the saved state dict
    model.load_state_dict(checkpoint['model_state_dict'])
    model = model.to(device)
    model.eval()

    return model, checkpoint['classes']



In [None]:
# Main execution: train one fresh model per optimizer and compare test metrics
if __name__ == "__main__":
    import shutil

    # Define multiple optimizers for experimentation.
    # Each entry is a factory so every run gets an optimizer bound to its own model.
    optimizers = {
        "Adam": lambda params: optim.Adam(params, lr=learning_rate, weight_decay=weight_decay),
        "SGD": lambda params: optim.SGD(params, lr=learning_rate, momentum=0.9, weight_decay=weight_decay),
        "RMSprop": lambda params: optim.RMSprop(params, lr=learning_rate, alpha=0.9, weight_decay=weight_decay),
        "Adagrad": lambda params: optim.Adagrad(params, lr=learning_rate, weight_decay=weight_decay),
        "AdamW": lambda params: optim.AdamW(params, lr=learning_rate, weight_decay=weight_decay),
    }

    results = {}

    # Explicit label ids keep the report aligned with `classes` even when a class is
    # missing from the test predictions.
    label_ids = list(range(len(classes)))

    for opt_name, opt_func in optimizers.items():
        print(f"\nTraining with {opt_name} optimizer...\n")

        # Create a fresh model instance
        model = create_pretrained_model(num_classes=len(classes)).to(device)

        # Define optimizer and scheduler
        optimizer = opt_func(model.parameters())
        criterion = nn.CrossEntropyLoss()
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3, verbose=True)

        # Train model
        trained_model, history = train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs)

        # Evaluate model on test data
        test_loss, test_acc = evaluate_model(trained_model, test_loader, criterion)

        # Generate confusion matrix and classification report
        y_true, y_pred = plot_confusion_matrix(trained_model, test_loader, classes)
        # plot_confusion_matrix always writes 'confusion_matrix.png', so each run would
        # overwrite the previous figure. Keep a per-optimizer copy as well.
        if os.path.exists('confusion_matrix.png'):
            shutil.copyfile('confusion_matrix.png', f'confusion_matrix_{opt_name}.png')
        report = classification_report(y_true, y_pred, labels=label_ids, target_names=classes, output_dict=True)

        # Store results
        results[opt_name] = {
            "train_history": history,
            "test_loss": test_loss,
            "test_accuracy": test_acc,
            "classification_report": report
        }

        print(f"Completed training with {opt_name}. Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.2f}%")

    # Print summary of all optimizer results
    for opt_name, res in results.items():
        print(f"\nOptimizer: {opt_name}")
        print(f"Test Loss: {res['test_loss']:.4f}, Test Accuracy: {res['test_accuracy']:.2f}%")
        print(f"Precision, Recall, and F1-score:\n{res['classification_report']}\n")