In [None]:
'''Train CIFAR10 with PyTorch.'''
import copy
import os

import numpy as np
import pandas as pd
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms


In [None]:
# Transformations
# Both pipelines finish with the same tensor conversion and per-channel
# normalization; only the training pipeline prepends random augmentation.
NORM_MEAN = (0.5, 0.5, 0.5)
NORM_STD = (0.5, 0.5, 0.5)

transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(NORM_MEAN, NORM_STD),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(NORM_MEAN, NORM_STD),
])

# Load CIFAR-10 dataset (downloaded into ./data on first use).
trainset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=128, shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=100, shuffle=False, num_workers=2)

# Integer label -> human-readable class name, in label order.
cifar10_classes = dict(enumerate([
    'airplane', 'automobile', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
]))
print("CIFAR-10 classes:")
for label, class_name in cifar10_classes.items():
    print(f"{label}: {class_name}")

Files already downloaded and verified
Files already downloaded and verified
CIFAR-10 classes:
0: airplane
1: automobile
2: bird
3: cat
4: deer
5: dog
6: frog
7: horse
8: ship
9: truck


In [None]:
'''ResNet in PyTorch.

Original source: https://github.com/kuangliu/pytorch-cifar/blob/master/models/resnet.py
Reference:
[1] Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
    Deep Residual Learning for Image Recognition. arXiv:1512.03385
'''
import torch
import torch.nn as nn
import torch.nn.functional as F


class ResidualBlock(nn.Module):
    """Basic residual block: two 3x3 conv + batch-norm stages plus a shortcut.

    The main path is conv1 -> bn1 -> act_fn -> conv2 -> bn2. Its output is
    added to the shortcut path and the sum is passed through ``act_fn``.

    Args:
        input_channels: Number of channels of the incoming feature map.
        output_channels: Number of channels produced by the block.
        stride: Stride of the first convolution (and of the projection
            shortcut, when one is needed).
        act_fn: Activation module to use; a fresh ``nn.ReLU()`` when ``None``.
    """
    expansion = 1  # Bottleneck uses a different expansion. Not included here.

    def __init__(self, input_channels, output_channels, stride=1,
                 act_fn=None):
        super().__init__()
        self.act_fn = act_fn if act_fn is not None else nn.ReLU()

        # Main path. The submodules are exposed both as attributes and through
        # `convnet`, so each of them appears under two names in the state dict.
        self.conv1 = nn.Conv2d(input_channels, output_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(output_channels)
        self.conv2 = nn.Conv2d(output_channels, output_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(output_channels)
        self.convnet = nn.Sequential(self.conv1, self.bn1, self.act_fn,
                                     self.conv2, self.bn2)

        # Shortcut path. When the block keeps both the spatial size (stride 1)
        # and the channel count, an empty Sequential acts as the identity.
        # Otherwise a 1x1 convolution with the same stride, followed by batch
        # norm, reshapes the input so it can be added to the main path.
        needs_projection = stride != 1 or input_channels != output_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(input_channels, output_channels,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(output_channels)
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``act_fn(convnet(x) + shortcut(x))``."""
        main_path = self.convnet(x)
        skip_path = self.shortcut(x)
        return self.act_fn(main_path + skip_path)


class ResidualLayer(nn.Module):
    """A stack of residual blocks that share one output width.

    The first block maps ``input_channels`` to ``output_channels`` using the
    given ``stride``; every following block keeps ``output_channels`` and uses
    the block's default stride.

    Args:
        Block: Callable building one block; called as
            ``Block(in_channels, out_channels, stride=..., act_fn=...)``.
        num_blocks: Total number of blocks. At least one block is always
            created, even when this is smaller than 1.
        input_channels: Channels entering the first block.
        output_channels: Channels produced by every block.
        stride: Stride of the first block only.
        act_fn: Activation forwarded unchanged (possibly ``None``) to each
            block.
    """

    def __init__(self, Block, num_blocks: int,
                 input_channels: int, output_channels: int,
                 stride=1, act_fn=None):
        super().__init__()
        self.act_fn = act_fn if act_fn is not None else nn.ReLU()

        # Only the leading block may change resolution / channel count.
        first_block = Block(input_channels, output_channels,
                            stride=stride, act_fn=act_fn)
        # Subsequent blocks have stride=1.
        remaining_blocks = [
            Block(output_channels, output_channels, act_fn=act_fn)
            for _ in range(num_blocks - 1)
        ]

        self.layers = nn.Sequential(first_block, *remaining_blocks)

    def forward(self, x):
        """Run ``x`` through all blocks in order."""
        return self.layers(x)


class ResNet(nn.Module):
    """
    This is a ResNet. It is composed of a TRUNK and LAYERS.
    Each LAYER is composed of several BLOCKS.

    The trunk is conv1 -> bn1 -> ``resnet_act_fn``. It is followed by three
    residual layers (the first keeps the width and resolution, each later one
    doubles the width and uses stride 2), a 4x4 average pool, and a linear
    classifier.

    Subclasses select activations by overriding the class attributes
    ``resnet_act_fn`` (trunk) and ``block_act_fn`` (residual blocks).

    Args:
        block: Block class/callable used to build every residual layer; it is
            handed to ``ResidualLayer``.
        num_blocks: Sequence of exactly three ints, the block count per layer.
        num_classes: Size of the classifier output.
        image_channels: Channels of the input image.
        channels: Width of the trunk and of the first layer.
    """
    # Class-level modules: shared by every instance of the class.
    resnet_act_fn = nn.ReLU()
    block_act_fn = None

    def __init__(self, block, num_blocks, num_classes=10,
                 image_channels=3, channels=64):
        super(ResNet, self).__init__()
        # Resolve the block activation into a local. Assigning a module to
        # `self.block_act_fn` would only store it in `self._modules`, while
        # attribute lookup keeps returning the class-level `None`, so the
        # layers would never receive it.
        block_act_fn = self.block_act_fn
        if block_act_fn is None:
            block_act_fn = nn.ReLU()

        # Note: Different block structures aren't supported just yet.
        # They can be hacked in by just multiplying the linear output dimension by 4.
        assert len(num_blocks) == 3, "num_blocks must contain exactly 3 entries"
        self.channels = channels

        # k = 3, p = 1
        self.conv1 = nn.Conv2d(image_channels, self.channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(self.channels)
        layer_list = []

        for i, block_count in enumerate(num_blocks):
            # The first layer keeps the input resolution and the trunk width.
            # Every later layer halves the resolution and doubles the width.
            if i == 0:
                block_stride = 1
                output_channels = self.channels
            else:
                block_stride = 2
                output_channels = 2 * self.channels

            # Use the caller-supplied block type rather than a fixed class.
            layer_list.append(ResidualLayer(
                block, block_count, self.channels, output_channels,
                stride=block_stride, act_fn=block_act_fn
            ))
            # `self.channels` tracks the running width; after construction it
            # holds the width of the last layer.
            self.channels = output_channels

        self.layers = nn.Sequential(*layer_list)
        # The pooled feature map is flattened before the classifier. The
        # factor 4 matches a 2x2 map after pooling, which is what a 32x32
        # input yields with two stride-2 layers and a 4x4 average pool.
        self.linear = nn.Linear(4*self.channels, num_classes)

    @property
    def trunk(self):
        """First conv, batch norm and activation, assembled on access.

        Built lazily so that subclasses replacing ``conv1`` / ``bn1`` after
        construction are picked up automatically.
        """
        return nn.Sequential(self.conv1, self.bn1, self.resnet_act_fn)

    @property
    def parameter_count(self):
        """Total number of elements across all parameters of the model."""
        return sum(p.numel() for p in self.parameters())

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        # Trunk is the first conv layer, batch, and activation function.
        out = self.trunk(x)
        out = self.layers(out)
        # Hyperparameter: Pooling strategy.
        # out = F.max_pool2d(out, 4)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


class ELUResNet(ResNet):
    """ResNet variant that uses ELU for the trunk and for the residual blocks."""
    # Two separate ELU instances, each shared by every instance of this class.
    resnet_act_fn = nn.ELU()
    block_act_fn = nn.ELU()

class SiLUResNet(ResNet):
    """ResNet variant that uses SiLU for the trunk and for the residual blocks."""
    # Two separate SiLU instances, each shared by every instance of this class.
    resnet_act_fn = nn.SiLU()
    block_act_fn = nn.SiLU()


class FatResNet(ResNet):
    """ResNet whose first (trunk) convolution uses a 5x5 kernel.

    Accepts the same arguments as ``ResNet``; ``image_channels`` and
    ``channels`` must be passed by keyword.
    """

    def __init__(self, *args, image_channels=3, channels=64, **kwargs):
        # Build the regular network first, forwarding the two keyword-only
        # values together with everything else the caller supplied.
        kwargs.update(image_channels=image_channels, channels=channels)
        super().__init__(*args, **kwargs)

        # Swap the 3x3 stem for a 5x5 one; padding=2 keeps the spatial size.
        # `channels` (the argument) is used because the parent overwrites
        # `self.channels` while it builds its layers.
        stem_conv = nn.Conv2d(image_channels, channels,
                              kernel_size=5, stride=1, padding=2, bias=False)
        self.conv1 = stem_conv
        self.bn1 = nn.BatchNorm2d(channels)

In [None]:
# Pick the best available accelerator instead of assuming Apple Metal (MPS):
# MPS first (the original target), then CUDA, and CPU as the universal fallback.
mps_backend = getattr(torch.backends, "mps", None)  # absent in older torch builds
if mps_backend is not None and mps_backend.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

### SiLU ResNet

* Residual blocks per layer: [5, 4, 3]
* Activation function: SiLU
* Channels per layer: [64, 128, 256]
* Kernel size = 3
* Stride per layer = [1, 2, 2]
* Padding = 1



In [None]:
# Build the SiLU variant with 5/4/3 blocks per layer and a base width of 64,
# then move it to the selected device (`.to` returns the same module).
net2 = SiLUResNet(ResidualBlock, [5, 4, 3], channels=64)
net2 = net2.to(device)

# Last expression of the cell: display the total number of parameters.
net2.parameter_count

4778826

In [None]:
from torchsummary import summary

# NOTE(review): torchsummary appears to accept only "cuda" or "cpu" and to
# build its dummy input on one of those devices, so a model living on another
# device (e.g. MPS) would fail with a device mismatch - confirm against the
# installed torchsummary version.
# Summarise a CPU copy so that net2 itself stays on its current device.
summary(copy.deepcopy(net2).to(torch.device('cpu')), (3, 32, 32), device='cpu')

### Training

In [None]:
import time

def train_model(model, trainloader, testloader, criterion, optimizer, num_epochs=10):
    """Train ``model`` and evaluate it on ``testloader`` after every epoch.

    Batches are moved to the device of the model's first parameter. After
    each epoch a one-line summary is printed and the same numbers are
    recorded in the returned history.

    Args:
        model: Module to optimise; must own at least one parameter.
        trainloader: Iterable of ``(images, labels)`` training batches.
        testloader: Iterable of ``(images, labels)`` evaluation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        num_epochs: Number of passes over ``trainloader``.

    Returns:
        list[dict]: One entry per epoch with the keys ``'epoch'``,
        ``'train_loss'`` (mean of the per-batch losses), ``'train_acc'`` and
        ``'test_acc'`` (percentages), and ``'time'`` (seconds spent on
        training plus evaluation). Metrics of an empty loader are ``0.0``.
    """
    device = next(model.parameters()).device
    history = []

    for epoch in range(num_epochs):
        start_time = time.time()

        # ---- training pass ----
        model.train()
        running_loss = 0.0
        num_batches = 0
        total = 0
        correct = 0

        for images, labels in trainloader:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            num_batches += 1
            # detach(): the prediction must not take part in autograd.
            predicted = outputs.detach().argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Batches are counted while iterating, so the loader needs no
        # __len__; max(..., 1) avoids dividing by zero on an empty loader.
        train_loss = running_loss / max(num_batches, 1)
        train_acc = 100 * correct / max(total, 1)

        # ---- evaluation pass ----
        model.eval()
        total = 0
        correct = 0
        with torch.no_grad():
            for images, labels in testloader:
                images, labels = images.to(device), labels.to(device)
                predicted = model(images).argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        test_acc = 100 * correct / max(total, 1)
        elapsed_time = time.time() - start_time

        print(f'Epoch {epoch+1}/{num_epochs}, '
              f'Train Loss: {train_loss:.4f}, '
              f'Train Accuracy: {train_acc:.2f}%, '
              f'Test Accuracy: {test_acc:.2f}%, '
              f'Time: {elapsed_time:.2f}s')

        history.append({
            'epoch': epoch + 1,
            'train_loss': train_loss,
            'train_acc': train_acc,
            'test_acc': test_acc,
            'time': elapsed_time,
        })

    return history




In [None]:
# Training configuration: epoch budget, loss, and optimizer for net2.
num_epochs = 50
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net2.parameters(), lr=1e-3)

In [None]:
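# NOTE: training is currently disabled. Unless weights are loaded some other
# way, the prediction cells below run on an untrained net2; uncomment the
# call to train first.
#train_model(net2, trainloader, testloader, criterion, optimizer, num_epochs=num_epochs)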

### Testing

In [None]:
import pickle
import os

# Show the working directory so the relative path below is easy to resolve.
current_directory = os.getcwd()
print("Current directory:", current_directory)

# Report whether the unlabeled test file is present next to the notebook.
file_path = './cifar_test_nolabels.pkl'
print('file does exist' if os.path.exists(file_path) else "File does not exist.")



def unpickle(file):
    """Load a pickled object from ``file``, reporting problems by printing.

    Args:
        file: Path of the pickle file.

    Returns:
        The unpickled object (byte strings are kept as ``bytes`` because of
        ``encoding='bytes'``), or ``None`` when the path does not exist or
        the file is empty.

    Note:
        ``pickle.load`` can execute arbitrary code; only use this on files
        from a trusted source.
    """
    if os.path.exists(file):
        # Check the file size
        size_in_bytes = os.path.getsize(file)
        print(f"File size of {file}: {size_in_bytes} bytes")
        if size_in_bytes != 0:
            with open(file, 'rb') as handle:
                return pickle.load(handle, encoding='bytes')
        print("The file is empty.")
    else:
        print(f"File {file} not found.")
    return None

# Load the unlabeled test set; `data` stays None when loading failed.
data = unpickle('./cifar_test_nolabels.pkl')
load_succeeded = data is not None
if load_succeeded:
    print("File loaded successfully")



Current directory: /Users/ianadler/Desktop/Py Prog/Deep Learning/pytorch-cifar
file does exist
File size of ./cifar_test_nolabels.pkl: 30800227 bytes
File loaded successfully


In [None]:
# Load and preprocess the unlabeled data
# NOTE(review): the view below assumes each row of data[b'data'] is a flat,
# channel-first (3 x 32 x 32) image - confirm against the file's layout.
raw_pixels = torch.tensor(data[b'data']).float()

# Normalize images as done during training: scale to [0, 1] first, then apply
# the same per-channel mean/std of 0.5.
normalize = transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
images = normalize(raw_pixels.view(-1, 3, 32, 32) / 255.0)

In [None]:
# nn.Module.to() moves the module in place and returns the same object, so
# net2cpu is an alias of net2: after this line net2 itself lives on the CPU.
net2cpu = net2.to(torch.device('cpu'))

# Predict in fixed-size chunks rather than pushing the whole test set through
# the network in a single forward pass, which keeps peak memory bounded.
PRED_BATCH_SIZE = 500
net2cpu.eval()
batch_predictions = []
with torch.no_grad():
    images = images.to(torch.device('cpu'))
    for image_batch in images.split(PRED_BATCH_SIZE):
        outputs = net2cpu(image_batch)
        batch_predictions.append(outputs.argmax(dim=1))
predicted = torch.cat(batch_predictions)

# Save predictions to CSV: one row per image, ID = position in the test file.
predictions = predicted.cpu().numpy()
ids = np.arange(len(predictions))
submission = pd.DataFrame({'ID': ids, 'Label': predictions})
submission.to_csv('predictions.csv', index=False)
print('csv prediction saved')

