In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
import matplotlib.pyplot as plt
import torchvision
import torchvision.transforms as transforms
import os
import matplotlib.pyplot as plt


In [None]:
# Training images: divide the raw values by 255, then subtract 0.5 and divide by 0.5
# (the same mapping as transforms.Normalize((0.5,), (0.5,))).
challenge_x_train_np = (np.load('challenge_x_train.npy') / 255 - 0.5) / 0.5

# Each image carries two labels, stored in separate files.
challenge_y1_train_np = np.load('challenge_y1_train.npy')
challenge_y2_train_np = np.load('challenge_y2_train.npy')

# Quick visual check of the sample at index 1, titled with its two labels.
plt.imshow(challenge_x_train_np[1], cmap='gray')
plt.title(f"Truth: {challenge_y1_train_np[1]}{challenge_y2_train_np[1]}")

In [None]:


# Convert to torch tensors
# unsqueeze(1) inserts a channel axis at position 1 (1 channel for grayscale)
challenge_x_train = torch.tensor(challenge_x_train_np, dtype=torch.float32).unsqueeze(1)

# Labels are stored as int64 (torch.long) class indices
challenge_y1_train = torch.tensor(challenge_y1_train_np, dtype=torch.long)
challenge_y2_train = torch.tensor(challenge_y2_train_np, dtype=torch.long)

# Create dataset and split into training (75%) and validation (the remainder)
dataset = TensorDataset(challenge_x_train, challenge_y1_train, challenge_y2_train)
train_size = int(0.75 * len(dataset))
valid_size = len(dataset) - train_size

# Seed the split explicitly: without a fixed generator the train/validation
# membership changes on every Restart & Run All, so validation numbers from
# different runs are not comparable.
split_generator = torch.Generator().manual_seed(0)
challenge_train, challenge_valid = random_split(
    dataset, [train_size, valid_size], generator=split_generator
)

# Only the training loader shuffles; validation order is kept fixed
challenge_train_loader = DataLoader(challenge_train, batch_size=64, shuffle=True)
challenge_valid_loader = DataLoader(challenge_valid, batch_size=64, shuffle=False)

# Define LeNet-5 model
class LeNet5(nn.Module):
    """LeNet-5 style network with a shared trunk and two 10-way output heads.

    The trunk is two 5x5 convolutions (each followed by ReLU and 2x2 max
    pooling) and two fully connected layers. ``fc1`` expects 16 * 4 * 4
    flattened features. The heads ``fc_y1`` and ``fc_y2`` both read the same
    84-dimensional trunk output.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        # Fully connected trunk
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        # One classification head per label
        self.fc_y1 = nn.Linear(84, 10)
        self.fc_y2 = nn.Linear(84, 10)

    def forward(self, x):
        """Return the pair ``(y1_logits, y2_logits)`` for the input batch ``x``."""
        # conv -> ReLU -> 2x2 max pool, twice
        for conv in (self.conv1, self.conv2):
            x = torch.max_pool2d(conv(x).relu(), 2)
        # Collapse everything except the batch axis
        x = x.flatten(start_dim=1)
        for dense in (self.fc1, self.fc2):
            x = dense(x).relu()
        return self.fc_y1(x), self.fc_y2(x)

# Loss (applied to each head separately during training), network and optimizer
# for the from-scratch two-head run
criterion = nn.CrossEntropyLoss()
model = LeNet5()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

# Training function
def train_epoch(model, dataloader, optimizer, criterion):
    """Run one optimisation pass over ``dataloader`` and return epoch metrics.

    Args:
        model: module whose forward pass returns a pair of logit tensors
            ``(logits1, logits2)``.
        dataloader: iterable yielding ``(x_batch, y1_batch, y2_batch)``.
        optimizer: optimizer stepped once per batch.
        criterion: loss applied to each head separately; the two losses are
            summed. Assumed to return the mean over the batch (the default
            for ``nn.CrossEntropyLoss``).

    Returns:
        ``(mean_loss, joint_accuracy)``: the per-sample mean of the summed
        loss, and the fraction of samples for which both heads are correct.

    Raises:
        ZeroDivisionError: if ``dataloader`` yields no samples.
    """
    model.train()
    loss_sum = 0.0
    both_correct = 0
    n_samples = 0
    for x_batch, y1_batch, y2_batch in dataloader:
        optimizer.zero_grad()
        logits1, logits2 = model(x_batch)
        loss = criterion(logits1, y1_batch) + criterion(logits2, y2_batch)
        loss.backward()
        optimizer.step()

        # Weight each batch mean by its size: averaging batch means directly
        # would over-weight a short final batch.
        batch_size = x_batch.size(0)
        loss_sum += loss.item() * batch_size
        n_samples += batch_size

        _, y1_pred = torch.max(logits1, 1)
        _, y2_pred = torch.max(logits2, 1)
        # A sample only counts when both heads predict the right class
        both_correct += torch.sum((y1_pred == y1_batch) & (y2_pred == y2_batch)).item()
    return loss_sum / n_samples, both_correct / n_samples

# Validation function
def validate_epoch(model, dataloader, criterion):
    """Evaluate ``model`` on ``dataloader`` without updating any weights.

    Args:
        model: module whose forward pass returns a pair of logit tensors
            ``(logits1, logits2)``. It is switched to eval mode and left there.
        dataloader: iterable yielding ``(x_batch, y1_batch, y2_batch)``.
        criterion: loss applied to each head separately; the two losses are
            summed. Assumed to return the mean over the batch (the default
            for ``nn.CrossEntropyLoss``).

    Returns:
        ``(mean_loss, joint_accuracy)``: the per-sample mean of the summed
        loss, and the fraction of samples for which both heads are correct.

    Raises:
        ZeroDivisionError: if ``dataloader`` yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    both_correct = 0
    n_samples = 0
    with torch.no_grad():
        for x_batch, y1_batch, y2_batch in dataloader:
            logits1, logits2 = model(x_batch)
            loss = criterion(logits1, y1_batch) + criterion(logits2, y2_batch)

            # Weight each batch mean by its size: averaging batch means
            # directly would over-weight a short final batch.
            batch_size = x_batch.size(0)
            loss_sum += loss.item() * batch_size
            n_samples += batch_size

            _, y1_pred = torch.max(logits1, 1)
            _, y2_pred = torch.max(logits2, 1)
            # A sample only counts when both heads predict the right class
            both_correct += torch.sum((y1_pred == y1_batch) & (y2_pred == y2_batch)).item()
    return loss_sum / n_samples, both_correct / n_samples

# Training loop
epochs = 100
for epoch in range(epochs):
    train_loss, train_acc = train_epoch(model, challenge_train_loader, optimizer, criterion)
    val_loss, val_acc = validate_epoch(model, challenge_valid_loader, criterion)
    # Log every 10th epoch, plus the last one: with `epoch % 10 == 0` alone the
    # final epoch's metrics (the ones describing the finished model) were never shown.
    if epoch % 10 == 0 or epoch == epochs - 1:
        print(f'Ep: {epoch + 1}/{epochs}, Tr Loss: {train_loss:.4f}, Tr Acc: {train_acc:.4f}, Va Loss: {val_loss:.4f}, Va Acc: {val_acc:.4f}')

In [None]:
# Tiny challenge test split. Images go through the same arithmetic as the
# training images: divide by 255, then (x - 0.5) / 0.5.
challenge_x_test_tiny_np = (np.load('challenge_x_test_tiny.npy') / 255 - 0.5) / 0.5
challenge_y1_test_tiny_np = np.load('challenge_y1_test_tiny.npy')
challenge_y2_test_tiny_np = np.load('challenge_y2_test_tiny.npy')

# Tensor versions: float32 images with a channel axis inserted at position 1,
# int64 labels.
challenge_x_test_tiny = torch.tensor(challenge_x_test_tiny_np, dtype=torch.float32).unsqueeze(dim=1)
challenge_y1_test_tiny, challenge_y2_test_tiny = (
    torch.tensor(labels, dtype=torch.long)
    for labels in (challenge_y1_test_tiny_np, challenge_y2_test_tiny_np)
)

# Fixed-order loader over (image, y1, y2) triples
challenge_test_tiny = TensorDataset(challenge_x_test_tiny, challenge_y1_test_tiny, challenge_y2_test_tiny)
challenge_test_tiny_loader = DataLoader(dataset=challenge_test_tiny, batch_size=64, shuffle=False)

# Test function (same as validation function)
def test_model(model, dataloader, criterion):
    model.eval()
    total_loss = 0
    both_correct = 0
    with torch.no_grad():
        for x_batch, y1_batch, y2_batch in dataloader:
            logits1, logits2 = model(x_batch)
            loss_y1 = criterion(logits1, y1_batch)
            loss_y2 = criterion(logits2, y2_batch)
            loss = loss_y1 + loss_y2
            total_loss += loss.item()
            _, y1_pred = torch.max(logits1, 1)
            _, y2_pred = torch.max(logits2, 1)
            both_correct += torch.sum((y1_pred == y1_batch) & (y2_pred == y2_batch)).item()
    return total_loss / len(dataloader), both_correct / len(dataloader.dataset)

# Score the from-scratch two-head model on the tiny challenge test split
test_loss, test_acc = test_model(
    model=model,
    dataloader=challenge_test_tiny_loader,
    criterion=criterion,
)
print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}')


In [None]:
# Inspect one sample of the tiny test split that the from-scratch model gets wrong.
model.eval()  # explicit, so the cell does not depend on which cell ran last
with torch.no_grad():  # inference only: do not build an autograd graph
    logits1, logits2 = model(challenge_x_test_tiny)

_, y1_pred = torch.max(logits1, 1)
_, y2_pred = torch.max(logits2, 1)
y1_pred = y1_pred.numpy()
y2_pred = y2_pred.numpy()

# A sample is wrong as soon as either of its two predictions misses
wrong = (y1_pred != challenge_y1_test_tiny_np) | (y2_pred != challenge_y2_test_tiny_np)
wrong_indices = np.where(wrong)[0]

if wrong_indices.size == 0:
    print('No misclassified samples to display.')
else:
    # Show the 11th error as before; fall back to the last available error
    # when there are fewer, instead of raising IndexError.
    i = wrong_indices[min(10, wrong_indices.size - 1)]
    plt.imshow(challenge_x_test_tiny_np[i], cmap='gray')
    plt.title(f"truth: {challenge_y1_test_tiny_np[i]}{challenge_y2_test_tiny_np[i]}, pred: {y1_pred[i]}{y2_pred[i]}")



In [None]:

# Original MNIST: each image goes through ToTensor followed by
# Normalize((0.5,), (0.5,)).
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# Train and test splits are downloaded into ./data when missing
mnist_train = torchvision.datasets.MNIST(
    root='./data', train=True, download=True, transform=transform
)
mnist_train_loader = DataLoader(dataset=mnist_train, batch_size=64, shuffle=True)

mnist_test = torchvision.datasets.MNIST(
    root='./data', train=False, download=True, transform=transform
)
mnist_test_loader = DataLoader(dataset=mnist_test, batch_size=64, shuffle=False)

# Define LeNet5 for MNIST
class LeNet5_MNIST(nn.Module):
    """LeNet-5 style classifier with a single 10-way output layer.

    The layer stack is two 5x5 convolutions (each followed by ReLU and 2x2 max
    pooling), then fully connected layers of width 120, 84 and 10. ``fc1``
    expects 16 * 4 * 4 flattened features.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        # Fully connected classifier; fc3 produces the 10 class logits
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        """Return the class logits for the input batch ``x``."""
        # conv -> ReLU -> 2x2 max pool, twice
        for conv in (self.conv1, self.conv2):
            x = torch.max_pool2d(conv(x).relu(), 2)
        # Collapse everything except the batch axis
        x = x.flatten(start_dim=1)
        for dense in (self.fc1, self.fc2):
            x = dense(x).relu()
        return self.fc3(x)

# Single-head network with its own loss and optimizer
model_mnist = LeNet5_MNIST()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model_mnist.parameters(), lr=1e-3)

# Checkpoint location: reused when present, written after training otherwise
model_path = './lenet5_mnist.pth'

if not os.path.exists(model_path):
    print('No saved model found, training from scratch.')

    # Plain supervised training on MNIST, logging the mean batch loss per epoch
    epochs = 10
    for epoch in range(epochs):
        model_mnist.train()
        total_loss = 0
        for x_batch, y_batch in mnist_train_loader:
            optimizer.zero_grad()
            y_pred = model_mnist(x_batch)
            loss = criterion(y_pred, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f'Epoch {epoch+1}/{epochs}, MNIST Train Loss: {total_loss / len(mnist_train_loader):.4f}')

    # Persist the weights so later runs take the loading branch
    torch.save(model_mnist.state_dict(), model_path)
    print(f'Model saved to {model_path}')
else:
    print(f'Loading saved model from {model_path}')
    model_mnist.load_state_dict(torch.load(model_path, weights_only=True))

# Accuracy on the MNIST test split
model_mnist.eval()
correct = 0
total = 0
with torch.no_grad():
    for x_batch, y_batch in mnist_test_loader:
        y_pred = model_mnist(x_batch)
        predicted = torch.max(y_pred, 1)[1]  # index of the largest logit per row
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
mnist_acc = correct / total
print(f'MNIST Test Accuracy: {mnist_acc:.4f}')


In [None]:
# Modify the LeNet5 model by copying the weights from a pretrained model
class LeNet5_Custom(nn.Module):
    """Two-head LeNet-5 whose parameters start as copies of a pretrained model.

    ``pretrained_model`` must expose ``conv1``, ``conv2``, ``fc1``, ``fc2`` and
    ``fc3`` layers with ``weight`` and ``bias`` tensors. Every layer here gets
    its own cloned parameters, so training this network does not modify the
    pretrained one. Both heads (``fc_y1`` and ``fc_y2``) start from independent
    copies of ``fc3``.
    """

    def __init__(self, pretrained_model):
        super().__init__()
        # Convolutional layers, seeded from the pretrained model
        self.conv1 = self._with_params_from(nn.Conv2d(1, 6, kernel_size=5), pretrained_model.conv1)
        self.conv2 = self._with_params_from(nn.Conv2d(6, 16, kernel_size=5), pretrained_model.conv2)

        # Fully connected trunk, seeded from the pretrained model
        self.fc1 = self._with_params_from(nn.Linear(16 * 4 * 4, 120), pretrained_model.fc1)
        self.fc2 = self._with_params_from(nn.Linear(120, 84), pretrained_model.fc2)

        # Output heads for y1 and y2, each starting from its own copy of fc3
        self.fc_y1 = self._with_params_from(nn.Linear(84, 10), pretrained_model.fc3)
        self.fc_y2 = self._with_params_from(nn.Linear(84, 10), pretrained_model.fc3)

    @staticmethod
    def _with_params_from(layer, source):
        """Replace ``layer``'s weight and bias with fresh Parameters cloned from ``source``."""
        layer.weight = nn.Parameter(source.weight.clone())
        layer.bias = nn.Parameter(source.bias.clone())
        return layer

    def forward(self, x):
        """Return the pair ``(y1_logits, y2_logits)`` for the input batch ``x``."""
        # conv -> ReLU -> 2x2 max pool, twice
        for conv in (self.conv1, self.conv2):
            x = torch.max_pool2d(conv(x).relu(), 2)
        # Collapse everything except the batch axis
        x = x.flatten(start_dim=1)
        for dense in (self.fc1, self.fc2):
            x = dense(x).relu()
        return self.fc_y1(x), self.fc_y2(x)


# Checkpoint location for the fine-tuned two-head model
model_custom_path = './custom_lenet5.pth'

# Two-head network seeded with the MNIST model's weights, plus a new optimizer
# bound to its parameters
model_custom = LeNet5_Custom(model_mnist)
optimizer = optim.Adam(params=model_custom.parameters(), lr=1e-3)

if not os.path.exists(model_custom_path):
    print('No saved fine-tuned model found, training from scratch.')

    # Fine-tune on the challenge data, logging metrics after every epoch
    epochs = 100
    for epoch in range(epochs):
        train_loss, train_acc = train_epoch(model_custom, challenge_train_loader, optimizer, criterion)
        val_loss, val_acc = validate_epoch(model_custom, challenge_valid_loader, criterion)
        print(f'Ep: {epoch + 1}/{epochs}, Tr Loss: {train_loss:.4f}, Tr Acc: {train_acc:.4f}, Va Loss: {val_loss:.4f}, Va Acc: {val_acc:.4f}')

    # Persist the weights so later runs take the loading branch
    torch.save(model_custom.state_dict(), model_custom_path)
    print(f'Fine-tuned model saved to {model_custom_path}')
else:
    print(f'Loading saved fine-tuned model from {model_custom_path}')
    model_custom.load_state_dict(torch.load(model_custom_path, weights_only=True))

# Score the fine-tuned model on the tiny challenge test split
test_loss, test_acc = test_model(
    model=model_custom,
    dataloader=challenge_test_tiny_loader,
    criterion=criterion,
)
print(f'Test Loss: {test_loss:.4f}, Challenge Test Acc: {test_acc:.4f}')


In [None]:
# Inspect one sample of the tiny test split that the fine-tuned model gets wrong.
model_custom.eval()  # explicit, so the cell does not depend on which cell ran last
with torch.no_grad():  # inference only: do not build an autograd graph
    logits1, logits2 = model_custom(challenge_x_test_tiny)

_, y1_pred = torch.max(logits1, 1)
_, y2_pred = torch.max(logits2, 1)
y1_pred = y1_pred.numpy()
y2_pred = y2_pred.numpy()

# A sample is wrong as soon as either of its two predictions misses
wrong = (y1_pred != challenge_y1_test_tiny_np) | (y2_pred != challenge_y2_test_tiny_np)
wrong_indices = np.where(wrong)[0]

# matplotlib is already imported in the notebook's first cell, so the
# mid-notebook re-import that used to sit here is not needed.
if wrong_indices.size == 0:
    print('No misclassified samples to display.')
else:
    # Show the 9th error as before; fall back to the last available error
    # when there are fewer, instead of raising IndexError.
    i = wrong_indices[min(8, wrong_indices.size - 1)]
    plt.imshow(challenge_x_test_tiny_np[i],cmap='gray')
    plt.title(f"truth: {challenge_y1_test_tiny_np[i]}{challenge_y2_test_tiny_np[i]}, pred: {y1_pred[i]}{y2_pred[i]}")



# Final model

In [None]:
# Full challenge test set: images divided by 255, then normalized like the other splits
challenge_x_test_np = np.load('challenge_x_test.npy')/255
# Normalize the array that was just loaded. Using the tiny test array here
# would discard the full test images and replace them with the tiny set,
# normalized a second time.
challenge_x_test_np = (challenge_x_test_np - 0.5) / 0.5  # Equivalent to transforms.Normalize((0.5,), (0.5,))
challenge_y1_test_np = np.load('challenge_y1_test.npy')
challenge_y2_test_np = np.load('challenge_y2_test.npy')

In [None]:
# Inspect one sample of the full challenge test set that the final model gets wrong.

# NOTE(review): no visible cell of this notebook defines `model_final`, so this
# cell raised NameError on a fresh kernel. The fine-tuned `model_custom` is
# used as the final model here; confirm that this is the intended choice.
model_final = model_custom

# Tensor version of the full test images (float32, channel axis inserted at
# position 1), built the same way as the tiny test tensor; no visible cell
# defines `challenge_x_test` either.
challenge_x_test = torch.tensor(challenge_x_test_np, dtype=torch.float32).unsqueeze(1)

model_final.eval()  # explicit, so the cell does not depend on which cell ran last
with torch.no_grad():  # inference only: do not build an autograd graph
    logits1, logits2 = model_final(challenge_x_test)

_, y1_pred = torch.max(logits1, 1)
_, y2_pred = torch.max(logits2, 1)
y1_pred = y1_pred.numpy()
y2_pred = y2_pred.numpy()

# A sample is wrong as soon as either of its two predictions misses
wrong = (y1_pred != challenge_y1_test_np) | (y2_pred != challenge_y2_test_np)
wrong_indices = np.where(wrong)[0]

# matplotlib is already imported in the notebook's first cell, so the
# mid-notebook re-import that used to sit here is not needed.
if wrong_indices.size == 0:
    print('No misclassified samples to display.')
else:
    # Show the 9th error as before; fall back to the last available error
    # when there are fewer, instead of raising IndexError.
    i = wrong_indices[min(8, wrong_indices.size - 1)]
    plt.imshow(challenge_x_test_np[i],cmap='gray')
    plt.title(f"truth: {challenge_y1_test_np[i]}{challenge_y2_test_np[i]}, pred: {y1_pred[i]}{y2_pred[i]}")

