In [1]:
from pyspark.sql import SQLContext
from pyspark.ml.classification import LogisticRegression 
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline
import time

In [2]:
sc = SQLContext(sc)
data_loc = 'Data/'
def ingest(file):
    a = sc.read.format('csv').options(header='false', inferSchema='true').load(data_loc+file)
    return a
train = ingest('Train-label-28x28.csv')
test = ingest('Test-label-28x28.csv')
train = train.withColumnRenamed("_c0","label")
test = test.withColumnRenamed("_c0","label")

In [5]:
num_dim = [1,5]
#num_dim = [1,5,10,25,50,75,100,200,400,784]

for x in num_dim:
    #Change pipeline
    transformer = VectorAssembler(inputCols=['_c%d' % i for i in range(1,785)],
                                  outputCol="features")
    standardizer = StandardScaler(withMean=True, withStd=True,
                                  inputCol='features',
                                  outputCol='std_features')
    indexer = StringIndexer(inputCol="label", outputCol="label_idx")
    pca = PCA(k=x, inputCol="std_features", outputCol="pca")
    lr = LogisticRegression(featuresCol='pca', labelCol='label_idx')
    pipeline = Pipeline(stages=[transformer, standardizer, indexer, pca, lr])
    
    #Start Performance Timer
    start = time.time()
    
    #Retrain
    model = pipeline.fit(train)
    
    #test
    prediction = model.transform(test)
    
    #Stop performance timer (time of train & test)
    end = time.time()
    
    #Score
    score = prediction.select(['label_idx', 'prediction'])
    
    #final score
    acc = score.rdd.map(lambda x: x[0] == x[1]).sum() / score.count()
    print('dimensions: {0}, accuracy: {1:.2f}, time: {2:.1f}'.format(x,acc,end-start))

dimensions: 1, accuracy: 0.30, time: 47.8
dimensions: 5, accuracy: 0.68, time: 34.3
