In [1]:
%matplotlib inline
import matplotlib.pyplot as plt

import numpy as np
import time
import random

In [2]:
import findspark
findspark.init() 

import pyspark
sc = pyspark.SparkContext()

In [3]:
from mnist import MNIST
mndata = MNIST('/Users/dcusworth/Desktop/mnist/MNIST/python-mnist/data')
images, labels = mndata.load_training()

In [4]:
#Build feature map
N = 1000 #How many images I want to load
d = 784 #Pixels of MNIST data
    
#label_func = lambda x,choose_label: [1 if la == choose_label else -1 for la in x]
def label_func(x, choose_label):
    if x == choose_label:
        return 1
    else:
        return -1

In [115]:
start = time.time()

#Retrieve data and labels - do preprocessing
y_labs = labels[0:N]

#Loop over set of regularization parameters
vaccs = []
lambdas = [10**q for q in np.linspace(-5,5,10)]
label_dat = range(10)

#Load images
feature_map = np.zeros((N,d))
for i in range(N): #Just do a subset of training for now
    feature_map[i,:] = images[i]

#Start spark instance on points
#Take train test split
sinds = range(N)
random.shuffle(sinds)
tint = int(.8*N)
tind = sinds[0:tint]
vind = sinds[tint:-1]

#Center - i.e. remove mean image
fpoints = sc.parallelize(feature_map)
fmean = fpoints.map(lambda x: x).reduce(lambda x,y: (x+y) ) / float(N)
x_c = fpoints.map(lambda x: x-fmean).collect()

#Create Spark context for feature matrix
x_t = sc.parallelize(list(enumerate(x_c))).filter(lambda x: x[0] in tind).map(lambda x: x[1])
xtb = sc.broadcast(x_t.collect())
x_v = sc.parallelize(list(enumerate(x_c))).filter(lambda x: x[0] in vind).map(lambda x: x[1])
xvb = sc.broadcast(x_v.collect())

#Get training/test labels
ytrain = sc.parallelize(list(enumerate(y_labs))).filter(lambda x: x[0] in tind).map(lambda x: x[1]).collect()
y_val = sc.parallelize(list(enumerate(y_labs))).filter(lambda x: x[0] in vind).map(lambda x: x[1]).collect()
tpoints = sc.parallelize(zip(ytrain, xtb.value))


#Get pseudo-inverse
pseudo_inv = sc.parallelize(zip(lambdas, [np.asarray(xtb.value)] * len(lambdas)))\
    .map(lambda x: (x[0], np.linalg.inv(np.dot(x[1].T, x[1]) + np.eye(x[1].shape[1])*N*x[0])))\
    .collect()

#Get transformation of Y - i.e. X^T * Y
XtY= sc.parallelize(zip(label_dat, [np.asarray(ytrain)] * len(label_dat), [np.asarray(xtb.value)] * len(label_dat)))\
    .map(lambda x: (x[0], [label_func(q, x[0]) for q in x[1]], x[2]))\
    .map(lambda x: (x[0], np.dot(x[2].T, x[1]))).collect()

#Get combinations of labels and lambdas
def solution_func(ipseudo, XtY):
    #ipseudo = pseudo_inv[0]

    iouts = sc.parallelize(XtY)\
            .map(lambda x: (x[0], np.dot(ipseudo[1], x[1])))\
            .map(lambda x: (x[0], np.dot(np.asarray(xvb.value), x[1])))\
            .sortByKey(True)\
            .map(lambda x: x[1])\
            .collect()

    out_pred = zip(*iouts)
    ipred = sc.parallelize(zip(*iouts)).map(lambda x: np.argmax(x)).collect()
    
    return (ipseudo[0], ipred)

#Run over all lambdas
sols = []
for ipinv in pseudo_inv:
    sols.append(solution_func(ipinv, XtY))

#Find the best lambda
best_sol = sc.parallelize(sols)\
    .map(lambda x: (x[0], np.sum([y == p for y,p in zip(y_val, x[1])]) / float(len(x[1]))))\
    .max(lambda x:x[1])
    
end = time.time()
    
print 'validation accuracy = ', best_sol[1]
print 'best lambda =', best_sol[0]
print 'elapsed time for', N, 'samples = ', end-start, 'seconds'

 validation accuracy =  0.824120603015
best lambda = 599.484250319
elapsed time for 1000 samples =  10.8703939915 seconds
