In [None]:
import os
import sys
# The Python version must match on the local machine and on the worker nodes (here, EC2); both should use Python 3.
os.environ["PYSPARK_PYTHON"] = "/bin/python3"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/home/ec2-user/spark-2.4.4-bin-hadoop2.7"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [None]:
# Creating Spark Session 

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Mllib_Overview').getOrCreate()

In [None]:
#Reading data from a CSV file
#Treating the first row as a header; schema inference is off, so every column is read as a string

df1 = spark.read.csv('auto-miles-per-gallon-Raw.csv', header=True, inferSchema=False)

In [None]:
#Displaying samples
df1.show() 

In [None]:
#Printing Schema
df1.printSchema() 

In [None]:
#Statistically summarizing about the data
df1.describe().show()

In [None]:
df1.columns

In [None]:
#Convert the datatype of the columns
#Casting MPG, HORSEPOWER, and other columns from String to Double

from pyspark.sql.types import DoubleType

cols = ['MPG', 'CYLINDERS', 'DISPLACEMENT', 'HORSEPOWER', 'WEIGHT', 'ACCELERATION', 'MODELYEAR' ]

for i in cols:
    df1 = df1.withColumn(i, df1[i].cast(DoubleType()))

In [None]:
df1.show()

### Handling Missing Values

In [None]:
df1.show() #Displaying samples 

In [None]:
#Removing rows with missing values
#DataFrame.dropna() (an alias of DataFrame.na.drop()) removes any row that contains a null value

df2 = df1.dropna()

In [None]:
df2.describe().show()

### Replace missing values with approximate mean values

### Estimators 

An estimator is a stage of a learning algorithm that fits a model to a dataset: DataFrame =[fit]=> Model. An estimator runs when a model is built from existing data, and the model it produces is itself a transformer. In Spark ML, estimators derive from the Estimator base class (Spark itself is written in Scala and runs on the JVM; PySpark exposes the class as pyspark.ml.Estimator), which provides the .fit() method used to build the model.
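
The fit/transform contract described above can be illustrated without Spark. The following is a minimal plain-Python sketch, not Spark's implementation: `MeanFillEstimator` and `MeanFillModel` are hypothetical names, rows are modeled as plain dictionaries rather than DataFrame rows, and the sample values are made up.

```python
class MeanFillModel:
    """Hypothetical fitted model; it behaves as a transformer."""

    def __init__(self, input_col, output_col, mean):
        self.input_col = input_col
        self.output_col = output_col
        self.mean = mean

    def transform(self, rows):
        # Build new rows with an extra column; the input rows are not modified.
        result = []
        for row in rows:
            value = row[self.input_col]
            new_row = dict(row)
            new_row[self.output_col] = self.mean if value is None else value
            result.append(new_row)
        return result


class MeanFillEstimator:
    """Hypothetical estimator; fit() learns the column mean and returns a model."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        observed = [r[self.input_col] for r in rows if r[self.input_col] is not None]
        return MeanFillModel(self.input_col, self.output_col, sum(observed) / len(observed))


# Made-up sample data with one missing value.
rows = [{"MPG": 18.0}, {"MPG": None}, {"MPG": 22.0}]

model = MeanFillEstimator("MPG", "MPG-Out").fit(rows)  # rows =[fit]=> model
filled = model.transform(rows)                         # rows =[transform]=> rows
print(filled)
```

The estimator holds only configuration; everything learned from the data lives in the model it returns, which is why the same model can later be applied to other data.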

#### Imputer


The Imputer estimator fills in missing values in a dataset using either the mean or the median of the column in which each missing value occurs. The input columns should be of DoubleType or FloatType.
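
To make the difference between the two strategies concrete, here is a small plain-Python sketch. The `impute` helper and the horsepower values are illustrative assumptions, and the median here is computed exactly with the standard library, so Spark's own computation may differ in detail.

```python
import statistics


def impute(values, strategy="mean"):
    """Fill None entries with the mean or median of the observed values."""
    observed = [v for v in values if v is not None]
    if strategy == "mean":
        fill = statistics.mean(observed)
    elif strategy == "median":
        fill = statistics.median(observed)
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    return [fill if v is None else v for v in values]


# Made-up horsepower values with one missing entry.
horsepower = [130.0, 165.0, None, 150.0, 95.0]

mean_filled = impute(horsepower, "mean")
median_filled = impute(horsepower, "median")
print(mean_filled[2], median_filled[2])
```

The median is less sensitive to extreme values, so it can be the safer choice when a column contains outliers.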

In [None]:
from pyspark.ml.feature import Imputer

# With the default strategy ("mean"), Imputer replaces null values with the column mean.
imputer = Imputer(inputCols = ["MPG","HORSEPOWER"], outputCols = ["MPG-Out","HORSEPOWER-Out"])

#Fitting the imputer on the DataFrame to produce a model
imputeModel = imputer.fit(df1) 

#Transforming the DataFrame with the fitted model
df3 = imputeModel.transform(df1)

In [None]:
#Displaying and describing the DataFrame
df3.show()
df3.describe().show()

In [None]:
#Removing unnecessary columns
#DataFrames are immutable: drop() returns a new DataFrame, so the result must be reassigned
df3 = df3.drop('MPG', 'HORSEPOWER')

### Feature Transformers

A feature transformer takes the data stored in a DataFrame and returns a new DataFrame, generally by appending one or more columns to the existing ones: DataFrame =[transform]=> DataFrame. Transformers are typically applied while preparing and processing datasets. In Spark ML, transformers derive from the Transformer base class (Spark itself is written in Scala and runs on the JVM; PySpark exposes the class as pyspark.ml.Transformer), which provides the .transform() method used to convert one DataFrame into another.

Link to Documentation - https://spark.apache.org/docs/latest/ml-features.html#feature-transformers

#### Vector Assembler

VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees.
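
What "combining columns into a single vector column" means can be sketched in plain Python. The `assemble` helper below is hypothetical and only mimics the idea with dictionaries and lists; the row values are made up.

```python
def assemble(rows, input_cols, output_col):
    """Append output_col holding the values of input_cols, in the given order."""
    return [
        {**row, output_col: [float(row[col]) for col in input_cols]}
        for row in rows
    ]


# Made-up rows using column names from the dataset.
rows = [
    {"CYLINDERS": 8, "WEIGHT": 3504.0, "DISPLACEMENT": 307.0},
    {"CYLINDERS": 4, "WEIGHT": 2372.0, "DISPLACEMENT": 113.0},
]

assembled = assemble(rows, ["CYLINDERS", "WEIGHT", "DISPLACEMENT"], "features")
print(assembled[0]["features"])
```

The order of the input columns fixes the position of each feature in the vector, so it should stay the same between training and prediction.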

In [None]:
from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler(
    inputCols=["CYLINDERS", "WEIGHT", "HORSEPOWER-Out", "DISPLACEMENT"],
    outputCol="features")

output = assembler.transform(df3)


In [None]:
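#output['features'] only returns a Column reference; select() and show() display the values
output.select('features').show(truncate=False)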

#### Standard Scaler

StandardScaler is an Estimator that can be fit on a dataset to produce a StandardScalerModel. The model transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation and/or zero mean.

In [None]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)


In [None]:
Scaled_Model = scaler.fit(output)

In [None]:
scaled_data = Scaled_Model.transform(output)

In [None]:
scaled_data.show()

### Pipeline

A pipeline is a series of activities or transformations. Machine learning involves a set of tasks, such as collecting the data, cleaning the data, building the model, and evaluating the model, which must be performed one after another in a particular sequence. In short, a pipeline wraps this series of activities into a single object. Pipelines primarily involve four principal objects: DataFrame, Transformer, Estimator, and Evaluator.
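
How a pipeline chains its stages can be sketched in plain Python. This is a simplified illustration, not Spark's implementation: `ToyPipeline`, `ToyPipelineModel`, `FillMean`, `FillMeanModel`, and `Halve` are hypothetical names, and the data is a plain list of numbers.

```python
class FillMean:
    """Toy estimator: learns the mean of the observed values."""

    def fit(self, values):
        observed = [v for v in values if v is not None]
        return FillMeanModel(sum(observed) / len(observed))


class FillMeanModel:
    """Toy model produced by FillMean; fills None with the learned mean."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        return [self.mean if v is None else v for v in values]


class Halve:
    """Toy transformer: needs no fitting."""

    def transform(self, values):
        return [v / 2 for v in values]


class ToyPipelineModel:
    """Fitted pipeline: applies every fitted stage in order."""

    def __init__(self, transformers):
        self.transformers = transformers

    def transform(self, data):
        for transformer in self.transformers:
            data = transformer.transform(data)
        return data


class ToyPipeline:
    """Unfitted pipeline: an ordered list of estimators and transformers."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the data produced by the previous stages.
            transformer = stage.fit(data) if hasattr(stage, "fit") else stage
            data = transformer.transform(data)
            fitted.append(transformer)
        return ToyPipelineModel(fitted)


model = ToyPipeline(stages=[FillMean(), Halve()]).fit([10.0, None, 30.0])
result = model.transform([None, 40.0])
print(result)
```

Fitting the pipeline once and reusing the resulting model means that new data is filled with the mean learned during fitting rather than a mean recomputed from the new data.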

In [None]:
#Using the DataFrame that still contains missing values (df1) as the pipeline input
data = df1

In [None]:
#Importing Pipeline and setting the stages of the pipeline
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[imputer, assembler, scaler])

In [None]:
#Use .fit() and .transform() on the pipeline

model = pipeline.fit(data)
Final_output = model.transform(data)

In [None]:
#Output of the pipeline

Final_output.show()