In [1]:
#IMPORT THE REQUIRED PACKAGES

import scipy.io as sio
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

#GPU

useCuda = False
if torch.cuda.is_available():
  useCuda = True
  torch.set_default_tensor_type(torch.cuda.FloatTensor)
print('I use cuda:',useCuda)

#processes

RC = False
print('I perform the random search:',RC)

Final = False
print('I perform the final training:',Final)

Acc = False
print('I compute the accuracy:',Acc)

RF = False
print('I compute the receptive fields:',RF)

FV = False
print('I perform the feature visualization:',FV)

I use cuda: False
I perform the random search: False
I perform the final training: False
I compute the accuracy: False
I compute the receptive fields: False
I perform the feature visualization: False


In [2]:
#READ THE DATA
mat = sio.loadmat("./MNIST.mat")

X = mat['input_images']
Y = mat['output_labels']

In [3]:
#DIVIDE INTO TRAIN AND TEST

X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.20, random_state=1234)

print('The training and test sets have shapes:')
print('Shape of X_train:', X_train.shape)
print('Shape of Y_train:', Y_train.shape)
print('Shape of X_test:', X_test.shape)
print('Shape of Y_test:', Y_test.shape, '\n')

The training and test sets have shapes:
Shape of X_train: (48000, 784)
Shape of Y_train: (48000, 1)
Shape of X_test: (12000, 784)
Shape of Y_test: (12000, 1) 



In [4]:
#DEFINE THE NETOWRK

class Net(nn.Module):
    
    def __init__(self, LN):
        super(Net, self).__init__()
        #number of hidden layers
        numberL = len(LN)
        self.network = nn.Sequential()
        #first layer
        self.network.add_module('net1',nn.Linear(in_features=784, out_features=LN[0]))  
        self.network.add_module('act1',nn.LeakyReLU())   
   
        #hidden layers
        j=2
        for i in range(numberL-1):
            name = 'net'+str(i+2)
            self.network.add_module(name,nn.Linear(in_features=LN[i], out_features=LN[i+1]))
            name = 'act'+str(i+2) 
            self.network.add_module(name,nn.LeakyReLU())
            j += 1

        name = 'net'+str(j)
        self.network.add_module(name,nn.Linear(in_features=LN[-1], out_features=10))

    def forward(self, x):
        #compute up the last layer
        out = self.network(x)
        return out

    def output(self, x):
        #returns the class 
        with torch.no_grad():
            out = self.forward(x)
            out = nn.functional.softmax(out,dim=1)
            _, predicted = torch.max(out, 1)

        return predicted

In [5]:
#DEFINE THE TRAINING

def Printlr(opt):
    for p in opt.param_groups:
        return p['lr']

def Training(net, opt, gamma, cool, num_epochs, Xt, Yt, Xv, Yv, verbose):

    #optimizer
    optimizer = opt
    #scheduler
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer,'min',factor=gamma, patience=50, cooldown=cool, min_lr=1e-6, threshold=1e-10, verbose=verbose)

    #train and test loss
    train_loss_log = []
    val_loss_log = []

    #define the loss function (the most used are already implemented in pytorch, see the doc!)
    lossfn = nn.CrossEntropyLoss()

    if useCuda:
        torch.cuda.empty_cache()
        net.cuda()
        lossfn.cuda()
        Xt = Xt.cuda()
        Yt = Yt.cuda()
        Xv = Xv.cuda()
        Yv = Yv.cuda()

    for num_ep in range(num_epochs):

        #update
        net.train()

        # IMPORTANT! zeroes the gradient buffers of all parameters
        optimizer.zero_grad()    
        # Forward pass
        out = net(Xt)
        # Evaluate loss
        loss = lossfn(out, Yt)
        # Backward pass
        loss.backward()
        #optimizer step
        optimizer.step()

        #evaluate network
        net.eval()
        with torch.no_grad(): # Avoid tracking the gradients (much faster!)
            out = net(Xv)
            val_loss = lossfn(out, Yv)

        #compute the loss
        train_loss_log.append(float(loss.item()))
        val_loss_log.append(float(val_loss.item()))

        #scheduler step
        scheduler.step(val_loss_log[-1])

        if verbose:
            if (num_ep % 99 == 0):
                print('Epoch %d  - lr: %.6f -  Train loss: %.6f - Val loss: %.6f' % (num_ep + 1, Printlr(optimizer), float(loss.item()), float(val_loss.item())))

        #Early stopping
        
        #check the patience
        stop=False
        if num_ep > 2000:
            #check the improvement on the validation loss
            check1 = 0
            check2 = 0
            for es in range(0,80,5):
                if abs(val_loss_log[num_ep-2-es] - val_loss_log[num_ep-1-es]) < 1e-7: check1 += 1
            for es in range(200):
                if val_loss_log[num_ep-2-es] - val_loss_log[num_ep-1-es] < 0 : check2 += 1
                
            if check1 == 16 or check2 > 180: stop=True

        if stop:
            print('Stop due to early stopping')
            break


    return train_loss_log, val_loss_log



In [6]:
#DEFINE THE RANDOM SEARCH

from sklearn.model_selection import KFold

def RandomSearch(RC,model,parameters,cases,X_train,Y_train,cv,epochs=1000,verbose=False):

    if RC:
        
        #convert the keys of the parameter dictionary into a list
        NamePar = list(parameters.keys())
        
        #build the combination of parameters
        Combinations = []
        for i in range(cases):
            Case = {}
            for j in NamePar:
                #sampling the parameters space
                sampling = np.random.choice(parameters[j])
                Case[j] = sampling
            #build the combination
            Combinations.append(Case)

        Combinations = np.array(Combinations)

        #perform cross validation using Combinations

        #divide data in folds
        kf = KFold(n_splits=cv, random_state=1234, shuffle=True)

        #array for the losses
        loss = np.zeros(cases)

        #loop over all the combinations
        for i in range(cases):

            print('Check the combination:',Combinations[i])
            #loop over all the folds
            fold=1
            for train_index, val_index in kf.split(X_train):
                print('Fold', fold)
                Xt, Xv = X_train[train_index], X_train[val_index]
                Yt, Yv = Y_train[train_index], Y_train[val_index]

                #convert to Tensor
                Xt = torch.Tensor(Xt).float().view(-1, Xt.shape[1])
                Yt = torch.LongTensor(Yt).squeeze()
                Xv = torch.Tensor(Xv).float().view(-1, Xv.shape[1])
                Yv = torch.LongTensor(Yv).squeeze()

                #initialize the model with the first parameters
                mod = model(list(Combinations[i][NamePar[0]]))
                
                #Training the network
                #choose the correct optimizer
                opt = optim.Adam(mod.parameters(), lr=Combinations[i]['optimizer__lr'], weight_decay=5e-4)

                #training
                _, l = Training(mod, opt, Combinations[i]['scheduler__gamma'], Combinations[i]['scheduler__cool'], epochs, 
                         Xt, Yt, Xv, Yv, verbose)

                ### Summing the loss of each fold
                loss[i] += l[-1]
                fold += 1
            print('Loss:', loss[i]/cv, '\n')

        #computing the average loss over the folds
        loss = loss/cv
        #the best combinations correspondes to minimum loss
        best = Combinations[np.argmin(loss)]
        
        return Combinations, best

    else: 
        print('The random search is not performed.')
        return 0, 0



In [7]:
#DEFINE THE FINAL TRAINING

def FinalTraining(Final,best,X_train,Y_train,epochs=10000):
    if Final:
        print('The best structure is:', best, '\n')

        #divide the training set into training and validation using scikit learn
        Xt, Xv, Yt, Yv = train_test_split(X_train, Y_train, test_size=0.20, random_state=1234)
        #convert to Tensor
        Xt = torch.Tensor(Xt).float().view(-1, Xt.shape[1])
        Yt = torch.LongTensor(Yt).squeeze()
        Xv = torch.Tensor(Xv).float().view(-1, Xv.shape[1])
        Yv = torch.LongTensor(Yv).squeeze()

        #best combination of parameters

        #model structure
        net = Net(best['module__LN'])
        #optimizer
        opt = optim.Adam(net.parameters(), lr=best['optimizer__lr'], weight_decay=5e-4)
        #train
        tloss, vloss = Training(net, opt, best['scheduler__gamma'], best['scheduler__cool'], epochs, Xt, Yt, Xv, Yv, True)

        #save the weights on cpu
        if useCuda:
            net.to('cpu')
        #the state dictionary includes all the parameters of the network
        net_state_dict = net.state_dict()
        #save the state dict to a file
        model_save_name = 'NetParameters.torch'
        path = F"{model_save_name}" 
        torch.save(net_state_dict, path)
        
        return tloss, vloss
    else: 
        print('The final training is not performed.')
        return 0, 0



In [8]:
#DEFINE THE ACCURACY

from sklearn.metrics import confusion_matrix

def Accuracy(net,X,Y,Plot=True):
    X = torch.Tensor(X).float().view(-1, X.shape[1])
    Y = torch.LongTensor(Y).squeeze()

    if useCuda:
        X=X.cuda()
        Y=Y.cuda()

    Ypred = net.output(X)
    temp = Ypred-Y
    temp = temp.cpu().numpy()

    #mask the array
    temp[temp>0] = -1
    temp[temp==0] = 1
    temp[temp<0] = 0

    #count the correct
    correct = np.sum(temp)

    if Plot:
        if useCuda:
            X=X.cpu().numpy()
            Y=Y.cpu().numpy()
            Ypred=Ypred.cpu().numpy()
        #confusion matrix
        cm = confusion_matrix(Y, Ypred)
        #normalization
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        #plot
        fig, ax = plt.subplots(figsize=(8,8))
        im = ax.imshow(cm, interpolation='nearest', cmap='Blues', vmin=0, vmax=1)
        ax.figure.colorbar(im, ax=ax)
        # We want to show all ticks...
        ax.set(xticks=np.arange(cm.shape[1]),
               yticks=np.arange(cm.shape[0]),
               xticklabels=np.arange(0,10), yticklabels=np.arange(0,10),
               ylabel='True label',
               xlabel='Predicted label')

        thresh = cm.max() / 2.
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                ax.text(j, i, format(cm[i, j], '.2f'),
                        ha="center", va="center",
                        color="white" if cm[i, j] > thresh else "black")
        plt.show()

    return correct/len(Y)

In [9]:



##### START THE COMPUTATION #####



In [10]:
#DEFINE THE HYPERPARAMETERS

def Params():
    #Network architecture
    #Number of layers
    Layers = np.random.randint(1,4,size=50)

    #Neurons per layer
    Neurons =[]

    for i in range(int(len(Layers)/2)):
        m = np.random.choice([300,400,500,600]) 
        #divide the range
        max = m
        min = 100
        step=(max-min)/(Layers[i])
        min = m-step
        N=[]
        for j in range(Layers[i]):
            N.append(np.random.randint(min,max))
            max -= step
            min -= step
        Neurons.append(tuple(N))

    for i in range(int(len(Layers)/2),len(Layers)):
        m = np.random.choice([300,400,500,600]) 
        #divide the range
        max = m
        min = m-100
        N=[]
        for j in range(Layers[i]):
            N.append(np.random.randint(min,max))
        Neurons.append(tuple(N))

    #Training parameters
    #Learning rate
    LearnigRate = [0.005,0.001,0.0005,0.0001]
    #Gamma of decay
    Gamma = [0.5,0.7,0.9]
    #Cooldown
    Cool =[0,50,100]

    #FinalParameters
    parameters = {'module__LN': Neurons,
                  'optimizer__lr': LearnigRate,
                  'scheduler__gamma': Gamma,
                  'scheduler__cool': Cool
                  }

    return parameters

In [11]:
#PERFORM THE RANDOM SEARCH
comb, best = RandomSearch(RC,Net,Params(),50,X_train,Y_train,4,2000,True)

if RC:
    print(comb,'\n')
    print('The best combination is', best, '\n')

The random search is not performed.


In [12]:
#PERFORM THE FINAL TRAINING
#results obtained through the cross validation
best =  {'module__LN': (480, 132), 'optimizer__lr': 0.001, 'scheduler__gamma': 0.9, 'scheduler__cool': 0}

#final training
tloss, vloss = FinalTraining(Final,best,X_train,Y_train,10000)

#plot the loss
if Final:
    plt.figure(figsize=(12,8))
    plt.semilogy(tloss, label='Train loss')
    plt.semilogy(vloss, label='Val loss')
    plt.xlabel('Epoch', fontsize=20)
    plt.ylabel('Loss', fontsize=20)
    plt.grid()
    plt.xticks(fontsize=15)
    plt.legend(prop=dict(size=18))
    plt.legend()
    plt.tight_layout()
    plt.show()

The final training is not performed.


In [13]:
#COMPUTE THE ACCURACY

#LOAD THE NETWORK AND COMPUTE THE ACCURACY ON THE TEST SET
#load the network
#initialize the net
net = Net(best['module__LN'])
#load the state dict previously saved
net_state_dict = torch.load('NetParameters.torch')
  
#update the network parameters
net.load_state_dict(net_state_dict)

if Acc:
   
    #compute the accuracy on training set
    print('The accuracy on the training set is: %.3f' % Accuracy(net,X_train,Y_train))
    #compute the accuracy on the test set
    print('The accuracy on the test set is: %.3f' % Accuracy(net,X_test,Y_test))
    

In [14]:
#GET the weights

#copy the dictionary containing the weights and the bias
WeightsBias = net_state_dict.copy()
#get the names of the weights and of the bias
NameLayers = list(WeightsBias.keys())
#take just the name of the weights
WeightsName = [ x for x in NameLayers if 'bias' not in x ]

#number of neurons in the hidden layers
NN = []
for i in range(len(WeightsName)):
    NN.append(len(WeightsBias[WeightsName[i]]))
    
#RECEPTIVE FIELD 
def ReceptiveField(hn, netdict):

    RFS = []
    RFSLast = []

    for k in range(hn):

        RF = []
        W = []

        #first layer

        #choiche on the neuron
        n = np.random.choice(np.arange(1,NN[0]))
        #matrix of weights from the input to the first hidden layer
        mat = WeightsBias[WeightsName[0]]
        #weights matrix
        W.append(mat)
        #receptive field
        RF.append(mat[n,:].reshape(28,28))

        for i in range(1, len(WeightsName)-1):
            #choice on the neuron
            n = np.random.choice(np.arange(1,NN[i]))
            #matrix of weights from layer i-1 to the layer i
            mat = WeightsBias[WeightsName[i]]
            #store the weight matrix
            W.append(mat)
            #considering the weights from all the neurons in layer i-1 and the chosen neuron in layer i
            Wj = mat[n,:]

            prod=W[i-1]
            for j in range(i-1,0,-1):
                #compute the product among matrices
                prod = np.matmul(prod,W[j-1])

            #compute the final product
            prod = np.matmul(Wj,prod)

            #compute and store the receptive fields
            RF.append(prod.reshape(28,28))
        
        RFS.append(RF)
     
    RF = []
    
    for i in range(10):    
        #matrix of weight last hidden layer
        prod = WeightsBias[WeightsName[-2]]
        for j in range(len(WeightsName)-3,-1,-1):
            mat = WeightsBias[WeightsName[j]]
            #compute the product among matrices
            prod = np.matmul(prod,mat)
            
        #considering the weights from all the neurons in layer i-1 and the chosen neuron in the last layer 
        mat = WeightsBias[WeightsName[-1]]   
        Wi = mat[i,:]   
        #compute the final product
        prod = np.matmul(Wi,prod)
        #compute and store the receptive fields
        RF.append(prod.reshape(28,28))
        
    RFSLast.append(RF)

    return RFS, RFSLast

In [15]:
#COMPUTE THE RECEPTIVE FIELDS

if RF:
    #compute the receptive fields
    RFS, RFSLast = ReceptiveField(4, net_state_dict)
    #plot the receptive fields
    for layer in range(len(RFS[0])):
        (fig, subplots) = plt.subplots(2, 2, figsize=(8, 8))
        fig.suptitle("Layer=%i" % (layer+1), fontsize=15)
        case = 0 
        for j in range(2):
            for i in range(2):
                ax = subplots[j][i]
                im = ax.imshow(RFS[case][layer], cmap='gray')
                case += 1
                
    #plot the output layer
    (fig, subplots) = plt.subplots(2, 5, figsize=(20, 8))
    fig.suptitle("Last layer", fontsize=15)
    neuron = 0 
    for j in range(2):
        for i in range(5):
            ax = subplots[j][i]
            im = ax.imshow(RFSLast[0][neuron], cmap='gray')
            ax.set_title('Neuron %i' %(neuron))
            neuron += 1


In [16]:
#IMPROVE THE FEATURE VISUALIZATION


def Activation(Net,ChosenLayer,Input):
    #compute the output of first layer
    if ChosenLayer == 1:
        return Net.network.act1(Net.network.net1(Input))
    #compute the output of second layer
    if ChosenLayer == 2:
        return Net.network.act2(Net.network.net2(Net.network.act1(Net.network.net1(Input))))
    #compute the output of third layer
    if ChosenLayer == 3:
        return Net.network.net3(Net.network.act2(Net.network.net2(Net.network.act1(Net.network.net1(Input)))))
    

def FeatureVisualization(cL, cN, epochs,lrate, Net):
    #initilize a matrix with zeros
    mat = torch.zeros(784, requires_grad=True) 
    #define the optimizer
    optimizer = torch.optim.Adam([mat], lr=lrate, weight_decay=5e-4)

    #train
    for num_ep in range(epochs):  
        optimizer.zero_grad()
        #define the act as minus act to obtain a gradient ascent
        act = -Activation(Net,cL,mat)[cN] 
        act.backward()
        optimizer.step()
        
    return mat

In [17]:
if FV:
    #compute and plot for hidden layers
    for layer in range(1,3):
        (fig, subplots) = plt.subplots(2, 2, figsize=(8, 8))
        fig.suptitle("Layer=%i" % (layer+1), fontsize=15)
        case = 0 
        for j in range(2):
            for i in range(2):
                n = np.random.choice(np.arange(1,NN[layer]))
                F = FeatureVisualization(layer,n,500,0.1,net)
                F = F.cpu().detach().numpy()
                ax = subplots[j][i]
                im = ax.imshow(F.reshape(28,28), cmap='gray')

    #plot the output layer
    (fig, subplots) = plt.subplots(2, 5, figsize=(20, 8))
    fig.suptitle("Last layer", fontsize=25)
    n = 0 
    for j in range(2):
        for i in range(5):
            F = FeatureVisualization(3,n,500,0.1,net)
            F = F.cpu().detach().numpy()
            ax = subplots[j][i]
            im = ax.imshow(F.reshape(28,28), cmap='gray')
            ax.set_title('Neuron %i' %(n), fontsize=15)
            n += 1