In [1]:
import numpy as np
import matplotlib as plt

In [101]:
def logistic_regression(loss, grad_l, w0, data, batch_size, n_epochs, alpha = 0.01):
    ''' 
    Input:
        l: the function l(w; D) we want to optimize.
        It is supposed to be a Python function, not an array.
        grad_l: the gradient of l(w; D). It is supposed to be a Python function, not an array.
        w0: an n-dimensional array which represents the initial iterate. By default, it
        should be randomly sampled.
        data: a tuple (x, y) that contains the two arrays x and y, where x is the input data,
        y is the output data.
        batch_size: an integer. The dimension of each batch. Should be a divisor of the number of data.
        n_epochs: an integer. The number of epochs you want to reapeat the iterations.
    Output:
        w: an array that contains the value of w_k FOR EACH iterate w_k (not only the latter).
        f_val: an array that contains the value of l(w_k; D)
        FOR EACH iterate w_k ONLY after each epoch.
        grads: an array that contains the value of grad_l(w_k; D)
        FOR EACH iterate w_k ONLY after each epoch.
        err: an array the contains the value of ||grad_l(w_k; D)||_2
        FOR EACH iterate w_k ONLY after each epoch.
    '''

    epoch = 1


    # divide data in X and y
    X, y = data
    l, N = X.shape

    total_number_iter = (N // batch_size) * n_epochs
    tot_iter = 0
    
    # Every row is a wieght vector
    weights_array = np.zeros((total_number_iter, l))
    loss_array = np.zeros((n_epochs, 1))
    gradient_array = np.zeros((n_epochs, l)) 
    err_array = np.zeros((n_epochs, 1))

    iter_epoch = N // batch_size
    # Now lest shuffle the dataset
    X_shuffled, y_shuffled = shuffle_dataset(X, y, N // batch_size)
    
    while epoch <= n_epochs:
        # Define the indexes of the batch
        start = 0 
        end = batch_size
        for i in range(iter_epoch):
            # Add the actual weights to the array
            weights_array[tot_iter, :] = np.reshape(w0, (l, ))
            
            # Selecte the sample
            X_sample = X_shuffled[:, start:end]
            y_sample = y_shuffled[start:end]
            
            # update the batch indexes
            start = end
            end = end + batch_size

            # Compute the gradient
            grad = grad_loss(w0, X_sample, y_sample)
            
            # Update the weights
            w0 -= alpha * np.reshape(grad, (l, 1))
            tot_iter += 1

            if i % 100 == 0:
                loss_value = loss_func(w0, X_sample, y_sample)
                grad_norm = np.linalg.norm(grad)
                print('At iteration {}\ntotal loss = {} ; error = {}'.format(tot_iter, loss_value, grad_norm))
        
        # Compute the loss value
        loss_value = loss_func(w0, X_sample, y_sample)

        # Compute the norm of the gradient
        grad_norm = np.linalg.norm(grad)

        
        

        loss_array[epoch - 1] = np.reshape(loss_value, 1)
        gradient_array[epoch - 1, :] = np.reshape(grad, (l, ))
        err_array[epoch - 1] = grad_norm
 
        X_shuffled, y_shuffled = shuffle_dataset(X, y, N // batch_size)
        epoch += 1

    return weights_array, loss_array, gradient_array, err_array
           


def shuffle_dataset(X, y, batch_size):
    '''
    Input : X is the dataset
            y is the labels array
            batch_size is the lenght of one subset
    Return : X and y shuffled randomly
            
    '''
    idxs = np.arange(X.shape[1])
    np.random.shuffle(idxs)
    
    X_sample = X[:, idxs]
    y_sample = y[idxs]
    
    return X_sample, y_sample 



def loss_func(w0, x_hat, y):
    '''     
    Input : w0 is the column of weights with shape = l+1 x 1. 
                (the weights are repeated to vectorize the operation).
            x_hat is a matrix vector containing the batch. shape = l+1 x batch_size. 
            y is the label vector.
    Returns : the MSE of the prodiction xhat @ wo 
    '''
    _, N = x_hat.shape


    z = x_hat.T @ w0
    #print('Dot product : {}'.format(z))
    fw = sigmoid(z)
    #print('Objective val :  {} Shape : {}'.format(fw, fw.shape))
    y = np.reshape(y, (N, 1))

    err = fw - y
    #print('Error : {} Error shape : {}'.format(err, err.shape))
    MSE = np.linalg.norm((err), axis = 1)**2
    #print('MSE : {}'.format(MSE))
    return (np.sum(MSE)) / N


def grad_loss(w0, x_hat, y):
    '''      
    Input : w0 is a column vector of weights with shape = l+1 x 1
            x_hat is a matrix with the data-points of my batch (l + 1 x N) 
            y is the label vector
    Returns : the gradient of the loss function computed over w0 for 
              the whole batch
    '''
    _, N = x_hat.shape

    z = x_hat.T @ w0

    fw = sigmoid(z) # I obtain a row vector
    
    # I need column vectors
    y = np.reshape(y, (N, 1))
    fw = np.reshape(fw, (N, 1))

    #print('obj : {} dot : {}'.format(fw, z))
    grad_matrix =  fw * (1 - fw) * x_hat.T * (fw - y)
    #print('Grad matrix shape : {}'.format(grad_matrix))
    # I am interested on the sum over the batch.
    
    return (np.sum(grad_matrix, axis = 0)) / N

# Forward step implementation
def predictor(X_hat, w, treshold = 0.5):
    '''     
    Input : X_hat --> test dataset with a one on the first row (shape = l+1 x N)
            w --> weight vector fitted
            treshold --> probability treshold to validate a prediciton (default 0.5)
    Returns : an array with the predictions
    '''
    _, N = X_hat.shape
    fw = sigmoid(X_hat.T @ w)
    preds = np.zeros((N,))
    preds = fw > treshold
    return preds

def accuracy(preds, labels) -> float:
    ''' 
    Input : preds --> a vector with the predictions made by the modeò
            labels --> a vector with the ground truth of the test set
    Returns : the sum over the array made by the elements for which the result is the same
              divided by N (number of elements)
    '''
    N = preds.shape
    xor_arr = np.bitwise_xor(preds, labels)
    nxor_arr = np.logical_not(xor_arr) 
    return np.sum(nxor_arr) / N
    

def sigmoid(z):
    '''         
    Input : float value
    Returns : sigmoid of z
    '''
    return 1 / ( 1 + np.exp(-z))



def divide_dataset(X, Y, N_train):
    '''
    Input: X, y dataset and labels
           N_train is the number of element of the train set
    Returns: The dasatet randomly divided into train and test
             according to N_train in the form :
             (Xtrain, Ytrain), (Xtest, Ytest) 
    '''
    
    idxs = np.arange(0, len(X[0])-1, 1)
    np.random.shuffle(idxs)

    trian_idxs = idxs[:N_train]
    test_idxs = idxs[N_train:]


    XTrain = X[:,trian_idxs]
    YTrain = Y[trian_idxs]

    XTest = X[:,test_idxs]
    YTest = Y[test_idxs]

    return (XTrain, YTrain), (XTest, YTest)

In [78]:
def get_digits(num):
    i = 0
    digits = []
    while i < num:
        digit = input("Insert a digit : ")
        try:
            digits.append(int(digit)) 
            i += 1
        except:
            print("Not an integer")
    return digits

## Implementation of logistic regression
The implementation of binary logistic regression on the MNIST dataset.
It's possible to choose the digits for the user.
#### 1) Dataset opening and division

In [94]:
import pandas as pd
import numpy as np
data = pd.read_csv('data.csv')
data_arr = np.array(data)

X = data_arr[:, :-1].T
y = data_arr[:, 0]


classes = get_digits(2)
print(classes)


# I apply my mask to the two arrays
masks = [y == i for i in classes]

# List of dataset with a unique digit taken from mask
Xlist = [X[:, mask] for mask in masks]
Ylist = [y[mask] for mask in masks]

print('Single class datasets shape')
print([X.shape for X in Xlist])

# Lest put them into a single nunpy array
X = np.concatenate(Xlist, axis=1) 
y = np.concatenate(Ylist)

# Binary conversion 
# the labels must be binary
y[y == classes[0]] = 0
y[y == classes[1]] = 1

l, N = X.shape

print('Dataset with only selected digits')
print('X shape : {}\nY shape : {} '.format(X.shape, y.shape))

print("Splitting into train and test")
(Xtrain, Ytrain), (Xtest, Ytest) = divide_dataset(X, y, int(0.1 * N))
print(max(Ytrain))
print('Train set shape : {}'.format(Xtrain.shape))
print('Test set shape : {}'.format(Xtest.shape))

[2, 9]
Single class datasets shape
[(784, 4177), (784, 4188)]
Dataset with only selected digits
X shape : (784, 8365)
Y shape : (8365,) 
Splitting into train and test
1
Train set shape : (784, 836)
Test set shape : (784, 7528)


####  2) Fitting the model

In [96]:
l, N = Xtrain.shape
# Computation of the dataset with ones in the first element
X_hat = np.concatenate((np.ones((1, N)), Xtrain), axis = 0)

# Starting weights vector
w0 = np.random.normal(0, 0.001, (l + 1, 1))

# Dataset tuple
data = (X_hat, Ytrain)

# Batch size, it could be any number less than N
batch_size = 1 

# Number of epochs
num_epochs = 1

# Learning rate
lr = 1e-4

# Training phase
w, loss, grad, err = logistic_regression(loss_func, grad_loss, w0, data, batch_size, num_epochs, lr)

At iteration 1
total loss = 8.408414808756972e-10 ; error = 56.15761645106224
At iteration 101
total loss = 8.848149848907279e-16 ; error = 1.5364963979693884e-12
At iteration 201
total loss = 0.0 ; error = 294.3041622803303
At iteration 301
total loss = 3.612484732401154e-09 ; error = 7.08987507108312e-06
At iteration 401
total loss = 3.652748781629295e-91 ; error = 9.523869835275436e-88
At iteration 501
total loss = 1.8568033618267466e-29 ; error = 4.971166831055746e-26
At iteration 601
total loss = 2.206821509448512e-50 ; error = 4.421370060943043e-47
At iteration 701
total loss = 0.999627368312386 ; error = 0.3197087737447451
At iteration 801
total loss = 1.2182552572481185e-13 ; error = 2.848128423666277e-10


### 3) Testing the accuracy of the model on test data

In [107]:
# Lets compute Xtest with ones on the first row
Xtesthat = np.concatenate((np.ones((1, Xtest.shape[1])), Xtest), axis = 0)

# Select the final weights from the list
final_weights = w[-1]

# Select the treshold for which a prediction is considered good
treshold = 0.5

#Lets now run the predictor  
preds = predictor(Xtesthat, final_weights, treshold)

res = float(accuracy(preds, Ytest) * 100) # to get the precentage
print('Accuracy : {}%'.format(round(res, 3)))

Accuracy : 92.867%
