# Spark Pipeline

## Setup

In [2]:
import findspark
findspark.init()  # run before importing pyspark so the local Spark install is importable

import pyspark
from pyspark.sql import SparkSession

# Build the session first and take the context from it. Creating a bare
# SparkContext beforehand means the builder's appName arrives too late to
# name the already-running context, and re-running this cell would fail
# because a second SparkContext cannot be created in the same process.
# getOrCreate() reuses a live session, so the cell is safe to re-run.
spark = SparkSession.builder.appName('example').getOrCreate()
sc = spark.sparkContext

## Libraries

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune
import numpy as np

## Import data

In [2]:
# Read the CSV, using its first row as the column names.
df = (
    spark.read
    .option("header", True)
    .csv('data.csv')
)

# Quick look at the first rows, then list the available columns.
df.show(5)
df.columns

## Pre-processing

Spark ML estimators look for the target in a column named `label` by default, so we copy `target` into an integer `label` column.

In [43]:
# Add (or overwrite) a "label" column holding the target cast to integer.
df = df.withColumn(
    colName="label",
    col=df["target"].cast('integer'),
)

In [44]:
# Categorical feature handling: index the raw values, then one-hot encode
# the resulting index column.
scf_indexer = StringIndexer(
    inputCol="some_cat_feature",
    outputCol="some_cat_feature_index",
)
scf_encoder = OneHotEncoder(
    inputCol="some_cat_feature_index",
    outputCol="some_cat_feature_fact",
)

# Placeholder: replace with the real names of the columns to feed the model.
feature_cols = ["some list of column names"]

# Pack the chosen columns into the single "features" vector column.
vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Stages run in order: index -> encode -> assemble.
pipe = Pipeline(stages=[scf_indexer, scf_encoder, vec_assembler])

In [50]:
# Fit the preprocessing pipeline on the full frame and apply it.
# NOTE(review): the pipeline is fitted before the split, so the indexer sees
# categories from rows that later land in `test` — confirm this is acceptable.
piped_data = pipe.fit(df).transform(df)

# 80/20 train/test split. A fixed seed keeps the partition stable between
# runs, so reported metrics can be compared across executions.
training, test = piped_data.randomSplit([.8, .2], seed = 42)

## Modeling

In [1]:
%%time

clf_lr = LogisticRegression()
evaluator = evals.BinaryClassificationEvaluator(metricName = "areaUnderROC")

grid = tune.ParamGridBuilder()
grid = grid.addGrid(clf_lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(clf_lr.elasticNetParam, [0, 1])
grid = grid.build()

clf_lr_cv = tune.CrossValidator(
    estimator = clf_lr,
    estimatorParamMaps = grid,
    evaluator = evaluator,
    numFolds = 5
               )

best_clf_lr = clf_lr_cv.fit(training).bestModel
results = best_clf_lr.transform(training)
print(evaluator.evaluate(results))

## Stop

In [5]:
# Shut down the SparkContext created in the Setup cell; run this last,
# since later Spark calls need a live context.
sc.stop()