### Read Data into a DataFrame using HiveContext
Because our data will often come from relational data structures that we can view in [Hive](https://hive.apache.org/) or [Impala](http://impala.io/index.html), we will start our analysis the same way. We can do this using one of the two contexts that are exposed when we start pySpark. The first, ***sc***, is a [pyspark.SparkContext](https://spark.apache.org/docs/1.3.0/api/python/pyspark.html?highlight=sparkcontext#pyspark.SparkContext), which is the main entry point into Spark functionality and represents the connection to our Spark cluster. The second, ***sqlCtx***, is a [pyspark.HiveContext](https://spark.apache.org/docs/1.3.0/api/python/pyspark.sql.html?highlight=hivecontext#pyspark.sql.HiveContext), which will be our main entry point to Spark SQL functionality. It is Hive enabled, so we can write and evaluate queries against tables that are already loaded into Hive.

In [None]:
dat = sqlCtx.sql('SELECT value, acct_bal, age FROM german_parquet limit 10')
dat.collect()

The German credit data was loaded during the `vagrant up` process. It is a popular data set for binary classification with categorical data. Since the original data set was in German, we provide comparable English column names, identify whether each field is continuous or categorical, and give the number of categories if categorical. If you are interested, you can check out what each of the categories means [here](http://www.statistik.lmu.de/service/datenarchiv/kredit/kreditvar_e.html).

| Field        | Description                       | Cont. | Cat. | NumCat |
| ------------ | --------------------------------- |:-----:|:----:|:------:|
| cred         | Creditability (Label)             | n/a   | n/a  | n/a    |
| acct_bal     | Balance of current account        |       | X    | 4      |
| dur_cred     | Duration of Credit (months)       | X     |      |        |
| pay_stat     | Payment Status of Previous Credit | X     |      |        |
| purpose      | Purpose                           | X     |      |        |
| cred_amt     | Credit Amount                     | X     |      |        |
| value        | Value Savings/Stocks              |       | X    | 5      |
| len_emp      | Length of current employment      |       | X    | 5      |
| install_pc   | Instalment per cent               |       | X    | 4      |
| sex_married  | Sex & Marital Status              |       | X    | 4      |
| guarantors   | Guarantors                        |       | X    | 3      |
| dur_addr     | Duration in Current address       |       | X    | 4      |
| max_val      | Most valuable available asset     |       | X    | 4      |
| age          | Age (years)                       | X     |      |        |
| concurr      | Concurrent Credits                |       | X    | 3      |
| typ_aprtmnt  | Type of apartment                 |       | X    | 3      |
| no_creds     | No of Credits at this Bank        |       | X    | 4      |
| occupation   | Occupation                        |       | X    | 4      |
| no_dep       | No of dependents                  |       | X    | 2      |
| telephone    | Telephone                         |       | X    | 2      |
| foreign_wkr  | Foreign Worker                    |       | X    | 2      |


Since we will be working with all the data in the german table, we will reassign the [DataFrame](https://spark.apache.org/docs/1.3.0/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrame) variable, dat, to include the entire data set. We can also take a look at the metadata and see that it matches the table above. It's really convenient to be able to reuse metadata already defined in the Hive metastore.

In [None]:
dat = sqlCtx.sql('SELECT * FROM german_parquet')
dat.printSchema()

### Data cleaning & formatting
   Our data source was fortunately well documented with fully populated records (this will not always be the case). However, there are two modifications that we will consistently need to make to data to prepare it for MLlib functions:
#### Modify field data and type
More specifically, we make sure that the fields are converted into the form expected by Spark MLlib. In our example we will be running a classification decision tree where many of the fields are categorical. mllib.tree expects the category index of each categorical field to start at zero. Further, we need to be sure that each category consistently maps to the same number so that the model is applied appropriately. Luckily, this category-to-number mapping was already done for us, with a small exception: in some fields the categories increment starting from 1, whereas mllib.tree expects them to increment from 0.
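
A minimal sketch of this re-indexing step, assuming the category codes are consecutive integers starting at 1. The helper name `to_zero_based` is hypothetical and is not part of MLlib:

```python
def to_zero_based(code, num_categories):
    """Shift a 1-based category code to a 0-based index.

    Hypothetical helper for illustration only; it raises if the
    incoming code falls outside the documented 1..num_categories range.
    """
    if not 1 <= code <= num_categories:
        raise ValueError('code %r outside 1..%d' % (code, num_categories))
    return code - 1

# acct_bal has 4 categories coded 1..4, so they become 0..3
shifted = [to_zero_based(c, 4) for c in [1, 2, 3, 4]]
print(shifted)
```

Checking the range while shifting is a design choice: a code that falls outside the documented categories would otherwise silently become an invalid index.
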
#### Modify row type
The row type exists to put data into an object that is serializable and performs well. Additionally, the row type inherently identifies which column is considered the response or 'label' for the record, which is necessary information when working with supervised learning algorithms. The row form is called a [LabeledPoint](https://spark.apache.org/docs/1.3.0/api/python/pyspark.mllib.html?highlight=labeledpoint#pyspark.mllib.regression.LabeledPoint). The RDD expected by MLlib is an RDD of LabeledPoints.

#### A transform function for LabeledPoints
   Since we know that we will frequently want our data to be used in analysis, we can create a function which maps our data into the appropriate form. Fortunately, both modifications above can be done in a simple map of rows from one form to another. This might not always be the case, and there could be instances where aggregation is done to data in the form of a reduction. We will not explore that here.


In [None]:
from pyspark.mllib.regression import LabeledPoint
def german_lp(x):
    vals=x.asDict()
    label=vals['cred']
    feats=[vals['acct_bal']-1,
           vals['dur_cred'],
           vals['pay_stat'],
           vals['purpose'],
           vals['cred_amt'],
           vals['value']-1,
           vals['len_emp']-1,
           vals['install_pc']-1,
           vals['sex_married']-1,
           vals['guarantors']-1,
           vals['dur_addr']-1,
           vals['max_val']-1,
           vals['age'],
           vals['concurr']-1,
           vals['typ_aprtmnt']-1,
           vals['no_creds']-1,
           vals['occupation']-1,
           vals['no_dep']-1,
           vals['telephone']-1,
           vals['foreign_wkr']-1]
    return LabeledPoint(label, feats)

Managed separately as a [dict](https://docs.python.org/2/library/stdtypes.html#mapping-types-dict), we need to identify which columns are categorical and which are continuous. The dict maps the index of each categorical feature to the number of categories in that feature. Continuous features are identified by their omission.

In [None]:
german_cfi = {0:4,5:5,6:5,7:4,8:4,9:3,10:4,11:4,13:3,14:3,15:4,16:4,17:2,18:2,19:2}
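
Typing the index-to-count dict by hand is easy to get wrong when the feature order changes. As a sketch, the same mapping can be derived from an ordered list of (name, number of categories) pairs. The names `feature_spec` and `build_cfi` are hypothetical; the category counts are copied from the field table above, and the order is assumed to match the feature order in german_lp:

```python
# (feature name, number of categories); None marks a continuous feature.
# The order is assumed to match the feature list built in german_lp.
feature_spec = [
    ('acct_bal', 4), ('dur_cred', None), ('pay_stat', None),
    ('purpose', None), ('cred_amt', None), ('value', 5),
    ('len_emp', 5), ('install_pc', 4), ('sex_married', 4),
    ('guarantors', 3), ('dur_addr', 4), ('max_val', 4),
    ('age', None), ('concurr', 3), ('typ_aprtmnt', 3),
    ('no_creds', 4), ('occupation', 4), ('no_dep', 2),
    ('telephone', 2), ('foreign_wkr', 2),
]

def build_cfi(spec):
    """Map feature index -> number of categories, skipping continuous features."""
    return dict((i, n) for i, (_, n) in enumerate(spec) if n is not None)

derived_cfi = build_cfi(feature_spec)
print(derived_cfi)
```
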

With our function defined, it is now a simple one-liner to convert our DataFrame, dat, into an RDD of LabeledPoints suitable for MLlib. We will use the map function. Notice that we cache() the result. This retains a copy of the RDD in LabeledPoint form in memory, which makes iterative evaluations more performant.

In [None]:
lp = dat.map(german_lp).cache()
lp.take(3)

#### Training a Decision Tree Model
MLlib enhancements may come with any version, so be sure to check the online documentation between upgrades for improvements. The documentation for the model we will evaluate today can be found in the [Apache Spark Documentation](https://spark.apache.org/docs/1.3.0/api/python/pyspark.mllib.html#module-pyspark.mllib.tree). From there we find the following parameters for training a DecisionTreeModel:

| Parameter                     | Description                                                                                                        |
| ----------------------------- | ------------------------------------------------------------------------------------------------------------------ |
| ***data***                    | Training data: RDD of LabeledPoint. Labels are integers {0,1,...,numClasses-1}.                                    |
| ***numClasses***              | Number of classes for classification.                                                                              |
| ***categoricalFeaturesInfo*** | Map from categorical feature index to number of categories. Any feature not in this map is treated as continuous.  |
| ***impurity***                | Supported values: “entropy” or “gini”                                                                              |
| ***maxDepth***                | Max depth of tree. E.g., depth 0 means 1 leaf node. Depth 1 means 1 internal node + 2 leaf nodes.                  |
| ***maxBins***                 | Number of bins used for finding splits at each node.                                                               |
| ***minInstancesPerNode***     | Min number of instances required at child nodes to create the parent split                                         |
| ***minInfoGain***             | Min info gain required to create a split                                                                           |
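
To get a feel for how quickly maxDepth grows the model, here is a small sketch of the upper bound on tree size, assuming every split is binary as the maxDepth description implies. The function names are hypothetical, and a trained tree may be smaller than this bound when minInstancesPerNode or minInfoGain stops a split:

```python
def max_nodes(max_depth):
    """Upper bound on the total node count of a binary tree of this depth."""
    return 2 ** (max_depth + 1) - 1

def max_leaves(max_depth):
    """Upper bound on the number of leaf nodes of a binary tree of this depth."""
    return 2 ** max_depth

# Depth 0 is a single leaf; depth 1 is 1 internal node + 2 leaves.
for depth in range(0, 4):
    print((depth, max_nodes(depth), max_leaves(depth)))
```
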


In [None]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

model = DecisionTree.trainClassifier(lp, numClasses=2,
                                     categoricalFeaturesInfo=german_cfi,
                                     impurity='gini',
                                     maxDepth=3, 
                                     maxBins=5)
# print so the tree is shown with real line breaks instead of escaped '\n'
print(model.toDebugString())
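
The impurity parameter chooses how the purity of a candidate split is scored. The sketch below uses the textbook definitions of Gini impurity and entropy computed from class counts at a node. The function names are hypothetical, and MLlib's internal implementation may differ in detail:

```python
from math import log

def gini(counts):
    """Gini impurity: 1 - sum of squared class proportions."""
    total = float(sum(counts))
    return 1.0 - sum((c / total) ** 2 for c in counts)

def entropy(counts):
    """Entropy in bits: -sum(p * log2(p)) over classes with p > 0."""
    total = float(sum(counts))
    return -sum((c / total) * log(c / total, 2) for c in counts if c > 0)

# Illustrative class counts at a node: evenly mixed vs. completely pure
mixed, pure = [5, 5], [10, 0]
print((gini(mixed), entropy(mixed)))
print((gini(pure), entropy(pure)))
```

Both measures are largest for an evenly mixed node and zero for a pure one, so a split is preferred when it produces children that are purer than the parent.
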

While it is nice to be able to view the logic in a model, this text form doesn't show well how a decision tree works. Unfortunately, there is no MLlib decision tree plotting functionality that I am aware of. For those still learning what a decision tree model is, we'll run this data set through an existing local, in-memory decision tree library just to view the visualization. Note: the following snippet does not handle categorical variables as desired and will return a different model; this plot is only provided to visualize what a decision tree model does:

In [None]:
from __future__ import division
import pandas as pd
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.tree import export_graphviz
from sklearn.cross_validation import train_test_split
from sklearn.cross_validation import KFold
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.metrics import mean_squared_error
from IPython.display import Image
import StringIO, pydot
%matplotlib inline

X=lp.map(lambda x: np.array(x.features)).collect()
y=lp.map(lambda x: np.array(x.label)).collect()
clf = DecisionTreeClassifier(criterion='gini', max_depth=3, max_leaf_nodes=5)
clf.fit(X, y)


# Visualize tree
dot_data = StringIO.StringIO()
export_graphviz(clf, out_file=dot_data)
graph = pydot.graph_from_dot_data(dot_data.getvalue())
Image(graph.create_png())

Returning to our MLlib model: once our model is fit, it can easily be used to predict against a data set that is already in LabeledPoint form. However, with all the parameters that are available, how can we be sure that we've picked the right model? We are going to evaluate using a training set and a validation set. Then, by iterating through our parameters and different samples, we'll discover which settings have good predictive properties.

In [None]:
val_dat = pd.DataFrame(columns=('obs', 'max_depth', 'num_nodes', 'train_err', 'val_err'))

for obs in range(30):
  for max_depth in range(2,8):
    (lp_train, lp_val) = lp.randomSplit([0.8, 0.2])
    model = DecisionTree.trainClassifier(lp_train, numClasses=2,
                                         categoricalFeaturesInfo=german_cfi,
                                         impurity='gini',
                                         minInstancesPerNode=10,
                                         maxDepth=max_depth)
    # Pair each true label with the model's prediction
    te = lp_train.map(lambda p: p.label).zip(model.predict(lp_train.map(lambda p: p.features)))
    ve = lp_val.map(lambda p: p.label).zip(model.predict(lp_val.map(lambda p: p.features)))
    # Error rate = fraction of (label, prediction) pairs that disagree
    train_err = te.filter(lambda vp: vp[0] != vp[1]).count() / float(te.count())
    val_err = ve.filter(lambda vp: vp[0] != vp[1]).count() / float(ve.count())
    val_dat.loc[len(val_dat)] = [obs,model.depth(),model.numNodes(),train_err,val_err]
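
The error computed in the loop is simply the fraction of records whose prediction disagrees with the label. A local, Spark-free sketch of the same calculation follows; the function name `error_rate` is hypothetical and the pairs are made-up values for illustration only:

```python
def error_rate(pairs):
    """Fraction of (label, prediction) pairs that disagree."""
    pairs = list(pairs)
    wrong = sum(1 for label, pred in pairs if label != pred)
    return wrong / float(len(pairs))

# Made-up (label, prediction) pairs, not real model output
toy_pairs = [(1.0, 1.0), (0.0, 1.0), (1.0, 1.0), (0.0, 0.0)]
rate = error_rate(toy_pairs)
print(rate)
```
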



Increasing the complexity (number of nodes in a model) doesn't necessarily improve the error on the validation data set. By running this over many sample data splits, taking the median of each group, and splining, we can get a representative graphic of the effect of complexity on training and validation errors. Specifically, up until approximately 23 nodes we tend to see a reduction in validation error, which is good. Beyond approximately 23 nodes, however, we tend to see an increase in validation error, which is bad. This phenomenon of the validation error increasing with model complexity is called '[overfitting](https://en.wikipedia.org/wiki/Overfitting#Machine_learning)'. The idea is that the model has become too specific to the training data and contains detail that is not general to the population. We must be conscious of this risk when selecting our models. Understanding this effect will help us refine our training parameters:

In [None]:
from scipy.interpolate import UnivariateSpline
# Select both error columns with a list; median error per complexity level
vd_nodes=val_dat.groupby(['num_nodes'])[['train_err','val_err']].median()
vd_depth=val_dat.groupby(['max_depth'])[['train_err','val_err']].median()

fig = plt.figure()
fig.set_size_inches(16,4)
plot_vdn = fig.add_subplot(1,2,1)
plot_vdd = fig.add_subplot(1,2,2)

vdn_y  = vd_nodes.index.values
vdn_tx = vd_nodes.loc[:,'train_err']
vdn_vx = vd_nodes.loc[:,'val_err']

vdn_xs = np.linspace(0, 90, 1000)

#plt.plot(vdn_y, vdn_tx, 'bo', ms=5)
vdnt_spl = UnivariateSpline(vdn_y,vdn_tx)
vdnt_spl.set_smoothing_factor(0.5)
plot_vdn.plot(vdn_xs, vdnt_spl(vdn_xs), 'b', lw=3)

#plt.plot(vdn_y, vdn_vx, 'ro', ms=5)
vdnv_spl = UnivariateSpline(vdn_y,vdn_vx)
vdnv_spl.set_smoothing_factor(0.5)
plot_vdn.plot(vdn_xs, vdnv_spl(vdn_xs), 'r', lw=3)
plot_vdn.axis([0, 70, 0.2,0.30])
plot_vdn.set_xlabel('Number of Nodes')
plot_vdn.set_ylabel('Model Error')
plot_vdn.set_title('Training & Validation Model Error Vs. Number Nodes')

vdd_y  = vd_depth.index.values
vdd_tx = vd_depth.loc[:,'train_err']
vdd_vx = vd_depth.loc[:,'val_err']

vdd_xs = np.linspace(0, 7, 500)

#plt.plot(vdd_y, vdd_tx, 'bo', ms=5)
vddt_spl = UnivariateSpline(vdd_y,vdd_tx)
vddt_spl.set_smoothing_factor(0.5)
plot_vdd.plot(vdd_xs, vddt_spl(vdd_xs), 'b', lw=3)

#plt.plot(vdd_y, vdd_vx, 'ro', ms=5)
vddv_spl = UnivariateSpline(vdd_y,vdd_vx)
vddv_spl.set_smoothing_factor(0.5)
plot_vdd.plot(vdd_xs, vddv_spl(vdd_xs), 'r', lw=3)
plot_vdd.axis([2, 6, 0.2,0.30])
plot_vdd.set_xlabel('Parameter maxDepth')
plot_vdd.set_ylabel('Model Error')
plot_vdd.set_title('Training & Validation Model Error Vs. Parameter maxDepth')

plt.show()
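
Besides reading the turning point off the plot, the complexity with the lowest median validation error can be picked programmatically. The sketch below works on plain (num_nodes, val_err) pairs; the function names are hypothetical and the toy records are made-up values, not results from the runs above:

```python
def median(values):
    """Median of a non-empty list of numbers."""
    s = sorted(values)
    mid = len(s) // 2
    return s[mid] if len(s) % 2 else (s[mid - 1] + s[mid]) / 2.0

def best_num_nodes(records):
    """Return the node count whose median validation error is lowest.

    records: iterable of (num_nodes, val_err) pairs.
    """
    by_nodes = {}
    for num_nodes, val_err in records:
        by_nodes.setdefault(num_nodes, []).append(val_err)
    medians = dict((n, median(errs)) for n, errs in by_nodes.items())
    return min(medians, key=medians.get)

# Made-up (num_nodes, val_err) pairs purely for illustration
toy_records = [(7, 0.30), (7, 0.28), (15, 0.25),
               (15, 0.27), (31, 0.29), (31, 0.31)]
best = best_num_nodes(toy_records)
print(best)
```

With the data collected earlier, the pairs could be built with `zip(val_dat['num_nodes'], val_dat['val_err'])`. Keep in mind that the raw minimum is noisy when a node count appears in only a few runs, which is one reason to prefer the smoothed curves.
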