# Prepare Files

In [None]:
import gdown

# Google Drive file ID
file_id = "1ExI4cImHyoFPL0anBBHePui5zhRWKQvj"
output_path = "RS_images_2800.rar"

# Use gdown to download
gdown.download(f"https://drive.google.com/uc?id={file_id}", output_path, quiet=False)

!apt-get install unrar
!unrar x RS_images_2800.rar -o+  # unzip

In [None]:
import os

# Relative path, matching the location the preprocessing cell reads from,
# so the notebook does not depend on an absolute /content working directory.
dataset_path = "./RS_images_2800"
print(os.listdir(dataset_path))

# Data preprocessing

In [None]:
import random
import pandas as pd


random.seed(410)

dataset_path = "./RS_images_2800"

# Split the dataset proportionally
train_ratio = 0.7   # Training set
valid_ratio = 0.15  # Validation set
test_ratio = 0.15   # Test set (not used directly: test receives whatever train/valid leave over)

# Read all categories. Only sub-directories are classes, so stray files are ignored.
categories = sorted(
    entry for entry in os.listdir(dataset_path)
    if os.path.isdir(os.path.join(dataset_path, entry))
)
# The class name is the folder name with its first character dropped.
categories_cleaned = [c[1:] for c in categories]
class_mapping = {c: i for i, c in enumerate(categories_cleaned)}

# Store (image path, class name, class id) rows per split
dataset = {"train": [], "valid": [], "test": []}

for category in categories:
    category_path = os.path.join(dataset_path, category)
    # os.listdir returns entries in arbitrary order; sort first so the seeded
    # shuffle below produces the same split on every machine.
    images = [os.path.join(category_path, img) for img in sorted(os.listdir(category_path))]

    # Random shuffle
    random.shuffle(images)

    # Calculate partition index (int() truncates; the remainder goes to test)
    total_count = len(images)
    train_count = int(total_count * train_ratio)
    valid_count = int(total_count * valid_ratio)

    # Split
    class_name = category[1:]
    class_num = class_mapping[class_name]
    rows = [(img, class_name, class_num) for img in images]
    dataset["train"].extend(rows[:train_count])
    dataset["valid"].extend(rows[train_count:train_count + valid_count])
    dataset["test"].extend(rows[train_count + valid_count:])

# Convert to Pandas DataFrame
df_train = pd.DataFrame(dataset["train"], columns=["image", "class_name", "class_num"])
df_valid = pd.DataFrame(dataset["valid"], columns=["image", "class_name", "class_num"])
df_test = pd.DataFrame(dataset["test"], columns=["image", "class_name", "class_num"])

# Save
df_train.to_csv("train_data.csv", index=False)
df_valid.to_csv("valid_data.csv", index=False)
df_test.to_csv("test_data.csv", index=False)

In [None]:
# Check
# print(df_train.shape)
# print(df_valid.shape)
# print(df_test.shape)
# print(df_train['class_name'].value_counts())
# print(df_valid['class_name'].value_counts())
# print(df_test['class_name'].value_counts())

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt

# Seed every random number generator the notebook uses with the same value
seed = 410
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_fn(seed)

# Ask cuDNN for deterministic kernels and disable its auto-tuner
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Reload the three splits from the CSV files written by the preprocessing cell
df_train, df_valid, df_test = map(
    pd.read_csv, ("train_data.csv", "valid_data.csv", "test_data.csv")
)

# Steps shared by both pipelines: tensor conversion followed by channel-wise normalisation
_to_normalized_tensor = [
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]

# Training pipeline: resize, random augmentation, then tensor + normalisation
train_transform = transforms.Compose([
    transforms.Resize((400, 400)),
    transforms.RandomRotation(30),      # Rotation
    transforms.RandomHorizontalFlip(),  # Horizontal flip
    transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.1),  # Colour jitter
    *_to_normalized_tensor,
])

# Validation/test pipeline: no augmentation
valid_test_transform = transforms.Compose([
    transforms.Resize((400, 400)),
    *_to_normalized_tensor,
])

class LoadDataset(Dataset):
    """Image-classification dataset backed by a DataFrame.

    Args:
        dataframe: frame with at least an "image" column (file path) and a
            "class_num" column (integer label). Other columns are ignored.
        transform: optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return (image, class_num) for the row at position `idx`."""
        row = self.dataframe.iloc[idx]
        # Look the fields up by name so column order and extra columns do not matter.
        img_path, class_num = row["image"], row["class_num"]
        image = Image.open(img_path).convert("RGB")
        if self.transform is not None:
            image = self.transform(image)
        return image, class_num

# Create DataLoader
batch_size = 32

# Training split: augmented samples, reshuffled on every pass
train_dataset = LoadDataset(df_train, transform=train_transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Validation split: deterministic preprocessing, fixed order
valid_dataset = LoadDataset(df_valid, transform=valid_test_transform)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

# Test split: deterministic preprocessing, fixed order
test_dataset = LoadDataset(df_test, transform=valid_test_transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Basic CNN

In [None]:
class CNNBaseline(nn.Module):
    """Three conv/ReLU/max-pool stages followed by a two-layer classifier.

    The first linear layer is sized for 400x400 inputs: three 2x2 poolings
    reduce the spatial size to 50x50 with 128 channels.
    """

    def __init__(self, num_classes):
        super().__init__()
        conv_kwargs = dict(kernel_size=3, stride=1, padding=1)  # size-preserving 3x3 convolutions
        self.conv1 = nn.Conv2d(3, 32, **conv_kwargs)
        self.conv2 = nn.Conv2d(32, 64, **conv_kwargs)
        self.conv3 = nn.Conv2d(64, 128, **conv_kwargs)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.fc1 = nn.Linear(128 * 50 * 50, 256)
        self.fc2 = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Map a (batch, 3, 400, 400) tensor to (batch, num_classes) logits."""
        for conv in (self.conv1, self.conv2, self.conv3):
            x = self.pool(F.relu(conv(x)))
        x = torch.flatten(x, 1)
        hidden = self.dropout(F.relu(self.fc1(x)))
        return self.fc2(hidden)


# One output logit per distinct label id found in the training split
num_classes = df_train["class_num"].unique().size

model = CNNBaseline(num_classes)
model = model.to(device)

In [None]:
# !pip install torchviz

from torchviz import make_dot

# Random probe batch with the 400x400 input size the CNN expects
sample_input = torch.randn(1, 3, 400, 400).to(device)
# Forward pass whose autograd graph is rendered below
output = model(sample_input)

# Draw the autograd graph of `output`, labelling nodes with the model's parameter names,
# and render it under the base name "cnn_architecture" in PDF format
make_dot(output, params=dict(model.named_parameters())).render("cnn_architecture", format="pdf")

In [None]:
import matplotlib.pyplot as plt

def plot_training_results(train_losses, valid_losses, train_accs, valid_accs, model_name="Model"):
    """Plot per-epoch loss and accuracy curves side by side, save them, and show them.

    Args:
        train_losses, valid_losses: per-epoch loss values (same length).
        train_accs, valid_accs: per-epoch accuracy values (same length).
        model_name: used in the panel titles and in the output PDF file name.
    """
    epochs = range(1, len(train_losses) + 1)
    # (metric name, training series, validation series) for the left and right panels
    panels = (
        ("Loss", train_losses, valid_losses),
        ("Accuracy", train_accs, valid_accs),
    )

    plt.figure(figsize=(12, 6))
    for position, (metric, train_values, valid_values) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epochs, train_values, label=f"Train {metric}")
        plt.plot(epochs, valid_values, label=f"Valid {metric}")
        plt.xlabel("Epochs")
        plt.ylabel(metric)
        plt.title(f"{model_name} - {metric}")
        plt.legend()

    plt.tight_layout()
    plt.savefig(f"{model_name}_training_results.pdf")
    plt.show()

In [None]:
def train_model(model, train_loader, valid_loader, device, num_epochs=30, learning_rate=0.0001, early_stop_patience=5, model_name="Model"):
    """Train `model` with Adam and cross-entropy, validating after every epoch.

    The weights with the lowest validation loss so far are saved to
    f"{model_name}_best.pth". Training stops early once the validation loss has
    failed to improve for `early_stop_patience` consecutive epochs. The loss and
    accuracy curves are plotted at the end via `plot_training_results`.

    Args:
        model: network to optimise (updated in place).
        train_loader, valid_loader: iterables of (images, labels) batches.
        device: device the batches are moved to.
        num_epochs: maximum number of epochs.
        learning_rate: Adam learning rate.
        early_stop_patience: non-improving epochs tolerated before stopping.
        model_name: prefix for the checkpoint file and the plot titles.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    def _run_epoch(loader, training):
        """One pass over `loader`; returns (mean batch loss, accuracy)."""
        model.train(training)
        loss_sum, hits, seen = 0.0, 0, 0
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            _, predicted = torch.max(outputs, 1)
            hits += (predicted == labels).sum().item()
            seen += labels.size(0)
        return loss_sum / len(loader), hits / seen

    # Early stopping state
    best_val_loss = float("inf")
    early_stop_counter = 0

    # Per-epoch history used for the final plot
    train_losses, valid_losses = [], []
    train_accs, valid_accs = [], []

    for epoch in range(num_epochs):
        train_loss, train_acc = _run_epoch(train_loader, training=True)
        train_losses.append(train_loss)
        train_accs.append(train_acc)

        # Validation runs without gradient tracking
        with torch.no_grad():
            val_loss, val_acc = _run_epoch(valid_loader, training=False)
        valid_losses.append(val_loss)
        valid_accs.append(val_acc)

        print(f"Epoch {epoch+1}/{num_epochs}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

        # Early stopping & save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            early_stop_counter = 0
            torch.save(model.state_dict(), f"{model_name}_best.pth")
            print("Best model saved!")
            continue

        early_stop_counter += 1
        if early_stop_counter >= early_stop_patience:
            print("Early stopping triggered!")
            break

    # Plot loss and accuracy curves
    plot_training_results(train_losses, valid_losses, train_accs, valid_accs, model_name)

In [None]:
# Train the CNN baseline for up to 30 epochs
train_model(
    model,
    train_loader,
    valid_loader,
    device,
    num_epochs=30,
    model_name="CNN Baseline",
)

In [None]:
def evaluate_model(model, test_loader, device):
    """Evaluate `model` on `test_loader` and print its accuracy.

    Args:
        model: network to evaluate (switched to eval mode).
        test_loader: iterable of (images, labels) batches.
        device: device the batches are moved to.

    Returns:
        (accuracy, all_predictions, all_labels), where the two lists hold the
        predicted and true label of every sample in loader order.
    """
    model.eval()
    all_predictions, all_labels = [], []
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            # Index of the largest logit is the predicted class
            predicted = torch.max(model(images), 1)[1]

            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            n_correct += (predicted == labels).sum().item()
            n_seen += labels.size(0)

    accuracy = n_correct / n_seen
    print(f"Test Accuracy: {accuracy:.4f}")

    return accuracy, all_predictions, all_labels

In [None]:
# Load the best CNN checkpoint. train_model saves to f"{model_name}_best.pth"
# and the CNN was trained with model_name="CNN Baseline".
best_model = CNNBaseline(num_classes).to(device)
# map_location lets a checkpoint saved on GPU load on a CPU-only machine
best_model.load_state_dict(torch.load("CNN Baseline_best.pth", map_location=device))

# Compute test accuracy
test_accuracy, predictions, labels = evaluate_model(best_model, test_loader, device)

In [None]:
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(y_true, y_pred, class_names, model_name):
    """Draw the confusion matrix as an annotated heatmap, save it as PDF, and show it.

    Args:
        y_true: true labels.
        y_pred: predicted labels.
        class_names: tick labels for both axes.
        model_name: used in the title and in the output file name.
    """
    matrix = confusion_matrix(y_true, y_pred)

    plt.figure(figsize=(10, 8))
    heatmap_options = dict(annot=True, fmt="d", cmap="Blues",
                           xticklabels=class_names, yticklabels=class_names)
    sns.heatmap(matrix, **heatmap_options)
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title(f"{model_name} confusion matrix")
    plt.savefig(f"{model_name}_confusion_matrix.pdf")
    plt.show()

# Draw confusion matrix for the CNN baseline
plot_confusion_matrix(
    labels,
    predictions,
    df_train["class_name"].unique(),
    model_name="CNN Baseline",
)

# ResNet50

In [None]:
from torchvision import models

# Load ResNet-50 with the IMAGENET1K_V1 pre-trained weights
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

# Freeze the first 140 parameter tensors (in model.parameters() order);
# the remaining ones and the new head below stay trainable.
for position, param in enumerate(model.parameters()):
    if position >= 140:
        break
    param.requires_grad = False

# Replace the fully connected layer of ResNet-50 with a small classification head
num_features = model.fc.in_features
model.fc = nn.Sequential(
    nn.Linear(num_features, 256),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(256, num_classes),
)

model = model.to(device)


In [None]:
# Run a fresh forward pass through the ResNet so the rendered graph belongs to this
# model; reusing an `output` computed by another model would draw that model instead.
# eval() keeps the probe from updating the BatchNorm running statistics.
_was_training = model.training
model.eval()
output = model(sample_input)
model.train(_was_training)
make_dot(output, params=dict(model.named_parameters())).render("resnet50_architecture", format="pdf")

In [None]:
# Fine-tune the partially frozen ResNet-50 for up to 30 epochs
train_model(
    model,
    train_loader,
    valid_loader,
    device,
    num_epochs=30,
    model_name="ResNet-50",
)

In [None]:
# Load best model. Every weight comes from the checkpoint, so the ImageNet weights
# are not downloaded again (weights=None).
best_resnet_model = models.resnet50(weights=None)
# Read the feature width from this model rather than from a variable set in another cell
best_resnet_model.fc = nn.Sequential(
    nn.Linear(best_resnet_model.fc.in_features, 256),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(256, num_classes)
)
# map_location lets a checkpoint saved on GPU load on a CPU-only machine
best_resnet_model.load_state_dict(torch.load("ResNet-50_best.pth", map_location=device))
best_resnet_model.to(device)

# Compute test accuracy
test_accuracy, predictions, labels = evaluate_model(best_resnet_model, test_loader, device)

# Draw confusion matrix
plot_confusion_matrix(labels, predictions, df_train["class_name"].unique(), model_name="ResNet-50")


# ViT

In [None]:
from transformers import ViTModel, ViTFeatureExtractor

# Load ViT feature extractor
# NOTE(review): `feature_extractor` is not referenced by any other visible cell; the
# ViT datasets preprocess images with torchvision transforms. Confirm before removing.
feature_extractor = ViTFeatureExtractor.from_pretrained("google/vit-base-patch16-224")
# Load ViT model (without classification head)
class ViTClassifier(nn.Module):
    """Pre-trained ViT backbone with a small MLP head applied to the CLS token."""

    def __init__(self, num_classes=7):
        super().__init__()
        self.vit = ViTModel.from_pretrained("google/vit-base-patch16-224")
        hidden_size = self.vit.config.hidden_size
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        """Return class logits computed from the first (CLS) token of the last hidden state."""
        hidden_states = self.vit(x).last_hidden_state
        cls_token = hidden_states[:, 0, :]
        logits = self.classifier(cls_token)
        return logits

# Initialize model; the head is sized from the data-derived class count rather than a literal
model = ViTClassifier(num_classes=num_classes).to(device)

In [None]:
# Random 224x224 probe batch for the ViT; its autograd graph is rendered to PDF
_vit_probe = torch.randn(1, 3, 224, 224).to(device)
output = model(_vit_probe)
_vit_graph = make_dot(output, params=dict(model.named_parameters()))
_vit_graph.render("vit_architecture", format="pdf")

In [None]:
# Steps shared by both ViT pipelines: tensor conversion followed by normalisation
_vit_to_normalized_tensor = [
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]

# Training pipeline: 224x224 resize, random augmentation, tensor + normalisation
train_transform_vit = transforms.Compose([
    transforms.Resize((224, 224)),  # ViT needs 224x224 input
    transforms.RandomRotation(30),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.1),
    *_vit_to_normalized_tensor,
])

# Validation/test pipeline: no augmentation
valid_test_transform_vit = transforms.Compose([
    transforms.Resize((224, 224)),
    *_vit_to_normalized_tensor,
])


In [None]:
class ViTDataset(Dataset):
    """Dataset yielding (transformed image, label tensor) pairs for the ViT model.

    Reads the "image" (file path) and "class_num" (label) columns of `dataframe`.
    """

    def __init__(self, dataframe, transform):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load row `idx`, apply the transform, and return (image, long label tensor)."""
        row = self.dataframe.iloc[idx]
        img_path, class_num = row["image"], row["class_num"]

        image = self.transform(Image.open(img_path).convert("RGB"))

        # Report (but do not reject) samples that are not (3, 224, 224)
        if image.shape != (3, 224, 224):
            print(f"Shape mismatch: {image.shape} at index {idx}")

        # Label as a PyTorch long tensor
        label = torch.tensor(class_num, dtype=torch.long)
        return image, label


# Training split: augmented 224x224 samples, reshuffled on every pass
train_dataset_vit = ViTDataset(df_train, transform=train_transform_vit)
train_loader_vit = DataLoader(train_dataset_vit, batch_size=batch_size, shuffle=True)

# Validation split: deterministic preprocessing, fixed order
valid_dataset_vit = ViTDataset(df_valid, transform=valid_test_transform_vit)
valid_loader_vit = DataLoader(valid_dataset_vit, batch_size=batch_size, shuffle=False)

# Test split: deterministic preprocessing, fixed order
test_dataset_vit = ViTDataset(df_test, transform=valid_test_transform_vit)
test_loader_vit = DataLoader(test_dataset_vit, batch_size=batch_size, shuffle=False)


In [None]:
# Fine-tune the ViT classifier on the 224x224 loaders for up to 30 epochs
train_model(
    model,
    train_loader_vit,
    valid_loader_vit,
    device,
    num_epochs=30,
    model_name="ViT-Base",
)

In [None]:
# Load best model; the head is sized from the data-derived class count rather than a literal
best_vit_model = ViTClassifier(num_classes=num_classes).to(device)
# map_location lets a checkpoint saved on GPU load on a CPU-only machine
best_vit_model.load_state_dict(torch.load("ViT-Base_best.pth", map_location=device))

# Compute test accuracy
test_accuracy, predictions, labels = evaluate_model(best_vit_model, test_loader_vit, device)

# Draw confusion matrix
plot_confusion_matrix(labels, predictions, df_train["class_name"].unique(), model_name="ViT-Base")

# SSL

In [None]:
# Colour jitter, applied with probability 0.8 inside the pipeline below
_simclr_color_jitter = transforms.ColorJitter(0.4, 0.4, 0.4, 0.1)

# Stochastic augmentation pipeline used to create the two views of each image
contrastive_transforms = transforms.Compose([
    transforms.RandomResizedCrop(400, scale=(0.2, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomApply([_simclr_color_jitter], p=0.8),
    transforms.RandomGrayscale(p=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def get_augmented_views(img_path):
    """Open `img_path` as RGB and return two independently augmented views of it."""
    source = Image.open(img_path).convert("RGB")
    # Each call to the pipeline draws its own random augmentations
    first_view = contrastive_transforms(source)
    second_view = contrastive_transforms(source)
    return first_view, second_view

In [None]:
class SimCLRDataset(Dataset):
    """Label-free dataset that yields two augmented views of every image path in `df["image"]`."""

    def __init__(self, df):
        self.image_paths = df["image"].values

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return the pair of views produced by `get_augmented_views` for item `idx`."""
        first_view, second_view = get_augmented_views(self.image_paths[idx])
        return first_view, second_view

train_dataset_sim = SimCLRDataset(df_train)
# The contrastive loader must wrap the two-view dataset; the supervised
# `train_dataset` yields (image, label) pairs instead of view pairs.
train_loader_sim = DataLoader(train_dataset_sim, batch_size=32, shuffle=True, num_workers=2)


In [None]:
from torchvision.models import ResNet50_Weights

class SimCLR(nn.Module):
    """ResNet-50 encoder followed by an MLP projection head for contrastive training.

    Args:
        feature_dim: size of the projected embedding returned by `forward`.
    """

    def __init__(self, feature_dim=128):
        super(SimCLR, self).__init__()
        base_model = models.resnet50(weights=ResNet50_Weights.DEFAULT)

        # Remove ResNet-50 classification layer (keep everything up to the global pooling)
        self.encoder = nn.Sequential(*list(base_model.children())[:-1])

        # Projection head
        self.projector = nn.Sequential(
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Linear(512, feature_dim)
        )

    def forward(self, x):
        """Return the projected embedding, shape (batch, feature_dim)."""
        # Flatten from dim 1 so the batch dimension is always kept; a bare
        # squeeze() would also drop it when the batch holds a single sample.
        h = torch.flatten(self.encoder(x), 1)
        z = self.projector(h)
        return z


# Place the model on the notebook-wide `device`, like every other model in this notebook
model = SimCLR().to(device)

In [None]:
def nt_xent_loss(z1, z2, temperature=0.3):
    """Contrastive cross-entropy loss between two batches of embeddings.

    Both inputs are L2-normalised along dim 1, the pairwise similarity matrix
    z1 @ z2.T is scaled by 1/temperature, and row i is classified against
    target i (the matching view in `z2`).

    Args:
        z1, z2: embeddings of the two views, same shape (batch, dim).
        temperature: softmax temperature dividing the similarities.

    Returns:
        Scalar loss tensor on the same device as the inputs.
    """
    z1 = F.normalize(z1, dim=1)
    z2 = F.normalize(z2, dim=1)
    logits = torch.matmul(z1, z2.T) / temperature
    # Create the targets on the inputs' device instead of forcing CUDA
    labels = torch.arange(z1.shape[0], device=z1.device)
    loss = F.cross_entropy(logits, labels)
    return loss

In [None]:
import gc
import time
import json
from torch.amp import autocast, GradScaler

# Record time
start_time = time.time()

# Record training history
history = {"train_loss": [], "learning_rate": []}


scaler = GradScaler(device="cuda")
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# NOTE(review): with step_size=50 the learning rate never decays within the
# 30 epochs below. Confirm whether that is intended.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)

# Early stopping is driven by the training loss
best_train_loss = float("inf")
patience = 5
early_stop_counter = 0


for epoch in range(30):
    total_loss = 0
    model.train()

    epoch_start = time.time()

    # Iterate the contrastive loader, which yields two augmented views per image.
    # The supervised `train_loader` yields (image, label) pairs instead.
    for img1, img2 in train_loader_sim:
        img1, img2 = img1.cuda(), img2.cuda()

        optimizer.zero_grad()

        # Mixed-precision forward pass for both views
        with autocast(device_type="cuda"):
            z1, z2 = model(img1), model(img2)
            loss = nt_xent_loss(z1, z2)

        # Scaled backward pass and optimizer step
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        total_loss += loss.item()

    # Release cached memory once per epoch; doing it after every batch forces a
    # full garbage collection and allocator flush on each step.
    gc.collect()
    torch.cuda.empty_cache()

    avg_train_loss = total_loss / len(train_loader_sim)
    history["train_loss"].append(avg_train_loss)
    history["learning_rate"].append(optimizer.param_groups[0]['lr'])

    # Update learning rate
    scheduler.step()

    # Compute training time
    epoch_time = time.time() - epoch_start

    print(f"Epoch [{epoch+1}/30] | Train Loss: {avg_train_loss:.4f} | LR: {optimizer.param_groups[0]['lr']:.6f} | Time: {epoch_time:.2f}s")

    # Save best model (encoder weights only; the projection head is discarded)
    if avg_train_loss < best_train_loss:
        best_train_loss = avg_train_loss
        torch.save(model.encoder.state_dict(), "simclr_best_model.pth")
        early_stop_counter = 0
    else:
        early_stop_counter += 1

    # Early stopping
    if early_stop_counter >= patience:
        break


In [None]:
def balanced_sample(df, frac=0.5, random_state=410):
    """Sample the same fraction of rows from every class.

    Args:
        df: frame with a "class_name" column.
        frac: fraction of each class to keep.
        random_state: seed used for the sampling of every class.

    Returns:
        A new frame with all original columns, classes in sorted order, and a
        fresh 0..n-1 index.
    """
    # Sample each group explicitly instead of groupby.apply, whose handling of the
    # grouping column is deprecated; this keeps "class_name" in the result.
    sampled_groups = [
        group.sample(frac=frac, random_state=random_state)
        for _, group in df.groupby("class_name")
    ]
    if not sampled_groups:
        # No groups (empty input): return an empty frame with the same columns
        return df.iloc[0:0].reset_index(drop=True)
    return pd.concat(sampled_groups).reset_index(drop=True)

# Class-balanced subset of the training split (default fraction)
df_train_sample = balanced_sample(df_train)

# Check: per-class counts of the full split, then of the subset
for _frame in (df_train, df_train_sample):
    print(_frame["class_name"].value_counts())


In [None]:
# Full training split (augmented, shuffled)
train_dataset_full = LoadDataset(df_train, transform=train_transform)
train_loader_full = DataLoader(train_dataset_full, batch_size=batch_size, shuffle=True)

# Class-balanced training subset (augmented, shuffled)
train_dataset_sample = LoadDataset(df_train_sample, transform=train_transform)
train_loader_sample = DataLoader(train_dataset_sample, batch_size=batch_size, shuffle=True)

# Validation split (deterministic preprocessing, fixed order)
valid_dataset = LoadDataset(df_valid, transform=valid_test_transform)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

# Test split (deterministic preprocessing, fixed order)
test_dataset = LoadDataset(df_test, transform=valid_test_transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Load ResNet-50 pre trained with SimCLR
simclr_model = models.resnet50()
num_features = simclr_model.fc.in_features

# The checkpoint is the state dict of an nn.Sequential over the ResNet children
# (without fc), so its keys are positional ("0.weight", "4.0.conv1.weight", ...).
# Loading it into a plain resnet50 with strict=False matches no key and silently
# leaves the backbone at random initialisation. Rebuild the same Sequential view over
# this model's own layers and load strictly: the modules are shared, so the backbone
# receives the weights, and any key mismatch raises.
encoder_view = nn.Sequential(*list(simclr_model.children())[:-1])
encoder_view.load_state_dict(torch.load("simclr_best_model.pth", map_location=device))


# Add classification head
simclr_model.fc = nn.Sequential(
    nn.Linear(num_features, 256),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(256, num_classes)
)

simclr_model = simclr_model.to(device)

# Full training set
train_model(simclr_model, train_loader_full, valid_loader, device, num_epochs=30, model_name="SimCLR-Finetune_100%")

In [None]:
# Rebuild the fine-tuned architecture: ResNet-50 backbone plus the classification head
best_simclr_model = models.resnet50()
_head_full = nn.Sequential(
    nn.Linear(num_features, 256),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(256, num_classes),
)
best_simclr_model.fc = _head_full

# Restore the best checkpoint written while fine-tuning on the full training set
_state_full = torch.load("SimCLR-Finetune_100%_best.pth")
best_simclr_model.load_state_dict(_state_full)
best_simclr_model.to(device)

# Compute test accuracy
test_accuracy, predictions, labels = evaluate_model(best_simclr_model, test_loader, device)

# Draw confusion matrix
plot_confusion_matrix(
    labels,
    predictions,
    df_train["class_name"].unique(),
    model_name="SimCLR-Finetune_100%",
)

In [None]:
# Load ResNet-50 pre trained with SimCLR
simclr_model = models.resnet50()
num_features = simclr_model.fc.in_features

# The checkpoint is the state dict of an nn.Sequential over the ResNet children
# (without fc), so its keys are positional ("0.weight", "4.0.conv1.weight", ...).
# Loading it into a plain resnet50 with strict=False matches no key and silently
# leaves the backbone at random initialisation. Rebuild the same Sequential view over
# this model's own layers and load strictly: the modules are shared, so the backbone
# receives the weights, and any key mismatch raises.
encoder_view = nn.Sequential(*list(simclr_model.children())[:-1])
encoder_view.load_state_dict(torch.load("simclr_best_model.pth", map_location=device))

# Add classification head
simclr_model.fc = nn.Sequential(
    nn.Linear(num_features, 256),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(256, num_classes)
)

simclr_model = simclr_model.to(device)

# Half of training set
train_model(simclr_model, train_loader_sample, valid_loader, device, num_epochs=30, model_name="SimCLR-Finetune_50%")

In [None]:
# Rebuild the fine-tuned architecture: ResNet-50 backbone plus the classification head
best_simclr_model = models.resnet50()
_head_half = nn.Sequential(
    nn.Linear(num_features, 256),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(256, num_classes),
)
best_simclr_model.fc = _head_half

# Restore the best checkpoint written while fine-tuning on half of the training set
_state_half = torch.load("SimCLR-Finetune_50%_best.pth")
best_simclr_model.load_state_dict(_state_half)
best_simclr_model.to(device)

# Compute test accuracy
test_accuracy, predictions, labels = evaluate_model(best_simclr_model, test_loader, device)

# Draw confusion matrix
plot_confusion_matrix(
    labels,
    predictions,
    df_train["class_name"].unique(),
    model_name="SimCLR-Finetune_50%",
)