# PySpark Notebook Walkthrough

### Before running, make sure the cluster and the Spark history server are started:
#### start-dfs.sh && start-yarn.sh && $SPARK_HOME/sbin/start-history-server.sh

In [1]:
import findspark
# Must run before the pyspark imports in the next cell.
# NOTE(review): presumably this makes the local Spark installation importable — confirm against the findspark docs
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

### Create new SparkSession for this app and give it a name

In [3]:
# Obtain the SparkSession for this application under a recognisable app name
spark = (
    SparkSession.builder
    .appName('PySpark-100Node-ANN5')
    .getOrCreate()
)
# SparkContext behind the session; used to build the SQLContext and stopped at the end
sc = spark.sparkContext

### Create SQLContext within current SparkSession to read in and work with data

In [4]:
# SQLContext wraps the SparkContext; in this notebook it is only used for its .read reader.
# NOTE(review): SQLContext is the legacy entry point; spark.read should provide the same reader — confirm before switching
sql_c = SQLContext(sc)

### Read the data in from HDFS. First upload the file to HDFS with the command below:
`hdfs dfs -put data.csv`

In [5]:
# Path of the CSV to load; a bare file name, matching the `hdfs dfs -put data.csv` upload step
path = 'data.csv'

In [6]:
# Load the CSV, treating its first line as the column header
df = sql_c.read.options(header=True).csv(path)

In [7]:
# Display the first 5 dataframe rows (all columns are still strings at this point)
df.show(5)

+--------------------+----------------+----------------+--------+--------+--------+--------+--------+-------------+-----+
|               objid|              ra|             dec|       u|       g|       r|       i|       z|     redshift|class|
+--------------------+----------------+----------------+--------+--------+--------+--------+--------+-------------+-----+
|1.23766870461946E...|262.675447007966|7.03373899198224|19.19479|16.72763| 15.6987|15.32512|15.11311| 0.0001174448| STAR|
|1.23766870515647E...|263.155622796506| 7.2609959506419|17.21147|16.16533|15.79078|15.62873|15.58098| 7.529317E-05| STAR|
|1.23766870515627E...|262.749186164557|7.36335372355878|17.57057|16.54758|16.14557|15.96953|15.89596|-0.0008980818| STAR|
|1.23766857147502E...|262.221092584887|6.94796600919322|22.68772|20.27666|19.59911|19.29291|19.32655|-0.0001748509| STAR|
|1.2376687051566E+018|263.367549608197|7.02018938642791|19.29753|18.01646|17.43456|17.21292|17.05329|-0.0002282125| STAR|
+--------------------+--

In [8]:
# Inspect the column types of the freshly loaded dataframe
df.printSchema()

root
 |-- objid: string (nullable = true)
 |-- ra: string (nullable = true)
 |-- dec: string (nullable = true)
 |-- u: string (nullable = true)
 |-- g: string (nullable = true)
 |-- r: string (nullable = true)
 |-- i: string (nullable = true)
 |-- z: string (nullable = true)
 |-- redshift: string (nullable = true)
 |-- class: string (nullable = true)



### The above schema shows that the current columns stored in the dataframe are, by default, string data types.
### We fix this issue here.

In [9]:
from pyspark.sql.types import *

In [10]:
# Cast the listed dataframe columns to the requested types
def convertColumns(df, names, newTypes):
    """Return `df` with each column in `names` cast to the matching entry of `newTypes`.

    `names` and `newTypes` are paired positionally with zip(), so extra
    entries in the longer sequence are ignored. Columns not listed in
    `names` are left untouched.
    """
    converted = df
    for column_name, target_type in zip(names, newTypes):
        recast = converted[column_name].cast(target_type)
        converted = converted.withColumn(column_name, recast)
    return converted

In [11]:
# Every column is cast to float except the trailing 'class' column, which stays a string
names = ['objid', 'ra', 'dec', 'u', 'g', 'r', 'i', 'z', 'redshift', 'class']
types = [FloatType() for _ in names[:-1]] + [StringType()]

In [12]:
# Apply the casts; df is rebound to the converted dataframe
df = convertColumns(df, names, types)

In [13]:
# Confirm that the casts took effect
df.printSchema()

root
 |-- objid: float (nullable = true)
 |-- ra: float (nullable = true)
 |-- dec: float (nullable = true)
 |-- u: float (nullable = true)
 |-- g: float (nullable = true)
 |-- r: float (nullable = true)
 |-- i: float (nullable = true)
 |-- z: float (nullable = true)
 |-- redshift: float (nullable = true)
 |-- class: string (nullable = true)



### Now each feature has its proper type. Next, we drop the columns that are not used as model inputs (objid, ra, dec).

In [14]:
# Remove objid, ra and dec; u, g, r, i, z, redshift and class remain
df = df.drop('objid', 'ra', 'dec')
df.show(5)

+--------+--------+--------+--------+--------+------------+-----+
|       u|       g|       r|       i|       z|    redshift|class|
+--------+--------+--------+--------+--------+------------+-----+
|19.19479|16.72763| 15.6987|15.32512|15.11311| 1.174448E-4| STAR|
|17.21147|16.16533|15.79078|15.62873|15.58098| 7.529317E-5| STAR|
|17.57057|16.54758|16.14557|15.96953|15.89596|-8.980818E-4| STAR|
|22.68772|20.27666|19.59911|19.29291|19.32655|-1.748509E-4| STAR|
|19.29753|18.01646|17.43456|17.21292|17.05329|-2.282125E-4| STAR|
+--------+--------+--------+--------+--------+------------+-----+
only showing top 5 rows



### All that remains now are the numerical features and the corresponding class.
### The next step is to condense the input dataframe into a dataframe with two columns: a 'label' column and a 'features' column holding a DenseVector of the combined numerical data. PySpark uses dataframes in this format to train ML models.

In [15]:
from pyspark.ml.linalg import DenseVector

In [16]:
def _row_to_label_features(row):
    # Positions 0-5 hold the numeric columns (u, g, r, i, z, redshift); position 6 is 'class'
    return (row[6], DenseVector(row[:6]))


# One (label, feature-vector) pair per dataframe row
input_data = df.rdd.map(_row_to_label_features)

In [17]:
# Rebuild a dataframe from the (label, vector) pairs; df is rebound to the two-column result
output_columns = ["label", "features"]
df = spark.createDataFrame(input_data, schema=output_columns)

In [18]:
# The new df has 2 columns: 'label' and 'features'
df.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
| STAR|[19.1947898864746...|
| STAR|[17.2114696502685...|
| STAR|[17.5705699920654...|
| STAR|[22.6877193450927...|
| STAR|[19.2975292205810...|
+-----+--------------------+
only showing top 5 rows



In [19]:
# The 'features' column contains all of the previously separate numeric data;
# fetch a single row to see the full vector
df.take(1)

[Row(label='STAR', features=DenseVector([19.1948, 16.7276, 15.6987, 15.3251, 15.1131, 0.0001]))]

In [20]:
from pyspark.sql.functions import col

In [21]:
# Number of rows per class label, largest class first
(
    df.groupBy("label")
    .count()
    .orderBy(col("count").desc())
    .show()
)

+------+------+
| label| count|
+------+------+
|GALAXY|300045|
|  STAR|124392|
|   QSO| 75563|
+------+------+



### The current data must now be transformed further to be used to train a ML model
### First, we need to encode the label classes as numbers. For this we use the StringIndexer estimator.

In [22]:
from pyspark.ml.feature import StringIndexer

In [23]:
# Using the label column, create a new numeric column 'label_new'.
# With the default ordering, indices follow descending label frequency (most frequent class = 0.0).
stringIndexer = StringIndexer(inputCol="label", outputCol="label_new")
indexer = stringIndexer.fit(df)
# NOTE: despite its name, scaled_df is not scaled yet here; it only gains the 'label_new' column
scaled_df = indexer.transform(df)

### Second, we rescale the numeric features with StandardScaler. With its default settings (withMean=False, withStd=True), which are used below, each feature is divided by its standard deviation to give unit variance; the mean is not subtracted.

In [24]:
from pyspark.ml.feature import StandardScaler

In [25]:
# NOTE: no withMean/withStd arguments are passed, so the scaler's defaults apply
# (withMean=False, withStd=True): features are scaled to unit standard deviation
# but are NOT mean-centered. Pass withMean=True if centering is wanted.
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
# Fit the scaler on the label-indexed dataframe
scaler = standardScaler.fit(scaled_df)
# Adds the 'features_scaled' column next to the original 'features'
scaled_df = scaler.transform(scaled_df)

In [26]:
# Check out the new dataframe structure: the original columns plus 'label_new' and 'features_scaled'
scaled_df.take(1)

[Row(label='STAR', features=DenseVector([19.1948, 16.7276, 15.6987, 15.3251, 15.1131, 0.0001]), label_new=1.0, features_scaled=DenseVector([0.3105, 0.2781, 0.296, 0.2792, 0.2666, 0.0002]))]

### Since we now have the newly processed columns, we drop the original ones and rename the new ones to 'label' and 'features'.

In [27]:
# Keep only the processed columns, under their final names 'label' and 'features'.
# A single select is used instead of drop + rename so that accidentally re-running
# this cell fails loudly (the 'label_new' / 'features_scaled' columns no longer exist)
# rather than silently dropping the already-renamed 'features' and 'label' columns
# and leaving an empty dataframe.
scaled_df = scaled_df.select(
    col("label_new").alias("label"),
    col("features_scaled").alias("features"),
)

In [28]:
# View the final processed dataframe: numeric 'label' and scaled 'features'
scaled_df.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|[0.31051706979631...|
|  1.0|[0.27843259313067...|
|  1.0|[0.28424181462031...|
|  1.0|[0.36702272716012...|
|  1.0|[0.31217909981426...|
+-----+--------------------+
only showing top 5 rows



In [29]:
# Sanity check: the numeric labels should show the same class frequencies as the string labels did
(
    scaled_df.groupBy("label")
    .count()
    .orderBy(col("count").desc())
    .show()
)

+-----+------+
|label| count|
+-----+------+
|  0.0|300045|
|  1.0|124392|
|  2.0| 75563|
+-----+------+



# Model Building

In [30]:
import time

## 100 Epoch Multilayer Perceptron

In [31]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [32]:
# 80/20 train/test split with a fixed seed
split_weights = [0.8, 0.2]
train, test = scaled_df.randomSplit(split_weights, seed=101)

In [33]:
# Row counts of the two splits
train_rows = train.count()
test_rows = test.count()
print("Training Dataset Count: " + str(train_rows))
print("Test Dataset Count: " + str(test_rows))

Training Dataset Count: 400117
Test Dataset Count: 99883


In [34]:
# Network topology: one input unit per feature, three hidden layers,
# and one output unit per class
n_inputs = 6
hidden_sizes = [64, 64, 32]
n_classes = 3
layers = [n_inputs] + hidden_sizes + [n_classes]

In [35]:
# Classifier configuration; the fixed seed matches the one used for the data split
trainer = MultilayerPerceptronClassifier(
    layers=layers,
    maxIter=100,
    blockSize=128,
    seed=101,
)

In [36]:
# Measure the wall-clock time of model training (the call to fit)
start = time.time()
model = trainer.fit(train)
duration = time.time() - start

In [37]:
# Report the duration measured around trainer.fit(train), in seconds with 2 decimals
print('MLP training time: {:.2f}s'.format(duration))

MLP training time: 1133.23s


In [38]:
# Accuracy evaluator; no column names are passed, so its default column settings apply
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# Score the held-out split and keep only the two columns being compared
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")

test_acc = evaluator.evaluate(predictionAndLabels)

In [39]:
# Report the accuracy on the test split, rounded to 3 decimals
print('MLP Test Accuracy: {:.3f}'.format(test_acc))

MLP Test Accuracy: 0.968


### Once finished, end the current context.

In [40]:
# Stop the SparkContext obtained from the session at the top of the notebook
sc.stop()