Introduction to Machine Learning Pipeline Workshop
==================================================

We can start the IPython notebook by running the ./start.sh script, or by launching pyspark directly with the CSV package:

PYSPARK_DRIVER_PYTHON="ipython" PYSPARK_DRIVER_PYTHON_OPTS="notebook" ${SPARK_HOME}/bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0

or, for Spark versions before 2.0:

IPYTHON_OPTS="notebook" ${SPARK_HOME}/bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0

Once the notebook is up, we already have a SparkContext and a SQLContext ready to go:


In [1]:
sc

<pyspark.context.SparkContext at 0x7fcf931f7310>

In [2]:
sqlContext

<pyspark.sql.context.SQLContext at 0x7fcf90ccb350>

Now we load some data. It is in CSV format, so it's a good thing we already included the spark-csv package.

Note: the data is a modified version of https://archive.ics.uci.edu/ml/datasets/Adult

In [3]:
df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("resources/adult.data")

In [4]:
df.cache()

DataFrame[age: string, workclass: string, fnlwgt: string, education: string, education-num: string, maritial-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: string, capital-loss: string, hours-per-week: string, native-country: string, category: string]

In [5]:
df.head()

Row(age=u'39', workclass=u' State-gov', fnlwgt=u' 77516', education=u' Bachelors', education-num=u' 13', maritial-status=u' Never-married', occupation=u' Adm-clerical', relationship=u' Not-in-family', race=u' White', sex=u' Male', capital-gain=u' 2174', capital-loss=u' 0', hours-per-week=u' 40', native-country=u' United-States', category=u' <=50K')
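Notice that every value after the first starts with a space. This suggests the file separates fields with a comma followed by a space. A parser that splits only on the comma keeps that space as part of each value. Here is a small sketch of the same effect using Python's standard csv module rather than Spark, on a hypothetical excerpt shaped like the header and first row above:

```python
import csv
import io

# Hypothetical excerpt shaped like the file: header plus first row, four columns only.
sample = "age,workclass,fnlwgt,education\n39, State-gov, 77516, Bachelors\n"

# Default parsing keeps the space that follows each comma, just as Spark did above.
raw = next(csv.DictReader(io.StringIO(sample)))
print(repr(raw["workclass"]))    # ' State-gov'

# skipinitialspace=True drops whitespace immediately following each delimiter.
clean = next(csv.DictReader(io.StringIO(sample), skipinitialspace=True))
print(repr(clean["workclass"]))  # 'State-gov'
```

skipinitialspace is an option of Python's csv module. It is shown only for comparison.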

As we can see, Spark has simply loaded all of the values as strings, since we haven't specified a schema. We can instead ask it to infer the schema. The numeric columns are then parsed despite their leading spaces, although the string columns keep them:

In [6]:
df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("resources/adult.data")

In [7]:
df.head()

Row(age=39, workclass=u' State-gov', fnlwgt=77516.0, education=u' Bachelors', education-num=13.0, maritial-status=u' Never-married', occupation=u' Adm-clerical', relationship=u' Not-in-family', race=u' White', sex=u' Male', capital-gain=2174.0, capital-loss=0.0, hours-per-week=40.0, native-country=u' United-States', category=u' <=50K')

In [8]:
df.cache()

DataFrame[age: int, workclass: string, fnlwgt: double, education: string, education-num: double, maritial-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: double, capital-loss: double, hours-per-week: double, native-country: string, category: string]
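Notice that age, the only column whose raw values had no leading space, is inferred as int. The other numeric columns, such as fnlwgt, come back as double. One plausible explanation, which we have not checked against the spark-csv source, is that integer parsing rejects the leading space while double parsing tolerates it. Below is a simplified sketch of "narrowest type that fits every value" inference under that assumption. The helper infer_type is hypothetical, and spark-csv's real rules differ in detail:

```python
import re


def infer_type(values):
    """Return the narrowest of "int", "double", "string" that fits every value.

    Simplified illustration only, not spark-csv's actual inference code.
    """
    def is_int(v):
        # Assumption: integers must be bare digits (optional sign), no whitespace.
        return re.fullmatch(r"-?\d+", v) is not None

    def is_double(v):
        # Python's float() tolerates surrounding whitespace.
        try:
            float(v)
            return True
        except ValueError:
            return False

    if all(is_int(v) for v in values):
        return "int"
    if all(is_double(v) for v in values):
        return "double"
    return "string"


# Raw string values of three columns from the first row shown earlier.
print(infer_type(["39"]))          # int
print(infer_type([" 77516"]))      # double
print(infer_type([" State-gov"]))  # string
```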

Now we import a decision tree classifier, along with some feature transformers and the Pipeline class:

In [9]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.param import Param, Params
from pyspark.ml.feature import Bucketizer, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline


The first step is preparing the features. Here we simply choose two existing numeric columns, age and education-num, and combine them into a single feature vector:

In [10]:
assembler = VectorAssembler(inputCols=["age", "education-num"], outputCol="feautres")
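Per row, VectorAssembler concatenates the listed columns, in order, into one vector of doubles. A plain-Python sketch of that per-row step is below. The helper assemble is hypothetical. The real transformer also flattens vector-typed inputs and may choose a dense or sparse representation:

```python
def assemble(row, input_cols):
    """Concatenate the named numeric columns of one row into a list of floats."""
    return [float(row[c]) for c in input_cols]


# First row after schema inference (a subset of its columns).
first_row = {"age": 39, "education-num": 13.0, "workclass": " State-gov"}
print(assemble(first_row, ["age", "education-num"]))  # [39.0, 13.0]
```

This matches the `DenseVector([39.0, 13.0])` that appears once the pipeline has run.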

Our label, the category column, is a string, but the classifier expects a numeric label, so we need to turn it into a double. The StringIndexer will do this for us:

In [11]:
indexer = StringIndexer(inputCol="category", outputCol="category-index")

In [12]:
pipeline = Pipeline().setStages([assembler, indexer])

We then need to "fit" our pipeline. Fitting lets the StringIndexer decide which string is assigned which index in the eventual transformation:

In [13]:
model = pipeline.fit(df)

We then transform our data into the prepared format that our machine learning model will work on:

In [14]:
prepared = model.transform(df)

In [15]:
prepared.head()

Row(age=39, workclass=u' State-gov', fnlwgt=77516.0, education=u' Bachelors', education-num=13.0, maritial-status=u' Never-married', occupation=u' Adm-clerical', relationship=u' Not-in-family', race=u' White', sex=u' Male', capital-gain=2174.0, capital-loss=0.0, hours-per-week=40.0, native-country=u' United-States', category=u' <=50K', feautres=DenseVector([39.0, 13.0]), category-index=0.0)
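In the prepared row above, ' <=50K' became 0.0. By default, StringIndexer orders labels by frequency, so the most frequent label gets index 0. Below is a plain-Python sketch of that fitting step, using a hypothetical helper fit_string_indexer and toy data. Breaking ties alphabetically is an assumption of this sketch:

```python
from collections import Counter


def fit_string_indexer(values):
    """Assign indexes to distinct strings, most frequent first (index 0.0)."""
    counts = Counter(values)
    # Sort by descending count; ties broken alphabetically (an assumption here).
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical toy label column in which ' <=50K' is the more frequent class.
mapping = fit_string_indexer([" <=50K", " >50K", " <=50K", " <=50K"])
print(mapping)  # {' <=50K': 0.0, ' >50K': 1.0}
```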

In [16]:
dt = DecisionTreeClassifier(labelCol="category-index", featuresCol="feautres")

And now we fit the classifier on the prepared data:

In [17]:
dt_model = dt.fit(prepared)

In [18]:
dt_model

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4caaa33595b6d98d0500) of depth 5 with 61 nodes
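The tree above has depth 5, which matches DecisionTreeClassifier's default maxDepth of 5; its default impurity measure is Gini. The toy sketch below shows how a single split is scored. It tries every threshold on one feature and keeps the one with the lowest weighted Gini impurity. The helper names and data are hypothetical. Spark's implementation differs: it considers only candidate thresholds from binned feature values (controlled by maxBins) and searches all features in a distributed way:

```python
from collections import Counter


def gini(labels):
    """Gini impurity: 1 minus the sum of squared class proportions."""
    n = len(labels)
    return 1.0 - sum((c / n) ** 2 for c in Counter(labels).values())


def best_threshold(xs, ys):
    """Return (t, score) minimising weighted Gini of the split x <= t vs x > t."""
    best = None
    n = len(xs)
    for t in sorted(set(xs))[:-1]:  # the largest value would leave the right side empty
        left = [y for x, y in zip(xs, ys) if x <= t]
        right = [y for x, y in zip(xs, ys) if x > t]
        score = (len(left) * gini(left) + len(right) * gini(right)) / n
        if best is None or score < best[1]:
            best = (t, score)
    return best


# Hypothetical toy data: education-num values and their category-index labels.
edu = [9.0, 10.0, 13.0, 13.0, 14.0, 16.0]
label = [0.0, 0.0, 0.0, 1.0, 1.0, 1.0]
print(best_threshold(edu, label))  # (10.0, 0.25)
```

A real tree applies this search recursively to each side of the chosen split until it reaches maxDepth or the split stops helping.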

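Alternatively, we could make the classifier the last stage of the pipeline, so that a single fit call both prepares the data and trains the model: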

In [19]:
pipeline_and_model = Pipeline().setStages([assembler, indexer, dt])
pipeline_model = pipeline_and_model.fit(df)
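Conceptually, Pipeline.fit walks the stages in order. A Transformer such as VectorAssembler is used as-is. An Estimator such as StringIndexer or DecisionTreeClassifier is first fit on the data produced by the earlier stages. The result is a PipelineModel made only of transformers. The sketch below models this with toy stand-in classes; Assemble, Indexer, IndexerModel and both helper functions are hypothetical, not Spark's API. Unlike this sketch, Spark skips transforming the data after the last estimator, since nothing downstream needs it:

```python
class Transformer:
    """Anything with transform(rows) -> rows."""
    def transform(self, rows):
        raise NotImplementedError


class Estimator:
    """Anything with fit(rows) -> Transformer."""
    def fit(self, rows):
        raise NotImplementedError


class Assemble(Transformer):
    """Stand-in for VectorAssembler: needs no fitting."""
    def __init__(self, cols, out):
        self.cols, self.out = cols, out

    def transform(self, rows):
        return [{**r, self.out: [float(r[c]) for c in self.cols]} for r in rows]


class IndexerModel(Transformer):
    """Fitted stand-in for a StringIndexer model: applies a learned mapping."""
    def __init__(self, col, out, mapping):
        self.col, self.out, self.mapping = col, out, mapping

    def transform(self, rows):
        return [{**r, self.out: self.mapping[r[self.col]]} for r in rows]


class Indexer(Estimator):
    """Stand-in for StringIndexer: must see the data before it can transform."""
    def __init__(self, col, out):
        self.col, self.out = col, out

    def fit(self, rows):
        counts = {}
        for r in rows:
            counts[r[self.col]] = counts.get(r[self.col], 0) + 1
        ordered = sorted(counts, key=lambda s: (-counts[s], s))
        mapping = {s: float(i) for i, s in enumerate(ordered)}
        return IndexerModel(self.col, self.out, mapping)


def fit_pipeline(stages, rows):
    """Fit estimators in order, feeding each stage the output of the earlier ones."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if isinstance(stage, Estimator) else stage
        rows = model.transform(rows)  # later stages see the new columns
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order, like PipelineModel.transform."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


# Hypothetical toy rows.
data = [
    {"age": 39, "education-num": 13.0, "category": " <=50K"},
    {"age": 50, "education-num": 9.0, "category": " >50K"},
    {"age": 28, "education-num": 13.0, "category": " <=50K"},
]
fitted = fit_pipeline([Assemble(["age", "education-num"], "features"),
                       Indexer("category", "category-index")], data)
print([type(m).__name__ for m in fitted])  # ['Assemble', 'IndexerModel']
print(transform_pipeline(fitted, data)[1])
```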

To compare, here are the first 20 predictions of the separately trained model next to the indexed labels:

In [20]:
dt_model.transform(prepared).select("prediction", "category-index").take(20)

[Row(prediction=1.0, category-index=0.0),
 Row(prediction=1.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=1.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=1.0, category-index=1.0),
 Row(prediction=1.0, category-index=1.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=1.0, category-index=1.0)]

Applying the fitted pipeline directly to the raw df gives the same predictions:

In [21]:
pipeline_model.transform(df).select("prediction", "category-index").take(20)

[Row(prediction=1.0, category-index=0.0),
 Row(prediction=1.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=1.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=1.0, category-index=1.0),
 Row(prediction=1.0, category-index=1.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=1.0, category-index=1.0)]

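What if we wanted the predictions as the original label names rather than as indexes? The fitted StringIndexer (stage 1 of the pipeline model) keeps the list of labels it learned: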

In [24]:
labels = list(pipeline_model.stages[1].labels)

In [25]:
from pyspark.ml.feature import IndexToString
inverter = IndexToString(inputCol="prediction", outputCol="prediction-label", labels=labels)
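IndexToString simply looks each index up by position in the labels list. A sketch with a hypothetical helper index_to_string is below. The label order is the one implied by this section's outputs: index 0.0 is ' <=50K' and index 1.0 is ' >50K':

```python
def index_to_string(predictions, labels):
    """Map numeric class indexes back to label strings by list position."""
    return [labels[int(p)] for p in predictions]


# Label order as learned by the StringIndexer: index 0 is the most frequent class.
labels = [" <=50K", " >50K"]
print(index_to_string([1.0, 0.0, 0.0], labels))  # [' >50K', ' <=50K', ' <=50K']
```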

In [26]:
inverter.transform(pipeline_model.transform(df)).select("prediction-label", "category").take(20)

[Row(prediction-label=u' >50K', category=u' <=50K'),
 Row(prediction-label=u' >50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' >50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' >50K'),
 Row(prediction-label=u' >50K', category=u' >50K'),
 Row(prediction-label=u' >50K', category=u' >50K'),
 Row(prediction-label=u' <=50K', category=u' >50K'),
 Row(prediction-label=u' <=50K', category=u' >50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' >50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', catego

The trained tree is the last stage of the fitted pipeline model. It has the same uid as the dt_model we trained separately, since both came from the same DecisionTreeClassifier instance:

In [27]:
pipeline_model.stages[2]

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4caaa33595b6d98d0500) of depth 5 with 61 nodes

We can also run ordinary aggregations with the DataFrame API, for example per-age statistics:

In [36]:
# Star import for brevity; note that it shadows Python built-ins such as min, max and sum.
from pyspark.sql.functions import *
df.groupBy("age").agg(min("hours-per-week"), avg("hours-per-week"), max("capital-gain"))

DataFrame[age: int, min(hours-per-week): double, avg(hours-per-week): double, max(capital-gain): double]
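The cell above prints only the schema of the result. DataFrame operations are lazy, and nothing is computed until an action such as show() or collect() is called. As a plain-Python picture of what the aggregation computes for each age group, here is a sketch with a hypothetical helper group_stats and toy rows. Run it in a plain Python session: in the notebook, the star import above replaces the built-in min and max that this sketch relies on.

```python
from collections import defaultdict


def group_stats(rows):
    """Per age: (min hours-per-week, mean hours-per-week, max capital-gain)."""
    groups = defaultdict(list)
    for r in rows:
        groups[r["age"]].append(r)
    result = {}
    for age, rs in groups.items():
        hours = [r["hours-per-week"] for r in rs]
        result[age] = (min(hours), sum(hours) / len(hours),
                       max(r["capital-gain"] for r in rs))
    return result


# Hypothetical toy rows.
rows = [
    {"age": 39, "hours-per-week": 40.0, "capital-gain": 2174.0},
    {"age": 39, "hours-per-week": 50.0, "capital-gain": 0.0},
    {"age": 52, "hours-per-week": 45.0, "capital-gain": 99999.0},
]
print(group_stats(rows))  # {39: (40.0, 45.0, 2174.0), 52: (45.0, 45.0, 99999.0)}
```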

Window functions compute an aggregate over a frame of neighbouring rows, rather than collapsing each group to a single row:

In [74]:
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("age").orderBy("capital-gain").rowsBetween(-100, 100)
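rowsBetween(-100, 100) defines the frame by row position. For each row, the average covers up to 100 rows before it and 100 rows after it within the same age partition, ordered by capital-gain. Each row's frame is centered on its own position, so two rows with identical age and capital-gain can still get different averages. Rows near either end of a partition get smaller frames. A plain-Python sketch is below, with a hypothetical helper, toy rows, and a frame of one row on each side to keep the numbers readable:

```python
def windowed_avg(rows, partition_key, order_key, value_key, before, after):
    """For each row, average value_key over the rows at positions
    [i - before, i + after] of its partition, sorted by order_key."""
    partitions = {}
    for r in rows:
        partitions.setdefault(r[partition_key], []).append(r)
    out = []
    for part in partitions.values():
        part = sorted(part, key=lambda r: r[order_key])
        for i, r in enumerate(part):
            # Slicing clips the frame at the partition edges.
            frame = part[max(0, i - before): i + after + 1]
            avg = sum(f[value_key] for f in frame) / len(frame)
            out.append((r[partition_key], r[value_key], avg))
    return out


# Hypothetical toy rows: four in the age-52 partition, one in the age-28 partition.
rows = [{"age": 52, "capital-gain": g} for g in (0.0, 10.0, 20.0, 30.0)]
rows.append({"age": 28, "capital-gain": 5.0})
print(windowed_avg(rows, "age", "capital-gain", "capital-gain", 1, 1))
# [(52, 0.0, 5.0), (52, 10.0, 10.0), (52, 20.0, 20.0), (52, 30.0, 25.0), (28, 5.0, 5.0)]
```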

In [76]:
df.select(df["age"], df['capital-gain'], avg("capital-gain").over(windowSpec)).orderBy(desc("capital-gain")).show()

+---+------------+----------------------------------------------------------------------------------------------------------------+
|age|capital-gain|avg(capital-gain) OVER (PARTITION BY age ORDER BY capital-gain ASC ROWS BETWEEN 100 PRECEDING AND 100 FOLLOWING)|
+---+------------+----------------------------------------------------------------------------------------------------------------+
| 52|     99999.0|                                                                                              11563.377358490567|
| 52|     99999.0|                                                                                              11900.174757281553|
| 28|     99999.0|                                                                                               4736.762376237623|
| 44|     99999.0|                                                                                                9865.47572815534|
| 52|     99999.0|                                                          