# Imports and device setup

In [1]:
# Import necessary libraries
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xarray as xr
import zipfile
import os
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from torch.utils.data import random_split, DataLoader

import torch
import torchvision
from PIL import Image
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Check PyTorch and torchvision versions
print("Torch version:", torch.__version__)
print("Torchvision version:", torchvision.__version__)

# Set device to GPU if available else go with cpu
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Device we use:", device)


Torch version: 2.5.1+cu121
Torchvision version: 0.20.1+cu121
Device we use: cuda:0


#### Unzip Datasets(in G.Drive): image_train, image_test

In [2]:
from google.colab import drive
# Mount Google Drive
drive.mount('/content/drive')

# UNZIPPING THE DATASETS
with zipfile.ZipFile('/content/drive/MyDrive/finetuning/images_training_rev1.zip', 'r') as zip_ref:
    zip_ref.extractall('/content/image_train')

# Path to the main zip file
zip_file_path = '/content/drive/MyDrive/finetuning/images_test_rev1.zip'
output_dir = '/content/image_test'

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Function to recursively unzip files
def unzip_all(zip_path, extract_to):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
        for root, _, files in os.walk(extract_to):
            for file in files:
                if file.endswith('.zip'):
                    nested_zip_path = os.path.join(root, file)
                    nested_output_dir = os.path.join(root, file.replace('.zip', ''))
                    os.makedirs(nested_output_dir, exist_ok=True)
                    unzip_all(nested_zip_path, nested_output_dir)  # Recursively unzip nested zip files
                    os.remove(nested_zip_path)  # Optionally remove nested zip file after extraction

# Unzip the main file and handle nested zips
unzip_all(zip_file_path, output_dir)


Mounted at /content/drive


#### Organize unzipped files

In [3]:
# LOADING IMAGE PATH IMAGES AND LABELS
image_dir = '/content/image_train/images_training_rev1'  # Path to training images
labels_path = '/content/training_solutions_rev1.csv'  # Path to labels CSV

# Load labels
labels_df = pd.read_csv(labels_path)
print("Labels loaded:", labels_df.shape)

Labels loaded: (61578, 38)


# 2.PreProcessing Data

In [4]:
# DATA AUGUMENTATIONA AND NORMALIZATION FOR TRAINING
transform = transforms.Compose([
    transforms.Resize((100, 100)),    # Resize to 100x100 if needed
    transforms.ToTensor(),            # Convert to tensor
    transforms.Normalize((0.5,), (0.5,))  # Normalize
])


In [5]:

#CUSTOM DTASET CLASS(GALAXY DATASET) FOR LOADING IMAGES AND LABELS AS TENSORS
class GalaxyDataset(torch.utils.data.Dataset):
    def __init__(self, image_dir, labels_df, transform=None):
        self.image_dir = image_dir
        self.labels_df = labels_df
        self.transform = transform

    def __len__(self):
        return len(self.labels_df)

    def __getitem__(self, idx):
        # Assuming the image filename is in the first column
        img_filename = str(self.labels_df.iloc[idx, 0]) + '.jpg'  # Add .jpg extension if needed
        img_name = os.path.join(self.image_dir, img_filename)  # Adjust if there's a different file extension

        try:
            image = Image.open(img_name).convert('RGB')
        except FileNotFoundError:
            print(f"Image file not found: {img_name}")
            raise

        label = torch.tensor(self.labels_df.iloc[idx, 1:].values, dtype=torch.float32)  # labels as tensor
        if self.transform:
            image = self.transform(image)

        return image, label



#### Splitting the Dataset

In [6]:
# Create dataset and SPLIT TO TRAIN AND VALIDATION SETS AND TEST DATA SETS
from torch.utils.data import random_split, DataLoader

# Define the dataset
dataset = GalaxyDataset(image_dir, labels_df, transform=transform)

batch_size = 37  # or any other number you'd like to use for the batch size


# Define sizes for train, validation, and test sets
train_size = int(0.7 * len(dataset))  # 70% for training
valid_size = int(0.15 * len(dataset)) # 15% for validation
test_size = len(dataset) - train_size - valid_size  # Remaining 15% for testing

# Split the dataset
train_dataset, valid_dataset, test_dataset = random_split(dataset, [train_size, valid_size, test_size])

# Create data loaders for each split
trainloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
validloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, drop_last=True)

testloader = DataLoader(test_dataset, batch_size=37, shuffle=False, num_workers=2)


In [7]:
# Check the shape of a batch from the train loader
dataiter = iter(trainloader)
images, labels = next(dataiter)  # Use next(dataiter) instead of dataiter.next()
print("Train batch shape:", images.shape)
print("Train labels batch shape:", labels.shape)

# Check the shape of a batch from the validation loader
dataiter_valid = iter(validloader)
images_valid, labels_valid = next(dataiter_valid)
print("Validation batch shape:", images_valid.shape)
print("Validation labels batch shape:", labels_valid.shape)

# Check the shape of a batch from the test loader
dataiter_test = iter(testloader)
images_test, labels_test = next(dataiter_test)
print("Test batch shape:", images_test.shape)
print("Test labels batch shape:", labels_test.shape)



Train batch shape: torch.Size([37, 3, 100, 100])
Train labels batch shape: torch.Size([37, 37])
Validation batch shape: torch.Size([37, 3, 100, 100])
Validation labels batch shape: torch.Size([37, 37])
Test batch shape: torch.Size([37, 3, 100, 100])
Test labels batch shape: torch.Size([37, 37])


# 3.Load Pre-Trained ResNet-50

In [9]:
from transformers import AutoModelForImageClassification
# Apply LoRA to the model's last layer


model_checkpoint = "microsoft/resnet-50"
model = AutoModelForImageClassification.from_pretrained(
    model_checkpoint,
    num_labels=37,
    ignore_mismatched_sizes=True
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/69.6k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/102M [00:00<?, ?B/s]

Some weights of ResNetForImageClassification were not initialized from the model checkpoint at microsoft/resnet-50 and are newly initialized because the shapes did not match:
- classifier.1.bias: found shape torch.Size([1000]) in the checkpoint and torch.Size([37]) in the model instantiated
- classifier.1.weight: found shape torch.Size([1000, 2048]) in the checkpoint and torch.Size([37, 2048]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


# LoRA  Implementation

In [10]:
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import matplotlib.pyplot as plt
import numpy as np
from transformers import AutoModelForImageClassification, AutoFeatureExtractor


# Define the LoRA adaptation layer
class LORALayer(nn.Module):
    def __init__(self, adapted_layer, rank=16):
        super(LORALayer, self).__init__()
        self.adapted_layer = adapted_layer
        self.rank = rank
        # Initialize low-rank matrices A and B
        self.A = nn.Parameter(torch.randn(adapted_layer.weight.size(1), rank) * 0.01)
        self.B = nn.Parameter(torch.randn(rank, adapted_layer.weight.size(0)) * 0.01)

    def forward(self, x):
        # Calculate the low-rank adaptation
        low_rank_matrix = self.A @ self.B
        adapted_weight = self.adapted_layer.weight + low_rank_matrix.t()
        return nn.functional.linear(x, adapted_weight, self.adapted_layer.bias)

# Replace the final classification layer with a LoRA layer
last_layer = model.classifier[-1]  # Assuming the last layer is linear
model.classifier[-1] = LORALayer(last_layer)


In [11]:
# Move model to GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)


ResNetForImageClassification(
  (resnet): ResNetModel(
    (embedder): ResNetEmbeddings(
      (embedder): ResNetConvLayer(
        (convolution): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        (normalization): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (activation): ReLU()
      )
      (pooler): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    )
    (encoder): ResNetEncoder(
      (stages): ModuleList(
        (0): ResNetStage(
          (layers): Sequential(
            (0): ResNetBottleNeckLayer(
              (shortcut): ResNetShortCut(
                (convolution): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
                (normalization): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
              )
              (layer): Sequential(
                (0): ResNetConvLayer(
                  (convolution): Conv2d(64

# 4.Training Loop with LoRA

In [12]:
# Define optimizer, criterion, and scheduler
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

# Training parameters
num_epochs = 20
best_val_accuracy = 0.0
best_model_wts = model.state_dict()

# Training loop
train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []


for epoch in range(num_epochs):

    # Training phase
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    for images, labels in trainloader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()

        outputs = model(images)
        logits = outputs.logits  # Extract logits
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(logits.data, 1)
        labels = torch.argmax(labels, dim=1) if labels.ndim > 1 else labels
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    scheduler.step()
    train_accuracy = 100 * correct / total
    train_losses.append(running_loss / len(trainloader))
    train_accuracies.append(train_accuracy)

    # Validation phase
    model.eval()
    val_running_loss, correct, total = 0.0, 0, 0
    with torch.no_grad():
        for images, labels in validloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            logits = outputs.logits
            loss = criterion(logits, labels)
            val_running_loss += loss.item()
            _, predicted = torch.max(logits, 1)
            labels = torch.argmax(labels, dim=1) if labels.ndim > 1 else labels
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    val_accuracy = 100 * correct / total
    val_losses.append(val_running_loss / len(validloader))
    val_accuracies.append(val_accuracy)

    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        best_model_wts = model.state_dict()

    print(f"Epoch {epoch+1}, Train Loss: {train_losses[-1]}, Train Accuracy: {train_accuracy}%, "
          f"Val Loss: {val_losses[-1]}, Val Accuracy: {val_accuracy}%")


Epoch 1, Train Loss: 14.037919151823955, Train Accuracy: 66.74793350051083%, Val Loss: 13.861040437077902, Val Accuracy: 69.2282644089873%
Epoch 2, Train Loss: 13.73492541591736, Train Accuracy: 72.39481749791028%, Val Loss: 13.79964398763266, Val Accuracy: 66.81862585477043%
Epoch 3, Train Loss: 13.653787304035987, Train Accuracy: 74.21519457601931%, Val Loss: 13.797145150272721, Val Accuracy: 67.9040486269402%
Epoch 4, Train Loss: 13.597147201754384, Train Accuracy: 74.37308442463082%, Val Loss: 13.857313451039264, Val Accuracy: 71.9743840225768%
Epoch 5, Train Loss: 13.542849004883127, Train Accuracy: 75.11841738645862%, Val Loss: 13.919530768949823, Val Accuracy: 71.81157060675133%
Epoch 6, Train Loss: 13.483187540290283, Train Accuracy: 75.42723135506641%, Val Loss: 13.753311869609787, Val Accuracy: 73.08151525018995%
Epoch 7, Train Loss: 13.420162214855967, Train Accuracy: 75.64549085167641%, Val Loss: 13.834002261180954, Val Accuracy: 74.46000217084554%
Epoch 8, Train Loss: 13.2

# 5.Evaluate the Model on Test Split

In [14]:
# Test the model
model.eval()
correct, total = 0, 0
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        logits = outputs.logits
        _, predicted = torch.max(logits, 1)
        labels = torch.argmax(labels, dim=1) if labels.ndim > 1 else labels
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

test_accuracy = 100 * correct / total
print(f"Test Accuracy with LoRA: {test_accuracy}%")


Test Accuracy with LoRA: 71.70383199826803%
