In [30]:
import torch
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import KFold
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.metrics import accuracy_score

class Net(nn.Module):
    """Four-stage convolutional classifier that outputs log-probabilities for 10 classes.

    Each stage is conv(3x3, padding=1) -> batch norm -> ReLU -> 2x2 max pool,
    followed by five fully connected layers with dropout between the first four.

    Args:
        input_shape: ``(height, width)`` of the single-channel input images.
            Defaults to ``(28, 84)``, the shape every caller in this file
            reshapes its data to.

    Raises:
        ValueError: if ``input_shape`` is too small to survive four 2x2 poolings.
    """

    def __init__(self, input_shape=(28, 84)):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # The padded 3x3 convolutions keep the spatial size and each of the four
        # poolings floor-halves it, so the flattened size is known up front.
        height, width = input_shape
        pooled_h, pooled_w = height // 16, width // 16
        if pooled_h < 1 or pooled_w < 1:
            raise ValueError(
                f"input_shape {tuple(input_shape)} is too small for four 2x2 poolings"
            )
        self.fc_input_features = 256 * pooled_h * pooled_w

        # fc1 must exist before an optimizer is built from model.parameters();
        # creating it lazily in forward() left it out of the optimizer, so its
        # weights were never trained.
        self.fc1 = nn.Linear(self.fc_input_features, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, 128)
        self.fc5 = nn.Linear(128, 10)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        """Return per-class log-probabilities of shape ``(batch, 10)`` for input ``x``."""
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = self.pool(F.relu(self.bn4(self.conv4(x))))

        # Flatten everything except the batch dimension.
        x = x.view(x.size(0), -1)

        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = F.relu(self.fc3(x))
        x = self.dropout(x)
        x = F.relu(self.fc4(x))
        x = self.fc5(x)
        output = F.log_softmax(x, dim=1)
        return output


def train(model, optimizer, train_dataloader, device):
    """Run one pass over ``train_dataloader``, taking one optimizer step per batch.

    The model is switched to training mode, and each batch is moved to
    ``device`` before the negative log-likelihood loss is back-propagated.
    Nothing is returned; ``model`` and ``optimizer`` are updated in place.
    """
    model.train()
    for batch in train_dataloader:
        inputs = batch[0].to(device)
        labels = batch[1].to(device)

        optimizer.zero_grad()
        F.nll_loss(model(inputs), labels).backward()
        optimizer.step()



def test(model, test_dataloader, device):
    model.eval()
    predictions = []
    with torch.no_grad():
        for data in test_dataloader:
            data = data[0].to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            predictions.extend(pred.cpu().numpy())
    return np.array(predictions).flatten()

# NOTE(review): the triple-quoted string below is a superseded draft of
# cross_validate kept as a bare string literal; it is never executed.
# It would not run as written: it calls train() with five arguments while
# train() accepts four, and it stores an accuracy score in `val_loss` but
# treats lower values as better. Consider deleting it.
'''def cross_validate(X, y, Xtest, ytest, n_splits=5, n_epochs=10, batch_size=32, lr=0.001, weight_decay=1e-5, patience=5):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    k_fold = KFold(n_splits=n_splits, shuffle = True, random_state=42)
    accuracies = []

    for fold, (train_idx, val_idx) in enumerate(k_fold.split(X)):
        X_train, y_train = X[train_idx], y[train_idx]
        X_val, y_val = X[val_idx], y[val_idx]

        #train model on current fold's training data
        model = Net().to(device)
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

        best_val_loss = np.inf
        bad_epochs = 0

        for epoch in range(n_epochs):
            train(model, optimizer, DataLoader(TensorDataset(torch.Tensor(X_train), torch.LongTensor(y_train)), batch_size=batch_size, shuffle=True), device, epoch)
            prediction_val = classify(X_val, model)
            val_loss = accuracy_score(y_val, prediction_val)

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                bad_epochs = 0
            else:
                bad_epochs += 1

            if bad_epochs >= patience:
                break

        #test model on current fold's validation data

        yhat = classify(Xtest, model)
        test_accuracy = accuracy_score(ytest, yhat)
        accuracies.append(test_accuracy)
        print(f"Fold {fold + 1} Test Accuracy: {test_accuracy}")

    print(f"Average Accuracy: {np.mean(accuracies)}")
    #return np.mean(accuracies)'''

def cross_validate(X, y, n_splits=5, n_epochs=10, batch_size=32, lr=0.001, weight_decay=1e-5, patience=3):
    """Train a fresh ``Net`` on each K-fold split, early-stopping on validation loss.

    Args:
        X: NumPy array of samples; each fold is reshaped to ``(-1, 1, 28, 84)``.
        y: NumPy array of labels, cast to ``int`` before use.
        n_splits: number of shuffled folds (fixed ``random_state=42``).
        n_epochs: maximum number of epochs per fold.
        batch_size: mini-batch size for both training and validation loaders.
        lr: Adam learning rate.
        weight_decay: Adam weight decay.
        patience: consecutive non-improving epochs tolerated before stopping.

    Returns:
        The model trained on the last fold, restored to the weights from its
        best validation epoch. Models from earlier folds are discarded.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"

    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)

    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"Training fold {fold+1}/{n_splits}")

        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]

        X_train = X_train.reshape(-1, 1, 28, 84)
        X_val = X_val.reshape(-1, 1, 28, 84)
        y_train = y_train.astype(int)
        y_val = y_val.astype(int)

        train_dataset = TensorDataset(torch.Tensor(X_train), torch.LongTensor(y_train))
        val_dataset = TensorDataset(torch.Tensor(X_val), torch.LongTensor(y_val))

        train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

        model = Net().to(device)
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

        best_val_loss = float('inf')
        bad_epochs = 0
        best_model_wts = None

        #train model
        for epoch in range(n_epochs):
            model.train()
            running_loss = 0.0
            for data, target in train_dataloader:
                data, target = data.to(device), target.to(device)
                optimizer.zero_grad()
                output = model(data)
                loss = F.nll_loss(output, target)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()

            avg_train_loss = running_loss / len(train_dataloader)
            print(f"Epoch {epoch+1}/{n_epochs}, Training Loss: {avg_train_loss:.4f}")

            #validate model
            model.eval()
            val_loss = 0.0
            with torch.no_grad():
                for data, target in val_dataloader:
                    data, target = data.to(device), target.to(device)
                    output = model(data)
                    loss = F.nll_loss(output, target)
                    val_loss += loss.item()

            avg_val_loss = val_loss / len(val_dataloader)
            print(f"Epoch {epoch+1}/{n_epochs}, Validation Loss: {avg_val_loss:.4f}")

            #early stopping
            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                bad_epochs = 0
                # state_dict() returns references to the live tensors, which later
                # optimizer steps overwrite in place; clone them so the snapshot
                # really holds the best epoch's weights.
                best_model_wts = {
                    name: tensor.detach().clone()
                    for name, tensor in model.state_dict().items()
                }
            else:
                bad_epochs += 1
                print(f"Epoch {epoch+1}: Validation loss did not improve.")

            if bad_epochs >= patience:
                print(f"Early stopping at epoch {epoch+1} due to no improvement.")
                break

        # No snapshot exists when n_epochs is 0 or the validation loss never
        # compared below infinity (e.g. NaN); keep the current weights then.
        if best_model_wts is not None:
            model.load_state_dict(best_model_wts)

    return model


def learn(X, y, n_epochs=10, batch_size=32, lr=0.001, weight_decay=1e-5):
    """Fit a new ``Net`` on all of ``X``/``y`` for a fixed number of epochs.

    ``X`` is reshaped to ``(-1, 1, 28, 84)`` and ``y`` is cast to ``int``.
    Training uses Adam on shuffled mini-batches with no validation or early
    stopping. Returns the trained model.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"

    images = torch.Tensor(X.reshape(-1, 1, 28, 84))
    labels = torch.LongTensor(y.astype(int))
    loader = DataLoader(
        TensorDataset(images, labels),
        batch_size=batch_size,
        shuffle=True,
    )

    model = Net().to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    for _ in range(n_epochs):
        train(model, optimizer, loader, device)

    return model


def classify(Xtest, model):
    """Predict a class index for every sample in ``Xtest`` using ``model``.

    ``Xtest`` is reshaped to ``(-1, 1, 28, 84)`` and fed through ``test`` in
    unshuffled batches of 32, so predictions come back in input order.
    """
    images = torch.Tensor(Xtest.reshape(-1, 1, 28, 84))
    loader = DataLoader(TensorDataset(images), batch_size=32, shuffle=False)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    return test(model, loader, device)



In [6]:
def prepareData(file_name):
    """Load a comma-separated file into a ``TensorDataset`` and a ``DataLoader``.

    Column 0 is read as the label. The remaining columns are divided by 255.0
    and reshaped to ``(-1, 1, 28, 84)``. Returns ``(dataset, dataloader)``,
    where the loader shuffles and uses a batch size of 32.
    """
    table = np.loadtxt(file_name, delimiter=',')

    labels = torch.LongTensor(table[:, 0])
    scaled = table[:, 1:] / 255.0
    images = torch.Tensor(scaled.reshape(-1, 1, 28, 84))

    dataset = TensorDataset(images, labels)
    return dataset, DataLoader(dataset, batch_size=32, shuffle=True)

In [26]:
def main():
    """Train with cross-validation and with a plain fit, then score both models.

    Loads the training and validation CSV files, evaluates the model returned
    by ``cross_validate`` and the model returned by ``learn`` on the validation
    file, and prints the predictions and accuracy of each.
    """
    train_set, _ = prepareData('sample_data/A4train.csv')
    holdout_set, _ = prepareData('sample_data/A4val.csv')

    X, y = (tensor.numpy() for tensor in train_set.tensors)
    Xtest, ytest = (tensor.numpy() for tensor in holdout_set.tensors)

    # Model from the cross-validation routine.
    model = cross_validate(X, y)
    predicted = classify(Xtest, model)
    print("Cross Validate Predictions:", predicted)
    print("Cross Validate Accuracy:", accuracy_score(ytest, predicted))

    # Model fitted on the full training set without validation.
    model = learn(X, y)
    predicted = classify(Xtest, model)
    print("CNN Predictions:", predicted)
    print("CNN Accuracy:", accuracy_score(ytest, predicted))





In [31]:
main()

Training fold 1/5
Epoch 1/10, Training Loss: 1.9359
Epoch 1/10, Validation Loss: 1.5653
Epoch 2/10, Training Loss: 0.9678
Epoch 2/10, Validation Loss: 1.6677
Epoch 2: Validation loss did not improve.
Epoch 3/10, Training Loss: 0.5044
Epoch 3/10, Validation Loss: 0.4103
Epoch 4/10, Training Loss: 0.3315
Epoch 4/10, Validation Loss: 0.3724
Epoch 5/10, Training Loss: 0.2421
Epoch 5/10, Validation Loss: 0.2887
Epoch 6/10, Training Loss: 0.1903
Epoch 6/10, Validation Loss: 0.3175
Epoch 6: Validation loss did not improve.
Epoch 7/10, Training Loss: 0.1713
Epoch 7/10, Validation Loss: 0.4066
Epoch 7: Validation loss did not improve.
Epoch 8/10, Training Loss: 0.1269
Epoch 8/10, Validation Loss: 0.2476
Epoch 9/10, Training Loss: 0.1281
Epoch 9/10, Validation Loss: 0.2800
Epoch 9: Validation loss did not improve.
Epoch 10/10, Training Loss: 0.1038
Epoch 10/10, Validation Loss: 0.3410
Epoch 10: Validation loss did not improve.
Training fold 2/5
Epoch 1/10, Training Loss: 1.7663
Epoch 1/10, Valid