Import All required packages


In [None]:
# Basic Setup
#pytorch
import torch
import numpy as np
from torch import nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, ConcatDataset
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR

#torchvision
import torchvision
from torchvision import models, transforms
from torchvision.models import MobileNet_V2_Weights

#misc
from PIL import Image
from tqdm import tqdm
import os
from sklearn.metrics import precision_score, recall_score, f1_score

#plotting
import matplotlib.pyplot as plt
print(f"Pytorch Version: {torch.__version__}")
print(f"Torchvision Version: {torchvision.__version__}")

#Device agnostic
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device Type: {device}")

try:
  import torchinfo
except:
  !pip install torchinfo
  import torchinfo

from torchinfo import summary

Pytorch Version: 2.6.0+cu124
Torchvision Version: 0.21.0+cu124
Device Type: cuda
Collecting torchinfo
  Downloading torchinfo-1.8.0-py3-none-any.whl.metadata (21 kB)
Downloading torchinfo-1.8.0-py3-none-any.whl (23 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.8.0


Setup WandB connection with API key ❗**REMOVE API KEY IF PUBLISHED** ❗


In [None]:
!pip install --upgrade wandb
import wandb
key = ""
wandb.login(key=key)



[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: No netrc file found, creating one.
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[34m[1mwandb[0m: Currently logged in as: [33maweismann[0m ([33mcic-truthseeker[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


True

## CHANGE THESE EVERY RUN:
---
* config -> architecture
* train_data_dir -> location
* test_data_dir -> location
* augmented_data_dir -> change drive location to the test type
* train_dataset_augmented -> update location
* test_dataset_augmented -> update location

~test~


In [None]:
# Initialize W&B
wandb.init(
    settings=wandb.Settings(init_timeout=90),
    project="Synthetic-Data-Augmentation-Base",
    config={
        "learning_rate": 0.001,
        "batch_size": 8,
        "epochs": 15,
        "architecture": "MobileNetV2",
        "image_size": 224,
        "k_folds": 5,
    },
)
config = wandb.config

Create the transforms and augment the original data. Creating a new dataset that contains the transformed images in it.


In [None]:
# pre augment the training data to create additional images for the dataset
train_data_dir = "/content/drive/MyDrive/exp5/train"
test_data_dir = "/content/drive/MyDrive/exp5/test"

#transforms
augmentation_transform = transforms.Compose([
    transforms.Resize((config['image_size'],config['image_size'])),
    transforms.RandomRotation(degrees= 10),
    transforms.RandomResizedCrop(224, scale=(0.8,1.0)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
])

train_transform = transforms.Compose([
    transforms.Resize((config['image_size'], config['image_size'])),
    transforms.RandomRotation(degrees=10),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.RandomResizedCrop(config['image_size'], scale=(0.8,1.0)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
])

test_transform = transforms.Compose([
    transforms.Resize((config['image_size'], config['image_size'])),
    transforms.ToTensor(),
])


In [None]:
# created augmented data directory for future use
for dir in [train_data_dir,test_data_dir]:
  os.makedirs(f"/content/drive/MyDrive/exp5/augmented_{os.path.basename(dir)}",exist_ok= True)
  for label_folder in os.listdir(dir):
    original_folder = os.path.join(dir,label_folder)

    augmented_folder_path = os.path.join(f"/content/drive/MyDrive/exp5/augmented_{os.path.basename(dir)}",label_folder)
    os.makedirs(augmented_folder_path,exist_ok=True)

    for image_name in tqdm(os.listdir(original_folder)):
      image_path = os.path.join(original_folder,image_name)
      image = Image.open(image_path).convert("RGB")

      for i in range(5):
        augmented_image = augmentation_transform(image)
        augmented_image_path = os.path.join(augmented_folder_path,f"{os.path.splitext(image_name)[0]}_aug_{i}.png")
        transforms.ToPILImage()(augmented_image).save(augmented_image_path)


100%|██████████| 252/252 [01:04<00:00,  3.92it/s]
100%|██████████| 252/252 [01:11<00:00,  3.54it/s]
100%|██████████| 251/251 [01:08<00:00,  3.67it/s]
100%|██████████| 249/249 [01:06<00:00,  3.72it/s]


Define the SytheticDataset class for use later when loading the datasets

In [None]:
# Custom Dataset Class
class SyntheticDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Traverse 'hybrid' and 'local' subdirectories
        for label_folder in ['hybrid', 'local']:
            folder_path = os.path.join(data_dir, label_folder)
            label = 1 if label_folder == 'hybrid' else 0  # 1 for hybrid, 0 for local

            for image_name in os.listdir(folder_path):
                image_path = os.path.join(folder_path, image_name)
                self.image_paths.append(image_path)
                self.labels.append(label)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, index):
        image_path = self.image_paths[index]
        image = Image.open(image_path).convert("RGB")
        label = self.labels[index]

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(label, dtype=torch.float32)  # For BCEWithLogitsLoss


Create two datasets:

*   Base dataset train and test splits
*   Augmented data train and test splits



In [None]:
train_dataset_original = SyntheticDataset(data_dir= train_data_dir,transform= train_transform)
test_dataset_original = SyntheticDataset(data_dir= test_data_dir, transform= test_transform)

train_dataset_augmented = SyntheticDataset(data_dir= "/content/drive/MyDrive/exp5/augmented_train", transform= train_transform)
test_dataset_augmented = SyntheticDataset(data_dir= "/content/drive/MyDrive/exp5/augmented_test", transform= test_transform)

print(f"Original Train Size: {len(train_dataset_original)} || Augmented Train Size: {len(train_dataset_augmented)}")
print(f"Original Test Size: {len(test_dataset_original)} || Augmented Test Size: {len(test_dataset_augmented)}")

Original Train Size: 504 || Augmented Train Size: 2520
Original Test Size: 500 || Augmented Test Size: 2500


In [None]:
# train_idx_set = set(train_idx)
# val_idx_set = set(val_idx)
# overlap = train_idx_set.intersection(val_idx_set)

# print(f"Overlap between training and validation sets: {len(overlap)} samples")


Overlap between training and validation sets: 0 samples


In [None]:
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset, SubsetRandomSampler
from torchvision import models
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.model_selection import StratifiedKFold



# Create datasets
train_dataset = train_dataset_augmented
test_dataset = test_dataset_augmented

# ================================
# K-Fold Cross-Validation Setup
# ================================
k_folds = config.k_folds
kf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)

train_labels = [label for _, label in train_dataset]


dataset_size = len(train_dataset)
indices = list(range(dataset_size))

fold_results = []  # Store per-fold metrics

for fold, (train_idx, val_idx) in enumerate(kf.split(indices,train_labels)):
    print(f"\n===== Fold {fold + 1}/{k_folds} =====")

    # Create Subsets and DataLoader
    train_subset = torch.utils.data.Subset(train_dataset, train_idx)
    val_subset = torch.utils.data.Subset(train_dataset, val_idx)

    train_loader = DataLoader(train_subset, batch_size=config.batch_size, shuffle= True)
    val_loader = DataLoader(val_subset, batch_size=config.batch_size, shuffle= True)


    # ==============  Model Setup (Change if needed) ================
    model = models.mobilenet_v2(weights=MobileNet_V2_Weights.IMAGENET1K_V1)

    # Freeze all layers
    for param in model.parameters():
        param.requires_grad = False

    # # unfreeze the last 5 layers
    for param in model.features[-5:].parameters():
      param.requires_grad = True

    # Unfreeze the classifier layer
    model.classifier = nn.Sequential(
        nn.Dropout(0.7), # we can change the dropout layer here's percentage also
        nn.Linear(model.last_channel, 1)
    )

    model.to(device)


    # criterion and optimizer init
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=config.learning_rate)

    # ====================
    # Training the Fold
    # ====================
    for epoch in range(config.epochs):
        model.train()
        train_loss = 0.0
        correct_train = 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device).float()

            optimizer.zero_grad()
            outputs = model(images).squeeze(-1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * images.size(0)
            preds = torch.sigmoid(outputs).round()
            correct_train += (preds == labels).sum().item()

        # Average training loss and accuracy
        train_loss /= len(train_loader.dataset)
        train_accuracy = 100.0 * correct_train / len(train_loader.dataset)


        # Validation on the Fold
        model.eval()
        val_loss = 0.0
        correct_val = 0
        all_labels = []
        all_preds = []

        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images).squeeze(-1)
                loss = criterion(outputs, labels)

                val_loss += loss.item() * images.size(0)
                preds = torch.sigmoid(outputs).round()
                correct_val += (preds == labels).sum().item()

                all_labels.extend(labels.cpu().numpy())
                all_preds.extend(preds.cpu().numpy())

        # Average validation loss and accuracy
        val_loss /= len(val_loader.dataset)
        val_accuracy = 100.0 * correct_val / len(val_loader.dataset)

        # Additional metrics
        precision = precision_score(all_labels, all_preds, average='binary', zero_division=1)
        recall = recall_score(all_labels, all_preds, average='binary', zero_division=1)
        f1 = f1_score(all_labels, all_preds, average='binary', zero_division=1)

        # Log metrics to W&B
        wandb.log({
            "fold": fold + 1,
            "epoch": epoch + 1,
            "train_loss": train_loss,
            "train_accuracy": train_accuracy,
            "val_loss": val_loss,
            "val_accuracy": val_accuracy,
            "val_precision": precision,
            "val_recall": recall,
            "val_f1": f1,
            "learning_rate": optimizer.param_groups[0]['lr'],
        })

    # Store final fold metrics (from the last epoch)
    fold_results.append([val_accuracy, precision, recall, f1])
    print(f"Fold {fold+1} Results - Accuracy: {val_accuracy:.2f}%, Precision: {precision:.4f}, "
          f"Recall: {recall:.4f}, F1: {f1:.4f}")

avg_metrics = np.mean(fold_results, axis=0)
print("\n===== Final K-Fold Results =====")
print(f"Avg Accuracy: {avg_metrics[0]:.2f}%, Avg Precision: {avg_metrics[1]:.4f}, "
      f"Avg Recall: {avg_metrics[2]:.4f}, Avg F1: {avg_metrics[3]:.4f}")

# ========================
# Final Evaluation on Test
# ========================
print("\n===== Final Evaluation on Test Set =====")
test_loader = DataLoader(test_dataset, batch_size=config.batch_size, shuffle=False)

model.eval()
test_loss = 0.0
correct_test = 0
all_test_labels = []
all_test_preds = []

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images).squeeze(-1)
        loss = criterion(outputs, labels)
        test_loss += loss.item() * images.size(0)

        preds = torch.sigmoid(outputs).round()
        correct_test += (preds == labels).sum().item()
        all_test_labels.extend(labels.cpu().numpy())
        all_test_preds.extend(preds.cpu().numpy())

test_loss /= len(test_loader.dataset)
test_accuracy = 100.0 * correct_test / len(test_loader.dataset)

test_precision = precision_score(all_test_labels, all_test_preds, average='binary', zero_division=1)
test_recall = recall_score(all_test_labels, all_test_preds, average='binary', zero_division=1)
test_f1 = f1_score(all_test_labels, all_test_preds, average='binary', zero_division=1)

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.2f}%")
print(f"Test Precision: {test_precision:.4f}")
print(f"Test Recall: {test_recall:.4f}")
print(f"Test F1: {test_f1:.4f}")

wandb.finish()
