In [1]:
import pyspark
import opendatasets as od
import os
import pandas as pd
import numpy as np

In [2]:
data = od.download("https://www.kaggle.com/uciml/iris?select=Iris.csv")
data_dir = '.\iris'
data = os.listdir(data_dir)
data_df = pd.read_csv('iris/Iris.csv')
data = np.array(data_df)
X = data[:,1:5]
y = data[:,5]

Skipping, found downloaded files in ".\iris" (use force=True to force download)


### To work with PySpark you have to start a Spark session

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [5]:
spark

In [16]:
df_pyspark = spark.read.option('header', 'true').csv('iris/Iris.csv', inferSchema=True)
df_pyspark.show()

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
|  8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|
|  9|          4.4|         2.9|          1.4|         0.2|Iris-setosa|
| 10|          4.9|         3.1|          1.5|         0.1|Iris-setosa|
| 11|          5.4|         3.7|          1.5|         0.2|Iris-

In [17]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [23]:
df_pyspark.head(2)

[Row(Id=1, SepalLengthCm=5.1, SepalWidthCm=3.5, PetalLengthCm=1.4, PetalWidthCm=0.2, Species='Iris-setosa'),
 Row(Id=2, SepalLengthCm=4.9, SepalWidthCm=3.0, PetalLengthCm=1.4, PetalWidthCm=0.2, Species='Iris-setosa')]

In [19]:
df_pyspark.printSchema

<bound method DataFrame.printSchema of DataFrame[Id: int, SepalLengthCm: double, SepalWidthCm: double, PetalLengthCm: double, PetalWidthCm: double, Species: string]>

### Selecting columns

In [21]:
df_pyspark.columns

['Id',
 'SepalLengthCm',
 'SepalWidthCm',
 'PetalLengthCm',
 'PetalWidthCm',
 'Species']

In [26]:
df_pyspark.select('SepalLengthCm').show()

+-------------+
|SepalLengthCm|
+-------------+
|          5.1|
|          4.9|
|          4.7|
|          4.6|
|          5.0|
|          5.4|
|          4.6|
|          5.0|
|          4.4|
|          4.9|
|          5.4|
|          4.8|
|          4.8|
|          4.3|
|          5.8|
|          5.7|
|          5.4|
|          5.1|
|          5.7|
|          5.1|
+-------------+
only showing top 20 rows



In [28]:
df_pyspark.select(['SepalLengthCm', 'SepalWidthCm']).show()

+-------------+------------+
|SepalLengthCm|SepalWidthCm|
+-------------+------------+
|          5.1|         3.5|
|          4.9|         3.0|
|          4.7|         3.2|
|          4.6|         3.1|
|          5.0|         3.6|
|          5.4|         3.9|
|          4.6|         3.4|
|          5.0|         3.4|
|          4.4|         2.9|
|          4.9|         3.1|
|          5.4|         3.7|
|          4.8|         3.4|
|          4.8|         3.0|
|          4.3|         3.0|
|          5.8|         4.0|
|          5.7|         4.4|
|          5.4|         3.9|
|          5.1|         3.5|
|          5.7|         3.8|
|          5.1|         3.8|
+-------------+------------+
only showing top 20 rows



In [29]:
df_pyspark.dtypes

[('Id', 'int'),
 ('SepalLengthCm', 'double'),
 ('SepalWidthCm', 'double'),
 ('PetalLengthCm', 'double'),
 ('PetalWidthCm', 'double'),
 ('Species', 'string')]

### Check the describe function (similar to Pandas)

In [31]:
df_pyspark.describe().show()

+-------+------------------+------------------+-------------------+------------------+------------------+--------------+
|summary|                Id|     SepalLengthCm|       SepalWidthCm|     PetalLengthCm|      PetalWidthCm|       Species|
+-------+------------------+------------------+-------------------+------------------+------------------+--------------+
|  count|               150|               150|                150|               150|               150|           150|
|   mean|              75.5| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672|          null|
| stddev|43.445367992456916|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414|          null|
|    min|                 1|               4.3|                2.0|               1.0|               0.1|   Iris-setosa|
|    max|               150|               7.9|                4.4|               6.9|               2.5|Iris-virginica|
+-------+------------------+----

### Adding/removing columns

In [37]:
# adding
df_new = df_pyspark.withColumn('SepalWidthCm_new', df_pyspark['SepalWidthCm']+1)#.show()
df_new.show()

+---+-------------+------------+-------------+------------+-----------+----------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|SepalWidthCm_new|
+---+-------------+------------+-------------+------------+-----------+----------------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|             4.5|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|             4.0|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|             4.2|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|             4.1|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|             4.6|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|             4.9|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|             4.4|
|  8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|             4.4|
|  9|          4.4|  

In [38]:
# dropping
df_new = df_pyspark.drop('SepalWidthCm_new')
df_new.show()

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
|  8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|
|  9|          4.4|         2.9|          1.4|         0.2|Iris-setosa|
| 10|          4.9|         3.1|          1.5|         0.1|Iris-setosa|
| 11|          5.4|         3.7|          1.5|         0.2|Iris-

### Renaming columns

In [39]:
df_pyspark.withColumnRenamed('Species','Other Species').show()

+---+-------------+------------+-------------+------------+-------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Other Species|
+---+-------------+------------+-------------+------------+-------------+
|  1|          5.1|         3.5|          1.4|         0.2|  Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|  Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|  Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|  Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|  Iris-setosa|
|  6|          5.4|         3.9|          1.7|         0.4|  Iris-setosa|
|  7|          4.6|         3.4|          1.4|         0.3|  Iris-setosa|
|  8|          5.0|         3.4|          1.5|         0.2|  Iris-setosa|
|  9|          4.4|         2.9|          1.4|         0.2|  Iris-setosa|
| 10|          4.9|         3.1|          1.5|         0.1|  Iris-setosa|
| 11|          5.4|         3.7|      

### PySpark Handling Missing Values
- Dropping columns
- Dropping rows
- Various parameter in dropping functionalities
- Handling missing values by mean/median/mode replacement

In [45]:
# dropping columns
df = df_pyspark
df.drop('Id').show()

+-------------+------------+-------------+------------+-----------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+-------------+------------+-------------+------------+-----------+
|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
|          5.0|         3.4|          1.5|         0.2|Iris-setosa|
|          4.4|         2.9|          1.4|         0.2|Iris-setosa|
|          4.9|         3.1|          1.5|         0.1|Iris-setosa|
|          5.4|         3.7|          1.5|         0.2|Iris-setosa|
|          4.8|         3.4|          1.6|      

In [53]:
df.na.drop().show() #this will drop the rows with null values

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
|  8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|
|  9|          4.4|         2.9|          1.4|         0.2|Iris-setosa|
| 10|          4.9|         3.1|          1.5|         0.1|Iris-setosa|
| 11|          5.4|         3.7|          1.5|         0.2|Iris-

In [56]:
df.na.drop(how='any') # [default] will drop when there is at least one null on that row
df.na.drop(how='all') # will drop when all elements are null on that row
df.na.drop(thresh=2) # will drop when there are at least TWO nulls on that row
df.na.drop(subset=['SepalWidthCm']) # will drop on specific columns

DataFrame[Id: int, SepalLengthCm: double, SepalWidthCm: double, PetalLengthCm: double, PetalWidthCm: double, Species: string]

### Filling the missing values

In [59]:
df.na.fill(-9999, ['SepalWidthCm', 'SepalLengthCm']) # this will replace nulls on selected columns with -9999 

DataFrame[Id: int, SepalLengthCm: double, SepalWidthCm: double, PetalLengthCm: double, PetalWidthCm: double, Species: string]

In [63]:
# fill with the mean

from pyspark.ml.feature import Imputer

imputer = Imputer(
                  inputCols = ['SepalLengthCm', 'SepalWidthCm'],
                  outputCols = ["{}_imputed".format(c) for c in ['SepalLengthCm', 'SepalWidthCm']]).setStrategy('mean')
imputer.fit(df).transform(df).show()

# filling with median
imputer = Imputer(
                  inputCols = ['SepalLengthCm', 'SepalWidthCm'],
                  outputCols = ["{}_imputed".format(c) for c in ['SepalLengthCm', 'SepalWidthCm']]).setStrategy('median')
imputer.fit(df).transform(df).show()

+---+-------------+------------+-------------+------------+-----------+---------------------+--------------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|SepalLengthCm_imputed|SepalWidthCm_imputed|
+---+-------------+------------+-------------+------------+-----------+---------------------+--------------------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|                  5.1|                 3.5|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|                  4.9|                 3.0|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|                  4.7|                 3.2|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|                  4.6|                 3.1|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|                  5.0|                 3.6|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|         

# Filter operation in PySpark
- Filter operation
- &,",==
- ~

In [67]:
# find sepal length > 5
#df.filter("SepalLengthCm>5").show()
df.filter("SepalLengthCm>5").select(['SepalLengthCm']).show()

+-------------+
|SepalLengthCm|
+-------------+
|          5.1|
|          5.4|
|          5.4|
|          5.8|
|          5.7|
|          5.4|
|          5.1|
|          5.7|
|          5.1|
|          5.4|
|          5.1|
|          5.1|
|          5.2|
|          5.2|
|          5.4|
|          5.2|
|          5.5|
|          5.5|
|          5.1|
|          5.1|
+-------------+
only showing top 20 rows



In [68]:
df.filter(df['SepalLengthCm']>5).show()

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
| 11|          5.4|         3.7|          1.5|         0.2|Iris-setosa|
| 15|          5.8|         4.0|          1.2|         0.2|Iris-setosa|
| 16|          5.7|         4.4|          1.5|         0.4|Iris-setosa|
| 17|          5.4|         3.9|          1.3|         0.4|Iris-setosa|
| 18|          5.1|         3.5|          1.4|         0.3|Iris-setosa|
| 19|          5.7|         3.8|          1.7|         0.3|Iris-setosa|
| 20|          5.1|         3.8|          1.5|         0.3|Iris-setosa|
| 21|          5.4|         3.4|          1.7|         0.2|Iris-setosa|
| 22|          5.1|         3.7|          1.5|         0.4|Iris-

In [74]:
# multiple filters
df.filter(  (df['SepalLengthCm']>5) &
            (df['SepalLengthCm']<=5.5)  ).show()

# ~ not condition
# | or conditionn
# & and condition

+---+-------------+------------+-------------+------------+---------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|        Species|
+---+-------------+------------+-------------+------------+---------------+
|  1|          5.1|         3.5|          1.4|         0.2|    Iris-setosa|
|  6|          5.4|         3.9|          1.7|         0.4|    Iris-setosa|
| 11|          5.4|         3.7|          1.5|         0.2|    Iris-setosa|
| 17|          5.4|         3.9|          1.3|         0.4|    Iris-setosa|
| 18|          5.1|         3.5|          1.4|         0.3|    Iris-setosa|
| 20|          5.1|         3.8|          1.5|         0.3|    Iris-setosa|
| 21|          5.4|         3.4|          1.7|         0.2|    Iris-setosa|
| 22|          5.1|         3.7|          1.5|         0.4|    Iris-setosa|
| 24|          5.1|         3.3|          1.7|         0.5|    Iris-setosa|
| 28|          5.2|         3.5|          1.5|         0.2|    Iris-setosa|
| 29|       

# PySpark GroupBy and Aggregate functions

In [85]:
# groupby species to find the sum/max/min/mean per class
df.groupBy('Species').mean().show()
df.groupBy('Species').count().show() # number of samples in each class

+---------------+-------+------------------+------------------+------------------+------------------+
|        Species|avg(Id)|avg(SepalLengthCm)| avg(SepalWidthCm)|avg(PetalLengthCm)| avg(PetalWidthCm)|
+---------------+-------+------------------+------------------+------------------+------------------+
| Iris-virginica|  125.5| 6.587999999999998|2.9739999999999998|             5.552|             2.026|
|    Iris-setosa|   25.5| 5.005999999999999|3.4180000000000006|             1.464|0.2439999999999999|
|Iris-versicolor|   75.5|             5.936|2.7700000000000005|              4.26|1.3259999999999998|
+---------------+-------+------------------+------------------+------------------+------------------+

+---------------+-----+
|        Species|count|
+---------------+-----+
| Iris-virginica|   50|
|    Iris-setosa|   50|
|Iris-versicolor|   50|
+---------------+-----+



In [87]:
df.agg({'SepalLengthCm': 'sum'}).show() # show the total sum of sepal length

+------------------+
|sum(SepalLengthCm)|
+------------------+
| 876.5000000000002|
+------------------+



# Fitting a model and prediction

In [99]:
from pyspark.ml.feature import VectorAssembler
feature_assembler = VectorAssembler(inputCols = ['SepalLengthCm', 'SepalWidthCm'], outputCol = 'Independent')
output = feature_assembler.transform(df)
output.show()

+---+-------------+------------+-------------+------------+-----------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|Independent|
+---+-------------+------------+-------------+------------+-----------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|  [5.1,3.5]|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|  [4.9,3.0]|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|  [4.7,3.2]|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|  [4.6,3.1]|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|  [5.0,3.6]|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|  [5.4,3.9]|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|  [4.6,3.4]|
|  8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|  [5.0,3.4]|
|  9|          4.4|         2.9|          1.4|         0.2|Iris-setosa|  [4.

In [104]:
df_final = output.select("Independent",'PetalLengthCm')
df_final.show()

+-----------+-------------+
|Independent|PetalLengthCm|
+-----------+-------------+
|  [5.1,3.5]|          1.4|
|  [4.9,3.0]|          1.4|
|  [4.7,3.2]|          1.3|
|  [4.6,3.1]|          1.5|
|  [5.0,3.6]|          1.4|
|  [5.4,3.9]|          1.7|
|  [4.6,3.4]|          1.4|
|  [5.0,3.4]|          1.5|
|  [4.4,2.9]|          1.4|
|  [4.9,3.1]|          1.5|
|  [5.4,3.7]|          1.5|
|  [4.8,3.4]|          1.6|
|  [4.8,3.0]|          1.4|
|  [4.3,3.0]|          1.1|
|  [5.8,4.0]|          1.2|
|  [5.7,4.4]|          1.5|
|  [5.4,3.9]|          1.3|
|  [5.1,3.5]|          1.4|
|  [5.7,3.8]|          1.7|
|  [5.1,3.8]|          1.5|
+-----------+-------------+
only showing top 20 rows



In [107]:
# train-test split
from pyspark.ml.regression import LinearRegression
train_data, test_data = df_final.randomSplit([0.75, 0.25])
regressor = LinearRegression(featuresCol='Independent', labelCol='PetalLengthCm')
regressor = regressor.fit(train_data)
print(regressor.coefficients)
print(regressor.intercept)

[1.7836386691079102,-1.39280659182604]
-2.44840734665302


## Prediction

In [108]:
preds = regressor.evaluate(test_data)
preds.predictions.show()

+-----------+-------------+------------------+
|Independent|PetalLengthCm|        prediction|
+-----------+-------------+------------------+
|  [4.6,3.6]|          1.0|0.7422268006696222|
|  [4.9,2.4]|          3.3|2.9486863115932445|
|  [5.0,2.0]|          3.5|3.6841728152344513|
|  [5.0,3.4]|          1.6|1.7342435866779957|
|  [5.0,3.5]|          1.3|1.5949629274953918|
|  [5.1,2.5]|          3.0| 3.166133386232222|
|  [5.1,3.3]|          1.7| 2.051888112771391|
|  [5.1,3.8]|          1.5|1.3554848168583704|
|  [5.1,3.8]|          1.9|1.3554848168583704|
|  [5.2,3.5]|          1.5|1.9516906613169742|
|  [5.2,4.1]|          1.5|  1.11600670622135|
|  [5.4,3.9]|          1.7|1.7512957584081401|
|  [5.5,2.4]|          3.7| 4.018869513057991|
|  [5.6,2.5]|          3.9| 4.057952720786176|
|  [5.7,2.9]|          4.2| 3.679193950966552|
|  [5.7,4.4]|          1.5|1.5899840632274915|
|  [5.8,2.7]|          5.1|4.1361191362425505|
|  [5.8,2.8]|          5.1| 3.996838477059948|
|  [5.9,3.2]|



In [109]:
preds.meanAbsoluteError

0.49019888417641805