In [1]:
import os
import sys

SPARK_HOME = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, SPARK_HOME + "/python")

In [2]:
# Add the py4j to the path.
# You may need to change the version number to match your install
sys.path.insert(0, os.path.join(SPARK_HOME, 'python/lib/py4j-0.10.3-src.zip'))

# Initialize PySpark to predefine the SparkContext variable 'sc'
execfile(os.path.join(SPARK_HOME, 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.2
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.


Below a standard functions to load dataset and import all necessary stuff

In [3]:
import os
import os.path as osp

import numpy as np

%matplotlib inline
import matplotlib.pyplot as plt

import sklearn
from sklearn.metrics import accuracy_score

In [4]:
%%sh

wget -q -nc https://raw.githubusercontent.com/amitgroup/amitgroup/master/amitgroup/io/mnist.py

In [5]:
import mnist

In [6]:
%%sh

mkdir -p mnist && {
    cd mnist;
    wget -q -nc http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz &&
    wget -q -nc http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz &&
    wget -q -nc http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz &&
    wget -q -nc http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz &&
    gunzip *.gz
}

gzip: t10k-images-idx3-ubyte already exists;	not overwritten
gzip: t10k-labels-idx1-ubyte already exists;	not overwritten
gzip: train-images-idx3-ubyte already exists;	not overwritten
gzip: train-labels-idx1-ubyte already exists;	not overwritten


In [7]:
X, y = mnist.load_mnist(dataset='training', path='mnist/')
X = X.reshape(-1, 1, 28, 28)

X_test, y_test = mnist.load_mnist(dataset='testing', path='mnist/')
X_test = X_test.reshape(-1, 1, 28, 28)

Creating RDD for train and test, specifying number of partitions to be 64

In [8]:
X_train_rdd = sc.parallelize(
    (i, y[i], X[i].ravel().copy())
    for i in xrange(X.shape[0])
).repartition(64).persist()

X_test_rdd = sc.parallelize(
    (i, y_test[i], X_test[i].ravel().copy())
    for i in xrange(X_test.shape[0])
).repartition(64).persist()

In [9]:
#Checking number of partitions
X_train_rdd.getNumPartitions()

64

In [10]:
len(X)

60000

I used the base code from the seminar and built upon it. Here in epoch we state the update rule for weights and calculate weights themselves. The gradient is obtained from formula: 
$$\omega_i^{t+1} = \omega_i^t + \nu\frac{1}{N}\sum_i^N \sum_j x_i^j[y_i^j-\hat{P}(Y_i^j=k \>|\> w,b)]$$, $y_i^j$ is an indicator of class j of observation i

In [22]:
class LogisticRegression(object):
    def __init__(self, C = 1.0e-3):
        self.W = np.random.uniform(-1, 1, size=(28 * 28, 10))
        self.b = np.random.uniform(-1, 1, size=(10, ))
        self.I = np.identity(10)
        self.size = 60000
    
    def softmax(self, x):
        ### note that this is not an efficient implementation
        ### of X.dot(W)
        exps = np.exp(x.dot(self.W)+self.b)
        return exps / np.sum(exps)
    
    def epoch(self,x,learning_rate, bch):
        N = 1.0/(self.size*bch)
        batch = x.sample(False,bch).cache()
        W_grad = batch.map(lambda (i, y, x): N*np.outer(x,self.I[y]-self.softmax(x))).reduce(lambda a,b: a+b)
        b_grad = batch.map(lambda (i, y, x): N*(self.I[y]-self.softmax(x))).reduce(lambda a,b: a+b)
        self.W = self.W + learning_rate*W_grad
        self.b = self.b + learning_rate*b_grad
        
    def train(self,x,lr=1, n_epochs = 100,aim=0.80,extension =5,significance = 0.01):
        best_acc = 0
        timer = extension+1
        for i in range(n_epochs):
            self.epoch(x,lr,0.5)
            y_labels = x.map(lambda (i, y, x): y).collect()
            y_predict = x.map(lambda (i, y, x): np.argmax(self.softmax(x))).collect()
            accuracy = accuracy_score(y_labels,y_predict)
            if accuracy > best_acc+significance:
                best_acc = accuracy
                timer = extension+1
            if accuracy > aim:
                timer -=1
                if timer == 0:
                    print "required accuracy reached"
                    print 'best accuracy after %d epochs  : ' %i, best_acc
                    break
                
    def test_pred(self, x):
        y_t = x.map(lambda (i, y, x): y).collect()
        y_t_predict = x.map(lambda (i, y, x): np.argmax(self.softmax(x))).collect()
        accuracy = accuracy_score(y_t,y_t_predict)
        return accuracy

It takes forever to reach 85%, as accuracy increases steadily but increasingly slow, so i decided to cut it at 80%. Due to big sample size, overfitting is not the case, as seen from check on test sample:

In [23]:
L = LogisticRegression()
L.train(X_train_rdd)

required accuracy reached
best accuracy after 53 epochs  :  0.803166666667


In [24]:
L.test_pred(X_test_rdd)

0.81569999999999998