# Digit Classification using neural networks 

In [30]:
import os
import torch

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix

from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor
from torchvision.io import read_image

Reading the data


In [31]:
def load_data( data_path ):
    '''
    Loads the data from the specified path.
    
    Inputs:
    data_path: path to the directory containing the data files, each file containing measurement of a digit
    in a N x 3 matrix. 
    
    Returns:
    data: a list of numpy arrays, each array containing the measurement of a digit.
    labels: a list of labels corresponding to the data.
    '''
    files = os.scandir( data_path )

    data = []
    labels = []
    for file in files:
        filename = file.name
        if filename.endswith('.csv'):
            number_data = np.loadtxt(data_path + '/' + filename, delimiter=',')
            label = int( filename.split('_')[1] )
            
            data.append( number_data )
            labels.append( label )      
                    
    return data, np.array(labels)

data_path = os.getcwd() + '/data/digits_3d/training_data'
data, labels = load_data( data_path )

Preprocess

In [32]:
def preprocess_data( data, N_interp = 128 ):
    '''
    Preprocesses the data by interpolating to equal lengths and standardizing.
    
    Inputs:
    Data: list of numpy arrays, each array containing the measurement of a digit.
    N_interp: number of points to interpolate the variable-length data to.
    
    Returns:
    data: a list of numpy arrays, each array containing the measurement of a digit after preprocessing.
    '''
    scaler = StandardScaler()
    
    for ii, sample in enumerate( data ):
        N_sample, dims = sample.shape
        
        tt = np.linspace(0, 1, N_sample)
        tt_interp = np.linspace(0, 1, N_interp)
        
        # First, interpolation (for each dimension separately)
        sample_interp = np.zeros( (N_interp, dims) )
        for dim in range( dims ):          
            dim_interp = np.interp( tt_interp, tt, sample[:, dim] )
            sample_interp[:, dim] = dim_interp
            
        # Then standardization
        data[ii] = scaler.fit_transform( sample_interp )
        
    return data

data = preprocess_data( data )

X_train, X_test, y_train, y_test = train_test_split( data, labels, test_size=0.25, stratify = labels ) 

Create Dataset and Dataloader objects

In [33]:
class NumberDataset(Dataset):
    def __init__(self, data, labels, transform=None, target_transform=None):
        self.number_labels = labels

    def __len__(self):
        return len(self.number_labels)

    def __getitem__(self, idx):
        image = data[idx]
        label = self.number_labels[idx]
        return image, label
    
train_dataset = NumberDataset(X_train, y_train)
test_dataset = NumberDataset(X_test, y_test)

train_dataloader = DataLoader( train_dataset )
test_dataloader = DataLoader( test_dataset )

Building the network, defining loss and optimizer

In [34]:
class NumberNetwork( nn.Module ) :
    def __init__(self, input_size=3*128, num_classes=10):
        super().__init__()
        self.linear_stack = nn.Sequential(
            nn.Linear(input_size, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )
 
    def forward(self, x):
        logits = self.linear_stack(x)
        return logits
    
# Get CPU or GPU 
device = (
    "cuda"
    if torch.cuda.is_available()
    else "cpu"
)
print(f"Using {device} device")
    
model = NumberNetwork( input_size=3*128, num_classes=10).to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam( model.parameters(), lr=1e-3 )

Using cuda device


Training the network

In [35]:
def train( dataloader, model, loss_fn, optimizer) :
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:

            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [36]:
epochs = 5
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")



Epoch 1
-------------------------------


RuntimeError: mat1 and mat2 must have the same dtype, but got Double and Float