# Income Prediction Model
## Name: Lusanda Mdhlalose
## Student Number: 2332720

### Abstract:
In the following assigment I will make use of the income dataset from the US census to predict whether an individual will have an income greater or less than 50000 dollars using the Random Forest Classifier and Decision Tree Classifier.

In [1]:
!pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=e315d3f0804b6ed10e7fab6cbb63ec9edfd237776778e9b3fce8ef6e023ecd1b
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

In [3]:
spark = SparkSession.builder.appName('Income Prediction').getOrCreate()

In [4]:
spark

# Data Preprocessing

In [5]:
# Read the dataset
df = spark.read.csv('/content/income.csv', header=True, inferSchema=True)
df.printSchema()
df.show()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- weight: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_years: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- citizenship: string (nullable = true)
 |-- income_class: string (nullable = true)

+---+-----------------+--------+-------------+---------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------------+
|age|        workclass|  weight|    education|education_years|      marital_status|        occupation|  relationship|               race|    sex|capital

In [6]:
# The columns in the dataset
df.columns

['age',
 'workclass',
 'weight',
 'education',
 'education_years',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'citizenship',
 'income_class']

In [7]:
# Finding duplicates in the dataset

duplicates_df = df.groupBy(df.columns).count().filter(col('count')>1)
#duplicates_df.show()


In [8]:
# Removing duplicates

df_2 = df.dropDuplicates()
#updated_df.show()

In [9]:
duplicates = df.count() - df_2.count()
print('The number of duplicated rows in the dataset is {}.'.format(duplicates))

The number of duplicated rows in the dataset is 24.


In [24]:
# Removing ? from the dataset

string_labels = ['workclass','education','marital_status','occupation','relationship','race','sex','citizenship']
for column_name in string_labels:
    #df_3 = df_2.filter(col(column_name) != '?')
    df_3 = df_2.where(" workclass = '\\?' ")
df_3.show()

+---+---------+------+---------+---------------+--------------+----------+------------+----+---+------------+------------+--------------+-----------+------------+
|age|workclass|weight|education|education_years|marital_status|occupation|relationship|race|sex|capital_gain|capital_loss|hours_per_week|citizenship|income_class|
+---+---------+------+---------+---------------+--------------+----------+------------+----+---+------------+------------+--------------+-----------+------------+
+---+---------+------+---------+---------------+--------------+----------+------------+----+---+------------+------------+--------------+-----------+------------+



In [11]:
# Indexing and categorical features within the dataset

index = StringIndexer(
    inputCols=['workclass','education','marital_status','occupation','relationship','race','sex','citizenship', 'income_class'],
    outputCols = ['{}_indexed'.format(column) for column in ['workclass','education','marital_status','occupation','relationship','race','sex','citizenship', 'income_class']]
    )

df_4 = index.fit(df_3).transform(df_3)
#df_4.show()


In [12]:
# The indexed features

indexed_df = df_4.select(
    'age','workclass_indexed','weight', 'education_indexed', 'education_years', 'marital_status_indexed', 'occupation_indexed',
    'relationship_indexed', 'race_indexed', 'sex_indexed', 'capital_gain', 'capital_loss', 'hours_per_week', 'citizenship_indexed', 'income_class_indexed'
                             ).show()

+---+-----------------+--------+-----------------+---------------+----------------------+------------------+--------------------+------------+-----------+------------+------------+--------------+-------------------+--------------------+
|age|workclass_indexed|  weight|education_indexed|education_years|marital_status_indexed|occupation_indexed|relationship_indexed|race_indexed|sex_indexed|capital_gain|capital_loss|hours_per_week|citizenship_indexed|income_class_indexed|
+---+-----------------+--------+-----------------+---------------+----------------------+------------------+--------------------+------------+-----------+------------+------------+--------------+-------------------+--------------------+
| 41|              4.0|101603.0|              4.0|           11.0|                   0.0|               1.0|                 0.0|         0.0|        0.0|         0.0|         0.0|          40.0|                0.0|                 0.0|
| 26|              0.0| 94936.0|              6.0|  

In [13]:
# Creating a feature vector

vec = VectorAssembler(
    inputCols= ['age','workclass_indexed','weight', 'education_indexed', 'education_years', 'marital_status_indexed', 'occupation_indexed',
    'relationship_indexed', 'race_indexed', 'sex_indexed', 'capital_gain', 'capital_loss', 'hours_per_week', 'citizenship_indexed']
               , outputCol= 'Feature Vector'
                      )

final_df = vec.transform(df_4)


# The final preprocessed dataset

final_df.show()


+---+-----------------+--------+-------------+---------------+-------------------+------------------+---------------+------+-------+------------+------------+--------------+--------------+------------+-----------------+-----------------+----------------------+------------------+--------------------+------------+-----------+-------------------+--------------------+--------------------+
|age|        workclass|  weight|    education|education_years|     marital_status|        occupation|   relationship|  race|    sex|capital_gain|capital_loss|hours_per_week|   citizenship|income_class|workclass_indexed|education_indexed|marital_status_indexed|occupation_indexed|relationship_indexed|race_indexed|sex_indexed|citizenship_indexed|income_class_indexed|      Feature Vector|
+---+-----------------+--------+-------------+---------------+-------------------+------------------+---------------+------+-------+------------+------------+--------------+--------------+------------+-----------------+-----

In [14]:
# Splitting the dataset into 70% training and 30% testing data

train, test = final_df.randomSplit([.70, .30])
print('Train Size:', train.count())
print('Test Size:', test.count())

Train Size: 22750
Test Size: 9787


## Random Forest Classifier

In [15]:
# Creating the Random Forest Classifier
rf = RandomForestClassifier(featuresCol='Feature Vector', labelCol='income_class_indexed', maxBins=50)
model_1 = rf.fit(train)
pred_income = model_1.transform(test)

In [16]:

# Creating a Confusion Matrix
preds = pred_income.select(['prediction', 'income_class_indexed']).withColumn('income_class_indexed', col('income_class_indexed').cast(FloatType()))
metric = MulticlassMetrics(preds.rdd.map(tuple))
confusion_matrix = metric.confusionMatrix().toArray()
print('Random Forest Confusion Matrix:', '\n', confusion_matrix)



Random Forest Confusion Matrix: 
 [[7131.  287.]
 [1204. 1165.]]


In [17]:
# Accuracy of the Random Forest Classifier

evaluator = MulticlassClassificationEvaluator(labelCol='income_class_indexed', predictionCol='prediction')
accuracy = evaluator.evaluate(pred_income)
print('The accuracy of the Random Forest Classifier is {}'.format(accuracy))


The accuracy of the Random Forest Classifier is 0.8338085424466447


## Decision Tree Classifier

In [18]:
# Creating the Decision Tree Classifier
dt = DecisionTreeClassifier(featuresCol='Feature Vector', labelCol='income_class_indexed', maxBins=50)
model_2 = dt.fit(train)
pred_income_2 = model_2.transform(test)

In [19]:
# Creating the confusion matrix
preds_2 = pred_income_2.select(['prediction', 'income_class_indexed']).withColumn('income_class_indexed', col('income_class_indexed').cast(FloatType()))
metric = MulticlassMetrics(preds_2.rdd.map(tuple))
confusion_matrix_2 = metric.confusionMatrix().toArray()
print('Decision Tree Confusion Matrix:','\n', confusion_matrix_2)





Decision Tree Confusion Matrix: 
 [[7118.  300.]
 [1258. 1111.]]


In [20]:
# Accuracy of the Decision Classifier

evaluator_2 = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='income_class_indexed')
accuracy_2 = evaluator_2.evaluate(pred_income_2)
print('The accuracy of the Decision Tree Classifier is {}'.format(accuracy_2))

The accuracy of the Decision Tree Classifier is 0.8254645837188347
