# Notebook summary

This notebook contains three sections:

1. **Using decision trees in Spark MLlib:** MLlib is the older Spark machine learning library (Spark 1.x versions). It is based on the RDD data structure and, according to Databricks, it is expected to be discontinued as of Spark 3.0. This part of the notebook is for reference only, because RDDs are still in use at the moment.
2. **Using decision trees in SPARK ML:** ML is the newer Spark machine learning library and is the standard from Spark 2.x onwards. It is based on the DataFrame data structure. Spark DataFrames are very similar to R data frames or Python Pandas DataFrames. You can jump to this second section if you are in a hurry.
3. **Using a pipeline in SPARK ML:** A pipeline is a method that chains two or more stages. Each stage can be a preprocessing step (e.g. feature selection, PCA, ...) or a training algorithm (decision trees, ...).

## 1. Using decision trees in SPARK MLlib

For more information on the RDD-based MLlib library: <http://spark.apache.org/docs/latest/mllib-guide.html>

You can find how to use other algorithms (Random Forest, Gradient Boosting, etc.) [here](https://github.com/apache/spark/tree/master/examples/src/main/python/mllib)

In [3]:
import sys
import os
import os.path
SPARK_HOME = """C:/spark-2.1.0-bin-hadoop2.7""" #CHANGE THIS PATH TO YOURS!

sys.path.append(os.path.join(SPARK_HOME, "python", "lib", "py4j-0.10.4-src.zip")) #BEWARE WITH py4j version!!
sys.path.append(os.path.join(SPARK_HOME, "python", "lib", "pyspark.zip"))
os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark import SparkContext
sc = SparkContext(master="local[*]", appName="PythonDecisionTreeClassificationExample")

In [4]:
os.getcwd()

'C:\\Users\\aler.PC-164-148\\Google Drive\\TRABAJO LOCAL ALLWAYSSYNC\\DOCENCIA\\MASTER BIG DATA\\ACLASES 16-17\\misPruebasTerceraPractica'

In [5]:
%matplotlib inline
from pyspark.mllib.regression import LabeledPoint
import numpy as np
import matplotlib.pyplot as plt

**Let's load the Boston dataset and check its description. It contains data about housing prices as a function of the characteristics of each zone.**

In [6]:
from sklearn.datasets import load_boston
boston = load_boston()


In [7]:
#print(boston.DESCR)
print(boston.feature_names)

['CRIM' 'ZN' 'INDUS' 'CHAS' 'NOX' 'RM' 'AGE' 'DIS' 'RAD' 'TAX' 'PTRATIO'
 'B' 'LSTAT']


**X will contain the input attributes, and y the output attribute. We can visualize the shape of the dataset (number of instances x number of input attributes), and the shape of the target (output) attribute.** 

In [8]:
X = boston.data
y = boston.target
data = zip(y,X)
print X.shape, y.shape

(506L, 13L) (506L,)


**Here we can see the first two instances (input and output attributes) of the dataset. Notice that data is a list of tuples (output, inputs), because it was built with zip(y, X).**
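As a small illustration of how zip pairs each target value with its row of features, the sketch below uses two abbreviated rows. The names `tiny_y`, `tiny_X` and `tiny_data` are hypothetical stand-ins, not part of the notebook. Under Python 3, zip returns an iterator, so it is wrapped in list to keep slicing such as `data[0:2]` working.

```python
# Hypothetical toy data standing in for boston.target and boston.data.
tiny_y = [24.0, 21.6]
tiny_X = [[0.00632, 18.0, 2.31], [0.02731, 0.0, 7.07]]

# list(...) makes the result sliceable under both Python 2 and Python 3.
tiny_data = list(zip(tiny_y, tiny_X))

# Each element is a tuple (output, inputs).
for label, features in tiny_data:
    print("label={} features={}".format(label, features))
```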

In [9]:
import pprint
print "X="
pprint.pprint(X[0:2])
print "y={}".format(y[0:2])
print "data="
pprint.pprint(data[0:2])

X=
array([[  6.32000000e-03,   1.80000000e+01,   2.31000000e+00,
          0.00000000e+00,   5.38000000e-01,   6.57500000e+00,
          6.52000000e+01,   4.09000000e+00,   1.00000000e+00,
          2.96000000e+02,   1.53000000e+01,   3.96900000e+02,
          4.98000000e+00],
       [  2.73100000e-02,   0.00000000e+00,   7.07000000e+00,
          0.00000000e+00,   4.69000000e-01,   6.42100000e+00,
          7.89000000e+01,   4.96710000e+00,   2.00000000e+00,
          2.42000000e+02,   1.78000000e+01,   3.96900000e+02,
          9.14000000e+00]])
y=[ 24.   21.6]
data=
[(24.0,
  array([  6.32000000e-03,   1.80000000e+01,   2.31000000e+00,
         0.00000000e+00,   5.38000000e-01,   6.57500000e+00,
         6.52000000e+01,   4.09000000e+00,   1.00000000e+00,
         2.96000000e+02,   1.53000000e+01,   3.96900000e+02,
         4.98000000e+00])),
 (21.600000000000001,
  array([  2.73100000e-02,   0.00000000e+00,   7.07000000e+00,
         0.00000000e+00,   4.69000000e-01,   6.42100000e+

**Let's convert the dataset into a Spark RDD. IMPORTANT: X and y were located on a SINGLE computer (the driver). From now on, data_rdd is distributed among 4 partitions, which might be located on different computers.**
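As a rough mental model of what distributing a list over 4 partitions means, the plain-Python sketch below cuts a list into contiguous chunks of nearly equal size. It is only an illustration: `split_into_partitions` is a hypothetical helper, and the way Spark actually slices and serializes the data may differ.

```python
def split_into_partitions(rows, num_partitions):
    """Cut rows into num_partitions contiguous chunks of near-equal size."""
    n = len(rows)
    bounds = [(i * n) // num_partitions for i in range(num_partitions + 1)]
    return [rows[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]

rows = list(range(506))  # 506 stand-in rows, as many as the Boston dataset
partitions = split_into_partitions(rows, 4)
sizes = [len(p) for p in partitions]
print(sizes)
```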

In [10]:
data_rdd = sc.parallelize(data,4)
print data_rdd.getNumPartitions()

4


**Here, we transform an RDD made of tuples into an RDD made of LabeledPoint objects. A LabeledPoint represents one instance, made of two elements. The first one contains the class or value to be predicted. The second one contains the features / attributes.**

In [11]:
data_rdd = data_rdd.map(lambda x: LabeledPoint(x[0], x[1]))

In [12]:
pprint.pprint (data_rdd.take(3))

[LabeledPoint(24.0, [0.00632,18.0,2.31,0.0,0.538,6.575,65.2,4.09,1.0,296.0,15.3,396.9,4.98]),
 LabeledPoint(21.6, [0.02731,0.0,7.07,0.0,0.469,6.421,78.9,4.9671,2.0,242.0,17.8,396.9,9.14]),
 LabeledPoint(34.7, [0.02729,0.0,7.07,0.0,0.469,7.185,61.1,4.9671,2.0,242.0,17.8,392.83,4.03])]


In [13]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

**Split the RDDs into 70% training and 30% test folds.**
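Because randomSplit assigns rows at random, the 70/30 proportions are only approximate, and the split changes between runs unless a seed is passed (randomSplit accepts an optional seed argument). The plain-Python sketch below mimics the idea with one independent random draw per row. `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random

def random_split(rows, train_fraction, seed):
    """Send each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test

rows = list(range(506))
train, test = random_split(rows, 0.7, seed=42)
# Sizes are close to 354 / 152, but not necessarily exact.
print("train={} test={}".format(len(train), len(test)))
```

Using the same seed again reproduces the same split, which makes error figures comparable between runs.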

In [14]:
(trainingData_rdd, testData_rdd) = data_rdd.randomSplit([0.7, 0.3])

In [15]:
#help(DecisionTree.trainRegressor)

**Create the regression tree on the training RDD**. 
More information: <https://spark.apache.org/docs/latest/mllib-decision-tree.html#classification>
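To give an idea of what the training step does, the plain-Python sketch below finds the best single split of one feature for a regression tree: it tries each midpoint between consecutive feature values and keeps the threshold that minimises the squared error around the mean of each side. It is a simplified illustration that assumes a variance-based criterion. `best_split` and the toy data are hypothetical, and a real implementation (binning, many features, several levels of depth) is more involved.

```python
def mean(values):
    return sum(values) / float(len(values))

def squared_error(values):
    """Sum of squared deviations from the mean of values."""
    m = mean(values)
    return sum((v - m) ** 2 for v in values)

def best_split(xs, ys):
    """Return (threshold, left_prediction, right_prediction) for one feature."""
    candidates = sorted(set(xs))
    if len(candidates) < 2:
        raise ValueError("need at least two distinct feature values")
    best = None
    for lo, hi in zip(candidates, candidates[1:]):
        threshold = (lo + hi) / 2.0
        left = [y for x, y in zip(xs, ys) if x <= threshold]
        right = [y for x, y in zip(xs, ys) if x > threshold]
        cost = squared_error(left) + squared_error(right)
        if best is None or cost < best[0]:
            best = (cost, threshold, mean(left), mean(right))
    return best[1:]

# Toy data: small feature values go with low targets, large with high ones.
xs = [1.0, 2.0, 3.0, 10.0, 11.0, 12.0]
ys = [5.0, 5.0, 5.0, 20.0, 20.0, 20.0]
split = best_split(xs, ys)
print(split)
```

A full tree repeats this search recursively on each side until a limit such as maxDepth is reached.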

In [16]:
# categoricalFeaturesInfo={} because all features are continuous
model = DecisionTree.trainRegressor(trainingData_rdd, categoricalFeaturesInfo={}, maxDepth=5)

**Compute the MAE (mean absolute error) on the test RDD. IMPORTANT: zip is used to make tuples (label, prediction). This assumes that both RDDs (testData_rdd and predictions_rdd) are in the same order. According to the documentation, this is the case. In general, however, we cannot rely on two RDDs being partitioned and ordered in the same way across different machines.**
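The MAE itself is just the average of |label - prediction| over the pairs. A minimal plain-Python sketch, with hypothetical labels and predictions paired by position as zip does:

```python
def mean_absolute_error(labels_and_predictions):
    """Average of |label - prediction| over (label, prediction) pairs."""
    errors = [abs(pair[0] - pair[1]) for pair in labels_and_predictions]
    return sum(errors) / float(len(errors))

# Hypothetical values; zip pairs them by position, so order matters.
labels = [24.0, 21.5, 34.5, 30.0]
predictions = [22.0, 23.0, 34.0, 30.0]
pairs = list(zip(labels, predictions))
mae = mean_absolute_error(pairs)
print(mae)
```

Note that a lambda with tuple unpacking in its parameter list, such as `lambda (v, p): ...`, is valid only in Python 2. An index-based form such as `lambda vp: abs(vp[0] - vp[1])` works in both Python 2 and Python 3.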

In [17]:
predictions_rdd = model.predict(testData_rdd.map(lambda lp: lp.features))
labelsAndPredictions = testData_rdd.map(lambda lp: lp.label).zip(predictions_rdd)
testErr = labelsAndPredictions.map(lambda (v, p): abs(v-p)).mean()
print('Test Error = ' + str(testErr))

Test Error = 3.04358899663


In [18]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
model = RandomForest.trainRegressor(trainingData_rdd, categoricalFeaturesInfo={}, numTrees=10, featureSubsetStrategy="auto", maxDepth=4, maxBins=32)

In [19]:
predictions_rdd = model.predict(testData_rdd.map(lambda lp: lp.features))
labelsAndPredictions = testData_rdd.map(lambda lp: lp.label).zip(predictions_rdd)
testErr = labelsAndPredictions.map(lambda (v, p): abs(v-p)).mean()
print('Test Error = ' + str(testErr))

Test Error = 2.43709290525


In [20]:
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
model = GradientBoostedTrees.trainRegressor(trainingData_rdd, categoricalFeaturesInfo={}, numIterations=20)

In [21]:
predictions_rdd = model.predict(testData_rdd.map(lambda lp: lp.features))
labelsAndPredictions = testData_rdd.map(lambda lp: lp.label).zip(predictions_rdd)
testErr = labelsAndPredictions.map(lambda (v, p): abs(v-p)).mean()
print('Test Error = ' + str(testErr))

Test Error = 2.55481886843


In [22]:
sc.stop()

## 2. Using decision trees in SPARK ML

ML is the DataFrame-based Spark library, and the more recent of the two. For more information: <http://spark.apache.org/docs/latest/mllib-guide.html>

**The next cell starts Spark.** One difference between the RDD-based Spark MLlib (old version 1.x) and the DataFrame-based Spark ML (Spark version 2.x) is the entry point: _SparkSession_ is now used to start Spark. A SparkContext for the old RDD-based MLlib can still be accessed via spark.sparkContext.

In [23]:
import sys
import os
import os.path

SPARK_HOME = """C:/spark-2.1.0-bin-hadoop2.7""" #CHANGE THIS PATH TO YOURS!

sys.path.append(os.path.join(SPARK_HOME, "python", "lib", "py4j-0.10.4-src.zip")) #BEWARE WITH py4j version!!
sys.path.append(os.path.join(SPARK_HOME, "python", "lib", "pyspark.zip"))
os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
    
sc = spark.sparkContext

**Let's load the Boston dataset and check its description. It contains data about housing prices as a function of the characteristics of each zone.**

In [24]:
from sklearn.datasets import load_boston
boston = load_boston()

**X will contain the input attributes, and y the output attribute. We can visualize the shape of the dataset (number of instances x number of input attributes), and the shape of the target (output) attribute.** 

In [25]:
X = boston.data
y = boston.target
print X.shape, y.shape

(506L, 13L) (506L,)


**Names of attributes / features. _label_ is the output attribute.**

In [26]:
names=map(str,boston.feature_names)
names = ['label']+names

**Now, a Pandas DataFrame is created. This is not particularly important. It is done just as a preparation step to save the Boston dataset into a csv file because, commonly, we start by reading data from a file.**

Note: it is important to understand that a Pandas DataFrame is not a Spark DataFrame. The former resides on a single machine. The latter is split into several partitions / machines for distributed processing.

In [27]:
import pandas as pd
data_pd = pd.DataFrame(X)
data_pd.insert(0, 'label', y)
data_pd.columns = names
data_pd.to_csv("boston.csv", index=False, header=True)

**The next cell reads the data (in csv format) into Spark.** _data_sd_ is truly a Spark DataFrame.

In [28]:
data_sd = spark.read.csv("boston.csv",header=True,inferSchema=True)

**_show_ is a Spark command to visualize the first rows of the Spark dataframe.**

In [29]:
data_sd.show()

+-----+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+
|label|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|
+-----+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+
| 24.0|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98|
| 21.6|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14|
| 34.7|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03|
| 33.4|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94|
| 36.2|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33|
| 28.7|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0|   18.7|394.12| 5.21|
| 22.9|0.08829|12.5| 7.87| 0.0|0.524|6.012| 66.6|5.5605|5.0|311.0|   15.2| 395.6|12.43|
| 27.1|0.14455|12.5| 7.87| 0.0|0.524|6.172| 96.1|5.9505|5.0|311.0|   15.2| 396.9|19.15|
| 16.5|0.21124|12.5| 7.87| 0.0|0

**Next cell displays examples of Spark transformations:**

- _select()_ to select some columns and create other columns
- _filter()_ to filter some rows according to a condition

and of Spark actions:

- _show()_
- _count()_
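The distinction matters because transformations are lazy: they only describe a computation, and nothing runs until an action asks for a result. Python generators behave in a similar way, so the sketch below uses them as a stand-in. `lazy_filter`, `lazy_select` and `count` are hypothetical helpers, not Spark functions.

```python
evaluated = []  # records which rows were actually processed

def lazy_filter(rows, predicate):
    """Transformation stand-in: yields matching rows only when consumed."""
    for row in rows:
        evaluated.append(row)
        if predicate(row):
            yield row

def lazy_select(rows, func):
    """Transformation stand-in: maps rows lazily."""
    for row in rows:
        yield func(row)

def count(rows):
    """Action stand-in: consumes the whole chain and returns a number."""
    return sum(1 for _ in rows)

labels = [24.0, 21.6, 50.0, 34.7, 50.0]
plan = lazy_select(lazy_filter(labels, lambda v: v > 49), lambda v: v + 1)

processed_before = len(evaluated)  # no row has been processed yet
result = count(plan)               # the action triggers the whole chain
processed_after = len(evaluated)
print(processed_before, result, processed_after)
```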

**_repartition(4)_ is a command to distribute the dataframe into 4 partitions which, in principle, could reside on 4 different computers.**

In [30]:
print(type(data_sd))
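# printSchema prints the dataframe's column names and their types. dtypes
# contains the types of the columns. Note: printSchema() and show() print
# directly and return None, so wrapping them in print() also prints None.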
print(data_sd.printSchema())
print(data_sd.dtypes)

# select selects columns. filter selects rows.
print(data_sd.select('label').show())
print(data_sd.select(data_sd['label'], data_sd['label']+1).show())
print(data_sd.filter(data_sd['label']>49).show())
print(data_sd.filter(data_sd['label']>49).count())

# repartition distributes the dataframe into x partitions.
print("Number of partition before repartition: {}".format(data_sd.rdd.getNumPartitions()))
data_sd=data_sd.repartition(4)
print("Number of partition after repartition: {}".format(data_sd.rdd.getNumPartitions()))

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- label: double (nullable = true)
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)

None
[('label', 'double'), ('CRIM', 'double'), ('ZN', 'double'), ('INDUS', 'double'), ('CHAS', 'double'), ('NOX', 'double'), ('RM', 'double'), ('AGE', 'double'), ('DIS', 'double'), ('RAD', 'double'), ('TAX', 'double'), ('PTRATIO', 'double'), ('B', 'double'), ('LSTAT', 'double')]
+-----+
|label|
+-----+
| 24.0|
| 21.6|
| 34.7|
| 33.4|
| 36.2|
| 28.7|
| 22.9|
| 27.1|
| 16.5|
| 18.9|
| 15.0|
| 18.9|
| 21.7|
| 20.4|
| 18.2|
| 19.9|
| 23.1|
| 17.5|
| 20.2

**The following cells prepare the dataframe for ML use**

The algorithms in the Spark ML library expect a dataframe with two columns: one (named _features_ by default) that holds, for each row, a vector with the input attributes, and another (named _label_ by default) that holds the output attribute. To build such a dataframe, _VectorAssembler_ is used to combine all the input attributes into a single column.
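Conceptually, assembling means gathering several numeric columns of each row into a single vector. The plain-Python sketch below imitates that on dictionaries. `assemble` is a hypothetical helper and the two rows are abbreviated to three input attributes.

```python
def assemble(row, input_cols):
    """Collect the values of input_cols, in order, into one feature list."""
    return [row[col] for col in input_cols]

rows = [
    {"label": 24.0, "CRIM": 0.00632, "ZN": 18.0, "INDUS": 2.31},
    {"label": 21.6, "CRIM": 0.02731, "ZN": 0.0, "INDUS": 7.07},
]
columns = ["label", "CRIM", "ZN", "INDUS"]
ignore = ["label"]
input_cols = [c for c in columns if c not in ignore]

# Each row becomes a pair (label, features), like the two-column dataframe.
assembled = [(row["label"], assemble(row, input_cols)) for row in rows]
for label, features in assembled:
    print("label={} features={}".format(label, features))
```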

In [31]:
from pyspark.ml.feature import VectorAssembler

ignore = ['label']

assembler = VectorAssembler(
    inputCols=[x for x in data_sd.columns if x not in ignore],
    outputCol='features')

data_sd = assembler.transform(data_sd).select(['label', 'features'])

This is what the first rows of the dataframe look like.

In [32]:
data_sd.show(truncate=False)

+-----+---------------------------------------------------------------------------+
|label|features                                                                   |
+-----+---------------------------------------------------------------------------+
|21.6 |[0.02731,0.0,7.07,0.0,0.469,6.421,78.9,4.9671,2.0,242.0,17.8,396.9,9.14]   |
|28.7 |[0.02985,0.0,2.18,0.0,0.458,6.43,58.7,6.0622,3.0,222.0,18.7,394.12,5.21]   |
|18.9 |[0.17004,12.5,7.87,0.0,0.524,6.004,85.9,6.5921,5.0,311.0,15.2,386.71,17.1] |
|20.4 |[0.62976,0.0,8.14,0.0,0.538,5.949,61.8,4.7075,4.0,307.0,21.0,396.9,8.26]   |
|17.5 |[0.7842,0.0,8.14,0.0,0.538,5.99,81.7,4.2579,4.0,307.0,21.0,386.75,14.67]   |
|19.6 |[0.85204,0.0,8.14,0.0,0.538,5.965,89.2,4.0123,4.0,307.0,21.0,392.53,13.83] |
|13.9 |[0.84054,0.0,8.14,0.0,0.538,5.599,85.7,4.4546,4.0,307.0,21.0,303.42,16.51] |
|21.0 |[1.00245,0.0,8.14,0.0,0.538,6.674,87.3,4.239,4.0,307.0,21.0,380.23,11.98]  |
|13.1 |[1.15172,0.0,8.14,0.0,0.538,5.701,95.0,3.7872,4.0,307.0,21.0,358.77,1

**Next cell imports Spark decision trees for regression, and methods for evaluating regression models.**

In [33]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

**We are going to use the train/test strategy for evaluation.** 70% are going to be used for training, 30% for evaluation.  

In [34]:
(trainingData_sd, testData_sd) = data_sd.randomSplit([0.7, 0.3])

**A decision tree with default parameters is trained.**

In [35]:
dt = DecisionTreeRegressor()
model=dt.fit(trainingData_sd)

**And predictions are computed on the test set.** The model is used to _transform_ the test dataset into a set of predictions.

In [36]:
predictions_sd = model.transform(testData_sd)
predictions_sd.show()

+-----+--------------------+------------------+
|label|            features|        prediction|
+-----+--------------------+------------------+
|  7.2|[16.8118,0.0,18.1...|11.108571428571429|
|  8.5|[7.67202,0.0,18.1...|11.108571428571429|
| 11.5|[8.15174,0.0,18.1...|11.108571428571429|
| 13.1|[1.15172,0.0,8.14...|15.053846153846154|
| 13.3|[9.82349,0.0,18.1...|11.108571428571429|
| 17.1|[9.72418,0.0,18.1...|11.108571428571429|
| 17.7|[3.69311,0.0,18.1...|20.594166666666663|
| 18.3|[0.26838,0.0,9.69...|20.594166666666663|
| 18.7|[0.22212,0.0,10.0...| 20.47142857142857|
| 18.9|[0.17004,12.5,7.8...| 20.47142857142857|
| 19.4|[0.21977,0.0,6.91...|             17.34|
| 20.0|[0.43571,0.0,10.5...| 20.47142857142857|
| 20.3|[0.07165,0.0,25.6...|21.400000000000006|
| 20.3|[0.14103,0.0,13.9...| 20.47142857142857|
| 21.6|[0.02731,0.0,7.07...|20.594166666666663|
| 21.6|[0.26938,0.0,9.9,...|20.594166666666663|
| 21.7|[3.8497,0.0,18.1,...|20.594166666666663|
| 22.0|[0.03537,34.0,6.0...|20.594166666

**Finally, MAE is computed on the _label_ and _prediction_ columns.**

In [37]:
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions_sd)
print(mae)

3.22542895135


In [38]:
maxk = len(trainingData_sd.take(1)[0].features)
print(maxk)

13


## 3. Using a pipeline in SPARK ML

In [39]:
from pyspark.ml.feature import PCA
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml import Pipeline

In [40]:
# Pipeline 1 = PCA with 3 components => decision tree
pca1 = PCA(k=3, inputCol="features")
dt1 = DecisionTreeRegressor(featuresCol=pca1.getOutputCol(), 
                           labelCol="label")

pipeline1 = Pipeline(stages=[pca1, dt1])

# Pipeline 2 = PCA with 5 components => decision tree

pca2 = PCA(k=5, inputCol="features")
dt2 = DecisionTreeRegressor(featuresCol=pca2.getOutputCol(), 
                           labelCol="label")
pipeline2 = Pipeline(stages=[pca2, dt2])
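The idea behind chaining stages can be sketched in plain Python: fitting a pipeline fits each stage in order and passes the transformed data on to the next stage, and transforming applies the fitted stages in the same order. The classes below (`Scaler`, `Shift`, `ToyPipeline`) are hypothetical toys, not Spark classes. As a simplification, `fit` here stores the learned values on the stage itself, whereas Spark's fit returns a separate fitted model.

```python
class Scaler(object):
    """Toy preprocessing stage: learns the maximum, then divides by it."""
    def fit(self, values):
        self.max_value = float(max(values))
        return self

    def transform(self, values):
        return [v / self.max_value for v in values]

class Shift(object):
    """Toy second stage: learns the mean, then subtracts it."""
    def fit(self, values):
        self.mean = sum(values) / float(len(values))
        return self

    def transform(self, values):
        return [v - self.mean for v in values]

class ToyPipeline(object):
    """Fits stages in order, feeding each one the output of the previous."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, values):
        for stage in self.stages:
            values = stage.fit(values).transform(values)
        return self

    def transform(self, values):
        for stage in self.stages:
            values = stage.transform(values)
        return values

toy = ToyPipeline([Scaler(), Shift()]).fit([4.0, 8.0])
transformed = toy.transform([8.0, 4.0])
print(transformed)
```

The second stage never sees the raw values: its mean is learned from the output of the first stage, which is the same reason the decision trees above take the PCA output column as their features.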

In [41]:
model1 = pipeline1.fit(trainingData_sd)
model2 = pipeline2.fit(trainingData_sd)

In [42]:
predictions1 = model1.transform(testData_sd)
predictions2 = model2.transform(testData_sd)

In [43]:
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="mae")
mae1 = evaluator.evaluate(predictions1)
mae2 = evaluator.evaluate(predictions2)
print("MAE. 3 components vs. 5 components: {} vs. {}".format(mae1,mae2))


MAE. 3 components vs. 5 components: 6.38704342096 vs. 5.68334521756


In [44]:
spark.stop()