<a href="https://colab.research.google.com/github/KAMBLE/ML-with-Pyspark/blob/main/ML_Pipeline_in_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Configure PySpark in Colab

In [5]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz

### For permanent Colab library installation

In [2]:
import os, sys
from google.colab import drive

# Mount Google Drive so the Drive-backed package folder below is reachable.
drive.mount('/content/drive')

# Space-free alias for the Drive folder that holds the installed packages.
nb_path = '/content/notebooks'

# os.symlink raises FileExistsError when the link is already there (for example
# when this cell is re-run in the same runtime), so create it only once.
if not os.path.lexists(nb_path):
    os.symlink('/content/drive/My Drive/Colab Notebooks', nb_path)

# Put the folder at the front of the import path, without stacking a duplicate
# entry on every re-run.
if nb_path not in sys.path:
    sys.path.insert(0, nb_path)

Mounted at /content/drive


In [4]:
!pip install --target=$nb_path findspark

Collecting findspark
  Downloading https://files.pythonhosted.org/packages/fc/2d/2e39f9a023479ea798eed4351cd66f163ce61e00c717e03c37109f00c0f2/findspark-1.4.2-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.4.2


In [None]:
!tar -xvzf '/content/drive/MyDrive/SPARK/spark-3.1.1-bin-hadoop2.7.tgz'

In [7]:
import os

# Location of the JDK installed by the apt-get step.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Must name the distribution that is actually extracted and passed to
# findspark.init(): spark-3.1.1, not the 2.3.2 path used previously.
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [8]:
import findspark

# Point findspark at the extracted Spark distribution before importing pyspark.
findspark.init("/content/spark-3.1.1-bin-hadoop2.7")

from pyspark.sql import SparkSession

# Obtain (or reuse) a session whose master is "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [9]:
# Echo the session object as the cell's output to confirm it was created.
spark

# Build an ML pipeline in PySpark

In [39]:
# Load the employee-attrition CSV, taking the column names from its header row.
csv_reader = spark.read.option('header', True)
my_data = csv_reader.csv('/content/Employee-Attrition.csv')

# No schema is supplied or inferred here, so every column is reported as string.
my_data.printSchema()

root
 |-- Age: string (nullable = true)
 |-- Attrition: string (nullable = true)
 |-- BusinessTravel: string (nullable = true)
 |-- DailyRate: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- DistanceFromHome: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- EducationField: string (nullable = true)
 |-- EmployeeCount: string (nullable = true)
 |-- EmployeeNumber: string (nullable = true)
 |-- EnvironmentSatisfaction: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- HourlyRate: string (nullable = true)
 |-- JobInvolvement: string (nullable = true)
 |-- JobLevel: string (nullable = true)
 |-- JobRole: string (nullable = true)
 |-- JobSatisfaction: string (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- MonthlyIncome: string (nullable = true)
 |-- MonthlyRate: string (nullable = true)
 |-- NumCompaniesWorked: string (nullable = true)
 |-- Over18: string (nullable = true)
 |-- OverTime: string (nullable = tr

In [11]:
# Shape of the data as (number of rows, number of columns)
row_count = my_data.count()
column_count = len(my_data.columns)
(row_count, column_count)

(1470, 35)

In [12]:
# Summary statistics for every column
summary_stats = my_data.describe()
summary_stats.show()

+-------+------------------+---------+--------------+------------------+---------------+----------------+------------------+----------------+-------------+-----------------+-----------------------+------+------------------+------------------+------------------+--------------------+------------------+-------------+-----------------+------------------+------------------+------+--------+------------------+-------------------+------------------------+-------------+------------------+------------------+---------------------+------------------+------------------+------------------+-----------------------+--------------------+
|summary|               Age|Attrition|BusinessTravel|         DailyRate|     Department|DistanceFromHome|         Education|  EducationField|EmployeeCount|   EmployeeNumber|EnvironmentSatisfaction|Gender|        HourlyRate|    JobInvolvement|          JobLevel|             JobRole|   JobSatisfaction|MaritalStatus|    MonthlyIncome|       MonthlyRate|NumCompaniesWorked|O

In [13]:
# Preview the first five rows
my_data.show(5)

+---+---------+-----------------+---------+--------------------+----------------+---------+--------------+-------------+--------------+-----------------------+------+----------+--------------+--------+--------------------+---------------+-------------+-------------+-----------+------------------+------+--------+-----------------+-----------------+------------------------+-------------+----------------+-----------------+---------------------+---------------+--------------+------------------+-----------------------+--------------------+
|Age|Attrition|   BusinessTravel|DailyRate|          Department|DistanceFromHome|Education|EducationField|EmployeeCount|EmployeeNumber|EnvironmentSatisfaction|Gender|HourlyRate|JobInvolvement|JobLevel|             JobRole|JobSatisfaction|MaritalStatus|MonthlyIncome|MonthlyRate|NumCompaniesWorked|Over18|OverTime|PercentSalaryHike|PerformanceRating|RelationshipSatisfaction|StandardHours|StockOptionLevel|TotalWorkingYears|TrainingTimesLastYear|WorkLifeBalanc

In [14]:
# PySpark SQL column functions
import pyspark.sql.functions as f

# One aggregate per column: count the rows in which that column is null.
# The column name is passed to `when` as a plain string on purpose, exactly as before.
null_count_exprs = [
    f.count(f.when(f.isnull(col_name), col_name)).alias(col_name)
    for col_name in my_data.columns
]
data_agg = my_data.agg(*null_count_exprs)
data_agg.show()

+---+---------+--------------+---------+----------+----------------+---------+--------------+-------------+--------------+-----------------------+------+----------+--------------+--------+-------+---------------+-------------+-------------+-----------+------------------+------+--------+-----------------+-----------------+------------------------+-------------+----------------+-----------------+---------------------+---------------+--------------+------------------+-----------------------+--------------------+
|Age|Attrition|BusinessTravel|DailyRate|Department|DistanceFromHome|Education|EducationField|EmployeeCount|EmployeeNumber|EnvironmentSatisfaction|Gender|HourlyRate|JobInvolvement|JobLevel|JobRole|JobSatisfaction|MaritalStatus|MonthlyIncome|MonthlyRate|NumCompaniesWorked|Over18|OverTime|PercentSalaryHike|PerformanceRating|RelationshipSatisfaction|StandardHours|StockOptionLevel|TotalWorkingYears|TrainingTimesLastYear|WorkLifeBalance|YearsAtCompany|YearsInCurrentRole|YearsSinceLastPr

In [15]:
# Number of rows for each BusinessTravel category
travel_counts = my_data.groupBy('BusinessTravel').count()
travel_counts.show()

+-----------------+-----+
|   BusinessTravel|count|
+-----------------+-----+
|Travel_Frequently|  277|
|       Non-Travel|  150|
|    Travel_Rarely| 1043|
+-----------------+-----+



In [16]:
# Number of rows for each Department category
department_counts = my_data.groupBy('Department').count()
department_counts.show()

+--------------------+-----+
|          Department|count|
+--------------------+-----+
|               Sales|  446|
|Research & Develo...|  961|
|     Human Resources|   63|
+--------------------+-----+



## Encoding categorical variables

In [40]:
from pyspark.ml.feature import StringIndexer

# Create one StringIndexer per categorical column, naming its input and output column.
SI_BTravel = StringIndexer(inputCol='BusinessTravel',outputCol='BTravel_Index')
SI_Department = StringIndexer(inputCol='Department',outputCol='Department_Index')

# Remove index columns left over from an earlier run of this cell; otherwise
# transform() fails because the output column already exists. drop() ignores
# names that are not present, so a first run is unaffected.
my_data = my_data.drop(SI_BTravel.getOutputCol(), SI_Department.getOutputCol())

# Fit each indexer on the data and append its index column.
my_data = SI_BTravel.fit(my_data).transform(my_data)
my_data = SI_Department.fit(my_data).transform(my_data)

In [41]:
# Show each original category next to the index assigned to it
index_preview_cols = ['BusinessTravel', 'BTravel_Index', 'Department', 'Department_Index']
my_data.select(*index_preview_cols).show(20)

+-----------------+-------------+--------------------+----------------+
|   BusinessTravel|BTravel_Index|          Department|Department_Index|
+-----------------+-------------+--------------------+----------------+
|    Travel_Rarely|          0.0|               Sales|             1.0|
|Travel_Frequently|          1.0|Research & Develo...|             0.0|
|    Travel_Rarely|          0.0|Research & Develo...|             0.0|
|Travel_Frequently|          1.0|Research & Develo...|             0.0|
|    Travel_Rarely|          0.0|Research & Develo...|             0.0|
|Travel_Frequently|          1.0|Research & Develo...|             0.0|
|    Travel_Rarely|          0.0|Research & Develo...|             0.0|
|    Travel_Rarely|          0.0|Research & Develo...|             0.0|
|Travel_Frequently|          1.0|Research & Develo...|             0.0|
|    Travel_Rarely|          0.0|Research & Develo...|             0.0|
|    Travel_Rarely|          0.0|Research & Develo...|          

In [25]:
from pyspark.ml.feature import OneHotEncoder

In [43]:
# A single encoder handles both index columns through the multi-column
# inputCols/outputCols parameters.
OHE = OneHotEncoder(inputCols=['BTravel_Index','Department_Index'],outputCols=['BTravel_OHE','Department_OHE'])

# Remove encoded columns left over from an earlier run of this cell; otherwise
# transform() fails because the output columns already exist. drop() ignores
# names that are not present, so a first run is unaffected.
my_data = my_data.drop(*OHE.getOutputCols())

# Fit the encoder and append the one-hot vector columns.
my_data = OHE.fit(my_data).transform(my_data)

# Show the source category, its index, and its one-hot vector side by side.
my_data.select('BusinessTravel','BTravel_Index','BTravel_OHE','Department','Department_Index','Department_OHE').show(20)

+-----------------+-------------+-------------+--------------------+----------------+--------------+
|   BusinessTravel|BTravel_Index|  BTravel_OHE|          Department|Department_Index|Department_OHE|
+-----------------+-------------+-------------+--------------------+----------------+--------------+
|    Travel_Rarely|          0.0|(2,[0],[1.0])|               Sales|             1.0| (2,[1],[1.0])|
|Travel_Frequently|          1.0|(2,[1],[1.0])|Research & Develo...|             0.0| (2,[0],[1.0])|
|    Travel_Rarely|          0.0|(2,[0],[1.0])|Research & Develo...|             0.0| (2,[0],[1.0])|
|Travel_Frequently|          1.0|(2,[1],[1.0])|Research & Develo...|             0.0| (2,[0],[1.0])|
|    Travel_Rarely|          0.0|(2,[0],[1.0])|Research & Develo...|             0.0| (2,[0],[1.0])|
|Travel_Frequently|          1.0|(2,[1],[1.0])|Research & Develo...|             0.0| (2,[0],[1.0])|
|    Travel_Rarely|          0.0|(2,[0],[1.0])|Research & Develo...|             0.0| (2,[0

In [44]:
# List the current column names, including the index and one-hot columns appended above.
my_data.columns

['Age',
 'Attrition',
 'BusinessTravel',
 'DailyRate',
 'Department',
 'DistanceFromHome',
 'Education',
 'EducationField',
 'EmployeeCount',
 'EmployeeNumber',
 'EnvironmentSatisfaction',
 'Gender',
 'HourlyRate',
 'JobInvolvement',
 'JobLevel',
 'JobRole',
 'JobSatisfaction',
 'MaritalStatus',
 'MonthlyIncome',
 'MonthlyRate',
 'NumCompaniesWorked',
 'Over18',
 'OverTime',
 'PercentSalaryHike',
 'PerformanceRating',
 'RelationshipSatisfaction',
 'StandardHours',
 'StockOptionLevel',
 'TotalWorkingYears',
 'TrainingTimesLastYear',
 'WorkLifeBalance',
 'YearsAtCompany',
 'YearsInCurrentRole',
 'YearsSinceLastPromotion',
 'YearsWithCurrManager',
 'BTravel_Index',
 'Department_Index',
 'BTravel_OHE',
 'Department_OHE']

## Vector assembler

In [None]:
 # Change column type
# NOTE(review): this example is left disabled. Enabling it would also require
# `from pyspark.sql.types import IntegerType`, which no visible cell imports.
# df_new = my_data.withColumn("DailyRate", my_data["DailyRate"].cast(IntegerType()))
# df_new.printSchema()

In [49]:
from pyspark.ml.feature import VectorAssembler

# Only the indexed department feeds the feature vector for now. The other
# candidates (DistanceFromHome, Education, EmployeeCount, MonthlyIncome) are
# still string-typed after the CSV load and are left out.
feature_cols = ['Department_Index']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='vector')

# Replace null values with 0.
# NOTE(review): per the PySpark docs an integer fill value only applies to
# numeric columns; the CSV columns were loaded as strings, so confirm this
# call has the intended effect.
my_data = my_data.fillna(0)

# Append the assembled 'vector' column.
final_data = assembler.transform(my_data)

# Inspect the assembled vectors.
final_data.select('vector').show()

+------+
|vector|
+------+
| [1.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [0.0]|
| [1.0]|
| [0.0]|
+------+
only showing top 20 rows

