In [1]:
import findspark
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler,OneHotEncoder
import matplotlib.pyplot as plt
import tensorflow as tf
from pyspark.sql import Row
# Import struct fields that we can use
from pyspark.sql.types import StructField, StringType, IntegerType, StructType
findspark.init()

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

In [3]:
from pyspark import SparkFiles
file = "income.csv"
spark.sparkContext.addFile(file)
df = spark.read.csv(SparkFiles.get("income.csv"), sep=",", header=True)
df.createOrReplaceTempView('income_spark')

In [4]:
df.show()

+---+----------------+------+------------+-------------+------------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|   education|education_num|    marital_status|       occupation|  relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+------+------------+-------------+------------------+-----------------+--------------+-----+------+------------+------------+--------------+--------------+------+
| 40|Self-emp-not-inc|223881| Prof-school|           15|Married-civ-spouse|   Prof-specialty|       Husband|White|  Male|       99999|           0|            70| United-States|  >50K|
| 30|         Private|149118|     HS-grad|            9|          Divorced|     Craft-repair| Not-in-family|White|Female|           0|           0|            40| United-States| <=50K|
| 46|         Private|109209|Some-college|           10|Married-civ-spouse|

In [5]:

#using spark.sql pull all the columsn from the df except for capital.gains, capital.loss, and fnlwgt
df = spark.sql("""SELECT age, 
               workclass, 
               education, 
               education_num, 
               marital_status, 
               occupation, 
               relationship, 
               race, 
               sex, 
               hours_per_week, 
               native_country, 
               income
               FROM income_spark""")




In [6]:
#make the spark dataframe df a pandas dataframe
income = df.toPandas()

In [7]:
income.replace("?", np.nan, inplace=True)
income.head()

Unnamed: 0,age,workclass,education,education_num,marital_status,occupation,relationship,race,sex,hours_per_week,native_country,income
0,40,Self-emp-not-inc,Prof-school,15,Married-civ-spouse,Prof-specialty,Husband,White,Male,70,United-States,>50K
1,30,Private,HS-grad,9,Divorced,Craft-repair,Not-in-family,White,Female,40,United-States,<=50K
2,46,Private,Some-college,10,Married-civ-spouse,Adm-clerical,Husband,White,Male,40,United-States,>50K
3,32,Private,Assoc-voc,11,Married-civ-spouse,Other-service,Husband,White,Male,60,United-States,>50K
4,54,,Preschool,1,Married-civ-spouse,,Wife,White,Female,40,Mexico,<=50K


In [8]:
income.dropna(inplace=True)
income.head()

Unnamed: 0,age,workclass,education,education_num,marital_status,occupation,relationship,race,sex,hours_per_week,native_country,income
0,40,Self-emp-not-inc,Prof-school,15,Married-civ-spouse,Prof-specialty,Husband,White,Male,70,United-States,>50K
1,30,Private,HS-grad,9,Divorced,Craft-repair,Not-in-family,White,Female,40,United-States,<=50K
2,46,Private,Some-college,10,Married-civ-spouse,Adm-clerical,Husband,White,Male,40,United-States,>50K
3,32,Private,Assoc-voc,11,Married-civ-spouse,Other-service,Husband,White,Male,60,United-States,>50K
5,63,Private,Some-college,10,Married-civ-spouse,Prof-specialty,Husband,White,Male,16,United-States,<=50K


In [9]:
income_cat = income.dtypes[income.dtypes == 'object'].index.tolist()

In [10]:
#check the number of unique values in each column
income[income_cat].nunique()

age               72
workclass          7
education         16
education_num     16
marital_status     7
occupation        14
relationship       6
race               5
sex                2
hours_per_week    93
native_country    40
income             2
dtype: int64

In [11]:
# Create a OneHotEncoder instance
enc = OneHotEncoder(sparse=False)

# Fit and transform the OneHotEncoder using the categorical variable list
encode_df = pd.DataFrame(enc.fit_transform(income[income_cat]))

# Add the encoded variable names to the dataframe
encode_df.columns = enc.get_feature_names_out(income_cat)
encode_df.head()



Unnamed: 0,age_17,age_18,age_19,age_20,age_21,age_22,age_23,age_24,age_25,age_26,...,native_country_Scotland,native_country_South,native_country_Taiwan,native_country_Thailand,native_country_Trinadad&Tobago,native_country_United-States,native_country_Vietnam,native_country_Yugoslavia,income_<=50K,income_>50K
0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0
3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0
4,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0


In [12]:
# Merge one-hot encoded features and drop the originals
income = income.merge(encode_df,left_index=True, right_index=True)
income = income.drop(income_cat,1)
income.head()

  income = income.drop(income_cat,1)


Unnamed: 0,age_17,age_18,age_19,age_20,age_21,age_22,age_23,age_24,age_25,age_26,...,native_country_Scotland,native_country_South,native_country_Taiwan,native_country_Thailand,native_country_Trinadad&Tobago,native_country_United-States,native_country_Vietnam,native_country_Yugoslavia,income_<=50K,income_>50K
0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0
3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0
5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0


In [13]:
#drop capital.gain, capital.loss, fnlwgt, income_>50K
income.drop(['income_>50K'], axis=1, inplace=True)


In [14]:
# Split our preprocessed data into our features and target arrays
y = income["income_<=50K"].values
X = income.drop(["income_<=50K"], axis=1).values

# Split the preprocessed data into a training and testing dataset
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=78)

In [15]:
# Create a StandardScaler instances
scaler = StandardScaler()

# Fit the StandardScaler
X_scaler = scaler.fit(X_train)

# Scale the data
X_train_scaled = X_scaler.transform(X_train)
X_test_scaled = X_scaler.transform(X_test)

In [16]:
# Define the model - deep neural net
number_input_features = len(X_train[0])
hidden_nodes_layer1 =  8
hidden_nodes_layer2 = 5

nn = tf.keras.models.Sequential()

# First hidden layer
nn.add(
    tf.keras.layers.Dense(units=hidden_nodes_layer1, input_dim=number_input_features, activation="relu")
)

# Second hidden layer
nn.add(tf.keras.layers.Dense(units=hidden_nodes_layer2, activation="relu"))

# Output layer
nn.add(tf.keras.layers.Dense(units=1, activation="sigmoid"))

# Check the structure of the model
nn.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 8)                 2232      
                                                                 
 dense_1 (Dense)             (None, 5)                 45        
                                                                 
 dense_2 (Dense)             (None, 1)                 6         
                                                                 
Total params: 2283 (8.92 KB)
Trainable params: 2283 (8.92 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [17]:
# Compile the model
nn.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

In [18]:
# Train the model
fit_model = nn.fit(X_train_scaled,y_train,epochs=100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [19]:
# Evaluate the model using the test data
model_loss, model_accuracy = nn.evaluate(X_test_scaled,y_test,verbose=2)
print(f"Loss: {model_loss}, Accuracy: {model_accuracy}")

168/168 - 0s - loss: 0.5031 - accuracy: 0.8150 - 208ms/epoch - 1ms/step
Loss: 0.5031192302703857, Accuracy: 0.8149738907814026
