Notebook inspired by this repository: https://github.com/dianewoodbridge/2018-msan697-example

In [100]:
from pyspark import SparkContext
from pyspark.sql.types import *
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#### Implementation of Logistic regression, Decision Trees, Random Forest and K-means clustering using Spark ML

## Logistic Regression

#### Creating RDD

In [4]:
def toDoubleSafe(v):
    try:
        return float(v)
    except ValueError:
        return str(v) #if it is not a float type return as a string.

In [5]:
census_raw = sc.textFile("Data/adult.raw", 4).map(lambda x: x.split(', '))

In [6]:
census_raw.take(1)

[[u'39',
  u'State-gov',
  u'77516',
  u'Bachelors',
  u'Never-married',
  u'Adm-clerical',
  u'Not-in-family',
  u'White',
  u'Male',
  u'2174',
  u'0',
  u'40',
  u'United-States',
  u'<=50K']]

In [7]:
census_raw = census_raw.map(lambda x: [toDoubleSafe(i) for i in x]) ##is map one to one ??

In [8]:
census_raw.take(1)

[[39.0,
  'State-gov',
  77516.0,
  'Bachelors',
  'Never-married',
  'Adm-clerical',
  'Not-in-family',
  'White',
  'Male',
  2174.0,
  0.0,
  40.0,
  'United-States',
  '<=50K']]

#### RDD --> Dataframe

In [9]:
#schema
adultschema = StructType([
    StructField("age",DoubleType(),True),
    StructField("capital_gain",DoubleType(),True),
    StructField("capital_loss",DoubleType(),True),
    StructField("education",StringType(),True),
    StructField("fnlwgt",DoubleType(),True),
    StructField("hours_per_week",DoubleType(),True),
    StructField("income",StringType(),True),
    StructField("marital_status",StringType(),True),
    StructField("native_country",StringType(),True),
    StructField("occupation",StringType(),True),
    StructField("race",StringType(),True),
    StructField("relationship",StringType(),True),
    StructField("sex",StringType(),True),
    StructField("workclass",StringType(),True),
])

In [11]:

columns = ["age", "workclass", "fnlwgt", "education", "marital_status",
           "occupation", "relationship", "race", "sex", "capital_gain", "capital_loss",
           "hours_per_week", "native_country", "income"]
dfraw = sqlContext.createDataFrame(census_raw.map(lambda row: Row(**{x[0]: x[1] for x in zip(columns, row)})), \
                                    adultschema) ##what is **??

In [12]:
dfraw.show(2)

+----+------------+------------+---------+-------+--------------+------+------------------+--------------+---------------+-----+-------------+----+----------------+
| age|capital_gain|capital_loss|education| fnlwgt|hours_per_week|income|    marital_status|native_country|     occupation| race| relationship| sex|       workclass|
+----+------------+------------+---------+-------+--------------+------+------------------+--------------+---------------+-----+-------------+----+----------------+
|39.0|      2174.0|         0.0|Bachelors|77516.0|          40.0| <=50K|     Never-married| United-States|   Adm-clerical|White|Not-in-family|Male|       State-gov|
|50.0|         0.0|         0.0|Bachelors|83311.0|          13.0| <=50K|Married-civ-spouse| United-States|Exec-managerial|White|      Husband|Male|Self-emp-not-inc|
+----+------------+------------+---------+-------+--------------+------+------------------+--------------+---------------+-----+-------------+----+----------------+
only showi

#### Cleaning data

In [42]:
##Task:Calculate missing values for each column??

48842

#### 1 Missing data imputation

In [13]:
#Check the most commonly used vals.
print dfraw.groupBy(dfraw["workclass"]).count().orderBy("count",ascending=False).show()
print dfraw.groupBy(dfraw["occupation"]).count().orderBy("count",ascending=False).show()
print dfraw.groupBy(dfraw["native_country"]).count().orderBy("count",ascending=False).show()

+----------------+-----+
|       workclass|count|
+----------------+-----+
|         Private|33906|
|Self-emp-not-inc| 3862|
|       Local-gov| 3136|
|               ?| 2799|
|       State-gov| 1981|
|    Self-emp-inc| 1695|
|     Federal-gov| 1432|
|     Without-pay|   21|
|    Never-worked|   10|
+----------------+-----+

None
+-----------------+-----+
|       occupation|count|
+-----------------+-----+
|   Prof-specialty| 6172|
|     Craft-repair| 6112|
|  Exec-managerial| 6086|
|     Adm-clerical| 5611|
|            Sales| 5504|
|    Other-service| 4923|
|Machine-op-inspct| 3022|
|                ?| 2809|
| Transport-moving| 2355|
|Handlers-cleaners| 2072|
|  Farming-fishing| 1490|
|     Tech-support| 1446|
|  Protective-serv|  983|
|  Priv-house-serv|  242|
|     Armed-Forces|   15|
+-----------------+-----+

None
+------------------+-----+
|    native_country|count|
+------------------+-----+
|     United-States|43832|
|            Mexico|  951|
|                 ?|  857|
|      

In [14]:
#Missing data imputation - Impute the most common row for "?".
dfrawrp = dfraw.na.replace(["?"], ["Private"], ["workclass"])
dfrawrpl = dfrawrp.na.replace(["?"], ["Prof-specialty"], ["occupation"])
dfrawnona = dfrawrpl.na.replace(["?"], ["United-States"], ["native_country"])

#### 2 Strings --> Numeric

Label Encoding

In [15]:
#converting strings to numeric values
from pyspark.ml.feature import StringIndexer

def indexStringColumns(df, cols):
    #variable newdf will be updated several times
    newdf = df
    
    for c in cols:
        #For each given colum, fits StringIndexerModel.
        si = StringIndexer(inputCol=c, outputCol=c+"-num")
        sm = si.fit(newdf)
        #Creates a DataFame by putting the transformed values in the new colum with suffix "-num" 
        #and then drops the original columns.
        #and drop the "-num" suffix. 
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num", c)
    return newdf

dfnumeric = indexStringColumns(dfrawnona, ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country", "income"])

In [16]:
dfnumeric.show(1)

+----+------------+------------+-------+--------------+---------+---------+--------------+----------+------------+----+---+--------------+------+
| age|capital_gain|capital_loss| fnlwgt|hours_per_week|workclass|education|marital_status|occupation|relationship|race|sex|native_country|income|
+----+------------+------------+-------+--------------+---------+---------+--------------+----------+------------+----+---+--------------+------+
|39.0|      2174.0|         0.0|77516.0|          40.0|      3.0|      2.0|           1.0|       3.0|         1.0| 0.0|0.0|           0.0|   0.0|
+----+------------+------------+-------+--------------+---------+---------+--------------+----------+------------+----+---+--------------+------+
only showing top 1 row



One Hot Encoding

One hot encoding is stored as sparse matrix

In [20]:
from pyspark.ml.feature import OneHotEncoder
def oneHotEncodeColumns(df, cols):
    newdf = df
    for c in cols:
        #For each given colum, create OneHotEncoder. 
        #dropLast : Whether to drop the last category in the encoded vector (default: true)
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
        #Creates a DataFame by putting the transformed values in the new colum with suffix "-onehot" 
        #and then drops the original columns.
        #and drop the "-onehot" suffix. 
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)
    return newdf

dfhot = oneHotEncodeColumns(dfnumeric, ["workclass", "education", "marital_status", "occupation", "relationship", "race", "native_country"])

In [22]:
dfhot.show(1)

+----+------------+------------+-------+--------------+---+------+-------------+--------------+--------------+--------------+-------------+-------------+--------------+
| age|capital_gain|capital_loss| fnlwgt|hours_per_week|sex|income|    workclass|     education|marital_status|    occupation| relationship|         race|native_country|
+----+------------+------------+-------+--------------+---+------+-------------+--------------+--------------+--------------+-------------+-------------+--------------+
|39.0|      2174.0|         0.0|77516.0|          40.0|0.0|   0.0|(8,[3],[1.0])|(16,[2],[1.0])| (7,[1],[1.0])|(14,[3],[1.0])|(6,[1],[1.0])|(5,[0],[1.0])|(41,[0],[1.0])|
+----+------------+------------+-------+--------------+---+------+-------------+--------------+--------------+--------------+-------------+-------------+--------------+
only showing top 1 row



Vector assembler

Merge all the new vectors and the original columns into a single vector.

In [23]:
# Merging the data with Vector Assembler.
from pyspark.ml.feature import VectorAssembler
input_cols=["age","capital_gain","capital_loss","fnlwgt","hours_per_week","sex","workclass","education","marital_status","occupation","relationship","native_country","race"]

#VectorAssembler takes a number of column names(inputCols) and output column name (outputCol)
#and transforms a DataFrame to assemble the values in inputCols into one single vector with outputCol.
va = VectorAssembler(outputCol="features", inputCols=input_cols)
#lpoints - labeled data.
lpoints = va.transform(dfhot).select("features", "income").withColumnRenamed("income", "label")

In [27]:
lpoints.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(103,[0,1,3,4,9,1...|  0.0|
|(103,[0,3,4,7,16,...|  0.0|
|(103,[0,3,4,6,14,...|  0.0|
+--------------------+-----+
only showing top 3 rows



#### Training the model

In [28]:
#Divide the dataset into training and testing sets.
splits = lpoints.randomSplit([0.8, 0.2],24)

#cache() : the algorithm is interative and training and data sets are going to be reused many times.
adulttrain = splits[0].cache()
adultvalid = splits[1].cache()

In [29]:
#Train the model.
#reg: regularization parameter
#maxIter: for optimization
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(regParam=0.01, maxIter=1000, fitIntercept=True)
lrmodel = lr.fit(adulttrain)
#The above lines are same as..
#lr = LogisticRegression()
#lrmodel = lr.setParams(regParam=0.01, maxIter=1000, fitIntercept=True).fit(adulttrain)

#### Model Interpret

In [30]:
#Interpret the model parameters
print(lrmodel.coefficients)
print(lrmodel.intercept)

[0.0198274946326,0.000139814835621,0.000550187045489,5.54629690121e-07,0.0274608150982,-0.515594519893,0.01382149839,-0.35249892284,0.0439631401646,-0.142597660088,0.277554838479,0.595739259724,-0.347320395946,-1.37478837906,-0.349051694905,-0.00785397863856,0.748090528287,1.12540025465,0.146678826456,-1.01373180378,0.209274500298,-1.00531128956,-1.44290845972,1.53643439125,-1.30448828444,-0.663041131962,1.62089607588,-1.04713720904,-1.28466665808,-1.70908006866,0.836042113304,-0.670309114117,-0.321792590669,-0.364411451627,-0.291310624946,-0.162709684753,0.499092261525,0.215713447224,0.0393406761999,0.677491608141,-0.0278172563926,0.206178354796,-0.795144546117,-0.285291174623,-0.151522835706,-0.592151526088,-0.856416979039,0.49567120783,0.356768742735,-0.848285546249,-0.215254280311,0.452246152209,-0.104046502386,-0.787771274762,-0.292871185947,1.27688794184,-0.549981878962,0.203094277465,-0.608614587215,0.243256080394,0.330533724472,-0.166221797062,0.402883532489,-0.232475946827,0.0

#### Model prediction and evaluation

In [31]:
#Evaluate models using test dataset.
#First, transform the validation set.
validpredicts = lrmodel.transform(adultvalid)
validpredicts.show()

#rawPrediction : includes two values - log-odds that a sample doesn't and does belong to the category (making > 50,000).
#probability : the probability that the sample is not in the category.
#prediction : proability that the sample belongs to the category.
#validpredicts.select("rawPrediction").collect()
#validpredicts.select("probability").collect()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(103,[0,1,3,4,5,6...|  1.0|[-0.8881045325238...|[0.29150114020308...|       1.0|
|(103,[0,1,3,4,5,6...|  1.0|[0.39976627388180...|[0.59863150360595...|       0.0|
|(103,[0,1,3,4,5,6...|  1.0|[-0.8778892471184...|[0.29361536963030...|       1.0|
|(103,[0,1,3,4,5,6...|  0.0|[3.68253132159345...|[0.97545824305503...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[3.20648983094234...|[0.96107777193858...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[3.00445632615502...|[0.95277504320852...|       0.0|
|(103,[0,1,3,4,5,6...|  1.0|[3.12243604353193...|[0.95780878126680...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[2.88118997453155...|[0.94690871853674...|       0.0|
|(103,[0,1,3,4,5,6...|  1.0|[-0.4273359342471...|[0.39476266497520...|       1.0|
|(103,[0,1,3,4,5

In [32]:
#Evaluate the model. default metric : Area Under ROC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
bceval = BinaryClassificationEvaluator()
print (bceval.getMetricName() +":" + str(bceval.evaluate(validpredicts)))

areaUnderROC:0.901690303455


In [33]:
#Evaluate the model. metric : Area Under PR??
bceval.setMetricName("areaUnderPR")
print (bceval.getMetricName() +":" + str(bceval.evaluate(validpredicts)))

areaUnderPR:0.752158952034


#### Cross validation 

In [34]:
# n-fold validation and the results.
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
cv = CrossValidator().setEstimator(lr).setEvaluator(bceval).setNumFolds(5)
#ParamGridBuilder() – combinations of parameters and their values.
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, [1000]).addGrid(lr.regParam, [0.0001, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5]).build()
#setEstimatorParamMaps() takes ParamGridBuilder().
cv.setEstimatorParamMaps(paramGrid)
cvmodel = cv.fit(adulttrain)

In [35]:
print cvmodel.bestModel.coefficients
print cvmodel.bestModel.intercept
print cvmodel.bestModel._java_obj.getMaxIter()
print cvmodel.bestModel._java_obj.getRegParam()

[0.0221081107864,0.000310889154522,0.00064663413053,6.53753418245e-07,0.0311299676048,-0.703796749355,0.0211626292645,-0.419327807871,0.033436745195,-0.16926813171,0.234480056474,0.644489789633,-0.402568475973,-16.9359827978,-0.365807527177,0.020999622264,0.836000219488,1.24974986929,0.150176935288,-1.27247957443,0.250457344783,-1.14831500087,-1.74171618125,1.71280045321,-1.56447053219,-0.789286010333,1.82495012182,-1.22711393865,-1.65377444877,-5.49704679535,1.53767506263,-1.1344948079,-0.746712231993,-0.784425179142,-0.705517733042,-0.561285567148,1.21111633612,0.206245260691,0.0682727571705,0.720656715411,-0.00408505873994,0.2282812359,-0.952033370244,-0.283569938368,-0.137073138776,-0.645765693306,-0.975487681739,0.553935426785,0.424856459414,-1.45922308727,-0.197706863174,-0.120679724466,0.386680966828,-0.759623917007,0.229040590588,0.976736394224,-0.664084802114,0.193674976628,-0.730078953657,0.168874546783,0.323399114451,-0.160000317159,0.453499647341,-0.343718099217,-0.16611441

In [36]:
print BinaryClassificationEvaluator().evaluate(cvmodel.bestModel.transform(adultvalid))

0.904685356283


In [37]:
print BinaryClassificationEvaluator().setMetricName("areaUnderPR").evaluate(cvmodel.bestModel.transform(adultvalid))

0.764577694033


----

## Decision Trees

#### Creating RDD

In [38]:
pen_raw = sc.textFile("Data/penbased.dat", 4)

In [39]:
pen_raw.take(2) #pixels flattened

[u'47.0, 100.0, 27.0, 81.0, 57.0, 37.0, 26.0, 0.0, 0.0, 23.0, 56.0, 53.0, 100.0, 90.0, 40.0, 98.0, 8.0',
 u'0.0, 89.0, 27.0, 100.0, 42.0, 75.0, 29.0, 45.0, 15.0, 15.0, 37.0, 0.0, 69.0, 2.0, 100.0, 6.0, 2.0']

In [40]:
#Load the data and create an RDD (16 pixels and label)
pen_raw = pen_raw.map(lambda x:  x.split(", ")).map(lambda row: [float(x) for x in row])

In [41]:
pen_raw.take(1)

[[47.0,
  100.0,
  27.0,
  81.0,
  57.0,
  37.0,
  26.0,
  0.0,
  0.0,
  23.0,
  56.0,
  53.0,
  100.0,
  90.0,
  40.0,
  98.0,
  8.0]]

#### RDD --> Dataframe

In [58]:
penschema = StructType([
   StructField("features", ArrayType(elementType=FloatType(),containsNull=False),True),
   StructField("label", DoubleType(),True)
])

In [59]:
pen_df = sqlContext.createDataFrame(pen_raw.map(lambda x : Row(x[0:16],x[16])), penschema)

In [60]:
pen_df.dtypes

[('features', 'array<float>'), ('label', 'double')]

In [61]:
pen_df.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[47.0, 100.0, 27....|  8.0|
|[0.0, 89.0, 27.0,...|  2.0|
|[0.0, 57.0, 31.0,...|  1.0|
|[0.0, 100.0, 7.0,...|  4.0|
|[0.0, 67.0, 49.0,...|  1.0|
|[100.0, 100.0, 88...|  6.0|
|[0.0, 100.0, 3.0,...|  4.0|
|[0.0, 39.0, 2.0, ...|  0.0|
|[13.0, 89.0, 12.0...|  5.0|
|[74.0, 87.0, 31.0...|  9.0|
|[48.0, 96.0, 62.0...|  8.0|
|[100.0, 100.0, 72...|  5.0|
|[91.0, 74.0, 54.0...|  9.0|
|[0.0, 85.0, 38.0,...|  7.0|
|[35.0, 76.0, 57.0...|  3.0|
|[50.0, 84.0, 66.0...|  3.0|
|[99.0, 80.0, 63.0...|  9.0|
|[24.0, 66.0, 43.0...|  2.0|
|[0.0, 73.0, 19.0,...|  2.0|
|[12.0, 77.0, 20.0...|  5.0|
+--------------------+-----+
only showing top 20 rows



In [62]:
array_to_vector = udf(lambda l: Vectors.dense(l), VectorUDT())

In [63]:
pen_df = pen_df.select(array_to_vector(pen_df["features"]).alias("features"),'label')

In [64]:
pen_df.dtypes

[('features', 'vector'), ('label', 'double')]

Alternate method

In [None]:
# penschema = StructType([
#     StructField("pix1",DoubleType(),True),
#     StructField("pix2",DoubleType(),True),
#     StructField("pix3",DoubleType(),True),
#     StructField("pix4",DoubleType(),True),
#     StructField("pix5",DoubleType(),True),
#     StructField("pix6",DoubleType(),True),
#     StructField("pix7",DoubleType(),True),
#     StructField("pix8",DoubleType(),True),
#     StructField("pix9",DoubleType(),True),
#     StructField("pix10",DoubleType(),True),
#     StructField("pix11",DoubleType(),True),
#     StructField("pix12",DoubleType(),True),
#     StructField("pix13",DoubleType(),True),
#     StructField("pix14",DoubleType(),True),
#     StructField("pix15",DoubleType(),True),
#     StructField("pix16",DoubleType(),True),
#     StructField("label",DoubleType(),True)
# ])

# dfpen = sqlContext.createDataFrame(pen_raw.map(lambda x : Row(x[0],x[1],x[2],x[3],x[4],x[5],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16])), penschema)

In [None]:
# # Merging the data with Vector Assembler.
# from pyspark.ml.feature import VectorAssembler
# va = VectorAssembler(outputCol="features", inputCols=dfpen.columns[0:-1]) #except the last col.
# penlpoints = va.transform(dfpen).select("features", "label")

#### Training

In [65]:
# Create Training and Test data.
pensplit = pen_df.randomSplit([0.8, 0.2])
pen_trn = pensplit[0].cache()
pen_val = pensplit[1].cache()

In [66]:
pen_trn.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,0.0,31.0,15....|  9.0|
|[0.0,0.0,34.0,12....|  9.0|
|[0.0,0.0,41.0,16....|  9.0|
|[0.0,0.0,42.0,18....|  9.0|
|[0.0,3.0,28.0,0.0...|  9.0|
|[0.0,4.0,74.0,29....|  1.0|
|[0.0,5.0,18.0,35....|  1.0|
|[0.0,9.0,42.0,38....|  1.0|
|[0.0,17.0,0.0,41....|  1.0|
|[0.0,20.0,47.0,42...|  1.0|
|[0.0,22.0,36.0,47...|  1.0|
|[0.0,24.0,43.0,47...|  1.0|
|[0.0,26.0,57.0,56...|  8.0|
|[0.0,28.0,22.0,49...|  1.0|
|[0.0,30.0,46.0,60...|  8.0|
|[0.0,34.0,25.0,52...|  1.0|
|[0.0,34.0,34.0,55...|  1.0|
|[0.0,34.0,43.0,67...|  1.0|
|[0.0,36.0,30.0,59...|  1.0|
|[0.0,36.0,35.0,56...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [68]:
# Train the data.
from pyspark.ml.classification import DecisionTreeClassifier
# Paramenters
#maxDepth : maximum tree depth (default : 5).
#maxBins : maximum number of bins when binning continuous features (default : 32). why is this required??
#minInstancesPerNode : minimum number of dataset samples each branch needs to have after a split (default : 1).
#minInfoGain : minimum information gain for a split (default : 0).
dt = DecisionTreeClassifier(maxDepth=20, maxBins= 32, minInstancesPerNode=1, minInfoGain = 0)
dtmodel = dt.fit(pen_trn)

In [69]:
print dtmodel._call_java('toDebugString')

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_46e884ac52f4d89afea2) of depth 20 with 615 nodes
  If (feature 15 <= 53.0)
   If (feature 4 <= 40.0)
    If (feature 9 <= 17.0)
     If (feature 14 <= 62.0)
      If (feature 1 <= 74.0)
       Predict: 1.0
      Else (feature 1 > 74.0)
       If (feature 3 <= 68.0)
        Predict: 4.0
       Else (feature 3 > 68.0)
        Predict: 6.0
     Else (feature 14 > 62.0)
      If (feature 12 <= 55.0)
       If (feature 0 <= 38.0)
        Predict: 1.0
       Else (feature 0 > 38.0)
        Predict: 8.0
      Else (feature 12 > 55.0)
       If (feature 0 <= 29.0)
        If (feature 7 <= 12.0)
         Predict: 7.0
        Else (feature 7 > 12.0)
         Predict: 2.0
       Else (feature 0 > 29.0)
        If (feature 0 <= 86.0)
         Predict: 4.0
        Else (feature 0 > 86.0)
         Predict: 6.0
    Else (feature 9 > 17.0)
     If (feature 1 <= 99.0)
      If (feature 0 <= 38.0)
       If (feature 7 <= 26.0)
        Predict: 

In [70]:
#Test data.
dtpredicts = dtmodel.transform(pen_val)

In [71]:
dtpredicts.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[0.0,0.0,18.0,27....|  1.0|[0.0,404.0,0.0,0....|[0.0,1.0,0.0,0.0,...|       1.0|
|[0.0,0.0,38.0,13....|  9.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       9.0|
|[0.0,0.0,51.0,9.0...|  9.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       9.0|
|[0.0,13.0,27.0,42...|  1.0|[0.0,404.0,0.0,0....|[0.0,1.0,0.0,0.0,...|       1.0|
|[0.0,23.0,63.0,46...|  8.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       8.0|
|[0.0,31.0,52.0,59...|  8.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       8.0|
|[0.0,33.0,31.0,60...|  1.0|[0.0,404.0,0.0,0....|[0.0,1.0,0.0,0.0,...|       1.0|
|[0.0,42.0,10.0,60...|  1.0|[0.0,16.0,0.0,0.0...|[0.0,1.0,0.0,0.0,...|       1.0|
|[0.0,43.0,35.0,60...|  1.0|[0.0,3.0,0.0,0.0,...|[0.0,1.0,0.0,0.0,...|       1.0|
|[0.0,44.0,36.0,

In [73]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(dtpredicts)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.0486815


In [74]:
#missclassification
dtpredicts.select('label','prediction').rdd.map(lambda x : (x,1)).countByKey() 

defaultdict(int,
            {Row(label=0.0, prediction=0.0): 187,
             Row(label=0.0, prediction=1.0): 1,
             Row(label=0.0, prediction=4.0): 1,
             Row(label=0.0, prediction=6.0): 2,
             Row(label=0.0, prediction=8.0): 3,
             Row(label=1.0, prediction=1.0): 183,
             Row(label=1.0, prediction=2.0): 12,
             Row(label=1.0, prediction=3.0): 2,
             Row(label=1.0, prediction=5.0): 1,
             Row(label=1.0, prediction=7.0): 3,
             Row(label=1.0, prediction=9.0): 3,
             Row(label=2.0, prediction=1.0): 10,
             Row(label=2.0, prediction=2.0): 195,
             Row(label=2.0, prediction=7.0): 1,
             Row(label=3.0, prediction=1.0): 1,
             Row(label=3.0, prediction=3.0): 173,
             Row(label=3.0, prediction=9.0): 2,
             Row(label=4.0, prediction=0.0): 2,
             Row(label=4.0, prediction=2.0): 1,
             Row(label=4.0, prediction=3.0): 1,
             

In [75]:
#Depreciated in Spark 2.0 -- Use accuracy
from pyspark.mllib.evaluation import MulticlassMetrics
dtresrdd = dtpredicts.select("prediction", "label").rdd #convert DataFrame to RDD.
dtmm = MulticlassMetrics(dtresrdd) 
print dtmm.precision() 
print(dtmm.confusionMatrix())



0.951318458418
DenseMatrix([[ 187.,    1.,    0.,    0.,    1.,    0.,    2.,    0.,    3.,
                 0.],
             [   0.,  183.,   12.,    2.,    0.,    1.,    0.,    3.,    0.,
                 3.],
             [   0.,   10.,  195.,    0.,    0.,    0.,    0.,    1.,    0.,
                 0.],
             [   0.,    1.,    0.,  173.,    0.,    0.,    0.,    0.,    0.,
                 2.],
             [   2.,    0.,    1.,    1.,  206.,    1.,    2.,    0.,    0.,
                 3.],
             [   0.,    0.,    0.,    2.,    0.,  196.,    0.,    0.,    6.,
                 3.],
             [   1.,    3.,    1.,    1.,    1.,    0.,  189.,    0.,    1.,
                 0.],
             [   0.,    1.,    1.,    1.,    0.,    0.,    2.,  215.,    1.,
                 1.],
             [   2.,    0.,    0.,    0.,    0.,    2.,    1.,    2.,  158.,
                 0.],
             [   1.,    0.,    1.,    0.,    0.,    7.,    0.,    1.,    1.,
               17

In [78]:
# n-fold validation and the results.
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
cv = CrossValidator().setEstimator(dt).setEvaluator(evaluator).setNumFolds(5)
#ParamGridBuilder() – combinations of parameters and their values.
paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [5,10,15,20,25,30]).build()
#setEstimatorParamMaps() takes ParamGridBuilder().
cv.setEstimatorParamMaps(paramGrid)
cvmodel = cv.fit(pen_trn)
print cvmodel.bestModel._java_obj.getMaxDepth()
print "Accuracy : " +  str(MulticlassClassificationEvaluator().evaluate(cvmodel.bestModel.transform(pen_val)))

20
Accuracy : 0.951324656547


### Decision Tree -- Pipeline 

In [None]:
# # Transformer - Vector Assembler.
# from pyspark.ml.feature import VectorAssembler
# va = VectorAssembler(outputCol="features", inputCols=pen_df.columns[0:-1]) #except the last col.

# # Estimator - DecisionTreeClassifier which creates a transformer (Decision Tree Classifier model)
# from pyspark.ml.classification import DecisionTreeClassifier
# dt = DecisionTreeClassifier(maxDepth=20, maxBins= 32, minInstancesPerNode=1, minInfoGain = 0)

# # Fit the pipeline to training documents.
# from pyspark.ml import Pipeline
# pipeline = Pipeline(stages=[va,dt])
# dtmodel = pipeline.fit(pen_trn)

# dtpredicts = dtmodel.transform(pen_val)

# from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
# accuracy = evaluator.evaluate(dtpredicts)
# print("Test Error = %g" % (1.0 - accuracy))

# dtpredicts.select('label','prediction').rdd.map(lambda x : (x,1)).countByKey() 



----

## Random Forest

Using the same dataset so we'll proceed after the caching portion

Model Training

In [79]:
# Train the model.
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(maxDepth=20)
rfmodel = rf.fit(pen_trn)
print rfmodel._call_java('toDebugString')

RandomForestClassificationModel (uid=RandomForestClassifier_42fa8ca899b9fd0f67f5) with 20 trees
  Tree 0 (weight 1.0):
    If (feature 15 <= 53.0)
     If (feature 1 <= 99.0)
      If (feature 14 <= 87.0)
       If (feature 0 <= 60.0)
        If (feature 13 <= 16.0)
         If (feature 2 <= 84.0)
          If (feature 1 <= 66.0)
           If (feature 13 <= 0.0)
            If (feature 11 <= 20.0)
             If (feature 12 <= 31.0)
              If (feature 1 <= 57.0)
               If (feature 11 <= 11.0)
                Predict: 2.0
               Else (feature 11 > 11.0)
                Predict: 8.0
              Else (feature 1 > 57.0)
               If (feature 9 <= 40.0)
                If (feature 8 <= 56.0)
                 Predict: 7.0
                Else (feature 8 > 56.0)
                 Predict: 1.0
               Else (feature 9 > 40.0)
                Predict: 1.0
             Else (feature 12 > 31.0)
              Predict: 3.0
            Else (feature 11 > 20.0)
  

In [80]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
rfpredicts = rfmodel.transform(pen_val)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(rfpredicts)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.0111562


In [81]:
#Confusion Matrix
rfpredicts.select('label','prediction').rdd.map(lambda x : (x,1)).countByKey()

defaultdict(int,
            {Row(label=0.0, prediction=0.0): 190,
             Row(label=0.0, prediction=1.0): 1,
             Row(label=0.0, prediction=8.0): 3,
             Row(label=1.0, prediction=1.0): 198,
             Row(label=1.0, prediction=2.0): 3,
             Row(label=1.0, prediction=3.0): 3,
             Row(label=2.0, prediction=2.0): 206,
             Row(label=3.0, prediction=3.0): 174,
             Row(label=3.0, prediction=4.0): 1,
             Row(label=3.0, prediction=9.0): 1,
             Row(label=4.0, prediction=4.0): 215,
             Row(label=4.0, prediction=9.0): 1,
             Row(label=5.0, prediction=5.0): 205,
             Row(label=5.0, prediction=9.0): 2,
             Row(label=6.0, prediction=4.0): 1,
             Row(label=6.0, prediction=6.0): 196,
             Row(label=7.0, prediction=7.0): 220,
             Row(label=7.0, prediction=8.0): 1,
             Row(label=7.0, prediction=9.0): 1,
             Row(label=8.0, prediction=7.0): 3,
       

## K-means clustering

Using the pixel data

In [94]:
from pyspark.ml.clustering import KMeans
kmeans =  KMeans(k = 10, maxIter = 200, tol = 0.1) # k = 10 as there are 10 different handwritten numbers.
model = kmeans.fit(pen_df)

In [95]:
# Evaluate clustering by computing Within Set Sum of Squared Errors
wssse = model.computeCost(pen_df) 
print("Within Set Sum of Squared Errors = " + str(wssse))

Within Set Sum of Squared Errors = 46160858.0701


In [99]:
pen_df.count()

9912

In [96]:
# Average distance from the center (max = 100)
import math
print("Average distance from the center = " + str(math.sqrt(wssse/pen_df.count())))

Average distance from the center = 68.2427139368


In [97]:
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[ 88.00580833  97.78993224  52.64762827  87.28848015  21.28944821
  59.95062924   7.01548887  28.31945789  32.33881897   4.47918683
  79.51016457  11.47821878  62.06582769  30.81219748  13.39303001
  24.91674734]
[ 27.44980443  83.71968709  63.03259452  94.55997392  85.55280313
  87.22946545  55.13233377  65.58148631  69.64602347  45.38787484
  87.32920469  22.85397653  52.22946545   7.26597132   4.30247718
   9.58083442]
[ 44.53996448  98.30195382  13.6660746   77.04795737   5.36234458
  49.47424512  69.21669627  47.98401421  96.60923623  65.72824156
  77.97513321  67.89698046  62.92717584  34.38543517  50.60035524
   0.34280639]
[ 87.46744186  87.90813953  58.16046512  92.40232558  35.72325581
  79.76046512  56.75813953  74.44883721  80.47674419  63.75232558
  81.59302326  32.18372093  48.70232558   7.74534884   4.32906977
   3.93604651]
[  3.26024096  60.78554217  30.41204819  72.74578313  72.10240964
  89.77951807  91.46746988  94.43493976  79.77228916  73.8590361

In [98]:
model.transform(pen_df).select('label', 'prediction').rdd.map(lambda x : (x,1)).countByKey()  
# prediction is a group, not an actual label.

defaultdict(int,
            {Row(label=0.0, prediction=0): 31,
             Row(label=0.0, prediction=2): 12,
             Row(label=0.0, prediction=3): 1,
             Row(label=0.0, prediction=4): 2,
             Row(label=0.0, prediction=5): 352,
             Row(label=0.0, prediction=6): 630,
             Row(label=0.0, prediction=8): 6,
             Row(label=0.0, prediction=9): 3,
             Row(label=1.0, prediction=0): 7,
             Row(label=1.0, prediction=1): 70,
             Row(label=1.0, prediction=3): 66,
             Row(label=1.0, prediction=4): 573,
             Row(label=1.0, prediction=8): 304,
             Row(label=1.0, prediction=9): 2,
             Row(label=2.0, prediction=4): 16,
             Row(label=2.0, prediction=8): 1006,
             Row(label=3.0, prediction=1): 919,
             Row(label=3.0, prediction=3): 2,
             Row(label=3.0, prediction=4): 19,
             Row(label=3.0, prediction=8): 1,
             Row(label=3.0, prediction=9): 1