In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np
import pandas as pd

In [None]:
class ImageDataset(Dataset):
    
    def __init__(self, data_csv, train = True , img_transform=None):
        """
        Dataset init function
        
        INPUT:
        data_csv: Path to csv file containing [data, labels]
        train: 
            True: if the csv file has [labels,data] (Train data and Public Test Data) 
            False: if the csv file has only [data] and labels are not present.
        img_transform: List of preprocessing operations need to performed on image. 
        """
        
        self.data_csv = data_csv
        self.img_transform = img_transform
        self.is_train = train
        
        data = pd.read_csv(data_csv, header=None)
        if self.is_train:
            images = data.iloc[:,1:].to_numpy()
            labels = data.iloc[:,0].astype(int)
        else:
            images = data.iloc[:,:].to_numpy()
            labels = None
        
        self.images = images
        self.labels = labels
        print("Total Images: {}, Data Shape = {}".format(len(self.images), images.shape))
        
    def __len__(self):
        """Returns total number of samples in the dataset"""
        return len(self.images)
    
    def __getitem__(self, idx):
        """
        Loads image of the given index and performs preprocessing.
        
        INPUT: 
        idx: index of the image to be loaded.
        
        OUTPUT:
        sample: dictionary with keys images (Tensor of shape [1,C,H,W]) and labels (Tensor of labels [1]).
        """
        image = self.images[idx]
        image = np.array(image).astype(np.uint8).reshape((32, 32, 3), order= "F")
        
        if self.is_train:
            label = self.labels[idx]
        else:
            label = -1
        
        image = self.img_transform(image)
        
        sample = {"images": image, "labels": label}
        return sample

In [None]:
class NN(nn.Module):
    def __init__(self,n_h1,n_h2,p):
        super(NN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3,stride=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.maxpool = nn.MaxPool2d(2,stride=2)
        self.conv2 = nn.Conv2d(32, 64, 3,stride=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 512, 3,stride=1)
        self.bn3 = nn.BatchNorm2d(512)
        self.conv4 = nn.Conv2d(512, 1024, 2,stride=1)
        self.fc1 = nn.Linear(1024, n_h1)
        self.dropout = nn.Dropout(p)
        self.fc2 = nn.Linear(n_h1, n_h2)
    
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.maxpool(x)
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.maxpool(x)
        x = F.relu(self.conv4(x))
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [None]:
# Data Loader Usage
img_transforms = transforms.Compose([transforms.ToPILImage(),transforms.ToTensor()])

train_data = "/content/drive/MyDrive/A2.2/A2.2-Data/train_data.csv"
test_data = "/content/drive/MyDrive/A2.2/A2.2-Data/public_test.csv"

# train_data = "A2.2-Data/train_data.csv"
# test_data = "A2.2-Data/public_test.csv"

train_dataset = ImageDataset(
    data_csv = train_data, 
    train=True, 
    img_transform=img_transforms
)
test_dataset = ImageDataset(
    data_csv = test_data, 
    train=True, 
    img_transform=img_transforms
)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
batch_size = 200
epochs = 5
l_r = 1e-4
NUM_WORKERS = 20
torch.manual_seed(51)
model = NN(256, 10, 0.2)
model.to(device)
loss = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr = l_r)

In [None]:
sum(p.numel() for p in model.parameters())

In [None]:
train_loader = DataLoader(dataset = train_dataset, batch_size = batch_size, shuffle=False, num_workers = NUM_WORKERS)
test_loader = DataLoader(dataset = test_dataset, batch_size = len(test_dataset), shuffle=False, num_workers = NUM_WORKERS)

In [None]:
loss_vals = []
accs = []

In [None]:
for epoch in range(epochs):
    train_loss = 0
    accu_train = 0
    print("Epoch:", epoch+1)
    for batch in train_loader:
        batch_x, batch_y = batch["images"].to(device), batch["labels"].to(device)
        y_hat = model.forward(batch_x)
        loss_val = loss(y_hat, batch_y)
        train_loss+=loss_val.item()
        
        optimizer.zero_grad()
        loss_val.backward()
        optimizer.step()

        predictions = y_hat.argmax(dim=1, keepdim=True).squeeze()
        correct = (predictions == batch_y).sum().item()
        acc = correct / len(batch_x)
        accu_train+=acc
    for batch in test_loader:
        batch_x, batch_y = batch["images"].to(device), batch["labels"].to(device)
        y_hat = model.forward(batch_x)
        predictions = y_hat.argmax(dim=1, keepdim=True).squeeze()
        correct = (predictions == batch_y).sum().item()
        acc = correct / len(batch_y)
        accs.append(acc)
    loss_vals.append(train_loss/len(train_loader))
    print("Train Loss",loss_vals[-1])
    print("Train Accuracy",accu_train/len(train_loader))
    print("Test Accuracy",accs[-1])

In [None]:
model_path = "./model.pth"
loss_path = "./loss.txt"
acc_path = "./accuracy.txt"
pred_path = "./predictions.txt"

In [None]:
torch.save(model.state_dict(), model_path)
np.savetxt(acc_path, accs)
np.savetxt(loss_path, loss_vals)

In [None]:
torch.manual_seed(51)
model2 = NN(256, 10, 0.2)
model2.to(device)
model2.load_state_dict(torch.load(model_path, map_location=device))
for batch in test_loader:
    batch_x, batch_y = batch["images"].to(device), batch["labels"].to(device)
    y_hat = model2.forward(batch_x)
    y_hat = torch.argmax(y_hat, dim=1)
    np.savetxt("./predictions.txt", y_hat.detach().cpu().numpy())

In [None]:
a = np.loadtxt("predictions.txt")
b = np.loadtxt("predictions2.txt")
c = [0 if a[i]==b[i] else 1 for i in range(len(a))]
sum(c)