# Spark and Machine Learning

## What is Machine Learning?

**Machine learning (ML) is the study of computer algorithms that improve automatically through experience and by the use of data.**

* Machine learning algorithms are used in a wide variety of applications.
>* e.g., medicine, email filtering, and computer vision, where it is difficult or infeasible to develop conventional algorithms to perform the needed tasks.



<img align="right" width="450" height="450" src="https://upload.wikimedia.org/wikipedia/commons/f/fe/Fig-X_All_ML_as_a_subfield_of_AI.jpg"> 





* Machine learning approaches are traditionally divided into three broad categories:

    >* **Supervised learning:** The computer is presented with example inputs and their desired outputs, given by a "teacher", and the goal is to learn a general rule that maps inputs to outputs.

    <img width="600" src="https://miro.medium.com/max/700/1*ASYpFfDh7XnreU-ygqXonw.png">

    >* **Unsupervised learning:** No labels are given to the learning algorithm, leaving it on its own to find structure in its input. Unsupervised learning can be a goal in itself (discovering hidden patterns in data) or a means towards an end (feature learning).

    <img width="600" src="https://www.guru99.com/images/1/030819_1030_Unsupervise3.png">

    >* **Reinforcement learning:** A computer program interacts with a dynamic environment in which it must achieve a certain goal (such as driving a vehicle or playing a game against an opponent). As it navigates its problem space, the program is given feedback analogous to rewards, which it tries to maximize.




## Supervised vs. Unsupervised Learning



<table  style="width:100%"><tbody>
<tr><td><strong>Parameters</strong></td><td><strong>Supervised machine learning technique</strong></td><td><strong>Unsupervised machine learning technique</strong></td></tr>
<tr><td>Process</td><td>In a supervised learning model, both input and output variables are given.</td><td>In an unsupervised learning model, only input data is given.</td></tr>
<tr><td>Input Data</td><td>Algorithms are trained using labeled data.</td><td>Algorithms are applied to data that is not labeled.</td></tr>
<tr><td>Algorithms Used</td><td>Support vector machines, neural networks, linear and logistic regression, random forests, and classification trees.</td><td>Clustering algorithms such as K-means and hierarchical clustering, among others.</td></tr>
<tr><td>Computational Complexity</td><td>Supervised learning is a simpler method.</td><td>Unsupervised learning is computationally complex.</td></tr>
<tr><td>Use of Data</td><td>The model uses training data to learn a mapping between the inputs and the outputs.</td><td>Unsupervised learning does not use output data.</td></tr>
<tr><td>Accuracy of Results</td><td>Typically more accurate and trustworthy.</td><td>Typically less accurate and trustworthy.</td></tr>
<tr><td>Real Time Learning</td><td>Learning takes place offline.</td><td>Learning takes place in real time.</td></tr>
<tr><td>Number of Classes</td><td>Number of classes is known.</td><td>Number of classes is not known.</td></tr>
<tr><td>Main Drawback</td><td>Classifying big data can be a real challenge in supervised learning.</td><td>You cannot get precise information about how the data is sorted or about the output, because the data used in unsupervised learning is unlabeled and the classes are not known.</td></tr>
</tbody></table>
_Ref: https://www.guru99.com/supervised-vs-unsupervised-learning.html_

## Classification 
* Categorize data into groups
>* e.g., Spam vs. Not Spam
* Groups (labels) are provided by the user
>* i.e., Supervised
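
As a rough illustration of the supervised setting, the sketch below trains a nearest-centroid classifier in plain Python: each user-provided label gets the average of its examples, and a new input is assigned to the closest average. This is not one of Spark's classifiers, and the two features (counts of suspicious words and links per email) and their values are hypothetical.

```python
from statistics import mean

# Hypothetical labeled training data: (suspicious_words, links) -> label.
# The labels are supplied by the user, which is what makes this supervised.
training = [
    ((8.0, 5.0), "spam"),
    ((7.0, 6.0), "spam"),
    ((1.0, 0.0), "not spam"),
    ((0.0, 1.0), "not spam"),
]

def fit_centroids(rows):
    """Compute the mean feature vector (centroid) of each labeled group."""
    groups = {}
    for features, label in rows:
        groups.setdefault(label, []).append(features)
    return {
        label: tuple(mean(column) for column in zip(*points))
        for label, points in groups.items()
    }

def predict(centroids, features):
    """Return the label whose centroid is closest in squared Euclidean distance."""
    def sq_dist(centroid):
        return sum((f - c) ** 2 for f, c in zip(features, centroid))
    return min(centroids, key=lambda label: sq_dist(centroids[label]))

centroids = fit_centroids(training)
print(predict(centroids, (6.0, 4.0)))  # spam
print(predict(centroids, (0.5, 0.5)))  # not spam
```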

### Common classification algorithms
<center>
    <img width="800" src="https://serokell.io/files/fx/fxpez2t8.7_(4)_(1).jpg">
</center>

## Regression
* Models the relationship between a target variable and one or more input variables
* Used for prediction and forecasting
* Supervised

<img  width="700" src="https://upload.wikimedia.org/wikipedia/commons/b/be/Normdist_regression.png">
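
A minimal sketch of the idea for a single input variable, assuming ordinary least squares with no regularization (Spark's `LinearRegression`, used in Example 1 below, adds optional regularization and handles many features). The data points are hypothetical and lie exactly on y = 2x + 1, so the fit recovers that line and can then forecast an unseen input.

```python
def fit_line(xs, ys):
    """Ordinary least squares for y = slope * x + intercept (one input variable)."""
    n = len(xs)
    x_bar = sum(xs) / n
    y_bar = sum(ys) / n
    sxy = sum((x - x_bar) * (y - y_bar) for x, y in zip(xs, ys))
    sxx = sum((x - x_bar) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = y_bar - slope * x_bar
    return slope, intercept

# Hypothetical data that lies exactly on y = 2x + 1
xs = [0.0, 1.0, 2.0, 3.0]
ys = [1.0, 3.0, 5.0, 7.0]
slope, intercept = fit_line(xs, ys)
print(slope, intercept)         # 2.0 1.0
print(slope * 4.0 + intercept)  # 9.0, a forecast for the unseen input x = 4
```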

## Clustering

* Assigns inputs to groups
>* Groups are not known
>* Number of groups is not known
>* Unsupervised

<img  width="700" src="https://upload.wikimedia.org/wikipedia/commons/thumb/c/c8/Cluster-2.svg/1920px-Cluster-2.svg.png">
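
K-means is one common clustering algorithm. Unlike the general description above, it does need the number of groups k to be chosen in advance; in practice several values of k are often tried. The plain-Python sketch below runs k-means on hypothetical one-dimensional data, alternating an assignment step and an update step until the centers stop moving.

```python
def kmeans_1d(points, centers, iterations=10):
    """Plain k-means on 1-D data, starting from the given initial centers."""
    centers = list(centers)
    for _ in range(iterations):
        # Assignment step: put each point in the cluster of its nearest center
        clusters = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)), key=lambda i: abs(p - centers[i]))
            clusters[nearest].append(p)
        # Update step: move each center to the mean of its assigned points
        new_centers = [sum(c) / len(c) if c else centers[i] for i, c in enumerate(clusters)]
        if new_centers == centers:
            break  # converged: no center moved
        centers = new_centers
    return centers, clusters

# Hypothetical unlabeled data with two obvious groups
points = [1.0, 1.5, 2.0, 10.0, 10.5, 11.0]
centers, clusters = kmeans_1d(points, centers=[0.0, 5.0])
print(centers)   # [1.5, 10.5]
print(clusters)  # [[1.0, 1.5, 2.0], [10.0, 10.5, 11.0]]
```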

# Regression

## Example 1: Linear Regression

```
%sh curl -O 'https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_linear_regression_data.txt'
# saves file to file:/databricks/driver/sample_linear_regression_data.txt
```

```python
# Path of the file downloaded by the previous cell
path = 'file:/databricks/driver/sample_linear_regression_data.txt'

# Read the LIBSVM-formatted file into a DataFrame with "label" and "features" columns
training = spark.read.format('libsvm').load(path)
display(training)
```

label,features
-9.490009878824548,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.4551273600657362, 0.36644694351969087, -0.38256108933468047, -0.4458430198517267, 0.33109790358914726, 0.8067445293443565, -0.2624341731773887, -0.44850386111659524, -0.07269284838169332, 0.5658035575800715))"
0.2577820163584905,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.8386555657374337, -0.1270180511534269, 0.499812362510895, -0.22686625128130267, -0.6452430441812433, 0.18869982177936828, -0.5804648622673358, 0.651931743775642, -0.6555641246242951, 0.17485476357259122))"
-4.438869807456516,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.5025608135349202, 0.14208069682973434, 0.16004976900412138, 0.505019897181302, -0.9371635223468384, -0.2841601610457427, 0.6355938616712786, -0.1646249064941625, 0.9480713629917628, 0.42681251564645817))"
-19.782762789614537,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.0388509668871313, -0.4166870051763918, 0.8997202693189332, 0.6409836467726933, 0.273289095712564, -0.26175701211620517, -0.2794902492677298, -0.1306778297187794, -0.08536581111046115, -0.05462315824828923))"
-7.966593841555266,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.06195495876886281, 0.6546448480299902, -0.6979368909424835, 0.6677324708883314, -0.07938725467767771, -0.43885601665437957, -0.608071585153688, -0.6414531182501653, 0.7313735926547045, -0.026818676347611925))"
-7.896274316726144,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.15805658673794265, 0.26573958270655806, 0.3997172901343442, -0.3693430998846541, 0.14324061105995334, -0.25797542063247825, 0.7436291919296774, 0.6114618853239959, 0.2324273700703574, -0.25128128782199144))"
-8.464803554195287,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.39449745853945895, 0.817229160415142, -0.6077058562362969, 0.6182496334554788, 0.2558665508269453, -0.07320145794330979, -0.38884168866510227, 0.07981886851873865, 0.27022202891277614, -0.7474843534024693))"
2.1214592666251364,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.005346215048158909, -0.9453716674280683, -0.9270309666195007, -0.032312290091389695, 0.31010676221964206, -0.20846743965751569, 0.8803449313707621, -0.23077831216541722, 0.29246395759528565, 0.5409312755478819))"
1.0720117616524107,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.7880855916368177, 0.19767407429003536, 0.9520689432368168, -0.845829774129496, 0.5502413918543512, -0.44235539500246457, 0.7984106594591154, -0.2523277127589152, -0.1373808897290778, -0.3353514432305029))"
-13.772441561702871,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.3697050572653644, -0.11452811582755928, -0.807098168238352, 0.4903066124307711, -0.6582805242342049, 0.6107814398427647, -0.7204208094262783, -0.8141063661170889, -0.9459402662357332, 0.09666938346350307))"
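
The file is in LIBSVM text format: each line holds a label followed by `index:value` pairs for the non-zero features, with 1-based indices. Spark's `libsvm` reader turns this into a `label` column and a sparse `features` vector with 0-based indices, which is why the rows above list indices 0 through 9. The parser below is a simplified sketch of that conversion, not Spark's implementation; the sample line is hypothetical.

```python
def parse_libsvm_line(line, num_features):
    """Parse one LIBSVM line into (label, size, indices, values).

    LIBSVM indices are 1-based; they are shifted to 0-based here, matching
    how the features vector is displayed above.
    """
    label_text, *pairs = line.split()
    indices, values = [], []
    for pair in pairs:
        index_text, value_text = pair.split(":")
        indices.append(int(index_text) - 1)
        values.append(float(value_text))
    return float(label_text), num_features, indices, values

# Hypothetical line with 3 non-zero features out of 10
label, size, indices, values = parse_libsvm_line("-1.5 1:0.25 4:-0.5 10:2.0", 10)
print(label, size, indices, values)  # -1.5 10 [0, 3, 9] [0.25, -0.5, 2.0]
```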


```python
from pyspark.ml.regression import LinearRegression

# Load training data (path is defined in the cell above)
training = spark.read.format('libsvm').load(path)

# Elastic net regularized linear regression: regParam sets the overall
# regularization strength, elasticNetParam mixes L1 (1.0) and L2 (0.0)
lr = LinearRegression(maxIter=100, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)
```
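
`regParam` (λ) and `elasticNetParam` (α) define an elastic net penalty, described in the Spark MLlib documentation as λ(α‖w‖₁ + (1 - α)/2 ‖w‖₂²), so α = 1 is pure L1 (lasso) and α = 0 is pure L2 (ridge). The helper below only evaluates that formula for a hypothetical coefficient vector. It does not reproduce Spark's fitted numbers, because Spark also standardizes features by default (`standardization=True`) before applying the penalty.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Elastic net penalty: reg_param * (alpha * L1 + (1 - alpha) / 2 * L2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    alpha = elastic_net_param
    return reg_param * (alpha * l1 + (1 - alpha) / 2 * l2_squared)

# Same settings as the cell above (regParam=0.3, elasticNetParam=0.8),
# applied to hypothetical coefficients [1.0, -2.0]
print(elastic_net_penalty([1.0, -2.0], reg_param=0.3, elastic_net_param=0.8))  # about 0.87
```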


```python
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
```
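
For reference, the two headline metrics can be computed by hand. RMSE is the square root of the mean squared residual (label minus prediction), and R² is 1 minus the ratio of the residual sum of squares to the total sum of squares around the mean label. The labels and predictions below are hypothetical; the sketch only illustrates the formulas behind `rootMeanSquaredError` and `r2`.

```python
from math import sqrt

def rmse(labels, predictions):
    """Root mean squared error."""
    n = len(labels)
    return sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / n)

def r2(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1 - ss_res / ss_tot

# Hypothetical labels and predictions
labels = [1.0, 2.0, 3.0, 4.0]
predictions = [1.0, 2.0, 3.0, 2.0]
print(rmse(labels, predictions))  # 1.0
print(r2(labels, predictions))    # about 0.2
```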

```python
# Apply the fitted model to the training data; this adds a "prediction" column
predictions = lrModel.transform(training)

display(predictions)
```

label,features,prediction
-9.490009878824548,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.4551273600657362, 0.36644694351969087, -0.38256108933468047, -0.4458430198517267, 0.33109790358914726, 0.8067445293443565, -0.2624341731773887, -0.44850386111659524, -0.07269284838169332, 0.5658035575800715))",0.3992228042786485
0.2577820163584905,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.8386555657374337, -0.1270180511534269, 0.499812362510895, -0.22686625128130267, -0.6452430441812433, 0.18869982177936828, -0.5804648622673358, 0.651931743775642, -0.6555641246242951, 0.17485476357259122))",-0.2955974176468648
-4.438869807456516,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.5025608135349202, 0.14208069682973434, 0.16004976900412138, 0.505019897181302, -0.9371635223468384, -0.2841601610457427, 0.6355938616712786, -0.1646249064941625, 0.9480713629917628, 0.42681251564645817))",0.7651496483023066
-19.782762789614537,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.0388509668871313, -0.4166870051763918, 0.8997202693189332, 0.6409836467726933, 0.273289095712564, -0.26175701211620517, -0.2794902492677298, -0.1306778297187794, -0.08536581111046115, -0.05462315824828923))",0.7839239258929726
-7.966593841555266,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.06195495876886281, 0.6546448480299902, -0.6979368909424835, 0.6677324708883314, -0.07938725467767771, -0.43885601665437957, -0.608071585153688, -0.6414531182501653, 0.7313735926547045, -0.026818676347611925))",1.4831466765011343
-7.896274316726144,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.15805658673794265, 0.26573958270655806, 0.3997172901343442, -0.3693430998846541, 0.14324061105995334, -0.25797542063247825, 0.7436291919296774, 0.6114618853239959, 0.2324273700703574, -0.25128128782199144))",-0.9871618140066576
-8.464803554195287,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.39449745853945895, 0.817229160415142, -0.6077058562362969, 0.6182496334554788, 0.2558665508269453, -0.07320145794330979, -0.38884168866510227, 0.07981886851873865, 0.27022202891277614, -0.7474843534024693))",1.5395124755034428
2.1214592666251364,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.005346215048158909, -0.9453716674280683, -0.9270309666195007, -0.032312290091389695, 0.31010676221964206, -0.20846743965751569, 0.8803449313707621, -0.23077831216541722, 0.29246395759528565, 0.5409312755478819))",0.0590614595746521
1.0720117616524107,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.7880855916368177, 0.19767407429003536, 0.9520689432368168, -0.845829774129496, 0.5502413918543512, -0.44235539500246457, 0.7984106594591154, -0.2523277127589152, -0.1373808897290778, -0.3353514432305029))",-2.0397390816430665
-13.772441561702871,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.3697050572653644, -0.11452811582755928, -0.807098168238352, 0.4903066124307711, -0.6582805242342049, 0.6107814398427647, -0.7204208094262783, -0.8141063661170889, -0.9459402662357332, 0.09666938346350307))",2.1211666677165093
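
Each `prediction` above is the fitted intercept plus the dot product of the model coefficients with the row's `features` vector. For a sparse vector only the stored indices contribute, as in the sketch below; the four coefficients and the input are hypothetical, not the values learned by `lrModel`.

```python
def predict_sparse(indices, values, coefficients, intercept):
    """Linear model prediction for a sparse feature vector: intercept + sum(w_i * x_i)."""
    return intercept + sum(coefficients[i] * v for i, v in zip(indices, values))

# Hypothetical 4-feature model and a sparse input with two non-zero entries
coefficients = [0.5, 0.0, -1.0, 2.0]
print(predict_sparse([0, 3], [2.0, 1.5], coefficients, intercept=0.25))  # 4.25
```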


## Example 2: Generalized Linear Regression

<table  style="width:100%">

  <thead>
    <tr>
      <th>Family</th>
      <th>Response Type</th>
      <th>Supported Links</th></tr>
  </thead>
  <tbody>
    <tr>
      <td>Gaussian</td>
      <td>Continuous</td>
      <td>Identity*, Log, Inverse</td>
    </tr>
    <tr>
      <td>Binomial</td>
      <td>Binary</td>
      <td>Logit*, Probit, CLogLog</td>
    </tr>
    <tr>
      <td>Poisson</td>
      <td>Count</td>
      <td>Log*, Identity, Sqrt</td>
    </tr>
    <tr>
      <td>Gamma</td>
      <td>Continuous</td>
      <td>Inverse*, Identity, Log</td>
    </tr>
    <tr>
      <td>Tweedie</td>
      <td>Zero-inflated continuous</td>
      <td>Power link function</td>
    </tr>
    </tbody><tfoot><tr><td colspan="3">* Canonical link</td></tr></tfoot>
  
</table>

For more detail, see: https://data.princeton.edu/wws509/notes/
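
In a generalized linear model, the features enter through a linear predictor η (the intercept plus the coefficients times the features), and the link function g connects it to the mean of the response: g(μ) = η, so μ = g⁻¹(η). The sketch below lists several links from the table with their inverses in plain Python; the `LINKS` dictionary is a hypothetical helper for illustration, and the probit, cloglog, and Tweedie power links are omitted for brevity.

```python
import math

# Each entry is (link g: mu -> eta, inverse link g^-1: eta -> mu).
LINKS = {
    "identity": (lambda mu: mu, lambda eta: eta),
    "log": (math.log, math.exp),
    "logit": (lambda mu: math.log(mu / (1 - mu)), lambda eta: 1 / (1 + math.exp(-eta))),
    "inverse": (lambda mu: 1 / mu, lambda eta: 1 / eta),
    "sqrt": (math.sqrt, lambda eta: eta * eta),
}

def mean_from_linear_predictor(link, eta):
    """Return mu = g^-1(eta) for the named link."""
    _, inverse = LINKS[link]
    return inverse(eta)

print(mean_from_linear_predictor("identity", 1.5))  # 1.5
print(mean_from_linear_predictor("logit", 0.0))     # 0.5
print(mean_from_linear_predictor("log", 0.0))       # 1.0
```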

```python
from pyspark.ml.regression import GeneralizedLinearRegression

# Load training data
dataset = spark.read.format("libsvm")\
    .load("file:/databricks/driver/sample_linear_regression_data.txt")

# Gaussian family with identity link: a linear model for a continuous response
glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)

# Fit the model
model = glr.fit(dataset)

# Print the coefficients and intercept for generalized linear regression model
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

# Summarize the model over the training set and print out some metrics
summary = model.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()
```
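
For the Gaussian family with the identity link and unit weights, several of these summary values should reduce to simple sums: the deviance is the residual sum of squares, the null deviance is the sum of squares around the mean label (for a model with an intercept), and the dispersion is the deviance divided by the residual degrees of freedom (instances minus fitted parameters, counting the intercept). The sketch below computes them for hypothetical data with one feature plus an intercept (`num_params=2`); in the cell above, the 10 features plus the intercept would give 11 parameters.

```python
def gaussian_glm_stats(labels, predictions, num_params):
    """Deviance, null deviance, residual DOF and dispersion for a Gaussian GLM.

    Assumes the identity link, unit weights and a fitted intercept;
    num_params counts the fitted coefficients including the intercept.
    """
    n = len(labels)
    mean_y = sum(labels) / n
    deviance = sum((y - mu) ** 2 for y, mu in zip(labels, predictions))
    null_deviance = sum((y - mean_y) ** 2 for y in labels)
    residual_dof = n - num_params
    dispersion = deviance / residual_dof
    return deviance, null_deviance, residual_dof, dispersion

# Hypothetical labels and fitted means
print(gaussian_glm_stats([0.0, 2.0, 4.0, 6.0], [0.0, 2.0, 4.0, 5.0], num_params=2))
# (1.0, 20.0, 2, 0.5)
```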

```python
# Apply the fitted GLM to the same data; this adds a "prediction" column
predictions = model.transform(dataset)

display(predictions)
```

label,features,prediction
-9.490009878824548,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.4551273600657362, 0.36644694351969087, -0.38256108933468047, -0.4458430198517267, 0.33109790358914726, 0.8067445293443565, -0.2624341731773887, -0.44850386111659524, -0.07269284838169332, 0.5658035575800715))",1.4843492954223414
0.2577820163584905,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.8386555657374337, -0.1270180511534269, 0.499812362510895, -0.22686625128130267, -0.6452430441812433, 0.18869982177936828, -0.5804648622673358, 0.651931743775642, -0.6555641246242951, 0.17485476357259122))",-0.6294499974835653
-4.438869807456516,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.5025608135349202, 0.14208069682973434, 0.16004976900412138, 0.505019897181302, -0.9371635223468384, -0.2841601610457427, 0.6355938616712786, -0.1646249064941625, 0.9480713629917628, 0.42681251564645817))",0.1576720300223913
-19.782762789614537,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.0388509668871313, -0.4166870051763918, 0.8997202693189332, 0.6409836467726933, 0.273289095712564, -0.26175701211620517, -0.2794902492677298, -0.1306778297187794, -0.08536581111046115, -0.05462315824828923))",0.6289046454051012
-7.966593841555266,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.06195495876886281, 0.6546448480299902, -0.6979368909424835, 0.6677324708883314, -0.07938725467767771, -0.43885601665437957, -0.608071585153688, -0.6414531182501653, 0.7313735926547045, -0.026818676347611925))",2.303825503787376
-7.896274316726144,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.15805658673794265, 0.26573958270655806, 0.3997172901343442, -0.3693430998846541, 0.14324061105995334, -0.25797542063247825, 0.7436291919296774, 0.6114618853239959, 0.2324273700703574, -0.25128128782199144))",-1.8806684210461533
-8.464803554195287,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.39449745853945895, 0.817229160415142, -0.6077058562362969, 0.6182496334554788, 0.2558665508269453, -0.07320145794330979, -0.38884168866510227, 0.07981886851873865, 0.27022202891277614, -0.7474843534024693))",2.19913586165398
2.1214592666251364,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.005346215048158909, -0.9453716674280683, -0.9270309666195007, -0.032312290091389695, 0.31010676221964206, -0.20846743965751569, 0.8803449313707621, -0.23077831216541722, 0.29246395759528565, 0.5409312755478819))",0.0060632141226649
1.0720117616524107,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(0.7880855916368177, 0.19767407429003536, 0.9520689432368168, -0.845829774129496, 0.5502413918543512, -0.44235539500246457, 0.7984106594591154, -0.2523277127589152, -0.1373808897290778, -0.3353514432305029))",-2.9087014762613568
-13.772441561702871,"Map(vectorType -> sparse, length -> 10, indices -> List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), values -> List(-0.3697050572653644, -0.11452811582755928, -0.807098168238352, 0.4903066124307711, -0.6582805242342049, 0.6107814398427647, -0.7204208094262783, -0.8141063661170889, -0.9459402662357332, 0.09666938346350307))",3.452776710366661


## Example 3: Decision Tree Regression
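
A regression tree is grown by repeatedly choosing the split that most reduces the variance of the target; variance is the impurity Spark's `DecisionTreeRegressor` uses. The sketch below scores every midpoint threshold on a single hypothetical feature and keeps the one with the lowest weighted child variance. It is a simplification: Spark evaluates candidate thresholds over binned feature values (controlled by `maxBins`), so its chosen thresholds can differ.

```python
def variance(values):
    """Population variance, the impurity used to score regression splits."""
    m = sum(values) / len(values)
    return sum((v - m) ** 2 for v in values) / len(values)

def best_split(xs, ys):
    """Find the threshold on one feature that minimizes weighted child variance.

    Candidate thresholds are midpoints between consecutive distinct x values.
    Returns (threshold, weighted_impurity); threshold is None if no split helps.
    """
    pairs = sorted(zip(xs, ys))
    best = (None, variance(ys))
    for (x_left, _), (x_right, _) in zip(pairs, pairs[1:]):
        if x_left == x_right:
            continue
        threshold = (x_left + x_right) / 2
        left = [y for x, y in pairs if x <= threshold]
        right = [y for x, y in pairs if x > threshold]
        weighted = (len(left) * variance(left) + len(right) * variance(right)) / len(pairs)
        if weighted < best[1]:
            best = (threshold, weighted)
    return best

# Hypothetical data: the target jumps when x passes 3
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
ys = [1.0, 1.0, 1.0, 5.0, 5.0, 5.0]
print(best_split(xs, ys))  # (3.5, 0.0)
```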

```
%sh curl -O 'https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_libsvm_data.txt'
# saves file to file:/databricks/driver/sample_libsvm_data.txt
```

```python
# Path of the file downloaded by the previous cell
path = 'file:/databricks/driver/sample_libsvm_data.txt'

# Read the LIBSVM-formatted file into a DataFrame with "label" and "features" columns
data = spark.read.format('libsvm').load(path)
display(data)
```

label,features
0.0,"Map(vectorType -> sparse, length -> 692, indices -> List(127, 128, 129, 130, 131, 154, 155, 156, 157, 158, 159, 181, 182, 183, 184, 185, 186, 187, 188, 189, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 289, 290, 291, 292, 293, 294, 295, 296, 297, 300, 301, 302, 316, 317, 318, 319, 320, 321, 328, 329, 330, 343, 344, 345, 346, 347, 348, 349, 356, 357, 358, 371, 372, 373, 374, 384, 385, 386, 399, 400, 401, 412, 413, 414, 426, 427, 428, 429, 440, 441, 442, 454, 455, 456, 457, 466, 467, 468, 469, 470, 482, 483, 484, 493, 494, 495, 496, 497, 510, 511, 512, 520, 521, 522, 523, 538, 539, 540, 547, 548, 549, 550, 566, 567, 568, 569, 570, 571, 572, 573, 574, 575, 576, 577, 578, 594, 595, 596, 597, 598, 599, 600, 601, 602, 603, 604, 622, 623, 624, 625, 626, 627, 628, 629, 630, 651, 652, 653, 654, 655, 656, 657), values -> List(51.0, 159.0, 253.0, 159.0, 50.0, 48.0, 238.0, 252.0, 252.0, 252.0, 237.0, 54.0, 227.0, 253.0, 252.0, 239.0, 233.0, 252.0, 57.0, 6.0, 10.0, 60.0, 224.0, 252.0, 253.0, 252.0, 202.0, 84.0, 252.0, 253.0, 122.0, 163.0, 252.0, 252.0, 252.0, 253.0, 252.0, 252.0, 96.0, 189.0, 253.0, 167.0, 51.0, 238.0, 253.0, 253.0, 190.0, 114.0, 253.0, 228.0, 47.0, 79.0, 255.0, 168.0, 48.0, 238.0, 252.0, 252.0, 179.0, 12.0, 75.0, 121.0, 21.0, 253.0, 243.0, 50.0, 38.0, 165.0, 253.0, 233.0, 208.0, 84.0, 253.0, 252.0, 165.0, 7.0, 178.0, 252.0, 240.0, 71.0, 19.0, 28.0, 253.0, 252.0, 195.0, 57.0, 252.0, 252.0, 63.0, 253.0, 252.0, 195.0, 198.0, 253.0, 190.0, 255.0, 253.0, 196.0, 76.0, 246.0, 252.0, 112.0, 253.0, 252.0, 148.0, 85.0, 252.0, 230.0, 25.0, 7.0, 135.0, 253.0, 186.0, 12.0, 85.0, 252.0, 223.0, 7.0, 131.0, 252.0, 225.0, 71.0, 85.0, 252.0, 145.0, 48.0, 165.0, 252.0, 173.0, 86.0, 253.0, 225.0, 114.0, 238.0, 253.0, 162.0, 85.0, 252.0, 249.0, 146.0, 48.0, 29.0, 85.0, 178.0, 225.0, 253.0, 223.0, 167.0, 56.0, 85.0, 252.0, 252.0, 252.0, 229.0, 215.0, 
252.0, 252.0, 252.0, 196.0, 130.0, 28.0, 199.0, 252.0, 252.0, 253.0, 252.0, 252.0, 233.0, 145.0, 25.0, 128.0, 252.0, 253.0, 252.0, 141.0, 37.0))"
1.0,"Map(vectorType -> sparse, length -> 692, indices -> List(158, 159, 160, 161, 185, 186, 187, 188, 189, 213, 214, 215, 216, 217, 240, 241, 242, 243, 244, 245, 267, 268, 269, 270, 271, 295, 296, 297, 298, 322, 323, 324, 325, 326, 349, 350, 351, 352, 353, 377, 378, 379, 380, 381, 404, 405, 406, 407, 408, 431, 432, 433, 434, 435, 459, 460, 461, 462, 463, 486, 487, 488, 489, 490, 514, 515, 516, 517, 518, 542, 543, 544, 545, 569, 570, 571, 572, 573, 596, 597, 598, 599, 600, 601, 624, 625, 626, 627, 652, 653, 654, 655, 680, 681, 682, 683), values -> List(124.0, 253.0, 255.0, 63.0, 96.0, 244.0, 251.0, 253.0, 62.0, 127.0, 251.0, 251.0, 253.0, 62.0, 68.0, 236.0, 251.0, 211.0, 31.0, 8.0, 60.0, 228.0, 251.0, 251.0, 94.0, 155.0, 253.0, 253.0, 189.0, 20.0, 253.0, 251.0, 235.0, 66.0, 32.0, 205.0, 253.0, 251.0, 126.0, 104.0, 251.0, 253.0, 184.0, 15.0, 80.0, 240.0, 251.0, 193.0, 23.0, 32.0, 253.0, 253.0, 253.0, 159.0, 151.0, 251.0, 251.0, 251.0, 39.0, 48.0, 221.0, 251.0, 251.0, 172.0, 234.0, 251.0, 251.0, 196.0, 12.0, 253.0, 251.0, 251.0, 89.0, 159.0, 255.0, 253.0, 253.0, 31.0, 48.0, 228.0, 253.0, 247.0, 140.0, 8.0, 64.0, 251.0, 253.0, 220.0, 64.0, 251.0, 253.0, 220.0, 24.0, 193.0, 253.0, 220.0))"
1.0,"Map(vectorType -> sparse, length -> 692, indices -> List(124, 125, 126, 127, 151, 152, 153, 154, 155, 179, 180, 181, 182, 183, 208, 209, 210, 211, 235, 236, 237, 238, 239, 263, 264, 265, 266, 267, 268, 292, 293, 294, 295, 296, 321, 322, 323, 324, 349, 350, 351, 352, 377, 378, 379, 380, 405, 406, 407, 408, 433, 434, 435, 436, 461, 462, 463, 464, 489, 490, 491, 492, 493, 517, 518, 519, 520, 521, 545, 546, 547, 548, 549, 574, 575, 576, 577, 578, 602, 603, 604, 605, 606, 630, 631, 632, 633, 634, 658, 659, 660, 661, 662), values -> List(145.0, 255.0, 211.0, 31.0, 32.0, 237.0, 253.0, 252.0, 71.0, 11.0, 175.0, 253.0, 252.0, 71.0, 144.0, 253.0, 252.0, 71.0, 16.0, 191.0, 253.0, 252.0, 71.0, 26.0, 221.0, 253.0, 252.0, 124.0, 31.0, 125.0, 253.0, 252.0, 252.0, 108.0, 253.0, 252.0, 252.0, 108.0, 255.0, 253.0, 253.0, 108.0, 253.0, 252.0, 252.0, 108.0, 253.0, 252.0, 252.0, 108.0, 253.0, 252.0, 252.0, 108.0, 255.0, 253.0, 253.0, 170.0, 253.0, 252.0, 252.0, 252.0, 42.0, 149.0, 252.0, 252.0, 252.0, 144.0, 109.0, 252.0, 252.0, 252.0, 144.0, 218.0, 253.0, 253.0, 255.0, 35.0, 175.0, 252.0, 252.0, 253.0, 35.0, 73.0, 252.0, 252.0, 253.0, 35.0, 31.0, 211.0, 252.0, 253.0, 35.0))"
1.0,"Map(vectorType -> sparse, length -> 692, indices -> List(152, 153, 154, 180, 181, 182, 183, 208, 209, 210, 211, 236, 237, 238, 239, 264, 265, 266, 267, 292, 293, 294, 295, 320, 321, 322, 323, 349, 350, 351, 377, 378, 379, 405, 406, 407, 433, 434, 435, 461, 462, 463, 489, 490, 491, 492, 517, 518, 519, 520, 546, 547, 548, 574, 575, 576, 602, 603, 604, 630, 631, 632, 658, 659, 660, 686, 687, 688), values -> List(5.0, 63.0, 197.0, 20.0, 254.0, 230.0, 24.0, 20.0, 254.0, 254.0, 48.0, 20.0, 254.0, 255.0, 48.0, 20.0, 254.0, 254.0, 57.0, 20.0, 254.0, 254.0, 108.0, 16.0, 239.0, 254.0, 143.0, 178.0, 254.0, 143.0, 178.0, 254.0, 143.0, 178.0, 254.0, 162.0, 178.0, 254.0, 240.0, 113.0, 254.0, 240.0, 83.0, 254.0, 245.0, 31.0, 79.0, 254.0, 246.0, 38.0, 214.0, 254.0, 150.0, 144.0, 241.0, 8.0, 144.0, 240.0, 2.0, 144.0, 254.0, 82.0, 230.0, 247.0, 40.0, 168.0, 209.0, 31.0))"
1.0,"Map(vectorType -> sparse, length -> 692, indices -> List(151, 152, 153, 154, 179, 180, 181, 182, 208, 209, 210, 236, 237, 238, 264, 265, 266, 267, 292, 293, 294, 295, 320, 321, 322, 323, 348, 349, 350, 351, 376, 377, 378, 379, 404, 405, 406, 407, 432, 433, 434, 435, 460, 461, 462, 463, 488, 489, 490, 491, 516, 517, 518, 519, 544, 545, 546, 547, 572, 573, 574, 575, 600, 601, 602, 603, 629, 630, 631, 657, 658, 659, 685, 686, 687), values -> List(1.0, 168.0, 242.0, 28.0, 10.0, 228.0, 254.0, 100.0, 190.0, 254.0, 122.0, 83.0, 254.0, 162.0, 29.0, 254.0, 248.0, 25.0, 29.0, 255.0, 254.0, 103.0, 29.0, 254.0, 254.0, 109.0, 29.0, 254.0, 254.0, 109.0, 29.0, 254.0, 254.0, 109.0, 29.0, 255.0, 254.0, 109.0, 29.0, 254.0, 254.0, 109.0, 29.0, 254.0, 254.0, 63.0, 29.0, 254.0, 254.0, 28.0, 29.0, 254.0, 254.0, 28.0, 29.0, 254.0, 254.0, 35.0, 29.0, 254.0, 254.0, 109.0, 6.0, 212.0, 254.0, 109.0, 203.0, 254.0, 178.0, 155.0, 254.0, 190.0, 32.0, 199.0, 104.0))"
0.0,"Map(vectorType -> sparse, length -> 692, indices -> List(129, 130, 131, 132, 156, 157, 158, 159, 160, 161, 162, 183, 184, 185, 186, 187, 188, 189, 190, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 316, 317, 318, 319, 320, 322, 323, 324, 325, 327, 328, 329, 330, 343, 344, 345, 346, 347, 348, 350, 351, 352, 353, 355, 356, 357, 358, 371, 372, 373, 374, 378, 379, 384, 385, 386, 398, 399, 400, 412, 413, 414, 425, 426, 427, 428, 439, 440, 441, 442, 453, 454, 455, 456, 467, 468, 469, 470, 481, 482, 483, 484, 494, 495, 496, 497, 498, 509, 510, 511, 512, 521, 522, 523, 524, 525, 537, 538, 539, 540, 547, 548, 549, 550, 551, 552, 565, 566, 567, 568, 569, 570, 571, 572, 573, 574, 575, 576, 577, 578, 579, 594, 595, 596, 597, 598, 599, 600, 601, 602, 603, 604, 605, 623, 624, 625, 626, 627, 628, 629, 630, 631, 632, 653, 654, 655, 656, 657, 658), values -> List(64.0, 253.0, 255.0, 63.0, 96.0, 205.0, 251.0, 253.0, 205.0, 111.0, 4.0, 96.0, 189.0, 251.0, 251.0, 253.0, 251.0, 251.0, 31.0, 16.0, 64.0, 223.0, 244.0, 251.0, 251.0, 211.0, 213.0, 251.0, 251.0, 31.0, 80.0, 181.0, 251.0, 253.0, 251.0, 251.0, 251.0, 94.0, 96.0, 251.0, 251.0, 31.0, 92.0, 253.0, 253.0, 253.0, 255.0, 253.0, 253.0, 253.0, 95.0, 96.0, 253.0, 253.0, 31.0, 92.0, 236.0, 251.0, 243.0, 220.0, 233.0, 251.0, 251.0, 243.0, 82.0, 96.0, 251.0, 251.0, 31.0, 80.0, 253.0, 251.0, 251.0, 188.0, 96.0, 251.0, 251.0, 109.0, 96.0, 251.0, 251.0, 31.0, 96.0, 240.0, 253.0, 243.0, 188.0, 42.0, 96.0, 204.0, 109.0, 4.0, 12.0, 197.0, 251.0, 31.0, 221.0, 251.0, 253.0, 121.0, 36.0, 23.0, 190.0, 251.0, 31.0, 48.0, 234.0, 253.0, 191.0, 253.0, 31.0, 44.0, 221.0, 251.0, 251.0, 12.0, 197.0, 251.0, 31.0, 190.0, 251.0, 251.0, 251.0, 96.0, 251.0, 251.0, 31.0, 190.0, 251.0, 251.0, 113.0, 40.0, 234.0, 251.0, 219.0, 23.0, 190.0, 251.0, 251.0, 
94.0, 40.0, 217.0, 253.0, 231.0, 47.0, 191.0, 253.0, 253.0, 253.0, 12.0, 174.0, 253.0, 253.0, 219.0, 39.0, 67.0, 236.0, 251.0, 251.0, 191.0, 190.0, 111.0, 72.0, 190.0, 191.0, 197.0, 251.0, 243.0, 121.0, 39.0, 63.0, 236.0, 251.0, 253.0, 251.0, 251.0, 251.0, 251.0, 253.0, 251.0, 188.0, 94.0, 27.0, 129.0, 253.0, 251.0, 251.0, 251.0, 251.0, 229.0, 168.0, 15.0, 95.0, 212.0, 251.0, 211.0, 94.0, 59.0))"
1.0,"Map(vectorType -> sparse, length -> 692, indices -> List(158, 159, 160, 185, 186, 187, 188, 189, 212, 213, 214, 215, 216, 217, 240, 241, 242, 243, 244, 267, 268, 269, 270, 271, 272, 295, 296, 297, 298, 299, 323, 324, 325, 326, 350, 351, 352, 353, 354, 377, 378, 379, 380, 381, 404, 405, 406, 407, 408, 432, 433, 434, 435, 436, 459, 460, 461, 462, 463, 486, 487, 488, 489, 490, 513, 514, 515, 516, 517, 541, 542, 543, 544, 545, 569, 570, 571, 572, 573, 597, 598, 599, 600, 624, 625, 626, 627, 628, 652, 653, 654, 655, 681, 682, 683), values -> List(121.0, 254.0, 136.0, 13.0, 230.0, 253.0, 248.0, 99.0, 4.0, 118.0, 253.0, 253.0, 225.0, 42.0, 61.0, 253.0, 253.0, 253.0, 74.0, 32.0, 206.0, 253.0, 253.0, 186.0, 9.0, 211.0, 253.0, 253.0, 239.0, 69.0, 254.0, 253.0, 253.0, 133.0, 142.0, 255.0, 253.0, 186.0, 8.0, 149.0, 229.0, 254.0, 207.0, 21.0, 54.0, 229.0, 253.0, 254.0, 105.0, 152.0, 254.0, 254.0, 213.0, 26.0, 112.0, 251.0, 253.0, 253.0, 26.0, 29.0, 212.0, 253.0, 250.0, 149.0, 36.0, 214.0, 253.0, 253.0, 137.0, 75.0, 253.0, 253.0, 253.0, 59.0, 93.0, 253.0, 253.0, 189.0, 17.0, 224.0, 253.0, 253.0, 84.0, 43.0, 235.0, 253.0, 126.0, 1.0, 99.0, 248.0, 253.0, 119.0, 225.0, 235.0, 49.0))"
1.0,"Map(vectorType -> sparse, length -> 692, indices -> List(99, 100, 101, 127, 128, 129, 130, 154, 155, 156, 157, 158, 182, 183, 184, 185, 186, 209, 210, 211, 212, 213, 237, 238, 239, 240, 241, 264, 265, 266, 267, 268, 269, 291, 292, 293, 294, 295, 296, 297, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324, 325, 342, 343, 344, 345, 346, 347, 348, 349, 350, 351, 352, 353, 371, 372, 373, 374, 378, 379, 380, 381, 406, 407, 408, 409, 435, 436, 437, 463, 464, 465, 491, 492, 493, 514, 515, 516, 517, 518, 519, 520, 521, 522, 523, 524, 525, 539, 540, 541, 542, 543, 544, 545, 546, 547, 548, 549, 550, 551, 552, 553, 566, 567, 568, 569, 570, 571, 572, 573, 574, 575, 576, 577, 578, 579, 580, 581, 594, 595, 596, 597, 598, 599, 600, 622, 623, 624, 625), values -> List(166.0, 222.0, 55.0, 197.0, 254.0, 218.0, 5.0, 29.0, 249.0, 254.0, 254.0, 9.0, 45.0, 254.0, 254.0, 174.0, 2.0, 4.0, 164.0, 254.0, 254.0, 85.0, 146.0, 254.0, 254.0, 254.0, 85.0, 101.0, 245.0, 254.0, 254.0, 254.0, 85.0, 97.0, 248.0, 254.0, 204.0, 254.0, 254.0, 85.0, 12.0, 59.0, 98.0, 151.0, 237.0, 254.0, 254.0, 109.0, 35.0, 254.0, 254.0, 85.0, 41.0, 216.0, 254.0, 254.0, 239.0, 153.0, 37.0, 4.0, 32.0, 254.0, 254.0, 85.0, 7.0, 44.0, 44.0, 30.0, 32.0, 254.0, 254.0, 96.0, 19.0, 230.0, 254.0, 174.0, 197.0, 254.0, 110.0, 197.0, 254.0, 85.0, 197.0, 253.0, 63.0, 37.0, 54.0, 54.0, 45.0, 26.0, 84.0, 221.0, 84.0, 21.0, 31.0, 162.0, 78.0, 6.0, 41.0, 141.0, 244.0, 254.0, 254.0, 248.0, 236.0, 254.0, 254.0, 254.0, 233.0, 239.0, 254.0, 138.0, 23.0, 167.0, 254.0, 254.0, 254.0, 254.0, 229.0, 228.0, 185.0, 138.0, 138.0, 138.0, 138.0, 138.0, 138.0, 44.0, 113.0, 254.0, 254.0, 254.0, 179.0, 64.0, 5.0, 32.0, 209.0, 183.0, 97.0))"
0.0,"Map(vectorType -> sparse, length -> 692, indices -> List(154, 155, 156, 157, 158, 159, 182, 183, 184, 185, 186, 187, 188, 189, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 290, 291, 292, 293, 294, 295, 298, 299, 300, 301, 318, 319, 320, 321, 322, 326, 327, 328, 329, 346, 347, 348, 349, 350, 353, 354, 355, 356, 357, 374, 375, 376, 377, 378, 381, 382, 383, 384, 385, 402, 403, 404, 405, 406, 409, 410, 411, 412, 413, 429, 430, 431, 432, 437, 438, 439, 440, 456, 457, 458, 459, 460, 464, 465, 466, 467, 468, 484, 485, 486, 487, 488, 491, 492, 493, 494, 495, 512, 513, 514, 515, 516, 519, 520, 521, 522, 523, 540, 541, 542, 543, 544, 546, 547, 548, 549, 550, 551, 568, 569, 570, 571, 572, 573, 574, 575, 576, 577, 596, 597, 598, 599, 600, 601, 602, 603, 604, 605, 624, 625, 626, 627, 628, 629, 630, 631, 632, 633, 653, 654, 655, 656, 657, 658, 682, 683, 684, 685, 686), values -> List(53.0, 255.0, 253.0, 253.0, 253.0, 124.0, 180.0, 253.0, 251.0, 251.0, 251.0, 251.0, 145.0, 62.0, 32.0, 217.0, 241.0, 253.0, 251.0, 251.0, 251.0, 251.0, 253.0, 107.0, 37.0, 251.0, 251.0, 253.0, 251.0, 251.0, 251.0, 251.0, 253.0, 107.0, 166.0, 251.0, 251.0, 253.0, 251.0, 96.0, 148.0, 251.0, 253.0, 107.0, 73.0, 253.0, 253.0, 253.0, 253.0, 130.0, 110.0, 253.0, 255.0, 108.0, 73.0, 251.0, 251.0, 251.0, 251.0, 109.0, 251.0, 253.0, 107.0, 202.0, 251.0, 251.0, 251.0, 225.0, 6.0, 129.0, 251.0, 253.0, 107.0, 150.0, 251.0, 251.0, 251.0, 71.0, 115.0, 251.0, 251.0, 253.0, 107.0, 253.0, 251.0, 251.0, 173.0, 20.0, 217.0, 251.0, 251.0, 253.0, 107.0, 182.0, 255.0, 253.0, 216.0, 218.0, 253.0, 253.0, 182.0, 63.0, 221.0, 253.0, 251.0, 215.0, 84.0, 236.0, 251.0, 251.0, 77.0, 109.0, 251.0, 253.0, 251.0, 215.0, 11.0, 160.0, 251.0, 251.0, 96.0, 109.0, 251.0, 253.0, 251.0, 137.0, 150.0, 251.0, 251.0, 251.0, 71.0, 109.0, 251.0, 253.0, 251.0, 35.0, 130.0, 253.0, 251.0, 251.0, 173.0, 20.0, 110.0, 253.0, 255.0, 253.0, 
98.0, 150.0, 253.0, 255.0, 253.0, 164.0, 109.0, 251.0, 253.0, 251.0, 251.0, 251.0, 251.0, 253.0, 251.0, 35.0, 93.0, 241.0, 253.0, 251.0, 251.0, 251.0, 251.0, 216.0, 112.0, 5.0, 103.0, 253.0, 251.0, 251.0, 251.0, 251.0, 124.0, 251.0, 225.0, 71.0, 71.0))"
0.0,"Map(vectorType -> sparse, length -> 692, indices -> List(127, 128, 129, 130, 131, 155, 156, 157, 158, 159, 181, 182, 183, 184, 185, 186, 187, 209, 210, 211, 212, 213, 214, 215, 216, 237, 238, 239, 240, 241, 242, 243, 244, 245, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 317, 318, 319, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 330, 344, 345, 346, 347, 348, 349, 353, 354, 355, 356, 357, 358, 372, 373, 374, 375, 376, 377, 381, 382, 383, 384, 385, 386, 399, 400, 401, 402, 403, 404, 409, 410, 411, 412, 413, 414, 427, 428, 429, 430, 431, 437, 438, 439, 440, 441, 455, 456, 457, 458, 459, 460, 465, 466, 467, 468, 483, 484, 485, 486, 487, 488, 491, 492, 493, 494, 495, 496, 511, 512, 513, 514, 515, 519, 520, 521, 522, 523, 539, 540, 541, 542, 543, 544, 545, 546, 547, 548, 549, 550, 567, 568, 569, 570, 571, 572, 573, 574, 575, 576, 577, 578, 595, 596, 597, 598, 599, 600, 601, 602, 603, 604, 605, 623, 624, 625, 626, 627, 628, 629, 630, 631, 652, 653, 654, 655, 656, 657, 658), values -> List(73.0, 253.0, 227.0, 73.0, 21.0, 73.0, 251.0, 251.0, 251.0, 174.0, 16.0, 166.0, 228.0, 251.0, 251.0, 251.0, 122.0, 62.0, 220.0, 253.0, 251.0, 251.0, 251.0, 251.0, 79.0, 79.0, 231.0, 253.0, 251.0, 251.0, 251.0, 251.0, 232.0, 77.0, 145.0, 253.0, 253.0, 253.0, 255.0, 253.0, 253.0, 253.0, 253.0, 255.0, 108.0, 144.0, 251.0, 251.0, 251.0, 253.0, 168.0, 107.0, 169.0, 251.0, 253.0, 189.0, 20.0, 27.0, 89.0, 236.0, 251.0, 235.0, 215.0, 164.0, 15.0, 6.0, 129.0, 251.0, 253.0, 251.0, 35.0, 47.0, 211.0, 253.0, 251.0, 251.0, 142.0, 37.0, 251.0, 251.0, 253.0, 251.0, 35.0, 109.0, 251.0, 253.0, 251.0, 251.0, 142.0, 11.0, 148.0, 251.0, 253.0, 251.0, 164.0, 11.0, 150.0, 253.0, 255.0, 211.0, 25.0, 11.0, 150.0, 253.0, 255.0, 211.0, 25.0, 140.0, 251.0, 251.0, 253.0, 107.0, 37.0, 251.0, 251.0, 211.0, 46.0, 190.0, 251.0, 251.0, 253.0, 128.0, 5.0, 37.0, 251.0, 251.0, 51.0, 115.0, 251.0, 251.0, 253.0, 188.0, 20.0, 32.0, 109.0, 129.0, 
251.0, 173.0, 103.0, 217.0, 251.0, 251.0, 201.0, 30.0, 73.0, 251.0, 251.0, 251.0, 71.0, 166.0, 253.0, 253.0, 255.0, 149.0, 73.0, 150.0, 253.0, 255.0, 253.0, 253.0, 143.0, 140.0, 251.0, 251.0, 253.0, 251.0, 251.0, 251.0, 251.0, 253.0, 251.0, 230.0, 61.0, 190.0, 251.0, 251.0, 253.0, 251.0, 251.0, 251.0, 251.0, 242.0, 215.0, 55.0, 21.0, 189.0, 251.0, 253.0, 251.0, 251.0, 251.0, 173.0, 103.0, 31.0, 200.0, 253.0, 251.0, 96.0, 71.0, 20.0))"


```python
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("file:/databricks/driver/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Define a DecisionTree regression model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(15)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = model.stages[1]
# summary only
print(treeModel)
```
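When `RegressionEvaluator` is given `metricName="rmse"`, it reports the root mean squared error between the `label` and `prediction` columns. The sketch below shows that formula in plain Python with no Spark. The label and prediction lists are made-up values, not output from the model above.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error: sqrt(mean((label - prediction)^2))."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Hypothetical 0/1 labels (as in sample_libsvm_data.txt) and predictions
labels = [1.0, 0.0, 1.0, 1.0]
predictions = [1.0, 0.0, 0.0, 1.0]
print(rmse(labels, predictions))  # one error of 1 out of 4 rows -> sqrt(0.25) = 0.5
```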

# Classification

## Example 4: Decision Tree Classifier

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("file:/databricks/driver/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Define a DecisionTree classification model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(15)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Accuracy = %g " % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

treeModel = model.stages[2]
# summary only
print(treeModel)
```
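`StringIndexer` maps each distinct label to an index. With the default `stringOrderType="frequencyDesc"`, the most frequent label gets index 0.0, so `indexedLabel` does not necessarily equal `label`. That is why the evaluator above compares `prediction` against `indexedLabel` rather than `label`.

The plain-Python sketch below reproduces that default ordering, assuming ties are broken alphabetically as recent Spark documentation describes. The label list is hypothetical.

```python
from collections import Counter

def frequency_desc_index(values):
    """Map each distinct value to an index, most frequent first.

    Values are compared as strings, as StringIndexer casts numeric labels to
    strings; ties are broken alphabetically (an assumption based on Spark docs).
    """
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}

labels = [1.0, 1.0, 1.0, 0.0, 0.0]  # hypothetical: 1.0 is the majority label
mapping = frequency_desc_index(labels)
print(mapping)  # {'1.0': 0.0, '0.0': 1.0}

# IndexToString performs the reverse lookup
inverse = {index: label for label, index in mapping.items()}
print(inverse[0.0])  # '1.0'
```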

## Example 5: Random Forest Classifier

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("file:/databricks/driver/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Define a RandomForest model with 10 trees.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(15)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Accuracy = %g " % (accuracy))
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only
```
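A random forest combines the outputs of its `numTrees` trees. The MLlib guide describes classification as a majority vote over the trees. As we understand it, the `pyspark.ml` implementation instead sums each tree's normalized class-probability vector, which is soft voting.

The plain-Python sketch below shows both rules on made-up outputs from three trees. It illustrates the idea only; it is not Spark's code. It also shows that the two rules can disagree.

```python
from collections import Counter

def majority_vote(tree_predictions):
    """Hard voting: each tree casts one vote for a class index."""
    # most_common breaks ties by first occurrence
    return Counter(tree_predictions).most_common(1)[0][0]

def soft_vote(tree_probabilities):
    """Soft voting: average per-tree class-probability vectors, return (argmax, averages)."""
    num_classes = len(tree_probabilities[0])
    averages = [sum(p[c] for p in tree_probabilities) / len(tree_probabilities)
                for c in range(num_classes)]
    best = max(range(num_classes), key=lambda c: averages[c])
    return best, averages

# Hypothetical class-probability outputs of three trees for one test row
probabilities = [[0.9, 0.1], [0.4, 0.6], [0.45, 0.55]]
hard_votes = [max(range(2), key=lambda c: p[c]) for p in probabilities]  # [0, 1, 1]

print(majority_vote(hard_votes))    # 1: two of three trees prefer class 1
print(soft_vote(probabilities)[0])  # 0: class 0 has the higher average probability
```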

## Example 6: Linear Support Vector Machine Classifier

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("file:/databricks/driver/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train the linear SVM on the indexed columns so that its predictions are in the
# same index space as indexedLabel (LinearSVC supports binary labels only).
lsvc = LinearSVC(labelCol="indexedLabel", featuresCol="indexedFeatures",
                 maxIter=10, regParam=0.1)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and SVM in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, lsvc, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(15)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Accuracy = %g " % (accuracy))
print("Test Error = %g" % (1.0 - accuracy))

lsvcModel = model.stages[2]
print(lsvcModel)  # summary only

# Print the coefficients and intercept for linear SVC
print("Coefficients: " + str(lsvcModel.coefficients))
print("Intercept: " + str(lsvcModel.intercept))
```
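`LinearSVC` learns a weight vector (`coefficients`) and an `intercept`. A row is predicted as class 1.0 when its raw score w·x + b exceeds the model's `threshold`, which defaults to 0.0; otherwise it is predicted as 0.0. Training minimizes the hinge loss plus an L2 penalty weighted by `regParam`.

The plain-Python sketch below shows the prediction rule and the per-example hinge loss. The weights and the 3-feature point are made up; the model above has 692 coefficients.

```python
def margin(coefficients, intercept, features):
    """Raw score w . x + b."""
    return sum(w * x for w, x in zip(coefficients, features)) + intercept

def predict(coefficients, intercept, features, threshold=0.0):
    """Class 1.0 if the raw score exceeds the threshold, else 0.0."""
    return 1.0 if margin(coefficients, intercept, features) > threshold else 0.0

def hinge_loss(label, score):
    """Hinge loss for a 0/1 label, mapped to -1/+1 internally."""
    y = 2.0 * label - 1.0
    return max(0.0, 1.0 - y * score)

w = [0.5, -0.25, 0.0]  # hypothetical coefficients
b = -0.1               # hypothetical intercept
x = [1.0, 2.0, 3.0]    # hypothetical feature vector

score = margin(w, b, x)  # 0.5 - 0.5 + 0.0 - 0.1
print(score, predict(w, b, x))
print(hinge_loss(0.0, score), hinge_loss(1.0, score))
```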


## Example 7: Multilayer Perceptron Classifier

```
%sh curl -O 'https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_multiclass_classification_data.txt'
# saves file to file:/databricks/driver/sample_multiclass_classification_data.txt
```

```python
# Path to the file downloaded above
path = 'file:/databricks/driver/sample_multiclass_classification_data.txt'

data = spark.read.format('libsvm').load(path)
display(data)
```

label,features
1.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0, 1, 2, 3), values -> List(-0.222222, 0.5, -0.762712, -0.833333))"
1.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0, 1, 2, 3), values -> List(-0.555556, 0.25, -0.864407, -0.916667))"
1.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0, 1, 2, 3), values -> List(-0.722222, -0.166667, -0.864407, -0.833333))"
1.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0, 1, 2, 3), values -> List(-0.722222, 0.166667, -0.694915, -0.916667))"
0.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0, 1, 2, 3), values -> List(0.166667, -0.416667, 0.457627, 0.5))"
1.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0, 2, 3), values -> List(-0.833333, -0.864407, -0.916667))"
2.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0, 1, 2, 3), values -> List(-1.32455E-7, -0.166667, 0.220339, 0.0833333))"
2.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0, 1, 2, 3), values -> List(-1.32455E-7, -0.333333, 0.0169491, -4.03573E-8))"
1.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0, 1, 2, 3), values -> List(-0.5, 0.75, -0.830508, -1.0))"
0.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0, 2, 3), values -> List(0.611111, 0.694915, 0.416667))"


```python
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
data = spark.read.format("libsvm")\
    .load("file:/databricks/driver/sample_multiclass_classification_data.txt")

# Split the data into train and test
splits = data.randomSplit([0.8, 0.2], 1234)
train = splits[0]
test = splits[1]

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate layers of size 6 and 5,
# and output of size 3 (classes)
layers = [4, 6, 5, 3]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=200, layers=layers, blockSize=128, seed=1234)

# train the model
model = trainer.fit(train)

# compute accuracy on the test set
result = model.transform(test)
# Select example rows to display.
result.select("prediction", "label", "features").show(15)

predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
```
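In `layers`, the first entry must equal the number of features (4 here) and the last must equal the number of classes (3). Consecutive layers are fully connected, so a layer of size n feeding a layer of size m contributes (n + 1) × m parameters: n × m weights plus m biases.

The sketch below does that arithmetic for `[4, 6, 5, 3]`. If our reading of the implementation is right, the result should match `len(model.weights)` for the fitted model above.

```python
def mlp_parameter_count(layers):
    """Weights plus biases of a fully connected feed-forward network."""
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))

layers = [4, 6, 5, 3]
print(mlp_parameter_count(layers))  # (4+1)*6 + (6+1)*5 + (5+1)*3 = 30 + 35 + 18 = 83
```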

# Clustering and Topic Modeling

## Example 8: K-means Clustering

```
%sh curl -O 'https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_kmeans_data.txt'
# saves file to file:/databricks/driver/sample_kmeans_data.txt
```

```python
# Path to the file downloaded above
path = 'file:/databricks/driver/sample_kmeans_data.txt'

data = spark.read.format('libsvm').load(path)
display(data)
```

label,features
0.0,"Map(vectorType -> sparse, length -> 3, indices -> List(), values -> List())"
1.0,"Map(vectorType -> sparse, length -> 3, indices -> List(0, 1, 2), values -> List(0.1, 0.1, 0.1))"
2.0,"Map(vectorType -> sparse, length -> 3, indices -> List(0, 1, 2), values -> List(0.2, 0.2, 0.2))"
3.0,"Map(vectorType -> sparse, length -> 3, indices -> List(0, 1, 2), values -> List(9.0, 9.0, 9.0))"
4.0,"Map(vectorType -> sparse, length -> 3, indices -> List(0, 1, 2), values -> List(9.1, 9.1, 9.1))"
5.0,"Map(vectorType -> sparse, length -> 3, indices -> List(0, 1, 2), values -> List(9.2, 9.2, 9.2))"


```python
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("file:/databricks/driver/sample_kmeans_data.txt")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
```
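`KMeans` repeats two steps (Lloyd's algorithm): it assigns each point to its nearest center, then moves each center to the mean of its assigned points. Spark initializes the centers with k-means|| by default. For simplicity, this plain-Python sketch instead starts from two fixed, hypothetical centers: the first and last points.

It runs on the six points from `sample_kmeans_data.txt` shown above. The first row has no stored entries, so it is the zero vector.

```python
def squared_distance(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))

def mean(points):
    n = len(points)
    return [sum(coords) / n for coords in zip(*points)]

def kmeans(points, initial_centers, max_iter=20):
    """Lloyd's algorithm starting from fixed initial centers."""
    centers = [list(c) for c in initial_centers]
    for _ in range(max_iter):
        clusters = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)), key=lambda i: squared_distance(p, centers[i]))
            clusters[nearest].append(p)
        # keep an old center if its cluster became empty
        new_centers = [mean(c) if c else centers[i] for i, c in enumerate(clusters)]
        if new_centers == centers:
            break
        centers = new_centers
    return centers

points = [[0.0] * 3, [0.1] * 3, [0.2] * 3, [9.0] * 3, [9.1] * 3, [9.2] * 3]
centers = kmeans(points, initial_centers=[points[0], points[-1]])
print(centers)  # approximately [[0.1, 0.1, 0.1], [9.1, 9.1, 9.1]]
```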

## Example 9: Latent Dirichlet allocation (LDA)

```
%sh curl -O 'https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_lda_libsvm_data.txt'
# saves file to file:/databricks/driver/sample_lda_libsvm_data.txt
```


```python
# Path to the file downloaded above
path = 'file:/databricks/driver/sample_lda_libsvm_data.txt'

data = spark.read.format('libsvm').load(path)
display(data)
```

label,features
0.0,"Map(vectorType -> sparse, length -> 11, indices -> List(0, 1, 2, 4, 5, 6, 7, 10), values -> List(1.0, 2.0, 6.0, 2.0, 3.0, 1.0, 1.0, 3.0))"
1.0,"Map(vectorType -> sparse, length -> 11, indices -> List(0, 1, 3, 4, 7, 10), values -> List(1.0, 3.0, 1.0, 3.0, 2.0, 1.0))"
2.0,"Map(vectorType -> sparse, length -> 11, indices -> List(0, 1, 2, 5, 6, 8, 9), values -> List(1.0, 4.0, 1.0, 4.0, 9.0, 1.0, 2.0))"
3.0,"Map(vectorType -> sparse, length -> 11, indices -> List(0, 1, 3, 6, 8, 9, 10), values -> List(2.0, 1.0, 3.0, 5.0, 2.0, 3.0, 9.0))"
4.0,"Map(vectorType -> sparse, length -> 11, indices -> List(0, 1, 2, 3, 4, 6, 9, 10), values -> List(3.0, 1.0, 1.0, 9.0, 3.0, 2.0, 1.0, 3.0))"
5.0,"Map(vectorType -> sparse, length -> 11, indices -> List(0, 1, 3, 4, 5, 6, 7, 8, 9), values -> List(4.0, 2.0, 3.0, 4.0, 5.0, 1.0, 1.0, 1.0, 4.0))"
6.0,"Map(vectorType -> sparse, length -> 11, indices -> List(0, 1, 3, 6, 8, 9, 10), values -> List(2.0, 1.0, 3.0, 5.0, 2.0, 2.0, 9.0))"
7.0,"Map(vectorType -> sparse, length -> 11, indices -> List(0, 1, 2, 3, 4, 5, 6, 9, 10), values -> List(1.0, 1.0, 1.0, 9.0, 2.0, 1.0, 2.0, 1.0, 3.0))"
8.0,"Map(vectorType -> sparse, length -> 11, indices -> List(0, 1, 3, 4, 5, 6, 7), values -> List(4.0, 4.0, 3.0, 4.0, 2.0, 1.0, 3.0))"
9.0,"Map(vectorType -> sparse, length -> 11, indices -> List(0, 1, 2, 4, 6, 8, 9, 10), values -> List(2.0, 8.0, 2.0, 3.0, 2.0, 2.0, 7.0, 2.0))"


```python
from pyspark.ml.clustering import LDA

# Loads data.
dataset = spark.read.format("libsvm").load("file:/databricks/driver/sample_lda_libsvm_data.txt")

# Trains an LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)

ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)
```
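As we read Spark's LDA model code, `logPerplexity` is the negated `logLikelihood` bound divided by the total number of tokens in the corpus (the sum of all term counts). Lower is therefore better. Treat this relationship as an assumption.

The plain-Python sketch below uses the first row displayed above for the token count. The log-likelihood value is a made-up placeholder, not a real model output.

```python
def corpus_token_count(documents):
    """Total number of tokens: the sum of every term count in every document."""
    return sum(sum(term_counts.values()) for term_counts in documents)

def log_perplexity(log_likelihood, token_count):
    """Per-token log perplexity bound, assumed to be -logLikelihood / token count."""
    return -log_likelihood / token_count

# Row 0.0 from the display above, as {term index: count}
doc0 = {0: 1.0, 1: 2.0, 2: 6.0, 4: 2.0, 5: 3.0, 6: 1.0, 7: 1.0, 10: 3.0}
tokens = corpus_token_count([doc0])
print(tokens)  # 19.0

hypothetical_ll = -57.0  # placeholder value, not produced by the model above
print(log_perplexity(hypothetical_ll, tokens))  # 3.0
```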

# Model Selection (a.k.a. Hyperparameter Tuning)

## Example 10: Model Selection via Train-Validation Split

```python
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Prepare training and test data.
data = spark.read.format("libsvm")\
    .load("file:/databricks/driver/sample_multiclass_classification_data.txt")

train, test = data.randomSplit([0.9, 0.1], seed=12345)

mlp = MultilayerPerceptronClassifier(maxIter=100)

# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.

layers = [[4, 6, 5, 3],
          [4, 4, 4, 3],
          [4, 9, 6, 3],
          [4, 10, 6, 3]]

paramGrid = ParamGridBuilder()\
    .addGrid(mlp.layers, layers)\
    .addGrid(mlp.blockSize, [64, 128, 256])\
    .build()

# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=mlp,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train)

# Make predictions on test data. model is the model with the combination of parameters
# that performed best.
# compute accuracy on the test set
result = model.transform(test)
# Select example rows to display.
result.select("prediction", "label", "features").show(15)

predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
```
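`ParamGridBuilder.build()` returns the Cartesian product of the grids. This search therefore covers 4 layer configurations × 3 block sizes = 12 parameter maps. `TrainValidationSplit` fits one model per map on the 80% training portion, then refits the best map on the whole of `train`.

The plain-Python sketch below expands the grid with `itertools.product`. The dictionaries only mimic the shape of Spark's param maps.

```python
from itertools import product

layers = [[4, 6, 5, 3], [4, 4, 4, 3], [4, 9, 6, 3], [4, 10, 6, 3]]
block_sizes = [64, 128, 256]

# One dict per combination, analogous to one ParamMap per grid point
param_maps = [{"layers": l, "blockSize": b} for l, b in product(layers, block_sizes)]
print(len(param_maps))  # 12

# One fit per map, plus the final refit of the best map
print(len(param_maps) + 1)  # 13
```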


## Example 11: Model Selection via Cross-Validation

```python
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Prepare training and test data.
data = spark.read.format("libsvm")\
    .load("file:/databricks/driver/sample_multiclass_classification_data.txt")

train, test = data.randomSplit([0.9, 0.1], seed=12345)

mlp = MultilayerPerceptronClassifier(maxIter=100)

# We use a ParamGridBuilder to construct a grid of parameters to search over.
# CrossValidator will try all combinations of values and determine best model using
# the evaluator.

layers = [[4, 6, 5, 3],
          [4, 4, 4, 3],
          [4, 9, 6, 3],
          [4, 10, 6, 3]]

paramGrid = ParamGridBuilder()\
    .addGrid(mlp.layers, layers)\
    .addGrid(mlp.blockSize, [64, 128, 256])\
    .build()

# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
crossval = CrossValidator(estimator=mlp,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters.
model = crossval.fit(train)

# Make predictions on test data. model is the model with the combination of parameters
# that performed best.
# compute accuracy on the test set
result = model.transform(test)
# Select example rows to display.
result.select("prediction", "label", "features").show(15)

predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
```
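With `numFolds=5`, `CrossValidator` splits `train` into 5 folds. For each of the 12 parameter maps, it trains on 4 folds and evaluates on the held-out fold. That is 12 × 5 = 60 fits, plus a final refit of the best map on all of `train`, for 61 in total. This is roughly five times the cost of the train-validation split above.

The plain-Python sketch below assigns rows to folds at random. Spark also splits randomly, but this does not reproduce its exact fold assignment, and the seed is a hypothetical choice.

```python
import random

def k_fold_indices(n_rows, num_folds, seed=42):
    """Randomly assign rows to folds; yield (training, validation) index lists per fold."""
    rng = random.Random(seed)
    fold_of = [rng.randrange(num_folds) for _ in range(n_rows)]
    for k in range(num_folds):
        validation = [i for i, f in enumerate(fold_of) if f == k]
        training = [i for i, f in enumerate(fold_of) if f != k]
        yield training, validation

num_param_maps = 4 * 3
num_folds = 5
print(num_param_maps * num_folds + 1)  # 61 fits, including the final refit

splits = list(k_fold_indices(n_rows=20, num_folds=num_folds))
# Each row lands in exactly one validation fold
print(sorted(i for _, validation in splits for i in validation) == list(range(20)))
```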


# Example 12: Putting It All Together - San Francisco Crime Classification

**Data Description**

This dataset contains incidents derived from the SFPD Crime Incident Reporting system. The data ranges from 1/1/2003 to 5/13/2015. The training set and test set rotate every week: odd-numbered weeks (1, 3, 5, 7, ...) belong to the test set, and even-numbered weeks (2, 4, 6, 8, ...) belong to the training set.

**Data fields**

* Dates - timestamp of the crime incident
* Category - category of the crime incident (only in train.csv). This is the target variable you are going to predict.
* Descript - detailed description of the crime incident (only in train.csv)
* DayOfWeek - the day of the week
* PdDistrict - name of the Police Department District
* Resolution - how the crime incident was resolved (only in train.csv)
* Address - the approximate street address of the crime incident
* X - Longitude
* Y - Latitude

_Source: https://www.kaggle.com/c/sf-crime/data_

```python
from pyspark.sql.functions import col

# Read the Kaggle training file with a header row and inferred column types.
data = spark.read.csv('/FileStore/tables/train.csv', header=True, inferSchema=True)

drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']

# Keep only Category (the target) and Descript (the text feature).
data = data.select([column for column in data.columns if column not in drop_list])
data.show(15, truncate=False)
```

```python
data.groupBy("Category") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(truncate=False)
```

```python
data.groupBy("Descript") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(truncate=False)
```

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# regular expression tokenizer: split Descript on non-word characters
regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern="\\W")

# stop words (setStopWords replaces Spark's default English list with this one)
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the"]
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

# encode the target Category as a numeric label
label_stringIdx = StringIndexer(inputCol="Category", outputCol="label")

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(15)
```
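The feature pipeline above works in three steps:

1. `RegexTokenizer` lowercases each `Descript` (its default) and splits it on non-word characters (`pattern="\\W"`).
2. `StopWordsRemover` drops the listed stop words.
3. `CountVectorizer` keeps terms that appear in at least `minDF` documents, up to `vocabSize` terms ranked by corpus frequency, and turns each document into a vector of term counts.

The plain-Python sketch below runs these steps on three hypothetical descriptions. It uses `min_df=2` instead of 5 to suit the tiny corpus, and breaks frequency ties alphabetically, which is an arbitrary choice here.

```python
import re
from collections import Counter

STOP_WORDS = {"http", "https", "amp", "rt", "t", "c", "the"}

def tokenize(text):
    """Lowercase, split on non-word characters, and drop empty tokens."""
    return [tok for tok in re.split(r"\W", text.lower()) if tok]

def remove_stop_words(tokens):
    return [tok for tok in tokens if tok not in STOP_WORDS]

def build_vocabulary(docs, vocab_size, min_df):
    """Keep terms found in >= min_df documents, ranked by total count (ties alphabetical)."""
    doc_freq = Counter(term for doc in docs for term in set(doc))
    term_freq = Counter(term for doc in docs for term in doc)
    eligible = [t for t in term_freq if doc_freq[t] >= min_df]
    eligible.sort(key=lambda t: (-term_freq[t], t))
    return eligible[:vocab_size]

def count_vector(doc, vocabulary):
    counts = Counter(doc)
    return [counts[term] for term in vocabulary]

descriptions = [  # hypothetical Descript values
    "GRAND THEFT FROM LOCKED AUTO",
    "PETTY THEFT FROM LOCKED AUTO",
    "WARRANT ARREST",
]
docs = [remove_stop_words(tokenize(d)) for d in descriptions]
vocab = build_vocabulary(docs, vocab_size=10000, min_df=2)
print(vocab)  # ['auto', 'from', 'locked', 'theft']
print([count_vector(d, vocab) for d in docs])
```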

```python
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed=100)

print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

# model
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

# Rows predicted as label 0 (the most frequent Category after StringIndexer)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript", "Category", "probability", "label", "prediction") \
    .orderBy("probability", ascending=False) \
    .show(n=10, truncate=30)

# With no metricName set, the evaluator reports the weighted F1 score ("f1").
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
```
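When no `metricName` is given, `MulticlassClassificationEvaluator` reports `"f1"`. This is each label's F1 score weighted by how often that label occurs among the true labels. The plain-Python sketch below shows that weighting on made-up label and prediction lists.

```python
from collections import Counter

def weighted_f1(labels, predictions):
    """Per-label F1, weighted by each label's share of the true labels."""
    total = len(labels)
    score = 0.0
    for label, count in Counter(labels).items():
        tp = sum(1 for y, p in zip(labels, predictions) if y == label and p == label)
        predicted = sum(1 for p in predictions if p == label)
        precision = tp / predicted if predicted else 0.0
        recall = tp / count
        f1 = 2 * precision * recall / (precision + recall) if tp else 0.0
        score += f1 * count / total
    return score

labels = [0.0, 0.0, 0.0, 1.0, 1.0, 2.0]       # hypothetical true labels
predictions = [0.0, 0.0, 1.0, 1.0, 1.0, 0.0]  # hypothetical predictions
# label 0: F1 = 2/3 (weight 3/6); label 1: F1 = 0.8 (weight 2/6); label 2: F1 = 0 (weight 1/6)
print(weighted_f1(labels, predictions))  # approximately 0.6
```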

```python
# cross validation
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.3, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.1, 0.2]) \
    .build()

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)  # 5-fold cross-validation

cvModel = crossval.fit(trainingData)

predictions = cvModel.transform(testData)

# Weighted F1 of the best model on the test set
evaluator.evaluate(predictions)
```
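Together, `regParam` (λ) and `elasticNetParam` (α) define the elastic-net penalty added to the logistic-regression loss. Per the Spark ML documentation, the penalty is α·λ‖w‖₁ + (1 − α)·(λ/2)‖w‖₂². So α = 0 is pure L2 (ridge), α = 1 is pure L1 (lasso), and the grid above mixes in a small L1 component.

The plain-Python sketch below evaluates that penalty for each grid point on a made-up coefficient vector.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """alpha * lambda * ||w||_1 + (1 - alpha) * (lambda / 2) * ||w||_2^2."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    alpha, lam = elastic_net_param, reg_param
    return alpha * lam * l1 + (1.0 - alpha) * (lam / 2.0) * l2_squared

w = [1.0, -2.0]  # hypothetical coefficients: ||w||_1 = 3, ||w||_2^2 = 5
for reg_param in (0.3, 0.5):
    for alpha in (0.1, 0.2):
        print(reg_param, alpha, elastic_net_penalty(w, reg_param, alpha))
```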