In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("FinalProject_BigData")
    .master("local[*]")
    .getOrCreate()
)
spark.range(5).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



Task 1: Data Collection

In [3]:
import pandas as pd

data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"

column_names = [
    "age", "workclass", "fnlwgt", "education", "education_num",
    "marital_status", "occupation", "relationship", "race", "sex",
    "capital_gain", "capital_loss", "hours_per_week", "native_country", "income"
]

pandas_df = pd.read_csv(data_url, names=column_names, header=None, skipinitialspace=True)

spark_df = spark.createDataFrame(pandas_df)

display(spark_df)

print("Initial preview of the dataset:")
spark_df.show(5)
print("Total number of records:", spark_df.count())

DataFrame[age: bigint, workclass: string, fnlwgt: bigint, education: string, education_num: bigint, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: bigint, capital_loss: bigint, hours_per_week: bigint, native_country: string, income: string]

Initial preview of the dataset:
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|education|education_num|    marital_status|       occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 39|       State-gov| 77516|Bachelors|           13|     Never-married|     Adm-clerical|Not-in-family|White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|           0|           0|            13| United-States| <=50K|
| 38|         Private|215646|  HS-grad|            9|          

Task 2: Data Cleaning

In [11]:
from pyspark.sql.functions import col, trim

for col_name in spark_df.columns:
    spark_df = spark_df.withColumn(col_name, trim(col(col_name)))

spark_df = spark_df.replace("?", None).dropna()

numeric_fields = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]

for field in numeric_fields:
    spark_df = spark_df.withColumn(field, col(field).cast("int"))

display(spark_df)

DataFrame[age: int, workclass: string, fnlwgt: int, education: string, education_num: int, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: int, capital_loss: int, hours_per_week: int, native_country: string, income: string]

In [10]:
print("Here's what the cleaned data looks like:")
spark_df.show(5)

Here's what the cleaned data looks like:
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|education|education_num|    marital_status|       occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 39|       State-gov| 77516|Bachelors|           13|     Never-married|     Adm-clerical|Not-in-family|White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|           0|           0|            13| United-States| <=50K|
| 38|         Private|215646|  HS-grad|            9| 

Task 3: Feature Engineering

In [6]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

label_indexer = StringIndexer(inputCol="income", outputCol="label")

categorical_cols = [
    "workclass", "education", "marital_status",
    "occupation", "relationship", "race",
    "sex", "native_country"
]

indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="skip")
    for col in categorical_cols
]

encoders = [
    OneHotEncoder(inputCol=f"{col}_idx", outputCol=f"{col}_ohe")
    for col in categorical_cols
]

numeric_cols = [
    "age", "fnlwgt", "education_num",
    "capital_gain", "capital_loss", "hours_per_week"
]

assembler = VectorAssembler(
    inputCols=numeric_cols + [f"{col}_ohe" for col in categorical_cols],
    outputCol="features"
)

pipeline = Pipeline(
    stages=indexers + encoders + [label_indexer, assembler]
)

fe_df = pipeline.fit(spark_df).transform(spark_df)
final_df = fe_df.select("features", "label")
train_df, test_df = final_df.randomSplit([0.8, 0.2], seed=42)

display(final_df)
final_df.show(5)
print("Training rows:", train_df.count())
print("Testing rows:", test_df.count())


DataFrame[features: vector, label: double]

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(96,[0,1,2,3,5,9,...|  0.0|
|(96,[0,1,2,5,7,14...|  0.0|
|(96,[0,1,2,5,6,12...|  0.0|
|(96,[0,1,2,5,6,17...|  0.0|
|(96,[0,1,2,5,6,14...|  0.0|
+--------------------+-----+
only showing top 5 rows
Training rows: 24254
Testing rows: 5908


Task 4: Model Training

In [7]:
from pyspark.ml.classification import LogisticRegression, GBTClassifier

lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=100
)

lr_model = lr.fit(train_df)

print("Logistic Regression Model")
print("Iterations:", lr_model.getMaxIter())
print("Regularization:", lr_model.getRegParam())
print("Elastic Net:", lr_model.getElasticNetParam())


gbt = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=20,
    maxDepth=5
)

gbt_model = gbt.fit(train_df)

print("\nGradient Boosted Tree Model")
print("Iterations:", gbt_model.getMaxIter())
print("Tree Depth:", gbt_model.getMaxDepth())


Logistic Regression Model
Iterations: 100
Regularization: 0.0
Elastic Net: 0.0

Gradient Boosted Tree Model
Iterations: 20
Tree Depth: 5


Task 5: Tuning and Evaluating

In [8]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, GBTClassifier

evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    metricName="areaUnderROC"
)

lr = LogisticRegression(featuresCol="features", labelCol="label")

lr_param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.5])
    .build()
)

lr_cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=lr_param_grid,
    evaluator=evaluator,
    numFolds=5
)

lr_cv_model = lr_cv.fit(train_df)
lr_best_auc = evaluator.evaluate(lr_cv_model.bestModel.transform(train_df))
print("Best Logistic Regression AUC (Training):", lr_best_auc)


gbt = GBTClassifier(featuresCol="features", labelCol="label")

gbt_param_grid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [3, 5])
    .addGrid(gbt.maxIter, [10, 20])
    .build()
)

gbt_cv = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=gbt_param_grid,
    evaluator=evaluator,
    numFolds=5
)

gbt_cv_model = gbt_cv.fit(train_df)
gbt_best_auc = evaluator.evaluate(gbt_cv_model.bestModel.transform(train_df))
print("Best GBT AUC (Training):", gbt_best_auc)


Best Logistic Regression AUC (Training): 0.9040112001567027
Best GBT AUC (Training): 0.9180995307160787


Task 6: Prediction and Final Evaluation

In [9]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    metricName="areaUnderROC"
)

lr_test_predictions = lr_cv_model.bestModel.transform(test_df)
lr_test_auc = evaluator.evaluate(lr_test_predictions)
print("Logistic Regression Test AUC:", lr_test_auc)

gbt_test_predictions = gbt_cv_model.bestModel.transform(test_df)
gbt_test_auc = evaluator.evaluate(gbt_test_predictions)
print("Gradient Boosted Tree Test AUC:", gbt_test_auc)


Logistic Regression Test AUC: 0.9001087913915462
Gradient Boosted Tree Test AUC: 0.9050750837686011
