# Spark Machine Learning Pipeline

This coursework is about implementing and applying Spark Machine Learning Pipelines, and evaluating them with respect to preprocessing, parametrisation, and scaling.

## 1. Data set initial analysis and summary of pipeline task. (20%)

### 1.1. Summary of machine learning pipeline
Step 1.  
Step 2.  
Step 3.  
Step 4.  


### 1.2. Loading data to RDD

In [None]:
# import dependencies for creating a data frame
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import csv


# Create SparkSession and get its SparkContext (needed for the RDD-based alternatives below)
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext




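# create a DataFrame directly from the csv file (spark.read.csv returns a DataFrame, not an RDD);
# no schema has been defined at this point, so the column types are inferred from the data
trainRDD = spark.read.csv("hdfs://saltdean/data/data/santander-products/train_ver2.csv",
                          header=True, mode="DROPMALFORMED", inferSchema=True)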






# alternatively...
# create RDD from csv files
trainRDD = sc.textFile("hdfs://saltdean/data/data/santander-products/train_ver2.csv")
trainRDD = trainRDD.mapPartitions(lambda x: csv.reader(x))





# alternatively... from https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
# create RDD from csv files
lines = sc.textFile("hdfs://saltdean/data/data/santander-products/train_ver2.csv")
elements = lines.map(lambda l: l.split(","))

# Each line is converted to a tuple.
clients = elements.map(lambda p: (p[0], p[1].strip(),p[2],...))

# The schema is encoded in a string.
schemaString = "name age ..."
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD and register the DataFrame to be used with Spark SQL.
trainRDD = spark.createDataFrame(clients, schema)
trainRDD.createOrReplaceTempView('trainingset')






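# alternatively, as seen in tutorial 8 (the Row fields below are carried over from the tutorial
# and still need to be replaced with the columns of the Santander csv file):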
lines = sc.textFile("hdfs://saltdean/data/data/santander-products/train_ver2.csv")
parts = lines.map(lambda l: l.split(","))
trainRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))

# Create DataFrame and register it to be used with Spark SQL.
trainData = spark.createDataFrame(trainRDD)
trainData.createOrReplaceTempView('Clients')

# For testing
trainData.describe().show() # summary statistics per column
print(trainData.count()) # number of instances
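
Two of the loading variants above split each line with `l.split(",")`, while one uses `csv.reader`. The two approaches differ whenever a quoted field contains a comma. The following plain-Python sketch illustrates this on a made-up record (the values are hypothetical and not taken from the data set):

```python
import csv

# Hypothetical record with a quoted field that contains a comma
line = '1375586,"CORUNA, A",35'

# Naive splitting breaks the quoted field into two pieces
naive = line.split(",")

# csv.reader respects the quotes and keeps the field whole
parsed = next(csv.reader([line]))

print(len(naive), naive)
print(len(parsed), parsed)
```

If the file may contain quoted fields, the `csv.reader` variant (or `spark.read.csv`) is the safer choice, since a shifted column would otherwise corrupt every field after it.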


### 1.3. Descriptive Statistics

In [None]:
# Code modified from 
# https://www.kaggle.com/apryor6/santander-product-recommendation/detailed-cleaning-visualization-python/notebook









## 2. Implementation of machine learning pipeline. (25%)
Implement a machine learning pipeline in Spark, including feature extractors, transformers, and/or selectors. Test that your pipeline is correctly implemented and explain your choice of processing steps, learning algorithms, and parameter settings.

In [None]:
# code modified from Spark documentation at:
# https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#random-forest-classifier
# and DataBricks at:
# https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html

# imports dependencies for Random Forest pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, OneHotEncoder, StringIndexer, VectorAssembler, VectorIndexer


# stages in the Pipeline
stages = []

# One-Hot Encoding
categoricalColumns = ["a", "b", "c", "d", "e", "f", "g", "j"]
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index") # Category Indexing with StringIndexer
    encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec") # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    stages += [stringIndexer, encoder]  # Add stages to the pipeline
    
# Convert labels into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = "add here target column in csv file", outputCol = "labels")
stages += [label_stringIdx]  # Add stage to the pipeline

# Transform all features into a vector using VectorAssembler
numericCols = ["m", "n", "o", "p", "q", "r"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols  # list, so it can be concatenated in Python 3
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]  # Add stage to the pipeline

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="labels", 
                            featuresCol="features", 
                            numTrees=100,                 #  Number of trees in the random forest
                            impurity='entropy',            # Criterion used for information gain calculation
                            featureSubsetStrategy="auto",
                            predictionCol="prediction",
                            maxDepth=5, 
                            maxBins=32, 
                            minInstancesPerNode=1, 
                            minInfoGain=0.0, 
                            subsamplingRate=1.0)
stages += [rf]  # Add stage to the pipeline

# Machine Learning Pipeline
pipeline = Pipeline(stages=stages)
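
To make the categorical preprocessing above more concrete, here is a plain-Python imitation of what the `StringIndexer` and `OneHotEncoder` stages compute. It is only a sketch under two assumptions: the indexer uses its default ordering (most frequent value gets index 0) and the encoder uses its default of dropping the last category. The helper names `fit_string_index` and `one_hot` are hypothetical and are not part of Spark.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct value to an index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically in this sketch
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: index for index, value in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 vector; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector

# Hypothetical categorical column
column = ["a", "b", "a", "c", "a", "b"]
mapping = fit_string_index(column)
encoded = [one_hot(mapping[v], len(mapping)) for v in column]

print(mapping)
print(encoded[:4])
```

With three categories, each value becomes a vector of length two, which is why the assembled feature vector is shorter than the total number of category levels.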


## 3. Evaluation and test of model. (20%)
Evaluate the performance of your pipeline using training and test set (don’t use CV but pyspark.ml.tuning.TrainValidationSplit).

### 3.1. Evaluate performance of machine learning pipeline on training data and test data.

In [None]:
# imports dependencies
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Split data into training set and testing set
[trainData, testData] = trainData.randomSplit([0.8, 0.2], seed = 100)

# Train model in pipeline
rfModel = pipeline.fit(trainData)

# Make predictions for training set and compute training set accuracy
predictions = rfModel.transform(trainData)
evaluator = MulticlassClassificationEvaluator(labelCol="labels", 
                                              predictionCol="prediction", 
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Training Error = %g" % (1.0 - accuracy))
print(rfModel.stages[-1])  # summary of the fitted random forest (last pipeline stage)


# No separate feature-transformation step is needed for the test set: rfModel is the fitted
# pipeline, so transform() applies the indexers, encoders and assembler learned on the training data.

# Make predictions for test set and compute test error
test_predictions = rfModel.transform(testData)
test_accuracy = evaluator.evaluate(test_predictions)
print("Test Error = %g" % (1.0 - test_accuracy))
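
The `accuracy` metric used above is the fraction of rows whose prediction equals the label, and the reported error is one minus that value. A small plain-Python sketch of the arithmetic, using made-up labels and predictions (the helper `accuracy` is hypothetical and not a Spark function):

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and of equal length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Hypothetical label indices and predictions
labels = [0.0, 1.0, 1.0, 2.0, 0.0]
predictions = [0.0, 1.0, 2.0, 2.0, 1.0]

acc = accuracy(labels, predictions)
error = 1.0 - acc
print("accuracy = %g, error = %g" % (acc, error))
```

A training error that is much lower than the test error is the usual sign that the model is overfitting.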


## 4. Model fine-tuning. (35%) 
Implement a parameter grid (using pyspark.ml.tuning.ParamGridBuilder), varying at least one feature preprocessing step, one machine learning parameter, and the training set size. Document the training and test performance and the time taken for training and testing. Comment on your findings.
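
For the timing part of this task, one possible approach is a small wrapper around `time.perf_counter`. The helper name `timed` is hypothetical, and the workload below is a stand-in so that the sketch runs without a cluster; in the notebook the call would be something like `timed(pipeline.fit, trainData)`.

```python
import time

def timed(fn, *args, **kwargs):
    """Call fn(*args, **kwargs) and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start

# Stand-in workload (not a Spark job)
total, seconds = timed(sum, range(1000000))
print("result = %d, took %.4f s" % (total, seconds))
```

Note that `transform()` on a DataFrame is lazy, so timing it alone measures almost nothing. To time testing, wrap a call that triggers an action, such as `evaluator.evaluate(...)` on the transformed data.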

### 4.1. Training set size evaluation

In [None]:
print('Training set size evaluation')

# fractions of the training set to be evaluated; each subset is drawn with sample(),
# because randomSplit() would normalise these weights so that they sum to 1
sizes = [0.5, 0.1, 0.05, 0.01, 0.001]

print('\n=== training set of size 100%')
# Train model in pipeline
tempModel = pipeline.fit(trainData)
# Make predictions for training set and compute training set accuracy
tempPredictions = tempModel.transform(trainData)
tempAccuracy = evaluator.evaluate(tempPredictions)
print("Classification Error = %g" % (1.0 - tempAccuracy))

for fraction in sizes:
    print('\n=== training set of size reduced to %g%%' % (fraction * 100))
    # Draw a random subset of roughly this fraction of the training data
    subset = trainData.sample(False, fraction, seed=100)
    # Train model in pipeline
    tempModel = pipeline.fit(subset)
    # Make predictions for training set and compute training set accuracy
    tempPredictions = tempModel.transform(subset)
    tempAccuracy = evaluator.evaluate(tempPredictions)
    print("Classification Error = %g" % (1.0 - tempAccuracy))
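
One point to keep in mind when choosing subset sizes: `randomSplit` normalises its weights when they do not sum to 1, so a weight list such as `[0.5, 0.1, 0.05, 0.01, 0.001]` does not produce subsets of those fractions. The arithmetic can be checked in plain Python:

```python
# Weights as listed for the training set sizes
weights = [0.5, 0.1, 0.05, 0.01, 0.001]

# randomSplit rescales the weights so that they sum to 1
total = sum(weights)
normalised = [w / total for w in weights]

for requested, actual in zip(weights, normalised):
    print("requested %.3f -> expected share %.4f" % (requested, actual))
```

The first subset would therefore hold roughly three quarters of the rows rather than half, which is why drawing each subset with `sample(False, fraction)` matches the intended sizes more closely.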
    

### 4.2. Machine Learning Model Hyperparameter search

In [None]:
# Define hyperparameters and their values to search and evaluate
# (minInstancesPerNode must be at least 1, and Spark limits maxDepth to 30)
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10,20,50,100,200,500,1000,5000]) \
    .addGrid(rf.minInstancesPerNode, [1,2,4,6,8,10]) \
    .addGrid(rf.maxDepth, [2,5,10,20,30]).build()

# Grid search with a single train/validation split (TrainValidationSplit instead of
# cross-validation, as required in section 3). The estimator is the whole pipeline,
# because trainData still holds the raw columns and has no "features" column yet.
tvs = TrainValidationSplit(estimator=pipeline, estimatorParamMaps=paramGrid,
                           evaluator=evaluator, trainRatio=0.8, seed=100)
print('starting Hyperparameter Grid Search with train-validation split')
tvsModel = tvs.fit(trainData)
print('Grid Search has finished')

print(tvsModel.bestModel.stages[-1])  # fitted random forest of the best pipeline
paramMap = list(zip(tvsModel.getEstimatorParamMaps(), tvsModel.validationMetrics))
paramMax = max(paramMap, key=lambda x: x[1])
print(paramMax)

# Evaluate the model with test data
tvstest_predictions = tvsModel.transform(testData)
tvstest_accuracy = evaluator.evaluate(tvstest_predictions)
print("Test Error = %g" % (1.0 - tvstest_accuracy))
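
Before launching a grid search it helps to estimate how many models will be trained, since the count is the product of the lengths of all value lists, multiplied by the number of folds when cross-validation is used. The sketch below does this in plain Python for a deliberately small, hypothetical grid (not the grid used in this notebook); `count_search_fits` is a hypothetical helper, and the count excludes any final refit of the best model.

```python
from itertools import product

def count_search_fits(grid, num_folds=1):
    """Return (number of parameter combinations, number of fits during the search)."""
    combinations = list(product(*grid.values()))
    return len(combinations), len(combinations) * num_folds

# Hypothetical small grid, for illustration only
small_grid = {
    "numTrees": [10, 50, 100],
    "maxDepth": [2, 5, 10],
    "minInstancesPerNode": [1, 4],
}

n_combos, fits_single_split = count_search_fits(small_grid)      # one train/validation split
_, fits_three_fold = count_search_fits(small_grid, num_folds=3)  # 3-fold cross-validation
print(n_combos, fits_single_split, fits_three_fold)
```

Because the count grows multiplicatively, trimming one or two values from each list reduces the total training time far more than trimming several values from a single list.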
