# COURSE: Big Data - CTS43135

## Lab Instruction #5: Machine Learning with Spark MLlib

### Lab Objectives:
- Understand and practice data processing with PySpark and Spark MLlib.
- Load and explore the dataset.
- Perform feature preprocessing.
- Define the model and build the pipeline.

### Prerequisites:
- Basic knowledge of Python, SQL, and Machine Learning is recommended.
- This lab runs on Python 3.6+ with Apache Spark 3.x.
- A working environment like Jupyter Notebook or Google Colab is recommended for easier execution.

### Activity 1: Load the dataset & Data Exploration

#### 1. Download the dataset.

In [3]:
import urllib.request

url = "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
urllib.request.urlretrieve(url, "adult.csv")


('adult.csv', <http.client.HTTPMessage at 0x18fe8304250>)

#### 2. Display the first few lines.

In [5]:
with open('adult.csv', 'r') as file:
    for i in range(5):
        print(file.readline().strip())


39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K
38, Private, 215646, HS-grad, 9, Divorced, Handlers-cleaners, Not-in-family, White, Male, 0, 0, 40, United-States, <=50K
53, Private, 234721, 11th, 7, Married-civ-spouse, Handlers-cleaners, Husband, Black, Male, 0, 0, 40, United-States, <=50K
28, Private, 338409, Bachelors, 13, Married-civ-spouse, Prof-specialty, Wife, Black, Female, 0, 0, 40, Cuba, <=50K
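
Note that every field after the first is preceded by a space in the raw file. The sketch below uses Python's standard `csv` module on the first line printed above to show the effect; `skipinitialspace=True` strips that space. Spark's CSV reader has a comparable `ignoreLeadingWhiteSpace` option, which this lab does not set, so string values such as the label are expected to keep their leading space.

```python
import csv
import io

# First line of adult.csv, copied from the output above.
raw_line = ("39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, "
            "Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K")

# Default parsing keeps the space that follows each comma.
fields_raw = next(csv.reader(io.StringIO(raw_line)))

# skipinitialspace=True drops whitespace immediately after the delimiter.
fields_clean = next(csv.reader(io.StringIO(raw_line), skipinitialspace=True))

print(repr(fields_raw[1]), repr(fields_clean[1]))
print(repr(fields_raw[-1]), repr(fields_clean[-1]))
```

This does not affect the encoders used later, because they treat each distinct string as a category, but it matters if you filter on exact values such as `"<=50K"`.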


#### 3. Initialize a Spark Session.

In [8]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("lab05").getOrCreate()


#### 4. Load the Dataset.

In [9]:
schema = """`age` DOUBLE,
`workclass` STRING,
`fnlwgt` DOUBLE,
`education` STRING,
`education_num` DOUBLE,
`marital_status` STRING,
`occupation` STRING,
`relationship` STRING,
`race` STRING,
`sex` STRING,
`capital_gain` DOUBLE,
`capital_loss` DOUBLE,
`hours_per_week` DOUBLE,
`native_country` STRING,
`income` STRING"""
dataset = spark.read.csv("adult.csv", schema=schema)


#### 5. Split the dataset into training and test sets.

In [11]:
trainDF, testDF = dataset.randomSplit([0.8, 0.2], seed=42)
print(trainDF.cache().count())  # Cache the training set because it is accessed multiple times
print(testDF.count())


26076
6485
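
The split is not exactly 80/20 because `randomSplit` assigns rows to splits by random sampling rather than cutting at a fixed position. The following pure-Python sketch imitates that idea on row indices. It is an illustration under that assumption, not Spark's actual sampler, so its counts will differ from those above; the function name is hypothetical.

```python
import random

def random_split(n_rows, train_weight, seed):
    """Assign each row index to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for i in range(n_rows):
        (train if rng.random() < train_weight else test).append(i)
    return train, test

total_rows = 26076 + 6485  # counts reported above
train_idx, test_idx = random_split(total_rows, train_weight=0.8, seed=42)

print(len(train_idx), len(test_idx))
print(f"train fraction in this lab's split: {26076 / total_rows:.4f}")
```

Fixing the seed makes the assignment reproducible, which is why the lab passes `seed=42`.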


#### 6. Data Exploration.

In [14]:
trainDF.select("hours_per_week").summary().show()


+-------+------------------+
|summary|    hours_per_week|
+-------+------------------+
|  count|             26076|
|   mean|  40.4284782942169|
| stddev|12.404569739132008|
|    min|               1.0|
|    25%|              40.0|
|    50%|              40.0|
|    75%|              45.0|
|    max|              99.0|
+-------+------------------+



In [15]:
trainDF.groupBy("education").count().sort("count", ascending=False).show()


+-------------+-----+
|    education|count|
+-------------+-----+
|      HS-grad| 8408|
| Some-college| 5860|
|    Bachelors| 4255|
|      Masters| 1388|
|    Assoc-voc| 1102|
|         11th|  958|
|   Assoc-acdm|  845|
|         10th|  748|
|      7th-8th|  510|
|  Prof-school|  465|
|          9th|  419|
|         12th|  361|
|    Doctorate|  323|
|      5th-6th|  265|
|      1st-4th|  126|
|    Preschool|   43|
+-------------+-----+



### Activity 2: Feature preprocessing


#### 1. Convert categorical variables to numeric

In [16]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
categoricalCols = ["workclass", "education", "marital_status", "occupation",
"relationship", "race", "sex"]
# StringIndexer and OneHotEncoder are estimators: calling fit() on them returns a model (a transformer) that we later apply to the dataset.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols])
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols])
# The label column ("income") is also a string value - it has two possible values, "<=50K" and ">50K".
# Convert it to a numeric value using StringIndexer.
labelToIndex = StringIndexer(inputCol="income", outputCol="label")

In [None]:
stringIndexerModel = stringIndexer.fit(trainDF)
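
To make the two encoders concrete, here is a pure-Python sketch of what they compute, assuming Spark's documented defaults: `StringIndexer` orders labels by descending frequency, and `OneHotEncoder` drops the last category (`dropLast=True`), so that category becomes an all-zero vector. The helper names are hypothetical, and the counts are the top four rows of the education table above.

```python
def fit_string_index(counts):
    """Map each label to an index, most frequent label first (ties broken alphabetically)."""
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

education_counts = {"HS-grad": 8408, "Some-college": 5860, "Bachelors": 4255, "Masters": 1388}
index_of = fit_string_index(education_counts)

for label in education_counts:
    print(label, index_of[label], one_hot(index_of[label], len(index_of)))
```

Dropping one slot per column avoids a redundant dimension, since the last category is implied when all other slots are zero.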



In [20]:
trainDF.show()

+----+---------+--------+-------------+-------------+--------------+----------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+
| age|workclass|  fnlwgt|    education|education_num|marital_status|occupation|   relationship|               race|    sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+----+---------+--------+-------------+-------------+--------------+----------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+
|17.0|        ?| 34019.0|         10th|          6.0| Never-married|         ?|      Own-child|              White|   Male|         0.0|         0.0|          20.0| United-States| <=50K|
|17.0|        ?| 34088.0|         12th|          8.0| Never-married|         ?|      Own-child|              White| Female|         0.0|         0.0|          25.0| United-States| <=50K|
|17.0|        ?| 47407.0|         11th|          7.0| Never-marri

#### 2. Combine all feature columns into a single feature vector.

In [21]:
from pyspark.ml.feature import VectorAssembler
# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
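
The length of the assembled vector can be worked out by hand. The sketch below assumes the `OneHotEncoder` default of dropping the last category, and it assumes the category counts listed in the code: 16 for `education` comes from the table in Activity 1, while the others are taken from the UCI dataset description (counting `?` as a category) and are not verified in this lab. Under those assumptions the total is 59, which agrees with the vector size shown in the prediction output of Activity 3.

```python
# Number of distinct values per categorical column (assumed, see the note above).
category_counts = {
    "workclass": 9, "education": 16, "marital_status": 7, "occupation": 15,
    "relationship": 6, "race": 5, "sex": 2,
}
numeric_cols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]

# Walk the assembler inputs in order and record where each block starts.
offsets, position = {}, 0
for col, n in category_counts.items():
    offsets[col + "OHE"] = position
    position += n - 1          # dropLast=True removes one slot per column
for col in numeric_cols:
    offsets[col] = position
    position += 1              # each numeric column occupies one slot

print(offsets)
print("vector size:", position)
```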


### Activity 3: Define the model & Build the pipeline

#### 1. Create a logistic regression model.

In [22]:
# Import the LogisticRegression model from PySpark's MLlib
from pyspark.ml.classification import LogisticRegression
# Initialize a Logistic Regression model
# - featuresCol: Specifies the column containing feature vectors
# - labelCol: Specifies the column containing the target label (0 or 1)
# - regParam: Regularization parameter (L2 regularization by default); helps prevent overfitting
lr = LogisticRegression(featuresCol="features", labelCol="label", regParam=1.0)
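
As a reminder of what the model computes, this sketch evaluates the logistic (sigmoid) function and an L2-penalised log-loss on made-up numbers. It assumes the penalty form `regParam / 2 * ||w||^2` described in the Spark documentation for `elasticNetParam=0`; the weights and rows are hypothetical and unrelated to the Adult dataset.

```python
import math

def sigmoid(z):
    """Logistic function: maps a real-valued score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def penalized_log_loss(weights, bias, rows, labels, reg_param):
    """Mean log-loss plus an L2 penalty of reg_param / 2 * sum(w^2)."""
    loss = 0.0
    for x, y in zip(rows, labels):
        p = sigmoid(sum(w * xi for w, xi in zip(weights, x)) + bias)
        loss -= y * math.log(p) + (1 - y) * math.log(1 - p)
    penalty = 0.5 * reg_param * sum(w * w for w in weights)
    return loss / len(rows) + penalty

# Hypothetical toy data: two features, four rows.
rows = [[0.5, 1.0], [1.5, -0.5], [-1.0, 0.2], [-0.3, -1.2]]
labels = [1, 1, 0, 0]
weights, bias = [1.2, 0.4], 0.0

for reg_param in (0.0, 0.1, 1.0):
    print(reg_param, round(penalized_log_loss(weights, bias, rows, labels, reg_param), 4))
```

A larger `regParam` makes large weights more expensive, so the optimiser prefers smaller coefficients; `regParam=1.0` is a comparatively strong penalty.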


#### 2. Build a pipeline

In [None]:
from pyspark.ml import Pipeline
# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, labelToIndex, vecAssembler, lr])
# Fit all stages on the training set to obtain the pipeline model.
pipelineModel = pipeline.fit(trainDF)
# Apply the pipeline model to the test dataset.
predDF = pipelineModel.transform(testDF)
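
The pattern behind `Pipeline` is that each estimator learns its parameters from the training data only, and the resulting model is then applied unchanged to new data. The sketch below illustrates this with a much-simplified stand-in written in plain Python; the class names are invented for illustration and are not part of PySpark.

```python
class MeanCenterer:
    """Toy estimator: learns a mean in fit() and returns a transformer function."""
    def fit(self, values):
        mean = sum(values) / len(values)
        return lambda data: [v - mean for v in data]

class ToyPipeline:
    """Fits each stage in order on the training data, feeding outputs forward."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, train):
        fitted, current = [], train
        for stage in self.stages:
            transform = stage.fit(current)
            current = transform(current)
            fitted.append(transform)

        # The fitted "pipeline model" replays the learned transforms on any data.
        def pipeline_model(data):
            for transform in fitted:
                data = transform(data)
            return data
        return pipeline_model

train_hours = [40.0, 45.0, 20.0, 55.0]   # made-up values
test_hours = [40.0, 60.0]

model = ToyPipeline([MeanCenterer()]).fit(train_hours)
print(model(test_hours))  # centred with the training mean, not the test mean
```

Fitting on the training set alone keeps information from the test set out of the learned parameters, which is why the lab calls `fit(trainDF)` and only `transform(testDF)`.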


#### 3. Display the predictions.

In [25]:
predDF.select("features", "label", "prediction", "probability").show(5, truncate=False)

+-------------------------------------------------------------------------------------------+-----+----------+----------------------------------------+
|features                                                                                   |label|prediction|probability                             |
+-------------------------------------------------------------------------------------------+-----+----------+----------------------------------------+
|(59,[3,13,24,36,45,48,53,54,55,58],[1.0,1.0,1.0,1.0,1.0,1.0,17.0,41643.0,7.0,15.0])        |0.0  |0.0       |[0.9062475766406436,0.09375242335935641]|
|(59,[3,15,24,36,45,48,52,53,54,55,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,17.0,64785.0,6.0,30.0]) |0.0  |0.0       |[0.8927692098013015,0.10723079019869852]|
|(59,[3,13,24,36,45,48,53,54,55,58],[1.0,1.0,1.0,1.0,1.0,1.0,17.0,80077.0,7.0,20.0])        |0.0  |0.0       |[0.9041098028218706,0.09589019717812941]|
|(59,[3,13,24,36,45,48,52,53,54,55,58],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,17.0,104025.0,7.0,18
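
Each `features` value is printed in sparse form: `(size, [indices], [values])`. The following sketch shows how to read the first row above, assuming the numeric columns occupy the last six slots in the order given to `VectorAssembler` (slots 53 to 58 of 59); the helper name is hypothetical.

```python
numeric_cols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]

# First row of the output above: (size, indices, values).
size = 59
indices = [3, 13, 24, 36, 45, 48, 53, 54, 55, 58]
values = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 41643.0, 7.0, 15.0]

def decode_numeric(size, indices, values, numeric_cols):
    """Recover the numeric features, treating slots missing from `indices` as 0.0."""
    first_numeric_slot = size - len(numeric_cols)
    stored = dict(zip(indices, values))
    return {col: stored.get(first_numeric_slot + i, 0.0) for i, col in enumerate(numeric_cols)}

row = decode_numeric(size, indices, values, numeric_cols)
print(row)
```

Under that assumption, the row describes a 17-year-old working 15 hours per week with no capital gain or loss. The six entries equal to `1.0` are the active one-hot slots of the categorical columns.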

#### 4. Evaluate the model.

In [26]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

bcEvaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print(f"Area under ROC curve: {bcEvaluator.evaluate(predDF)}")

mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Accuracy: {mcEvaluator.evaluate(predDF)}")

Area under ROC curve: 0.8832451799218509
Accuracy: 0.7680801850424056
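
The two metrics measure different things: the area under the ROC curve depends only on how well the predicted probabilities rank positives above negatives, while accuracy depends on the 0.5 decision threshold. The sketch below computes both on made-up scores (not taken from this lab) to show how ranking can be perfect while accuracy is not, which is one possible reading of the gap between the two numbers above.

```python
def auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

def accuracy(labels, scores, threshold=0.5):
    """Share of rows whose thresholded score matches the label."""
    predictions = [1 if s > threshold else 0 for s in scores]
    return sum(p == y for p, y in zip(predictions, labels)) / len(labels)

# Hypothetical scores: positives rank highest, but all scores sit below 0.5.
labels = [0, 0, 0, 1, 1]
scores = [0.05, 0.10, 0.15, 0.30, 0.45]

print("AUC:", auc(labels, scores))
print("Accuracy at 0.5:", accuracy(labels, scores))
print("Accuracy at 0.2:", accuracy(labels, scores, threshold=0.2))
```

If the same effect is at work here, trying a smaller `regParam` or a different threshold would be a reasonable next experiment.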
