In [29]:
import os
import numpy as np
import torch
import torch.utils.data
from typing import Sequence
from sklearn.model_selection import train_test_split


In [30]:
data_dir = 'input'

def load_data(which: str):
    """
    Loads data from a csv file
    :param which: str
        Which data to load, train or test
    """
    assert which in ['train', 'test']
    
    if which == 'train':
        data = np.loadtxt(fname=os.path.join(data_dir, 'train_data.csv'), delimiter=',', skiprows=1)
        labels = np.loadtxt(fname=os.path.join(data_dir, 'train_labels.csv'), delimiter=',', skiprows=1)
        return data, labels
    elif which == 'test':
        data = np.loadtxt(fname=os.path.join(data_dir, 'test_data.csv'), delimiter=',', skiprows=1)
        return data
    
def save_prediction(prediction: Sequence[int], 
                    path: str = 'submission.csv'):
    """
    Saves a sequence of predictions into a csv file with additional index column
    :param prediction: Sequence of ints
        Predictions to save
    :param path: str
        Path to a file to save into
    """
    
    pred_with_id = np.stack([np.arange(len(prediction)), prediction], axis=1)
    np.savetxt(fname=path, X=pred_with_id, fmt='%d', delimiter=',', header='id,label', comments='')

In [31]:
train_data, train_labels = load_data(which='train')
print(train_data.shape)
print(train_labels.shape)

test_data = load_data(which='test')
print(test_data.shape)

pred = np.random.randint(0, 10, size=test_data.shape[0])

save_prediction(prediction=pred, path='random.csv')

(59580, 342)
(59580,)
(10000, 342)


In [47]:
class TorchNetwork(torch.nn.Module):
    """
    Simple 2-hidden layer non-linear neural network
    """
    
    def __init__(self, input_size, activation, n_classes, dropout_rate=0.33):
        super(TorchNetwork, self).__init__()
        self.h1 = torch.nn.Sequential(torch.nn.Linear(input_size, 500),
                                      torch.nn.Dropout(p=dropout_rate))
        self.h2 = torch.nn.Sequential(torch.nn.Linear(500, 300),
                                      torch.nn.Dropout(p=dropout_rate))
        self.out = torch.nn.Linear(300, n_classes)
        self.act = activation
        
    def __call__(self, x):
        out_h1 = self.act(self.h1(x))
        out_h2 = self.act(self.h2(out_h1))
        return self.out(out_h2)

In [71]:
from torch.optim import Adam
from torch.nn.functional import cross_entropy, relu

# some hyperparams
batch_size: int = 64
epoch: int = 10
lr: float = 0.01


X_train, X_test, y_train, y_test = train_test_split(train_data, train_labels, test_size=0.15, random_state=42)
    
# prepare data loaders, base don the already loaded datasets
train_tensor_x = torch.stack([torch.Tensor(i) for i in X_train])
train_tensor_y = torch.stack([torch.LongTensor(np.array(i)) for i in y_train])

train_dataset = torch.utils.data.TensorDataset(train_tensor_x, train_tensor_y)

test_tensor_x = torch.stack([torch.Tensor(i) for i in X_test])
test_tensor_y = torch.stack([torch.LongTensor(np.array(i)) for i in y_test])

test_dataset = torch.utils.data.TensorDataset(test_tensor_x, test_tensor_y)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

input_size = 342
n_classes = 10

# initialize the model
model: TorchNetwork = TorchNetwork(input_size=input_size, activation=relu, n_classes=n_classes)

# initialize the optimizer
optimizer: torch.optim.Optimizer = Adam(model.parameters())

# training loop
for e in range(epoch):
    for i, (x, y) in enumerate(train_loader):
        
        # reset the gradients from previouis iteration
        optimizer.zero_grad()
        # pass through the network
        output: torch.Tensor = model(x)
        # calculate loss
        loss: torch.Tensor = cross_entropy(output, y)
        # backward pass thorught the network
        loss.backward()
        # apply the gradients
        optimizer.step()
        
        # log the loss value
        if (i + 1) % 100 == 0:
            print(f"Epoch {e} iter {i+1}/{len(train_data) // batch_size} loss: {loss.item()}", end="\r")
            
    # at the end of an epoch run evaluation on the test set
    with torch.no_grad():
        # initialize the number of correct predictions
        correct: int = 0
        for i, (x, y) in enumerate(test_loader):
            # pass through the network
            output: torch.Tensor = model(x)
            # update the number of correctly predicted examples
            p = torch.argmax(output, dim=1)
            correct += torch.sum((p == y).float())
        print(f"\nTest accuracy: {correct / len(y_test)}")
            
            
# this is your test       
assert correct / len(y_test) > 0.79, "Subject to random seed you should be able to get >79% accuracy"

Epoch 0 iter 700/930 loss: 0.6838169097900391
Test accuracy: 0.7974711656570435
Epoch 1 iter 700/930 loss: 0.53925353288650517
Test accuracy: 0.8182835578918457
Epoch 2 iter 700/930 loss: 0.50870561599731454
Test accuracy: 0.823206901550293
Epoch 3 iter 700/930 loss: 0.5006780028343201
Test accuracy: 0.8314870595932007
Epoch 4 iter 700/930 loss: 0.45400130748748785
Test accuracy: 0.8340606689453125
Epoch 5 iter 700/930 loss: 0.39808204770088196
Test accuracy: 0.8276826739311218
Epoch 6 iter 700/930 loss: 0.46072721481323245
Test accuracy: 0.8301443457603455
Epoch 7 iter 700/930 loss: 0.36390855908393864
Test accuracy: 0.8302562236785889
Epoch 8 iter 700/930 loss: 0.26639658212661743
Test accuracy: 0.8339487314224243
Epoch 9 iter 700/930 loss: 0.33843779563903815
Test accuracy: 0.8312633037567139


In [72]:
pred = []
with torch.no_grad():
    for x in test_data:
        output = model(torch.Tensor(x))
        p = torch.argmax(output)
        pred.append(p)


save_prediction(prediction=pred, path='prediction.csv')