# PySpark MNIST example

### Based on https://github.com/abulbasar/pyspark-examples

In [None]:
import os

import findspark

# Make the local Spark installation importable; this has to happen before
# `import pyspark` below.
findspark.init()

# Expose only GPU index 1 to CUDA-aware libraries in this process.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

import pyspark
from pyspark.sql import SQLContext, SparkSession

# Connect (or reuse an existing session) to the standalone master on host "Tux".
spark = (
    SparkSession.builder
    .master('spark://Tux:7077')
    .appName("sparkFromJupyter")
    .getOrCreate()
)
sc = spark.sparkContext
sqlContext = SQLContext(sparkContext=sc, sparkSession=spark)

print("Spark Version: " + spark.version)
print("PySpark Version: " + pyspark.__version__)

In [None]:
# URL of this application's Spark web UI (displayed as the cell output).
sc.uiWebUrl

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

%matplotlib inline

In [None]:
# Load the MNIST training CSV: no header row, column types inferred from data.
df_training = spark.read.csv(
    "./data/mnist_train.csv", header=False, inferSchema=True
)

In [None]:
# Show the column names and inferred types of the training DataFrame.
df_training.printSchema()

In [None]:
# Number of rows in the training DataFrame (displayed as the cell output).
df_training.count()

In [None]:
# Peek at the first raw row (a one-element list of Row objects).
df_training.head(1)

In [None]:
# Report how many columns the CSV produced, followed by their names.
column_names = df_training.columns
print("No of columns: ", len(column_names), column_names)


In [None]:
# Pixel columns of the header-less CSV. "_c0" holds the label (it is renamed
# to "label" when the DataFrames are assembled); "_c1".."_c784" hold the
# 28 x 28 = 784 pixel values.
IMAGE_SIDE = 28
feature_columns = ["_c" + str(i) for i in range(1, IMAGE_SIDE * IMAGE_SIDE + 1)]
# Backward-compatible alias for the original, misspelled name used by
# later cells.
feature_culumns = feature_columns

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
# Pack the pixel columns into one vector column named "features", keep only
# the label column (_c0, renamed to "label") plus that vector, and cache the
# result because it is reused for plotting and for model training.
vectorizer = VectorAssembler(
    inputCols=feature_culumns,
    outputCol="features",
)
training = (
    vectorizer.transform(df_training)
    .select("_c0", "features")
    .toDF("label", "features")
    .cache()
)
training.show()

In [None]:
# Pull the first training row and turn its feature vector into an array.
first_row = training.first()
a = first_row.features.toArray()
type(a)

In [None]:
# Render the flat 784-value vector as a 28 x 28 grayscale image.
plt.imshow(a.reshape((28, 28)), cmap="Greys")

In [None]:
# Show up to 25 training digits drawn from a ~1% sample (fixed seed 1),
# each titled with its true label.
images = training.sample(withReplacement=False, fraction=0.01, seed=1).take(25)
fig, _ = plt.subplots(5, 5, figsize=(10, 10))
# zip() stops at the shorter sequence: if the random sample yields fewer than
# 25 rows, the remaining panels stay empty instead of raising IndexError.
for ax, row in zip(fig.axes, images):
    ax.imshow(row.features.toArray().reshape(28, 28), cmap="Greys")
    ax.set_title("True: " + str(row.label))

plt.tight_layout()
    

In [None]:
# Number of training examples per label (lazy; evaluated when collected below).
counts = training.groupBy("label").count()

In [None]:
# Collect the small per-label count table to the driver as a pandas DataFrame
# and plot it as a bar chart ordered by label. toPandas() does the conversion
# directly instead of mapping every row through a Python lambda on the RDD.
counts_df = counts.toPandas()
counts_df.set_index("label").sort_index().plot.bar()

In [None]:
# Load the MNIST test CSV and assemble it exactly like the training set,
# reusing the same VectorAssembler.
df_testing = spark.read.csv(
    "data/mnist_test.csv", header=False, inferSchema=True
)
testing = (
    vectorizer.transform(df_testing)
    .select("_c0", "features")
    .toDF("label", "features")
    .cache()
)

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
# Logistic regression with elastic-net regularisation: regParam sets the
# overall penalty strength, elasticNetParam the L1/L2 mix (0 = pure L2,
# 1 = pure L1).
lr = LogisticRegression(
    labelCol="label",
    featuresCol="features",
    maxIter=10000,
    regParam=0.1,
    elasticNetParam=0.1,
)

In [None]:
# Train the logistic-regression model on the cached training DataFrame.
lr_model = lr.fit(training)

In [None]:
# Import only the Spark SQL functions this notebook uses. A wildcard import
# from pyspark.sql.functions shadows Python builtins such as sum, min, max,
# round and abs with Spark column functions.
from pyspark.sql.functions import avg, expr

In [None]:
# Score the test set and add a boolean "matched" column that is true where
# the predicted class equals the true label.
scored = lr_model.transform(testing)
test_pred = scored.withColumn("matched", expr("label == prediction"))
test_pred.show()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Overall accuracy: fraction of rows whose "prediction" equals "label".
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="label",
)

In [None]:
# Test-set accuracy of the logistic-regression model (displayed as the cell output).
evaluator.evaluate(test_pred)

In [None]:
# Per-label accuracy: cast the boolean "matched" flag to 0/1 and average it
# within each label group, listed in label order.
per_label_accuracy = (
    test_pred
    .withColumn("matched", expr("cast(matched as int)"))
    .groupBy("label")
    .agg(avg("matched"))
    .sort("label")
)
per_label_accuracy.show()

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [None]:
# Feed-forward network: 784 inputs -> hidden layers of 100 and 20 units
# -> 10 output classes.
layers = [784, 100, 20, 10]
perceptron = MultilayerPerceptronClassifier(
    maxIter=1000, layers=layers, blockSize=128, seed=1234
)
# The model is fitted once, in the timed cell below; fitting it here as well
# would train the same network a second time.

In [None]:
from time import time

In [None]:
# Train the perceptron, score the test set, and report accuracy together with
# the elapsed time for training plus evaluation. perf_counter() is the clock
# Python provides for measuring intervals; time() follows the adjustable
# wall clock.
from time import perf_counter

start_time = perf_counter()
perceptron_model = perceptron.fit(training)
test_pred = perceptron_model.transform(testing)
print("Accuracy:", evaluator.evaluate(test_pred))
print("Time taken: %.1f s" % (perf_counter() - start_time))