In [1]:
#Importing necessary libraries

# Importing pyspark spark functions in python
import pyspark

# Importing pandas to perform operations using dataframes
import pandas as pd

# Importing numpy to perform matrix operations
import numpy as np

# Importing matlot.lib to plot graphs
import matplotlib.pyplot as plt

# Inporting SparkSession from pyspark.sql to create a spark session
from pyspark.sql import SparkSession

In [2]:
# Creating a spark session
# This defines the entry point to programming spark with dataset and dataframe API

# appName(name)- Sets a name for the application, which will be shown in the Spark web UI
# getOrCreate()- Gets an existing SparkSession or, if there is no existing one, creates a 
#                new one based on the options set in this builder

spark = SparkSession.builder.appName('incomecs').getOrCreate()


In [3]:
# Reading the data
incomeData=spark.read.csv("incomeScaledData.csv",header=True,\
                          inferSchema=True)

In [4]:
type(incomeData)

pyspark.sql.dataframe.DataFrame

In [5]:
# Structure of data
incomeData.printSchema()

root
 |-- age: string (nullable = true)
 |-- JobType: string (nullable = true)
 |-- EdType: string (nullable = true)
 |-- maritalstatus: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capitalgain: double (nullable = true)
 |-- capitalloss: double (nullable = true)
 |-- hoursperweek: double (nullable = true)
 |-- SalStat: string (nullable = true)



In [6]:
incomeData=incomeData.withColumn('age',incomeData['age'].astype('float'))

In [7]:
incomeData.printSchema()

root
 |-- age: float (nullable = true)
 |-- JobType: string (nullable = true)
 |-- EdType: string (nullable = true)
 |-- maritalstatus: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capitalgain: double (nullable = true)
 |-- capitalloss: double (nullable = true)
 |-- hoursperweek: double (nullable = true)
 |-- SalStat: string (nullable = true)



In [8]:
# Columns in dataframe
incomeData.columns
print(incomeData.columns)

['age', 'JobType', 'EdType', 'maritalstatus', 'occupation', 'relationship', 'race', 'gender', 'capitalgain', 'capitalloss', 'hoursperweek', 'SalStat']


In [9]:
# No. of columns
l=len(incomeData.columns)
print(l)

12


In [10]:
# First five rows
incomeData.take(5)

[Row(age=39.0, JobType=' State-gov', EdType=' Bachelors', maritalstatus=' Never-married', occupation=' Adm-clerical', relationship=' Husband', race=' White', gender=' Male', capitalgain=2174.0, capitalloss=0.0, hoursperweek=40.0, SalStat=' less than or equal to 50,000'),
 Row(age=50.0, JobType=' Self-emp-not-inc', EdType=' Bachelors', maritalstatus=' Married-civ-spouse', occupation=' Exec-managerial', relationship=' Other-relative', race=' Amer-Indian-Eskimo', gender=' Male', capitalgain=0.0, capitalloss=0.0, hoursperweek=13.0, SalStat=' less than or equal to 50,000'),
 Row(age=38.0, JobType=' Private', EdType=' HS-grad', maritalstatus=' Divorced', occupation=' Handlers-cleaners', relationship=' Own-child', race=' Black', gender=' Male', capitalgain=0.0, capitalloss=0.0, hoursperweek=40.0, SalStat=' less than or equal to 50,000'),
 Row(age=53.0, JobType=' Private', EdType=' 11th', maritalstatus=' Married-civ-spouse', occupation=' Handlers-cleaners', relationship=' Not-in-family', race=

In [11]:
# Pandas df gives a neater output for understanding

pd.DataFrame(incomeData.take(5), columns=incomeData.columns)

Unnamed: 0,age,JobType,EdType,maritalstatus,occupation,relationship,race,gender,capitalgain,capitalloss,hoursperweek,SalStat
0,39.0,State-gov,Bachelors,Never-married,Adm-clerical,Husband,White,Male,2174.0,0.0,40.0,"less than or equal to 50,000"
1,50.0,Self-emp-not-inc,Bachelors,Married-civ-spouse,Exec-managerial,Other-relative,Amer-Indian-Eskimo,Male,0.0,0.0,13.0,"less than or equal to 50,000"
2,38.0,Private,HS-grad,Divorced,Handlers-cleaners,Own-child,Black,Male,0.0,0.0,40.0,"less than or equal to 50,000"
3,53.0,Private,11th,Married-civ-spouse,Handlers-cleaners,Not-in-family,Black,Male,0.0,0.0,40.0,"less than or equal to 50,000"
4,28.0,Private,Bachelors,Married-civ-spouse,Prof-specialty,Wife,Asian-Pac-Islander,Female,0.0,0.0,40.0,"less than or equal to 50,000"


In [12]:
# Count of categories in JobType
incomeData.groupby('JobType').count().show()

+-----------------+------+
|          JobType| count|
+-----------------+------+
|        State-gov| 15658|
|      Federal-gov| 16452|
|             null|     2|
| Self-emp-not-inc| 23202|
|        Local-gov| 26975|
|          Private|170730|
|               NA| 13544|
|     Self-emp-inc| 11570|
|      Without-pay|    64|
|     Never-worked|    38|
+-----------------+------+



In [13]:
# Count of categories in Occupation
incomeData.groupby('occupation').count().show()

+------------------+-----+
|        occupation|count|
+------------------+-----+
|   Farming-fishing| 6675|
|              null|    2|
| Handlers-cleaners| 8986|
|    Prof-specialty|46420|
|      Adm-clerical|29323|
|   Exec-managerial|43751|
|      Craft-repair|32406|
|             Sales|29816|
|      Tech-support| 8751|
|                NA|13408|
|  Transport-moving|12099|
|   Protective-serv| 9416|
|      Armed-Forces|   71|
| Machine-op-inspct|13835|
|     Other-service|22378|
|   Priv-house-serv|  898|
+------------------+-----+



In [15]:
# Storing categories in df_for_mode
df_for_mode = incomeData.groupBy("JobType").count()
df_for_mode.show()
print(df_for_mode)

+-----------------+------+
|          JobType| count|
+-----------------+------+
|        State-gov| 15658|
|      Federal-gov| 16452|
|             null|     2|
| Self-emp-not-inc| 23202|
|        Local-gov| 26975|
|          Private|170730|
|               NA| 13544|
|     Self-emp-inc| 11570|
|      Without-pay|    64|
|     Never-worked|    38|
+-----------------+------+

DataFrame[JobType: string, count: bigint]


In [16]:
# Finding the mode
mode_jobtype = df_for_mode.orderBy(df_for_mode['count'].desc()).collect()[0][0]

In [20]:
print(mode_jobtype)

 Private


In [21]:
df_for_mode.orderBy(df_for_mode['count'].desc()).collect()

[Row(JobType=' Private', count=170730),
 Row(JobType=' Local-gov', count=26975),
 Row(JobType=' Self-emp-not-inc', count=23202),
 Row(JobType=' Federal-gov', count=16452),
 Row(JobType=' State-gov', count=15658),
 Row(JobType=' NA', count=13544),
 Row(JobType=' Self-emp-inc', count=11570),
 Row(JobType=' Without-pay', count=64),
 Row(JobType=' Never-worked', count=38),
 Row(JobType=None, count=2)]

In [23]:
df_for_mode.orderBy(df_for_mode['count'],ascending=True).collect()
df_for_mode.orderBy(df_for_mode['count'].asc()).collect()

[Row(JobType=None, count=2),
 Row(JobType=' Never-worked', count=38),
 Row(JobType=' Without-pay', count=64),
 Row(JobType=' Self-emp-inc', count=11570),
 Row(JobType=' NA', count=13544),
 Row(JobType=' State-gov', count=15658),
 Row(JobType=' Federal-gov', count=16452),
 Row(JobType=' Self-emp-not-inc', count=23202),
 Row(JobType=' Local-gov', count=26975),
 Row(JobType=' Private', count=170730)]

In [24]:
#Currently Imputer does not support categorical features and possibly creates incorrect 
#values for a categorical feature.
#So let's impute NA with mode by ourselves.
from pyspark.sql.functions import when

In [25]:
# Impute NA with mode
incomeData = incomeData.withColumn("JobType",\
             when(incomeData["JobType"] == " NA", mode_jobtype)\
                                   .otherwise(incomeData["JobType"]))

df_for_mode = incomeData.groupBy("occupation").count()

#Mode for occupation
mode_occ = df_for_mode.orderBy(df_for_mode['count'].desc()).collect()[0][0]

incomeData = incomeData.withColumn("occupation",\
             when(incomeData["occupation"] == " NA", mode_occ)\
                                   .otherwise(incomeData["occupation"]))

In [26]:
incomeData.groupby('JobType').count().show()

+-----------------+------+
|          JobType| count|
+-----------------+------+
|        State-gov| 15658|
|      Federal-gov| 16452|
|             null|     2|
| Self-emp-not-inc| 23202|
|        Local-gov| 26975|
|          Private|184274|
|     Self-emp-inc| 11570|
|      Without-pay|    64|
|     Never-worked|    38|
+-----------------+------+



In [27]:
print("Mode of JobType:"+mode_jobtype)
print("Mode of Occupation:"+mode_occ)

Mode of JobType: Private
Mode of Occupation: Prof-specialty


In [28]:
incomeData.groupby('JobType').count().show()

+-----------------+------+
|          JobType| count|
+-----------------+------+
|        State-gov| 15658|
|      Federal-gov| 16452|
|             null|     2|
| Self-emp-not-inc| 23202|
|        Local-gov| 26975|
|          Private|184274|
|     Self-emp-inc| 11570|
|      Without-pay|    64|
|     Never-worked|    38|
+-----------------+------+



In [29]:
incomeData.groupby('occupation').count().show()
#All NAs have been replaced

+------------------+-----+
|        occupation|count|
+------------------+-----+
|   Farming-fishing| 6675|
|              null|    2|
| Handlers-cleaners| 8986|
|    Prof-specialty|59828|
|      Adm-clerical|29323|
|   Exec-managerial|43751|
|      Craft-repair|32406|
|             Sales|29816|
|      Tech-support| 8751|
|  Transport-moving|12099|
|   Protective-serv| 9416|
|      Armed-Forces|   71|
| Machine-op-inspct|13835|
|     Other-service|22378|
|   Priv-house-serv|  898|
+------------------+-----+



In [30]:
#Convert type of age,capitalgain, capitalloss,hoursperweek into double from string
from pyspark.sql.types import DoubleType
incomeData = incomeData.withColumn("age", incomeData["age"].cast(DoubleType()))
incomeData = incomeData.withColumn("capitalgain", incomeData["capitalgain"].cast(DoubleType()))
incomeData = incomeData.withColumn("capitalloss", incomeData["capitalloss"].cast(DoubleType()))
incomeData = incomeData.withColumn("hoursperweek", incomeData["hoursperweek"].cast(DoubleType()))
incomeData.printSchema()


root
 |-- age: double (nullable = true)
 |-- JobType: string (nullable = true)
 |-- EdType: string (nullable = true)
 |-- maritalstatus: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capitalgain: double (nullable = true)
 |-- capitalloss: double (nullable = true)
 |-- hoursperweek: double (nullable = true)
 |-- SalStat: string (nullable = true)



In [31]:
cols=incomeData.columns
pd.DataFrame(incomeData.take(5), columns=cols).transpose()

Unnamed: 0,0,1,2,3,4
age,39,50,38,53,28
JobType,State-gov,Self-emp-not-inc,Private,Private,Private
EdType,Bachelors,Bachelors,HS-grad,11th,Bachelors
maritalstatus,Never-married,Married-civ-spouse,Divorced,Married-civ-spouse,Married-civ-spouse
occupation,Adm-clerical,Exec-managerial,Handlers-cleaners,Handlers-cleaners,Prof-specialty
relationship,Husband,Other-relative,Own-child,Not-in-family,Wife
race,White,Amer-Indian-Eskimo,Black,Black,Asian-Pac-Islander
gender,Male,Male,Male,Male,Female
capitalgain,2174,0,0,0,0
capitalloss,0,0,0,0,0


In [32]:
incomeData.dtypes

[('age', 'double'),
 ('JobType', 'string'),
 ('EdType', 'string'),
 ('maritalstatus', 'string'),
 ('occupation', 'string'),
 ('relationship', 'string'),
 ('race', 'string'),
 ('gender', 'string'),
 ('capitalgain', 'double'),
 ('capitalloss', 'double'),
 ('hoursperweek', 'double'),
 ('SalStat', 'string')]

In [33]:
#Summary statistics of numeric variables

numeric_features = [t[0] for t in incomeData.dtypes\
                    if t[1] == 'double']
incomeData.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
age,278233,40.22300911212262,12.78102910600039,17.0,90.0
capitalgain,278232,1745.313651429856,8919.251552558157,0.0,99999.0
capitalloss,278232,104.14219225224916,428.88637721869634,0.0,4356.0
hoursperweek,278232,41.85216309645483,11.664976178814422,1.0,99.0


In [34]:
numeric_data = incomeData.select(numeric_features).toPandas()
axs = pd.plotting.scatter_matrix(numeric_data, figsize=(8, 8));
#Output is a matrix of 8 plots.
#The off-diagonal plots are scatter plots of the corresponding axes mentioned in the matrix
#Since plotting a variable vs itself would obviously be a straight line,
#the function gives extra details in diagonal element in the form of histogram about that feature


In [35]:
#Preparing Data for Machine Learning
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
categoricalColumns = ['JobType','EdType','maritalstatus','occupation','relationship','race','gender']
stages = []


In [36]:
#Converting categorical variable to integer indexes
#Converts the indexed categories into one-hot encoded variables
for categoricalCol in categoricalColumns:
    stringIndexer=StringIndexer(inputCol= categoricalCol, \
                                outputCol= categoricalCol + 'Index')\
                                .setHandleInvalid("skip")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer\
                                                .getOutputCol()], \
                                     outputCols=[categoricalCol + \
                                                 "classVec"])
    stages += [stringIndexer, encoder]


In [37]:
#StringIndexer to encode our labels to label indices
label_stringIdx = StringIndexer(inputCol = 'SalStat', \
                                outputCol = 'label')\
                                .setHandleInvalid("skip")
stages += [label_stringIdx]

numericCols = ['age','capitalgain','capitalloss','hoursperweek']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + \
                   numericCols
#We use the VectorAssembler to combine all the feature columns into a single vector column    
assembler = VectorAssembler(inputCols=assemblerInputs, \
                            outputCol="features")
stages += [assembler]

In [38]:
#SQL commands
sql_sc = SQLContext(sc)
incomeData.registerTempTable("incomeData")


In [39]:
sql_sc.sql("SELECT age,JobType,hoursperweek FROM incomeData").show(5)

+----+-----------------+------------+
| age|          JobType|hoursperweek|
+----+-----------------+------------+
|39.0|        State-gov|        40.0|
|50.0| Self-emp-not-inc|        13.0|
|38.0|          Private|        40.0|
|53.0|          Private|        40.0|
|28.0|          Private|        40.0|
+----+-----------------+------------+
only showing top 5 rows



In [40]:
sql_sc.sql("SELECT age,JobType,hoursperweek FROM incomeData WHERE age>40 AND gender==' Female'").show(5)

+----+-----------------+------------+
| age|          JobType|hoursperweek|
+----+-----------------+------------+
|49.0|          Private|        16.0|
|43.0| Self-emp-not-inc|        45.0|
|54.0|          Private|        20.0|
|59.0|          Private|        40.0|
|49.0|          Private|        40.0|
+----+-----------------+------------+
only showing top 5 rows



In [41]:
#Pipeline for our machine learning workflow. 
from pyspark.ml import Pipeline


In [42]:
pipeline = Pipeline(stages = stages)

In [43]:
pipelineModel = pipeline.fit(incomeData)

In [44]:
incomeData = pipelineModel.transform(incomeData)

In [45]:
#Add 2 columns label, features - the format needed to apply ml models in spark
selectedCols = ['label', 'features'] + cols
incomeData = incomeData.select(selectedCols)
incomeData.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: double (nullable = true)
 |-- JobType: string (nullable = true)
 |-- EdType: string (nullable = true)
 |-- maritalstatus: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capitalgain: double (nullable = true)
 |-- capitalloss: double (nullable = true)
 |-- hoursperweek: double (nullable = true)
 |-- SalStat: string (nullable = true)



In [46]:
#Random split into 70% training and 30% testing data
train, test = incomeData.randomSplit([0.7, 0.3], seed = 40)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 194984
Test Dataset Count: 83248


In [48]:
#LOGISTIC REGRESSION
from pyspark.ml.classification import LogisticRegression

# Simplest
lr1 = LogisticRegression(featuresCol = 'features', \
                         labelCol = 'label', maxIter=10)
# Fit the model
lr1Model = lr1.fit(train)

#lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
trainingSummary1 = lr1Model.summary

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lr1Model.coefficients))
print("Intercept: " + str(lr1Model.intercept))

Coefficients: [-1.2167510667640105,-0.6034407219140154,-1.0374477319649429,0.27325106902107715,-0.5258837994762674,-0.6838946205179618,-6.6902543034397315,-1.0453770453610145,-0.6277182386146432,0.13135009892027352,0.523857363397051,-0.04633030963708384,-0.2373138071602859,-1.5435123583088863,1.0057996108081797,-1.9826773108135807,0.9745606122509515,-2.992552789758456,-2.3446992690900137,-1.3596981578515948,-2.333062828239811,-3.161849481820656,0.029130625147540924,-1.392259781032186,-0.7558725013703184,-1.728488658931451,-1.7632356899288397,-1.0203445129916326,-0.22616898487738069,0.09309561036361755,-0.43076274856813945,-0.37391288180975446,-0.8157938545458632,-1.2434588423152786,-0.8122513952348925,-0.7976282100451864,0.46346655116049945,-1.2276125306010266,-0.1465788025328594,-2.1258172154146577,-4.860585243144027,-0.5320721628726202,-0.5256840593527324,-0.5410336824971408,-0.5451262653996516,-0.5455253047769468,-0.5322589284934202,0.0,-0.5391828301299658,-0.5544901602277448,-0.539

In [49]:
#Make predictions on the test set.
predictions1 = lr1Model.transform(test)
#Evaluate performance on Test Set
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print("Performance on Test Data:" + str(evaluator.evaluate(predictions1)))

Performance on Test Data:0.8784780198385825


In [50]:
#DECISION TREE MODEL
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)


In [51]:
#Make predictions on the test set.
predictions = dtModel.transform(test)
#Evaluate our Decision Tree model on test set.
evaluator = BinaryClassificationEvaluator()
print("Performance on Test Data:" + str(evaluator.evaluate(predictions)))

#Prediction accuracy of decision trees can be improved by Ensemble methods

Performance on Test Data:0.6504354193001778


In [27]:
#RANDOM FOREST
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)

In [28]:
#Make predictions on Test Set
predictions = rfModel.transform(test)

#Evaluate our Random Forest Classifier on test set.
evaluator = BinaryClassificationEvaluator()
print("Performance on Test Data:" + str(evaluator.evaluate(predictions)))

Performance on Test Data:0.874208519839909


In [29]:
#Gradient-Boosted Classifier
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)

In [30]:
#Make predictions on Test Set
predictions = gbtModel.transform(test)
#Evaluate our Gradient-Boosted Tree Classifier on test set.
evaluator = BinaryClassificationEvaluator()
print("Performance on Test Data:" + str(evaluator.evaluate(predictions)))

Performance on Test Data:0.8912530269853451


In [31]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
nbModel = nb.fit(train)

In [32]:
#Make predictions on Test Set
predictions = nbModel.transform(test)
#Evaluate our Naive Bayes Classifier.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print("Performance on Test Data:" + str(evaluator.evaluate(predictions)))

Performance on Test Data:0.2233801293438053
