In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, datasets, transforms
from torch.utils.data import DataLoader

In [57]:
class BasicBlock(nn.Module):
    expansion = 1  # output channels multiplier

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        # First conv
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # Second conv
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        # Downsample for residual if needed
        self.downsample = downsample
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)
        return out


In [40]:
class ResNet18(nn.Module):
    def __init__(self, num_classes=1000):
        super().__init__()
        self.in_channels = 64

        # Initial convolution and maxpool
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual layers
        self.layer1 = self._make_layer(64, 2)   # 2 blocks
        self.layer2 = self._make_layer(128, 2, stride=2)
        self.layer3 = self._make_layer(256, 2, stride=2)
        self.layer4 = self._make_layer(512, 2, stride=2)

        # Average pooling and FC
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * BasicBlock.expansion, num_classes)

    def _make_layer(self, out_channels, blocks, stride=1):
        downsample = None
        if stride != 1 or self.in_channels != out_channels * BasicBlock.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels * BasicBlock.expansion,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * BasicBlock.expansion),
            )

        layers = []
        layers.append(BasicBlock(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels * BasicBlock.expansion
        for _ in range(1, blocks):
            layers.append(BasicBlock(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        # Initial
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        # Residual layers
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        # Pool and classify
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x


In [4]:
class ResNet14(nn.Module):
    def __init__(self, num_classes=7):
        super().__init__()
        self.in_channels = 64

        # Stem
        self.conv1 = nn.Conv2d(
            3, 64, kernel_size=7, stride=2, padding=3, bias=False
        )
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual layers (ResNet-14)
        self.layer1 = self._make_layer(64, blocks=1)
        self.layer2 = self._make_layer(128, blocks=1, stride=2)
        self.layer3 = self._make_layer(256, blocks=2, stride=2)
        self.layer4 = self._make_layer(512, blocks=2, stride=2)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

        self._init_weights()

    def _make_layer(self, out_channels, blocks, stride=1):
        downsample = None
        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

        layers = []
        layers.append(
            BasicBlock(self.in_channels, out_channels, stride, downsample)
        )
        self.in_channels = out_channels

        for _ in range(1, blocks):
            layers.append(BasicBlock(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def _init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x


In [72]:
class ResNet10(nn.Module):
    def __init__(self, num_classes=7):
        super().__init__()
        self.in_channels = 64

        # Stem
        self.conv1 = nn.Conv2d(
            3, 64, kernel_size=7, stride=2, padding=3, bias=False
        )
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual layers (ResNet-10)
        self.layer1 = self._make_layer(64, blocks=1)
        self.layer2 = self._make_layer(128, blocks=1, stride=2)
        self.layer3 = self._make_layer(256, blocks=1, stride=2)
        self.layer4 = self._make_layer(512, blocks=1, stride=2)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

        self._init_weights()

    def _make_layer(self, out_channels, blocks, stride=1):
        downsample = None
        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

        layers = []
        layers.append(
            BasicBlock(self.in_channels, out_channels, stride, downsample)
        )
        self.in_channels = out_channels

        for _ in range(1, blocks):
            layers.append(BasicBlock(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def _init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x


In [24]:
class ResNet34(nn.Module):
    def __init__(self, num_classes=7):
        super().__init__()
        self.in_channels = 64

        # Stem
        self.conv1 = nn.Conv2d(
            3, 64, kernel_size=7, stride=2, padding=3, bias=False
        )
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(3, 2, 1)

        # Residual layers (ResNet-34)
        self.layer1 = self._make_layer(64, blocks=3)
        self.layer2 = self._make_layer(128, blocks=4, stride=2)
        self.layer3 = self._make_layer(256, blocks=6, stride=2)
        self.layer4 = self._make_layer(512, blocks=3, stride=2)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

        self._init_weights()

    def _make_layer(self, out_channels, blocks, stride=1):
        downsample = None
        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

        layers = []
        layers.append(
            BasicBlock(self.in_channels, out_channels, stride, downsample)
        )
        self.in_channels = out_channels

        for _ in range(1, blocks):
            layers.append(BasicBlock(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def _init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x


In [59]:
import torch
import torch.nn as nn

num_classes = 7
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = ResNet10(num_classes=num_classes)
model = model.to(device)


In [60]:
from torchvision import transforms

train_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),

    # Mild geometric invariance
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),

    # Preserve facial structure
    transforms.Resize((224, 224)),

    transforms.ToTensor(),

    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ),
])
test_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ),
])


In [61]:
train_dataset = datasets.ImageFolder("/kaggle/input/visionquest/train", transform=train_transform)
test_dataset   = datasets.ImageFolder("/kaggle/input/visionquest/test", transform=test_transform)

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader   = DataLoader(test_dataset, batch_size=128, shuffle=False)


In [62]:
import torch
import numpy as np
from collections import Counter

# Extract labels from dataset
targets = [label for _, label in train_dataset.samples]

class_counts = Counter(targets)
num_classes = len(class_counts)

# Inverse frequency weighting
class_weights = np.zeros(num_classes, dtype=np.float32)
for cls, count in class_counts.items():
    class_weights[cls] = 1.0 / count

# Normalize (important for stability)
class_weights = class_weights / class_weights.sum() * num_classes

class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)


In [63]:
criterion = nn.CrossEntropyLoss(
    weight=class_weights,
    label_smoothing=0.1
)


In [64]:
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=3e-4,
    weight_decay=1e-4
)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',        # Monitored metric is expected to decrease (default)
    factor=0.1,        # Factor by which the learning rate will be reduced (default)
    patience=2        # Number of epochs with no improvement after which learning rate will be reduced (default)
)


In [65]:
from tqdm import tqdm

def train_one_epoch(model, loader):
    model.train()
    total_loss, correct, total = 0.0, 0, 0

    pbar = tqdm(loader, desc="Train", leave=False)
    for images, labels in pbar:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        preds = outputs.argmax(dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

        pbar.set_postfix(
            loss=total_loss / (pbar.n + 1),
            acc=correct / total
        )

    return total_loss / len(loader), correct / total


In [66]:
@torch.no_grad()
def validate(model, loader, criterion):
    model.eval()
    total_loss = 0.0
    correct, total = 0, 0

    for images, labels in loader:
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        outputs = model(images)
        loss = criterion(outputs, labels)

        total_loss += loss.item() * labels.size(0)
        preds = outputs.argmax(dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    avg_loss = total_loss / total
    acc = correct / total

    return avg_loss, acc


In [67]:
best_test_acc = 0.0

In [69]:

epochs = 5
for epoch in range(epochs):

    train_loss, train_acc = train_one_epoch(model, train_loader)
    test_loss, test_acc = validate(model, test_loader, criterion)

    scheduler.step(test_loss)

    # Save best model (by test accuracy)
    if test_acc > best_test_acc:
        best_test_acc = test_acc
        torch.save(model.state_dict(), "/kaggle/working/custom_resnet10.pth")

    print(
        f"Epoch {epoch+1:02d} | "
        f"Train Loss {train_loss:.4f} | "
        f"Train Acc {train_acc:.3f} | "
        f"Test Loss {test_loss:.4f} | "
        f"Test Acc {test_acc:.3f} | "
        f"Best Test Acc {best_test_acc:.3f}"
    )


                                                                              

Epoch 01 | Train Loss 1.5773 | Train Acc 0.524 | Test Loss 1.7309 | Test Acc 0.531 | Best Test Acc 0.531


                                                                              

Epoch 02 | Train Loss 1.5556 | Train Acc 0.540 | Test Loss 1.7335 | Test Acc 0.509 | Best Test Acc 0.531


                                                                              

Epoch 03 | Train Loss 1.5417 | Train Acc 0.548 | Test Loss 1.7262 | Test Acc 0.525 | Best Test Acc 0.531


                                                                              

Epoch 04 | Train Loss 1.5286 | Train Acc 0.557 | Test Loss 1.7190 | Test Acc 0.540 | Best Test Acc 0.540


                                                                              

Epoch 05 | Train Loss 1.5164 | Train Acc 0.562 | Test Loss 1.7117 | Test Acc 0.541 | Best Test Acc 0.541


In [90]:
import torch
import torch.nn.functional as F
from sklearn.metrics import classification_report
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

num_classes = 7

# Instantiate models
model_r34 = ResNet34(num_classes=num_classes).to(device)
model_r18 = ResNet18(num_classes=num_classes).to(device)
model_r14 = ResNet14(num_classes=num_classes).to(device)
model_r10 = ResNet10(num_classes=num_classes).to(device)

# Load checkpoints
model_r34.load_state_dict(torch.load("/kaggle/working/custom_resnet34.pth", map_location=device))
model_r18.load_state_dict(torch.load("/kaggle/working/custom_resnet18.pth", map_location=device))
model_r14.load_state_dict(torch.load("/kaggle/working/custom_resnet14.pth", map_location=device))
model_r10.load_state_dict(torch.load("/kaggle/working/custom_resnet10.pth", map_location=device))

models = [model_r34, model_r18, model_r14, model_r10]

# Evaluation mode
for m in models:
    m.eval()


In [91]:
import torch.nn.functional as F

@torch.no_grad()
def ensemble_predict(models, dataloader):
    all_preds = []
    all_targets = []

    correct = 0
    total = 0

    for images, labels in dataloader:
        images = images.to(device)
        labels = labels.to(device)

        probs_sum = None
        for model in models:
            logits = model(images)
            probs = F.softmax(logits, dim=1)
            probs_sum = probs if probs_sum is None else probs_sum + probs

        avg_probs = probs_sum / len(models)
        preds = avg_probs.argmax(dim=1)

        correct += (preds == labels).sum().item()
        total += labels.size(0)

        all_preds.append(preds.cpu())
        all_targets.append(labels.cpu())

    accuracy = correct / total
    print(f"Ensemble Test Accuracy: {accuracy:.4f}")

    all_preds = torch.cat(all_preds).numpy()
    all_targets = torch.cat(all_targets).numpy()

    return all_targets, all_preds


In [92]:
y_true, y_pred = ensemble_predict(models, test_loader)

print(classification_report(
    y_true,
    y_pred,
    target_names=[
        "Angry", "Disgust", "Fear",
        "Happy", "Sad", "Surprise", "Neutral"
    ]
))


Ensemble Test Accuracy: 0.6410
              precision    recall  f1-score   support

       Angry       0.55      0.63      0.59       958
     Disgust       0.27      0.77      0.40       111
        Fear       0.50      0.43      0.46      1024
       Happy       0.90      0.80      0.85      1774
         Sad       0.64      0.55      0.59      1233
    Surprise       0.52      0.54      0.53      1247
     Neutral       0.75      0.83      0.78       831

    accuracy                           0.64      7178
   macro avg       0.59      0.65      0.60      7178
weighted avg       0.66      0.64      0.65      7178

