## **Apache Spark Machine Learning**

In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [697 B]
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:6 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease [21.3 kB]
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:10 http://security.ubuntu.com/ubuntu bionic-security/main amd64 Packages [1,023 kB]
Get:11 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease [3,626 B]
Get:12 h

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

In [3]:
!ls

sample_data  spark-2.3.1-bin-hadoop2.7	spark-2.3.1-bin-hadoop2.7.tgz


In [4]:
import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
sc

In [5]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

In [6]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [7]:
import pandas as pd

# Raihan's Google Drive directory
# data = pd.read_csv('/content/drive/My Drive/Colab Notebooks/Datasets/hotelReview_train.csv')

# Ibnu's Google Drive directory
# data = pd.read_csv('/content/drive/My Drive/Colab Notebooks/Hotel Review/train.csv')

# Local directory
emp_df = spark.read.csv('/content/drive/My Drive/colab_notes/datasets/exercises/01_04/employee.txt', header=True)
emp_df.cache()
emp_df.count()
emp_df.show(10)

+---+------------+--------------------+--------+------------+------------+------+--------------------+---------+
| id|   last_name|               email|  gender|  department|  start_date|salary|           job_title|region_id|
+---+------------+--------------------+--------+------------+------------+------+--------------------+---------+
|  1|    'Kelley'|'rkelley0@soundcl...|'Female'| 'Computers'| '10/2/2009'| 67470|'Structural Engin...|        2|
|  2| 'Armstrong'|'sarmstrong1@info...|  'Male'|    'Sports'| '3/31/2008'| 71869| 'Financial Advisor'|        2|
|  3|      'Carr'|'fcarr2@woothemes...|  'Male'|'Automotive'| '7/12/2009'|101768|'Recruiting Manager'|        3|
|  4|    'Murray'|   'jmurray3@gov.uk'|'Female'|  'Jewelery'|'12/25/2014'| 96897|'Desktop Support ...|        3|
|  5|     'Ellis'|'jellis4@scienced...|'Female'|   'Grocery'| '9/19/2002'| 63702|'Software Enginee...|        7|
|  6|  'Phillips'|'bphillips5@time....|  'Male'|     'Tools'| '8/21/2013'|118497|'Executive Secr

In [8]:
from pyspark.ml.feature import MinMaxScaler, StandardScaler, Bucketizer
from pyspark.ml.linalg import Vectors

## **Creating DataFrame**

In [9]:
features_df = spark.createDataFrame([
                                     (1, Vectors.dense([10.0, 10000.0, 1.0]),),
                                     (2, Vectors.dense([20.0, 30000.0, 2.0]),),
                                     (3, Vectors.dense([30.0, 40000.0, 3.0]),)
                                      
],['id', 'features'])

In [10]:
features_df.take(2)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0])),
 Row(id=2, features=DenseVector([20.0, 30000.0, 2.0]))]

## **Transforming DataFrame**

#### **Normalization**

In [11]:
feature_scaler = MinMaxScaler(inputCol='features', outputCol='sfeatures')

In [12]:
smodel = feature_scaler.fit(features_df)

In [13]:
sfeatures = smodel.transform(features_df)

In [14]:
# comparison between scaled and unscaled features
sfeatures.select('features', 'sfeatures').show()

+------------------+--------------------+
|          features|           sfeatures|
+------------------+--------------------+
|[10.0,10000.0,1.0]|       [0.0,0.0,0.0]|
|[20.0,30000.0,2.0]|[0.5,0.6666666666...|
|[30.0,40000.0,3.0]|       [1.0,1.0,1.0]|
+------------------+--------------------+



#### **Standardization**

In [15]:
 feature_std_scaler = StandardScaler(inputCol='features', outputCol='sfeatures', withStd=True, withMean=True)

In [16]:
std_smodel = feature_std_scaler.fit(features_df)

In [17]:
std_sfeatures_df = std_smodel.transform(features_df)

In [18]:
std_sfeatures_df.show(truncate=False)

+---+------------------+-------------------------------+
|id |features          |sfeatures                      |
+---+------------------+-------------------------------+
|1  |[10.0,10000.0,1.0]|[-1.0,-1.0910894511799618,-1.0]|
|2  |[20.0,30000.0,2.0]|[0.0,0.21821789023599256,0.0]  |
|3  |[30.0,40000.0,3.0]|[1.0,0.8728715609439698,1.0]   |
+---+------------------+-------------------------------+



### **Bucketizing Numeric Data** / **Binning**

In [19]:
# Defining buckets
splits = [-float('inf'), -10.0, 0.0, 10.0, float('inf')]

In [20]:
b_data = [(-800.0,), (-10.5,), (-1.7,), (0.0,), (8.2,), (90.1,)]
b_df = spark.createDataFrame(b_data, ['features'])

In [21]:
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [22]:
# Defining a bucketizer
bucketizer = Bucketizer(splits=splits, inputCol='features', outputCol='bfeatures')

In [23]:
bucketed_df = bucketizer.transform(b_df)

In [24]:
bucketed_df.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+



## **Tokenization**

In [25]:
from pyspark.ml.feature import Tokenizer

In [26]:
sentences_df = spark.createDataFrame([
                                      (1, 'This is an introduction to Spark MLlib'),
                                      (2, 'MLlib includes libraries for classification and regression'),
                                      (3, 'It also contatins supporting tools for pipelines')
], ['id', 'sentence'] )

In [27]:
sentences_df.show(truncate=False)

+---+----------------------------------------------------------+
|id |sentence                                                  |
+---+----------------------------------------------------------+
|1  |This is an introduction to Spark MLlib                    |
|2  |MLlib includes libraries for classification and regression|
|3  |It also contatins supporting tools for pipelines          |
+---+----------------------------------------------------------+



In [28]:
sent_token = Tokenizer(inputCol='sentence', outputCol='words')

In [29]:
sent_tokenized_df = sent_token.transform(sentences_df)

In [30]:
sent_tokenized_df.show(truncate=False) 

+---+----------------------------------------------------------+------------------------------------------------------------------+
|id |sentence                                                  |words                                                             |
+---+----------------------------------------------------------+------------------------------------------------------------------+
|1  |This is an introduction to Spark MLlib                    |[this, is, an, introduction, to, spark, mllib]                    |
|2  |MLlib includes libraries for classification and regression|[mllib, includes, libraries, for, classification, and, regression]|
|3  |It also contatins supporting tools for pipelines          |[it, also, contatins, supporting, tools, for, pipelines]          |
+---+----------------------------------------------------------+------------------------------------------------------------------+



## **TF-IDF**

In [31]:
from pyspark.ml.feature import HashingTF, IDF

In [32]:
sentences_df

DataFrame[id: bigint, sentence: string]

In [33]:
sentences_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib')]

In [34]:
sent_tokenized_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'])]

In [35]:
hashingTF = HashingTF(inputCol='words', outputCol='rawFeatures', numFeatures=20)

In [36]:
sent_hfTF_df = hashingTF.transform(sent_tokenized_df)

In [37]:
sent_hfTF_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {1: 2.0, 5: 1.0, 6: 1.0, 8: 1.0, 12: 1.0, 13: 1.0}))]

In [38]:
idf = IDF(inputCol='rawFeatures', outputCol='idf_features')

In [39]:
idfModel = idf.fit(sent_hfTF_df)

In [40]:
tfidf_df = idfModel.transform(sent_hfTF_df)

In [41]:
tfidf_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {1: 2.0, 5: 1.0, 6: 1.0, 8: 1.0, 12: 1.0, 13: 1.0}), idf_features=SparseVector(20, {1: 0.5754, 5: 0.6931, 6: 0.2877, 8: 0.6931, 12: 0.0, 13: 0.2877}))]

In [42]:
tfidf_df.show(truncate=False)

+---+----------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
|id |sentence                                                  |words                                                             |rawFeatures                                           |idf_features                                                                                                                                         |
+---+----------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------

## **Clustering**

In [43]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [44]:
cluster_df = spark.read.csv('/content/drive/My Drive/colab_notes/datasets/exercises/03_02/clustering_dataset.csv', header=True, inferSchema=True)
cluster_df.cache()
cluster_df.count()
cluster_df.show(5, truncate=False)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|7   |4   |1   |
|7   |7   |9   |
|7   |9   |6   |
|1   |6   |5   |
|6   |7   |7   |
+----+----+----+
only showing top 5 rows



In [45]:
# transforming the columns into feature vectors
vectorAssembler = VectorAssembler(inputCols=['col1', 'col2', 'col3'], outputCol='features')

In [46]:
vcluster_df = vectorAssembler.transform(cluster_df)

In [47]:
# kmeans words with feature vector
vcluster_df.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



In [48]:
kmeans = KMeans().setK(3)

In [49]:
kmeans = kmeans.setSeed(1)

In [50]:
kmodel = kmeans.fit(vcluster_df)

In [51]:
centers = kmodel.clusterCenters()

In [52]:
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([5.12, 5.84, 4.84]),
 array([80.        , 79.20833333, 78.29166667])]

In [53]:
vcluster_df.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



In [54]:
from pyspark.ml.clustering import BisectingKMeans

In [55]:
bkmeans = BisectingKMeans().setK(3)

In [56]:
bkmeans = bkmeans.setSeed(1)

In [57]:
bkmodel = bkmeans.fit(vcluster_df)

In [58]:
bkecenters = bkmodel.clusterCenters()

In [59]:
# Center for bisekting kmeans clustering
bkecenters

[array([5.12, 5.84, 4.84]),
 array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667])]

In [60]:
# Center for kmeans clustering
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([5.12, 5.84, 4.84]),
 array([80.        , 79.20833333, 78.29166667])]

## **Classification**

In [61]:
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler, StringIndexer

In [62]:
# Loading the data
iris_df = spark.read.csv('/content/drive/My Drive/colab_notes/datasets/exercises/iris.data', inferSchema=True)
iris_df.cache()
iris_df.count()
iris_df.show(5, truncate=False)

+---+---+---+---+-----------+
|_c0|_c1|_c2|_c3|_c4        |
+---+---+---+---+-----------+
|5.1|3.5|1.4|0.2|Iris-setosa|
|4.9|3.0|1.4|0.2|Iris-setosa|
|4.7|3.2|1.3|0.2|Iris-setosa|
|4.6|3.1|1.5|0.2|Iris-setosa|
|5.0|3.6|1.4|0.2|Iris-setosa|
+---+---+---+---+-----------+
only showing top 5 rows



In [63]:
iris_df = iris_df.select(
    col('_c0').alias('sepal_length'),
    col('_c1').alias('sepal_width'),
    col('_c2').alias('petal_length'),
    col('_c3').alias('petal_width'),
    col('_c4').alias('species'))

In [64]:
iris_df.show()

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|
|         5.4|        3.7|         1.5|        0.2|Iris-setosa|
|         4.8|        3.4|         1.6|        0.2|Iris-setosa|
|         4.8|        3.0|         1.4| 

In [65]:
vectorAssembler = VectorAssembler(
    inputCols=['sepal_length','sepal_width', 'petal_length','petal_width'], outputCol='features')

In [66]:
viris_df = vectorAssembler.transform(iris_df)

In [67]:
viris_df.show()

+------------+-----------+------------+-----------+-----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|
+------------+-----------+------------+-----------+-----------+-----------------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|[4.4,2.9,1.4,0.2]|
|         4.9|  

In [68]:
# transforming label name into a numeric value using string indexer
indexer = StringIndexer(inputCol='species', outputCol='label')

In [69]:
iviris_df = indexer.fit(viris_df).transform(viris_df)

In [70]:
iviris_df.show()

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|  0.0|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|  0.0|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|  0.0|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|  0.0|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|  0.0|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|  0.0|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|  0.0|
|         4.4|        2.9|      

## **Naive Bayes Classifier**

In [71]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [72]:
# Splitting the data
splits = iviris_df.randomSplit([0.6,0.4],1)

In [73]:
train_df = splits[0]

In [74]:
test_df = splits[1]

In [75]:
train_df.count()

92

In [76]:
test_df.count()

58

In [77]:
iviris_df.count()

150

In [78]:
# Defining a naive bayes classifier
nb = NaiveBayes(modelType='multinomial')

In [79]:
# Fit the model to the data
nbModel = nb.fit(train_df)

In [80]:
predictions_df = nbModel.transform(test_df)

In [81]:
predictions_df.show()

+------------+-----------+------------+-----------+---------------+-----------------+-----+--------------------+--------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|        species|         features|label|       rawPrediction|         probability|prediction|
+------------+-----------+------------+-----------+---------------+-----------------+-----+--------------------+--------------------+----------+
|         4.5|        2.3|         1.3|        0.3|    Iris-setosa|[4.5,2.3,1.3,0.3]|  0.0|[-10.360506349494...|[0.56204387804619...|       0.0|
|         4.6|        3.1|         1.5|        0.2|    Iris-setosa|[4.6,3.1,1.5,0.2]|  0.0|[-11.322221219128...|[0.67258246553895...|       0.0|
|         4.6|        3.4|         1.4|        0.3|    Iris-setosa|[4.6,3.4,1.4,0.3]|  0.0|[-11.815206070650...|[0.69767779143661...|       0.0|
|         4.7|        3.2|         1.6|        0.2|    Iris-setosa|[4.7,3.2,1.6,0.2]|  0.0|[-11.696201471654...|[0.67305222919201.

In [82]:
# Evaluating the algorithm
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

In [83]:
nbaccuracy = evaluator.evaluate(predictions_df)

In [84]:
nbaccuracy

0.5862068965517241

## **Multilayer Perceptron**

In [85]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [86]:
iviris_df.show()

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|  0.0|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|  0.0|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|  0.0|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|  0.0|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|[5.4,3.9,1.7,0.4]|  0.0|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|[4.6,3.4,1.4,0.3]|  0.0|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|  0.0|
|         4.4|        2.9|      

In [87]:
train_df.count()

92

In [88]:
test_df.count()

58

In [89]:
iviris_df.count()

150

In [90]:
# Defining the neural network
layers = [4, 5, 5, 3]

In [91]:
mlp = MultilayerPerceptronClassifier(layers=layers, seed=1)

In [92]:
mlp_model = mlp.fit(train_df)

In [93]:
mlp_predictions = mlp_model.transform(test_df)

In [94]:
mlp_evaluator = MulticlassClassificationEvaluator(metricName='accuracy')

In [95]:
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)

In [96]:
mlp_accuracy

0.9482758620689655

## **Decision Tree Classifier**

In [97]:
from pyspark.ml.classification import DecisionTreeClassifier

In [98]:
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features')

In [99]:
dt_model = dt.fit(train_df)

In [100]:
dt_predictions = dt_model.transform(test_df)

In [101]:
dt_evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction', metricName='accuracy')

In [102]:
dt_accuracy = dt_evaluator.evaluate(dt_predictions)

In [103]:
dt_accuracy

0.9310344827586207

# **Regression**

In [104]:
from pyspark.ml.regression import LinearRegression

In [105]:
pp_df = spark.read.csv('/content/drive/My Drive/colab_notes/datasets/exercises/CCPP/power_plant.csv', header=True, inferSchema=True)
pp_df.cache()
pp_df.count()
pp_df.show(5, truncate=False)

+-----+-----+-------+-----+------+
|AT   |V    |AP     |RH   |PE    |
+-----+-----+-------+-----+------+
|8.34 |40.77|1010.84|90.01|480.48|
|23.64|58.49|1011.4 |74.2 |445.75|
|29.74|56.9 |1007.15|41.91|438.76|
|19.07|49.69|1007.22|76.79|453.09|
|11.8 |40.66|1017.13|97.2 |464.43|
+-----+-----+-------+-----+------+
only showing top 5 rows



In [106]:
pp_df.printSchema()

root
 |-- AT: double (nullable = true)
 |-- V: double (nullable = true)
 |-- AP: double (nullable = true)
 |-- RH: double (nullable = true)
 |-- PE: double (nullable = true)



In [107]:
vectorAssembler = VectorAssembler(inputCols=['AT', 'V', 'AP', 'RH', 'PE'], outputCol='features')

In [109]:
vpp_df = vectorAssembler.transform(pp_df)

In [111]:
vpp_df.show(truncate=False)

+-----+-----+-------+-----+------+----------------------------------+
|AT   |V    |AP     |RH   |PE    |features                          |
+-----+-----+-------+-----+------+----------------------------------+
|8.34 |40.77|1010.84|90.01|480.48|[8.34,40.77,1010.84,90.01,480.48] |
|23.64|58.49|1011.4 |74.2 |445.75|[23.64,58.49,1011.4,74.2,445.75]  |
|29.74|56.9 |1007.15|41.91|438.76|[29.74,56.9,1007.15,41.91,438.76] |
|19.07|49.69|1007.22|76.79|453.09|[19.07,49.69,1007.22,76.79,453.09]|
|11.8 |40.66|1017.13|97.2 |464.43|[11.8,40.66,1017.13,97.2,464.43]  |
|13.97|39.16|1016.05|84.6 |470.96|[13.97,39.16,1016.05,84.6,470.96] |
|22.1 |71.29|1008.2 |75.38|442.35|[22.1,71.29,1008.2,75.38,442.35]  |
|14.47|41.76|1021.98|78.41|464.0 |[14.47,41.76,1021.98,78.41,464.0] |
|31.25|69.51|1010.25|36.83|428.77|[31.25,69.51,1010.25,36.83,428.77]|
|6.77 |38.18|1017.8 |81.13|484.31|[6.77,38.18,1017.8,81.13,484.31]  |
|28.28|68.67|1006.36|69.9 |435.29|[28.28,68.67,1006.36,69.9,435.29] |
|22.99|46.93|1014.15

## **Linear Regression**

In [112]:
lr = LinearRegression(featuresCol='features', labelCol='PE')

In [113]:
lr_model = lr.fit(vpp_df)

In [114]:
lr_model.coefficients

DenseVector([0.0, 0.0, -0.0, 0.0, 1.0])

In [115]:
lr_model.intercept

-3.098834962028515e-10

In [116]:
lr_model.summary.rootMeanSquaredError

3.1065745851191783e-12

In [117]:
lr_model.save('lr1model')

## **Decision Tree Regression**

In [118]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [120]:
vpp_df.show(truncate=False)

+-----+-----+-------+-----+------+----------------------------------+
|AT   |V    |AP     |RH   |PE    |features                          |
+-----+-----+-------+-----+------+----------------------------------+
|8.34 |40.77|1010.84|90.01|480.48|[8.34,40.77,1010.84,90.01,480.48] |
|23.64|58.49|1011.4 |74.2 |445.75|[23.64,58.49,1011.4,74.2,445.75]  |
|29.74|56.9 |1007.15|41.91|438.76|[29.74,56.9,1007.15,41.91,438.76] |
|19.07|49.69|1007.22|76.79|453.09|[19.07,49.69,1007.22,76.79,453.09]|
|11.8 |40.66|1017.13|97.2 |464.43|[11.8,40.66,1017.13,97.2,464.43]  |
|13.97|39.16|1016.05|84.6 |470.96|[13.97,39.16,1016.05,84.6,470.96] |
|22.1 |71.29|1008.2 |75.38|442.35|[22.1,71.29,1008.2,75.38,442.35]  |
|14.47|41.76|1021.98|78.41|464.0 |[14.47,41.76,1021.98,78.41,464.0] |
|31.25|69.51|1010.25|36.83|428.77|[31.25,69.51,1010.25,36.83,428.77]|
|6.77 |38.18|1017.8 |81.13|484.31|[6.77,38.18,1017.8,81.13,484.31]  |
|28.28|68.67|1006.36|69.9 |435.29|[28.28,68.67,1006.36,69.9,435.29] |
|22.99|46.93|1014.15

In [123]:
# Splitting the data into train and test
splits = vpp_df.randomSplit([0.7,0.3])

In [124]:
train_df = splits[0]

In [125]:
test_df = splits[1]

In [126]:
train_df.count()

6662

In [127]:
test_df.count()

2906

In [128]:
vpp_df.count()

9568

In [141]:
dt_reg = DecisionTreeRegressor(featuresCol='features', labelCol='PE')

In [142]:
dt_model = dt_reg.fit(train_df)

In [143]:
dt_predictions = dt_model.transform(test_df)

In [145]:
dt_evaluator = RegressionEvaluator(labelCol='PE', predictionCol='prediction', metricName='rmse')

In [146]:
rmse = dt_evaluator.evaluate(dt_predictions)

In [147]:
rmse

0.7915764124303604

## **Gradient Boosted Tree Regressor**

In [148]:
from pyspark.ml.regression import GBTRegressor

In [149]:
gbt = GBTRegressor(featuresCol='features', labelCol='PE')

In [150]:
gbt_model = gbt.fit(train_df)

In [151]:
gbt_predictions = gbt_model.transform(test_df)

In [153]:
gbt_evaluator = RegressionEvaluator(labelCol='PE', predictionCol='prediction', metricName='rmse')

In [154]:
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
gbt_rmse

0.7068436861730462