In [None]:
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from torch import optim
from torch.utils.data import Dataset, DataLoader


In [None]:
class DigitDataset(Dataset):
    """Map-style dataset over a DataFrame of flattened pixel rows.

    Pixel values are converted to ``float32`` and divided by 255. When a
    target column is given it is split off and stored as ``int64`` labels.

    Args:
        dataFrame: One row per sample; every column except ``targetColumn``
            is treated as a pixel value.
        targetColumn: Name of the label column, or ``None`` for unlabeled
            data (items are then returned without a label).
        transform: When truthy, each sample is reshaped to ``image_shape``
            instead of being returned as a flat vector.
        image_shape: Shape used for the reshape when ``transform`` is set.
            Defaults to ``(1, 28, 28)``.
    """

    def __init__(self, dataFrame: pd.DataFrame, targetColumn: "str | None",
                 transform=False, image_shape=(1, 28, 28)):
        self.targetColumn = targetColumn
        self.transform = transform
        self.image_shape = tuple(image_shape)

        if targetColumn is not None:
            self.X = dataFrame.drop(columns=[targetColumn]).to_numpy(dtype=np.float32) / 255
            self.Y = dataFrame[targetColumn].to_numpy(dtype=np.int64)
        else:
            self.X = dataFrame.to_numpy(dtype=np.float32) / 255
            # Always define Y so the attribute exists for unlabeled data too.
            self.Y = None

    @property
    def transorm(self):
        """Misspelled alias of ``transform`` kept for existing callers."""
        return self.transform

    @transorm.setter
    def transorm(self, value):
        self.transform = value

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        sample = self.X[idx]
        if self.transform:
            sample = sample.reshape(self.image_shape)

        # Unlabeled datasets yield the sample alone, labeled ones a pair.
        if self.targetColumn is None:
            return sample
        return sample, self.Y[idx]

In [None]:
# Load the labeled data and hold out 20% of it for validation (fixed seed so
# the split is the same on every run).
df = pd.read_csv('data/train.csv')
df_train, df_val = train_test_split(df, test_size=0.2, random_state=1337)

df_test = pd.read_csv('data/test.csv')

# Pick the accelerator that is actually present instead of assuming Apple MPS,
# so the notebook also runs on CUDA or CPU-only machines.
if torch.backends.mps.is_available():
    device = 'mps'
elif torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
df_train.shape, df_val.shape

In [None]:
import numpy as np
def array_to_image(array, side=28):
    """Show one sample as a ``side`` x ``side`` grayscale image.

    Args:
        array: A torch tensor or anything ``np.asarray`` accepts. It is
            flattened and only the first ``side * side`` values are used, so
            a trailing extra column is ignored.
        side: Edge length of the square image. Defaults to 28.

    Returns:
        None. The image is rendered with matplotlib.
    """
    if torch.is_tensor(array):
        # Tensor.numpy() only works on CPU tensors that do not require grad.
        pixels = array.detach().cpu().numpy()
    else:
        pixels = np.asarray(array)

    pixels = pixels.reshape(-1)[: side * side].reshape(side, side)

    plt.imshow(pixels, cmap="gray")  # Display image in grayscale
    plt.axis("off")  # Hide axes
    plt.show()


In [None]:
BATCH_SIZE = 64

# Datasets first: train/val carry the 'label' column, the test frame has no
# target. All three reshape their samples (transform=True).
train_dataset = DigitDataset(df_train, targetColumn='label', transform=True)
val_dataset = DigitDataset(df_val, targetColumn='label', transform=True)
test_dataset = DigitDataset(df_test, targetColumn=None, transform=True)

# Loaders second: only the training loader shuffles.
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)
val_dataloader = DataLoader(val_dataset, shuffle=False, batch_size=BATCH_SIZE)
test_dataloader = DataLoader(test_dataset, shuffle=False, batch_size=BATCH_SIZE)

In [None]:
# Grab one training batch to sanity-check the pipeline.
x_dummy, y_dummy = next(iter(train_dataloader))

# array_to_image returns nothing, so call it as a statement and let the label
# be the cell's displayed value (the old tuple form displayed `(None, label)`).
array_to_image(x_dummy[0])
y_dummy[0].item()

In [None]:
# Report how many batches the training DataLoader yields per pass.
f'We have a total of {len(train_dataloader)} Batches in the Train Dataset'

In [None]:
torch.manual_seed(1337)
class BaseLineModel(nn.Module):
    """Fully connected baseline: two hidden Linear+ReLU blocks and a linear head.

    Args:
        input_features: Size of the last dimension of the input.
        hidden_units: Width of both hidden layers.
        out_features: Number of output logits.
    """

    def __init__(self, input_features, hidden_units, out_features):
        super().__init__()
        self.layer_block_1 = nn.Sequential(
            nn.Linear(input_features, hidden_units),
            nn.ReLU()
        )
        self.layer_block_2 = nn.Sequential(
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU()
        )
        self.layer_output = nn.Sequential(
            nn.Linear(hidden_units, out_features),
        )

    def forward(self, x):
        """Return the raw logits for ``x``."""
        x = self.layer_block_1(x)
        x = self.layer_block_2(x)
        x = self.layer_output(x)
        return x

    def predict(self, x_batch):
        """Return the predicted class index for every sample in ``x_batch``.

        Args:
            x_batch: Batch of flat feature vectors, one sample per row.

        Returns:
            A 1-D ``int32`` CPU tensor with one class index per sample.
        """
        # One batched forward pass replaces the per-sample Python loop. No
        # gradients are needed because the result is a plain index tensor.
        with torch.no_grad():
            logits = self(x_batch)
        # Softmax is monotonic, so the argmax of the logits is the same class.
        return logits.argmax(dim=-1).to(device='cpu', dtype=torch.int32)


# Model dimensions: input_dim is read from dimension 1 of the sample batch,
# the other two are fixed constants.
input_dim = x_dummy.size(1)
hidden_dim, output_dim = 256, 10
#b_model = BaseLineModel(input_dim, hidden_dim, output_dim)
#b_model.predict(x_dummy)

In [None]:
class CustomCNNModel(nn.Module):
    """Two Conv-ReLU-Conv-ReLU-MaxPool blocks followed by a linear classifier.

    The 3x3 convolutions use padding 1 and stride 1, so they keep the spatial
    size. Each 2x2 max-pool with stride 2 halves it, so the classifier sees a
    feature map a quarter of the input height and width.

    Args:
        in_features: Number of input channels.
        hidden_units: Channel count of the blocks (the middle of block 2 uses
            ``hidden_units * 4``).
        out_features: Number of output logits.
        image_size: Input height/width, either one int for square images or
            a ``(height, width)`` pair. Defaults to 28, which gives the 7x7
            feature map the classifier was originally sized for.
    """

    def __init__(self, in_features, hidden_units, out_features, image_size=28):
        super().__init__()
        if isinstance(image_size, int):
            height = width = image_size
        else:
            height, width = image_size

        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(
                in_channels=in_features,
                out_channels=hidden_units,
                kernel_size=(3, 3),
                padding=1,
                stride=1
            ),
            nn.ReLU(),
            nn.Conv2d(
                in_channels=hidden_units,
                out_channels=hidden_units,
                kernel_size=(3, 3),
                padding=1,
                stride=1
            ),
            nn.ReLU(),
            nn.MaxPool2d(
                kernel_size=2,
                stride=2  # Default stride is equal to the kernel size
            )
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(
                in_channels=hidden_units,
                out_channels=hidden_units * 4,
                kernel_size=(3, 3),
                padding=1,
                stride=1
            ),
            nn.ReLU(),
            nn.Conv2d(
                in_channels=hidden_units * 4,
                out_channels=hidden_units,
                kernel_size=(3, 3),
                padding=1,
                stride=1
            ),
            nn.ReLU(),
            nn.MaxPool2d(
                kernel_size=2,
                stride=2  # Default stride is equal to the kernel size
            )
        )

        # The attribute keeps its original (misspelled) name because it is part
        # of the state_dict keys of checkpoints saved from this model.
        self.clasifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(
                # Two stride-2 pools shrink each spatial dimension by 4, so
                # the flattened size is hidden_units * (H // 4) * (W // 4),
                # i.e. hidden_units * 7 * 7 for 28x28 inputs.
                in_features=hidden_units * (height // 4) * (width // 4),
                out_features=out_features,
            )
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the raw class logits for a batch of images."""
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.clasifier(x)
        return x
  

In [None]:
# Build the CNN for single-channel input and run the sample batch through it
# once as a smoke test.
cnn_model = CustomCNNModel(
    in_features=1,
    hidden_units=hidden_dim,
    out_features=output_dim,
)
cnn_model(x_dummy)

In [None]:
# Display the module so its layers can be inspected.
cnn_model

In [None]:
# Cross-entropy over the model's logits, optimized with Adam at lr 1e-4.
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=cnn_model.parameters(), lr=1e-4)



In [None]:
# Run the sample batch through the model again and display the raw outputs.
log = cnn_model(x_dummy)
log

In [None]:
def train_step(model, dataloader, loss_fn, optimizer, device=None):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Module producing class logits.
        dataloader: Iterable of ``(X, y)`` batches that also supports ``len``.
        loss_fn: Callable ``loss_fn(logits, y)`` returning a scalar tensor.
        optimizer: Optimizer over ``model``'s parameters.
        device: Where to move each batch. ``None`` (default) uses the device
            the model's parameters are on, so the function no longer captures
            a notebook global at definition time.

    Returns:
        ``(train_loss, train_acc)``: the mean of the per-batch losses, and the
        fraction of all samples classified correctly.
    """
    if device is None:
        device = next((p.device for p in model.parameters()), torch.device("cpu"))

    model.train()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    for X, y in dataloader:
        X, y = X.to(device), y.to(device)

        optimizer.zero_grad()
        logits = model(X)
        loss = loss_fn(logits, y)
        loss.backward()
        optimizer.step()

        loss_sum += loss.item()
        # Count hits instead of averaging per-batch accuracies, which would
        # over-weight a smaller final batch. argmax of the logits picks the
        # same class as argmax of their softmax.
        n_correct += (logits.argmax(dim=-1) == y).sum().item()
        n_seen += y.numel()

    train_loss = loss_sum / len(dataloader)
    train_acc = n_correct / n_seen
    return train_loss, train_acc

def val_step(model, dataloader, loss_fn, device=None):
    """Evaluate ``model`` on ``dataloader`` without updating any weights.

    Args:
        model: Module producing class logits.
        dataloader: Iterable of ``(X, y)`` batches that also supports ``len``.
        loss_fn: Callable ``loss_fn(logits, y)`` returning a scalar tensor.
        device: Where to move each batch. ``None`` (default) uses the device
            the model's parameters are on, so the function no longer captures
            a notebook global at definition time.

    Returns:
        ``(loss, accuracy)``: the mean of the per-batch losses, and the
        fraction of all samples classified correctly.
    """
    if device is None:
        device = next((p.device for p in model.parameters()), torch.device("cpu"))

    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    with torch.inference_mode():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            logits = model(X)

            loss_sum += loss_fn(logits, y).item()
            # Count hits instead of averaging per-batch accuracies, which
            # would over-weight a smaller final batch.
            n_correct += (logits.argmax(dim=-1) == y).sum().item()
            n_seen += y.numel()

    test_loss = loss_sum / len(dataloader)
    test_acc = n_correct / n_seen
    return test_loss, test_acc


In [None]:
from torch.optim.lr_scheduler import ReduceLROnPlateau 

# Lower the learning rate when the validation loss stops improving.
scheduler = ReduceLROnPlateau(optimizer, 'min',patience=5)

epochs = 100

# torch.save does not create missing directories, so make sure the checkpoint
# folder exists before training starts.
os.makedirs('models', exist_ok=True)

# Moving the model once is enough; it stays on the device across epochs.
cnn_model = cnn_model.to(device)

for epoch in range(1, epochs +1):
    print(f'Epoch: {epoch}------\n')
    train_loss , train_acc = train_step(cnn_model, train_dataloader, loss_fn, optimizer, device=device)
    val_loss , val_acc = val_step(cnn_model, val_dataloader, loss_fn, device=device)
    scheduler.step(val_loss)  # Adjust LR based on validation loss
    print(f"Training Loss: {train_loss} | Training Accuracy: {train_acc * 100:.2f}%")
    print(f"Validation Loss: {val_loss} | Validation Accuracy: {val_acc * 100:.2f}%")
    print(f"Current LR: {optimizer.param_groups[0]['lr']}")  # Check the updated LR
    # One checkpoint per epoch so any epoch can be reloaded afterwards.
    torch.save(cnn_model.state_dict(),f'models/cnn_epoch_{epoch}.pth')



In [None]:
# Load the best-performing model (uncomment and set the checkpoint epoch to use)
#cnn_model = CustomCNNModel(1, hidden_dim, output_dim)
#cnn_model.load_state_dict(torch.load('models/cnn_epoch_47.pth', weights_only=True, map_location='mps:0'))  # Choose 
#cnn_model.to(device)

In [None]:
import numpy as np

def predictTest(model: nn.Module, dataloader, device=None):
    """Predict a class index for every sample of an unlabeled dataloader.

    Args:
        model: Module producing class logits.
        dataloader: Iterable yielding input batches only (no labels).
        device: Device to run on. When given, the model is moved there first.
            ``None`` (default) runs on the device the model's parameters are
            already on and leaves the model where it is.

    Returns:
        A flat list of Python ints, one per sample, in dataloader order.
        An empty dataloader gives an empty list.
    """
    if device is None:
        device = next((p.device for p in model.parameters()), torch.device("cpu"))
    else:
        model = model.to(device)

    model.eval()
    batch_preds = []
    with torch.inference_mode():
        for X in dataloader:
            logits = model(X.to(device))
            # argmax of the logits picks the same class as argmax of softmax.
            batch_preds.append(logits.argmax(dim=-1).cpu())

    if not batch_preds:
        return []
    return torch.cat(batch_preds).tolist()



In [None]:
# Re-evaluate the current model on the validation split and report the values
# computed here (not the val_loss / val_acc left over from the training loop).
validation_loss, validation_accuracy = val_step(model=cnn_model, dataloader=val_dataloader, loss_fn=loss_fn, device=device)
print(f'Validation Loss: {validation_loss} | Validation Accuracy: {validation_accuracy * 100:.2f}%')

In [None]:


test_pred = predictTest(model=cnn_model, dataloader=test_dataloader)

# Build the results frame with its 1-based 'ImageId' index in one step.
df_results = pd.DataFrame(
    {'Label': test_pred},
    index=pd.RangeIndex(start=1, stop=len(test_pred) + 1, name='ImageId'),
)

# Display the first few rows
print(df_results.head())

df_results.to_csv('cnn_results_4.csv')