In [1]:
import os
import glob
import json
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import PIL

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, ConcatDataset

import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.transforms.functional as TF
from torchvision.transforms.functional import to_pil_image
from torchvision.datasets import ImageFolder
from PIL import Image

# from google.colab import drive

from collections import Counter

In [2]:
class SemiSupervisedDataset(Dataset):
    def __init__(self, image_dir, mask_dir, label_json, transform=None, mask_transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.mask_transform = mask_transform

        with open(label_json, 'r') as f:
            self.labels = json.load(f)

        self.image_files = [entry["image"] for entry in self.labels]
        self.label_dict = {entry["image"]: entry["class_id"] for entry in self.labels}

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        image_name = self.image_files[idx]
        image_path = os.path.join(self.image_dir, image_name)
        basename, _ = os.path.splitext(image_name)
        mask_path = os.path.join(self.mask_dir, basename + '.png')

        image = Image.open(image_path).convert("RGB")
        mask = Image.open(mask_path)

        label = self.label_dict[image_name]

        if self.transform:
            image = self.transform(image)
        if self.mask_transform:
            mask = self.mask_transform(mask)

        return image, torch.tensor(label)

image_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.3),
    transforms.RandomRotation(degrees=5), 
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.05),
    transforms.Resize((224, 224)),  # Resize to ResNet-18 input
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

mask_transform = transforms.Compose([
    transforms.Resize((224, 224), interpolation=Image.NEAREST),
    transforms.PILToTensor()
])


In [3]:
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1,
                               padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.downsample = downsample

    def forward(self, x):
        identity = x

        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        return F.relu(out)

class ResNet18(nn.Module):
    def __init__(self, num_classes=50):
        super().__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(64, 2)
        self.layer2 = self._make_layer(128, 2, stride=2)
        self.layer3 = self._make_layer(256, 2, stride=2)
        self.layer4 = self._make_layer(512, 2, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, out_channels, blocks, stride=1):
        downsample = None

        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

        layers = [BasicBlock(self.in_channels, out_channels, stride, downsample)]
        self.in_channels = out_channels

        for _ in range(1, blocks):
            layers.append(BasicBlock(out_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        return self.fc(x)


In [4]:
model = ResNet18(num_classes=50)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

ResNet18(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  

In [5]:
labeled_dataset = SemiSupervisedDataset(
    image_dir="train-semi",
    mask_dir="train-semi-segmentation",
    label_json="train_semi_annotations_with_seg_ids.json",
    transform=image_transform,
    mask_transform=mask_transform
)

labeled_loader = DataLoader(labeled_dataset, batch_size=8, shuffle=True, num_workers=1)

class UnlabeledDataset(Dataset):
    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.image_files = sorted(os.listdir(image_dir))
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        image_name = self.image_files[idx]
        image_path = os.path.join(self.image_dir, image_name)
        image = Image.open(image_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image  # Just return image

    
unlabeled_dataset = UnlabeledDataset(
    image_dir="train-unlabeled",  # <- use your unlabeled data path
    transform=image_transform,
)

unlabeled_loader = DataLoader(unlabeled_dataset, batch_size=8, shuffle=False)



In [None]:
num_self_training_rounds = 10
supervised_epochs = 4
semi_supervised_epochs = 3
pseudo_conf_threshold = 0.85

optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

for self_training_round in range(num_self_training_rounds):
    print(f"\n=== Self-training Round {self_training_round + 1} ===")

    # --- Step 1: Supervised training on labeled data ---
    model.train()
    for epoch in range(supervised_epochs):
        correct = total = 0
        for images, labels in labeled_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Compute accuracy
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        acc = 100 * correct / total
        print(f"Supervised Epoch {epoch+1}/{supervised_epochs}, Accuracy: {acc:.2f}%")

    # --- Step 2: Generate pseudo-labels on unlabeled data ---
    model.eval()
    pseudo_labeled_data = []  # FIXED: Clear previous pseudo-labels each round
    
    with torch.no_grad():
        for images in unlabeled_loader:
            images = images.to(device)
            outputs = model(images)
            probs = torch.softmax(outputs, dim=1)
            confidence, predicted = torch.max(probs, dim=1)

            for i in range(images.size(0)):
                if confidence[i] >= pseudo_conf_threshold:
                    # FIXED: Store as tuple of (tensor, label) without extra processing
                    pseudo_labeled_data.append((images[i].cpu().clone(), predicted[i].cpu()))

    print(f"Added {len(pseudo_labeled_data)} pseudo-labeled examples.")
    
    # Skip if no pseudo-labels generated
    if len(pseudo_labeled_data) == 0:
        print("No pseudo-labels generated, continuing to next round...")
        continue

    # --- Step 3: Create pseudo-labeled dataset and combine ---
    class PseudoLabelDataset(Dataset):
        def __init__(self, pseudo_data):
            self.data = pseudo_data

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            image, label = self.data[idx]
            return image, label  # FIXED: Return tensors directly, no transform issues

    pseudo_dataset = PseudoLabelDataset(pseudo_labeled_data)
    combined_dataset = ConcatDataset([labeled_dataset, pseudo_dataset])
    combined_loader = DataLoader(combined_dataset, batch_size=8, shuffle=True)

    # --- Step 4: Train on combined labeled + pseudo-labeled data ---
    model.train()
    for epoch in range(semi_supervised_epochs):
        correct = total = 0
        for images, labels in combined_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Compute accuracy
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        acc = 100 * correct / total
        print(f"Semi-Supervised Epoch {epoch+1}/{semi_supervised_epochs}, Accuracy: {acc:.2f}%")

    # Increase confidence threshold
    if len(pseudo_labeled_data) < 20:
        pseudo_conf_threshold -= 0.02
    else:
        pseudo_conf_threshold = max(0.95, pseudo_conf_threshold + 0.02)

print("\n=== Self-training Complete ===")


=== Self-training Round 1 ===
Supervised Epoch 1/4, Accuracy: 1.40%
Supervised Epoch 2/4, Accuracy: 3.40%
Supervised Epoch 3/4, Accuracy: 3.60%
Supervised Epoch 4/4, Accuracy: 4.20%
Added 133 pseudo-labeled examples.
Semi-Supervised Epoch 1/3, Accuracy: 22.59%
Semi-Supervised Epoch 2/3, Accuracy: 24.01%
Semi-Supervised Epoch 3/3, Accuracy: 23.85%

=== Self-training Round 2 ===
Supervised Epoch 1/4, Accuracy: 5.00%
Supervised Epoch 2/4, Accuracy: 8.60%
Supervised Epoch 3/4, Accuracy: 9.40%
Supervised Epoch 4/4, Accuracy: 8.00%
Added 13 pseudo-labeled examples.
Semi-Supervised Epoch 1/3, Accuracy: 7.41%
Semi-Supervised Epoch 2/3, Accuracy: 11.70%
Semi-Supervised Epoch 3/3, Accuracy: 11.31%

=== Self-training Round 3 ===
Supervised Epoch 1/4, Accuracy: 9.40%
Supervised Epoch 2/4, Accuracy: 12.40%
Supervised Epoch 3/4, Accuracy: 11.80%
Supervised Epoch 4/4, Accuracy: 14.00%
Added 48 pseudo-labeled examples.
Semi-Supervised Epoch 1/3, Accuracy: 17.15%
Semi-Supervised Epoch 2/3, Accuracy: 1

In [None]:
from datetime import datetime
def save_model_with_timestamp(model, save_dir="models", model_name="my_model"):
    """
    Saves a PyTorch model to a specified directory with a timestamp in its filename.

    Args:
        model (torch.nn.Module): The PyTorch model to save.
        save_dir (str): The directory where the model should be saved.
                        Defaults to "models".
        model_name (str): The base name for the model file.
                          Defaults to "my_model".
    """
    # 1. Ensure the save directory exists
    os.makedirs(save_dir, exist_ok=True)

    # 2. Generate a timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Format: YYYYMMDD_HHMMSS

    # 3. Create the full filename with path
    filename = f"{model_name}.pth" # Using .pth or .pt extension
    filepath = os.path.join(save_dir, filename)

    # 4. Save the model
    torch.save(model, filepath)
    print(f"Model saved successfully to: {filepath}")

save_model_with_timestamp(model, save_dir="saved_models", model_name="resnet18v6")

Model saved successfully to: saved_models/resnet18v5.pth
