In [483]:
#importing libraries

In [484]:
from pyspark.sql import SparkSession

In [485]:
#dataset link : https://www.kaggle.com/vardhansiramdasu/income

In [486]:
#Creating basic Spark Session
spark=SparkSession.builder.appName('Income_Dataset').getOrCreate()

In [487]:
df=spark.read.csv('D://M. Tech in Data Science & Machine Learning//Big Data Analytics//Sem_Prep//Income_dataset//Income.csv',header=True,inferSchema=True)
df.show(2)#first 2 observations

+---+------------+--------+--------------+-------------+--------------+------+-------+-----------+-----------+------------+--------------+--------------------+
|age|     JobType|  EdType| maritalstatus|   occupation|  relationship|  race| gender|capitalgain|capitalloss|hoursperweek| nativecountry|             SalStat|
+---+------------+--------+--------------+-------------+--------------+------+-------+-----------+-----------+------------+--------------+--------------------+
| 45|     Private| HS-grad|      Divorced| Adm-clerical| Not-in-family| White| Female|          0|          0|          28| United-States| less than or equ...|
| 24| Federal-gov| HS-grad| Never-married| Armed-Forces|     Own-child| White|   Male|          0|          0|          40| United-States| less than or equ...|
+---+------------+--------+--------------+-------------+--------------+------+-------+-----------+-----------+------------+--------------+--------------------+
only showing top 2 rows



In [488]:
df.columns#list of columns

['age',
 'JobType',
 'EdType',
 'maritalstatus',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capitalgain',
 'capitalloss',
 'hoursperweek',
 'nativecountry',
 'SalStat']

In [489]:
print('Shape of the data: (', len(df.columns),',',df.count(),')')#total records

Shape of the data: ( 13 , 31978 )


In [490]:
df.printSchema()#columns Summary

root
 |-- age: integer (nullable = true)
 |-- JobType: string (nullable = true)
 |-- EdType: string (nullable = true)
 |-- maritalstatus: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capitalgain: integer (nullable = true)
 |-- capitalloss: integer (nullable = true)
 |-- hoursperweek: integer (nullable = true)
 |-- nativecountry: string (nullable = true)
 |-- SalStat: string (nullable = true)



In [491]:
for i , t in df.dtypes:#columns and its datatypes
    print('column name: ',i," | ", 'column datatype: ',t)

column name:  age  |  column datatype:  int
column name:  JobType  |  column datatype:  string
column name:  EdType  |  column datatype:  string
column name:  maritalstatus  |  column datatype:  string
column name:  occupation  |  column datatype:  string
column name:  relationship  |  column datatype:  string
column name:  race  |  column datatype:  string
column name:  gender  |  column datatype:  string
column name:  capitalgain  |  column datatype:  int
column name:  capitalloss  |  column datatype:  int
column name:  hoursperweek  |  column datatype:  int
column name:  nativecountry  |  column datatype:  string
column name:  SalStat  |  column datatype:  string


In [492]:
from pyspark.sql.functions import isnan, col, count, when

In [493]:
#checking for null values
df.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c)  for c in df.columns]).show()

+---+-------+------+-------------+----------+------------+----+------+-----------+-----------+------------+-------------+-------+
|age|JobType|EdType|maritalstatus|occupation|relationship|race|gender|capitalgain|capitalloss|hoursperweek|nativecountry|SalStat|
+---+-------+------+-------------+----------+------------+----+------+-----------+-----------+------------+-------------+-------+
|  0|      0|     0|            0|         0|           0|   0|     0|          0|          0|           0|            0|      0|
+---+-------+------+-------------+----------+------------+----+------+-----------+-----------+------------+-------------+-------+



In [494]:
#There are no null values in the dataset

In [495]:
#Bucketizer

#We now group the based on their age group. 
#Here, we will use the Bucketizer transformer. 
#Bucketizer is used for creating group of values of a continuous feature

In [496]:
df.groupby('age').count().orderBy('age',ascending=True).show()

+---+-----+
|age|count|
+---+-----+
| 17|  393|
| 18|  542|
| 19|  707|
| 20|  743|
| 21|  709|
| 22|  753|
| 23|  871|
| 24|  784|
| 25|  830|
| 26|  767|
| 27|  820|
| 28|  848|
| 29|  801|
| 30|  842|
| 31|  870|
| 32|  811|
| 33|  862|
| 34|  862|
| 35|  858|
| 36|  875|
+---+-----+
only showing top 20 rows



In [497]:
df.agg({'age':'min'}).show()

+--------+
|min(age)|
+--------+
|      17|
+--------+



In [498]:
df.agg({'age':'max'}).show()

+--------+
|max(age)|
+--------+
|      90|
+--------+



In [499]:
#we can see age groups are in the range of 17 to 90

In [500]:
from pyspark.ml.feature import Bucketizer

In [501]:
# lets define the age age group splits
splits = [17, 30, 50, 70, 90]
bucketizer=Bucketizer( splits=splits,inputCol='age',outputCol='age_group')
df1=bucketizer.transform(df)

In [502]:
df1.select('age_group').show()

+---------+
|age_group|
+---------+
|      1.0|
|      0.0|
|      1.0|
|      0.0|
|      0.0|
|      1.0|
|      2.0|
|      0.0|
|      0.0|
|      0.0|
|      1.0|
|      2.0|
|      1.0|
|      1.0|
|      1.0|
|      1.0|
|      0.0|
|      1.0|
|      1.0|
|      2.0|
+---------+
only showing top 20 rows



In [503]:
df1.groupby('age_group').count().orderBy('age_group',ascending=True).show()

+---------+-----+
|age_group|count|
+---------+-----+
|      0.0| 9568|
|      1.0|15460|
|      2.0| 6329|
|      3.0|  621|
+---------+-----+



In [504]:
#StringIndexer

#There are three categorical variables in our dataset viz., 'JobType', 'EdType', 'maritalstatus', 'occupation', 'relationship', 'race', 'gender', 'nativecountry'. 
#These variables cannot be directly passed to our ML algorithms. 
#We will converet them into indexes and to do that we will use StringIndexer transformer. 
#StringIndexer converts a string column to an index column. The most frequent label gets index 0

In [505]:
list_categorical_columns=[]
for i , t in df1.dtypes:#getting all the categorical variables in our dataset
    if t == 'string':
        print('column name: ',i," | ", 'column datatype: ',t)
        list_categorical_columns.append(i)
print(' ')
print(list_categorical_columns)

column name:  JobType  |  column datatype:  string
column name:  EdType  |  column datatype:  string
column name:  maritalstatus  |  column datatype:  string
column name:  occupation  |  column datatype:  string
column name:  relationship  |  column datatype:  string
column name:  race  |  column datatype:  string
column name:  gender  |  column datatype:  string
column name:  nativecountry  |  column datatype:  string
column name:  SalStat  |  column datatype:  string
 
['JobType', 'EdType', 'maritalstatus', 'occupation', 'relationship', 'race', 'gender', 'nativecountry', 'SalStat']


In [506]:
from pyspark.ml.feature import StringIndexer

In [507]:
#also reanme the target variable(SalStat) to 'label'
indexer = StringIndexer(inputCols=['JobType', 'EdType', 'maritalstatus', 'occupation', 'relationship', 'race', 'gender', 'nativecountry','SalStat'], 
                      outputCols=['JobType_index', 'EdType_index', 'maritalstatus_index', 'occupation_index', 'relationship_index', 'race_index', 'gender_index', 'nativecountry_index','label'])
df2=indexer.fit(df1).transform(df1)

In [508]:
df2=df2.drop('JobType', 'EdType', 'maritalstatus', 'occupation', 'relationship', 'race', 'gender', 'nativecountry','SalStat')
df2.show(2,False)#dropped unwanted columns

+---+-----------+-----------+------------+---------+-------------------+------------+-----+------------------+----------------+-------------+------------+-------------------+----------+
|age|capitalgain|capitalloss|hoursperweek|age_group|nativecountry_index|EdType_index|label|relationship_index|occupation_index|JobType_index|gender_index|maritalstatus_index|race_index|
+---+-----------+-----------+------------+---------+-------------------+------------+-----+------------------+----------------+-------------+------------+-------------------+----------+
|45 |0          |0          |28          |1.0      |0.0                |0.0         |0.0  |1.0               |3.0             |0.0          |1.0         |2.0                |0.0       |
|24 |0          |0          |40          |0.0      |0.0                |0.0         |0.0  |2.0               |14.0            |6.0          |0.0         |1.0                |0.0       |
+---+-----------+-----------+------------+---------+------------------

In [509]:
#VectorAssembler

#MLlib expects all features to be contained within a single column. 
#VectorAssembler combines multiple columns and gives single column as output

In [510]:
from pyspark.ml.feature import VectorAssembler

In [511]:
df2.columns

['age',
 'capitalgain',
 'capitalloss',
 'hoursperweek',
 'age_group',
 'nativecountry_index',
 'EdType_index',
 'label',
 'relationship_index',
 'occupation_index',
 'JobType_index',
 'gender_index',
 'maritalstatus_index',
 'race_index']

In [512]:
feature_cols=['age','capitalgain','capitalloss','hoursperweek','age_group','nativecountry_index','EdType_index','relationship_index','occupation_index','JobType_index','gender_index','maritalstatus_index','race_index']
assembler=VectorAssembler(inputCols=feature_cols, outputCol="features")
df3=assembler.transform(df2)

In [513]:
df3.show(2,False)

+---+-----------+-----------+------------+---------+-------------------+------------+-----+------------------+----------------+-------------+------------+-------------------+----------+------------------------------------------------------+
|age|capitalgain|capitalloss|hoursperweek|age_group|nativecountry_index|EdType_index|label|relationship_index|occupation_index|JobType_index|gender_index|maritalstatus_index|race_index|features                                              |
+---+-----------+-----------+------------+---------+-------------------+------------+-----+------------------+----------------+-------------+------------+-------------------+----------+------------------------------------------------------+
|45 |0          |0          |28          |1.0      |0.0                |0.0         |0.0  |1.0               |3.0             |0.0          |1.0         |2.0                |0.0       |(13,[0,3,4,7,8,10,11],[45.0,28.0,1.0,1.0,3.0,1.0,2.0])|
|24 |0          |0          |40     

In [514]:
df3.select('features','label').show(2,False)

+------------------------------------------------------+-----+
|features                                              |label|
+------------------------------------------------------+-----+
|(13,[0,3,4,7,8,10,11],[45.0,28.0,1.0,1.0,3.0,1.0,2.0])|0.0  |
|(13,[0,3,7,8,9,11],[24.0,40.0,2.0,14.0,6.0,1.0])      |0.0  |
+------------------------------------------------------+-----+
only showing top 2 rows



In [515]:
#VectorIndexer

#VectorIndexer automatically identifies the categorical features from the feature vector (output from VectorAssembler). 
#It then indexes categorical features inside of a Vector It is the vectorized version of StringIndexer

In [516]:
from pyspark.ml.feature import VectorIndexer

In [517]:
vector_indexer=VectorIndexer(inputCol='features', outputCol="indexed_features")
df4=vector_indexer.fit(df3).transform(df3)

In [518]:
df4.show(2,False)

+---+-----------+-----------+------------+---------+-------------------+------------+-----+------------------+----------------+-------------+------------+-------------------+----------+------------------------------------------------------+------------------------------------------------------+
|age|capitalgain|capitalloss|hoursperweek|age_group|nativecountry_index|EdType_index|label|relationship_index|occupation_index|JobType_index|gender_index|maritalstatus_index|race_index|features                                              |indexed_features                                      |
+---+-----------+-----------+------------+---------+-------------------+------------+-----+------------------+----------------+-------------+------------+-------------------+----------+------------------------------------------------------+------------------------------------------------------+
|45 |0          |0          |28          |1.0      |0.0                |0.0         |0.0  |1.0               |3.

In [519]:
df4.select('features','indexed_features','label').show(2,False)

+------------------------------------------------------+------------------------------------------------------+-----+
|features                                              |indexed_features                                      |label|
+------------------------------------------------------+------------------------------------------------------+-----+
|(13,[0,3,4,7,8,10,11],[45.0,28.0,1.0,1.0,3.0,1.0,2.0])|(13,[0,3,4,7,8,10,11],[45.0,28.0,1.0,1.0,3.0,1.0,2.0])|0.0  |
|(13,[0,3,7,8,9,11],[24.0,40.0,2.0,14.0,6.0,1.0])      |(13,[0,3,7,8,9,11],[24.0,40.0,2.0,14.0,6.0,1.0])      |0.0  |
+------------------------------------------------------+------------------------------------------------------+-----+
only showing top 2 rows



In [520]:
#Train-Test Split

#We split the output of data into training and test sets (30% held out for testing) 
#Note: This train-test split of for logistic regression

In [521]:
train1, test1 = df4.randomSplit([0.7,0.3],seed=2)

In [522]:
#train1.count(),test1.count()

In [523]:
##Supervised Learning - Classification

In [524]:
#1. Logistic Regression

In [525]:
from pyspark.ml.classification import LogisticRegression

In [526]:
lr=LogisticRegression(featuresCol='features',labelCol='label')
lrmodel=lr.fit(train1)
#lr_prediction=lrmodel.transform(test1)

In [527]:
#Model Evaluation

In [528]:
# Create model summary object
lrmodelSummary = lrmodel.summary

# Print the following metrics one by one: 
# 1. Accuracy
# Accuracy is a model summary parameter
print("Accuracy = ", lrmodelSummary.accuracy)
# 2. Area under the ROC curve
# Area under the ROC curve is a model summary parameter
print("Area under the ROC curve = ", lrmodelSummary.areaUnderROC)
# 3. Precision (Positive Predictive Value)
# Precision is a model summary parameter
print("Precision = ", lrmodelSummary.weightedPrecision)
# 4. Recall (True Positive Rate)
# Recall is a model summary parameter
print("Recall = ", lrmodelSummary.weightedRecall)
# 5. F1 Score (F-measure)
# F1 Score is a model summary method
print("F1 Score = ", lrmodelSummary.weightedFMeasure())

Accuracy =  0.815756602426838
Area under the ROC curve =  0.8564887315363008
Precision =  0.80201013241653
Recall =  0.815756602426838
F1 Score =  0.799329362511479


In [529]:
#2. Decision Tree

In [530]:
from pyspark.ml.classification import DecisionTreeClassifier

In [531]:
dt=DecisionTreeClassifier(featuresCol='indexed_features',labelCol='label',maxDepth= 10,maxBins=41)
dt_model=dt.fit(train1)
dt_prediction=dt_model.transform(test1)

In [532]:
dt_prediction.columns

['age',
 'capitalgain',
 'capitalloss',
 'hoursperweek',
 'age_group',
 'nativecountry_index',
 'EdType_index',
 'label',
 'relationship_index',
 'occupation_index',
 'JobType_index',
 'gender_index',
 'maritalstatus_index',
 'race_index',
 'features',
 'indexed_features',
 'rawPrediction',
 'probability',
 'prediction']

In [533]:
dt_prediction.select('rawPrediction','probability','prediction','label').show(10,False)

+-------------+------------------------------------------+----------+-----+
|rawPrediction|probability                               |prediction|label|
+-------------+------------------------------------------+----------+-----+
|[3600.0,1.0] |[0.9997222993612885,2.7770063871146905E-4]|0.0       |0.0  |
|[3600.0,1.0] |[0.9997222993612885,2.7770063871146905E-4]|0.0       |0.0  |
|[280.0,1.0]  |[0.99644128113879,0.0035587188612099642]  |0.0       |0.0  |
|[3600.0,1.0] |[0.9997222993612885,2.7770063871146905E-4]|0.0       |0.0  |
|[3600.0,1.0] |[0.9997222993612885,2.7770063871146905E-4]|0.0       |0.0  |
|[3600.0,1.0] |[0.9997222993612885,2.7770063871146905E-4]|0.0       |0.0  |
|[3600.0,1.0] |[0.9997222993612885,2.7770063871146905E-4]|0.0       |0.0  |
|[3600.0,1.0] |[0.9997222993612885,2.7770063871146905E-4]|0.0       |0.0  |
|[3600.0,1.0] |[0.9997222993612885,2.7770063871146905E-4]|0.0       |0.0  |
|[3600.0,1.0] |[0.9997222993612885,2.7770063871146905E-4]|0.0       |0.0  |
+-----------

In [534]:
#Model Evaluation

In [535]:
# import MulticlassClassificationEvaluator from the pyspark.ml.evaluation package
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Build the MulticlassClassificationEvaluator object 'evaluator'
multievaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# 1. Accuracy
print("Accuracy: ", multievaluator.evaluate(dt_prediction, {evaluator.metricName: "accuracy"})) 
# 2. Area under the ROC curve
print('Area under the ROC curve = ', evaluator.evaluate(dt_prediction))
# 3. Precision (Positive Predictive Value)
print("Precision = ", multievaluator.evaluate(dt_prediction, {evaluator.metricName: "weightedPrecision"}))
# 4. Recall (True Positive Rate)
print("Recall = ", multievaluator.evaluate(dt_prediction, {evaluator.metricName: "weightedRecall"}))
# 5. F1 Score (F-measure)
print("F1 Score = ", multievaluator.evaluate(dt_prediction, {evaluator.metricName: "f1"}))

Accuracy:  0.8432048897259496
Area under the ROC curve =  0.7864135261704837
Precision =  0.8432048897259496
Recall =  0.8432048897259496
F1 Score =  0.8432048897259496


In [536]:
#2. Random Forest

In [537]:
from pyspark.ml.classification import RandomForestClassifier

In [538]:
rf=RandomForestClassifier(featuresCol='indexed_features',labelCol='label',maxBins=41)
rf_model=rf.fit(train1)
rf_prediction=rf_model.transform(test1)

In [539]:
rf_prediction.columns

['age',
 'capitalgain',
 'capitalloss',
 'hoursperweek',
 'age_group',
 'nativecountry_index',
 'EdType_index',
 'label',
 'relationship_index',
 'occupation_index',
 'JobType_index',
 'gender_index',
 'maritalstatus_index',
 'race_index',
 'features',
 'indexed_features',
 'rawPrediction',
 'probability',
 'prediction']

In [540]:
rf_prediction.select('rawPrediction','probability','prediction','label').show(10,False)

+---------------------------------------+-----------------------------------------+----------+-----+
|rawPrediction                          |probability                              |prediction|label|
+---------------------------------------+-----------------------------------------+----------+-----+
|[19.249040465247806,0.750959534752191] |[0.9624520232623904,0.03754797673760956] |0.0       |0.0  |
|[19.249040465247806,0.750959534752191] |[0.9624520232623904,0.03754797673760956] |0.0       |0.0  |
|[18.407606713228706,1.5923932867712878]|[0.9203803356614356,0.07961966433856442] |0.0       |0.0  |
|[19.181641709440473,0.8183582905595226]|[0.9590820854720239,0.040917914527976135]|0.0       |0.0  |
|[19.249040465247806,0.750959534752191] |[0.9624520232623904,0.03754797673760956] |0.0       |0.0  |
|[19.249040465247806,0.750959534752191] |[0.9624520232623904,0.03754797673760956] |0.0       |0.0  |
|[19.1686714716429,0.8313285283570966]  |[0.9584335735821451,0.04156642641785484] |0.0     

In [541]:
multievaluator2=MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='label')

In [542]:
# 1. Accuracy
print("Accuracy: ", multievaluator.evaluate(rf_prediction, {evaluator.metricName: "accuracy"})) 
# 2. Area under the ROC curve
print('Area under the ROC curve = ', evaluator.evaluate(rf_prediction))
# 3. Precision (Positive Predictive Value)
print("Precision = ", multievaluator.evaluate(rf_prediction, {evaluator.metricName: "weightedPrecision"}))
# 4. Recall (True Positive Rate)
print("Recall = ", multievaluator.evaluate(rf_prediction, {evaluator.metricName: "weightedRecall"}))
# 5. F1 Score (F-measure)
print("F1 Score = ", multievaluator.evaluate(rf_prediction, {evaluator.metricName: "f1"}))

Accuracy:  0.83113116405949
Area under the ROC curve =  0.8951071472522564
Precision =  0.83113116405949
Recall =  0.83113116405949
F1 Score =  0.83113116405949


In [543]:
#-----------------------------------------END---------------------------#

In [544]:
## We spilt the data into 70-30 set
# Training Set - 70% obesevations
# Testing Set - 30% observations
trainDF, testDF =  df.randomSplit([0.7,0.3], seed = 2)

# print the count of observations in each set
print("Observations in training set = ", trainDF.count())
print("Observations in testing set = ", testDF.count())

Observations in training set =  22416
Observations in testing set =  9562


In [545]:
# import Pipeline from pyspark.ml package
from pyspark.ml import Pipeline

# Build the pipeline object by providing stages(transformers + Estimator) 
# that you need the dataframe to pass through
# Transfoermers - binarizer, bucketizer, indexers, encoder, assembler
# Estimator - lr
lrpipeline = Pipeline(stages=[bucketizer, indexer, assembler, lr])

# fit the pipeline for the trainind data
lrpipelinemodel = lrpipeline.fit(trainDF)

# transform the data
lrpipelinepredicted = lrpipelinemodel.transform(testDF)

# view some of the columns generated
lrpipelinepredicted.select('label', 'rawPrediction', 'probability', 'prediction').show(10)

+-----+--------------------+--------------------+----------+
|label|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+----------+
|  0.0|[4.39909452909074...|[0.98786071147062...|       0.0|
|  0.0|[3.80032493739012...|[0.97812568244983...|       0.0|
|  0.0|[-7.9715523917419...|[3.45023690571653...|       1.0|
|  0.0|[3.48951829248258...|[0.97038805705483...|       0.0|
|  0.0|[3.11995804025573...|[0.95770852869916...|       0.0|
|  0.0|[2.75039778802888...|[0.93993581146911...|       0.0|
|  0.0|[2.75039778802888...|[0.93993581146911...|       0.0|
|  0.0|[3.50895248600611...|[0.97094142399573...|       0.0|
|  0.0|[3.87189297726296...|[0.97960566570795...|       0.0|
|  0.0|[3.13277247280925...|[0.95822451694094...|       0.0|
+-----+--------------------+--------------------+----------+
only showing top 10 rows



In [546]:
# Build the MulticlassClassificationEvaluator object 'evaluator'
multievaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# 1. Accuracy
print("Accuracy: ", multievaluator.evaluate(lrpipelinepredicted, {evaluator.metricName: "accuracy"})) 
# 2. Area under the ROC curve
print('Area under the ROC curve = ', evaluator.evaluate(lrpipelinepredicted))
# 3. Precision (Positive Predictive Value)
print("Precision = ", multievaluator.evaluate(lrpipelinepredicted, {evaluator.metricName: "weightedPrecision"}))
# 4. Recall (True Positive Rate)
print("Recall = ", multievaluator.evaluate(lrpipelinepredicted, {evaluator.metricName: "weightedRecall"}))
# 5. F1 Score (F-measure)
print("F1 Score = ", multievaluator.evaluate(lrpipelinepredicted, {evaluator.metricName: "f1"}))

Accuracy:  0.8002775080005335
Area under the ROC curve =  0.8584433805887849
Precision =  0.8002775080005335
Recall =  0.8002775080005335
F1 Score =  0.8002775080005335


In [547]:
#--------------------------------end-----------------------------------------------------#