# Loading Spark

In [1]:
sc.version

Starting Spark application

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1638061304448_0001,pyspark,idle,Link,Link,✔

SparkSession available as 'spark'.

'2.4.5-amzn-0'

# Preprocessing 

In [58]:
# Reading the csv file from S3 bucket
df = spark.read.csv("s3://isaacgarciateyunbucket/adult.csv", header=True, inferSchema=True)

We read the CSV file and assigned it to a variable, `df`. We set `header = True` so that the first line is used for the column names instead of being read as a data row, and `inferSchema = True` so that Spark infers the data type of each column.

In [59]:
# Displaying the top 5 rows of the dataset
df.show(5)

+---+---------+------+------------+-------------+--------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|   education|education.num|marital.status|       occupation| relationship| race|   sex|capital.gain|capital.loss|hours.per.week|native.country|income|
+---+---------+------+------------+-------------+--------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 90|        ?| 77053|     HS-grad|            9|       Widowed|                ?|Not-in-family|White|Female|           0|        4356|            40| United-States| <=50K|
| 82|  Private|132870|     HS-grad|            9|       Widowed|  Exec-managerial|Not-in-family|White|Female|           0|        4356|            18| United-States| <=50K|
| 66|        ?|186061|Some-college|           10|       Widowed|                ?|    Unmarried|Black|Female|           0|        4356|

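We displayed the top 5 rows and immediately noticed that some columns contain `?` values, which stand in for missing data. We also noticed that several column names contain dots (for example `education.num`); Spark reads a dot in a column reference as access to a struct field, so such names would have to be wrapped in backticks every time they are used.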

In [60]:
# Changing "." from column names to "_"
df = df.withColumnRenamed("education.num", "education_num")\
       .withColumnRenamed("marital.status", "marital_status")\
       .withColumnRenamed("capital.gain", "capital_gain")\
       .withColumnRenamed("capital.loss", "capital_loss")\
       .withColumnRenamed("hours.per.week", "hours_per_week")\
       .withColumnRenamed("native.country", "native_country")
       
# Displaying the first row
df.show(1)

+---+---------+------+---------+-------------+--------------+----------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|education|education_num|marital_status|occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+---------+------+---------+-------------+--------------+----------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 90|        ?| 77053|  HS-grad|            9|       Widowed|         ?|Not-in-family|White|Female|           0|        4356|            40| United-States| <=50K|
+---+---------+------+---------+-------------+--------------+----------+-------------+-----+------+------------+------------+--------------+--------------+------+
only showing top 1 row

We replaced the dots in the column names with underscores and displayed one row to confirm that the changes were in place.
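Rather than listing each rename by hand, the new names can also be derived from the existing ones. A minimal plain-Python sketch, assuming the only change wanted is replacing dots with underscores; `sanitize_columns` is a hypothetical helper, not part of PySpark:

```python
def sanitize_columns(columns):
    """Return column names with every '.' replaced by '_' (hypothetical helper)."""
    return [name.replace(".", "_") for name in columns]


# In the notebook this could be applied positionally with DataFrame.toDF:
#   df = df.toDF(*sanitize_columns(df.columns))
raw = ["age", "education.num", "marital.status", "hours.per.week", "income"]
print(sanitize_columns(raw))
# ['age', 'education_num', 'marital_status', 'hours_per_week', 'income']
```

Because `toDF` assigns names by position, a single call would rename all six dotted columns and leave the others unchanged.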

In [61]:
# Column descriptions
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)

We printed the schema to check whether any column had been inferred with the wrong data type. All columns appear to have appropriate types: the numeric fields are integers and the categorical fields are strings.

In [62]:
# Checking exact duplicates
df.count(), df.distinct().count()

(32561, 32537)

We compared the total row count with the number of distinct rows to check for exact duplicates: 32,561 − 32,537 = 24 rows are exact duplicates.
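The check is simply total rows minus distinct rows. A plain-Python sketch of the same idea on a few made-up rows, with tuples standing in for Spark rows (like `distinct()` and `dropDuplicates()` without a subset, it compares every column):

```python
def count_exact_duplicates(rows):
    """Number of rows that are exact copies of another row already counted."""
    return len(rows) - len(set(rows))


rows = [  # made-up example rows
    (25, "Private", "<=50K"),
    (40, "Self-emp", ">50K"),
    (25, "Private", "<=50K"),  # exact copy of the first row
]
print(count_exact_duplicates(rows))  # 1
```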

In [63]:
# Dropping the exact duplicated rows
df1 = df.dropDuplicates()

# Comparing the row counts after (df1) and before (df) dropping duplicates
df1.count(), df.count()

(32537, 32561)

We dropped the exact duplicate rows, stored the result as `df1`, and recounted: 24 rows were removed.

In [64]:
# Dropping rows with '?' values
df_clean = df1.filter(
    ((df1.workclass != "?")
    & (df1.occupation != "?")
    & (df1.native_country != "?"))
)


We filtered out the rows that contain `?` in the `workclass`, `occupation`, or `native_country` column and assigned the result to `df_clean`.
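The filter keeps a row only if none of the three columns holds `?`. The same logic in plain Python over dictionaries, as an illustrative sketch with made-up rows (`drop_placeholder_rows` is a hypothetical helper):

```python
def drop_placeholder_rows(rows, columns, placeholder="?"):
    """Keep only rows in which none of the given columns equals the placeholder."""
    return [row for row in rows if all(row[c] != placeholder for c in columns)]


rows = [  # made-up example rows
    {"workclass": "Private", "occupation": "Sales", "native_country": "United-States"},
    {"workclass": "?", "occupation": "?", "native_country": "United-States"},
    {"workclass": "Local-gov", "occupation": "Adm-clerical", "native_country": "?"},
]
kept = drop_placeholder_rows(rows, ["workclass", "occupation", "native_country"])
print(len(kept))  # 1
```

One difference from Spark: there, a null in any of these columns makes the whole condition null, so that row is dropped too, whereas in this sketch `None != "?"` is simply `True`.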

In [65]:
from pyspark.sql.functions import isnan, when, count, col

# Counting NaN or null entries in every column to confirm none remain
df_clean.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_clean.columns]).show()

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|education|education_num|marital_status|occupation|relationship|race|sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  0|        0|     0|        0|            0|             0|         0|           0|   0|  0|           0|           0|             0|             0|     0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+

We double-checked that no NaN or null values remain in any column: every column reports a count of 0. (The `?` placeholders are ordinary strings, so they were handled by the filter above rather than by this check.)

In [66]:
# Displaying the number of rows left
df_clean.count(),df1.count(),df.count()

(30139, 32537, 32561)

We counted the rows again to see how many we lost through cleaning: 2,422 rows in total (32,561 − 30,139), of which 24 were exact duplicates and 2,398 contained `?` placeholders.

In part 2 of this project, we identified outliers using boxplots of the suspicious columns. The plots are omitted from this notebook to save time; the cells below list the affected columns and the filter used to remove the outliers from each.
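For reference, a boxplot conventionally draws its whiskers at Tukey's fences, Q1 − 1.5 × IQR and Q3 + 1.5 × IQR, and plots points beyond them as outliers. A plain-Python sketch of that rule follows. The cut-offs used in the cells below are the ones chosen in part 2 and are not necessarily these fences; quartile conventions also differ between tools, and this sketch uses the default ("exclusive") method of Python's `statistics.quantiles`.

```python
import statistics


def iqr_fences(values, k=1.5):
    """Return Tukey's (lower, upper) fences: Q1 - k*IQR and Q3 + k*IQR."""
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


def flag_outliers(values, k=1.5):
    """Values lying outside the fences, i.e. the points a boxplot draws individually."""
    lower, upper = iqr_fences(values, k)
    return [v for v in values if v < lower or v > upper]


sample = [1, 2, 3, 4, 5, 6, 7, 8, 100]  # made-up values
print(iqr_fences(sample))     # (-5.0, 15.0)
print(flag_outliers(sample))  # [100]
```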

In [67]:
# Removing outliers from hours_per_week (values 81 to 99)
df_clean2 = df_clean.filter(~df_clean.hours_per_week.between(81, 99))

In [68]:
# Checking the count of dataset after removing outliers
df_clean.count(),df_clean2.count()

(30139, 29944)

In [69]:
# Removing outliers from age (ages 78 to 90)
df_clean3 = df_clean2.filter(~df_clean2.age.between(78, 90))

In [70]:
# Checking the count of dataset after removing outliers
df_clean.count(),df_clean3.count()

(30139, 29826)

In [71]:
# Removing outliers from capital_gain column
df_clean3 = df_clean3[df_clean3['capital_gain'] <= 40000]

# Counting the data after removing outliers
df_clean.count(),df_clean3.count()

(30139, 29677)

In [72]:
# Removing outliers from capital_loss column
df_clean3 = df_clean3[df_clean3['capital_loss'] <= 3000]

# Counting data after removing outliers
df_clean.count(),df_clean3.count()

(30139, 29670)

In [73]:
# Removing outliers from fnlwgt column
df_clean3 = df_clean3[df_clean3['fnlwgt'] <= 850000]

# Counting data after removing outliers
df_clean.count(),df_clean3.count()

(30139, 29648)

In [74]:
# Displaying the dataset after removing outliers
df_clean3.show(1)

+---+---------+------+----------+-------------+--------------+--------------+-------------+-----+----+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt| education|education_num|marital_status|    occupation| relationship| race| sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+---------+------+----------+-------------+--------------+--------------+-------------+-----+----+------------+------------+--------------+--------------+------+
| 36|  Private|224566|Assoc-acdm|           12| Never-married|Prof-specialty|Not-in-family|White|Male|           0|        1669|            45| United-States| <=50K|
+---+---------+------+----------+-------------+--------------+--------------+-------------+-----+----+------------+------------+--------------+--------------+------+
only showing top 1 row

After removing the outliers, 29,648 rows remain, stored as `df_clean3`. The outlier filters removed 491 rows, about 1.6% of the cleaned data (491 / 30,139).
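The overall loss can be broken down per filter using the row counts printed above. A small arithmetic check in plain Python (the counts are copied from the cell outputs; each filter was applied on top of the previous ones):

```python
# Row counts reported by the notebook after each successive outlier filter
steps = [
    ("after cleaning", 30139),
    ("hours_per_week", 29944),
    ("age", 29826),
    ("capital_gain", 29677),
    ("capital_loss", 29670),
    ("fnlwgt", 29648),
]


def rows_removed(steps):
    """Rows dropped by each filter, computed from consecutive counts."""
    return [(name, prev - cur) for (_, prev), (name, cur) in zip(steps, steps[1:])]


removed = rows_removed(steps)
total = steps[0][1] - steps[-1][1]
print(removed)
print(total, round(100 * total / steps[0][1], 2))  # 491 1.63
```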

# Correlation

In [75]:
# Selecting features for correlation
features = ['age','fnlwgt','education_num','capital_gain','capital_loss','hours_per_week']

We listed the names of the numeric columns in `features`.

In [76]:
# Computing pairwise Pearson correlations (upper triangle only; cells below the diagonal stay null)
n_features = len(features)

corr = []

for i in range(0, n_features):
    temp = [None] * i

    for j in range(i, n_features):
        temp.append(df_clean3.corr(features[i], features[j]))
    corr.append([features[i]] + temp)

# Creating a dataframe for correlation
correlations = spark.createDataFrame(corr, ['Column'] + features)

# Displaying correlation 
correlations.show()

+--------------+----+--------------------+--------------------+--------------------+--------------------+--------------------+
|        Column| age|              fnlwgt|       education_num|        capital_gain|        capital_loss|      hours_per_week|
+--------------+----+--------------------+--------------------+--------------------+--------------------+--------------------+
|           age| 1.0|-0.07402590383297354|0.044755755036027675| 0.11956987435377789|0.059667785652745914| 0.11515012046745453|
|        fnlwgt|null|                 1.0|-0.04776248530128201|-0.00660668203431...|-0.00921512191419...|-0.01946837548896...|
| education_num|null|                null|                 1.0|   0.149253317665428| 0.08306844753399072| 0.15742165542635028|
|  capital_gain|null|                null|                null|                 1.0|-0.05144216730478446| 0.09004351144791092|
|  capital_loss|null|                null|                null|                null|                 1.0| 0.055

The two strongest correlations in this data set are between education_num and hours_per_week (0.157) and between education_num and capital_gain (0.149). Both are positive but weak: people with more years of education tend, on average, to work slightly more hours and to report slightly larger capital gains. Correlations this small do not show that education causes either effect.
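`DataFrame.corr` computes the Pearson coefficient (currently the only method it supports): the covariance of the two columns divided by the product of their standard deviations. A plain-Python sketch of that formula, run on made-up values:

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    syy = sum((y - mean_y) ** 2 for y in ys)
    return sxy / math.sqrt(sxx * syy)


# Made-up values, not taken from the data set
education_num = [9, 10, 13, 16]
hours = [40, 40, 45, 50]
print(pearson(education_num, hours))
```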

# Data Transforming

In [77]:
# Printing out the schema before data transforming 
df_clean3.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)

We printed out the schema again before we started transforming the data. 

In [78]:
# New column names; the dependent column income becomes label
cols = ['age',
 'workclass',
 'fnlwgt',
 'education',
 'education_num',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

We renamed some of the columns (`marital_status` to `marital`, `hours_per_week` to `hours_week`) and renamed the dependent variable `income` to `label`.

In [79]:
# Assigning the renamed columns to a new df
df_new = df_clean3.toDF(*cols)

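Then we created a DataFrame with the new column names. `toDF` assigns names by position, so the order of `cols` must match the existing column order.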

In [80]:
# Importing the float type used for casting
from pyspark.sql.types import FloatType

# A custom function to cast several DataFrame columns to a new data type
def convertColumn(df, names, newType):
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df

# List of continuous features
CONTI_FEATURES = ['age', 'fnlwgt', 'capital_gain', 'education_num', 'capital_loss', 'hours_week']

# Converting continuous features into floats
df_new = convertColumn(df_new, CONTI_FEATURES, FloatType())

# Checking the schema of the dataset
df_new.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

We wrote a custom function to cast the numeric columns to float and printed the schema to confirm the changes were in place.

In [81]:
# Displaying the top 5 rows
df_new.show(5)

+----+----------------+--------+------------+-------------+------------------+---------------+-------------+-----+------+------------+------------+----------+--------------+-----+
| age|       workclass|  fnlwgt|   education|education_num|           marital|     occupation| relationship| race|   sex|capital_gain|capital_loss|hours_week|native_country|label|
+----+----------------+--------+------------+-------------+------------------+---------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|36.0|         Private|224566.0|  Assoc-acdm|         12.0|     Never-married| Prof-specialty|Not-in-family|White|  Male|         0.0|      1669.0|      45.0| United-States|<=50K|
|66.0|       Local-gov|174486.0|   Doctorate|         16.0|Married-civ-spouse| Prof-specialty|      Husband|Black|  Male|     20051.0|         0.0|      35.0|       Jamaica| >50K|
|33.0|         Private|356823.0|   Bachelors|         13.0|     Never-married| Prof-specialty|Not-in

We also displayed the top 5 rows to check that the data looked as intended at this point.

In [82]:
# Importing libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
# OneHotEncoderEstimator is the Spark 2.3/2.4 name; Spark 3.x renamed it to OneHotEncoder
from pyspark.ml.feature import OneHotEncoderEstimator

In [83]:
# Selecting the categorical columns
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']

# Stages in the pipeline
stages = []

for categoricalCol in CATE_FEATURES:
    # Mapping each categorical column to label indices with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")

    # Mapping the index column into a column of binary (one-hot) vectors
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])

    # Stage 1 for the pipeline
    stages += [stringIndexer, encoder]

We selected all the categorical columns except the dependent column `label` and used `StringIndexer` to map each column's string values to numeric indices. We then used `OneHotEncoderEstimator` to turn each index column into a binary (one-hot) vector. Together these make up the first stage of the pipeline.
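To make the two steps concrete, here is a plain-Python sketch of what they do to a single column. It assumes Spark's default settings: `StringIndexer` orders values by descending frequency (`stringOrderType="frequencyDesc"`, so the most common value gets index 0.0), and `OneHotEncoderEstimator` drops the last category (`dropLast=True`), which is then encoded as an all-zero vector. Breaking frequency ties alphabetically is an assumption of this sketch, and the values are made up.

```python
from collections import Counter


def string_index(values):
    """Map each distinct value to an index, most frequent first (ties broken alphabetically here)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """One-hot encode an index; with drop_last the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


workclass = ["Private", "Self-emp", "Private", "Local-gov", "Private", "Self-emp"]
mapping = string_index(workclass)
print(mapping)  # {'Private': 0.0, 'Self-emp': 1.0, 'Local-gov': 2.0}
for value in ["Private", "Local-gov"]:
    print(value, one_hot(mapping[value], len(mapping)))
```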

In [84]:
# Converting label column into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")

# stage 2 for pipeline
stages += [label_stringIdx]

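We also used `StringIndexer` to convert the dependent column `label` into numeric indices, stored in `newlabel`; this is stage 2 of the pipeline. Because `StringIndexer` orders values by descending frequency by default, the more common class, `<=50K`, receives index 0.0 and `>50K` receives 1.0.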

In [85]:
# Assembler inputs: the one-hot vector columns followed by the continuous variables
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

Then we combined the names of the one-hot encoded columns with the numeric columns to form the list of inputs for the assembler.

In [86]:
# Combining all input columns into a single vector column with VectorAssembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# Stage 3 for pipeline
stages += [assembler]

We used `VectorAssembler` to combine all of these columns into a single vector column named `features`. This is the final stage (stage 3) of the pipeline.
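`VectorAssembler` concatenates its input columns in the order given by `inputCols`, so the one-hot blocks come first, followed by the continuous values. A plain-Python sketch of that concatenation for one row, with made-up values (`assemble` is a hypothetical helper):

```python
def assemble(*parts):
    """Concatenate vectors (lists) and scalars, in order, into one flat feature list."""
    features = []
    for part in parts:
        if isinstance(part, (list, tuple)):
            features.extend(float(v) for v in part)
        else:
            features.append(float(part))
    return features


workclass_vec = [1.0, 0.0]  # made-up one-hot vector
sex_vec = [0.0]             # made-up one-hot vector
print(assemble(workclass_vec, sex_vec, 36.0, 45.0))  # [1.0, 0.0, 0.0, 36.0, 45.0]
```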

In [87]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)

# Fitting the pipeline model to data
pipelineModel = pipeline.fit(df_new)

# Applying the fitted pipeline to the data
model = pipelineModel.transform(df_new)

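We assembled all the stages into a pipeline, fitted it to the dataset, and used the fitted pipeline to transform the dataset. Note that `model` holds the transformed DataFrame, not a trained classifier.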

In [88]:
from pyspark.ml.linalg import DenseVector

# Selecting newlabel and features from each row using map
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

We mapped each row of the transformed DataFrame to a (label, features) pair, converting the feature vector into a `DenseVector`.
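The assembled features are often stored as a sparse vector, which keeps only the vector's size plus the indices and values of its non-zero entries; one-hot encoded data is mostly zeros, so this saves space. Converting to a `DenseVector` expands that into a full list. A plain-Python sketch of the expansion, with a made-up vector:

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a dense list of floats."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense


print(sparse_to_dense(6, [0, 3, 5], [1.0, 1.0, 36.0]))  # [1.0, 0.0, 0.0, 1.0, 0.0, 36.0]
```

Dense vectors are easier to inspect but use more memory; the `spark.ml` classifiers accept either representation.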

In [89]:
# Converting the RDD back to a DataFrame
df_final = spark.createDataFrame(input_data, ["label", "features"])

We converted the RDD back into a DataFrame with the columns `label` and `features`, which is the format the `spark.ml` classifiers expect.

In [90]:
# Displaying the final DataFrame
df_final.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[1.0,0.0,0.0,0.0,...|
|  1.0|[0.0,0.0,1.0,0.0,...|
|  1.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
|  1.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  1.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
|  1.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,1.0,...|
|  1.0|[1.0,0.0,0.0,0.0,...|
|  1.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  1.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  1.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 20 rows

We displayed our final data before we started our classification methods. 

# Logistic Regression

In [91]:
# Splitting the data into training and test sets
train_data, test_data = (
    df_final
    .randomSplit([0.7, 0.3], seed=666)
)

We randomly split the data into a training set (about 70%) and a test set (about 30%). Fixing the seed makes the split reproducible.
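`randomSplit` assigns each row to a split at random, so the 70/30 proportions are only approximate, and the exact sizes depend on the seed (and, in Spark, on how the data is partitioned). A plain-Python sketch of the idea; it is not Spark's exact sampling algorithm, and `random_split` is a hypothetical helper:

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split with probability proportional to its weight."""
    total = sum(weights)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound for each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for k, b in enumerate(bounds):
            # the last split also catches any floating-point rounding at the top end
            if u < b or k == len(bounds) - 1:
                splits[k].append(row)
                break
    return splits


train, test = random_split(list(range(1000)), [0.7, 0.3], seed=666)
print(len(train), len(test))
```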

In [92]:
from pyspark.ml.classification import LogisticRegression
from time import time

# Marking the time before fitting model
start_time = time()

# Initializing logistic regression
# elasticNetParam=1.0 makes the penalty pure L1 (lasso), scaled by regParam
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        regParam=0.01,
                        elasticNetParam=1.0,
                        family='multinomial')

# Fit the data to the model
linearModel = lr.fit(train_data)

# Marking the end time after fitting model
end_time = time()

# Calculating the duration of model fitting
elapsed_time = end_time - start_time

# Printing out the time to train the model in seconds
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 113.507 seconds

We set up the logistic regression model and timed its training, which took about 114 seconds (roughly 2 minutes).
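For reference, Spark's documentation writes the elastic-net penalty added to the logistic loss as follows, with $\lambda$ = `regParam` and $\alpha$ = `elasticNetParam`; $\alpha = 1$ gives a pure L1 (lasso) penalty and $\alpha = 0$ a pure L2 (ridge) penalty:

$$
\lambda \left( \alpha \lVert w \rVert_1 + \frac{1 - \alpha}{2} \lVert w \rVert_2^2 \right)
$$

With the settings above, $\lambda = 0.01$ and $\alpha = 1$, the penalty reduces to $0.01 \lVert w \rVert_1$.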

In [93]:
# Transforming the data to test model
predictions_lr = linearModel.transform(test_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Then we applied the fitted model to the test data to generate predictions.

In [94]:
# Selecting columns and storing in a new variable
results_logReg = predictions_lr.select('label', 'probability', 'prediction')


We selected the columns needed for evaluation.

In [95]:
import pyspark.ml.evaluation as ev

# Evaluating the classifier (binary label, 2 classes)
evaluator = ev.MulticlassClassificationEvaluator(
    predictionCol='prediction'
    , labelCol='label')

(
    # F1 score (the evaluator's default metric)
    evaluator.evaluate(results_logReg)
    
    #weighted precision
    , evaluator.evaluate(
        results_logReg
        , {evaluator.metricName: 'weightedPrecision'}
    ) 
    #accuracy
    , evaluator.evaluate(
        results_logReg
        , {evaluator.metricName: 'accuracy'}
    )
)

(0.82885905861642, 0.8315289733339646, 0.8399909726923945)

Logistic regression reaches an accuracy of about 84% (0.840), with an F1 score of 0.829 and a weighted precision of 0.832.
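`MulticlassClassificationEvaluator` uses `f1` as its default metric, so the first value above is the F1 score. The weighted metrics average each class's score, weighting it by how often that class appears in the true labels. A plain-Python sketch of these definitions on made-up labels; treating precision as 0.0 for a class that is never predicted is an assumption of the sketch. One consequence is that weighted recall always equals accuracy.

```python
def classification_metrics(labels, preds):
    """Accuracy plus label-frequency-weighted precision, recall and F1."""
    n = len(labels)
    accuracy = sum(l == p for l, p in zip(labels, preds)) / n
    w_precision = w_recall = w_f1 = 0.0
    for c in set(labels):
        tp = sum(l == c and p == c for l, p in zip(labels, preds))
        predicted = sum(p == c for p in preds)
        actual = sum(l == c for l in labels)
        precision = tp / predicted if predicted else 0.0  # assumption: 0.0 if never predicted
        recall = tp / actual
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        weight = actual / n  # share of this class among the true labels
        w_precision += weight * precision
        w_recall += weight * recall
        w_f1 += weight * f1
    return {"accuracy": accuracy, "weightedPrecision": w_precision,
            "weightedRecall": w_recall, "f1": w_f1}


labels = [0, 0, 0, 1, 1, 0]  # made-up true labels
preds = [0, 1, 0, 1, 0, 0]   # made-up predictions
print(classification_metrics(labels, preds))
```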

# Random Forest Classifier

In [96]:
from pyspark.ml.classification import RandomForestClassifier

# Marking the time before fitting the model
start_time = time()

# Initializing random forest classifier
rf = RandomForestClassifier(
    labelCol="label"
    , featuresCol= "features"
    , minInstancesPerNode=10
    , numTrees=10
)

# Fitting the data in the model
rf_model = rf.fit(train_data)

# Marking the time after fitting the model
end_time = time()

# Calculating the duration of model fitting
elapsed_time = end_time - start_time

# Printing out the time to train the model in seconds
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 133.862 seconds

We set up a random forest classifier with 10 trees and at least 10 instances per node; training took about 134 seconds (a little over 2 minutes).

In [97]:
# Transforming the data to test model
predictions_rf = rf_model.transform(test_data)

Then we applied the fitted model to the test data.

In [98]:
# Selecting the columns and assigning to a new variable 
results_rf = predictions_rf.select('label', 'probability', 'prediction')

We selected the columns required for evaluation.

In [99]:
# Evaluating the classifier (binary label, 2 classes)
evaluator_rf = ev.MulticlassClassificationEvaluator(
    predictionCol='prediction'
    , labelCol='label')

(
    # F1 score (the evaluator's default metric)
    evaluator_rf.evaluate(results_rf)
    #weighted precision
    , evaluator_rf.evaluate(
        results_rf
        , {evaluator_rf.metricName: 'weightedPrecision'}
    ) 
    #accuracy
    , evaluator_rf.evaluate(
        results_rf
        , {evaluator_rf.metricName: 'accuracy'}
    )
)

(0.8008346799543602, 0.8166333365229521, 0.8237418190024826)

For the random forest, accuracy is about 82% (0.824), with an F1 score of 0.80 and a weighted precision of 0.82. So far, logistic regression has the higher accuracy (84%).

# Hyperparameter Tuning for Logistic Regression

In [100]:
import pyspark.ml.tuning as tune

# Using ParamGridBuilder to build a grid of parameters
logReg_grid = (
    tune.ParamGridBuilder()

    # 2 values for regParam
    .addGrid(lr.regParam
            , [0.01, 0.1]
        )
    
    # 2 values for elasticNetParam
    .addGrid(lr.elasticNetParam
            , [1.0, 0.5]
        )
    .build()
)

For hyperparameter tuning, we built a grid over `regParam` (0.01, 0.1) and `elasticNetParam` (1.0, 0.5), giving 2 × 2 = 4 parameter combinations.
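`ParamGridBuilder.build()` returns every combination of the values added with `addGrid`, that is, their Cartesian product. A plain-Python sketch of that expansion (`build_grid` is a hypothetical helper, not part of PySpark):

```python
from itertools import product


def build_grid(**options):
    """Expand a mapping of candidate values into every parameter combination."""
    names = list(options)
    return [dict(zip(names, combo)) for combo in product(*options.values())]


grid = build_grid(regParam=[0.01, 0.1], elasticNetParam=[1.0, 0.5])
for params in grid:
    print(params)
```

With the default of 3 folds, `CrossValidator` fits each of these 4 combinations 3 times and then refits the best one on the full training set.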

In [101]:
# Marking the time before fitting the model
start_time = time()

# Initializing CrossValidator
# Default K value is 3
cv_lr= tune.CrossValidator(
    estimator=lr
    , estimatorParamMaps=logReg_grid
    , evaluator=evaluator
)

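Then we set up a `CrossValidator` with the logistic regression estimator, the parameter grid, and the evaluator defined earlier (which scores models by F1, its default metric).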

In [102]:
# Running cross validations
# This step is slow: 3 folds x 4 parameter combinations = 12 fits, plus a final refit of the best model
cvModel = cv_lr.fit(train_data)

# Marking the time after fitting the model
end_time = time()

# Calculating the duration 
elapsed_time = end_time - start_time

# Printing the time to train model in seconds
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 910.128 seconds

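We fitted the cross-validator to the training data, which took about 910 seconds (roughly 15 minutes). Because `start_time` was recorded in the previous cell, this figure also includes any time that passed between running the two cells.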
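With the default of three folds, `CrossValidator` splits the training data into three parts. For each parameter combination it trains on two parts, evaluates on the third, and averages the three scores. A plain-Python sketch of the fold construction; Spark assigns rows to folds by random sampling, whereas this sketch uses a seeded shuffle, so its folds are exactly balanced (`k_fold_indices` is a hypothetical helper):

```python
import random


def k_fold_indices(n_rows, k=3, seed=0):
    """Yield (train, validation) index lists; every row is used for validation exactly once."""
    order = list(range(n_rows))
    random.Random(seed).shuffle(order)
    folds = [order[i::k] for i in range(k)]
    for i in range(k):
        validation = sorted(folds[i])
        train = sorted(j for f, fold in enumerate(folds) if f != i for j in fold)
        yield train, validation


for train, validation in k_fold_indices(9, k=3, seed=42):
    print(len(train), len(validation))  # 6 3 on each line
```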

In [103]:
# Applying the best model found by cross-validation to the test set
prediction_lr_hyper = cvModel.transform(test_data)

Then we applied the best model to the test data.

In [104]:
import pyspark.ml.evaluation as ev

# Setting Multiclass function evaluator
logReg_ev = ev.MulticlassClassificationEvaluator(
    predictionCol='prediction'
    , labelCol='label')

We set up an evaluator that compares the `prediction` column with the dependent column `label`.

In [105]:
# Printing out the weightedPrecision, weightedRecall, and Accuracy of the model
print(logReg_ev.evaluate(prediction_lr_hyper, {logReg_ev.metricName: 'weightedPrecision'}))
print(logReg_ev.evaluate(prediction_lr_hyper, {logReg_ev.metricName: 'weightedRecall'}))
print(logReg_ev.evaluate(prediction_lr_hyper, {logReg_ev.metricName: 'accuracy'}))

0.8314005762665297
0.8399909726923944
0.8399909726923945

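After tuning, logistic regression reaches the same accuracy as before, 0.840 (about 84%), with a weighted precision of 0.831 and a weighted recall of 0.840. Tuning therefore did not improve performance on the test set. Weighted recall equals accuracy by definition, which is why the last two values match (up to floating-point rounding).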

# Hyperparameter tuning for random forest

In [106]:
# Using ParamGridBuilder to build a grid of parameters
paramGrid_rfc = (
    tune.ParamGridBuilder()
    # Selecting numbers of trees
    .addGrid(rf.numTrees
            , [10, 30]
        )
    .build()
)

Next, we built a parameter grid for the random forest model with two candidate values for the number of trees (10 and 30).
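ParamGridBuilder().build() returns a list of parameter maps, one per combination of candidate values. The plain-Python sketch below imitates that expansion with itertools.product. build_param_grid is a hypothetical helper, and it uses string keys where PySpark uses Param objects such as rf.numTrees.

```python
from itertools import product


def build_param_grid(grid):
    """Expand {param_name: [candidate values]} into one dict per combination."""
    names = list(grid)
    # Cartesian product over the candidate lists, in the order the names appear
    return [dict(zip(names, combo)) for combo in product(*(grid[name] for name in names))]


print(build_param_grid({"numTrees": [10, 30]}))
# Adding a second, hypothetical parameter doubles the grid to four maps
print(len(build_param_grid({"numTrees": [10, 30], "maxDepth": [5, 10]})))
```

Every extra entry in the grid multiplies the number of models cross-validation has to train, so grid size drives training time directly.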

In [107]:
# Marking the time before fitting the model
start_time = time()

# Initializing CrossValidator
cv_rfc= tune.CrossValidator(
    estimator=rf
    , estimatorParamMaps=paramGrid_rfc 
    , evaluator=evaluator
)

We set up the CrossValidator with the random forest as the estimator, together with the parameter grid and the evaluator.
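To make the cross-validation step concrete, here is a plain-Python sketch of k-fold cross-validation under stated assumptions. As we understand PySpark's CrossValidator, it splits the data into numFolds folds (3 by default, and cv_rfc does not override it), trains every parameter map on k-1 folds, scores it on the held-out fold, averages the scores, and finally refits the best parameters on the full training set. With two grid entries that is 3 × 2 = 6 fold fits plus one refit. The helpers k_fold_indices and cross_validate, and the toy threshold "model" in the example, are hypothetical.

```python
import random


def k_fold_indices(n, k, seed=0):
    """Shuffle row indices and deal them into k roughly equal folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]


def cross_validate(rows, param_maps, fit, evaluate, k=3, seed=0):
    """Return (best_params, avg_metrics, best_model); a higher metric is better."""
    folds = k_fold_indices(len(rows), k, seed)
    avg_metrics = []
    for params in param_maps:
        scores = []
        for i in range(k):
            held_out = set(folds[i])
            train = [r for j, r in enumerate(rows) if j not in held_out]
            valid = [rows[j] for j in folds[i]]
            model = fit(train, params)              # train on k-1 folds
            scores.append(evaluate(model, valid))   # score on the held-out fold
        avg_metrics.append(sum(scores) / k)
    best = max(range(len(param_maps)), key=lambda i: avg_metrics[i])
    best_model = fit(rows, param_maps[best])        # refit the winner on all rows
    return param_maps[best], avg_metrics, best_model


# Toy example: rows are (x, label); the "model" predicts 1 when x >= threshold
rows = [(x, 1 if x >= 5 else 0) for x in range(10)]
fit = lambda train, p: (lambda x: 1 if x >= p["threshold"] else 0)
evaluate = lambda model, valid: sum(model(x) == y for x, y in valid) / len(valid)
best_params, avg_metrics, best_model = cross_validate(
    rows, [{"threshold": 2}, {"threshold": 5}, {"threshold": 8}], fit, evaluate
)
print(best_params, avg_metrics)
```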

In [108]:
# Running cross-validation
cvModel_rfc = cv_rfc.fit(train_data)

# Marking the end time after fitting model
end_time = time()

# Calculating the duration of the time
elapsed_time = end_time - start_time

# Printing the time to train model in seconds
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 340.029 seconds

We then fitted the cross-validator to the training data, which took about 340 seconds (a little under 6 minutes).
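In cells 107 and 108, start_time is recorded in one cell and the fit runs in the next, so the 340 seconds above also include any pause between running the two cells. A small context manager keeps the start and stop marks around the same statement. The helper timed is hypothetical; it relies only on the standard library's time.perf_counter and is not part of Spark.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label):
    """Measure wall-clock time spent inside the with-block and print it."""
    result = {}
    start = time.perf_counter()
    try:
        yield result
    finally:
        result["seconds"] = time.perf_counter() - start
        print("%s: %.3f seconds" % (label, result["seconds"]))


# Hypothetical usage in the notebook, with start and stop in the same cell:
# with timed("Time to train model"):
#     cvModel_rfc = cv_rfc.fit(train_data)
```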

In [109]:
# Applying the tuned random forest to the test data
prediction_rfc_hyper = cvModel_rfc.transform(test_data)

We then applied the tuned random forest to the test data to generate predictions.

In [110]:
# Setting up a multiclass classification evaluator
rfc_ev = ev.MulticlassClassificationEvaluator(
    predictionCol='prediction'
    , labelCol='label')

We set up the evaluator with the prediction and label columns.

In [111]:
# Printing out the weightedPrecision, weightedRecall, and Accuracy of the random forest model
print(rfc_ev.evaluate(prediction_rfc_hyper, {rfc_ev.metricName: 'weightedPrecision'}))
print(rfc_ev.evaluate(prediction_rfc_hyper, {rfc_ev.metricName: 'weightedRecall'}))
print(rfc_ev.evaluate(prediction_rfc_hyper, {rfc_ev.metricName: 'accuracy'}))

0.8166333365229521
0.8237418190024826
0.8237418190024826

In [112]:
# Printing out the weightedPrecision, weightedRecall, and Accuracy of the logistic regression model
print(logReg_ev.evaluate(prediction_lr_hyper, {logReg_ev.metricName: 'weightedPrecision'}))
print(logReg_ev.evaluate(prediction_lr_hyper, {logReg_ev.metricName: 'weightedRecall'}))
print(logReg_ev.evaluate(prediction_lr_hyper, {logReg_ev.metricName: 'accuracy'}))

0.8314005762665297
0.8399909726923944
0.8399909726923945

After hyperparameter tuning, the random forest's accuracy actually went down slightly, to about 82%, with a weighted precision and recall of 0.82. Of the two methods we used, logistic regression performed better, with an accuracy of 84%.

# Final Analysis

When we began this project, we set out to find what leads adults to earn over 50K a year and whether certain features affect income more than others, and we answered both questions and more. We found that a higher education number (education.num) and more hours worked per week (hours.per.week) had the strongest impact on whether an individual earned more than 50K a year. We then built models that predict whether an adult earns more than 50K a year from their other traits; our best model, the tuned logistic regression, reached an accuracy of 84%, which gives us a reasonably accurate predictor of adult income. We also found that most people who earn over 50K a year are middle-aged adults with above-average education levels, while those who earn less than 50K a year are spread fairly evenly across ages but skew younger.
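Whether 84% counts as reasonably accurate depends on how imbalanced the income classes are: a model that always predicts the more common class already scores that class's share of the rows. A minimal plain-Python sketch of that comparison follows. The helper names are hypothetical, and the baseline of 0.75 in the example is an illustrative number, not a measurement from this dataset. In the notebook, the actual class counts could come from test_data.groupBy("label").count().

```python
from collections import Counter


def majority_baseline_accuracy(labels):
    """Accuracy of a model that always predicts the most frequent label."""
    counts = Counter(labels)
    return max(counts.values()) / len(labels)


def relative_error_reduction(model_accuracy, baseline_accuracy):
    """Share of the baseline's errors that the model removes (1.0 = no errors left)."""
    if baseline_accuracy >= 1.0:
        raise ValueError("baseline already makes no errors")
    return (model_accuracy - baseline_accuracy) / (1.0 - baseline_accuracy)


# Illustrative only: against a hypothetical baseline of 0.75, an accuracy of
# 0.84 removes about 36% of the baseline's errors.
print(round(relative_error_reduction(0.84, 0.75), 2))
```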