In [1]:
import torch
import pandas as pd
import numpy as np
import torch.utils
import torchvision.transforms
from torch import nn
import matplotlib.pyplot as plt
%matplotlib inline

torch.set_printoptions(threshold=1000)

In [32]:
#Loading house prices training and test dataset
hptrain = pd.read_csv("Datasets/hptrain.csv",index_col=0)
hptest = pd.read_csv("Datasets/hptest.csv",index_col=0)

In [33]:
#Transforming data into tensor data format from pandas' dataframe
X_ols_train = torch.tensor(hptrain[['LotArea', 'OverallQual', 'OverallCond', 'MasVnrArea', 'TotalBsmtSF']].values,requires_grad=True,dtype=torch.float32)
X_ols_train = nn.functional.normalize(X_ols_train)
Y_ols_train = torch.tensor(hptrain[['SalePrice']].values,requires_grad=True,dtype=torch.float32)


#Splitting data into iterable batches
olstensor = torch.utils.data.TensorDataset(X_ols_train,Y_ols_train)
olsdataloader = torch.utils.data.DataLoader(olstensor, batch_size=20,shuffle=True)

In [34]:
#Specifying the paramenters of our model (ols - ordinary least squares i.e. linear regression)
olsnet = nn.Sequential(nn.Linear(X_ols_train.size()[1],Y_ols_train.size()[1]))
olsnet[0].weight.data.normal_(0, 0.1)
olsnet[0].bias.data.normal_(0, 0.1)
olsloss = nn.MSELoss()
olstrainer = torch.optim.SGD(olsnet.parameters(),lr=0.0001)

In [35]:
num_epochs = 5
olsnet.train()
for epoch in range(num_epochs):
    print('-------------------')
    print(f'{epoch+1}th epoch')
    print('===================')
    olstrainer.zero_grad()
    X, Y = next(iter(olsdataloader))
    output = olsnet(X)

    l = olsloss(output,Y)
    print(f'Loss ols:{l}')


    l.backward(retain_graph=True)

    olstrainer.step()
    print('Coefficients: ',olsnet[0].weight.data)


-------------------
1th epoch
Loss ols:29889024000.0
Coefficients:  tensor([[3.3032e+01, 1.1173e-02, 1.6113e-01, 3.2019e-01, 4.7043e+00]])
-------------------
2th epoch
Loss ols:38795599872.0
Coefficients:  tensor([[6.9708e+01, 4.3485e-02, 1.8754e-01, 5.9882e-01, 9.7254e+00]])
-------------------
3th epoch
Loss ols:30773739520.0
Coefficients:  tensor([[1.0256e+02, 6.8166e-02, 2.1024e-01, 9.3925e-01, 1.3945e+01]])
-------------------
4th epoch
Loss ols:58688897024.0
Coefficients:  tensor([[1.4750e+02, 9.6646e-02, 2.3292e-01, 1.5081e+00, 1.9511e+01]])
-------------------
5th epoch
Loss ols:44163383296.0
Coefficients:  tensor([[1.8386e+02, 1.2079e-01, 2.5332e-01, 1.8477e+00, 2.4070e+01]])


In [36]:
'''--------------------------------------------------------'''

'--------------------------------------------------------'

In [None]:
#Creating a class for displaying the results
class Evaluation_Cross_Entropy:
    """For evaluating the accuracy of image recognition neural networks"""
    def __init__(self,net,img = None,label = None,device='cpu'):
        #History of saved error rates, used for in-sample error evaluation during network training
        self.hist = []
        self.loss = []
        self.net = net
        self.img = img
        self.label = label
        self.device = device

    def prob(self):
        """Returns probabilities for each label"""
        output = self.net(self.img).double().to(device=self.device)
        exp = torch.exp(output)
        sum = torch.sum(exp,dim=-1)
        probabilities = exp / sum.unsqueeze(1)
        return probabilities

    def pred(self):
        """Returns the labels our model predicted for each image"""
        pred = torch.argmax(self.prob(),dim=1)
        return pred

    def cert(self):
        """(certainty) Returns the probabilities of the chosen label for each image"""
        return torch.amax(self.prob(),dim=1)

    def error(self):
        """Returns misclassification error rate"""
        return torch.sum(self.pred() != self.label)/len(self.label)

    def missclass(self):
        """Returns what labels our model misclassified as a % of all misclassifications"""

        #Vector of boolean values, True if an image has been misclassified
        boo = self.label != self.pred()

        #Finds indices of True booleans
        temp = boo.nonzero()

        #Reshapes the indices
        temp2 = torch.reshape(temp,(1,temp.size(dim=0))).data[0]

        #Finds the labels of misclassified images
        temp3 = torch.index_select(self.label,0,temp2)

        #Finds what image labels and how many of them have been misclassified
        unique_miss, count_miss = torch.unique(temp3,sorted=True,return_counts=True)

        #Finds all possible image labels
        unique_label = torch.unique(self.label,sorted=True)

        #Initiates values
        count_label = torch.zeros(1,len(unique_label)).data[0]
        j = 0

        #Checks for an edge case where some images might have all been classified correctly and inputs 0s there
        for i in range(len(unique_label)):
            if unique_label[i] == unique_miss[j]:
                count_label[i] = count_miss[j]
                j +=1

        #Returns a vector of % of labels misclassified
        return count_miss/torch.sum(boo)


    def add(self,img,label,loss = False):
        """Appends an error rate (and loss if not False) to the history)"""
        self.img = img
        self.label = label
        self.hist.append(self.error().item())
        self.loss.append(loss)

    def plot_error(self):
        """Plots the in-sample error from training history"""

        plt.plot(self.hist)
        plt.ylabel('In-sample error')
        plt.xlabel('Epoch')
        plt.show()

    def plot_loss(self):
        """Plots the loss from training history"""

        plt.plot(self.loss)
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.show()


In [None]:
#Creating an automatic trainer to be used with sequential linear classes
class Gradient_Img_Trainer:
    """Automatic trainer for linear classifiers embedded through a sequential class"""
    def __init__(self,net,train_iter,loss,trainer,test_iter = None,scheduler = None,device='cpu'):
        self.net = net.to(device=device)
        self.train_iter = train_iter
        self.loss = loss.to(device=device)
        self.trainer = trainer
        self.test_iter = test_iter
        self.result = None
        self.scheduler = scheduler
        self.device = device

    def dim(self):
        """Prints the output shapes of all layers"""
        X,Y = next(iter(mnist_train_iter))
        X,Y = X.to(device=self.device),Y.to(device=self.device)
        for layer in self.net:
            X = layer(X).to(device=self.device)
            print(self.net.__class__.__name__,'output shape: \t',X.shape)


    def normal_init(self, std = 0.1):
        """Used for initializing the weights of the nn from a normal distribution"""
        def init_normal(m):
            if type(m) == nn.Linear:
                nn.init.normal_(m.weight, std=std).to(device=self.device)

        self.net.apply(init_normal)

    def xavier_uniform(self):
        """Used for initializing the weights of the nn according to a xavier method from a uniform"""
        def xavier(m):
            if type(m)== nn.Linear:
                nn.init.xavier_uniform_(m.weight).to(device=self.device)

        self.net.apply(xavier)

    def xavier_normal(self):
        """Used for initializing the weights of the nn according to a xavier method from a normal"""
        def xavier(m):
            if type(m)== nn.Linear:
                nn.init.xavier_normal_(m.weight).to(device=self.device)

        self.net.apply(xavier)


    def Train(self, num_epochs, print_loss = True, print_error = True):
        """Used for training the neural network
                :param print_loss: - print loss for each epoch
                :param print_error: - print error for each epoch
                :param graph: - graph in-sample error at the end of training"""
        self.net.train()
        result = Evaluation_Cross_Entropy(net=self.net,device=self.device)
        for epoch in range(num_epochs):
            if print_loss is True or print_error is True:
                print('-------------------')
                print(f'{epoch+1}th epoch')
                print('===================')

            self.net.zero_grad()

            #Trainig the model
            img, label = next(iter(self.train_iter))
            img,label = img.to(device=self.device),label.to(device=self.device)
            output = self.net(img).to(device=self.device)
            l = self.loss(output,label)
            l.mean().backward()
            self.trainer.step()

            result.add(img, label, l.sum().item())

            #Adjusting the learning rate
            if self.scheduler is not None:
                self.scheduler.step()

            if print_loss is True:
                print("Loss:",l.sum().item())

            if print_error is True:
                print("In-sample error:", result.error())


        self.result = result
        torch.cuda.empty_cache()

    def Test(self,test_iter=None):
        self.net.eval()
        if self.test_iter is None:
            self.test_iter = test_iter
        img, label = next(iter(self.test_iter))
        self.result.img = img.to(device=self.device)
        self.result.label = label.to(device=self.device)
        print("Out-of-sample error:",self.result.error())
        torch.cuda.empty_cache()

In [None]:
#Loading in complex MNIST train and test datasets while transforming them to 28x28
trans = [torchvision.transforms.ToTensor()]
trans.insert(0, torchvision.transforms.Resize(28))
trans = torchvision.transforms.Compose(trans)


mnist_train = torchvision.datasets.FashionMNIST(
    root="../Pytorch_Practise/Datasets",train = True, transform=trans, download= True)
mnist_test = torchvision.datasets.FashionMNIST(
    root="../Pytorch_Practise/Datasets",train = False, transform=trans, download= True)

In [None]:
#Visualizing 9 random examples
labels_map = {
    0: "T-Shirt",
    1: "Trouser",
    2: "Pullover",
    3: "Dress",
    4: "Coat",
    5: "Sandal",
    6: "Shirt",
    7: "Sneaker",
    8: "Bag",
    9: "Ankle Boot",
}

figure = plt.figure(figsize=(8, 8))
cols, rows = 3, 3

for i in range(1, cols * rows + 1):
    sample_idx = torch.randint(len(mnist_train), size=(1,)).item()
    imge, labele = mnist_train[sample_idx]
    figure.add_subplot(rows, cols, i)
    plt.title(labels_map[labele])
    plt.axis("off")
    plt.imshow(imge.squeeze())
plt.show()

In [None]:
#Splitting our data into batch sizes
mnist_batch_size = 256

mnist_train_iter = torch.utils.data.DataLoader(mnist_train,mnist_batch_size,shuffle=True,num_workers=4,pin_memory=True)
mnist_test_iter = torch.utils.data.DataLoader(mnist_test,mnist_test.__len__(),shuffle=False,num_workers=4,pin_memory=True)




In [None]:
#Creating the model and initializing the weights (logit - logistic regression)
logit_net = nn.Sequential(nn.Flatten(),nn.Linear(784,10))


#Initializing weights
def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.normal_(m.weight, std=0.01)

logit_net.apply(init_weights)

#Specifying cross entropy loss
logit_loss = nn.CrossEntropyLoss(reduction='none')

#Specifying Stochastic Gradient Descent trainer
logit_trainer = torch.optim.SGD(logit_net.parameters(), lr=0.1)

In [None]:
logit_training = Gradient_Img_Trainer(logit_net, mnist_train_iter, logit_loss, logit_trainer, mnist_test_iter)
logit_training.Train(50)

In [None]:
logit_training.result.plot_error()

In [None]:
logit_training.result.plot_loss()

In [None]:
logit_training.Test()

In [None]:
'''--------------------------------------------------------'''

In [None]:
#Perceptron initiation

MLP = nn.Sequential(nn.Flatten(),
                    nn.Linear(784,256),
                    nn.ReLU(),
                    nn.Linear(256,10))

def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.normal_(m.weight, std=0.01)

MLP.apply(init_weights)

MLP_loss = nn.CrossEntropyLoss(reduction='none')

MLP_trainer = torch.optim.SGD(MLP.parameters(),lr = 0.1)

In [None]:
MLP_Training = Gradient_Img_Trainer(MLP, mnist_train_iter, MLP_loss, MLP_trainer, mnist_test_iter)
MLP_Training.Train(50)

In [None]:
MLP_Training.result.plot_error()

In [None]:
MLP_Training.Test()

In [None]:
'''--------------------------------------------------------'''

In [None]:
#Running a logistic regression with weight decay

Wd_net = nn.Sequential(nn.Flatten(),
                    nn.Linear(784,10))

def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.normal_(m.weight, std=0.01)

Wd_net.apply(init_weights)

Wd_loss = nn.CrossEntropyLoss(reduction='none')

Weight_decay = 10

Wd_trainer = torch.optim.SGD([
    {"params":Wd_net[1].weight,'weight decay':Weight_decay},
    {"params":Wd_net[1].bias}],lr = 0.1)

In [None]:
Wd_Training = Gradient_Img_Trainer(Wd_net, mnist_train_iter, Wd_loss, Wd_trainer, mnist_test_iter)
Wd_Training.Train(50)

In [None]:
Wd_Training.Test()

In [None]:
'''--------------------------------------------------------'''

In [None]:
#MLP with Drop-out

Dropout_net = nn.Sequential(nn.Flatten(),
                            nn.Linear(784,256),
                            nn.ReLU(),
                            nn.Dropout(0.2),
                            nn.Linear(256,256),
                            nn.ReLU(0.5),
                            nn.Dropout(0.5),
                            nn.Linear(256,10)
                            )

def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.normal_(m.weight, std=0.01)

Dropout_net.apply(init_weights)

Dropout_loss = nn.CrossEntropyLoss(reduction='none')
Dropout_trainer = torch.optim.SGD(Dropout_net.parameters(), lr = 0.5)

In [None]:
#Training with Drop-out
Dropout_training = Gradient_Img_Trainer(Dropout_net,mnist_train_iter,Dropout_loss,
                                        Dropout_trainer,mnist_test_iter)
Dropout_training.Train(150)

In [None]:
#Testing
Dropout_training.Test()

In [None]:
'''--------------------------------------------------------'''

In [None]:
#Basic CNN implementation
CNN = nn.Sequential(
    nn.Conv2d(1,6,kernel_size=5,padding=2),nn.ReLU(),
    nn.MaxPool2d(kernel_size=2,stride=2),
    nn.Conv2d(6,16,kernel_size=5), nn.ReLU(),
    nn.MaxPool2d(kernel_size=2,stride=2),
    nn.Flatten(),
    nn.Linear(16*5*5,120), nn.ReLU(),
    nn.Linear(120,84), nn.ReLU(),
    nn.Linear(84,10))

def xavier_init(m):
    if type(m)== nn.Linear:
        nn.init.xavier_uniform_(m.weight)

CNN.apply(xavier_init)


CNN_loss = nn.CrossEntropyLoss(reduction='none')

CNN_trainer = torch.optim.SGD(CNN.parameters(),lr = 0.3)

In [None]:
#Training of our CNN
CNN_training = Gradient_Img_Trainer(CNN,mnist_train_iter,CNN_loss,CNN_trainer,mnist_test_iter)
CNN_training.Train(600, print_loss=False)

In [None]:
#Testing our model (we'd normally use validation set here to avoid overfitting to test data)
CNN_training.Test()

In [None]:
'''--------------------------------------------------------'''

In [None]:
#Running a CNN with learning rate decay
CNN2 = nn.Sequential(
    nn.Conv2d(1,6,kernel_size=5,padding=2),nn.ReLU(),
    nn.MaxPool2d(kernel_size=2,stride=2),
    nn.Conv2d(6,16,kernel_size=5), nn.ReLU(),
    nn.MaxPool2d(kernel_size=2,stride=2),
    nn.Flatten(),
    nn.Linear(16*5*5,120), nn.ReLU(),nn.Dropout(0.2),
    nn.Linear(120,84), nn.ReLU(),nn.Dropout(0.2),
    nn.Linear(84,10))

def xavier_init(m):
    if type(m)== nn.Linear:
        nn.init.xavier_uniform_(m.weight)

CNN2.apply(xavier_init)


CNN2_loss = nn.CrossEntropyLoss(reduction='none')

CNN2_trainer = torch.optim.SGD(CNN2.parameters(),lr = 0.3)

CNN2_scheduler = torch.optim.lr_scheduler.MultiStepLR(CNN2_trainer,milestones=[50,100,200,400,600,700],gamma=0.8)

In [None]:
#Training our model with decaying learning rate scheduled at 50'th and 600'th epoch
CNN2_training = Gradient_Img_Trainer(CNN2,mnist_train_iter,CNN2_loss,CNN2_trainer,mnist_test_iter,CNN2_scheduler)
CNN2_training.Train(800, print_loss=False)

In [None]:
CNN2_training.Test()

In [None]:
'''--------------------------------------------------------'''

In [None]:
##CUDA

In [None]:
#For our CNN implementation we'll be significantly increasing our batch sizes
mnist_batch_size = 1024

mnist_train_iter = torch.utils.data.DataLoader(mnist_train,mnist_batch_size,shuffle=True,pin_memory=True,pin_memory_device='cuda:0')
mnist_test_iter = torch.utils.data.DataLoader(mnist_test,mnist_test.__len__(),shuffle=False,pin_memory=True,pin_memory_device='cuda:0')

In [None]:
#Experimenting with convolutions and batch normalization
CNN3 = nn.Sequential(
    nn.Conv2d(1,16,kernel_size=5,padding=2),nn.BatchNorm2d(16),nn.ReLU(),
    nn.MaxPool2d(kernel_size=2,stride=2),
    nn.Conv2d(16,64,kernel_size=3,padding=1),nn.BatchNorm2d(64),nn.ReLU(),
    nn.Conv2d(64,64,kernel_size=3,padding=1),nn.BatchNorm2d(64),nn.ReLU(),
    nn.Conv2d(64,16,kernel_size=3,padding=1),nn.BatchNorm2d(16),nn.ReLU(),
    nn.MaxPool2d(kernel_size=2,stride=2),
    nn.Flatten(),
    nn.Linear(784,120),nn.BatchNorm1d(120), nn.ReLU(),
    nn.Linear(120,84),nn.BatchNorm1d(84),nn.ReLU(),
    nn.Linear(84,10))

In [None]:
#Running a CNN with learning rate decay
def xavier_init(m):
    if type(m)== nn.Linear:
        nn.init.xavier_uniform_(m.weight).cuda()

CNN3.apply(xavier_init)


CNN3_loss = nn.CrossEntropyLoss(reduction='none')

CNN3_trainer = torch.optim.SGD(CNN3.parameters(),lr = 0.3)

CNN3_scheduler = torch.optim.lr_scheduler.MultiStepLR(CNN3_trainer,milestones=[50,100,200,400,600,700],gamma=0.8)

In [None]:
#Training the model
CNN3_training = Gradient_Img_Trainer(CNN3,mnist_train_iter,CNN3_loss,CNN3_trainer,mnist_test_iter,CNN3_scheduler,'cuda:0')

CNN3_training.Train(800,print_loss=False)

In [None]:
CNN3_training.result.plot_error()

In [None]:
CNN3_training.result.plot_loss()

In [None]:
#We're starting to see promising results
CNN3_training.Test()

In [56]:
#We're starting to see promising results
CNN3_training.Test()

Out-of-sample error: tensor(0.0852, device='cuda:0')
