In [None]:
# Disable Jedi so that code auto-completion works in Kaggle Notebooks
%config Completer.use_jedi = False

In [None]:
import os
import torch

import numpy as np
import pandas as pd
import torch

import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader

# Load Data

In [None]:
# Read the competition CSV files into pandas DataFrames.
# `train_data` (train.csv) is later split into a training and a testing part;
# `submit_data` (test.csv) is the data the final predictions are generated for.
base_path = '/kaggle/input/digit-recognizer'  # dataset directory on Kaggle
train_data = pd.read_csv(os.path.join(base_path, 'train.csv'))
submit_data = pd.read_csv(os.path.join(base_path, 'test.csv'))

In [None]:
# Convert the DataFrame contents to torch tensors.
# NOTE(review): both names are rebound from DataFrame to Tensor here, so the
# original DataFrames are no longer available afterwards; re-run the loading
# cell before re-running this one.
train_data = torch.tensor(train_data.values)
submit_data = torch.tensor(submit_data.values)

In [None]:
# Sanity check: show the shapes of the two tensors created above.
print(train_data.shape, submit_data.shape)

In [None]:
# Hold-out split: the first 80% of the rows are used for training and the
# remaining 20% for testing (rows are taken in their stored order).
train_size = int(len(train_data) * 0.8)

# Column 0 holds the label; all remaining columns are the features.
train_features = train_data[:train_size, 1:]
train_labels = train_data[:train_size, 0]
test_features = train_data[train_size:, 1:]
test_labels = train_data[train_size:, 0]

# Preprocess the submission data: divide by 255 and convert to float32.
# NOTE(review): this rebinds `submit_data` to its scaled version, so running
# this cell a second time would divide by 255 again.
submit_data = (submit_data / 255.0).to(torch.float32)

In [None]:
# Sanity check: show the shapes of the training and testing splits.
print(train_features.shape, train_labels.shape)
print(test_features.shape, test_labels.shape)

In [None]:
# Build custom datasets
class SelfBuildDataset(Dataset):
    """In-memory dataset that pairs scaled feature rows with integer labels.

    Args:
        features: Tensor/array whose first dimension indexes the samples.
            The values are divided by 255.0 and stored as ``torch.float32``.
        labels: One label per sample, stored as ``torch.long``.

    Raises:
        ValueError: If ``features`` and ``labels`` do not contain the same
            number of samples.
    """

    def __init__(self, features, labels):
        # Fail early: a length mismatch would otherwise either drop labels
        # silently or raise an IndexError only while iterating.
        if len(features) != len(labels):
            raise ValueError(
                f"features and labels must have the same length, "
                f"got {len(features)} and {len(labels)}"
            )
        self.features = torch.as_tensor(features / 255.0, dtype=torch.float32)
        self.labels = torch.as_tensor(labels, dtype=torch.long)

    def __len__(self):
        """Return the number of samples."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at ``idx``."""
        X = self.features[idx]
        y = self.labels[idx]
        return X, y

# Wrap the training and testing splits in datasets; SelfBuildDataset divides
# the features by 255 and converts features/labels to float32/long.
train_dataset = SelfBuildDataset(train_features, train_labels)
test_dataset = SelfBuildDataset(test_features, test_labels)

In [None]:
# Build the DataLoaders that feed mini-batches to training and evaluation.
batch_size = 512          # samples per mini-batch
num_loader_workers = 4    # worker processes used by each DataLoader

# Training batches are reshuffled every epoch; the test loader keeps a fixed order.
train_data_iter = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_loader_workers)
test_data_iter = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_loader_workers)

# Design Model Architecture

In [None]:
# Model Architecture
def build_mlp(layer_dims, activation=None):
    """Build a fully connected network as an ``nn.Sequential``.

    Args:
        layer_dims: Sequence of layer widths ``[in, hidden..., out]``. Each
            pair of consecutive entries becomes one ``nn.Linear`` layer.
        activation: Optional zero-argument callable (e.g. the ``nn.ReLU``
            class) that returns a module. A fresh instance is inserted after
            every linear layer except the last one. ``None`` adds no
            activations.

    Returns:
        An ``nn.Sequential`` with the layers in order. It is empty when
        ``layer_dims`` has fewer than two entries.
    """
    num_linear = len(layer_dims) - 1
    layers = []
    for i in range(num_linear):
        layers.append(nn.Linear(layer_dims[i], layer_dims[i + 1]))
        # The output layer stays linear; only hidden layers get an activation.
        is_output_layer = i == num_linear - 1
        if not is_output_layer and activation is not None:
            layers.append(activation())
    return nn.Sequential(*layers)

# Alternative kept for reference: a single linear layer with no hidden layer.
# linear_neural_network = build_mlp([784, 10])
# Model in use: 784 inputs -> 64 hidden units (ReLU) -> 10 outputs.
linear_neural_network = build_mlp([784, 64, 10], activation=nn.ReLU)

In [None]:
# Init model parameters
def init_weights(layer):
    """Initialise an ``nn.Linear`` layer in place; ignore any other module.

    Weights are drawn from a normal distribution (mean 0, std 1e-2) and the
    bias, when the layer has one, is set to zero. Intended to be passed to
    ``nn.Module.apply``.

    Args:
        layer: Any module; only instances whose type is exactly ``nn.Linear``
            are modified.
    """
    # Exact type match: subclasses of nn.Linear are deliberately left untouched.
    if type(layer) is nn.Linear:
        nn.init.normal_(layer.weight, mean=0, std=1e-2)
        # nn.Linear(..., bias=False) stores its bias as None.
        if layer.bias is not None:
            nn.init.zeros_(layer.bias)

# Run init_weights on every sub-module of the network. The result is assigned
# to `_` so the notebook does not echo the returned module.
_ = linear_neural_network.apply(init_weights)

# Train Model

In [None]:
# Loss function: cross-entropy, averaged over the samples of each batch
# (reduction='mean').
cross_entropy = nn.CrossEntropyLoss(reduction='mean')

In [None]:
# Optimizer: keep a reference to the SGD class (not an instance); it is
# instantiated with the model parameters in the hyper-parameter cell.
optimizer = torch.optim.SGD

In [None]:
# Hyper-parameters and the objects handed to the training loop.
num_epochs = 30    # number of passes over the training data
lr = 1e-2          # learning rate passed to the optimizer
net = linear_neural_network    # model to train
loss = cross_entropy           # loss function
trainer = optimizer(net.parameters(), lr=lr)    # SGD instance over the model parameters

In [None]:
# Start training
def right_predictions_num(y_hat, y):
    """Count how many rows of ``y_hat`` predict the label given in ``y``.

    Args:
        y_hat: 2-D tensor of per-class scores, one row per sample.
        y: Tensor of true labels, one per sample.

    Returns:
        The number of correct predictions as a Python float.
    """
    # The predicted class is the index of the largest score in each row.
    predicted = y_hat.argmax(axis=1).type(y.dtype)
    hits = predicted == y
    return float(hits.type(y.dtype).sum())

def evaluate_acc_in_test_dataset(net, test_data_iter):
    """Compute the classification accuracy of ``net`` over ``test_data_iter``.

    The model is switched to evaluation mode and gradients are disabled
    while iterating.

    Args:
        net: Model mapping a batch of features to per-class scores.
        test_data_iter: Iterable yielding ``(features, labels)`` batches.

    Returns:
        A 0-dim float32 tensor: correct predictions divided by total samples.
    """
    net.eval()
    # counts[0]: correct predictions so far; counts[1]: samples seen so far.
    counts = torch.zeros(2, dtype=torch.float32)
    with torch.no_grad():
        for features, labels in test_data_iter:
            num_correct = right_predictions_num(net(features), labels)
            counts += torch.tensor([num_correct, labels.numel()])
    return counts[0] / counts[1]

def train_epoch(net, train_data_iter, loss, trainer):
    """Train ``net`` for one pass over ``train_data_iter``.

    Args:
        net: Model to train; it is switched to training mode.
        train_data_iter: Iterable yielding ``(features, labels)`` batches.
        loss: Callable returning the batch loss for ``(scores, labels)``.
        trainer: Optimizer that updates the parameters of ``net``.

    Returns:
        A tuple ``(average_loss, accuracy)`` of 0-dim float32 tensors, both
        computed over all samples seen in this epoch.
    """
    net.train()
    # totals: [sum of per-sample loss, correct predictions, samples seen].
    totals = torch.zeros(3, dtype=torch.float32)
    for features, labels in train_data_iter:
        num_samples = labels.numel()
        scores = net(features)
        batch_loss = loss(scores, labels)

        trainer.zero_grad()
        batch_loss.backward()
        trainer.step()

        # The batch loss is weighted by the batch size so that the final
        # division yields a per-sample average.
        totals += torch.tensor([
            float(batch_loss) * num_samples,
            right_predictions_num(scores, labels),
            num_samples,
        ])
    loss_sum, correct_sum, sample_sum = totals
    return loss_sum / sample_sum, correct_sum / sample_sum

def train(net, train_data_iter, test_data_iter, loss, trainer, num_epochs):
    """Run the training loop and print the metrics after every epoch.

    Args:
        net: Model to train.
        train_data_iter: Iterable of training batches.
        test_data_iter: Iterable of batches used to measure test accuracy.
        loss: Loss function passed on to ``train_epoch``.
        trainer: Optimizer passed on to ``train_epoch``.
        num_epochs: Number of epochs to run.
    """
    for epoch in range(num_epochs):
        epoch_metrics = train_epoch(net, train_data_iter, loss, trainer)
        train_loss, train_acc = epoch_metrics
        test_acc = evaluate_acc_in_test_dataset(net, test_data_iter)
        # Epochs are reported 1-based.
        print(f"epoch {epoch + 1}, train_loss {train_loss}, train_acc {train_acc}, test_acc {test_acc}")

# Run the training loop with the objects configured in the cells above.
train(net, train_data_iter, test_data_iter, loss, trainer, num_epochs)

# Generate Submission

In [None]:
# Model prediction
def predict(net, input_tensor):
    """Return the predicted class index for every row of ``input_tensor``.

    The model is switched to evaluation mode and the forward pass runs with
    gradients disabled.

    Args:
        net: Model mapping a batch of features to per-class scores.
        input_tensor: Batch of input features.

    Returns:
        A ``torch.long`` tensor holding the index of the largest score per row.
    """
    net.eval()
    with torch.no_grad():
        scores = net(input_tensor)
    return scores.argmax(axis=1).type(torch.long)

# Predict a label for every submission row and reshape the result into a
# single column so it can be concatenated with the image ids later.
prediction_rst = predict(net, submit_data).reshape(-1, 1)
print(prediction_rst.shape)

In [None]:
# Generate 1-based row ids as a column and prepend them to the predictions,
# giving one (id, label) pair per row.
# NOTE(review): `prediction_rst` is rebound here, so re-running this cell alone
# would prepend another id column.
idxs = torch.arange(1, len(prediction_rst) + 1).reshape(-1, 1)
prediction_rst = torch.cat((idxs, prediction_rst), 1)
print(prediction_rst.shape)

In [None]:
# Save the result: convert to a NumPy array, label the two columns and write
# them to submission.csv without the DataFrame index.
np_prediction_rst = prediction_rst.numpy()
df = pd.DataFrame(np_prediction_rst, columns=['ImageId', 'Label'])
df.to_csv('submission.csv', index=False)