In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark

import os
import functools as reduce
from pyspark.context import SparkContext
from pyspark.sql import DataFrame, SQLContext, SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create the Spark entry points used throughout the notebook.
# getOrCreate() reuses a session/context that is already running, so re-executing
# this cell in a live kernel does not fail on a second SparkContext.
conf = pyspark.SparkConf().setAppName('MLLib-Machine Learning Model').setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
# Kept so code that refers to `sqlContext` keeps working; `spark` covers the same API.
sqlContext = SQLContext(sc)

In [2]:
# Echo the session object so the notebook renders it as this cell's output.
spark

In [6]:
# Importing the dataset and libraries
import pandas as pd

# Feature column order of the Boston housing data (same as `boston.feature_names`).
BOSTON_FEATURES = ["CRIM", "ZN", "INDUS", "CHAS", "NOX", "RM", "AGE",
                   "DIS", "RAD", "TAX", "PTRATIO", "B", "LSTAT"]

try:
    # `load_boston` only exists in scikit-learn < 1.2; newer releases raise ImportError.
    from sklearn.datasets import load_boston
except ImportError:
    # Fallback from scikit-learn's removal notice: read the original source file.
    # Requires network access.
    import numpy as np

    raw = pd.read_csv("http://lib.stat.cmu.edu/datasets/boston",
                      sep=r"\s+", skiprows=22, header=None)
    # Each record spans two physical lines: 11 features on the first line,
    # then 2 more features followed by the target on the second.
    features = np.hstack([raw.values[::2, :], raw.values[1::2, :2]])
    target = raw.values[1::2, 2]

    df = pd.DataFrame(features, columns=BOSTON_FEATURES)
    df['price'] = target
else:
    boston = load_boston()

    df = pd.DataFrame(boston.data, columns=boston.feature_names)
    df['price'] = boston.target

In [7]:
# Turn the pandas frame into a Spark DataFrame and preview five rows without truncation.
sdf = spark.createDataFrame(data=df)
sdf.show(n=5, truncate=False)

+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+
|CRIM   |ZN  |INDUS|CHAS|NOX  |RM   |AGE |DIS   |RAD|TAX  |PTRATIO|B     |LSTAT|price|
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+
|0.00632|18.0|2.31 |0.0 |0.538|6.575|65.2|4.09  |1.0|296.0|15.3   |396.9 |4.98 |24.0 |
|0.02731|0.0 |7.07 |0.0 |0.469|6.421|78.9|4.9671|2.0|242.0|17.8   |396.9 |9.14 |21.6 |
|0.02729|0.0 |7.07 |0.0 |0.469|7.185|61.1|4.9671|2.0|242.0|17.8   |392.83|4.03 |34.7 |
|0.03237|0.0 |2.18 |0.0 |0.458|6.998|45.8|6.0622|3.0|222.0|18.7   |394.63|2.94 |33.4 |
|0.06905|0.0 |2.18 |0.0 |0.458|7.147|54.2|6.0622|3.0|222.0|18.7   |396.9 |5.33 |36.2 |
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+
only showing top 5 rows



### Linear Regression with MLLib

In [8]:
from pyspark.ml.regression import LinearRegression

In [11]:
from pyspark.ml.feature import VectorAssembler

In [12]:
# Pack every column except the `price` label into a single `features` vector column.
assembler = VectorAssembler(
    inputCols=[name for name in sdf.columns if name != "price"],
    outputCol="features",
)
dataset = assembler.transform(sdf)

In [13]:
# Preview the assembled rows, including the new `features` vector column.
dataset.show()

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+--------------------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|price|            features|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+--------------------+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98| 24.0|[0.00632,18.0,2.3...|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14| 21.6|[0.02731,0.0,7.07...|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03| 34.7|[0.02729,0.0,7.07...|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94| 33.4|[0.03237,0.0,2.18...|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33| 36.2|[0.06905,0.0,2.18...|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0|   18.7|394.12| 5.21| 28.7|[0.02985,0.0,2.18...|
|0.08829|12.5| 7.87

In [14]:
# Fit a linear regression that predicts `price` from the assembled `features` vector.
lr = LinearRegression(
    labelCol="price",
    featuresCol="features",
)
model = lr.fit(dataset)

In [15]:
# Score the fitted model on `dataset`, the same rows it was trained on, so this
# R² is an in-sample figure rather than a held-out estimate.
summary = model.evaluate(dataset)
print(summary.r2)

0.7406426641094093


`Note:` Different algorithms may produce different results on the same data, so this score is specific to the linear regression model fitted above.

In [16]:
# Run the model over `dataset` and display only the resulting `prediction` column.
output = model.transform(dataset)
output.select("prediction").show()

+------------------+
|        prediction|
+------------------+
|30.003843377018672|
|25.025562379052605|
|30.567596718599706|
| 28.60703648872773|
|27.943524232871702|
| 25.25628446154206|
|23.001808268484787|
|19.535988428753562|
|11.523636853127133|
|  18.9202621070742|
| 18.99949651112896|
|21.586795681398485|
| 20.90652152783407|
| 19.55290281058366|
|19.283482050092314|
|19.297483208230954|
| 20.52750979116518|
|16.911401346799252|
|16.178011056575237|
|18.406136033335542|
+------------------+
only showing top 20 rows



In [17]:
# Display the actual `price` labels for comparison with the predictions shown above.
output.select("price").show()

+-----+
|price|
+-----+
| 24.0|
| 21.6|
| 34.7|
| 33.4|
| 36.2|
| 28.7|
| 22.9|
| 27.1|
| 16.5|
| 18.9|
| 15.0|
| 18.9|
| 21.7|
| 20.4|
| 18.2|
| 19.9|
| 23.1|
| 17.5|
| 20.2|
| 18.2|
+-----+
only showing top 20 rows



In [18]:
# Evaluating in PySpark Regression evaluator
from pyspark.ml.evaluation import RegressionEvaluator

# Compute R² from the `price` and `prediction` columns of the scored frame.
evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="price",
    predictionCol="prediction",
)
evaluator.evaluate(output)

0.7406426641094093

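`Note:` The `RegressionEvaluator` reports exactly the same R² (0.7406) as `summary.r2` above, because both are computed from the same predictions on the same data.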

### Classification with MLLib

In [19]:
from pyspark.ml.classification import LogisticRegression

In [30]:
# Location of the iris CSV. Set the IRIS_CSV environment variable to point at your own
# copy; the original local path stays as the default so existing setups are unaffected.
IRIS_CSV = os.environ.get(
    "IRIS_CSV",
    r"D:\Data Science\IIITB\Data Engineering - II\Module - 6 (Analytics Using PySpark)\2. Linear Regression Using PySpark\iris+(1).csv",
)
# Note: this rebinds `df`, which earlier held the pandas Boston housing frame.
df = spark.read.csv(IRIS_CSV, header=True, inferSchema=True)

In [31]:
# Assemble every column except the `species` label into one `features` vector.
# NOTE(review): `assembler` is rebuilt after the StringIndexer step further down,
# so this definition looks redundant. Confirm before removing.
assembler = VectorAssembler(
    inputCols=[name for name in df.columns if name != "species"],
    outputCol="features",
)

In [33]:
# NOTE(review): `iris` is reassigned from `df` in the StringIndexer cell below, so the
# `features` column added here is discarded. This cell looks redundant; confirm.
iris = assembler.transform(df)

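`Note`: As with the regression problem, we cannot call the classifier's `fit` directly on the raw DataFrame: the string label must first be converted to a numeric index, and the feature columns must be assembled into a single vector column.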

In [34]:
from pyspark.ml.feature import StringIndexer

# Encode the string label `species` as a numeric `speciesIndex` column.
indexer = StringIndexer(inputCol="species", outputCol="speciesIndex")
indexer_model = indexer.fit(df)
iris = indexer_model.transform(df)

# Spot-check roughly a tenth of the rows (the sample is random, so it varies per run).
iris.sample(fraction=0.1).show()

+------------+-----------+------------+-------------+---------------+------------+
|sepal_length|sepal_width|petal_length|petal_widthCm|        species|speciesIndex|
+------------+-----------+------------+-------------+---------------+------------+
|         5.2|        3.5|         1.5|          0.2|    Iris-setosa|         0.0|
|         4.8|        3.1|         1.6|          0.2|    Iris-setosa|         0.0|
|         4.4|        3.0|         1.3|          0.2|    Iris-setosa|         0.0|
|         4.4|        3.2|         1.3|          0.2|    Iris-setosa|         0.0|
|         5.0|        3.5|         1.6|          0.6|    Iris-setosa|         0.0|
|         5.1|        3.8|         1.9|          0.4|    Iris-setosa|         0.0|
|         4.8|        3.0|         1.4|          0.3|    Iris-setosa|         0.0|
|         6.9|        3.1|         4.9|          1.5|Iris-versicolor|         1.0|
|         4.9|        2.4|         3.3|          1.0|Iris-versicolor|         1.0|
|   

In [35]:
# Feature columns are the columns of `df` whose name does not start with "species",
# i.e. everything except the label. The vector is added to the indexed `iris` frame.
assembler = VectorAssembler(
    inputCols=[name for name in df.columns if not name.startswith("species")],
    outputCol="features",
)
dataset = assembler.transform(iris)

In [36]:
# Confirm that the frame now carries both the numeric label and the `features` vector.
dataset.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_widthCm: double (nullable = true)
 |-- species: string (nullable = true)
 |-- speciesIndex: double (nullable = false)
 |-- features: vector (nullable = true)



In [37]:
# Fit a logistic regression that predicts `speciesIndex` from the `features` vector.
lr = LogisticRegression(
    labelCol="speciesIndex",
    featuresCol="features",
)
model = lr.fit(dataset)

In [38]:
# Accuracy measured on `dataset`, the same rows the model was fitted on (in-sample).
summary = model.evaluate(dataset)
summary.accuracy

0.9866666666666667

In [39]:
# Split the dataset into train and test sets: scoring the model on the rows it was
# trained on gives an optimistic accuracy. A fixed seed makes the split reproducible.
trainData, testData = dataset.randomSplit([0.7, 0.3], seed=42)

In [40]:
# Refit on the training split only, then score on the held-out test split.
model = lr.fit(trainData)
summary = model.evaluate(testData)

In [41]:
# Accuracy on the held-out test split.
summary.accuracy

0.9512195121951219

### Using Naive Bayes with MLLib

In [42]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [43]:
# Fresh 60/40 split for the Naive Bayes experiment; the fixed seed makes the split,
# and therefore the reported score, reproducible across runs.
trainData, testData = dataset.randomSplit([0.6, 0.4], seed=42)
nb = NaiveBayes(featuresCol="features", labelCol="speciesIndex")
# Train on the training split and score the held-out rows.
model = nb.fit(trainData)
output = model.transform(testData)

In [44]:
# The evaluator's default metric is F1, not accuracy; it is spelled out here so the
# number is not mistaken for the accuracy reported for logistic regression.
evaluator = MulticlassClassificationEvaluator(
    labelCol="speciesIndex",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(output)

0.44745395449620795