# Classify 1994 Census Income Data

**Starting a Spark session**

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Reuse an already-running session if there is one; otherwise create it
# under this application name.
spark = (
    SparkSession.builder
    .appName('Classify 1994 Census Income Data')
    .getOrCreate()
)

In [3]:
# Echo the session object so the notebook renders it as this cell's output.
spark

**Loading the dataset**

In [4]:
# First read of the CSV with reader defaults (no header / schema options set).
# NOTE(review): the following cell re-reads the same file with header and
# schema inference and rebinds `data`, so this result is not used afterwards.
data=spark.read.csv('adult.csv')

**Reading the dataset**

In [5]:
# Treat the first line as column names and let Spark infer each column's type.
data = spark.read.csv('adult.csv', header=True, inferSchema=True)

In [6]:
# Print a preview of the loaded rows to check column names and parsed values.
data.show()

+---+----------------+------+------------+-------------+--------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|   education|education_num|marital_status|       occupation|  relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+------+------------+-------------+--------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+
| 90|            null| 77053|     HS-grad|            9|       Widowed|             null| Not-in-family|White|Female|           0|        4356|            40| United-States| <=50K|
| 82|         Private|132870|     HS-grad|            9|       Widowed|  Exec-managerial| Not-in-family|White|Female|           0|        4356|            18| United-States| <=50K|
| 66|            null|186061|Some-college|           10|       Widowed|             null|     U

**Checking the number of null values**

In [7]:
from pyspark.sql.functions import isnan, when, count, col
# Build one aggregate per column that counts the rows where that column is
# null, then collect the single-row result into pandas for display.
data.select(
    *(
        count(when(col(name).isNull(), name)).alias(name)
        for name in data.columns
    )
).toPandas()

Unnamed: 0,age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
0,0,1836,0,0,0,0,1843,0,0,0,0,0,0,583,0


**Cleaning the dataset**

In [8]:
# Discard every row that contains a null value; `data` itself is left untouched.
cleaned_data = data.dropna()

**Encoding the categorical data into numerical data**

In [9]:
from pyspark.ml.feature import VectorAssembler,StringIndexer

In [10]:
def _fit_indexer(column):
    """Fit a StringIndexer on `cleaned_data` mapping `column` to `<column>_Encoded`."""
    indexer = StringIndexer(inputCol=column, outputCol=f'{column}_Encoded')
    return indexer.fit(cleaned_data)


# One fitted indexer per string column; the income indexer produces the label.
workclassEncoder = _fit_indexer('workclass')
educationEncoder = _fit_indexer('education')
marital_statusEncoder = _fit_indexer('marital_status')
occupationEncoder = _fit_indexer('occupation')
relationshipEncoder = _fit_indexer('relationship')
raceEncoder = _fit_indexer('race')
sexEncoder = _fit_indexer('sex')
native_countryEncoder = _fit_indexer('native_country')
incomeEncoder = _fit_indexer('income')

In [11]:
# Apply the fitted indexers one after another; each adds its `*_Encoded`
# column, so the order here fixes the order of the appended columns.
for encoder in (
    workclassEncoder,
    educationEncoder,
    marital_statusEncoder,
    occupationEncoder,
    relationshipEncoder,
    raceEncoder,
    sexEncoder,
    native_countryEncoder,
    incomeEncoder,
):
    cleaned_data = encoder.transform(cleaned_data)

In [12]:
# List the column names to confirm the `*_Encoded` columns were appended.
cleaned_data.columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'education_num',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'native_country',
 'income',
 'workclass_Encoded',
 'education_Encoded',
 'marital_status_Encoded',
 'occupation_Encoded',
 'relationship_Encoded',
 'race_Encoded',
 'sex_Encoded',
 'native_country_Encoded',
 'income_Encoded']

**Selecting all the numerical variables**

In [13]:
# Keep the original numeric columns plus the index-encoded versions of the
# string columns; the raw string columns are dropped from here on.
model_columns = [
    'age',
    'fnlwgt',
    'education_num',
    'capital_gain',
    'capital_loss',
    'hours_per_week',
    'workclass_Encoded',
    'education_Encoded',
    'marital_status_Encoded',
    'occupation_Encoded',
    'relationship_Encoded',
    'race_Encoded',
    'sex_Encoded',
    'native_country_Encoded',
    'income_Encoded',
]
cleaned_data = cleaned_data.select(model_columns)

In [14]:
# Preview the frame after it has been reduced to numeric and encoded columns.
cleaned_data.show()

Unnamed: 0,age,fnlwgt,education_num,capital_gain,capital_loss,hours_per_week,workclass_Encoded,education_Encoded,marital_status_Encoded,occupation_Encoded,relationship_Encoded,race_Encoded,sex_Encoded,native_country_Encoded,income_Encoded
0,82,132870,9,0,4356,18,0.0,0.0,4.0,2.0,1.0,0.0,1.0,0.0,0.0
1,54,140359,4,0,3900,40,0.0,8.0,2.0,6.0,3.0,0.0,1.0,0.0,0.0
2,41,264663,10,0,3900,40,0.0,1.0,3.0,0.0,2.0,0.0,1.0,0.0,0.0
3,34,216864,9,0,3770,45,0.0,0.0,2.0,5.0,3.0,0.0,1.0,0.0,0.0
4,38,150601,6,0,3770,40,0.0,7.0,3.0,3.0,3.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
30157,22,310152,10,0,0,40,0.0,1.0,1.0,11.0,1.0,0.0,0.0,0.0,0.0
30158,27,257302,12,0,0,38,0.0,6.0,0.0,10.0,4.0,0.0,1.0,0.0,0.0
30159,40,154374,9,0,0,40,0.0,0.0,0.0,6.0,0.0,0.0,0.0,0.0,1.0
30160,58,151910,9,0,0,40,0.0,0.0,4.0,3.0,3.0,0.0,1.0,0.0,0.0


**Selecting all the features and vectorising them using VectorAssembler**

In [15]:
# Columns that were numeric in the raw data.
numeric_features = [
    'age',
    'fnlwgt',
    'education_num',
    'capital_gain',
    'capital_loss',
    'hours_per_week',
]

# Index-encoded versions of the string columns (the label is excluded).
encoded_features = [
    'workclass_Encoded',
    'education_Encoded',
    'marital_status_Encoded',
    'occupation_Encoded',
    'relationship_Encoded',
    'race_Encoded',
    'sex_Encoded',
    'native_country_Encoded',
]

# Model inputs, in the order they will appear in the feature vector.
required_features = numeric_features + encoded_features

In [16]:
# Pack the selected input columns into a single vector column named 'features'.
# NOTE(review): the `*_Encoded` columns are category indices, which a linear
# model will treat as ordinal magnitudes — consider one-hot encoding them.
vec_assembler = VectorAssembler(
    outputCol='features',
    inputCols=required_features,
)
vec_df = vec_assembler.transform(cleaned_data)
vec_df.show()

+---+------+-------------+------------+------------+--------------+-----------------+-----------------+----------------------+------------------+--------------------+------------+-----------+----------------------+--------------+--------------------+
|age|fnlwgt|education_num|capital_gain|capital_loss|hours_per_week|workclass_Encoded|education_Encoded|marital_status_Encoded|occupation_Encoded|relationship_Encoded|race_Encoded|sex_Encoded|native_country_Encoded|income_Encoded|            features|
+---+------+-------------+------------+------------+--------------+-----------------+-----------------+----------------------+------------------+--------------------+------------+-----------+----------------------+--------------+--------------------+
| 82|132870|            9|           0|        4356|            18|              0.0|              0.0|                   4.0|               2.0|                 1.0|         0.0|        1.0|                   0.0|           0.0|[82.0,132870.0,9..

**Splitting the data into training and test sets**

In [17]:
# 80/20 train/test split. A fixed seed makes the random split request
# repeatable, so the fitted model and the reported accuracy can be reproduced
# on a re-run instead of changing every time.
x_train, x_test = vec_df.randomSplit([0.8, 0.2], seed=42)

**Using a Logistic Regression model and fitting it to the training set**

In [18]:
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier

In [19]:
 # Unfitted logistic-regression estimator: reads the assembled 'features'
 # vector and learns to predict the 'income_Encoded' label.
 l = LogisticRegression(
     labelCol='income_Encoded',
     featuresCol='features',
 )

In [20]:
# Fit the estimator on the training split to obtain the trained model.
l_model = l.fit(x_train)

In [21]:
# Score the held-out split; the result keeps the input columns and adds the
# model's rawPrediction, probability and prediction columns.
y_pred = l_model.transform(x_test)

In [22]:
# Preview the scored test rows, including the columns added by the model.
y_pred.show()

+---+------+-------------+------------+------------+--------------+-----------------+-----------------+----------------------+------------------+--------------------+------------+-----------+----------------------+--------------+--------------------+--------------------+--------------------+----------+
|age|fnlwgt|education_num|capital_gain|capital_loss|hours_per_week|workclass_Encoded|education_Encoded|marital_status_Encoded|occupation_Encoded|relationship_Encoded|race_Encoded|sex_Encoded|native_country_Encoded|income_Encoded|            features|       rawPrediction|         probability|prediction|
+---+------+-------------+------------+------------+--------------+-----------------+-----------------+----------------------+------------------+--------------------+------------+-----------+----------------------+--------------+--------------------+--------------------+--------------------+----------+
| 17| 27415|            7|           0|           0|            20|              0.0|   

In [23]:
# Show the true label next to the predicted label for a quick visual check.
y_pred.select('income_Encoded', 'prediction').show()

+--------------+----------+
|income_Encoded|prediction|
+--------------+----------+
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
+--------------+----------+
only showing top 20 rows



**Measuring the accuracy of the machine learning model**

In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Accuracy of the predicted label against 'income_Encoded' on the test split.
# NOTE(review): the previewed labels are mostly 0.0, so the classes may be
# imbalanced — consider reporting F1 or AUC alongside accuracy.
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol='income_Encoded',
    predictionCol='prediction',
    metricName='accuracy',
)
multi_evaluator.evaluate(y_pred)

0.8351684470008217