In [1]:
import os
import cv2
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm
from sklearn.utils import shuffle

# Configuration
# Class folder names; a class's position in this list is its integer label.
class_names = ['buildings','forest','glacier','mountain', 'sea','street']
# Reverse lookup: class name -> integer label.
class_names_label = dict(zip(class_names, range(len(class_names))))
nb_classes = len(class_names)
# Target size handed to cv2.resize for every image.
IMAGE_SIZE = (150, 150)

# Set device: prefer CUDA when it is available, otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

# Data Loading Function
def load_data():
    """Load the training and test image sets from disk.

    Walks ``data/train`` and ``data/test``. Every sub-folder whose name is a
    key of ``class_names_label`` is treated as one class; each image inside
    is read with OpenCV, converted from BGR to RGB and resized to
    ``IMAGE_SIZE``.

    Entries that are not known class folders (for example hidden
    ``.ipynb_checkpoints`` directories) and files OpenCV cannot decode are
    reported and skipped instead of aborting the whole load. Folders and
    files are visited in sorted order so the result is reproducible.

    Returns:
        list: ``[(train_images, train_labels), (test_images, test_labels)]``.
        Each ``images`` is a float32 array built from the resized RGB images
        (pixel values are NOT normalised here) and each ``labels`` is an
        int32 array of class indices.
    """
    datasets = ["data/train", "data/test"]
    output = []

    for dataset in datasets:
        images = []
        labels = []
        print(f"Loading {dataset}")

        # os.listdir order is arbitrary; sort for a deterministic dataset order.
        for folder in sorted(os.listdir(dataset)):
            folder_path = os.path.join(dataset, folder)

            # Ignore stray files and directories that do not map to a class.
            if folder not in class_names_label or not os.path.isdir(folder_path):
                print(f"Skipping '{folder_path}': not a known class folder")
                continue
            label = class_names_label[folder]

            for file in tqdm(sorted(os.listdir(folder_path))):
                # Get the path name of the image
                img_path = os.path.join(folder_path, file)

                # cv2.imread signals an unreadable file by returning None.
                image = cv2.imread(img_path)
                if image is None:
                    # tqdm.write keeps the progress bar intact while logging.
                    tqdm.write(f"Skipping unreadable image: {img_path}")
                    continue

                # Convert to RGB and resize to the model's input size
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                image = cv2.resize(image, IMAGE_SIZE)

                images.append(image)
                labels.append(label)

        images = np.array(images, dtype='float32')
        labels = np.array(labels, dtype='int32')

        output.append((images, labels))

    return output

# CNN Model Definition
class CNNModel(nn.Module):
    """Four-stage convolutional image classifier.

    Every stage is an unpadded 3x3 convolution followed by ReLU and a 2x2
    max-pool. The stages use ``num_filters``, ``2*num_filters``,
    ``4*num_filters`` and ``2*num_filters`` output channels. The pooled
    feature map is flattened and passed through a 64-unit hidden layer with
    dropout (p=0.5) and a final linear layer that produces raw logits.

    Args:
        num_filters: Output channels of the first convolution.
        num_classes: Number of output logits. Defaults to 6, the value that
            was previously hard-coded.
        input_size: ``(height, width)`` of the input images. Defaults to
            ``(150, 150)``, which yields the 7x7 feature map that was
            previously hard-coded.

    Raises:
        ValueError: If ``input_size`` is too small to survive the four
            convolution/pooling stages.
    """

    def __init__(self, num_filters=32, num_classes=6, input_size=(150, 150)):
        super().__init__()

        # Convolutional layers (attribute names are kept stable so existing
        # checkpoints' state_dict keys still match).
        self.conv1 = nn.Conv2d(3, num_filters, kernel_size=3, padding=0)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv2 = nn.Conv2d(num_filters, num_filters * 2, kernel_size=3, padding=0)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv2d(num_filters * 2, num_filters * 4, kernel_size=3, padding=0)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv4 = nn.Conv2d(num_filters * 4, num_filters * 2, kernel_size=3, padding=0)
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Spatial size after the four stages, e.g. for 150:
        # 150->148->74->72->36->34->17->15->7
        feat_h, feat_w = (self._size_after_stages(side) for side in input_size)
        if feat_h < 1 or feat_w < 1:
            raise ValueError(
                f"input_size {tuple(input_size)} is too small for four conv/pool stages"
            )

        self.fc1 = nn.Linear(num_filters * 2 * feat_h * feat_w, 64)
        self.fc2 = nn.Linear(64, num_classes)
        self.dropout = nn.Dropout(0.5)

    @staticmethod
    def _size_after_stages(side, stages=4):
        """Return one spatial dimension after `stages` conv(3x3)+pool(2x2) steps."""
        for _ in range(stages):
            # An unpadded 3x3 conv removes 2 pixels; the 2x2 pool halves (floor).
            side = (side - 2) // 2
        return side

    def forward(self, x):
        """Map a batch of images ``(N, 3, H, W)`` to logits ``(N, num_classes)``."""
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = self.pool3(F.relu(self.conv3(x)))
        x = self.pool4(F.relu(self.conv4(x)))

        # Flatten everything except the batch dimension.
        x = x.flatten(1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)  # No softmax here, CrossEntropyLoss handles it

        return x

def create_model(learning_rate=0.001, num_filters=32):
    """Build a fresh network together with its optimizer and loss.

    Args:
        learning_rate: Learning rate given to the Adam optimizer.
        num_filters: Base filter count forwarded to ``CNNModel``.

    Returns:
        tuple: ``(model, optimizer, criterion)`` where the optimizer is Adam
        over the model's parameters and the criterion is cross-entropy.
    """
    network = CNNModel(num_filters=num_filters)
    adam = optim.Adam(network.parameters(), lr=learning_rate)
    loss_fn = nn.CrossEntropyLoss()
    return network, adam, loss_fn

# Data preparation function
def prepare_data(train_images, train_labels, test_images, test_labels, batch_size=64):
    """Wrap image/label arrays in PyTorch DataLoaders.

    Images arrive channels-last ``(N, H, W, C)`` and are transposed to the
    channels-first ``(N, C, H, W)`` layout before being turned into float
    tensors; labels become long tensors.

    Args:
        train_images, train_labels: Training arrays.
        test_images, test_labels: Test arrays.
        batch_size: Batch size used by both loaders.

    Returns:
        tuple: ``(train_dataloader, test_dataloader)``; only the training
        loader shuffles.
    """
    def _to_dataset(images_nhwc, labels):
        # (N, H, W, C) -> (N, C, H, W), then wrap both arrays as tensors.
        images_nchw = np.transpose(images_nhwc, (0, 3, 1, 2))
        return TensorDataset(torch.FloatTensor(images_nchw), torch.LongTensor(labels))

    train_loader = DataLoader(
        _to_dataset(train_images, train_labels), batch_size=batch_size, shuffle=True
    )
    test_loader = DataLoader(
        _to_dataset(test_images, test_labels), batch_size=batch_size, shuffle=False
    )
    return train_loader, test_loader

# Training function
def train_model(model, optimizer, criterion, train_dataloader, epochs=10, verbose=True):
    """Train `model` in place and report the training accuracy of each epoch.

    Args:
        model: Network to optimise; it is moved to the global ``device`` and
            put in training mode.
        optimizer: Optimizer stepping the model's parameters.
        criterion: Loss applied to ``(logits, labels)``.
        train_dataloader: Iterable of ``(images, labels)`` batches.
        epochs: Number of passes over the dataloader.
        verbose: When true, print loss and accuracy after every epoch.

    Returns:
        list: Training accuracy (correct / seen) for each epoch, in order.
    """
    model.to(device)
    model.train()

    history = []

    for epoch in range(epochs):
        hits, seen, loss_sum = 0, 0, 0

        for images, targets in train_dataloader:
            images = images.to(device)
            targets = targets.to(device)

            # Standard optimisation step.
            optimizer.zero_grad()
            logits = model(images)
            batch_loss = criterion(logits, targets)
            batch_loss.backward()
            optimizer.step()

            # Running statistics; the predicted class is the arg-max logit.
            predicted = torch.max(logits.data, 1)[1]
            seen += targets.size(0)
            hits += (predicted == targets).sum().item()
            loss_sum += batch_loss.item()

        epoch_accuracy = hits / seen
        # Loss is averaged over batches, not over samples.
        epoch_loss = loss_sum / len(train_dataloader)
        history.append(epoch_accuracy)

        if verbose:
            print(f'Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}')

    return history

# Evaluation function
def evaluate_model(model, test_dataloader):
    """Compute classification accuracy of `model` over a dataloader.

    The model is moved to the global ``device`` and switched to evaluation
    mode; gradients are disabled while predicting.

    Args:
        model: Network to evaluate.
        test_dataloader: Iterable of ``(images, labels)`` batches.

    Returns:
        float: Fraction of samples whose arg-max logit equals the label.
    """
    model.to(device)
    model.eval()

    hits, seen = 0, 0

    with torch.no_grad():
        for images, targets in test_dataloader:
            images = images.to(device)
            targets = targets.to(device)
            logits = model(images)
            predicted = torch.max(logits.data, 1)[1]
            seen += targets.size(0)
            hits += (predicted == targets).sum().item()

    return hits / seen

# Fitness function for PSO
def fitness_function(params, train_dataloader):
    """Score one PSO position by training a model with it.

    Args:
        params: Sequence ``[learning_rate, num_filters]``; the filter count
            is truncated to an integer.
        train_dataloader: Batches used for the 10-epoch training run.

    Returns:
        float: Training accuracy of the last epoch.
    """
    learning_rate, num_filters = params[0], int(params[1])

    model, optimizer, criterion = create_model(
        learning_rate=learning_rate, num_filters=num_filters
    )
    history = train_model(
        model, optimizer, criterion, train_dataloader, epochs=10, verbose=False
    )
    # The fitness is the accuracy reached in the final epoch.
    final_accuracy = history[-1]

    # Release the candidate model so GPU memory does not grow across particles.
    del model, optimizer, criterion
    if device.type == 'cuda':
        torch.cuda.empty_cache()

    return final_accuracy

# PSO Algorithm
def PSO(bounds, train_dataloader, n_particles=10, max_iter=2, fitness_fn=None):
    """Maximise a fitness function with particle swarm optimisation.

    Args:
        bounds: Sequence of ``(low, high)`` pairs, one per dimension. The
            progress output labels dimension 0 as the learning rate and
            dimension 1 as the filter count, so at least two dimensions are
            expected.
        train_dataloader: Passed through unchanged as the second argument of
            the fitness function.
        n_particles: Swarm size (must be >= 1).
        max_iter: Number of evaluate-then-move iterations (must be >= 1).
        fitness_fn: Callable ``fitness_fn(position, train_dataloader)``
            returning a score to maximise. Defaults to the module-level
            ``fitness_function``.

    Returns:
        tuple: ``(global_best, global_best_score)`` - the best position found
        (NumPy array) and its score.

    Raises:
        ValueError: If ``n_particles`` or ``max_iter`` is smaller than 1.
        RuntimeError: If no evaluation produced a comparable score
            (for example every score was NaN).
    """
    if n_particles < 1 or max_iter < 1:
        raise ValueError("n_particles and max_iter must both be at least 1")
    if fitness_fn is None:
        fitness_fn = fitness_function

    dim = len(bounds)
    lows = np.array([low for low, _ in bounds], dtype=float)
    highs = np.array([high for _, high in bounds], dtype=float)

    particles = [np.array([random.uniform(low, high) for low, high in bounds]) for _ in range(n_particles)]
    velocities = [np.zeros(dim) for _ in range(n_particles)]
    # Independent copies: positions are updated in place below, so the
    # personal bests must not alias the particle arrays.
    personal_best = [position.copy() for position in particles]
    # Start from -inf so the first evaluation always registers, even when a
    # score is zero or negative.
    personal_best_scores = [-np.inf] * n_particles
    global_best = None
    global_best_score = -np.inf

    w = 0.9  # inertia weight
    c1 = 2.0  # cognitive parameter
    c2 = 2.0  # social parameter

    for it in range(max_iter):
        print(f"PSO Iteration {it+1}/{max_iter}")

        for i in range(n_particles):
            print(f"  Evaluating particle {i+1}/{n_particles} - LR: {particles[i][0]:.6f}, Filters: {int(particles[i][1])}")
            score = fitness_fn(particles[i], train_dataloader)
            print(f"    Accuracy: {score:.4f}")

            if score > personal_best_scores[i]:
                personal_best_scores[i] = score
                personal_best[i] = particles[i].copy()
            if score > global_best_score:
                global_best_score = score
                global_best = particles[i].copy()

        if global_best is None:
            raise RuntimeError("PSO: no particle produced a comparable fitness score")

        # Positions moved after the final evaluation would never be scored,
        # so the last update is skipped.
        if it == max_iter - 1:
            break

        # Update particle velocities and positions
        for i in range(n_particles):
            r1, r2 = np.random.rand(dim), np.random.rand(dim)
            cognitive = c1 * r1 * (personal_best[i] - particles[i])
            social = c2 * r2 * (global_best - particles[i])
            velocities[i] = w * velocities[i] + cognitive + social
            particles[i] += velocities[i]

            # Apply bounds constraints (in place, all dimensions at once)
            np.clip(particles[i], lows, highs, out=particles[i])

        # Decay inertia weight
        w *= 0.95

    print("\n" + "="*50)
    print("PSO Optimization Results:")
    print("="*50)
    print(f"Best Learning Rate: {global_best[0]:.6f}")
    print(f"Best Filters: {int(global_best[1])}")
    print(f"Best Training Accuracy: {global_best_score:.4f}")
    print("="*50)

    return global_best, global_best_score

# Main execution
if __name__ == "__main__":
    # End-to-end run: load data, tune (learning rate, filters) with PSO,
    # train the final model, evaluate it and save a checkpoint.

    # Load and prepare data
    print("Loading data...")
    (train_images, train_labels), (test_images, test_labels) = load_data()
    
    # Shuffle training data
    train_images, train_labels = shuffle(train_images, train_labels, random_state=25)
    
    # Normalize images
    train_images = train_images / 255.0 
    test_images = test_images / 255.0
    
    n_train = train_labels.shape[0]
    n_test = test_labels.shape[0]
    
    print(f"Number of training examples: {n_train}")
    print(f"Number of testing examples: {n_test}")
    print(f"Each image is of size: {IMAGE_SIZE}")
    
    # Prepare dataloaders
    print("Preparing data loaders...")
    train_dataloader, test_dataloader = prepare_data(train_images, train_labels, 
                                                   test_images, test_labels, batch_size=64)
    
    # Define PSO bounds [learning_rate, num_filters]
    bounds = [(0.0001, 0.01), (16, 64)]
    
    # Run PSO optimization
    print("Starting PSO optimization...")
    best_params, best_score = PSO(bounds=bounds, train_dataloader=train_dataloader, 
                                 n_particles=10, max_iter=2)
    
    # PSO returns a NumPy array. Convert its entries to plain Python numbers
    # so that neither the optimizer's `lr` nor the saved checkpoint contains
    # NumPy scalars (which would otherwise prevent loading the file with
    # torch.load(..., weights_only=True)).
    best_learning_rate = float(best_params[0])
    best_num_filters = int(best_params[1])
    
    # Train final model with best parameters
    print("\nTraining final model with optimized parameters...")
    final_model, final_optimizer, final_criterion = create_model(
        learning_rate=best_learning_rate, 
        num_filters=best_num_filters
    )
    
    print(f"Final model - LR: {best_params[0]:.6f}, Filters: {int(best_params[1])}")
    final_accuracies = train_model(final_model, final_optimizer, final_criterion, 
                                 train_dataloader, epochs=20, verbose=True)
    
    # Evaluate on test set
    print("Evaluating on test set...")
    test_accuracy = evaluate_model(final_model, test_dataloader)
    print(f"Final Test Accuracy: {test_accuracy:.4f}")
    
    # Save the final model together with the hyper-parameters needed to rebuild it
    torch.save({
        'model_state_dict': final_model.state_dict(),
        'optimizer_state_dict': final_optimizer.state_dict(),
        'learning_rate': best_learning_rate,
        'num_filters': best_num_filters,
        'test_accuracy': test_accuracy,
        'class_names': class_names
    }, 'best_model_pso.pth')
    
    print("Model saved as 'best_model_pso.pth'")

Using device: cuda
Loading data...
Loading data/train


100%|██████████| 2191/2191 [00:00<00:00, 5121.64it/s]
100%|██████████| 2271/2271 [00:00<00:00, 4688.95it/s]
100%|██████████| 2404/2404 [00:00<00:00, 5411.67it/s]
100%|██████████| 2512/2512 [00:00<00:00, 5671.29it/s]
100%|██████████| 2274/2274 [00:00<00:00, 5596.29it/s]
100%|██████████| 2382/2382 [00:00<00:00, 5002.49it/s]


Loading data/test


100%|██████████| 437/437 [00:00<00:00, 4920.37it/s]
100%|██████████| 474/474 [00:00<00:00, 4561.68it/s]
100%|██████████| 553/553 [00:00<00:00, 5324.15it/s]
100%|██████████| 525/525 [00:00<00:00, 4990.65it/s]
100%|██████████| 510/510 [00:00<00:00, 5391.01it/s]
100%|██████████| 501/501 [00:00<00:00, 4504.90it/s]


Number of training examples: 14034
Number of testing examples: 3000
Each image is of size: (150, 150)
Preparing data loaders...
Starting PSO optimization...
PSO Iteration 1/2
  Evaluating particle 1/10 - LR: 0.009245, Filters: 30
    Accuracy: 0.5868
  Evaluating particle 2/10 - LR: 0.009022, Filters: 51
    Accuracy: 0.6853
  Evaluating particle 3/10 - LR: 0.006173, Filters: 40
    Accuracy: 0.6893
  Evaluating particle 4/10 - LR: 0.005334, Filters: 56
    Accuracy: 0.7446
  Evaluating particle 5/10 - LR: 0.004039, Filters: 27
    Accuracy: 0.7390
  Evaluating particle 6/10 - LR: 0.008859, Filters: 52
    Accuracy: 0.6637
  Evaluating particle 7/10 - LR: 0.005945, Filters: 61
    Accuracy: 0.7319
  Evaluating particle 8/10 - LR: 0.007329, Filters: 58
    Accuracy: 0.6896
  Evaluating particle 9/10 - LR: 0.008836, Filters: 38
    Accuracy: 0.1741
  Evaluating particle 10/10 - LR: 0.003843, Filters: 28
    Accuracy: 0.7852
PSO Iteration 2/2
  Evaluating particle 1/10 - LR: 0.004030, Fil

In [20]:
def load_trained_model(model_path='best_model_pso.pth'):
    """Load a trained model from a checkpoint file.

    The checkpoint is expected to be a dict with at least the keys
    ``'num_filters'``, ``'model_state_dict'``, ``'learning_rate'`` and
    ``'test_accuracy'``.

    Args:
        model_path: Path of the checkpoint file.

    Returns:
        CNNModel: The restored model, moved to the global ``device`` and set
        to evaluation mode.

    Raises:
        FileNotFoundError: If ``model_path`` does not exist.
        Exception: Any error raised while loading is printed and re-raised.
    """
    # Check if model file exists
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file '{model_path}' not found. Make sure you've trained and saved the model first.")

    try:
        print(f"Loading model from: {model_path}")

        # Fix for PyTorch 2.6+ - set weights_only=False to allow loading numpy objects.
        # NOTE(security): weights_only=False unpickles arbitrary objects and can
        # execute code embedded in the file; only load checkpoints you trust.
        checkpoint = torch.load(model_path, map_location=device, weights_only=False)

        # Create model with same parameters used during training
        model = CNNModel(num_filters=checkpoint['num_filters'])
        model.load_state_dict(checkpoint['model_state_dict'])
        model.to(device)
        model.eval()  # Set to evaluation mode

        print("Model loaded successfully!")
        print(f"- Filters: {checkpoint['num_filters']}")
        print(f"- Learning Rate: {checkpoint['learning_rate']}")
        print(f"- Test Accuracy: {checkpoint['test_accuracy']:.4f}")

        return model

    except Exception as e:
        # Report the failure for notebook users, then let it propagate.
        print(f"Error loading model: {str(e)}")
        raise


def predict_single_image(model, image_path):
    """Predict the class of a single image file.

    The model is run in evaluation mode so that dropout does not make the
    prediction stochastic; if it was in training mode on entry, that mode is
    restored before returning.

    Args:
        model: Trained network (or any callable mapping an image tensor to
            logits of shape ``(1, num_classes)``).
        image_path: Path of the image to classify.

    Returns:
        tuple: ``(predicted_class, confidence, probabilities)`` - the class
        name, its softmax probability and a NumPy array with the probability
        of every class. On any error the message is printed and
        ``(None, None, None)`` is returned.
    """
    try:
        # Preprocess image
        image_tensor = preprocess_image(image_path)

        # Temporarily leave training mode (if active) for deterministic output.
        was_training = getattr(model, "training", False)
        if was_training:
            model.eval()
        try:
            # Make prediction
            with torch.no_grad():
                outputs = model(image_tensor)
                probabilities = F.softmax(outputs, dim=1)
                predicted_class_idx = torch.argmax(probabilities, dim=1).item()
                confidence = probabilities[0][predicted_class_idx].item()
        finally:
            if was_training:
                model.train()

        predicted_class = class_names[predicted_class_idx]

        return predicted_class, confidence, probabilities[0].cpu().numpy()

    except Exception as e:
        # Deliberate best-effort: report the problem and signal failure with Nones.
        print(f"Error predicting image {image_path}: {str(e)}")
        return None, None, None


def preprocess_image(image_path):
    """Turn an image file into a model-ready tensor.

    Steps: read with OpenCV, convert BGR to RGB, resize to ``IMAGE_SIZE``,
    scale to float32 in [0, 1], reorder to channels-first and add a leading
    batch axis.

    Args:
        image_path: Path of the image file.

    Returns:
        torch.Tensor: Float tensor of shape ``(1, C, H, W)`` on the global
        ``device``.

    Raises:
        ValueError: If OpenCV cannot read the file.
    """
    bgr = cv2.imread(image_path)
    if bgr is None:
        raise ValueError(f"Could not read image from {image_path}")

    # Colour order and spatial size expected by the model.
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    resized = cv2.resize(rgb, IMAGE_SIZE)

    # Pixel values 0..255 -> 0..1
    scaled = resized.astype('float32') / 255.0

    # (H, W, C) -> (C, H, W) -> (1, C, H, W)
    batch = np.expand_dims(np.transpose(scaled, (2, 0, 1)), axis=0)

    return torch.FloatTensor(batch).to(device)




def predict_with_all_probabilities(model, image_path):
    """Predict an image and return the probability of every class.

    The model is run in evaluation mode so that dropout does not make the
    result stochastic; if it was in training mode on entry, that mode is
    restored before returning.

    Args:
        model: Trained network (or any callable mapping an image tensor to
            logits of shape ``(1, num_classes)``).
        image_path: Path of the image to classify.

    Returns:
        dict: Class name -> softmax probability, ordered from most to least
        likely. On any error the message is printed and ``None`` is returned.
    """
    try:
        image_tensor = preprocess_image(image_path)

        # Temporarily leave training mode (if active) for deterministic output.
        was_training = getattr(model, "training", False)
        if was_training:
            model.eval()
        try:
            with torch.no_grad():
                outputs = model(image_tensor)
                probabilities = F.softmax(outputs, dim=1)[0].cpu().numpy()
        finally:
            if was_training:
                model.train()

        # Create results dictionary
        results = {class_name: probabilities[i] for i, class_name in enumerate(class_names)}

        # Sort by probability (highest first)
        sorted_results = dict(sorted(results.items(), key=lambda x: x[1], reverse=True))

        return sorted_results

    except Exception as e:
        # Deliberate best-effort: report the problem and signal failure with None.
        print(f"Error predicting image {image_path}: {str(e)}")
        return None

In [15]:
  # Classify one sample image with `final_model`, the in-memory model created by the
  # training cell; that cell and the prediction-helper cell must have been run first.
  predicted_class, confidence, all_probs = predict_single_image(final_model, "data/pred/24144.jpg")

In [16]:
# Display the predicted class name as the cell's output.
predicted_class

'glacier'

In [21]:
# Rebuild the model from the checkpoint written by the training cell.
model = load_trained_model("best_model_pso.pth")

Loading model from: best_model_pso.pth
Model loaded successfully!
- Filters: 47
- Learning Rate: 0.0016538263873647147
- Test Accuracy: 0.8253
