In [2]:
import os
import torch
import numpy as np
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import torch.nn.functional as F

In [5]:
from google.colab import drive
import zipfile
import os


# Remount Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install rarfile


Collecting rarfile
  Downloading rarfile-4.2-py3-none-any.whl.metadata (4.4 kB)
Downloading rarfile-4.2-py3-none-any.whl (29 kB)
Installing collected packages: rarfile
Successfully installed rarfile-4.2


In [None]:
import rarfile

rar_file_path = '/content/drive/MyDrive/ASAL NWPU DATA/DATA.rar'  # Path to the RAR file
extract_to = '/content/drive/MyDrive/ASAL NWPU DATA/'  # Path to extract everything

# List the contents of the RAR file to check its structure (optional)
with rarfile.RarFile(rar_file_path) as rf:
    for f in rf.infolist():
        print(f.filename)

# Extract everything from the RAR archive
with rarfile.RarFile(rar_file_path) as rf:
    rf.extractall(extract_to)

print("Extraction of the entire RAR file complete!")


[1;30;43mLe flux de sortie a été tronqué et ne contient que les 5000 dernières lignes.[0m
NWPU-RESISC45_split_val_only/train/wetland/wetland_494.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_495.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_496.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_497.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_498.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_499.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_500.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_501.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_502.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_503.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_504.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_505.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_506.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_507.jpg
NWPU-RESISC45_split_val_only/train/wetland/wetland_508.jpg
NWPU-RESISC45_split_val

In [None]:
# Custom Dataset class
class ImageFolderDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = sorted(os.listdir(root_dir))
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}
        self.images = self._load_images()

    def _load_images(self):
        images = []
        for cls in self.classes:
            class_dir = os.path.join(self.root_dir, cls)
            for img_name in os.listdir(class_dir):
                images.append((os.path.join(class_dir, img_name), self.class_to_idx[cls]))
        return images

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path, label = self.images[idx]
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label


In [None]:
# Test-time augmentation (center + corners with horizontal flips)
def tta_augment(image):
    crop_size = 224
    crops = []
    # Center crop
    crops.append(transforms.functional.center_crop(image, crop_size))
    # Corner crops
    crops.append(transforms.functional.crop(image, 0, 0, crop_size, crop_size))  # top-left
    crops.append(transforms.functional.crop(image, 0, image.size[1] - crop_size, crop_size, crop_size))  # top-right
    crops.append(transforms.functional.crop(image, image.size[0] - crop_size, 0, crop_size, crop_size))  # bottom-left
    crops.append(transforms.functional.crop(image, image.size[0] - crop_size, image.size[1] - crop_size, crop_size, crop_size))  # bottom-right
    # Horizontal flips
    flips = [transforms.functional.hflip(crop) for crop in crops]
    crops.extend(flips)
    return crops

# Feature extraction using pre-trained CNN
def extract_features(model, dataloader, device):
    model.eval()
    features = []
    labels = []
    for inputs, targets in dataloader:
        inputs = inputs.to(device)
        with torch.no_grad():
            # Extract 4096-dimensional features from FC layer
            fc_features = model(inputs)
            # Apply ReLU activation
            fc_features = F.relu(fc_features)
            features.append(fc_features.cpu().numpy())
            labels.append(targets.cpu().numpy())
    return np.vstack(features), np.hstack(labels)


In [None]:
# Data augmentation and normalization
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}



In [None]:
# Load datasets
data_dir = '/content/drive/MyDrive/ASAL NWPU DATA/NWPU-RESISC45_split_val_only'
image_datasets = {x: ImageFolderDataset(os.path.join(data_dir, x), data_transforms[x])
                  for x in ['train', 'val']}
dataloaders = {x: DataLoader(image_datasets[x], batch_size=32, shuffle=True, num_workers=4)
               for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")




In [None]:
# Load pre-trained CNN and remove the last FC layers
def create_feature_extractor():
    model = models.vgg16(pretrained=True)
    model.classifier = nn.Sequential(*list(model.classifier.children())[:-1])  # Remove the last FC layer
    model = model.to(device)
    return model

# Train a Linear SVM with extracted features
def train_svm(features, labels):
    scaler = StandardScaler()
    features = scaler.fit_transform(features)
    svm = SVC(kernel='linear')
    svm.fit(features, labels)
    return svm, scaler

# Testing and evaluation with SVM
def test_svm(svm, scaler, features, labels):
    features = scaler.transform(features)
    predictions = svm.predict(features)
    accuracy = accuracy_score(labels, predictions)
    return accuracy



use gan

In [None]:
    num_classes = len(class_names)

    # Step 1: Create feature extractor model
    model = create_feature_extractor()

    # Step 2: Extract features for training and validation sets
    train_features, train_labels = extract_features(model, dataloaders['train'], device)
    val_features, val_labels = extract_features(model, dataloaders['val'], device)

    # Step 3: Train a Linear SVM
    svm, scaler = train_svm(train_features, train_labels)

    # Step 4: Evaluate on validation set
    val_accuracy = test_svm(svm, scaler, val_features, val_labels)
    print(f'Validation Accuracy: {val_accuracy:.4f}')

Validation Accuracy: 0.8201


##  Improved : Using a deep learning pretrained model as our classifier in addition to sub image augmentation

**Differences from the last approach:**


*Current Approach:* Fine-tunes the entire pre-trained model with basic augmentations.


*Previous Approach:* Used the pre-trained model as a feature extractor and trained a separate SVM classifier on the extracted features.

In [None]:
import torch
import torch.nn as nn
from torchvision import models

# Define a function to load a pre-trained model and modify it for fine-tuning
def create_model(model_name, num_classes):
    if model_name == 'resnet':
        model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)  # Load ResNet50 with pre-trained weights
        # Modify the final fully connected layer (fc) to match the number of classes
        in_features = model.fc.in_features
        model.fc = nn.Linear(in_features, num_classes)

    elif model_name == 'vgg':
        model = models.vgg16(weights=models.VGG16_Weights.DEFAULT)  # Load VGG16 with pre-trained weights
        # Modify the classifier for VGG
        in_features = model.classifier[6].in_features
        model.classifier[6] = nn.Linear(in_features, num_classes)

    elif model_name == 'densenet':
        model = models.densenet121(weights=models.DenseNet121_Weights.DEFAULT)  # Load DenseNet121 with pre-trained weights
        # Modify the classifier for DenseNet
        in_features = model.classifier.in_features
        model.classifier = nn.Linear(in_features, num_classes)

    else:
        raise ValueError("Model name should be 'resnet', 'vgg', or 'densenet'")

    return model

# Function to load the model and perform training/validation
def main(model_name, num_classes, train_dataset, val_dataset):
    # Load the model
    model = create_model(model_name, num_classes)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # Define loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

    # DataLoaders
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False)

    # Training and Validation loop
    num_epochs = 10
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        # Training loop
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_accuracy = 100 * correct / total
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Train Accuracy: {train_accuracy:.2f}%")

        # Validation loop
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        val_accuracy = 100 * correct / total
        print(f"Validation Accuracy after Epoch {epoch+1}: {val_accuracy:.2f}%")

# Example usage:
train_dir = '/content/drive/MyDrive/ASAL NWPU DATA/NWPU-RESISC45_split_val_only/train'
val_dir = '/content/drive/MyDrive/ASAL NWPU DATA/NWPU-RESISC45_split_val_only/val'

# Define transformations and load datasets
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_dataset = datasets.ImageFolder(train_dir, transform=transform)
val_dataset = datasets.ImageFolder(val_dir, transform=transform)

# Call main function with 'resnet'
main('resnet', num_classes=45, train_dataset=train_dataset, val_dataset=val_dataset)


Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /root/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 145MB/s]


Epoch [1/10], Loss: 0.7715, Train Accuracy: 80.57%
Validation Accuracy after Epoch 1: 93.12%
Epoch [2/10], Loss: 0.1769, Train Accuracy: 94.34%
Validation Accuracy after Epoch 2: 94.31%
Epoch [3/10], Loss: 0.0871, Train Accuracy: 97.38%
Validation Accuracy after Epoch 3: 94.98%
Epoch [4/10], Loss: 0.0606, Train Accuracy: 98.16%
Validation Accuracy after Epoch 4: 94.35%
Epoch [5/10], Loss: 0.0539, Train Accuracy: 98.34%
Validation Accuracy after Epoch 5: 94.24%
Epoch [6/10], Loss: 0.0430, Train Accuracy: 98.67%
Validation Accuracy after Epoch 6: 93.69%
Epoch [7/10], Loss: 0.0366, Train Accuracy: 98.91%
Validation Accuracy after Epoch 7: 94.54%
Epoch [8/10], Loss: 0.0372, Train Accuracy: 98.82%
Validation Accuracy after Epoch 8: 94.84%
Epoch [9/10], Loss: 0.0307, Train Accuracy: 99.10%
Validation Accuracy after Epoch 9: 94.92%
Epoch [10/10], Loss: 0.0298, Train Accuracy: 99.09%
Validation Accuracy after Epoch 10: 94.60%


**Differences from the last approach:**


*Current Approach:* Fine-tunes the entire pre-trained model with 10 sub-image complex augmentations.


*Previous Approach:* Fine-tunes the entire pre-trained model with basic augmentations.

In [3]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

In [None]:
torch.cuda.empty_cache()


In [6]:
# Call main function with 'resnet'
trained_model, best_model_path = main('vgg', num_classes=45, train_dataset=train_dataset, val_dataset=val_dataset)


Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /root/.cache/torch/hub/checkpoints/vgg16-397923af.pth
100%|██████████| 528M/528M [00:04<00:00, 123MB/s]


KeyboardInterrupt: 

##TTA augmentation approach :

In [None]:
import torch
import torch.nn as nn
from torchvision import models, transforms, datasets
import matplotlib.pyplot as plt

# Define the Test-Time Augmentation function
def tta_augment(image):
    crop_size = 224
    crops = []
    # Center crop
    crops.append(transforms.functional.center_crop(image, crop_size))
    # Corner crops
    crops.append(transforms.functional.crop(image, 0, 0, crop_size, crop_size))  # top-left
    crops.append(transforms.functional.crop(image, 0, image.size[1] - crop_size, crop_size, crop_size))  # top-right
    crops.append(transforms.functional.crop(image, image.size[0] - crop_size, 0, crop_size, crop_size))  # bottom-left
    crops.append(transforms.functional.crop(image, image.size[0] - crop_size, image.size[1] - crop_size, crop_size, crop_size))  # bottom-right
    # Horizontal flips
    flips = [transforms.functional.hflip(crop) for crop in crops]
    crops.extend(flips)
    return crops


# Function to save the model with the best validation accuracy
def save_best_model(model, path):
    torch.save(model.state_dict(), path)
    print(f"Saved Best Model to {path}")

# Function to plot training and validation metrics
def plot_metrics(train_losses, val_accuracies):
    plt.figure(figsize=(12, 5))

    # Plot training loss
    plt.subplot(1, 2, 1)
    plt.plot(range(1, len(train_losses) + 1), train_losses, label='Train Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss')
    plt.legend()

    # Plot validation accuracy
    plt.subplot(1, 2, 2)
    plt.plot(range(1, len(val_accuracies) + 1), val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.title('Validation Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.show()

# Function to apply TTA during validation
def apply_tta(model, loader, device):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            preds = torch.zeros(len(labels), num_classes).to(device)
            crops = [tta_augment(transform(img)) for img in inputs]

            for crop_set in crops:
                crop_tensor = torch.stack([transforms.ToTensor()(crop) for crop in crop_set])
                crop_tensor = crop_tensor.to(device)
                outputs = model(crop_tensor)
                preds += torch.mean(outputs, dim=0)

            preds /= len(crops)
            _, predicted = torch.max(preds, 1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    return all_preds, all_labels

# Function to load the model and perform training/validation
def main(model_name, num_classes, train_dataset, val_dataset):
    # Load the model
    model = create_model(model_name, num_classes)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # Define loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

    # DataLoaders
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False)

    # Initialize metrics
    train_losses = []
    val_accuracies = []
    best_val_accuracy = 0.0
    best_model_path = 'best_model.pth'

    num_epochs = 10
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        # Training loop
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_accuracy = 100 * correct / total
        train_losses.append(running_loss / len(train_loader))
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {train_losses[-1]:.4f}, Train Accuracy: {train_accuracy:.2f}%")

        # Validation loop with TTA
        val_preds, val_labels = apply_tta(model, val_loader, device)
        val_accuracy = 100 * sum([p == l for p, l in zip(val_preds, val_labels)]) / len(val_labels)
        val_accuracies.append(val_accuracy)
        print(f"Validation Accuracy after Epoch {epoch+1}: {val_accuracy:.2f}%")

        # Save the model with the best validation accuracy
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            save_best_model(model, best_model_path)

    plot_metrics(train_losses, val_accuracies)
    return model, best_model_path

# Example usage
train_dir = '/content/drive/MyDrive/NWPU-RESISC45_split_val_only/train'
val_dir = '/content/drive/MyDrive/NWPU-RESISC45_split_val_only/val'

# Define transformations and load datasets
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_dataset = datasets.ImageFolder(train_dir, transform=transform)
val_dataset = datasets.ImageFolder(val_dir, transform=transform)

# Call main function with 'densenet'
trained_model, best_model_path = main('densenet', num_classes=45, train_dataset=train_dataset, val_dataset=val_dataset)
