In [3]:
import os 

# Install java
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed pyspark

# Install Spark NLP
! pip install --ignore-installed spark-nlp

openjdk version "1.8.0_265"
OpenJDK Runtime Environment (build 1.8.0_265-8u265-b01-0ubuntu2~18.04-b01)
OpenJDK 64-Bit Server VM (build 25.265-b01, mixed mode)
Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 58kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 45.6MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=24e7e413ff35c4c3ea3f1ac8e902e42f5b67e3c9bbc0b0f725db9d271f7f6caf
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec84

In [4]:
# Create the local data directory; -p makes this a no-op if it already exists.
! mkdir -p data

In [5]:
# Download the raw Iris data file from the UCI repository into the working directory.
! wget https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data

--2020-09-14 23:17:38--  https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4551 (4.4K) [application/x-httpd-php]
Saving to: ‘iris.data’


2020-09-14 23:17:38 (77.0 MB/s) - ‘iris.data’ saved [4551/4551]



In [6]:
# Move the downloaded file into the ./data directory.
! mv iris.data ./data

In [7]:
# List the data directory to confirm the file is in place.
! ls ./data

iris.data


In [8]:

from time import sleep

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as fun
from pyspark.sql.types import *

In [9]:
%matplotlib inline
import matplotlib.pyplot as plt

In [10]:
# Maven coordinates for Spark NLP, joined into one comma-separated string.
# NOTE(review): `packages` is not passed to the builder below, so it has no
# effect on the session as written — confirm whether it should be supplied
# through the `spark.jars.packages` config or removed.
packages = ','.join([
    "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5",
])

# Reuse the active SparkSession if one exists, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

In [11]:
from pyspark.sql.types import *

# **Classification using Spark MLlib**

The goal is to build a model that predicts the class of an iris flower (virginica, setosa, or versicolor) from its sepal and petal measurements. The iris dataset does not have a header row, so a schema is constructed to define the columns and their types in the DataFrame.

In [12]:
# Explicit schema for the header-less iris file: four double-valued
# measurements followed by the class label as a string.
schema = StructType(
    [
        StructField(column, DoubleType(), nullable=False)
        for column in ('sepal_length', 'sepal_width', 'petal_length', 'petal_width')
    ]
    + [StructField('class', StringType(), nullable=False)]
)

In [13]:
# Load the downloaded file as a DataFrame, applying the explicit schema
# because the file carries no header row.
iris = spark.read.csv('./data/iris.data', schema=schema)

In [14]:
# Summary statistics for every column, converted to pandas for rich display.
iris.describe().toPandas()

Unnamed: 0,summary,sepal_length,sepal_width,petal_length,petal_width,class
0,count,150.0,150.0,150.0,150.0,150
1,mean,5.843333333333335,3.0540000000000007,3.758666666666669,1.1986666666666672,
2,stddev,0.8280661279778637,0.4335943113621737,1.764420419952262,0.7631607417008414,
3,min,4.3,2.0,1.0,0.1,Iris-setosa
4,max,7.9,4.4,6.9,2.5,Iris-virginica


The table above shows summary statistics for the whole Iris dataset (all 150 rows). The distinct class labels are listed below.

In [15]:
# Show the distinct values of the class column.
iris.select('class').distinct().toPandas()

Unnamed: 0,class
0,Iris-virginica
1,Iris-setosa
2,Iris-versicolor


It is possible to register a DataFrame as a table, which allows interacting with it using SQL. The DataFrame will be registered as a temporary table, which means the table exists only for the lifetime of the Spark session.

In [16]:
# Expose the DataFrame to Spark SQL under the name `iris`.
# createOrReplaceTempView replaces the deprecated registerTempTable and is
# safe to re-run because it overwrites any existing view of the same name.
iris.createOrReplaceTempView('iris')

In [17]:
# Query the registered `iris` table with plain SQL and show the first five rows.
spark.sql('''
SELECT *
FROM iris
LIMIT 5
''').toPandas()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,class
0,5.1,3.5,1.4,0.2,Iris-setosa
1,4.9,3.0,1.4,0.2,Iris-setosa
2,4.7,3.2,1.3,0.2,Iris-setosa
3,4.6,3.1,1.5,0.2,Iris-setosa
4,5.0,3.6,1.4,0.2,Iris-setosa


Here are some of the fields grouped by their class.

In [18]:
# Per-class minimum, average and maximum of each of the four measurements.
spark.sql('''
SELECT class,
    min(sepal_length), avg(sepal_length), max(sepal_length),
    min(sepal_width), avg(sepal_width), max(sepal_width),
    min(petal_length), avg(petal_length), max(petal_length),
    min(petal_width), avg(petal_width), max(petal_width)
FROM iris
GROUP BY class
''').toPandas()

Unnamed: 0,class,min(sepal_length),avg(sepal_length),max(sepal_length),min(sepal_width),avg(sepal_width),max(sepal_width),min(petal_length),avg(petal_length),max(petal_length),min(petal_width),avg(petal_width),max(petal_width)
0,Iris-virginica,4.9,6.588,7.9,2.2,2.974,3.8,4.5,5.552,6.9,1.4,2.026,2.5
1,Iris-setosa,4.3,5.006,5.8,2.3,3.418,4.4,1.0,1.464,1.9,0.1,0.244,0.6
2,Iris-versicolor,4.9,5.936,7.0,2.0,2.77,3.4,3.0,4.26,5.1,1.0,1.326,1.8


**Transformers**

A Transformer can be used to transform the data without needing to learn or fit anything from the data. Transformers represent functions that are used to map over our data.

SQLTransformer has one parameter, which is the SQL statement that will be executed on the Dataframe. SQL transformer is used here to do the group-by performed previously.

In [19]:
from pyspark.ml.feature import SQLTransformer

# SQLTransformer substitutes the DataFrame passed to transform() for the
# __THIS__ placeholder. Querying __THIS__ rather than a table name keeps the
# transformer tied to its input instead of to whatever temporary table
# happens to be registered as `iris`.
statement = '''
SELECT
    class,
    min(sepal_length), avg(sepal_length), max(sepal_length),
    min(sepal_width), avg(sepal_width), max(sepal_width),
    min(petal_length), avg(petal_length), max(petal_length),
    min(petal_width), avg(petal_width), max(petal_width)
FROM __THIS__
GROUP BY class
'''

sql_transformer = SQLTransformer(statement=statement)


In [20]:
# Apply the SQL statement to the iris DataFrame and display the aggregated result.
sql_transformer.transform(iris).toPandas()

Unnamed: 0,class,min(sepal_length),avg(sepal_length),max(sepal_length),min(sepal_width),avg(sepal_width),max(sepal_width),min(petal_length),avg(petal_length),max(petal_length),min(petal_width),avg(petal_width),max(petal_width)
0,Iris-virginica,4.9,6.588,7.9,2.2,2.974,3.8,4.5,5.552,6.9,1.4,2.026,2.5
1,Iris-setosa,4.3,5.006,5.8,2.3,3.418,4.4,1.0,1.464,1.9,0.1,0.244,0.6
2,Iris-versicolor,4.9,5.936,7.0,2.0,2.77,3.4,3.0,4.26,5.1,1.0,1.326,1.8


The output is the same as the one obtained by running the SQL command directly. SQLTransformer is useful when there is preprocessing that must be performed on the data before other steps in a pipeline.

**Binarizer**

The Binarizer is a Transformer that applies a threshold to a numeric field, turning it into 0s (when at or below the threshold) and 1s (when above the threshold). The Binarizer returns a modified version of the input DataFrame, and it is used to convert a real-valued property into a binary class.

In [21]:
from pyspark.ml.feature import Binarizer

# Flag rows whose sepal length is strictly greater than 5.0 with 1.0 and all
# other rows with 0.0. The output column name spells "length" correctly so
# that later code can refer to it without reproducing a typo.
binarizer = Binarizer(
    inputCol='sepal_length',
    outputCol='sepal_length_above_5',
    threshold=5.0
)

In [22]:
# Preview the first five rows with the binarized column appended.
binarizer.transform(iris).limit(5).toPandas()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,class,sepal_lenght_above_5
0,5.1,3.5,1.4,0.2,Iris-setosa,1.0
1,4.9,3.0,1.4,0.2,Iris-setosa,0.0
2,4.7,3.2,1.3,0.2,Iris-setosa,0.0
3,4.6,3.1,1.5,0.2,Iris-setosa,0.0
4,5.0,3.6,1.4,0.2,Iris-setosa,0.0


**VectorAssembler**

Another important Transformer is the VectorAssembler. It takes a list of numeric and vector-valued columns and combines them into a single vector column. This is useful because MLlib's machine learning algorithms expect the features in a single vector-valued input column.

In [23]:
from pyspark.ml.feature import VectorAssembler

# Pack the four measurement columns into one vector column named `features`.
assembler = VectorAssembler(
    inputCols=[
        'sepal_length',
        'sepal_width',
        'petal_length',
        'petal_width',
    ],
    outputCol='features',
)

In [24]:
# Add the `features` vector column; persist() asks Spark to keep the result
# cached, since several later cells reuse this DataFrame.
iris_w_vecs = assembler.transform(iris).persist()

In [25]:
# Preview the first five rows, now including the assembled `features` column.
iris_w_vecs.limit(5).toPandas()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,class,features
0,5.1,3.5,1.4,0.2,Iris-setosa,"[5.1, 3.5, 1.4, 0.2]"
1,4.9,3.0,1.4,0.2,Iris-setosa,"[4.9, 3.0, 1.4, 0.2]"
2,4.7,3.2,1.3,0.2,Iris-setosa,"[4.7, 3.2, 1.3, 0.2]"
3,4.6,3.1,1.5,0.2,Iris-setosa,"[4.6, 3.1, 1.5, 0.2]"
4,5.0,3.6,1.4,0.2,Iris-setosa,"[5.0, 3.6, 1.4, 0.2]"


**MinMaxScaler**

The MinMaxScaler rescales the data so that each feature lies between 0 and 1.

In [26]:
from pyspark.ml.feature import MinMaxScaler

# Rescale the assembled feature vector with min-max scaling.
# NOTE(review): the input is the whole `features` vector, so the output
# column holds every scaled feature, not only the petal length its name
# suggests — consider renaming it together with the text that refers to it.
scaler = MinMaxScaler(inputCol='features', outputCol='petal_length_scaled')

In [27]:
# MinMaxScaler is an estimator: fitting it on the data produces the model
# that performs the actual rescaling.
scaler_model = scaler.fit(iris_w_vecs)

In [28]:
# Preview the first five rows with the scaled vector column appended.
scaler_model.transform(iris_w_vecs).limit(5).toPandas()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,class,features,petal_length_scaled
0,5.1,3.5,1.4,0.2,Iris-setosa,"[5.1, 3.5, 1.4, 0.2]","[0.22222222222222213, 0.625, 0.067796610169491..."
1,4.9,3.0,1.4,0.2,Iris-setosa,"[4.9, 3.0, 1.4, 0.2]","[0.1666666666666668, 0.41666666666666663, 0.06..."
2,4.7,3.2,1.3,0.2,Iris-setosa,"[4.7, 3.2, 1.3, 0.2]","[0.11111111111111119, 0.5, 0.05084745762711865..."
3,4.6,3.1,1.5,0.2,Iris-setosa,"[4.6, 3.1, 1.5, 0.2]","[0.08333333333333327, 0.4583333333333333, 0.08..."
4,5.0,3.6,1.4,0.2,Iris-setosa,"[5.0, 3.6, 1.4, 0.2]","[0.19444444444444448, 0.6666666666666666, 0.06..."


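The petal_length_scaled column now has values between 0 and 1; despite its name, it holds the scaled version of the whole features vector, not just the petal length. This can help some algorithms, specifically those that have difficulty combining features of different scales.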

## **Building the Model** 

The goal is to predict the class from the other features, and for this I will use a decision tree. First, the target must be converted into index values.

The StringIndexer will turn the class values into indices, as it is simpler to implement most training algorithms with the assumption that the target is a number.

IndexToString is used to map the predictions, which will be indices, back to string values.

In [29]:
from pyspark.ml.feature import StringIndexer, IndexToString

# Encode the string class label as a numeric index column for the classifier.
indexer = StringIndexer(inputCol='class', outputCol='class_ix')
indexer_model = indexer.fit(iris_w_vecs)

# Decode the classifier's predicted index back into a class name. The input
# must be the classifier's `prediction` column, not the indexed label column:
# reading the label would simply echo the true class into `pred_class` and
# hide every misclassification.
index2string = IndexToString(
    inputCol='prediction',
    outputCol='pred_class',
    labels=indexer_model.labels
)

In [30]:
# Append the numeric `class_ix` label column to the assembled data.
iris_indexed = indexer_model.transform(iris_w_vecs)

The next step consists of training the DecisionTreeClassifier.

In [31]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree trained on the assembled feature vector against the indexed
# label, limited to depth 5 and split using Gini impurity.
dt_clfr = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='class_ix',
    maxDepth=5,
    impurity='gini',
)

In [32]:
# Train the decision tree on the full indexed dataset (no hold-out split here).
dt_clfr_model = dt_clfr.fit(iris_indexed)

In [33]:
# Score the same rows the model was trained on, adding the prediction columns.
iris_w_pred = dt_clfr_model.transform(iris_indexed)

In [34]:
# Preview the first five scored rows.
iris_w_pred.limit(5).toPandas()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,class,features,class_ix,rawPrediction,probability,prediction
0,5.1,3.5,1.4,0.2,Iris-setosa,"[5.1, 3.5, 1.4, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0
1,4.9,3.0,1.4,0.2,Iris-setosa,"[4.9, 3.0, 1.4, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0
2,4.7,3.2,1.3,0.2,Iris-setosa,"[4.7, 3.2, 1.3, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0
3,4.6,3.1,1.5,0.2,Iris-setosa,"[4.6, 3.1, 1.5, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0
4,5.0,3.6,1.4,0.2,Iris-setosa,"[5.0, 3.6, 1.4, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0


Here the predicted classes are mapped back to their string form using IndexToString.

In [35]:
# Append the `pred_class` string column produced by the IndexToString transformer.
iris_w_pred_class = index2string.transform(iris_w_pred)

In [36]:
# Preview the first ten rows, including the decoded class-name column.
iris_w_pred_class.limit(10).toPandas()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,class,features,class_ix,rawPrediction,probability,prediction,pred_class
0,5.1,3.5,1.4,0.2,Iris-setosa,"[5.1, 3.5, 1.4, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
1,4.9,3.0,1.4,0.2,Iris-setosa,"[4.9, 3.0, 1.4, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
2,4.7,3.2,1.3,0.2,Iris-setosa,"[4.7, 3.2, 1.3, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
3,4.6,3.1,1.5,0.2,Iris-setosa,"[4.6, 3.1, 1.5, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
4,5.0,3.6,1.4,0.2,Iris-setosa,"[5.0, 3.6, 1.4, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
5,5.4,3.9,1.7,0.4,Iris-setosa,"[5.4, 3.9, 1.7, 0.4]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
6,4.6,3.4,1.4,0.3,Iris-setosa,"[4.6, 3.4, 1.4, 0.3]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
7,5.0,3.4,1.5,0.2,Iris-setosa,"[5.0, 3.4, 1.5, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
8,4.4,2.9,1.4,0.2,Iris-setosa,"[4.4, 2.9, 1.4, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
9,4.9,3.1,1.5,0.1,Iris-setosa,"[4.9, 3.1, 1.5, 0.1]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa


**Evaluator**

In order to see how well the model fits the data, MulticlassClassificationEvaluator is used to measure how many predictions match the true class.

In [37]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Compares the model's predictions with the indexed label and reports accuracy.
evaluator = MulticlassClassificationEvaluator(labelCol='class_ix', metricName='accuracy')

In [38]:
# Accuracy on the same rows the tree was trained on, so this measures fit,
# not generalization.
evaluator.evaluate(iris_w_pred_class)

1.0

This seems too good. The model might be overfitting, so using cross-validation to evaluate the model is a better idea.

**Building a Pipeline**

In [39]:
from pyspark.ml import Pipeline

# Chain the steps in execution order: assemble features, index the label,
# train the tree, then decode an index column back to class names.
pipeline = Pipeline(stages=[assembler, indexer, dt_clfr, index2string])

In [40]:
# Fit every stage of the pipeline on the raw iris DataFrame in a single call.
pipeline_model = pipeline.fit(iris)

In [41]:
# Run the fitted pipeline end to end and preview the first five rows.
pipeline_model.transform(iris).limit(5).toPandas()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,class,features,class_ix,rawPrediction,probability,prediction,pred_class
0,5.1,3.5,1.4,0.2,Iris-setosa,"[5.1, 3.5, 1.4, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
1,4.9,3.0,1.4,0.2,Iris-setosa,"[4.9, 3.0, 1.4, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
2,4.7,3.2,1.3,0.2,Iris-setosa,"[4.7, 3.2, 1.3, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
3,4.6,3.1,1.5,0.2,Iris-setosa,"[4.6, 3.1, 1.5, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa
4,5.0,3.6,1.4,0.2,Iris-setosa,"[5.0, 3.6, 1.4, 0.2]",0.0,"[50.0, 0.0, 0.0]","[1.0, 0.0, 0.0]",0.0,Iris-setosa


In [42]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# A one-point grid: only maxDepth=5 is evaluated, so cross-validation is used
# here to score the model rather than to tune it.
param_grid = (
    ParamGridBuilder()
    .addGrid(dt_clfr.maxDepth, [5])
    .build()
)

# 3-fold cross-validation of the whole pipeline, scored with `evaluator`,
# with a fixed seed passed to the cross-validator.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    seed=123,
)

In [43]:
# Fit and score the pipeline on each of the three train/validation splits.
cv_model = cv.fit(iris)


Now it is possible to see how the model does when trained on two-thirds of the data and evaluated on the remaining one-third.

In [44]:
# Evaluator metric averaged over the folds; one entry per parameter map in the grid.
cv_model.avgMetrics


[0.9470457079152732]

95% accuracy is much more believable than 100%.

**Saving the Pipeline**

In [45]:
# Write the fitted pipeline to ./pipeline.model, replacing any earlier copy.
pipeline_model.write().overwrite().save('pipeline.model')

In [46]:
# Inspect what was written: pipeline metadata plus one directory per stage.
! ls pipeline.model/*

pipeline.model/metadata:
part-00000  _SUCCESS

pipeline.model/stages:
0_VectorAssembler_44aabc937917	2_DecisionTreeClassifier_cb14d561c0a6
1_StringIndexer_54235b4e5898	3_IndexToString_832505ba40cb
