In [None]:
import numpy as np  # build "from scratch" with numpy
from sklearn.datasets import load_boston  # data set to work with
from sklearn.model_selection import train_test_split
# from sklearn.datasets import make_moons
from functools import wraps
from sklearn.utils import shuffle # shuffle two arrays in unison
from collections import defaultdict


In [None]:
# suppress debug logs efficiently (as explained in https://stackoverflow.com/a/2829036/9610637)
import sys
import contextlib

class DummyFile(object):
    def write(self, x):
        pass
    
@contextlib.contextmanager
def suppressed_stdout():
    real_stdout = sys.stdout
    sys.stdout = DummyFile()
    yield # everything in 'with' block happens...
    sys.stdout = real_stdout

In [None]:
X, y = load_boston(return_X_y = True) # a regression problem

In [None]:
# # convert y to bins of width 5
# # (early part of classification retrofitting)

# y_inds = [(val // 5) for val in y]
# y_inds = np.array(y_inds, dtype = int)
# y_inds -= y_inds.min()
# vec_len = y_inds.max() + 1

# def convert_to_onehot(y_val, ind_max):
#     vec = np.zeros(ind_max, dtype = np.uint8)
#     vec[y_val] += 1
#     return vec

# y_cls = [convert_to_onehot(i, vec_len) for i in y_inds]
# y_cls = np.array(y_cls)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X,y)

In [None]:
def yield_minibatches(X, y, batch_size):
    '''
    Yield minibatches from X and y of size batch_size.
    Copies of X and y are shuffled before yielding batches.
    '''
    X_shuffled, y_shuffled = shuffle(X, y) # using sklearn.utils.shuffle()
    
    for i in range(0, X.shape[0], batch_size):
        yield X[i:batch_size+i, ...], y[i:batch_size+i, ...]
        
    return

In [None]:
@wraps
def vectorize(f):
    '''Make f NumPy-broadcastable.'''
    return np.vectorize(f)

def perform_dropout(arr, dropped_connection_prop = 0.5):
    '''Based on the shape of arr, drop a certain proportion of connections
    via a mask, and scale the remaining connections appropriately
    (so that the magnitude of output values is similar between training and testing).
    Returns the dropped and scaled arr, and the mask.'''
    # each connection is present on average, (1 - dropped_connection_prop)
    # so when it *is* present, scale it by 1/(1 - dropped_connection_prop)
    # then, when *all* connections are being used,
    # each connection will be (1 - dropped_connection_prop) its magnitude than during testing,
    # but there will be 1/(1 - dropped_connection_prop) as many connections,
    # -> 1x the magnitude (without having to do pre-processing to connections)
    keep_prob = 1/(1 - dropped_connection_prop)
    mask = np.random.random(size=arr.shape) # by default, returns vals in [0, 1)
    mask[np.where(mask < keep_prob)] = 1 # which neurons are on
    mask[np.where(mask >= keep_prob)] = 0 # turn off other neurons 
    return arr * mask * keep_prob, mask


In [None]:
# @vectorize # NumPy gets confused by the inequality...
def sanitize(v, clip_val = 5):
    '''Sanitize values in v (a NumPy array or number) for backprop.'''
    try:
        for it in np.ndindex(*v.shape):
            v[it] = (-2 * (v[it] < 0) + 1) * min(clip_val, abs(v[it]))
    except TypeError: # we have a number
        v = (-2 * (v < 0) + 1) * min(clip_val, abs(v))
    return v

In [None]:
## part of classification retrofitting endeavor
# def softmax(arr):
#     return np.exp(arr) / np.exp(arr).sum() #np.exp() performs exp() elementwise
#
# def softmax_loss():
#     pass
    
    
    
# for least-squares regression
@vectorize
def loss_function(y_pred, y_actual):
    '''Least-squares regression loss function.'''
    # we'll use least-squares for this regression model
    return 0.5 * (y_pred - y_actual)**2

@vectorize
def error_signal(y_pred, y_actual):
    '''Error signal for least-squares regression.'''
    # least-squares error signal
    return (y_pred - y_actual)

In [None]:
def train(model, X, y, n_epoch, batch_size = 20, verbose = False):
    '''Train the model using X and y a total of n_epoch times.
    X and y will be shuffled and fed in minibatches of size batch_size.
    
    If verbose is set to True, all of the model's internal logging will be written to sys.stdout;
    otherwise, only average loss at the end of each epoch is printed out.
    '''
    # contextlib.ExitStack() is like a 'with no-op', as per
    # https://docs.python.org/3/library/contextlib.html#simplifying-support-for-single-optional-context-managers
    verbosity_context = contextlib.ExitStack if verbose else suppressed_stdout
    
    for i_epoch in range(n_epoch):
        i_batch, loss = 0, 0
        with verbosity_context():
            for X_batch, y_batch in yield_minibatches(X, y, batch_size = 20):
                i_batch += 1
                y_preds = model.forward(X_batch)
                loss += loss_function(y_preds, y_batch).mean()
                model.backward(y_batch)
        loss /= i_batch
        print("Epoch #{0}. Average loss: {1}".format(i_epoch+1, loss))
    return

def test(model, X, y):
    model.set_mode('test')
    y_preds = model.forward(X)
    loss = loss_function(y_preds, y).mean()
    print("Average error: {0}".format(loss))

In [None]:
class Model:
    def __init__(self, n_features, n_outputs, n_layers=2, n_neurons_per_layer=25):
        '''
        n_layers: number of *hidden* layers (ie not including output layer).
        Using ReLU nonlinearity between layers.
        '''
        self.lr = 1E-4
        self.mode = 'train' # know whether to e.g. implement dropout
        self.n_layers = n_layers
        self.n_outputs = n_outputs
        self.layer_list = ['in', *range(self.n_layers), 'out']
            # contain per-run parameters, e.g. hidden layer vals, dropout masks
        self.history_dict = defaultdict(dict)
            # 'h_in' == X, 'h_out' == y_preds
        # initialize layers
        self.layers = {}
        self.layers['W_in'] = np.random.randn(n_features, n_neurons_per_layer)
        self.layers['b_in'] = np.ones(n_neurons_per_layer)
        for i in range(n_layers):
            self.layers['W_{0}'.format(i)] = np.random.randn(n_neurons_per_layer, n_neurons_per_layer)
            self.layers['b_{0}'.format(i)] = np.ones(n_neurons_per_layer)
                # start with relatively high bias to avoid having too many immediately dead neurons
        self.layers['W_out'] = np.random.randn(n_neurons_per_layer, n_outputs)
        self.layers['b_out'] = np.ones(n_outputs)
    
    
    
    def forward(self, X):
        '''
        Provide a prediction based on input array X. (Assuming order (n_samples, n_features))
        '''
        cur = X
        
        for i,x in enumerate(self.layer_list): # all layers
            self.history_dict['hs']['h_{}'.format(x)] = cur.copy() # remember inputs for backprop
            print("Forward prediction at layer '{0}'".format(x))
            cur_W, cur_b = self._prepare_layers(x)
#             cur = cur @ cur_W + cur_b # update inputs
            # With current backward implementation, biases seem to kill weights
            # and turn all output into a small constant.
            #
            # To fix, would probably need to implement regularization into loss function/error signal
            # to prevent gradual creep of biases/death of connections.
            # At the same time, would have to balance the regularization penalty
            # with the least-squares loss.
            # Or, a bit more hacky but could potentially work:
            # we could train the weights first (ensuring they don't die immediately),
            # then train the biases, and alternate until convergence (on ideally not just a number,
            # since the weights will already have some signal by the time the biases are being trained
            # and so the output would ideally not just be a constant)
            #
            # It seems we're not alone in finding regression models temperamental,
            # as in the suggestion to use bins here: http://cs231n.github.io/neural-networks-2/
            #
            # Given the data, it would be reasonable to split y into 10 bins of width 5
            # and make a classifier using e.g. softmax
            # However, since our goal was simply to implement a neural net from scratch
            # and ensure that it functions as intended,
            # and we seem to have done so (while getting some practical understanding of
            # the difficulties of regression models),
            # we shall just dummy out the bias and stick to regression with just weights
            # (even though the predicted values are more than a bit dubious)
            # rather than retrofit everything for classification
            # or impose L2 regularlization
            # (Next time!)
            cur = cur @ cur_W # dummy out biases


            if x != 'out':
                cur[np.where(cur < 0)] = 0 # apply ReLU
#             else:
#                 if self.n_outputs == 1: # regression
#                     cur = cur @ cur_W + cur_b
#                     break
#                 else: # classification. Use softmax (will do without biases)
#                     cur = softmax(cur @ cur_W)
            print('cur.shape after multiplication: {}'.format(cur.shape))
                # dim is always (n_samples, ...)
                # dim of cur_b matches last dimension of (cur @ cur_W) 
                # so bias is broadcasted to each sample (cur's first dim) 

        self.history_dict['y'] = cur.copy()
        return cur
            
    
    def backward(self, y):
        '''
        Based on results of most recent call to self.forward()
        and the correct answers y, update parameters.
        
        Error signal and loss function values are determined 
        by calls to loss_function() and error_signal()
        '''
        y_preds = self.history_dict['y']
        errs = error_signal(y_preds, y.reshape(y_preds.shape))
        print('errs.shape = {}'.format(errs.shape))
        
        errs = sanitize(errs)
        backprop_list = list(reversed(self.layer_list))
            # 'h_in' == X, 'h_out' == Y

        for x in backprop_list:
            print("Backprop at layer '{0}'".format(x))
            W_cur, b_cur = self.layers['W_{0}'.format(x)], self.layers['b_{0}'.format(x)]

            mask, h_cur = self.history_dict['masks']['W_{}'.format(x)], \
                          self.history_dict['hs']['h_{}'.format(x)]
            print('W_cur.shape = {}, b_cur.shape = {}, mask.shape = {}, h_cur.shape = {}'.
                          format(W_cur.shape, b_cur.shape, mask.shape, h_cur.shape))
                # update biases
            errs = errs.mean(axis = 0, keepdims = True) # average along samples
            b_cur -= sanitize(self.lr * errs.squeeze(), clip_val = 1)
                # errs.squeeze() <- make dims match (for regression)
            self.layers['b_{0}'.format(x)] = b_cur
                # update weights
            h_cur = h_cur.mean(axis = 0, keepdims = True) # average along samples
            # get error per weight parameter
            # to figure out order of matmul, take, for example, dims of first backprop step
            #
            # dim(errs) == (num_samples, n_outputs), dim(h_cur) = (n_samples, n_neurs)
            # want to be of shape dim(W_cur) == (n_neurs, n_outputs)
            errs = h_cur.T @ errs
            errs = sanitize(self.lr * errs, clip_val = 5)
            W_cur -= errs * mask
            self.layers['W_{0}'.format(x)] = W_cur

        

        
        
    def _prepare_layers(self, key_id):
        '''Prepare weights/biases for multiplication (i.e., perform dropout or not).
        Indicate which layer using key_id, a string.
        
        Returns the prepared layers as a tuple in order (W,b)
        and adds relevant info to self.history_dict, if applicable.
        '''
        W, b = self.layers['W_{}'.format(key_id)], self.layers['b_{}'.format(key_id)]
        if self.mode == 'train': # perform dropout of connections
            W, mask = perform_dropout(W, dropped_connection_prop = 0.5)
            # remember mask for proper backprop
            print('After perform_dropout, W.shape = {}, mask.shape = {}'.format(W.shape, mask.shape))
            self.history_dict['masks']['W_{}'.format(key_id)] = mask
        else:
            # be compatible with self.backward()
            self.history_dict['masks']['W_{}'.format(key_id)] = np.ones(W.shape, dtype = np.bool)
        return W,b
            
    
    def set_mode(self, mode_flag):
        """Switch between training mode ('train') and testing mode ('test')."""
        self.mode = mode_flag

In [None]:
model = Model(n_features = X_train.shape[-1], n_outputs = 1) # y is 1-D so can't do y.shape[-1]

In [None]:
train(model, X_train, y_train, n_epoch = 100, verbose = False)

In [None]:
test(model, X_test, y_test)