In [1]:
import warnings

import findspark
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType
# Install a blanket "ignore" filter so Python warnings are not shown in cell output.
warnings.filterwarnings(action="ignore")

# Seed NumPy's global random number generator.
np.random.seed(seed=42)

In [2]:
findspark.init()

# Build (or reuse) a Spark session on the local[4] master with Hive support enabled.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("ISM6562 Spark App01")
    .enableHiveSupport()
    .getOrCreate()
)

# The SparkContext is the entry point to the Spark API; it is created together with the session.
sc = spark.sparkContext

# Note: if other Spark sessions are still running (for example from a notebook you ran
# earlier), this session's web UI is served on a port other than the default (4040).
# The line below extracts the port from the UI URL: 4040 means this is the only
# session, anything higher means other Spark sessions are still running.
spark_session_port = sc.uiWebUrl.split(":")[-1]
print("Spark Session WebUI Port: " + spark_session_port)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/28 13:28:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/28 13:28:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Spark Session WebUI Port: 4041


In [3]:
# Read the dataset from the CSV file in the working directory into a pandas DataFrame
df = pd.read_csv(filepath_or_buffer='adult_data.csv')

# Preview the first five rows
df.head(5)

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,income
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,<=50K
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,<=50K
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K


In [4]:
# Keep only rows in which every column has a value (any NaN removes the row)
df = df.dropna(axis=0, how='any')

In [5]:
# Column names as they appear in the CSV header, in file order
raw_columns = [
    'age', 'workclass', 'fnlwgt', 'education', 'education-num',
    'marital-status', 'occupation', 'relationship', 'race', 'sex',
    'capital-gain', 'capital-loss', 'hours-per-week', 'native-country',
    'income',
]

# Map every raw name to its snake_case form: hyphens become underscores and
# names without a hyphen map to themselves.
cols_rename = {name: name.replace('-', '_') for name in raw_columns}
# 'fnlwgt' is the one rename that is not a hyphen swap.
cols_rename['fnlwgt'] = 'fnl_wgt'

# Apply the mapping to the DataFrame in place
df.rename(columns=cols_rename, inplace=True)

In [6]:
# Normalise the income labels so that '<=50K.' / '>50K.' collapse onto '<=50K' / '>50K'.
# The previous patterns were evaluated as regular expressions, where '.' matches ANY
# character rather than a literal period; stripping the trailing period literally
# avoids that. Surrounding whitespace is removed as well so padded labels still
# compare equal to the expected strings in the encoding step.
df['income'] = df['income'].str.strip().str.rstrip('.')

In [7]:
# Encode the income label as the strings '0' (<=50K) and '1' (>50K); the cast to
# int happens in the following cell.
# An explicit mapping is used instead of an "else '1'" catch-all so that an
# unexpected label (or an accidental re-run of this cell on already-encoded
# values) raises instead of silently marking every such row as the positive class.
income_codes = df['income'].map({'<=50K': '0', '>50K': '1'})
if income_codes.isna().any():
    unexpected_labels = sorted(df.loc[income_codes.isna(), 'income'].astype(str).unique())
    raise ValueError(f"Unexpected income labels: {unexpected_labels}")
df['income'] = income_codes

In [8]:
# Cast the '0'/'1' income codes to integers
df['income'] = df['income'].astype(int)

In [9]:
# Hand the cleaned pandas DataFrame over to Spark
df_spark = spark.createDataFrame(data=df)

# Print the first 20 rows of the Spark DataFrame
df_spark.show(n=20)

24/04/28 13:28:38 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

+---+----------------+-------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnl_wgt|   education|education_num|      marital_status|       occupation| relationship|              race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+-------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
| 39|       State-gov|  77516|   Bachelors|           13|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|        2174|           0|            40| United-States|     0|
| 50|Self-emp-not-inc|  83311|   Bachelors|           13|  Married-civ-spouse|  Exec-managerial|      Husband|             White|  Male|           0|           0|            13| United-States|     0|


In [10]:
# Columns that are removed before modelling
columns_to_drop = ['workclass', 'education_num', 'fnl_wgt']

# Keep every remaining column, in its existing order
df_spark = df_spark.select([name for name in df_spark.columns if name not in columns_to_drop])

In [11]:
# Swap the '?' placeholder for the explicit category 'Unknown' across the DataFrame
df_spark = df_spark.replace(to_replace='?', value='Unknown')

In [12]:
# For every column, collect its distinct values to the driver and print them
for col_name in df_spark.columns:
    distinct_rows = df_spark.select(col_name).distinct().collect()
    # Each collected Row holds a single field: the distinct value itself
    unique_values_list = [row[0] for row in distinct_rows]
    print(f"Unique values in column '{col_name}': {unique_values_list}")

                                                                                

Unique values in column 'age': [29, 26, 65, 54, 19, 22, 77, 34, 50, 57, 32, 43, 84, 31, 39, 25, 71, 68, 72, 58, 27, 63, 56, 51, 52, 79, 17, 41, 28, 33, 88, 85, 48, 67, 44, 61, 37, 83, 55, 74, 62, 49, 35, 80, 66, 76, 36, 75, 78, 18, 69, 21, 59, 81, 38, 82, 42, 30, 73, 90, 23, 46, 20, 70, 60, 40, 64, 53, 45, 47, 24, 87, 86, 89]
Unique values in column 'education': ['Masters', '10th', '5th-6th', 'Assoc-acdm', 'Assoc-voc', '7th-8th', '9th', 'HS-grad', 'Bachelors', '11th', '1st-4th', 'Preschool', '12th', 'Doctorate', 'Some-college', 'Prof-school']
Unique values in column 'marital_status': ['Separated', 'Never-married', 'Married-spouse-absent', 'Divorced', 'Widowed', 'Married-AF-spouse', 'Married-civ-spouse']
Unique values in column 'occupation': ['Sales', 'Exec-managerial', 'Prof-specialty', 'Handlers-cleaners', 'Farming-fishing', 'Craft-repair', 'Unknown', 'Transport-moving', 'Priv-house-serv', 'Protective-serv', 'Other-service', 'Tech-support', 'Machine-op-inspct', 'Armed-Forces', 'Adm-cl

In [None]:
# One StringIndexer per categorical column, each writing to '<column>_index'.
# NOTE(review): the pipeline cell further down builds its own `indexers` list and does
# not reference these names — confirm whether this cell is still needed.
(
    indexer_education,
    indexer_marital_status,
    indexer_occupation,
    indexer_relationship,
    indexer_race,
    indexer_sex,
    indexer_native_country,
) = (
    StringIndexer(inputCol=name, outputCol=name + '_index', handleInvalid='keep')
    for name in (
        'education',
        'marital_status',
        'occupation',
        'relationship',
        'race',
        'sex',
        'native_country',
    )
)

In [13]:
# Display the column names currently present in the Spark DataFrame
df_spark.columns

['age',
 'education',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'native_country',
 'income']

In [14]:
# Randomly split the rows with weights 0.8 / 0.2, using a fixed seed
train_data, test_data = df_spark.randomSplit(weights=[0.8, 0.2], seed=42)

In [15]:
# Display both splits; the repr of each DataFrame lists its columns and types
train_data, test_data

(DataFrame[age: bigint, education: string, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: bigint, capital_loss: bigint, hours_per_week: bigint, native_country: string, income: bigint],
 DataFrame[age: bigint, education: string, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: bigint, capital_loss: bigint, hours_per_week: bigint, native_country: string, income: bigint])

In [30]:
# Input columns, grouped by how they are preprocessed
numeric_cols = ['age', 'capital_gain', 'capital_loss', 'hours_per_week']
categorical_cols = ['education', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'native_country']

# Derived column names for the categorical stages, computed once so the indexers,
# the encoder and the final assembler cannot drift apart.
index_cols = [f'{name}_index' for name in categorical_cols]
encoded_cols = [f'{name}_encoded' for name in categorical_cols]

# Numeric branch: assemble the raw columns into one vector, then scale it
numeric_assembler = VectorAssembler(inputCols=numeric_cols, outputCol='numeric_features')
scaler = StandardScaler(inputCol='numeric_features', outputCol='scaled_numeric_features')

# Categorical branch: one StringIndexer per column, followed by a single OneHotEncoder.
# The loop variable is deliberately not called `col`, which would shadow the
# pyspark.sql.functions.col import inside the comprehension.
indexers = [
    StringIndexer(inputCol=name, outputCol=index_name, handleInvalid='keep')
    for name, index_name in zip(categorical_cols, index_cols)
]
encoder = OneHotEncoder(inputCols=index_cols, outputCols=encoded_cols)

# Combine the scaled numeric vector and the encoded categoricals into 'features'
assembler = VectorAssembler(inputCols=['scaled_numeric_features'] + encoded_cols, outputCol='features')

# Decision tree classifier predicting the binary 'income' label.
# DecisionTreeClassifier must be imported from pyspark.ml.classification in the
# notebook's import cell; without that import this cell raises NameError on a fresh kernel.
dt = DecisionTreeClassifier(featuresCol='features', labelCol='income')

# Chain all stages in execution order and fit them on the training split only
pipeline = Pipeline(stages=[numeric_assembler, scaler] + indexers + [encoder, assembler, dt])
pipeline_model = pipeline.fit(train_data)

                                                                                

In [31]:
# Area under the ROC curve, computed against the 'income' label
evaluator = BinaryClassificationEvaluator(labelCol='income', metricName='areaUnderROC')

# Training split: apply the fitted pipeline, then score
train_predictions = pipeline_model.transform(train_data)
train_roc_auc = evaluator.evaluate(train_predictions)
print("Train ROC AUC:", train_roc_auc)

# Test split: apply the same fitted pipeline, then score
test_predictions = pipeline_model.transform(test_data)
test_roc_auc = evaluator.evaluate(test_predictions)
print("Test ROC AUC:", test_roc_auc)

                                                                                

Train ROC AUC: 0.6621526487864421
Test ROC AUC: 0.6643192029669447
