# Digit Recognizer
This is the notebook I wrote to run my Convolutional Neural Network model on an Apache Spark virtual cluster (powered by [Databricks](https://databricks.com/)).

In [1]:
!pip install nolearn

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from nolearn.lasagne import BatchIterator
from scipy.ndimage.interpolation import rotate

In [3]:
# Select matplotlib's 'bmh' style sheet for the figures drawn in this notebook.
plt.style.use('bmh')

In [4]:
class RotateBatchIterator(BatchIterator):
    """Performs data augmentation by rotating half of the images in the batch.

    A random half of every batch is picked without replacement. Alternating
    members of that selection are rotated by ``+angle`` and ``-angle`` degrees
    in the plane spanned by the last two axes of the (4-D) batch, so roughly a
    quarter of the batch is turned each way and the rest is left untouched.
    """
    # Rotation magnitude, in degrees, applied to the selected images.
    angle = 30.0

    def transform(self, Xb, yb):
        """Applies the parent transform, then rotates a random half of Xb.

        Inputs:
            Xb  Batch of images; indexed as a 4-D array whose axes 2 and 3
                are the image plane.
            yb  Batch of labels; passed through unchanged by this class.

        Outputs:
            Xb, yb  The augmented batch and its labels.
        """
        Xb, yb = super(RotateBatchIterator, self).transform(Xb, yb)
        # NOTE(review): the parent transform presumably hands back a view of
        # the caller's data; work on a copy so the rotations written below do
        # not leak into (and accumulate in) the original array -- confirm.
        Xb = Xb.copy()
        batch_size = Xb.shape[0]
        # Floor division: np.random.choice needs an integer sample size and
        # `/` produces a float under Python 3.
        indices = np.random.choice(batch_size, batch_size // 2, replace=False)
        for subset, angle in ((indices[0::2], self.angle),
                              (indices[1::2], -self.angle)):
            # Very small batches can leave one (or both) subsets empty.
            if subset.size == 0:
                continue
            Xb[subset] = rotate(Xb[subset], angle=angle, axes=(3, 2),
                                reshape=False, order=1)
        return Xb, yb

In [5]:
class AdjustParameter(object):
    """Adjusts the value of some parameter of the network at the end of each
    epoch, in order to make it vary over a range of predefined values.

    The schedule holds ``nn.max_epochs`` evenly spaced values going from
    ``start`` to ``stop`` (both included). It is built lazily on the first
    call, because the number of epochs is read from the network itself.

    Inputs:
        name   Name of the network attribute to adjust; the attribute must
               provide a ``set_value`` method.
        start  Value set when epoch 1 finishes.
        stop   Value set when epoch ``max_epochs`` finishes.
    """
    def __init__(self, name, start, stop):
        self.name = name
        self.start = start
        self.stop = stop
        # Schedule of values; created on the first call.
        self.ls = None

    def __call__(self, nn, train_history):
        """Sets the parameter to the value scheduled for the latest epoch.

        Inputs:
            nn             Network being trained.
            train_history  Sequence of per-epoch records; the last one must
                           contain the (1-based) 'epoch' number.
        """
        if self.ls is None:
            self.ls = np.linspace(self.start, self.stop, nn.max_epochs)

        epoch = train_history[-1]['epoch']
        # Epoch numbers are 1-based. If training runs past the planned number
        # of epochs, keep using the final value instead of raising IndexError.
        index = min(epoch, len(self.ls)) - 1
        # np.cast was removed in NumPy 2.0; np.asarray builds the same 0-d
        # float32 array.
        new_value = np.asarray(self.ls[index], dtype=np.float32)
        getattr(nn, self.name).set_value(new_value)

In [6]:
class EarlyStopping(object):
    """Stops the learning process early if the network spends too many epochs
    without any performance improvement.

    Improvement means a strictly lower 'valid_loss' than any seen before.
    The weights of the best epoch are remembered and loaded back into the
    network right before training is interrupted.

    Inputs:
        patience  Number of epochs without improvement that are tolerated
                  after the best epoch.
    """
    def __init__(self, patience):
        self.patience = patience
        # np.Inf was removed in NumPy 2.0; np.inf is the supported spelling.
        self.best_valid = np.inf
        self.best_valid_epoch = 0
        self.best_weights = None

    def __call__(self, nn, train_history):
        """Records the latest epoch and stops training once patience runs out.

        Inputs:
            nn             Network being trained.
            train_history  Sequence of per-epoch records; the last one must
                           contain 'valid_loss' and 'epoch'.

        Raises:
            StopIteration  When more than `patience` epochs have passed since
                           the best validation loss.
        """
        current_valid = train_history[-1]['valid_loss']
        current_epoch = train_history[-1]['epoch']

        if current_valid < self.best_valid:
            self.best_valid = current_valid
            self.best_valid_epoch = current_epoch
            self.best_weights = nn.get_all_params_values()

        elif self.best_valid_epoch + self.patience < current_epoch:
            print('Early stopping')
            print('Best valid loss was %f at epoch %d' %
                  (self.best_valid, self.best_valid_epoch))
            # No weights are recorded if the loss never improved (e.g. it was
            # NaN from the start); there is nothing to restore in that case.
            if self.best_weights is not None:
                nn.load_params_from(self.best_weights)
            raise StopIteration()

In [7]:
def train(X, y):
    """Trains a ConvNet classifier on some training data.

    The network stacks three convolution / max-pooling / dropout stages
    (32, 64 and 128 filters of size 3) followed by two dense / dropout stages
    (512 and 256 units) and a 10-unit softmax output. A quarter of the data
    is held out for validation. After every epoch the learning rate and the
    momentum are moved along linear schedules, and training stops early after
    10 epochs without improvement of the validation loss.

    Inputs:
        X  Training data; a 4-D array whose last three dimensions define the
           shape of the input layer.
        y  Training labels.

    Outputs:
        clf  Trained classifier.
    """
    from theano import shared
    from nolearn.lasagne import NeuralNet
    from nolearn.lasagne import TrainSplit
    from lasagne.layers import InputLayer
    from lasagne.layers import Conv2DLayer
    from lasagne.layers import DenseLayer
    from lasagne.layers import MaxPool2DLayer
    from lasagne.layers import DropoutLayer
    from lasagne.nonlinearities import softmax

    # Named once so the initial shared values below cannot drift apart from
    # the first step of the schedules.
    lr_start, lr_stop = 0.01, 0.0001
    momentum_start, momentum_stop = 0.9, 0.999

    layers = [
        (InputLayer, {'shape': (None, X.shape[1], X.shape[2], X.shape[3])}),
        (Conv2DLayer, {'num_filters': 32, 'filter_size': 3, 'pad': 'same'}),
        (MaxPool2DLayer, {'pool_size': 2}),
        (DropoutLayer, {'p': 0.1}),
        (Conv2DLayer, {'num_filters': 64, 'filter_size': 3, 'pad': 'same'}),
        (MaxPool2DLayer, {'pool_size': 2}),
        (DropoutLayer, {'p': 0.2}),
        (Conv2DLayer, {'num_filters': 128, 'filter_size': 3, 'pad': 'same'}),
        (MaxPool2DLayer, {'pool_size': 2}),
        (DropoutLayer, {'p': 0.3}),
        (DenseLayer, {'num_units': 512}),
        (DropoutLayer, {'p': 0.4}),
        (DenseLayer, {'num_units': 256}),
        (DropoutLayer, {'p': 0.5}),
        (DenseLayer, {'num_units': 10, 'nonlinearity': softmax}),
    ]
    clf = NeuralNet(
        layers,
        max_epochs=100,
        train_split=TrainSplit(eval_size=0.25),
        # Rotation-based augmentation is currently disabled:
        # batch_iterator_train=RotateBatchIterator(batch_size=128),
        on_epoch_finished=[
            AdjustParameter('update_learning_rate',
                            start=lr_start, stop=lr_stop),
            AdjustParameter('update_momentum',
                            start=momentum_start, stop=momentum_stop),
            EarlyStopping(patience=10),
        ],
        # Shared variables, so that AdjustParameter can change them between
        # epochs through set_value. np.cast was removed in NumPy 2.0;
        # np.asarray builds the same 0-d float32 arrays.
        update_learning_rate=shared(np.asarray(lr_start, dtype=np.float32)),
        update_momentum=shared(np.asarray(momentum_start, dtype=np.float32)),
        verbose=2,
    )

    return clf.fit(X, y)

In [8]:
def evaluate(clf, X, y):
    """Reports how a pretrained classifier performs on some test data.

    Prints the accuracy, the confusion matrix and the classification report,
    each under its own heading and followed by an empty line.

    Inputs:
        clf  Pretrained classifier.
        X    Test data.
        y    Test labels.

    Outputs:
        y_pred  Array of predicted labels.
    """
    from sklearn.metrics import classification_report
    from sklearn.metrics import confusion_matrix

    def show(heading, content):
        # One report section: heading, content, blank separator line.
        print(heading)
        print(content)
        print()

    y_pred = clf.predict(X)

    n_correct = (y == y_pred).sum()
    show('ACCURACY:', n_correct / y.size)
    show('CONFUSION MATRIX:', confusion_matrix(y, y_pred))
    show('CLASSIFICATION REPORT:', classification_report(y, y_pred))

    return y_pred

In [9]:
from pyspark.sql import SparkSession # databricks
from sklearn.cross_validation import train_test_split
from nolearn.lasagne.visualize import plot_loss
from math import sqrt

In [10]:
# Locations of the training and test CSV files. The active pair points at
# files uploaded to the Databricks file store; the commented-out pair is the
# alternative for a local run.
# TRAIN_PATH = 'datasets/train.csv' # localhost
# TEST_PATH = 'datasets/test.csv' # localhost
TRAIN_PATH = '/FileStore/tables/8iacswxe1475982269318/train.csv' # databricks
TEST_PATH = '/FileStore/tables/nrhys71b1475981053444/test.csv' # databricks

In [11]:
# Spark session used below to turn RDDs / pandas frames into Spark DataFrames.
ss = (
    SparkSession.builder
    .master("local")
    .appName("kaggle_digit_recognizer")
    .config("spark.some.config.option", "session")
    .getOrCreate()
)  # databricks

In [12]:
# Read the training CSV as lines of text, drop every line that contains
# 'pixel0' (presumably just the header) and parse the rest into lists of ints.
# `sc` is not defined in this notebook; it is expected to be provided by the
# environment.
rdd = (
    sc.textFile(TRAIN_PATH)
    .filter(lambda line: 'pixel0' not in line)
    .map(lambda line: [int(field) for field in line.split(',')])
)

In [13]:
# df_train = pd.read_csv(TRAIN_PATH) # localhost
# Column layout of the training rows: the label first, then 784 pixels.
colnames = ['label']
colnames.extend('pixel%d' % k for k in range(784)) # databricks
# Go through a Spark DataFrame to collect the parsed rows into pandas.
df_train = ss.createDataFrame(rdd, colnames).toPandas() # databricks
del rdd # databricks

In [14]:
# Split the frame into labels (int32) and pixel values (float32, divided by
# 255). Popping the label column leaves only the pixel columns behind; the
# frame is discarded right afterwards.
y_train = df_train.pop('label').values.astype(np.int32)
X_train = df_train.values.astype(np.float32) / 255
del df_train

In [15]:
# Assumes square images: the side length is the square root of the number of
# pixel columns. Each row is then reshaped into a single-channel image.
imsize = 2 * (int(sqrt(X_train.shape[1])),)
X_train = X_train.reshape((-1, 1) + imsize)

In [16]:
# X_train, X_valid, y_train, y_valid = train_test_split(
#     X_train, y_train, test_size=0.2)

In [17]:
# Fit the network on the whole training set (train() holds out its own
# validation split), then release the training arrays.
clf = train(X_train, y_train)
del X_train, y_train

In [18]:
# Draw the classifier's loss curves (via nolearn's plot_loss) on a new figure
# and render it with the notebook's display() helper.
fig = plt.figure()
plot_loss(clf)
# plt.show() # localhost
display(fig) # databricks

In [19]:
# evaluate(clf, X_valid, y_valid)
# del X_valid, y_valid

In [20]:
# Read the test CSV as lines of text, drop every line that contains 'pixel0'
# (presumably just the header) and parse the rest into lists of ints.
# `sc` is not defined in this notebook; it is expected to be provided by the
# environment.
rdd = (
    sc.textFile(TEST_PATH)
    .filter(lambda line: 'pixel0' not in line)
    .map(lambda line: [int(field) for field in line.split(',')])
)

In [21]:
# df_test = pd.read_csv(TEST_PATH) # localhost
# The test rows carry the 784 pixel columns only (no label).
colnames = list('pixel%d' % k for k in range(784)) # databricks
# Go through a Spark DataFrame to collect the parsed rows into pandas.
df_test = ss.createDataFrame(rdd, colnames).toPandas() # databricks
del rdd # databricks

In [22]:
# Pixel values as float32, divided by 255 (same scaling as the training
# data). astype() returns a fresh array, so dividing it in place is safe.
X_test = df_test.values.astype(np.float32)
X_test /= 255
del df_test

In [23]:
# Reshape every row into a single-channel image of the size computed for the
# training data, then predict a label for each test image.
X_test = X_test.reshape((-1, 1) + imsize)
y_test = clf.predict(X_test)

In [24]:
# Submission table: image ids counted from 1, paired with the predictions.
imid = np.arange(y_test.size) + 1
df_submission = pd.DataFrame({'ImageId': imid, 'Label': y_test})
# df_submission.to_csv('submission.csv') # localhost
display(ss.createDataFrame(df_submission)) # databricks