#**Name: Sameera Madushanka Gamage**
#**"Student Number: 2207590"**
#**Model 4**

This notebook was prepared in the following way;

1.   miniImageNet dataset is downloaded and used for pretraining using "resnet50"
2.   final layer was edited according to the number of classes
3. then the model saved and again loaded for fine tuning using EuroSAT dataset

4. then the model final layer was edited again according to the number of classes in EuroSAT dataset

5. Randomly selected 100 images representing 5 random classes in the dataset

6. then those 100 images were again divided into train_dataset and test_dataset as 25 images for training and 75 for testing (each dataset has 5 classes)

7. then using multiple episodes, the EuroSAT dataset's selected 5 classes was trained for fine tuning.

8. at each episode the average training andn testing accuracies were accumilated and plotted against the episode number.

9. then from among those episodes, the best performing model was automatically saved.

10. that best performing model was then loaded and can be used for testing. Testing accuracy of that model has been recorded.

11. the model's intention is to get the averge accuracy of all the episodes


#Cleaning Data Folder

In [None]:
!rm -rf *

#Importing Libraries

In [None]:
import zipfile
import argparse
import os
import random
!pip install gdown
import gdown
import tarfile

import torch
import torch.nn as nn

import torchvision
from torchvision import datasets
from torchvision import datasets, transforms
import torchvision.utils as vutils
from torch.optim.lr_scheduler import StepLR
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import time

#Checking the device and selecting

In [None]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

print('Device:', device)

#Downloading miniImageNet Dataset

In [None]:
!pip install gdown

link1 = 'https://drive.google.com/uc?id=107FTosYIeBn5QbynR46YG91nHcJ70whs'

if not os.path.exists('./data_project/miniImageNet'):
    print('Downloading dataset')
    gdown.download(link1, output='./train.tar')

    # Extract the tar file
    with tarfile.open('./train.tar', 'r') as tar:
        tar.extractall('./data_project/miniImageNet')

miniImageNet_path='./data_project/miniImageNet'


#Displaying some sample files

In [None]:
# Display some sample images from the training dataset
transform_common = transforms.Compose([
        transforms.Resize((84, 84)),
        transforms.ToTensor(),
    ])
miniimagenet_dataset_show = datasets.ImageFolder(root=os.path.join(miniImageNet_path, 'train'), transform=transform_common)
sample_loader = torch.utils.data.DataLoader(miniimagenet_dataset_show, batch_size=4, shuffle=True)
sample_batch, sample_labels = next(iter(sample_loader))

# Display sample images
plt.figure(figsize=(8, 8))
plt.axis("off")
plt.title("Sample Images")
plt.imshow(np.transpose(vutils.make_grid(sample_batch, padding=2, normalize=True).cpu(), (1, 2, 0)))
plt.show()

#Loading data, Transform defining, Data Splitting, DataLoader definining and Activity showcasing

In [None]:
# Load miniImageNet dataset

# Training transformations with data augmentation
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(size=216, scale=(0.7, 1.0)),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    transforms.RandomRotation(degrees=15),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

#downloadingn different instances of data for different transforms
miniimagenet_dataset_training = datasets.ImageFolder(root=os.path.join(miniImageNet_path, 'train'), transform=train_transform)
miniimagenet_dataset_val = datasets.ImageFolder(root=os.path.join(miniImageNet_path, 'train'), transform=val_transform)
miniimagenet_dataset_testing = datasets.ImageFolder(root=os.path.join(miniImageNet_path, 'train'), transform=test_transform)

# Split the miniImageNet dataset into train and validation sets
train_size = int(0.6 * len(miniimagenet_dataset_training))
val_size = int(0.2 * len(miniimagenet_dataset_training))
test_size = len(miniimagenet_dataset_training) - train_size - val_size

#for transform of train
train_dataset, _, _ = torch.utils.data.random_split(miniimagenet_dataset_training,
                                                                         [train_size, val_size, test_size])
#for transform of validation
_, val_dataset, _ = torch.utils.data.random_split(miniimagenet_dataset_val,
                                                                         [train_size, val_size, test_size])
#for transform of testing
_, _, test_dataset = torch.utils.data.random_split(miniimagenet_dataset_testing,
                                                                         [train_size, val_size, test_size])

# Apply the transforms to the datasets
train_dataset.dataset.transform=train_transform
val_dataset.dataset.transform=val_transform
test_dataset.dataset.transform=test_transform

# Create data loaders for miniImageNet
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=128, shuffle=False)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=128, shuffle=False)

#Size Data
num_classes_miniimagenet = len(miniimagenet_dataset_training.classes)
print("Number of Classes: ",num_classes_miniimagenet)
print("Train Size: ", train_size)
print("Val Size: ", val_size)
print("Test Size: ", test_size)

print("Train Dataset Transform: ", train_dataset.dataset.transform)
print("Validation Dataset Transform: ", val_dataset.dataset.transform)
print("Test Dataset Transform: ", test_dataset.dataset.transform)

class_indices = miniimagenet_dataset_training.class_to_idx
print(class_indices)

#Checking the data type of the images after transforming

In [None]:
# Function to check the data type of images in a batch
def check_image_data_type(loader):
    data_iter = iter(loader)
    images, labels = next(data_iter)

    # Check the data type of the images
    data_type = type(images)

    return data_type

# Check the data type for train_loader
train_data_type = check_image_data_type(train_loader)
print("Train Loader Image Data Type:", train_data_type)

# Check the data type for val_loader
val_data_type = check_image_data_type(val_loader)
print("Validation Loader Image Data Type:", val_data_type)

# Check the data type for test_loader
test_data_type = check_image_data_type(test_loader)
print("Test Loader Image Data Type:", test_data_type)


#Displaying images after applying transfoms

In [None]:
# display images in a grid with normalization
def show_images(images, title):
    num_images = len(images)
    fig, axes = plt.subplots(1, num_images, figsize=(num_images * 3, 3))
    fig.suptitle(title, fontsize=16)

    for i in range(num_images):
        ax = axes[i] if num_images > 1 else axes

        # Normalize the image tensor to [0, 1] range
        img = np.transpose(images[i].numpy(), (1, 2, 0))
        img = (img - img.min()) / (img.max() - img.min())

        ax.imshow(img)
        ax.axis('off')
        ax.set_title(f"Sample {i + 1}")

    plt.show()

# get a sample from the data loader
def get_sample(loader):
    data_iter = iter(loader)
    images, labels = next(data_iter)
    return images, labels

# Get a sample from each dataset
train_images, _ = get_sample(train_loader)
val_images, _ = get_sample(val_loader)
test_images, _ = get_sample(test_loader)

# Show the original and transformed images for each dataset
show_images([train_images[0], train_images[1]], "Training Dataset")
show_images([val_images[0], val_images[1]], "Validation Dataset")
show_images([test_images[0], test_images[1]], "Test Dataset")


#Downloading the model resnet50 and its weights

In [None]:
!pip install torch==2.1.0 torchvision==0.16.0 --index-url https://download.pytorch.org/whl/cu118
# selecting deep learning architecture
from torchvision.models import resnet50

# Load pre-trained ResNet50 model
weights="IMAGENET1K_V2"
pretrained_model = resnet50(weights=weights)
pretrained_model = pretrained_model.to(device)

#print(pretrained_model)

#Checking the model summary




In [None]:
!pip install torchsummary
from torchsummary import summary

#summary(pretrained_model, (3, 224, 224))

#Modifying final layer to align with the dataset classes

In [None]:
# unfreeze all layers
for param in pretrained_model.parameters():
    param.requires_grad = True
    param.data = param.data.to(device)

# Modify the final classification layer
num_classes_miniimagenet = len(miniimagenet_dataset_training.classes)
dropout = 0.6
pretrained_model.fc = nn.Sequential(
    nn.Dropout(p=dropout),
    nn.Linear(pretrained_model.fc.in_features, num_classes_miniimagenet)
)

pretrained_model = pretrained_model.to(device)

#pretrained_model

#Training Function

In [None]:
#Training
class Trainer:
    def __init__(self, pretrained_model, train_loader, val_loader, optimizer, criterion, scheduler, num_epochs):
      self.pretrained_model = pretrained_model
      self.train_loader = train_loader
      self.val_loader = val_loader
      self.optimizer = optimizer
      self.criterion = criterion
      self.scheduler = scheduler
      self.num_epochs = num_epochs
    def training(self):
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(device)
        self.pretrained_model = self.pretrained_model.to(device)

        history = {'train_loss': [], 'valid_loss': [], 'train_acc': [], 'valid_acc': []}
        best_valid_loss = float('inf')
        early_stopping_counter = 0

        for epoch in range(self.num_epochs):
            self.pretrained_model.train()

            total_loss = 0.0
            true_labels_train = []
            predicted_labels_train = []

            # Loss and Accuracy within the epoch
            train_loss = 0.0
            train_acc = 0.0

            valid_loss = 0.0
            valid_acc = 0.0

            for batch_idx, (inputs, labels) in enumerate(self.train_loader):
                inputs, labels = inputs.to(device), labels.to(device)
                self.optimizer.zero_grad()
                outputs = self.pretrained_model(inputs)
                loss = self.criterion(outputs, labels)

                loss.backward()
                self.optimizer.step()

                total_loss += loss.item()

                # Compute the accuracy
                predictions = torch.max(outputs.data, 1)[1]
                correct_counts = predictions.eq(labels.data.view_as(predictions))
                acc = torch.mean(correct_counts.type(torch.FloatTensor))

                train_loss += loss.item() * inputs.size(0)
                train_acc += acc.item() * inputs.size(0)

                true_labels_train.extend(labels.cpu().numpy())
                predicted_labels_train.extend(predictions.cpu().numpy())

            average_loss = total_loss / len(self.train_loader)
            train_accuracy = accuracy_score(true_labels_train, predicted_labels_train)
            history['train_loss'].append(average_loss)
            history['train_acc'].append(train_accuracy)

            # Validation
            true_labels_val = []
            predicted_labels_val = []

            self.pretrained_model.eval()

            with torch.no_grad():
                for inputs, labels in self.val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = self.pretrained_model(inputs)

                    loss = self.criterion(outputs, labels)
                    valid_loss += loss.item() * inputs.size(0)

                    predictions = torch.max(outputs.data, 1)[1]
                    correct_counts = predictions.eq(labels.data.view_as(predictions))
                    acc = torch.mean(correct_counts.type(torch.FloatTensor))
                    valid_acc += acc.item() * inputs.size(0)

                    true_labels_val.extend(labels.cpu().numpy())
                    predicted_labels_val.extend(predictions.cpu().numpy())

                # Map class indices to class names
                class_idx_to_name = {idx: class_name for class_name, idx in class_indices.items()}
                true_labels_val = [class_idx_to_name[idx] for idx in true_labels_val]
                predicted_labels_val = [class_idx_to_name[idx] for idx in predicted_labels_val]

                # Calculate the average validation loss and validation accuracy
                avg_valid_loss = valid_loss / len(self.val_loader.dataset)
                avg_valid_acc = accuracy_score(true_labels_val, predicted_labels_val)

                history['valid_loss'].append(avg_valid_loss)
                history['valid_acc'].append(avg_valid_acc)

            print(f"Epoch {epoch + 1}/{self.num_epochs}, Training Loss: {average_loss:.4f}, Training Accuracy: {train_accuracy:.4f}, Validation Loss: {avg_valid_loss:.4f}, Validation Accuracy: {avg_valid_acc:.4f}")

            # Update the learning rate after each epoch
            scheduler.step()
        return history

#Optimizer

In [None]:
#Optimizer
optimizer = torch.optim.Adam(pretrained_model.parameters(), lr=0.00001, weight_decay=0.001)
print(optimizer)

#Training Process

In [None]:
#Training process
scheduler = StepLR(optimizer, step_size=10, gamma=0.5)
criterion = nn.CrossEntropyLoss()
trainer = Trainer(pretrained_model, train_loader, val_loader, optimizer, criterion, scheduler, num_epochs=15)
history = trainer.training()


# Plotting loss and accuracy variations

In [None]:
# Plotting loss and accuracy variations
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history['train_loss'], label='Training Loss')
plt.plot(history['valid_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history['train_acc'], label='Training Accuracy')
plt.plot(history['valid_acc'], label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

#Testing

In [None]:
#testing
def testSetAccuracy(model, test_loader, criterion):

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    test_acc = 0.0
    test_loss = 0.0
    true_labels_test = []
    predicted_labels_test = []

    # Store test accuracy history
    test_acc_history = []

    # Validation - No gradient tracking needed
    with torch.no_grad():

        # Set to evaluation mode
        model.eval()

        # Test loop
        for j, (inputs, labels) in enumerate(test_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Forward pass - compute outputs on input data
            outputs = model(inputs)

            # Compute loss
            loss = criterion(outputs, labels)

            # Accumulate the loss
            test_loss += loss.item() * inputs.size(0)

            #accuracy calculation
            true_labels_test.extend(labels.cpu().numpy())
            predicted_labels_test.extend(torch.argmax(outputs, 1).cpu().numpy())

            # Calculate accuracy
            correct_counts = torch.sum(torch.argmax(outputs, 1) == labels)
            acc = correct_counts.item() / labels.size(0)

            # collect accuracy
            test_acc += acc * inputs.size(0)

            # collect test accuracy at each batch
            test_acc_history.append(acc)

            print("Test Batch number: {:03d}, Test: Loss: {:.4f}, Accuracy: {:.4f}".format(j, loss.item(), acc))

    # Calculate average test loss and test accuracy
    avg_test_loss = test_loss / len(test_loader.dataset)
    avg_test_acc = test_acc / len(test_loader.dataset)

    # Calculate accuracy using scikit-learn
    test_accuracy = accuracy_score(true_labels_test, predicted_labels_test)

    print("Test Accuracy (from sklearn): {:.4f}".format(test_accuracy))
    print("Average Test Loss: {:.4f}, Average Test Accuracy: {:.4f}".format(avg_test_loss, avg_test_acc))

    # Return test accuracy history
    return avg_test_loss, avg_test_acc, test_accuracy, test_acc_history


#Testing Process

In [None]:
# Testing
avg_test_loss, avg_test_acc, test_accuracy, test_acc_history = testSetAccuracy(pretrained_model, test_loader, criterion)
print("avg_test_loss: ", avg_test_loss)
print("avg_test_Accuracy: ", avg_test_acc)
print("test_Accuracy: ", test_accuracy)

# test accuracy variation

In [None]:
# Plot test accuracy variation
plt.figure(figsize=(10, 6))
plt.plot(test_acc_history, label='Test Accuracy')
plt.xlabel('Batch Number')
plt.ylabel('Accuracy')
plt.title('Test Accuracy Variation Over Batches')
plt.legend()
plt.grid(True)
plt.show()

#Saving the pretrained Model

In [None]:
# Save the pretrained model
torch.save(pretrained_model, 'pretrained_model.pth')

#Download EuroSAT data

In [None]:
link2 = 'https://zenodo.org/records/7711810/files/EuroSAT_RGB.zip?download=1'
if not os.path.exists('./data_project/EuroSAT_RGB'):
    print('Downloading dataset')
    gdown.download(link2, output='./EuroSAT_RGB.zip')

    # Unzip
    with zipfile.ZipFile('./EuroSAT_RGB.zip', 'r') as zip_ref:
        zip_ref.extractall('./data_project/EuroSAT_RGB')

EuroSAT_RGB_path='./data_project/EuroSAT_RGB'

#Displaying sample images

In [None]:
# Display some sample images from the training dataset
transform_common = transforms.Compose([
        transforms.Resize((84, 84)),
        transforms.ToTensor(),
    ])
eurosat_dataset_show = datasets.ImageFolder(root=os.path.join(EuroSAT_RGB_path, 'EuroSAT_RGB'), transform=transform_common)
sample_loader = torch.utils.data.DataLoader(eurosat_dataset_show, batch_size=4, shuffle=True)
sample_batch, sample_labels = next(iter(sample_loader))

# Display sample images
plt.figure(figsize=(8, 8))
plt.axis("off")
plt.title("Sample Images")
plt.imshow(np.transpose(vutils.make_grid(sample_batch, padding=2, normalize=True).cpu(), (1, 2, 0)))
plt.show()

#Select 100 images and 5 different classes randomly

In [None]:
#Select 100 images and 5 different classes randomly

import random
from shutil import copyfile

# Path to the root directory containing the 10 folders
root_path = "./data_project/EuroSAT_RGB/EuroSAT_RGB"

# Output path for the new dataset
output_path = "./data_project/EuroSAT_selected_images"
if output_path == './data_project/EuroSAT_selected_images':
  !rm -rf EuroSAT_selected_images

# Get a list of all folders in the root directory
all_folders = os.listdir(root_path)

# Randomly select 5 folders
selected_folders = random.sample(all_folders, 5)

# Create the output directory if it doesn't exist
os.makedirs(output_path, exist_ok=True)

# Iterate through selected folders
for folder in selected_folders:
  folder_path = os.path.join(root_path, folder)

  # Get a list of all files in the folder
  all_files = os.listdir(folder_path)

  # Randomly select 20 images
  selected_images = random.sample(all_files, 20)

  # Create a subdirectory in the output path for the current folder
  output_folder_path = os.path.join(output_path, folder)
  os.makedirs(output_folder_path, exist_ok=True)

  # Copy selected images to the output folder
  for image in selected_images:
      src_path = os.path.join(folder_path, image)
      dest_path = os.path.join(output_folder_path, image)
      copyfile(src_path, dest_path)

print("Dataset creation complete.")


#separate image folders into train and test

In [None]:
#separate image folders into train val test

import random
import os
from shutil import copyfile
def train_test_eurosat():
# Path to the root directory containing the folders
  root_path = "./data_project/EuroSAT_selected_images"

  # Output path for the new dataset
  output_path = "./data_project/EuroSAT_tuning"
  if output_path == "./data_project/EuroSAT_tuning":
      !rm -rf EuroSAT_selected_images

  # Get a list of all folders in the root directory
  all_folders = os.listdir(root_path)

  # Create the output directory if it doesn't exist
  os.makedirs(output_path, exist_ok=True)

  # Create subdirectories for train, val, and test datasets
  train_path = os.path.join(output_path, "train_dataset")
  test_path = os.path.join(output_path, "test_dataset")
  os.makedirs(train_path, exist_ok=True)
  os.makedirs(test_path, exist_ok=True)

  # Iterate through selected folders
  for folder in all_folders:
    folder_path = os.path.join(root_path, folder)
    # Create subdirectories for each class within train and test datasets
    train_class_path = os.path.join(train_path, folder)
    test_class_path = os.path.join(test_path, folder)

    os.makedirs(train_class_path, exist_ok=True)
    os.makedirs(test_class_path, exist_ok=True)

    # Get a list of all files in the folder
    all_files = os.listdir(folder_path)

    # Randomly select 5 images for training
    train_images = random.sample(all_files, 5)
    for img in train_images:
        src_path = os.path.join(folder_path, img)
        dest_path = os.path.join(train_class_path, img)
        copyfile(src_path, dest_path)

    # Put remaining images into the test dataset
    test_images = set(all_files) - set(train_images)
    for img in test_images:
        src_path = os.path.join(folder_path, img)
        dest_path = os.path.join(test_class_path, img)
        copyfile(src_path, dest_path)

  print("Dataset creation complete.")

#using
train_test_eurosat()

#Creating transforms and data loaders

In [None]:
def load_eurosat_dataset(data_path='./data_project/EuroSAT_tuning/', batch_size=25):
    # Transforms
    image_transforms = {
        'train': transforms.Compose([
            transforms.RandomResizedCrop(size=216, scale=(0.7, 1.0)),
            transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
            transforms.RandomRotation(degrees=15),
            transforms.RandomHorizontalFlip(),
            transforms.RandomVerticalFlip(),
            transforms.RandomGrayscale(p=0.1),
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ]),
        'test': transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    }

    # Set train and test directory paths
    train_dataset_path = os.path.join(data_path, 'train_dataset')
    test_dataset_path = os.path.join(data_path, 'test_dataset')

    # Number of classes
    num_classes_eurosat = len(os.listdir(train_dataset_path))

    # Load Data from folders
    data = {
        'train': datasets.ImageFolder(root=train_dataset_path, transform=image_transforms['train']),
        'test': datasets.ImageFolder(root=test_dataset_path, transform=image_transforms['test'])
    }

    # mapping of the indices to the class names
    idx_to_class = {v: k for k, v in data['train'].class_to_idx.items()}

    # Size of Data
    train_data_size = len(data['train'])
    test_data_size = len(data['test'])

    # Create iterators for the Data loaded using DataLoader module
    train_loader = torch.utils.data.DataLoader(data['train'], batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(data['test'], batch_size=batch_size, shuffle=False)
    print("Train Data Size:", train_data_size)
    print("Test Data Size:", test_data_size)

    return train_loader, test_loader, idx_to_class, num_classes_eurosat

# Running
train_loader, test_loader, idx_to_class, num_classes = load_eurosat_dataset()
class_indices = idx_to_class
print("Class Indices:", idx_to_class)
print("Number of Classes:", num_classes)


# display images with class names

In [None]:
# Function to display images with class names
def show_images(images, labels, class_names, title):
    plt.figure(figsize=(10, 3))
    plt.suptitle(title, y=1.02, fontsize=16)

    for i in range(min(5, len(images))):
        plt.subplot(1, 5, i + 1)
        plt.imshow(np.transpose(images[i].numpy(), (1, 2, 0)))
        plt.title(class_names[labels[i].item()])
        plt.axis('off')

    plt.show()

# get sample images with class labels from a DataLoader
def get_sample(loader, class_names):
    data_iter = iter(loader)
    images, labels = next(data_iter)
    return images, labels, class_names

# Get a sample from each dataset
class_names_euroSAT = idx_to_class
train_images, train_labels, _ = get_sample(train_loader, class_names_euroSAT)
test_images, test_labels, _ = get_sample(test_loader, class_names_euroSAT)

Train = 'Train'
Test = 'Test'
# Show sample images with class names
def show_sample_images(title, images, labels):
  plt.figure(figsize=(10, 3))
  plt.suptitle(title + "Dataset Samples", y=1.02, fontsize=16)
  for i in range(min(5, len(images))):
        plt.subplot(1, 5, i + 1)
        image_np = np.transpose(images[i].numpy(), (1, 2, 0))
        image_np = np.clip(image_np, 0, 1)  # Clip pixel values to the valid range
        plt.imshow(image_np)
        plt.title(class_names_euroSAT[labels[i].item()])
        plt.axis('off')
  plt.show()

show_sample_images(Train, train_images, train_labels)
show_sample_images(Test, test_images, test_labels)


#Checking Data type

In [None]:
# check the data type of images in a batch
def check_image_data_type(loader):
    data_iter = iter(loader)
    images, labels = next(data_iter)

    # Check the data type of the images
    data_type = type(images)

    return data_type

# Check the data type for train_loader
train_data_type = check_image_data_type(train_loader)
print("Train Loader Image Data Type:", train_data_type)

# Check the data type for test_loader
test_data_type = check_image_data_type(test_loader)
print("Test Loader Image Data Type:", test_data_type)


#Loading the pretrained model

In [None]:
#Loading the model

modified_trained_model = torch.load('pretrained_model.pth')
modified_trained_model = modified_trained_model.to(device)

# Setting to evaluation mode
modified_trained_model.eval()

#Adjusting final layer to match class size

In [None]:
# Freeze all layers
for param in modified_trained_model.parameters():
    param.requires_grad = False
    param.data = param.data.to(device)

# Modify the final classification layer
num_classes_eurosat = len(class_indices)
dropout = 0.7

# Get the number of input features from the last layer in the Sequential module
in_features = modified_trained_model.fc[-1].in_features

# Replace the final fully connected layer with a new layer including dropout
modified_trained_model.fc = nn.Sequential(
    nn.ReLU(),
    nn.Dropout(p=dropout),
    nn.Linear(in_features, num_classes_eurosat)
)

# Unfreeze the final fully connected layer
modified_trained_model.fc.requires_grad = True

# Move the model to the device
modified_trained_model = modified_trained_model.to(device)

#Training

In [None]:
#Training
class Trainer:
    def __init__(self, model, train_loader, optimizer, criterion, scheduler, num_epochs):
      self.model = model
      self.train_loader = train_loader
      self.optimizer = optimizer
      self.criterion = criterion
      self.scheduler = scheduler
      self.num_epochs = num_epochs

    def training(self):
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(device)
        self.model = self.model.to(device)

        history = {'train_loss': [], 'train_acc': []}

        for epoch in range(self.num_epochs):
            self.model.train()

            total_loss = 0.0
            true_labels_train = []
            predicted_labels_train = []

            train_loss = 0.0
            train_acc = 0.0

            for batch_idx, (inputs, labels) in enumerate(self.train_loader):
                inputs, labels = inputs.to(device), labels.to(device)
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)

                loss.backward()
                self.optimizer.step()

                total_loss += loss.item()

                # Compute the accuracy
                predictions = torch.max(outputs.data, 1)[1]
                correct_counts = predictions.eq(labels.data.view_as(predictions))
                acc = torch.mean(correct_counts.type(torch.FloatTensor))

                train_loss += loss.item() * inputs.size(0)
                train_acc += acc.item() * inputs.size(0)

                true_labels_train.extend(labels.cpu().numpy())
                predicted_labels_train.extend(predictions.cpu().numpy())

            average_loss = total_loss / len(self.train_loader)
            train_accuracy = accuracy_score(true_labels_train, predicted_labels_train)
            history['train_loss'].append(average_loss)
            history['train_acc'].append(train_accuracy)

            print(f"Epoch {epoch + 1}/{self.num_epochs}, Training Loss: {average_loss:.4f}, Training Accuracy: {train_accuracy:.4f}")
            # Update learning rate after each epoch
            scheduler.step()

        return history

In [None]:
#testing
def testSetAccuracy(model, test_loader, criterion):

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    test_acc = 0.0
    test_loss = 0.0
    true_labels_test = []
    predicted_labels_test = []

    # Store test accuracy history
    test_acc_history = []

    # Validation - No gradient tracking needed
    with torch.no_grad():

        # Set to evaluation mode
        model.eval()

        # Test loop
        for j, (inputs, labels) in enumerate(test_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Forward pass - compute outputs on input data using the model
            outputs = model(inputs)

            # Compute loss
            loss = criterion(outputs, labels)

            # Accumulate the loss
            test_loss += loss.item() * inputs.size(0)

            # Convert predictions to numpy arrays for accuracy calculation
            true_labels_test.extend(labels.cpu().numpy())
            predicted_labels_test.extend(torch.argmax(outputs, 1).cpu().numpy())

            # Calculate accuracy
            correct_counts = torch.sum(torch.argmax(outputs, 1) == labels)
            acc = correct_counts.item() / labels.size(0)

            # Accumulate accuracy
            test_acc += acc * inputs.size(0)

            # collect test accuracy at each batch
            test_acc_history.append(acc)

            print("Test Batch number: {:03d}, Test: Loss: {:.4f}, Accuracy: {:.4f}".format(j, loss.item(), acc))

    # Calculate average test loss and test accuracy
    avg_test_loss = test_loss / len(test_loader.dataset)
    avg_test_acc = test_acc / len(test_loader.dataset)

    # Calculate accuracy using scikit-learn
    test_accuracy = accuracy_score(true_labels_test, predicted_labels_test)

    print("Test Accuracy (from sklearn): {:.4f}".format(test_accuracy))
    print("Average Test Loss: {:.4f}, Average Test Accuracy: {:.4f}".format(avg_test_loss, avg_test_acc))

    # Return test accuracy history
    return avg_test_loss, avg_test_acc, test_accuracy, test_acc_history

#Episode training and testing - fine tuning

In [None]:
#Episode training and testing
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Number of epochs
num_epochs = 30

# Number of episodes
num_episodes = 7

# Learning rate
lr = 0.001

# Create an instance of the Trainer
optimizer = torch.optim.Adam(modified_trained_model.parameters(), lr=lr, weight_decay=0.0001)
criterion = nn.CrossEntropyLoss()
scheduler = StepLR(optimizer, step_size=10, gamma=0.5)

best_model = None
best_accuracy = 0.0
episode_history = {'average_train_loss': [], 'average_train_acc': [], 'average_test_loss': [], 'average_test_acc': []}

# Run training for multiple episodes
for episode in range(num_episodes):
    print(f"\nEpisode {episode + 1}/{num_episodes}")

    criterion = nn.CrossEntropyLoss()
    # Run the training process for a one episode
    trainer = Trainer(modified_trained_model, train_loader, optimizer, criterion, scheduler, num_epochs=num_epochs)

    #data loaders
    train_loader, test_loader, idx_to_class, num_classes = load_eurosat_dataset()

    # Get a sample from each dataset to print
    class_names_euroSAT = idx_to_class
    train_images, train_labels, _ = get_sample(train_loader, class_names_euroSAT)
    test_images, test_labels, _ = get_sample(test_loader, class_names_euroSAT)

    #show sample images
    show_sample_images(Train, train_images, train_labels)
    show_sample_images(Test, test_images, test_labels)

    # Train the model
    history = trainer.training()

    # return the average training loss and accuracy
    average_training_loss = sum(history['train_loss']) / len(history['train_loss'])
    average_training_accuracy = sum(history['train_acc']) / len(history['train_acc'])
    episode_history['average_train_loss'].append(average_training_loss)
    episode_history['average_train_acc'].append(average_training_accuracy)

    #Testing
    _, test_loader, _, _  = load_eurosat_dataset()
    avg_test_loss, avg_test_acc, test_accuracy, test_acc_history = testSetAccuracy(modified_trained_model, test_loader, criterion)
    print("avg_test_loss: ", avg_test_loss)
    print("avg_test_Accuracy: ", avg_test_acc)
    print("test_Accuracy: {:.4f}".format(test_accuracy))
    episode_history['average_test_loss'].append(avg_test_loss)
    episode_history['average_test_acc'].append(avg_test_acc)

    # Update the best model if the current accuracy is higher
    if average_training_accuracy > best_accuracy:
        best_accuracy = average_training_accuracy
        best_model = modified_trained_model
    !rm -rf EuroSAT_selected_images
    !rm -rf EuroSAT_tuning

    print(f"Average Training Loss: {average_training_loss:.4f}, Average Training Accuracy: {average_training_accuracy:.4f}")

    # Update the learning rate after each epoch
    scheduler.step()



# Save the best model
torch.save(best_model, 'best_fine_tuned_model.pth')
print("Best model saved.")
print(episode_history)
###################################
# Calculate average for 'average_train_loss'
avg_train_loss = sum(episode_history['average_train_loss']) / len(episode_history['average_train_loss']) if episode_history['average_train_loss'] else 0.0
# Calculate average for 'average_train_acc'
avg_train_acc = sum(episode_history['average_train_acc']) / len(episode_history['average_train_acc']) if episode_history['average_train_acc'] else 0.0
# Calculate average for 'average_test_loss'
avg_test_loss = sum(episode_history['average_test_loss']) / len(episode_history['average_test_loss']) if episode_history['average_test_loss'] else 0.0
# Calculate average for 'average_test_acc'
avg_test_acc = sum(episode_history['average_test_acc']) / len(episode_history['average_test_acc']) if episode_history['average_test_acc'] else 0.0
print(f'Average Train Loss: {avg_train_loss}')
print(f'Average Train Accuracy: {avg_train_acc}')
print(f'Average Test Loss: {avg_test_loss}')
print(f'Average Test Accuracy: {avg_test_acc}')
####################################

# Plotting loss and accuracy variations

In [None]:
###################################
# Calculate average for 'average_train_loss'
avg_train_loss = sum(episode_history['average_train_loss']) / len(episode_history['average_train_loss']) if episode_history['average_train_loss'] else 0.0
# Calculate average for 'average_train_acc'
avg_train_acc = sum(episode_history['average_train_acc']) / len(episode_history['average_train_acc']) if episode_history['average_train_acc'] else 0.0
# Calculate average for 'average_test_loss'
avg_test_loss = sum(episode_history['average_test_loss']) / len(episode_history['average_test_loss']) if episode_history['average_test_loss'] else 0.0
# Calculate average for 'average_test_acc'
avg_test_acc = sum(episode_history['average_test_acc']) / len(episode_history['average_test_acc']) if episode_history['average_test_acc'] else 0.0
print(f'Average Train Loss: {avg_train_loss}')
print(f'Average Train Accuracy: {avg_train_acc}')
print(f'Average Test Loss: {avg_test_loss}')
print(f'Average Test Accuracy: {avg_test_acc}')
####################################
# Plotting loss and accuracy variations
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(episode_history['average_train_loss'], label='Average Training Loss')
plt.xlabel('Episode')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(episode_history['average_train_acc'], label='Average Training Accuracy')
plt.xlabel('Episode')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.plot(episode_history['average_test_loss'], label='Average Test Loss')
plt.xlabel('Episode')
plt.ylabel('Loss')
plt.legend()

plt.subplot(2, 2, 2)
plt.plot(episode_history['average_test_acc'], label='Average Test Accuracy')
plt.xlabel('Episode')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
###################################
# Calculate average for 'average_train_loss'
avg_train_loss = sum(episode_history['average_train_loss']) / len(episode_history['average_train_loss']) if episode_history['average_train_loss'] else 0.0
# Calculate average for 'average_train_acc'
avg_train_acc = sum(episode_history['average_train_acc']) / len(episode_history['average_train_acc']) if episode_history['average_train_acc'] else 0.0
# Calculate average for 'average_test_loss'
avg_test_loss = sum(episode_history['average_test_loss']) / len(episode_history['average_test_loss']) if episode_history['average_test_loss'] else 0.0
# Calculate average for 'average_test_acc'
avg_test_acc = sum(episode_history['average_test_acc']) / len(episode_history['average_test_acc']) if episode_history['average_test_acc'] else 0.0
print(f'Average Train Loss: {avg_train_loss}')
print(f'Average Train Accuracy: {avg_train_acc}')
print(f'Average Test Loss: {avg_test_loss}')
print(f'Average Test Accuracy: {avg_test_acc}')
####################################

plt.figure(figsize=(12, 6))

# Plotting training loss and accuracy
plt.subplot(1, 2, 1)
plt.plot(episode_history['average_train_loss'], label='Training Loss', color='blue')
plt.plot(episode_history['average_test_loss'], label='Test Loss', color='orange')
plt.xlabel('Episode')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(episode_history['average_train_acc'], label='Training Accuracy', color='blue')
plt.plot(episode_history['average_test_acc'], label='Test Accuracy', color='orange')
plt.xlabel('Episode')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()


# Getting the best model from the above and using for prediction

In [None]:
# Load the model
best_fine_tuned_model = torch.load('best_fine_tuned_model.pth')

# Move the model to the device
best_fine_tuned_model = best_fine_tuned_model.to(device)

# Set the model to evaluation mode
best_fine_tuned_model.eval()

#Testing

In [None]:
#testing
def testSetAccuracy(model, test_loader, criterion):

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    test_acc = 0.0
    test_loss = 0.0
    true_labels_test = []
    predicted_labels_test = []

    # Store test accuracy history
    test_acc_history = []

    # Validation - No gradient tracking needed
    with torch.no_grad():

        # Set to evaluation mode
        model.eval()

        # Test loop
        for j, (inputs, labels) in enumerate(test_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Forward pass - compute outputs on input data using the model
            outputs = model(inputs)

            # Compute loss
            loss = criterion(outputs, labels)

            # Accumulate the loss
            test_loss += loss.item() * inputs.size(0)

            # Convert predictions to numpy arrays for accuracy calculation
            true_labels_test.extend(labels.cpu().numpy())
            predicted_labels_test.extend(torch.argmax(outputs, 1).cpu().numpy())

            # Calculate accuracy
            correct_counts = torch.sum(torch.argmax(outputs, 1) == labels)
            acc = correct_counts.item() / labels.size(0)

            # Accumulate accuracy
            test_acc += acc * inputs.size(0)

            # collect test accuracy at each batch
            test_acc_history.append(acc)

            print("Test Batch number: {:03d}, Test: Loss: {:.4f}, Accuracy: {:.4f}".format(j, loss.item(), acc))

    # average test loss and test accuracy
    avg_test_loss = test_loss / len(test_loader.dataset)
    avg_test_acc = test_acc / len(test_loader.dataset)

    # accuracy using scikit-learn
    test_accuracy = accuracy_score(true_labels_test, predicted_labels_test)

    print("Test Accuracy (from sklearn): {:.4f}".format(test_accuracy))
    print("Average Test Loss: {:.4f}, Average Test Accuracy: {:.4f}".format(avg_test_loss, avg_test_acc))

    # Return test accuracy history
    return avg_test_loss, avg_test_acc, test_accuracy, test_acc_history


#testing process

In [None]:
# Testing
_, test_loader, _, _  = load_eurosat_dataset()
avg_test_loss, avg_test_acc, test_accuracy, test_acc_history = testSetAccuracy(best_fine_tuned_model, test_loader, criterion)
print("avg_test_loss: ", avg_test_loss)
print("avg_test_Accuracy: ", avg_test_acc)
print("test_Accuracy: {:.4f}".format(test_accuracy))


#plotting the testing accuracy

In [None]:
_, _, _, test_acc_history = testSetAccuracy(best_fine_tuned_model, test_loader, criterion)
# Plot test accuracy variation
plt.figure(figsize=(10, 6))
plt.plot(test_acc_history, label='Test Accuracy')
plt.xlabel('Batch Number')
plt.ylabel('Accuracy')
plt.title('Test Accuracy Variation Over Batches')
plt.legend()
plt.grid(True)
plt.show()

#checking the model summary

In [None]:
!pip install torchsummary
from torchsummary import summary

summary(modified_trained_model, (3, 128, 128))

#Predictions using web images

In [None]:
train_loader, test_loader, class_indices, num_classes = load_eurosat_dataset()

image_transforms = {
        'train': transforms.Compose([
            transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
            transforms.RandomRotation(degrees=15),
            transforms.RandomHorizontalFlip(),
            transforms.RandomVerticalFlip(),
            transforms.Resize((128, 128)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ]),
        'test': transforms.Compose([
            transforms.Resize((128, 128)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    }

def predict(model, test_image_name):
    transform = image_transforms['test']

    test_image = Image.open(test_image_name)

    # Apply transformation
    test_image = transform(test_image)

    plt.imshow(test_image.permute(1, 2, 0))

    test_image_tensor = test_image.unsqueeze(0)
    test_image_tensor = test_image_tensor.to(device)

    with torch.no_grad():
        model.eval()
        # Model outputs probabilities
        out = model(test_image_tensor)
        ps = torch.exp(out)

        topk, topclass = ps.topk(3, dim=1)
        print("Class Indices:", idx_to_class)
        print("Model Output:", topclass.cpu().numpy()[0])

        for i in range(3):
            print("Prediction", i + 1, ":", idx_to_class[topclass.cpu().numpy()[0][i]], ", Score: ", topk.cpu().numpy()[0][i])


# predicting the image using the best tested model

In [None]:
# Test a particular model on a test image
'''
! wget https://cdn.pixabay.com/photo/2017/01/20/00/30/maldives-1993704_960_720.jpg -O image.jpg

model = torch.load("best_fine_tuned_model.pth")
predict(model, 'image.jpg')'''