In [1]:
import os
import sys
# Here you need to have same Python version on your local machine and on worker node i.e. EC2. here both should have python3.
os.environ["PYSPARK_PYTHON"] = "/bin/python3"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/home/ec2-user/spark-2.4.4-bin-hadoop2.7"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [4]:
# Creating Spark Session 

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Mllib_Overview').getOrCreate()

22/08/10 05:10:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
#Reading Data from a CSV file 
#Inferring Schema and Setting Header as True

df1 = spark.read.csv('auto-miles-per-gallon-Raw.csv', header=True, inferSchema=False)

                                                                                

In [5]:
#Displaying samples
df1.show() 

+-----+---------+------------+----------+------+------------+---------+--------------------+
|  MPG|CYLINDERS|DISPLACEMENT|HORSEPOWER|WEIGHT|ACCELERATION|MODELYEAR|                NAME|
+-----+---------+------------+----------+------+------------+---------+--------------------+
|   18|        8|         307|       130|  3504|          12|       70|chevrolet chevell...|
|   15|        8|         350|       165|  3693|        11.5|       70| buick skylark 320""|
|   18|        8|         318|       150|  3436|          11|       70|plymouth satellite""|
|   16|        8|         304|       150|  3433|          12|       70|     amc rebel sst""|
|   17|        8|         302|       140|  3449|        10.5|       70|       ford torino""|
|   15|        8|         429|      Null|  4341|          10|       70|  ford galaxie 500""|
|   14|        8|         454|       220|  4354|           9|       70|  chevrolet impala""|
|   14|        8|         440|       215|  4312|         8.5|       70

In [6]:
#Printing Schema
df1.printSchema() 

root
 |-- MPG: string (nullable = true)
 |-- CYLINDERS: string (nullable = true)
 |-- DISPLACEMENT: string (nullable = true)
 |-- HORSEPOWER: string (nullable = true)
 |-- WEIGHT: string (nullable = true)
 |-- ACCELERATION: string (nullable = true)
 |-- MODELYEAR: string (nullable = true)
 |-- NAME: string (nullable = true)



In [7]:
#Statistically summarizing about the data
df1.describe().show()

[Stage 2:>                                                          (0 + 1) / 1]

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+--------------------+
|summary|               MPG|        CYLINDERS|      DISPLACEMENT|        HORSEPOWER|            WEIGHT|      ACCELERATION|         MODELYEAR|                NAME|
+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+--------------------+
|  count|               406|              406|               406|               406|               406|               406|               406|                 406|
|   mean|23.514572864321615|5.475369458128079| 194.7795566502463|104.56675062972292|2979.4137931034484|15.519704433497521| 75.92118226600985|                null|
| stddev| 7.815984312565783|1.712159631548529|104.92245837948867|  38.1556978120705| 847.0043282393513|2.8033588163425462|3.7487373454558743|                null|
|    min|             

                                                                                

In [8]:
df1.columns

['MPG',
 'CYLINDERS',
 'DISPLACEMENT',
 'HORSEPOWER',
 'WEIGHT',
 'ACCELERATION',
 'MODELYEAR',
 'NAME']

In [9]:
#Convert the datatype of the columns
#Casting MPG, HORSEPOWER, and other columns from String to Double

from pyspark.sql.types import * 

cols = ['MPG', 'CYLINDERS', 'DISPLACEMENT', 'HORSEPOWER', 'WEIGHT', 'ACCELERATION', 'MODELYEAR' ]

for i in cols:
    df1 = df1.withColumn(i, df1[i].cast(DoubleType()))

In [10]:
df1.show()

+----+---------+------------+----------+------+------------+---------+--------------------+
| MPG|CYLINDERS|DISPLACEMENT|HORSEPOWER|WEIGHT|ACCELERATION|MODELYEAR|                NAME|
+----+---------+------------+----------+------+------------+---------+--------------------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|     70.0|chevrolet chevell...|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|     70.0| buick skylark 320""|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|     70.0|plymouth satellite""|
|16.0|      8.0|       304.0|     150.0|3433.0|        12.0|     70.0|     amc rebel sst""|
|17.0|      8.0|       302.0|     140.0|3449.0|        10.5|     70.0|       ford torino""|
|15.0|      8.0|       429.0|      null|4341.0|        10.0|     70.0|  ford galaxie 500""|
|14.0|      8.0|       454.0|     220.0|4354.0|         9.0|     70.0|  chevrolet impala""|
|14.0|      8.0|       440.0|     215.0|4312.0|         8.5|     70.0| plymouth 

### Handling Missing Values

In [11]:
df1.show() #Displaying samples 

+----+---------+------------+----------+------+------------+---------+--------------------+
| MPG|CYLINDERS|DISPLACEMENT|HORSEPOWER|WEIGHT|ACCELERATION|MODELYEAR|                NAME|
+----+---------+------------+----------+------+------------+---------+--------------------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|     70.0|chevrolet chevell...|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|     70.0| buick skylark 320""|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|     70.0|plymouth satellite""|
|16.0|      8.0|       304.0|     150.0|3433.0|        12.0|     70.0|     amc rebel sst""|
|17.0|      8.0|       302.0|     140.0|3449.0|        10.5|     70.0|       ford torino""|
|15.0|      8.0|       429.0|      null|4341.0|        10.0|     70.0|  ford galaxie 500""|
|14.0|      8.0|       454.0|     220.0|4354.0|         9.0|     70.0|  chevrolet impala""|
|14.0|      8.0|       440.0|     215.0|4312.0|         8.5|     70.0| plymouth 

In [12]:
#Removing Rows with missing values
#Dataframe.na.drop removes any row with a NULL value

df2 = df1.dropna()

In [13]:
df2.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+
|summary|               MPG|         CYLINDERS|      DISPLACEMENT|        HORSEPOWER|            WEIGHT|      ACCELERATION|         MODELYEAR|                NAME|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+
|  count|               389|               389|               389|               389|               389|               389|               389|                 389|
|   mean|23.505912596401025|5.4575835475578405| 193.1272493573265|103.93830334190231|2970.8817480719795| 15.56992287917736| 76.02570694087403|                null|
| stddev|7.8035440668103035|1.7024467902237224|103.52041672172308|37.849829402174656|  846.593666699619|2.7406345198432867|3.6600880681307015|                null|
|    min|       

### Replace missing values with approximate mean values

### Estimators 

An estimator is a stage of the learning algorithm that fits a model on a dataset. The whole process can be denoted as follows DataFrame =[fit]=> Model. An estimator is executed during the step of building a model using existing data. A model, which will be made by the estimator, itself serves as a transformer. Further, an estimator is available in Java through the Estimator class. The .fit() method, which is used for building a model, is available through this class.

#### Imputer


The Imputer estimator completes missing values in a dataset, either using the mean or the median of the columns in which the missing values are located. The input columns should be of DoubleType or FloatType

In [17]:
pip install numpy

Defaulting to user installation because normal site-packages is not writeable
Collecting numpy
  Downloading numpy-1.21.6-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (15.7 MB)
[K     |████████████████████████████████| 15.7 MB 23.4 MB/s eta 0:00:01
[?25hInstalling collected packages: numpy
Successfully installed numpy-1.21.6
Note: you may need to restart the kernel to use updated packages.


In [18]:
import numpy as np
from pyspark.ml.feature import Imputer

# Imputer method automatically replaces null values with mean values.
imputer = Imputer(inputCols = ["MPG","HORSEPOWER"], outputCols = ["MPG-Out","HORSEPOWER-Out"])

#Fitting DataFrame into a model
imputeModel = imputer.fit(df1) 

#Transforming the DataFrame
df3=imputeModel.transform(df1) 

In [19]:
#Describing the dataframe
df3.show()
df3.describe().show();

+----+---------+------------+----------+------+------------+---------+--------------------+------------------+------------------+
| MPG|CYLINDERS|DISPLACEMENT|HORSEPOWER|WEIGHT|ACCELERATION|MODELYEAR|                NAME|           MPG-Out|    HORSEPOWER-Out|
+----+---------+------------+----------+------+------------+---------+--------------------+------------------+------------------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|     70.0|chevrolet chevell...|              18.0|             130.0|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|     70.0| buick skylark 320""|              15.0|             165.0|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|     70.0|plymouth satellite""|              18.0|             150.0|
|16.0|      8.0|       304.0|     150.0|3433.0|        12.0|     70.0|     amc rebel sst""|              16.0|             150.0|
|17.0|      8.0|       302.0|     140.0|3449.0|        10.5|     70.0|       ford torino""

In [20]:
#Removing unnecessary columns
df3.drop(df3['MPG'])
df3.drop(df3['HORSEPOWER'])

DataFrame[MPG: double, CYLINDERS: double, DISPLACEMENT: double, WEIGHT: double, ACCELERATION: double, MODELYEAR: double, NAME: string, MPG-Out: double, HORSEPOWER-Out: double]

### Feature Transformers

Feature transformer transforms the data stored in a data frame and store the data back as a new data frame. This transformation generally takes place by appending one or more columns to the existing data frame. It can be broken down to simple following sequence DataFrame =[transform]=> DataFrame. Transformer methods are generally executed during the step of preparing and processing the data sets. A transformer is a part of Spark ML and is implemented in Java through the Transformer base class. The .transform() method, which is used for transforming one DataFrame to another, is available through this class.

Link to Doumentation - https://spark.apache.org/docs/latest/ml-features.html#feature-transformers

#### Vector Assembler

VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees.

In [21]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler(
    inputCols=["CYLINDERS", "WEIGHT", "HORSEPOWER-Out", "DISPLACEMENT"],
    outputCol="features")

output = assembler.transform(df3)


In [25]:
output.show()

+----+---------+------------+----------+------+------------+---------+--------------------+------------------+------------------+--------------------+
| MPG|CYLINDERS|DISPLACEMENT|HORSEPOWER|WEIGHT|ACCELERATION|MODELYEAR|                NAME|           MPG-Out|    HORSEPOWER-Out|            features|
+----+---------+------------+----------+------+------------+---------+--------------------+------------------+------------------+--------------------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|     70.0|chevrolet chevell...|              18.0|             130.0|[8.0,3504.0,130.0...|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|     70.0| buick skylark 320""|              15.0|             165.0|[8.0,3693.0,165.0...|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|     70.0|plymouth satellite""|              18.0|             150.0|[8.0,3436.0,150.0...|
|16.0|      8.0|       304.0|     150.0|3433.0|        12.0|     70.0|     amc rebel sst""|   

#### Standard Scalar

StandardScaler is an Estimator which can be fit on a dataset to produce a StandardScalerModel. It transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation and/or zero mean

In [26]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)


In [27]:
Scaled_Model = scaler.fit(output)

In [28]:
scaled_data = Scaled_Model.transform(output)

In [29]:
scaled_data.show()

+----+---------+------------+----------+------+------------+---------+--------------------+------------------+------------------+--------------------+--------------------+
| MPG|CYLINDERS|DISPLACEMENT|HORSEPOWER|WEIGHT|ACCELERATION|MODELYEAR|                NAME|           MPG-Out|    HORSEPOWER-Out|            features|      scaledFeatures|
+----+---------+------------+----------+------+------------+---------+--------------------+------------------+------------------+--------------------+--------------------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|     70.0|chevrolet chevell...|              18.0|             130.0|[8.0,3504.0,130.0...|[4.67246152320771...|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|     70.0| buick skylark 320""|              15.0|             165.0|[8.0,3693.0,165.0...|[4.67246152320771...|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|     70.0|plymouth satellite""|              18.0|             150.0|[8.0,3436.0

In [30]:
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0]),),
(1, Vectors.dense([10.0]),),
(3, Vectors.dense([20.0]),),
(4, Vectors.dense([30.0]),),
(5, Vectors.dense([50.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()

[Stage 18:>                                                         (0 + 1) / 1]

+--------+--------------+
|features|scaledFeatures|
+--------+--------------+
|   [1.0]|        [0.02]|
|  [10.0]|         [0.2]|
|  [20.0]|         [0.4]|
|  [30.0]|         [0.6]|
|  [50.0]|         [1.0]|
+--------+--------------+



                                                                                

In [31]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
(0, Vectors.dense([3.0, 4.0, 6.0, 8.0, 10.0]),)
], ["id", "features"])

normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=2.0)
NormData = normalizer.transform(dataFrame)
NormData.show()

+---+--------------------+--------------------+
| id|            features|        normFeatures|
+---+--------------------+--------------------+
|  0|[3.0,4.0,6.0,8.0,...|[0.2,0.2666666666...|
+---+--------------------+--------------------+



### Pipeline

A pipeline is a series of activities or transformations. Machine learning is performed by implementing a set of tasks, that is collecting the data, cleaning the data, building the model, evaluating the model, etc. All these steps need to be performed one after the other in a particular sequence. In short, a pipeline can be considered as a series of activities wrapped together for better representations. Pipelines primarily have four principal objects- DataFrame, Transformer, Estimator, and Evaluator.

In [32]:
#Initialize a new dataframe
data = df1

In [33]:
#import Pipeline and setting the stages of the pipeline
from pyspark.ml import Pipeline

pipeline =  Pipeline(stages = [imputer,assembler,scaler])

In [34]:
#Use .fit() and .transform() on the pipeline

model = pipeline.fit(data)
Final_output = model.transform(data)

In [35]:
#Output of the pipeline

Final_output.show()

+----+---------+------------+----------+------+------------+---------+--------------------+------------------+------------------+--------------------+--------------------+
| MPG|CYLINDERS|DISPLACEMENT|HORSEPOWER|WEIGHT|ACCELERATION|MODELYEAR|                NAME|           MPG-Out|    HORSEPOWER-Out|            features|      scaledFeatures|
+----+---------+------------+----------+------+------------+---------+--------------------+------------------+------------------+--------------------+--------------------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|     70.0|chevrolet chevell...|              18.0|             130.0|[8.0,3504.0,130.0...|[1.0,0.6817120622...|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|     70.0| buick skylark 320""|              15.0|             165.0|[8.0,3693.0,165.0...|[1.0,0.7184824902...|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|     70.0|plymouth satellite""|              18.0|             150.0|[8.0,3436.0

In [36]:
#The end