#### A Tutorial on Spark in Python 
You might already know Apache Spark as a fast and general engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing. It’s well known for its speed, ease of use, generality and ability to run virtually everywhere. And even though Spark is one of the most in-demand tools for data engineers, data scientists can also benefit from it when doing exploratory data analysis, feature extraction, supervised learning and model evaluation.

#### Welcome to Apache Spark with Python
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
[http://spark.apache.org](http://spark.apache.org)

In this notebook, we'll train two classifiers to predict survivors in the Titanic dataset. We'll use this classic machine learning problem as a brief introduction to using Apache Spark local mode in a notebook.

In [3]:
import pyspark  
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.tree import DecisionTree

First we create a **SparkContext**, the main object in the Spark API. This call may take a few seconds to return as it fires up a JVM under the covers.

In [4]:
sc = pyspark.SparkContext()

#### Sample the data
We point the context at a CSV file on disk. The result is an RDD, not the content of the file. This is a <span style="color:red">Spark transformation</span>.

In [8]:
raw_rdd = sc.textFile("/Users/nanaakwasiabayieboateng/Documents/memphisclassesbooks/DataMiningscience/Capitalone/green_tripdata_2016-09.csv")

We query the RDD for the number of lines in the file. The call here causes the file to be read and the result computed. This is a Spark action.

In [6]:
raw_rdd.count()

1162375
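The difference between a transformation and an action can be illustrated with a plain-Python analogy. The sketch below uses a generator rather than Spark itself, and the `read_lines` helper and the sample lines are hypothetical: building the generator does no work, and only consuming it (the analogue of `count()`) forces the read.

```python
# Plain-Python analogy for lazy transformations vs. actions (not the Spark API).
reads = []  # records every line that is actually read


def read_lines(lines):
    """Hypothetical lazy source: yields lines one at a time, logging each read."""
    for line in lines:
        reads.append(line)
        yield line


sample = ["header", "row1", "row2"]

# "Transformation": describes the work, nothing is read yet.
lazy = (line for line in read_lines(sample) if line != "header")
reads_before = len(reads)

# "Action": consuming the generator forces the read.
n_rows = sum(1 for _ in lazy)
reads_after = len(reads)
```

Here `reads_before` stays at zero until the final count is requested, which mirrors why `sc.textFile` returns immediately while `count()` takes time.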

We query for the first five rows of the RDD. Even though the data is small, we shouldn't get into the habit of pulling the entire dataset into the notebook. Many datasets that we might want to work with using Spark will be much too large to fit in the memory of a single machine.

In [9]:
raw_rdd.take(5)

['VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type',
 '',
 '2,2016-09-01 00:58:21,2016-09-01 01:11:46,N,1,92,82,1,3.34,12.5,0.5,0.5,1,0,,0.3,14.8,1,1,,',
 '2,2016-09-01 00:49:50,2016-09-01 01:05:57,N,1,83,92,2,3.78,14.5,0.5,0.5,0,0,,0.3,15.8,2,1,,',
 '2,2016-09-01 00:06:58,2016-09-01 00:15:13,N,1,93,223,1,4.84,15,0.5,0.5,0,0,,0.3,16.3,2,1,,']

We see a header row, an empty line, and then a set of data rows. We filter out the header to define a new RDD containing only the remaining rows; note that this filter does not remove the empty line.

**get the heading**

In [11]:
header = raw_rdd.first()
header

'VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type'

In [12]:
data_rdd = raw_rdd.filter(lambda line: line != header)
data_rdd

PythonRDD[8] at RDD at PythonRDD.scala:48

We take a random sample of the data rows to better understand the possible values.

In [13]:
data_rdd.takeSample(False, 5, 0)

['2,2016-09-19 06:39:52,2016-09-19 06:53:11,N,1,166,247,1,4.83,16,0,0.5,0,0,,0.3,16.8,2,1,,',
 '2,2016-09-01 08:36:59,2016-09-01 09:02:49,N,1,42,230,1,4.84,19.5,0,0.5,0,0,,0.3,20.3,1,1,,',
 '1,2016-09-27 22:28:00,2016-09-27 22:40:15,N,1,196,28,1,3.00,12,0.5,0.5,2.65,0,,0.3,15.95,1,1,,',
 '1,2016-09-23 20:55:58,2016-09-23 20:56:07,N,1,145,145,1,.00,2.5,0.5,0.5,0,0,,0.3,3.8,3,1,,',
 '2,2016-09-11 00:54:55,2016-09-11 01:28:47,N,1,247,85,1,16.76,47,0.5,0.5,0,5.54,,0.3,55.79,1,1,,']
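The sampled rows are plain comma-separated strings, so a small helper can map them onto the header's column names. The following is a minimal sketch in plain Python; `parse_row` is a hypothetical name, and the header and row are copied from the output shown earlier in this notebook. Note that the data rows carry two trailing empty fields that the header does not name.

```python
import csv

HEADER = ("VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,"
          "RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,"
          "fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,"
          "improvement_surcharge,total_amount,payment_type,trip_type")


def parse_row(line, header=HEADER):
    """Hypothetical helper: map one CSV line onto the header's column names."""
    names = next(csv.reader([header]))
    values = next(csv.reader([line]))
    # zip() stops at the shorter sequence, so the two trailing empty
    # fields present in the data rows are dropped.
    return dict(zip(names, values))


row = parse_row("2,2016-09-01 00:58:21,2016-09-01 01:11:46,N,1,92,82,1,3.34,"
                "12.5,0.5,0.5,1,0,,0.3,14.8,1,1,,")
```

All values stay strings at this point; converting fields such as `trip_distance` to numbers would be a separate step.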

#### Create labeled points (i.e., feature vectors and ground truth)
Now we define a function to turn the passenger attributes into structured LabeledPoint objects.

In [26]:
data_rdd.countByValue()

defaultdict(int,
            {'': 1,
             '2,2016-09-24 23:26:44,2016-09-24 23:36:06,N,1,66,40,2,2.76,10.5,0.5,0.5,2.95,0,,0.3,14.75,1,1,,': 1,
             '2,2016-09-18 19:43:29,2016-09-18 20:04:22,N,1,255,25,1,3.87,16,0,0.5,5.04,0,,0.3,21.84,1,1,,': 1,
             '2,2016-09-14 00:38:52,2016-09-14 00:41:12,N,1,35,35,1,.22,3.5,0.5,0.5,0,0,,0.3,4.8,2,1,,': 1,
             '2,2016-09-24 15:06:51,2016-09-24 15:34:38,N,1,66,112,1,5.53,21.5,0,0.5,0,0,,0.3,22.3,2,1,,': 1,
             '2,2016-09-11 04:03:39,2016-09-11 04:07:47,N,1,173,57,1,.47,4.5,0.5,0.5,0,0,,0.3,5.8,2,1,,': 1,
             '2,2016-09-16 01:01:53,2016-09-16 01:20:36,N,1,112,61,5,6.66,21,0.5,0.5,2,0,,0.3,24.3,1,1,,': 1,
             '2,2016-09-26 20:07:43,2016-09-26 20:14:56,N,1,83,129,1,2.35,9,0.5,0.5,2.58,0,,0.3,12.88,1,1,,': 1,
             '1,2016-09-24 15:45:46,2016-09-24 16:21:40,N,1,243,43,2,7.80,29,0,0.5,3,0,,0.3,32.8,1,1,,': 1,
             '2,2016-09-17 10:40:28,2016-09-17 11:11:02,N,1,243,112,1,11.62,35

#### Split for training and test
We split the transformed data into a training set (70%) and a test set (30%), and print the total number of items in each segment.

In [14]:
training_rdd, test_rdd = data_rdd.randomSplit([0.7, 0.3], seed = 0)

In [15]:
training_count = training_rdd.count()
test_count = test_rdd.count()

In [16]:
training_count, test_count

(813350, 349024)
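The counts above are close to, but not exactly, a 70/30 split (813350 of 1162374 rows is roughly 70%), which is what one would expect if each row is assigned to a split by an independent random draw. The sketch below illustrates that idea in plain Python; `random_split` is a hypothetical stand-in written for this illustration, not Spark's implementation.

```python
import random


def random_split(items, weights, seed):
    """Assign each item to a split by a single uniform draw (illustrative only)."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for item in items:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches any floating-point leftover.
            if u < b or i == len(bounds) - 1:
                splits[i].append(item)
                break
    return splits


train, test = random_split(range(10000), [0.7, 0.3], seed=0)
train_fraction = len(train) / 10000
```

Every item lands in exactly one split, and the realized fraction only approximates the requested weight.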

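An RDD has no `toPandas()` method (it is defined on Spark DataFrames), so calling it directly on `data_rdd` fails:

In [17]:
data_rdd.toPandas() 

AttributeError: 'PipelinedRDD' object has no attribute 'toPandas'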


#### Intro
The pipeline concept is certainly not new to the software world: the Unix pipe operator (|) links two tasks, feeding the output of one as the input to the other. In machine learning solutions it is common to apply several transformations and manipulations to a dataset, or to different portions or samples of the same dataset (from classic train/test slices to samples taken for a cross-validation procedure). In these cases, pipelines are useful for defining a sequence of tasks chained together into a complete process, which in turn simplifies operating the ML solution. In addition, in a big data environment it is possible to apply lazy execution to the entire process in order to make it more scalable and robust. It is therefore no surprise to see pipelines implemented in the Spark machine learning library, with an R API available through the sparklyr package to leverage the construct. Pipeline components in Spark are basically of two types:

**transformer:** since dataframes usually need to undergo several kinds of changes (column-wide, row-wide or even on single values), transformers are the components meant to deliver these transformations. Typically a transformer takes a table or dataframe as input and delivers a table or dataframe as output. Spark, through MLlib, provides a set of feature transformers meant to address the most common transformations needed;

**estimator:** an estimator is the component which delivers a model by fitting an algorithm to training data. Indeed, fit() is the key method of an estimator and produces, as said, a model, which is itself a transformer. Leveraging the parallel processing provided by Spark, it is possible to run several estimators in parallel on different training datasets in order to find the best solution (or even to avoid overfitting issues).

ML algorithms are basically a set of **Estimators**; together they make up the rich set of common machine learning (ML) algorithms available from MLlib, a library of algorithms meant to be scalable and to run in a parallel environment. Starting from the 2.0 release of Spark, the RDD-based library is in maintenance mode (the RDD-based APIs are expected to be removed in the 3.0 release), whereas mainstream development is focused on supporting dataframes. In MLlib, features are to be expressed as labeled points, which means numeric values for both the label and the feature vector. Pipelines of transformers are, for this reason too, extremely useful for operationalizing a Spark-based ML solution. For additional details on MLlib, refer to the Apache Spark documentation.

In this post we’ll see a simple example of pipeline usage in two versions: the first one using Scala (which is a kind of “native” language for the Spark environment); afterward we’ll see how to implement the same example in R, leveraging the sparklyr package in order to show how powerful and complete it is.
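The relationship between transformers, estimators and pipelines can be sketched in a few lines of plain Python. This is a toy illustration under stated assumptions, not the spark.ml API: every class name here (`AddOne`, `Scaler`, `ScalerModel`, `Pipeline`, `PipelineModel`) is hypothetical, and plain lists of numbers stand in for dataframes.

```python
class AddOne:
    """Toy transformer with nothing to learn."""

    def transform(self, rows):
        return [r + 1 for r in rows]


class Scaler:
    """Toy estimator: fit() learns the maximum and returns a transformer."""

    def fit(self, rows):
        return ScalerModel(max(rows))


class ScalerModel:
    """Toy transformer produced by Scaler.fit()."""

    def __init__(self, maximum):
        self.maximum = maximum

    def transform(self, rows):
        return [r / self.maximum for r in rows]


class Pipeline:
    """Chains stages; fitting turns every estimator into its fitted model."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):
                stage = stage.fit(rows)
            rows = stage.transform(rows)
            fitted.append(stage)
        return PipelineModel(fitted)


class PipelineModel:
    """Fitted pipeline: applies each fitted stage in order."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


model = Pipeline([AddOne(), Scaler()]).fit([1, 3])  # Scaler learns max of [2, 4]
result = model.transform([0, 1, 7])
```

The fitted model replays the same sequence of steps on new data, which is the property that makes pipelines convenient for train/test workflows.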

#### The dataset
For this example, the dataset comes from the UCI Machine Learning Repository (Irvine, CA: University of California, School of Information and Computer Science). The “Adults Income” dataset reports how an individual’s annual income results from various factors. The annual income will be our label (it is divided into two classes: <=50K and >50K), and there are 14 features for each individual that we can leverage to explore the possibility of predicting income level. For additional detail on the “adults dataset” refer to the UCI machine learning repository http://www.cs.toronto.edu/~delve/data/adult/desc.html.

#### Scala code
As said, we’ll show how to use the Scala API to access pipelines in MLlib. We therefore need to include references to the classes we’re planning to use in our example and to start a Spark session:

In [None]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql._

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, VectorIndexer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession

Then we’ll read the dataset and start to manipulate the data in order to prepare it for the pipeline. In our example, we’ll get data out of a local repository (instead of referring to, e.g., an HDFS or data lake repository; there are APIs, for both Scala and R, which allow access to these repositories as well). We’ll also use this load step to rename some columns; in particular, we’ll rename the “income” column to “label”, since we’ll use it as the label column in our logistic regression algorithm.

In [None]:
//load data source from local repository
val csv = spark.read.option("inferSchema","true")
  .option("header", "true").csv(".../yyyy/xxxx/adult.csv")

val data_start = csv.select(($"workclass"),($"gender"),($"education"),($"age"),
($"marital-status").alias("marital"), ($"occupation"),($"relationship"), 
($"race"),($"hours-per-week").alias("hr_per_week"), ($"native-country").alias("country"),
($"income").alias("label"),($"capital-gain").alias("capitalgain"),
($"capital-loss").alias("capitalloss"),($"educational-num").alias("eduyears")).toDF

We’ll do some data cleanup, basically recoding the “working class” and “marital” columns in order to reduce the number of codes, and we’ll get rid of rows for which “occupation”, “working class” (even recoded) and “capital gain” are not available. For the first two columns the dataset has the “?” value instead of “NA”; for capital gain there’s the 99999 value, which will be filtered out. To recode the “working class” and “marital” columns we’ll use UDFs, which in turn are wrappers of the actual recoding functions. To add a new column to the (new) dataframe we’ll use the “withColumn” method, which will add “new_marital” and “new_workclass”, yielding the startingdata dataframe. Afterwards, we’ll filter out all missing values and we’ll be ready to build the pipeline.



In [None]:
// recoding marital status and working class, adding a new column 
def newcol_marital(str1:String): String = { 
    var nw_str = "noVal"
     if ((str1 == "Married-spouse-absent") | (str1 =="Married-AF-spouse") 
         | (str1 == "Married-civ-spouse")) {nw_str = "Married" } 
        else if ((str1 == "Divorced") | (str1 == "Never-married" ) 
                 | (str1 == "Separated" ) | (str1 == "Widowed")) 
          {nw_str = "Nonmarried" } 
        else { nw_str = str1}
    return nw_str
}
val udfnewcol = udf(newcol_marital _)
val recodeddata = data_start.withColumn("new_marital", udfnewcol($"marital"))

def newcol_wkclass(str1:String): String = { 
    var nw_str = "noVal"
     if ((str1 == "Local-gov") | (str1 =="Federal-gov") | (str1 == "State-gov")) 
        {nw_str = "Gov" } 
    else if ((str1 == "Self-emp-not-inc") | (str1 == "Self-emp-inc" )) 
        {nw_str = "Selfemployed" } 
    else if ((str1 == "Never-worked") | (str1 == "Without-pay")) 
        {nw_str = "Notemployed" } 
    else { nw_str = str1}
    return nw_str
}

val udfnewcol_wkclass = udf(newcol_wkclass _)
val startingdata = recodeddata.withColumn("new_workclass", udfnewcol_wkclass($"workclass"))

// remove missing data
val df_work01 = startingdata.na.drop("any")
// triple quotes allow the filter expression to span several lines
val df_work = startingdata.filter("""occupation <> '?'
                                and capitalgain < 99999
                                and new_workclass <> '?'
                                and country <> '?' """)


In our example, we’re going to use 12 features: 7 are categorical variables and 5 are numeric variables. The feature array we’ll use to fit the model will be the result of merging two arrays, one for categorical variables and one for numeric variables. Before building the categorical variables array, we need to transform categories to indexes using the transformers provided by spark.ml; even the label must be transformed to an index. Our pipeline will then include 7 stages to transform categorical variables, 1 stage to transform the label, 2 stages to build the categorical and numeric arrays, and the final stage to fit the logistic model. In the end, we’ll build an 11-stage pipeline.
To transform categorical variables into indexes we’ll use the “StringIndexer” transformer. StringIndexer encodes a column of strings into a column of non-negative indices, ranging from 0 up to (but not including) the number of distinct values. The indices are ordered by label frequency, so the most frequent value gets index 0. For each variable, we need to define the input column and the output column, which we’ll use as input for other transformers or estimators. Finally, it is possible to define the strategy for handling unseen labels (possible when you fit the StringIndexer on one dataframe and run the fitted model against a test dataframe) through the setHandleInvalid method; in our case we simply put “skip” to tell StringIndexer we want to skip rows with unseen labels (additional details are available in the MLlib documentation).
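The frequency-ordered indexing and the “skip” strategy can be illustrated with a small plain-Python sketch. It is not the spark.ml implementation: the function names and the sample values are hypothetical, and the alphabetical tie-break is an assumption made only to keep the example deterministic.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct string to an index, most frequent first (illustrative)."""
    counts = Counter(values)
    # Ties are broken alphabetically here; that tie-break is an assumption.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


def transform_skip(values, mapping):
    """Index the values, dropping any label not seen during fitting ("skip")."""
    return [mapping[v] for v in values if v in mapping]


mapping = fit_string_indexer(
    ["Private", "Gov", "Private", "Selfemployed", "Private", "Gov"])
indexed = transform_skip(["Gov", "Unknown", "Private"], mapping)
```

In this toy run the most frequent value, “Private”, receives index 0, and the unseen “Unknown” row is dropped rather than raising an error.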



In [None]:
// define stages
val new_workclassIdx = new StringIndexer().setInputCol("new_workclass")
.setOutputCol("new_workclassIdx").setHandleInvalid("skip")

val genderIdx = new StringIndexer().setInputCol("gender")
.setOutputCol("genderIdx").setHandleInvalid("skip")

val maritalIdx = new StringIndexer().setInputCol("new_marital")
.setOutputCol("maritalIdx").setHandleInvalid("skip")

val occupationIdx = new StringIndexer().setInputCol("occupation")
.setOutputCol("occupationIdx").setHandleInvalid("skip")

val relationshipIdx = new StringIndexer().setInputCol("relationship")
.setOutputCol("relationshipIdx").setHandleInvalid("skip")

val raceIdx = new StringIndexer().setInputCol("race")
.setOutputCol("raceIdx").setHandleInvalid("skip")

val countryIdx = new StringIndexer().setInputCol("country")
.setOutputCol("countryIdx").setHandleInvalid("skip")

val labelIdx = new StringIndexer().setInputCol("label")
.setOutputCol("labelIdx").setHandleInvalid("skip")

In addition to the Transformers and Estimators provided by the spark.ml package, it is possible to define custom Estimators and Transformers. As an example, we’ll see how to define a custom transformer aimed at recoding “marital status” in our dataset (basically we’ll do the same task we have already seen, implementing it with a custom transformer; for additional details on implementing custom estimators and transformers see the nice article by H. Karau). To define a custom transformer, we’ll define a new Scala class, columnRecoder, which extends the Transformer class. We’ll override the transformSchema method to declare the correct type of the new column added by the transformation, and we’ll implement the transform method, which actually does the recoding in our dataset. A possible implementation is:

In [None]:
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.Identifiable

class columnRecoder(override val uid: String) extends Transformer {
  final val inputCol = new Param[String](this, "inputCol", "input column")
  final val outputCol = new Param[String](this, "outputCol", "output column")

  def setInputCol(value: String): this.type = set(inputCol, value)

  def setOutputCol(value: String): this.type = set(outputCol, value)

  def this() = this(Identifiable.randomUID("columnRecoder"))

  def copy(existingParam: ParamMap): columnRecoder = { defaultCopy(existingParam) }

  override def transformSchema(schema: StructType): StructType = {
    // Check inputCol type
    val idx = schema.fieldIndex($(inputCol))
    val field = schema.fields(idx)
    if (field.dataType != StringType) {
      throw new Exception(s"Input type ${field.dataType} type mismatch: String expected")
    }
    // The return field
    schema.add(StructField($(outputCol), StringType, false))
  }

  // recoding function wrapped by the UDF below
  private def newcol_recode(str1: String): String = {
    var nw_str = "noVal"
    if ((str1 == "Married-spouse-absent") || (str1 == "Married-AF-spouse")
        || (str1 == "Married-civ-spouse"))
      { nw_str = "Married" }
    else if ((str1 == "Divorced") || (str1 == "Never-married")
        || (str1 == "Separated") || (str1 == "Widowed"))
      { nw_str = "Nonmarried" }
    else { nw_str = str1 }
    nw_str
  }

  private def udfnewcol = udf(newcol_recode _)

  def transform(df: Dataset[_]): DataFrame = {
    df.withColumn($(outputCol), udfnewcol(df($(inputCol))))
  }
}

Once defined as a transformer, we can use it in our pipeline as the first stage.



In [None]:
// define stages
val new_marital = new columnRecoder().setInputCol("marital")
.setOutputCol("new_marital")

val new_workclassIdx = new StringIndexer().setInputCol("new_workclass")
.setOutputCol("new_workclassIdx").setHandleInvalid("skip")

val genderIdx = new StringIndexer().setInputCol("gender")
.setOutputCol("genderIdx").setHandleInvalid("skip")

val maritalIdx = new StringIndexer().setInputCol("new_marital")
.setOutputCol("maritalIdx").setHandleInvalid("skip")

.......


A second step in building our pipeline is to assemble the categorical indexes into a single vector through the VectorAssembler transformer. This VectorAssembler delivers a single feature column which is, in turn, transformed to indexes by the VectorIndexer transformer to deliver indexes within the “catFeatures” column:

In [None]:
// cat vector for categorical variables
val catVect = new VectorAssembler()
                  .setInputCols(Array("new_workclassIdx", "genderIdx", "maritalIdx", "occupationIdx","relationshipIdx","raceIdx","countryIdx"))
                  .setOutputCol("cat01Features")

val catIdx = new VectorIndexer()
                  .setInputCol(catVect.getOutputCol)
                  .setOutputCol("catFeatures")

For numeric variables we just need to assemble the columns with VectorAssembler; then we’re ready to put these two vectors (one for categorical variables, the other for numeric variables) together in a single vector.

In [None]:
// numeric vector for numeric variable
val numVect = new VectorAssembler()
                  .setInputCols(Array("age","hr_per_week","capitalgain","capitalloss","eduyears"))
                  .setOutputCol("numFeatures")

val featVect = new VectorAssembler()
                    .setInputCols(Array("catFeatures", "numFeatures"))
                    .setOutputCol("features")

We now have the label and features ready to build the logistic regression model, which is the final component of our pipeline. We can also set some parameters for the model: in particular, we can define the threshold (set to 0.5 by default) used to decide between label values, as well as the maximum number of iterations for this algorithm and a parameter to tune regularization.
When all stages of the pipeline are ready, we just need to define the pipeline component itself, passing as an input parameter an array with all the defined stages:

In [None]:
val lr = new LogisticRegression().setLabelCol("labelIdx").setFeaturesCol("features")
  .setThreshold(0.33).setMaxIter(10).setRegParam(0.2)

val pipeline = new Pipeline().setStages(Array(new_marital,new_workclassIdx, labelIdx,maritalIdx,occupationIdx, relationshipIdx,raceIdx,genderIdx, countryIdx,catVect, catIdx, numVect,featVect,lr))
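The effect of the threshold setting can be illustrated with a few lines of plain Python. This is a sketch of the decision rule only, not the spark.ml implementation: `predict` is a hypothetical helper, and the score of -0.5 is an arbitrary value chosen for the example.

```python
import math


def predict(score, threshold=0.5):
    """Turn a raw linear score into a 0/1 label via the logistic function."""
    probability = 1.0 / (1.0 + math.exp(-score))
    return 1 if probability > threshold else 0


# A score of -0.5 gives a probability of about 0.38 for class 1.
default_label = predict(-0.5)        # compared against 0.5
lowered_label = predict(-0.5, 0.33)  # compared against 0.33
```

Lowering the threshold from 0.5 to 0.33 makes the classifier assign the positive class more readily, which trades some precision for recall.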

Now the pipeline component, which encompasses a number of transformations as well as the classification algorithm, is ready; to actually use it we supply a train dataset to fit the model and then a test dataset to evaluate our fitted model. Since we have defined a pipeline, we can be sure that both the train and test datasets will undergo the same transformations; therefore, we don’t have to replicate the process twice.
We now need to define the train and test datasets. In our dataset, label values are unbalanced, with the “more than 50k USD per year” value accounting for around 25% of the total. In order to preserve the same proportion between label values, we’ll subset the original dataset based on label value, obtaining a low-income dataset and a high-income dataset. We’ll split both datasets into train (70%) and test (30%), then we’ll merge back the two “train” and the two “test” datasets, and we’ll use the resulting “train” dataset as input for our pipeline:

In [None]:
// split between train and test
val df_low_income = df_work.filter("label == '<=50K'")
val df_high_income = df_work.filter("label == '>50K'")

val splits_LI = df_low_income.randomSplit(Array(0.7, 0.3), seed=123)
val splits_HI = df_high_income.randomSplit(Array(0.7, 0.3), seed=123)

val df_work_train = splits_LI(0).unionAll(splits_HI(0))
val df_work_test = splits_LI(1).unionAll(splits_HI(1))

// fitting the pipeline
val data_model = pipeline.fit(df_work_train)
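The idea of splitting each label group separately and then merging the pieces can be sketched in plain Python. This is an illustration under stated assumptions: `stratified_split` and the toy rows are hypothetical, and the sketch cuts each group at an exact percentage, whereas Spark's randomSplit yields only approximate proportions.

```python
import random
from collections import defaultdict


def stratified_split(rows, label_of, train_percent, seed):
    """Split each label group separately, then merge the pieces (illustrative)."""
    rng = random.Random(seed)
    groups = defaultdict(list)
    for row in rows:
        groups[label_of(row)].append(row)
    train, test = [], []
    for label in sorted(groups):
        members = groups[label][:]
        rng.shuffle(members)
        # Integer arithmetic keeps the cut point exact.
        cut = len(members) * train_percent // 100
        train.extend(members[:cut])
        test.extend(members[cut:])
    return train, test


# Hypothetical toy data: 300 low-income rows and 100 high-income rows.
rows = [("<=50K", i) for i in range(300)] + [(">50K", i) for i in range(100)]
train, test = stratified_split(rows, lambda r: r[0], 70, seed=123)
high_share_train = sum(1 for r in train if r[0] == ">50K") / len(train)
high_share_test = sum(1 for r in test if r[0] == ">50K") / len(test)
```

Both resulting sets keep the 25% share of high-income rows present in the toy input.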


Once the pipeline is trained, we can use data_model to generate predictions on the test dataset, calculate the confusion matrix and evaluate the classifier metrics:

In [None]:
// generate prediction
val data_prediction = data_model.transform(df_work_test)
val data_predicted = data_prediction.select("features", "prediction", "label","labelIdx")

// confusion matrix
val tp = data_predicted.filter("prediction == 1 AND labelIdx == 1").count().toFloat
val fp = data_predicted.filter("prediction == 1 AND labelIdx == 0").count().toFloat
val tn = data_predicted.filter("prediction == 0 AND labelIdx == 0").count().toFloat
val fn = data_predicted.filter("prediction == 0 AND labelIdx == 1").count().toFloat
val metrics = spark.createDataFrame(Seq(
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", tp / (tp + fp)),
 ("Recall", tp / (tp + fn)))).toDF("metric", "value")
metrics.show()
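The same precision and recall arithmetic can be checked on a handful of rows in plain Python. The sketch below is only an illustration: `precision_recall` is a hypothetical helper and the prediction/label pairs are made up for the example.

```python
def precision_recall(pairs):
    """Compute precision and recall from (prediction, label) pairs of 0/1 values."""
    tp = sum(1 for p, y in pairs if p == 1 and y == 1)
    fp = sum(1 for p, y in pairs if p == 1 and y == 0)
    fn = sum(1 for p, y in pairs if p == 0 and y == 1)
    # Guard against division by zero when a class is never predicted or present.
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall


# Hypothetical predictions: 3 true positives, 1 false positive,
# 2 false negatives and 4 true negatives.
pairs = [(1, 1)] * 3 + [(1, 0)] + [(0, 1)] * 2 + [(0, 0)] * 4
precision, recall = precision_recall(pairs)
```

Precision is the share of predicted positives that are correct (3 of 4 here), while recall is the share of actual positives that were found (3 of 5).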