# Binary Classification Example

## Dataset Info

The Adult dataset is publicly available at the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Adult). It is derived from census data and describes 48,842 individuals and their annual income. You can use it to predict whether an individual earns <=50K or >50K a year. The dataset contains both numeric and categorical variables.

Attribute Information:

- age: continuous
- workclass: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked
- fnlwgt: continuous
- education: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc...
- education-num: continuous
- marital-status: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent...
- occupation: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners...
- relationship: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried
- race: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black
- sex: Female, Male
- capital-gain: continuous
- capital-loss: continuous
- hours-per-week: continuous
- native-country: United-States, Cambodia, England, Puerto-Rico, Canada, Germany...
- Target/Label: <=50K, >50K

### Imports and PySpark Setup

In [1]:
from pathlib import Path
import sys

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
conf = SparkConf() \
    .setAppName('pySparkExamples') \
    .setMaster('local')

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

Disable line wrapping in output cells:

In [4]:
%%html
<style>
div.output_area pre {
    white-space: pre;
}
</style>

In [5]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}


### Reading Dataset

In [6]:
PROJECT_ROOT = Path.cwd().parent.as_posix()
csv_file = f"{PROJECT_ROOT}/data/adult.data"
uri_scheme = "file://"

In [7]:
input_path = f"{uri_scheme}/{csv_file}" if sys.platform == "win32" else f"{uri_scheme}{csv_file}"
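
The platform check above can also be delegated to `pathlib`. This is a minimal sketch, assuming the same `data/adult.data` layout one level above the notebook directory. `Path.as_uri()` needs an absolute path and emits the `file:///...` form on both Windows and POSIX. It percent-encodes characters such as spaces, and whether Spark accepts that form for such paths has not been checked here.

```python
from pathlib import Path

# Assumed layout: <project root>/data/adult.data, with the notebook one level below the root.
project_root = Path.cwd().resolve().parent
csv_path = project_root / "data" / "adult.data"

# as_uri() requires an absolute path and adds the right number of slashes per platform.
input_uri = csv_path.as_uri()
print(input_uri)
```

The resulting string could be passed to `spark.read.csv` in place of `input_path`.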

In [8]:
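# Define the schema explicitly because the file has no header row.
# The printed schema reports nullable = true for every column, so the
# nullable=False flags here are not enforced when the CSV is read.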

from pyspark.sql.types import DoubleType, StringType, StructField, StructType

schema = StructType([
    StructField("age", DoubleType(), False),
    StructField("workclass", StringType(), False),
    StructField("fnlwgt", DoubleType(), False),
    StructField("education", StringType(), False),
    StructField("education_num", DoubleType(), False),
    StructField("marital_status", StringType(), False),
    StructField("occupation", StringType(), False),
    StructField("relationship", StringType(), False),
    StructField("race", StringType(), False),
    StructField("sex", StringType(), False),
    StructField("capital_gain", DoubleType(), False),
    StructField("capital_loss", DoubleType(), False),
    StructField("hours_per_week", DoubleType(), False),
    StructField("native_country", StringType(), False),
    StructField("income", StringType(), False)
])

In [9]:
dataset = spark.read.csv(input_path, header=False, schema=schema)
columns = dataset.columns

In [10]:
dataset.printSchema()
dataset.show()

root
 |-- age: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)

+----+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
| age|        workclass|  fnlwgt|    education|education_num|      marital_status|        occupation|  relationship|               race|    sex|capital_gain|capital_

### Data Preprocessing

- Category Indexing
- One Hot Encoding

In [11]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [12]:
categorical_columns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex",
                       "native_country"]

In [13]:
# Pipeline Stages

stages = []

In [14]:
# indexing and transforming categorical columns

for categorical_column in categorical_columns:
    string_indexer = StringIndexer(inputCol=categorical_column, outputCol=categorical_column + "Index")
    encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[categorical_column + "classVec"])
    stages += [string_indexer, encoder]

In [15]:
# indexing Label

label_indexer = StringIndexer(inputCol="income", outputCol="label")
stages += [label_indexer]

In [16]:
# Transform all features into a vector using VectorAssembler

numeric_columns = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assembler_inputs = [c + "classVec" for c in categorical_columns] + numeric_columns
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages += [assembler]
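
To make the two preprocessing steps concrete, here is a plain-Python sketch of what category indexing and one-hot encoding do to a single column. It is an illustration, not Spark's implementation. It assumes the defaults of `StringIndexer` (most frequent category gets index 0) and `OneHotEncoder` (`dropLast=True`, so the last category is encoded as all zeros). The helper names and the toy column are hypothetical, and Spark stores the result as a sparse vector rather than a list.

```python
from collections import Counter


def fit_string_index(values):
    """Map each category to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda value: (-counts[value], value))
    return {category: index for index, category in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if position == index else 0.0 for position in range(size)]


# Toy column, made up for illustration.
sex = ["Male", "Female", "Male", "Male", "Female"]
mapping = fit_string_index(sex)
encoded = [one_hot(mapping[value], len(mapping)) for value in sex]

print(mapping)  # {'Male': 0, 'Female': 1}
print(encoded)  # [[1.0], [0.0], [1.0], [1.0], [0.0]]
```

Dropping the last category keeps the encoded columns from summing to one for every row, which matters for linear models such as logistic regression.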

### Preparing train and test data

In [17]:
from pyspark.ml import Pipeline

In [18]:
partial_pipeline = Pipeline().setStages(stages)
pipeline_model = partial_pipeline.fit(dataset)
transformed_dataset = pipeline_model.transform(dataset)

In [19]:
# Keep relevant columns

columns_to_select = ["label", "features"] + columns
dataset = transformed_dataset.select(columns_to_select)
display(dataset)

DataFrame[label: double, features: vector, age: double, workclass: string, fnlwgt: double, education: string, education_num: double, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: double, capital_loss: double, hours_per_week: double, native_country: string, income: string]

In [20]:
# Randomly split the data into training and test sets; set a seed for reproducibility

(training_data, test_data) = dataset.randomSplit([0.8, 0.2], seed=100)
print(training_data.count())
print(test_data.count())

26082
6479
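
The two counts sum to 32,561 rows, and the training share is about 80.1% rather than exactly 80%. `randomSplit` assigns rows probabilistically, so the weights are targets, not guarantees. The following plain-Python sketch shows that idea under the assumption of an independent draw per row. It is not Spark's implementation, and the helper name `random_split` is hypothetical.

```python
import random


def random_split(rows, train_weight, seed):
    """Send each row to train with probability train_weight, otherwise to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test


rows = list(range(32561))  # same row count as the notebook's dataset
train, test = random_split(rows, train_weight=0.8, seed=100)

# The split is close to 80/20 but not exact, and repeatable for a fixed seed.
print(len(train), len(test), round(len(train) / len(rows), 4))
```

The exact counts from this sketch will differ from Spark's, since the two use different random number generators.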


In [21]:
training_data.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)



## Create & Evaluate Models

**Classification Algorithms**
- Logistic Regression
- Decision Tree Classifier
- Random Forest Classifier

**Steps**
1. Create an initial model using the training dataset
2. Tune parameters with *ParamGridBuilder* and 5-fold cross validation
3. Evaluate the best model from cross validation using the test dataset
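
As a sketch of what 5-fold cross validation means in step 2, the plain-Python generator below partitions row indices into folds and yields one training/validation pair per fold. It assigns rows round-robin to keep the example deterministic, whereas Spark's `CrossValidator` assigns rows to folds randomly. The function name `k_fold_indices` is hypothetical.

```python
def k_fold_indices(num_rows, num_folds):
    """Yield (training, validation) row-index lists, one pair per fold."""
    # Round-robin assignment: row i goes to fold i % num_folds.
    folds = [list(range(start, num_rows, num_folds)) for start in range(num_folds)]
    for held_out, validation in enumerate(folds):
        # Train on every fold except the one held out for validation.
        training = [row for i, fold in enumerate(folds) if i != held_out for row in fold]
        yield training, validation


for training, validation in k_fold_indices(num_rows=10, num_folds=5):
    print(len(training), validation)
```

Each row is used for validation exactly once, and every candidate parameter setting is scored by its average metric across the folds.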

### Logistic Regression

In [22]:
from pyspark.ml.classification import LogisticRegression

#### Build Model

In [23]:
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
lr_model = lr.fit(training_data)
lr_predictions = lr_model.transform(test_data)

#### Evaluate Model

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [25]:
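# With no arguments, the evaluator reports the area under the ROC curve (metricName="areaUnderROC")
evaluator = BinaryClassificationEvaluator()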
evaluator.evaluate(lr_predictions)

0.8985934331968357
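
The score above is an area under the ROC curve, the evaluator's default metric, so it measures ranking quality rather than accuracy. One way to read it: it is the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one, with ties counted as half. The sketch below computes that directly on made-up labels and scores. Spark derives the value from the ROC curve instead, so results on real data may differ slightly.

```python
def area_under_roc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    positives = [score for label, score in zip(labels, scores) if label == 1]
    negatives = [score for label, score in zip(labels, scores) if label == 0]
    wins = 0.0
    for pos in positives:
        for neg in negatives:
            if pos > neg:
                wins += 1.0
            elif pos == neg:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Toy data, made up for illustration.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(area_under_roc(labels, scores))  # 0.75
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking.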

#### Parameter Tuning

In [26]:
# Get List of Tunable Parameters

print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The

In [27]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [28]:
lr_parameter_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.5, 2.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [1, 5, 10]) \
    .build()
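
The grid above spans three values for each of three parameters. A quick way to gauge the cost before launching the search is to enumerate the combinations. The sketch below does this in plain Python with `itertools.product` rather than through Spark, and assumes the 5 folds listed in the steps for this section. The extra fit at the end reflects `CrossValidator` refitting the best parameter map on the full training set.

```python
from itertools import product

# Same values as the logistic regression grid, keyed by parameter name.
grid = {
    "regParam": [0.01, 0.5, 2.0],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [1, 5, 10],
}

# One dict per combination, analogous to the list returned by ParamGridBuilder.build().
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]

num_folds = 5
total_fits = len(param_maps) * num_folds + 1  # +1 for the final refit of the best map

print(len(param_maps), total_fits)  # 27 136
```

Adding a fourth parameter with three values would triple both numbers, which is worth keeping in mind when extending the grid.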

#### Cross Validation

In [29]:
# Create 5-fold cross validator
cv_lr = CrossValidator(estimator=lr, estimatorParamMaps=lr_parameter_grid, evaluator=evaluator, numFolds=5)

# run cross validations
cv_lr_model = cv_lr.fit(training_data)

# use best model for predictions
cv_lr_predictions = cv_lr_model.transform(test_data)

In [30]:
# Evaluate best model

evaluator.evaluate(cv_lr_predictions)

0.8970005900581439
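
The tuned model scores 0.8970 on the test set, slightly below the 0.8986 of the initial model. The best grid point is chosen on folds of the training data, which does not guarantee a gain on held-out data. The initial model also uses the default `regParam` of 0.0, a value the grid does not include. To see which combination won, the averaged fold metrics can be paired with the parameter maps. The helper `best_param_map` is hypothetical and the inputs below are made-up numbers. In this notebook it would presumably be called with `cv_lr_model.getEstimatorParamMaps()` and `cv_lr_model.avgMetrics`.

```python
def best_param_map(param_maps, avg_metrics, larger_is_better=True):
    """Return the (param_map, metric) pair with the best average cross-validation metric."""
    pairs = list(zip(param_maps, avg_metrics))
    pick = max if larger_is_better else min
    return pick(pairs, key=lambda pair: pair[1])


# Made-up values, for illustration only.
toy_maps = [{"regParam": 0.01}, {"regParam": 0.5}, {"regParam": 2.0}]
toy_metrics = [0.90, 0.85, 0.70]

print(best_param_map(toy_maps, toy_metrics))  # ({'regParam': 0.01}, 0.9)
```

Spark's parameter maps are keyed by `Param` objects rather than strings, so printing something like `{p.name: v for p, v in param_map.items()}` tends to be easier to read.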

### Decision Tree

In [31]:
from pyspark.ml.classification import DecisionTreeClassifier

#### Build Model

In [32]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)
dt_model = dt.fit(training_data)

print("numNodes = ", dt_model.numNodes)
print("depth = ", dt_model.depth)
display(dt_model)

dt_predictions = dt_model.transform(test_data)

numNodes =  11
depth =  3


DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5f992cb423e1, depth=3, numNodes=11, numClasses=2, numFeatures=100

#### Evaluate Model

In [33]:
evaluator.evaluate(dt_predictions)

0.746863550622595

#### Parameter Tuning

In [34]:
# Get List of Tunable Parameters

print(dt.explainParams())

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featuresCol: features column name. (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name. (default: label, current: label)
leafCol: Leaf indices column name. Predicted leaf index of each instance in each tree by preorder. (default: )
maxBins: Max number of bins for discreti

In [35]:
dt_parameter_grid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [1, 2, 6, 10]) \
    .addGrid(dt.maxBins, [20, 40, 80]) \
    .build()

#### Cross Validation

In [36]:
# Create 5-fold cross validator
cv_dt = CrossValidator(estimator=dt, estimatorParamMaps=dt_parameter_grid, evaluator=evaluator, numFolds=5)

# run cross validations
cv_dt_model = cv_dt.fit(training_data)

# use best model for predictions
cv_dt_predictions = cv_dt_model.transform(test_data)

In [37]:
# Evaluate best model

evaluator.evaluate(cv_dt_predictions)

0.7719608343673301

### Random Forest

In [38]:
from pyspark.ml.classification import RandomForestClassifier

#### Build Model

In [39]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rf_model = rf.fit(training_data)

rf_predictions = rf_model.transform(test_data)

#### Evaluate Model

In [40]:
evaluator.evaluate(rf_predictions)

0.8731913122307333

#### Parameter Tuning

In [41]:
# Get List of Tunable Parameters

print(rf.explainParams())

bootstrap: Whether bootstrap samples are used when building trees. (default: True)
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the featur

In [42]:
rf_parameter_grid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [2, 4, 6]) \
    .addGrid(rf.maxBins, [20, 60]) \
    .addGrid(rf.numTrees, [5, 20]) \
    .build()

#### Cross Validation

In [43]:
# Create 5-fold cross validator
cv_rf = CrossValidator(estimator=rf, estimatorParamMaps=rf_parameter_grid, evaluator=evaluator, numFolds=5)

# run cross validations
cv_rf_model = cv_rf.fit(training_data)

# use best model for predictions
cv_rf_predictions = cv_rf_model.transform(test_data)

In [44]:
# Evaluate best model

evaluator.evaluate(cv_rf_predictions)

0.8883229951100481