## MNIST Handwritten Digit Recognition using Astra and Pytorch

In [None]:
# Import the packages required for the first step: setting up the data loader.

# From pip install cassandra-driver, necessary to connect to AstraDB, general rather than specific implementations used here
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# From pip install torch we need the Dataset and DataLoader types to create a loader that fetches data from Astra in a form compatible with PyTorch
import torch
from torch.utils.data import Dataset, DataLoader

# From pip install torchvision, provides the transforms necessary to turn raw pixel values from Astra into a proper image format for machine learning
import torchvision.transforms as transforms

# Already installed in the Python environment; provides array/matrix processing that is much faster than plain Python lists
import numpy as np

In [None]:
#Astra Pytorch Dataset Definition

# First we create a class AstraDataset based on the existing PyTorch Dataset class. Datasets define a method for pulling particular subsets of larger data sources,
# whether those are internal Python objects, files, or external data sources like AstraDB.
class AstraDataset(Dataset):
    """Map-style PyTorch dataset that reads image rows from an Astra DB table.

    Every item is fetched on demand with a CQL query keyed on the row ``id``.
    The ``pixels`` column is reshaped to a 28 by 28 array and divided by 255,
    so stored values in the 0-255 range end up in the 0-1 range.

    Args:
        cloud_config: Cloud connection settings handed to ``Cluster`` (in this
            notebook, the dict holding the secure connect bundle path).
            Defaults to an empty dict.
        auth_provider: Authentication object for the database, e.g. a
            ``PlainTextAuthProvider`` built from the id and secret of an Astra
            token.
        keyspace: Keyspace of the table to pull data from.
        table: Name of the table to pull data from.
        length: Number of rows this dataset exposes. Rows are addressed by id,
            so ids ``0`` to ``length - 1`` are expected to exist in the table.
        transform: Optional callable applied to each image after it is loaded.
    """

    def __init__(self,
                 cloud_config={},
                 auth_provider=None,
                 keyspace="",
                 table="raw_train",
                 length=0,
                 transform=None):
        # Open the session used for every per-item query.
        self.db = Cluster(cloud=cloud_config, auth_provider=auth_provider).connect()
        self.keyspace = keyspace
        self.table = table
        self.length = length
        self.transform = transform

    def __getitem__(self, index):
        """Return the ``(image, label)`` pair stored under row id ``index``.

        The image is a ``float32`` array of shape ``(28, 28)`` (before the
        optional transform); the label is returned exactly as stored.

        Raises:
            IndexError: If the table has no row with that id.
        """
        # Keyspace and table are identifiers and cannot be bound as query
        # parameters, so they are formatted in; the id is bound by the driver.
        # Both columns are read in one query to avoid a second round trip.
        query = "SELECT pixels, label FROM {}.{} WHERE id = %s;".format(
            self.keyspace, self.table)
        row = self.db.execute(query, (int(index),)).one()
        if row is None:
            raise IndexError(
                "no row with id {} in {}.{}".format(index, self.keyspace, self.table))
        pixels, label = row[0], row[1]

        # Reshape the flat pixel list into an image and scale it by 1/255.
        image = np.array([float(pixel) for pixel in pixels]).reshape(28, 28) / 255
        x = np.float32(image)
        if self.transform:
            x = self.transform(x)
        return x, label

    def __len__(self):
        """Return the length given at construction; the table is not consulted."""
        return self.length



In [None]:
# More imports, this time for the actual model definition and training
# From PyTorch, import the neural net module for model definitions, its mathematical functions as F, and the optimization functions
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# Import pickle (python built-in module) for model serialization and storage
import pickle as pkl
# auth is a custom module containing the actual details for connecting to the AstraDB
import auth

In [None]:
# Build the connection settings from the auth module: the secure connect bundle path goes into cloud_config
# and the token id/secret into auth_provider, which are the two objects the datasets need.
# A session is opened here as well so that queries can also be submitted manually.
cloud_config = dict(secure_connect_bundle=auth.scb_path)
auth_provider = PlainTextAuthProvider(auth.auth_id, auth.auth_token)
cluster = Cluster(auth_provider=auth_provider, cloud=cloud_config)
session = cluster.connect()

In [None]:
# Build the train and test datasets together with the loaders PyTorch uses to feed data into the training process.
# Each dataset gets a composed transform: ToTensor turns the numpy array we hold into a PyTorch tensor object,
# and Normalize with .1307 and .3081 ensures that the data has 0 mean and unit standard deviation.
train_dataset = AstraDataset(
    cloud_config=cloud_config,
    auth_provider=auth_provider,
    keyspace="mnist_digits",
    table="raw_train",
    length=100,
    transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ]),
)
train_loader = DataLoader(dataset=train_dataset, batch_size=10, shuffle=True)

test_dataset = AstraDataset(
    cloud_config=cloud_config,
    auth_provider=auth_provider,
    keyspace="mnist_digits",
    table="raw_test",
    length=100,
    transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ]),
)
test_loader = DataLoader(dataset=test_dataset, batch_size=10, shuffle=True)

In [None]:
# Constants associated with the training of our neural net
# Number of epochs (full passes over the training data) to train the model for
n_epochs = 1
# Intended number of training examples per batch.
# NOTE(review): not referenced elsewhere in the visible code; the loaders above are built with batch_size=10 — confirm which is intended
batch_size_train = 64
# Intended number of test examples per batch.
# NOTE(review): not referenced elsewhere in the visible code; the loaders above are built with batch_size=10 — confirm which is intended
batch_size_test = 1000
# Step size the optimizer uses for each backpropagation update of the model weights
learning_rate = 0.01
# How much of the previous weight update gets carried over into the next backpropagation step
momentum = 0.5
# Logging settings: the training loop logs and saves the model every log_interval batches
log_interval = 10

# Starting seed value, change this to get a different outcome from even a single epoch of training
random_seed = 1
# cuDNN is switched off before seeding; presumably for reproducibility — TODO confirm
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)

# Wrap the test loader in an enumerator that yields (batch index, (data, targets))
examples = enumerate(test_loader)
# Pull the first batch of test data and its labels from that enumerator
batch_idx, (example_data, example_targets) = next(examples)
# Show the shape of that example batch from the test loader
print(example_data.shape)

In [None]:
# Defining the structure and forward propagation steps of our neural network
class Net(nn.Module):
    """Small convolutional classifier with 10 output classes.

    Layers: two 5x5 convolutions (1 -> 10 -> 20 channels), a 2D dropout
    layer after the second convolution, and two linear layers (320 -> 50 -> 10).
    """

    def __init__(self):
        super().__init__()
        # Two convolutional layers (specialized for image processing), a dropout layer, and two linear layers.
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        """Return per-class log-probabilities of shape ``(batch, 10)``.

        Args:
            x: Batch of single-channel images; 28x28 inputs yield the 320
                features per example that the first linear layer expects.
        """
        # First convolution, then 2x2 max pooling and ReLU.
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        # Second convolution with dropout, then 2x2 max pooling and ReLU.
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        # Flatten to one row per example: 20 channels * 4 * 4 = 320 for 28x28 input.
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        # Functional dropout must be told the mode explicitly, unlike the module form.
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # Normalize over the class dimension; leaving dim implicit is deprecated and warns.
        return F.log_softmax(x, dim=1)

In [None]:
# Instantiate the network and an SGD optimizer configured with the learning rate and momentum chosen earlier.
network = Net()
optimizer = optim.SGD(params=network.parameters(), lr=learning_rate, momentum=momentum)
# Bookkeeping lists for the loss values recorded during training and testing.
# test_counter holds the number of training examples seen at each test point (before training and after every epoch).
train_losses, train_counter, test_losses = [], [], []
test_counter = [epoch_index * len(train_loader.dataset) for epoch_index in range(n_epochs + 1)]

In [None]:
# Defines our training function for a single epoch
# For every batch in the training loader we calculate the loss and do a backpropagation step; every
# log_interval batches we output the training loss and save the model to Astra
def train(epoch):
    """Train the global ``network`` for one pass over ``train_loader``.

    Every ``log_interval`` batches the current loss is printed and recorded
    in ``train_losses`` / ``train_counter``, and the network and optimizer
    state dicts are inserted into ``mnist_digits.models`` through the global
    ``session``.

    Args:
        epoch: 1-based epoch number, used for logging and for the running
            count of examples seen.
    """
    network.train()
    dataset_size = len(train_loader.dataset)
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = network(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval != 0:
            continue

        loss_value = loss.item()
        description_string = 'Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, batch_idx * len(data), dataset_size,
            100. * batch_idx / len(train_loader), loss_value)
        print(description_string)
        train_losses.append(loss_value)
        # Examples seen so far, based on the loader's real batch size rather
        # than a hard-coded constant (fall back to this batch's size when the
        # loader has no fixed batch size).
        batch_size = train_loader.batch_size or len(data)
        train_counter.append(batch_idx * batch_size + (epoch - 1) * dataset_size)

        # Save the model to Astra: pickle the network and optimizer state dicts into bytes and cast
        # them to bytearray, which the Cassandra driver can store. They are inserted along with the
        # time, a uuid, and the loss description.
        # NOTE: these blobs are pickles; only unpickle them from a table whose contents you trust.
        network_state = bytearray(pkl.dumps(network.state_dict()))
        optimizer_state = bytearray(pkl.dumps(optimizer.state_dict()))
        query = "INSERT INTO mnist_digits.models (id, network, optimizer, upload_date, comments) VALUES (uuid(), %s, %s, toTimestamp(now()), %s);"
        values = [network_state, optimizer_state, description_string]
        session.execute(query, values)

In [None]:
# Defines the test function, which evaluates the network on the data in the test loader and reports
# a loss and accuracy just like during training, but on different data.
def test():
    """Evaluate the global ``network`` on ``test_loader``.

    Appends the average loss per example to ``test_losses`` and prints the
    average loss together with the number and percentage of correct
    predictions. Nothing is returned.
    """
    network.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            output = network(data)
            # Sum (not average) within the batch so the total can be divided
            # by the dataset size below.
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            # Predicted class = index of the largest log-probability.
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    test_losses.append(test_loss)
    print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

In [None]:
# Evaluate the untrained model first (its weights are still random), then for each of the
# n_epochs epochs run one epoch of training followed by another evaluation.
test()
for epoch in range(1, n_epochs + 1):
    train(epoch)
    test()