In [3]:
import sys
import time
import pickle
import gzip
from random import randint
from scipy import misc
from scipy import special
import scipy.ndimage
from scipy.sparse import csc_matrix, issparse
import numpy as np
import datetime as dt
from sklearn.cluster import KMeans,MiniBatchKMeans
import matplotlib.pyplot as plt
import json
import csv
import collections
import math
import sys

#setting path
DATA_PATH = 'data/mnist/'

IMAGES_TRAIN = 'data_training'
IMAGES_TEST = 'data_testing'

RANDOM_SEED = 42
N_CLASSES = 10
N_FEATURES = 28 * 28

#import data+label
data_training = DATA_PATH+IMAGES_TRAIN
data_testing = DATA_PATH+IMAGES_TEST
ft = gzip.open(data_training, 'rb')
TRAINING = pickle.load(ft)
ft.close()
ft = gzip.open(data_testing, 'rb')
TESTING = pickle.load(ft)
ft.close()

In [15]:
class NN:
    def __init__(self, neurons, stop_parameter):
        self.input_size = N_FEATURES
        self.output_size = N_CLASSES
        self.neurons = neurons
        self.stop_p = stop_parameter
        self.best = 0.
        self.same = 0
        self.iteration = 0
        
        #create random matrix for wheights
        np.random.seed(RANDOM_SEED)
        hidden_layer = np.random.rand(self.neurons, self.input_size + 1)/self.neurons
        output_layer = np.random.rand(self.output_size, self.neurons + 1) / self.output_size
        self.layers = [hidden_layer, output_layer]
        
    def train (self, training, testing):
        accu_train = 0. #[0., 0.]
        len_train = len(training[0])
        len_test = len(testing[0])
        self.start_time = dt.datetime.now()
        typeTrainingPrint = "Stop Function: " 
        typeTrainingPrint += "improvements below "+str(self.stop_p)+"%"
        print(typeTrainingPrint)
        #print('\nNeurons: %d\nBatch Train: %d\nBatch Test: %d\n%s\n' % (self.neurons,len_batch_train,len_batch_test,typeTrainingPrint))
        inputs = training[0][0:len_train]
        targets = np.zeros((len_train, 10))
        for i in range(len_train):
            targets[i, training[1][i]] = 1
        
        while not self.is_stop_function(accu_train): #(accu_train[1]):
            self.iteration += 1
        
            for input_vector, target_vector in zip(inputs, targets):
                self.backprop(input_vector, target_vector)
        
        
            accu_test = self.accu(testing)
            accu_train = self.accu(training)
                
            #print (accu_train)
            
            if (self.iteration == 1 or self.iteration % 10 == 0):
                self.print_message_iter(self.iteration,accu_test,accu_train,self.ETAepoch(self.start_time))
                
        if (self.iteration % 10 != 0):
            self.print_message_iter(self.iteration,accu_test,accu_train,self.ETAepoch(self.start_time))

        # Final message
        print('\n-- Training Session End (%s) --' % (dt.datetime.now()))

    def feed_forward(self, input_vector):
        outputs = []
        for layer in self.layers:
            input_with_bias = np.append(input_vector, 1) 
            output = np.inner(layer, input_with_bias)
            output = special.expit(output)
            outputs.append(output)
            
            input_vector = output
        return outputs

    def backprop(self, input_vector, target):
        c = 10**(-4) + 10**(-1)/math.sqrt(self.iteration)  # Learning coefficient
        hidden_outputs, outputs = self.feed_forward(input_vector)
        output_deltas = outputs * (1 - outputs) * (outputs - target)
        self.layers[-1] -= c*np.outer(output_deltas, np.append(hidden_outputs, 1))
        
        hidden_deltas = hidden_outputs * (1 - hidden_outputs) * np.dot(np.delete(self.layers[-1], self.neurons, 1).T, output_deltas)
        self.layers[0] -= c*np.outer(hidden_deltas, np.append(input_vector, 1))

    def predict(self, input_vector):
        return self.feed_forward(input_vector)[-1]

    def predict_one(self, input_vector):
        return np.argmax(self.feed_forward(input_vector)[-1])
    
    def accu(self, testing):
        res= np.zeros((10, 2))
        #se giusto aggiungo 1 ad entrambe le colonne di res (colonne = giusti, totali)
        for k in range(len(testing[1])):
            if self.predict_one(testing[0][k]) == testing[1][k]:
                res[testing[1][k]] += 1
            else:
                res[testing[1][k]][1] += 1
        total = np.sum(res, axis=0)
        return np.round(total[0]/total[1]*100, 2)
    
    def is_stop_function(self, accuracy):
        if accuracy > self.best + self.stop_p or self.iteration == 0:
            self.best = accuracy
            return False
        else:
            return True
            
    def print_message_iter(self,iteration,accu_test,accu_train,eta):
        len_eta = len(eta)
        space_fill = 6 - len_eta
        eta = "("+eta+")"
        for _ in range(space_fill):
            eta += " "
            message = 'Epoch '+str(self.iteration).zfill(3) + " "+eta+" "
            message += 'Accuracy TRAIN: '+str(accu_train).zfill(4)+'%\t'
            message += 'Accuracy TEST: '+str(accu_test).zfill(4)+'%\t'
        print(message)
        
    def ETAepoch(self,start_time):
        diff = dt.datetime.now() - self.start_time
        eta = divmod(diff.days * 86400 + diff.seconds, 60)
        if eta[0] != 0:
            ret = str(eta[0])+"m"
        else:
            ret = ""
        ret += str(eta[1])+"s"
        return ret

In [16]:
nn = NN(neurons=100, stop_parameter=0.01)
nn.train(TRAINING, TESTING)

Stop Function: improvements below 0.01%
Epoch 001 (49s)    Accuracy TRAIN: 92.84%	Accuracy TEST: 93.04%	
Epoch 010 (7m57s)  Accuracy TRAIN: 98.01%	Accuracy TEST: 97.03%	


UnboundLocalError: local variable 'message' referenced before assignment

In [2]:
import os
os.getcwd()

'C:\\Users\\Giosuè\\Desktop\\NN\\NN'