In [1]:
#
# Compute neural network classification rate.
#
#   Inputs: 
#      Xtrain = training feature data
#               numpy array rows are obs and columns are features
#      Ytrain = vector of coded classes coded 0,1,...,nclasses-1     
#      Xtest = test feature data
#      Ytest =  vector of coded classes (labels need to be 0,1,...,nclasses-1)
#      hidden_layer_sizes = list with numbers of nodes in hidden layers
#      batchsize = number of obs in each batch
#      nepochs = number of epochs to train on
#      lrate = learning rate to use to train
#
#
import os
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch import nn
import numpy as np
#
# This function takes as input the following:
#
#   Xtrain = training data predictor variables
#            a np 2d array with rows=observations and columns=variables
#   Ytrain = training data response variable a np array 1d array
#   Xtest = test predictor vars
#   Ytest = test response var
#   hidden_layer_sizes = list with the number of nodes at intermediate layers
#   batchsize = observations are broken up into batches so this is number of obs/batch
#   nepochs = when fitting, we go through the entire dataset nepochs times
#   lrate = parameter that determines the step size (its exact effect is a bit complicated
#           to describe for the optimizer employed here, Adam)
#
def NeuralNetwork(Xtrain,Ytrain,Xtest,Ytest,hidden_layer_sizes,batchsize, nepochs,lrate):
    """Train a fully connected classifier and score it on a test set.

    The network is a stack of Linear layers with Sigmoid activations between
    them (no activation after the last Linear layer). It is trained with
    cross-entropy loss and the Adam optimizer, always on the CPU.

    Parameters
    ----------
    Xtrain : 2d numpy array, rows = observations, columns = features
    Ytrain : 1d array of training classes coded 0,1,...,nclasses-1
    Xtest : 2d numpy array of test features (same number of columns as Xtrain)
    Ytest : 1d array of test classes, coded like Ytrain
    hidden_layer_sizes : list with the number of nodes in each hidden layer;
        an empty list gives a single Linear layer from features to classes
    batchsize : number of observations per training batch
    nepochs : number of passes through the training data
    lrate : learning rate handed to Adam

    Returns
    -------
    (Confusion, accuracy, model)
        Confusion[c1,c2] counts the test observations with true class c1 that
        were predicted as class c2; accuracy is the proportion of correct test
        predictions (nan when the test set is empty); model is the trained
        network, which also offers probability_vector() and output_in_stages().
    """
    # unpacking also checks that Xtrain is two-dimensional
    _,nfeatures=Xtrain.shape
    ntest=Xtest.shape[0]
    #
    # Classes are coded 0,1,...,nclasses-1, so the number of classes is the
    # largest label + 1. Counting distinct training labels instead would
    # undercount whenever some class is absent from the training data.
    #
    labels=np.concatenate([np.asarray(Ytrain).ravel(),np.asarray(Ytest).ravel()])
    nclasses=int(labels.max())+1

    #device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    device="cpu" # CPU only: the original author reported an error when using the gpu from jupyter

    #
    # Dataset is a torch class - we **inherit** from this class to make our own
    # custom MyDataSet objects. The algorithm for fitting a neural network to the
    # data needs a couple of functions.
    #
    # 1) __len__() giving the number of observations in the dataset
    # 2) __getitem__() - enabling one to get a pair/row of X,Y values by using square brackets
    #
    class MyDataSet(Dataset):
        """Pairs the rows of a feature matrix X with the entries of a label vector Y."""
        def __init__(self,X,Y):
            self.X=X
            self.Y=Y
            self.N=self.X.shape[0]
            self.K=self.X.shape[1]
        def __len__(self):
            return self.N
        def __getitem__(self, idx):
            return self.X[idx],self.Y[idx]
    #
    # Here we construct two Dataset objects
    #
    mydata_train=MyDataSet(Xtrain,Ytrain)
    mydata_test=MyDataSet(Xtest,Ytest)
    #
    # Create a network inheriting from torch.nn.Module
    # with its own constructor.
    #
    class MyNetwork(torch.nn.Module):
        """Linear layers separated by Sigmoid activations; the output is raw class scores."""
        #
        # the constructor takes as arguments:
        #
        #    number of features
        #    number of classes
        #    a list of sizes of numbers of nodes in the hidden layers
        #
        def __init__(self,nfeatures,nclasses, hiddenlayersizes):
            #
            # the base class constructor must still run since torch.nn.Module
            # sets up attributes that the rest of the class relies on
            #
            super().__init__()
            #
            # create the layers: one (Linear, Sigmoid) pair per hidden layer,
            # followed by a final Linear layer producing one score per class
            #
            sizes=[nfeatures]+list(hiddenlayersizes)
            layers=[]
            for n_in,n_out in zip(sizes[:-1],sizes[1:]):
                layers.append(nn.Linear(n_in,n_out)) # fully connected
                layers.append(nn.Sigmoid()) # activation function
            layers.append(nn.Linear(sizes[-1],nclasses))
            #
            # note that when using cross-entropy loss function
            # we don't need to include an activation function after the last linear
            # transformation
            #
            # cross entropy loss converts the output to exp(output)/sum(exp(output))
            # to get a probability vector and compares with the true class value
            #
            # The layers are registered for every architecture, including the
            # one with no hidden layers, so the model always has parameters.
            #
            self.layers=nn.ModuleList(layers)
        #
        # Calculates the output from the input feature vector
        # This function is needed when model() is called.
        #
        def forward(self, x):
            for L in self.layers:
                x=L(x)
            return x
        #
        # Function to compute output probabilities from input.
        # This is y=model(x) followed by softmax, i.e. exp(y)/sum(exp(y)),
        # taken over the last dimension so that a single feature vector and a
        # batch of feature vectors (one per row) are both handled correctly.
        #
        def probability_vector(self,x):
            return torch.softmax(self.forward(x),dim=-1)
        #
        # Print the output for given x at each layer.
        #
        def output_in_stages(self,x):
            print("input = ")
            print(x)
            print("\n")
            ctr=0
            for L in self.layers:
                x=L(x)
                print("output "+str(ctr))
                print(x)
                print("\n")
                ctr+=1
            # last stage: convert the class scores to probabilities
            x=torch.softmax(x,dim=-1)
            print("output "+str(ctr))
            print(x)
            print("\n")

    #
    # instantiate the network
    #
    mynetwork=MyNetwork(nfeatures,nclasses,hidden_layer_sizes)
    #
    # training requires a data loader in which we specify a batch size.
    # when we use shuffle=True the rows of the dataset are permuted for each epoch.
    #
    mydataloader_train=torch.utils.data.DataLoader(dataset=mydata_train, batch_size=batchsize,shuffle=True)

    learning_rate = lrate
    num_epochs =nepochs

    model=mynetwork
    #
    # move the model to the device
    #
    model = model.to(device)
    criterion = torch.nn.CrossEntropyLoss()
    #
    # Adam is an optimizer that maintains a different learning
    # rate for every weight/parameter in the network - so the learning rate
    # is a relative one
    #
    # See the gentle introduction:
    #        e.g. https://machinelearningmastery.com/adam-optimization-algorithm-for-deep-learning/
    #
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    #
    # Now we are at the training loop
    #
    # num_epochs = number of times we run through the entire dataset
    #
    loss_value=None # stays None until at least one batch has been processed
    for epoch in range(num_epochs):
        #
        # for each epoch the data loader loads the data one batch at a time
        #
        for x,y in mydataloader_train:

            x=x.float().to(device)
            y=y.to(device,dtype=torch.long)

            optimizer.zero_grad()

            outputs=model(x)

            loss=criterion(outputs,y)
            loss_value = loss.item()

            loss.backward() # compute gradients

            # move in direction of minus the gradient by a learning_rate amount
            # here because we are using Adam, step is more complicated than -epsilon*Gradient
            optimizer.step()

        # report the loss of the last batch every 10th epoch
        if epoch%10==0 and loss_value is not None:
            print("epoch = {0:5d} loss = {1:8.5f}".format(epoch,loss_value))
    # final report; skipped when no training step ran (e.g. nepochs=0)
    if loss_value is not None:
        print("epoch = {0:5d} loss = {1:8.5f}".format(epoch,loss_value))
    #
    # Compute the confusion matrix based on test data
    # i.e. for each pair of classes c1, c2,
    # how often when true class is c1 we predict c2.
    #
    # Here, for a given test (x,Y) we predict the class with the highest
    # output score, which is also the class with the highest prediction
    # probability since softmax preserves the ordering.
    #
    # A good confusion matrix should be close to a diagonal matrix.
    #
    Confusion=np.zeros(shape=(nclasses,nclasses))
    with torch.no_grad(): # no gradients are needed for prediction
        for i in range(ntest):

            # get an (x,y) pair and shape x as a batch holding one observation
            xi,yi=mydata_test[i]
            x=torch.as_tensor(xi).reshape(1,nfeatures).to(device).float()

            # compute the class scores for this x
            ypred=model(x).cpu().numpy()[0]

            # the prediction is the coordinate of ypred taking maximum value
            yp=np.argmax(ypred)

            # update the confusion matrix count for (true class, predicted class)
            Confusion[int(yi),yp]+=1
    #
    # accuracy is the proportion of correct predictions
    #
    total=np.sum(Confusion)
    accuracy=np.sum(np.diag(Confusion))/total if total>0 else float("nan")
    return(Confusion,accuracy,model)

#
# Use pandas to read the data in as a pandas data frame.
# The relative path "seeds_dataset.csv" is resolved against the current
# working directory.
#
import pandas as pd
import numpy as np
df=pd.read_csv("seeds_dataset.csv")
# show the first rows as a quick check of what was loaded
df.head()

Unnamed: 0,Area,Perimeter,Compactness,KernelLen,KernelWidth,AsymmetryCoef,KernelGrooveLen,Class
0,15.26,14.84,0.871,5.763,3.312,2.221,5.22,1
1,14.88,14.57,0.8811,5.554,3.333,1.018,4.956,1
2,14.29,14.09,0.905,5.291,3.337,2.699,4.825,1
3,13.84,13.94,0.8955,5.324,3.379,2.259,4.805,1
4,16.14,14.99,0.9034,5.658,3.562,1.355,5.175,1


In [2]:
#
# Shuffle the observations. N is the number of rows in the data frame and
# I is a random ordering of the row labels 0,...,N-1.
#
N=len(df)
I=np.random.permutation(range(N))
#
# .loc picks the rows by index label, in the order listed in I
#
df=df.loc[I]
#
# Show the first rows of the result - the index still carries the original
# row number of each observation.
#
df.head()

Unnamed: 0,Area,Perimeter,Compactness,KernelLen,KernelWidth,AsymmetryCoef,KernelGrooveLen,Class
193,10.82,12.83,0.8256,5.18,2.63,4.853,5.089,3
137,15.57,15.15,0.8527,5.92,3.231,2.64,5.879,2
140,13.07,13.92,0.848,5.472,2.994,5.304,5.395,3
171,11.55,13.1,0.8455,5.167,2.845,6.715,4.956,3
27,12.74,13.67,0.8564,5.395,2.956,2.504,4.869,1


In [3]:
#
# Take the values of the data frame as a numpy array: the first 7 columns form
# the feature matrix X and column 7 is the class column Y.
# Classes are shifted down by 1 so that they are coded 0,1,2 rather than 1,2,3.
#
values=np.array(df)
X=values[:,:7]
Y=(values[:,7]-1).astype(int)
print(df.shape)

#
# The first 75% of the rows become the training pair (Xtrain,Ytrain)
# and the remaining 25% the test pair (Xtest,Ytest).
#
N=len(df)
Ntrain=(3*N)//4
Ntest=N-Ntrain
Xtrain,Ytrain=X[:Ntrain,:],Y[:Ntrain]
Xtest,Ytest=X[Ntrain:N,:],Y[Ntrain:N]

(210, 8)


Train the network

In [8]:
#
# Fit a network with two hidden layers of 10 nodes each, using batches of 50
# observations, 1000 epochs and a learning rate of 0.01, then report how it
# does on the test set.
#
C,acc,model=NeuralNetwork(Xtrain,Ytrain,Xtest,Ytest,
                          hidden_layer_sizes=[10,10],
                          batchsize=50,
                          nepochs=1000,
                          lrate=.01)

print("accuracy on test set = "+str(acc))
print("confusion matrix = ")
print(C)

epoch =     0 loss =  1.11884
epoch =    10 loss =  1.00155
epoch =    20 loss =  0.83730
epoch =    30 loss =  0.62720
epoch =    40 loss =  0.71249
epoch =    50 loss =  1.01006
epoch =    60 loss =  0.48256
epoch =    70 loss =  0.58043
epoch =    80 loss =  0.56708
epoch =    90 loss =  0.26316
epoch =   100 loss =  0.44697
epoch =   110 loss =  0.16667
epoch =   120 loss =  0.26902
epoch =   130 loss =  0.15659
epoch =   140 loss =  0.08375
epoch =   150 loss =  0.04787
epoch =   160 loss =  0.06816
epoch =   170 loss =  0.19029
epoch =   180 loss =  0.07357
epoch =   190 loss =  0.08524
epoch =   200 loss =  0.19502
epoch =   210 loss =  0.06657
epoch =   220 loss =  0.14985
epoch =   230 loss =  0.44791
epoch =   240 loss =  0.15642
epoch =   250 loss =  0.71647
epoch =   260 loss =  0.04306
epoch =   270 loss =  0.05104
epoch =   280 loss =  0.08886
epoch =   290 loss =  0.12419
epoch =   300 loss =  0.01655
epoch =   310 loss =  0.11556
epoch =   320 loss =  0.18254
epoch =   

Note that we can save the model which has all information about the trained network.
I've added a method (`probability_vector`) to the model that computes the output probabilities.

In [5]:
#
# For each test feature vector, print the output probabilities
# (rounded to 3 decimals) followed by the true class.
#
for i,features in enumerate(Xtest):
    x=torch.tensor(features,dtype=torch.float)
    p=model.probability_vector(x).detach().numpy().round(3)
    y=Ytest[i]
    print(str(p)+" "+str(y))


[0.872 0.114 0.014] 0
[0.229 0.002 0.769] 2
[0.123 0.874 0.003] 1
[0.947 0.024 0.03 ] 0
[0.012 0.    0.988] 2
[0.247 0.001 0.752] 2
[0.5   0.002 0.497] 2
[0.532 0.    0.468] 2
[0.155 0.841 0.004] 1
[0.106 0.892 0.002] 1
[0.016 0.    0.984] 2
[0.875 0.072 0.053] 0
[0.05  0.    0.949] 2
[0.064 0.936 0.   ] 1
[0.069 0.93  0.002] 1
[0.09  0.908 0.002] 1
[0.859 0.118 0.023] 0
[0.05  0.95  0.001] 1
[0.842 0.132 0.026] 0
[0.008 0.    0.992] 2
[0.014 0.985 0.   ] 1
[0.057 0.942 0.001] 1
[0.402 0.005 0.593] 2
[0.911 0.085 0.004] 0
[0.068 0.93  0.001] 1
[0.909 0.083 0.008] 0
[0.092 0.001 0.907] 2
[0.627 0.356 0.018] 0
[0.106 0.893 0.001] 1
[0.069 0.93  0.001] 1
[0.408 0.586 0.006] 0
[0.249 0.005 0.747] 2
[0.962 0.024 0.015] 0
[0.056 0.    0.944] 2
[0.029 0.971 0.   ] 1
[0.743 0.227 0.03 ] 0
[0.03 0.97 0.  ] 1
[0.165 0.833 0.002] 1
[0.037 0.962 0.   ] 1
[0.887 0.054 0.058] 0
[0.03 0.97 0.  ] 1
[0.577 0.419 0.005] 0
[0.132 0.867 0.001] 1
[0.163 0.835 0.002] 0
[0.094 0.    0.906] 2
[0.67  0.234 0.0

We can also interrogate the model and determine the parameters that define it.

In [None]:
# a bare expression at the end of a cell: the notebook displays the model's repr
model

In [None]:
# the ModuleList holding the network's layers in the order forward() applies them
model.layers

model.named_parameters() gives a generator for getting all of the parameters.

In [None]:
# look at the type of the object returned by named_parameters()
type(model.named_parameters())

In [None]:
# walk through the parameters, printing each name and then its values on the next line
for name, param in model.named_parameters():
    print(name, param.data, sep="\n")

In [None]:
# push the first test observation through the network, printing the output at every stage
x=torch.tensor(Xtest[0]).float()
model.output_in_stages(x)

Copy all of the parameters into tensors

In [None]:
# collect the tensor behind every parameter, in the order named_parameters() yields them
p=model.named_parameters()
LTENSORS=[param.data for _,param in p]
print(LTENSORS)

Try to reproduce what the linear transformation does in the first layer.

In [None]:
# the first two collected tensors are used as the first layer's weight matrix W and bias B
W,B=LTENSORS[0],LTENSORS[1]
x=torch.tensor(Xtest[0],dtype=torch.float)
# print the sizes to see that the matrix-vector product below is well defined
for t in (W,B,x):
    print(t.size())
# the linear transformation written out by hand: W x + B
y=W@x+B
print(y)

Check the next stage, in which the sigmoid function is applied to the output of the first linear transformation.

In [None]:
# the sigmoid function 1/(1+exp(-y)) applied elementwise to y, written out by hand
1/(1+torch.exp(-y))

Finally, check that when we apply the softmax transformation to the final model output we get the probability vector.

In [None]:
# final model output for x, then the softmax exp(y)/sum(exp(y)) written out by hand
y=model(x)
torch.exp(y)/torch.exp(y).sum()