### Colab Notebook Configuration 
If running in a Colab Notebook, these next few cells get the local environment configured. 

In [None]:
# %%bash
# apt-get install openjdk-8-jdk-headless -qq > /dev/null
# wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
# tar -xzf spark-2.4.5-bin-hadoop2.7.tgz
# pip install pyspark findspark

In [None]:
# %%bash
# apt-get install openjdk-8-jdk-headless -qq > /dev/null
# update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

In [None]:
# import os
# os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["PYSPARK_SUBMIT_ARGS"] ="--master local[2] pyspark-shell"

# # Find Spark so that we can access session within our notebook
# import findspark
# findspark.init()

# # Start SparkSession on all available cores
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# ! pip install wget
# import wget

# # Download complaints data 
# wget.download('https://macs-30123-final-proj-tyagie.s3.amazonaws.com/complaints.parquet', 'complaints.parquet')

In [None]:
# DATA = 'complaints.parquet'

### EMR Cluster Configuration
If running on an EMR cluster, the PySpark configuration is already set up, but I have to install a few packages into the environment.  

In [1]:
DATA = 's3://macs-30123-final-proj-tyagie/complaints.parquet'

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5,application_1591380815751_0006,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
sc.install_pypi_package("datetime")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting datetime
  Using cached https://files.pythonhosted.org/packages/73/22/a5297f3a1f92468cc737f8ce7ba6e5f245fcfafeae810ba37bd1039ea01c/DateTime-4.3-py2.py3-none-any.whl
Collecting zope.interface (from datetime)
  Using cached https://files.pythonhosted.org/packages/f3/21/8db61925409f4a4bac5fac19dbee26b735bd410b0f05f233058ace5511dc/zope.interface-5.1.0-cp37-cp37m-manylinux1_x86_64.whl
Installing collected packages: zope.interface, datetime
Successfully installed datetime-4.3 zope.interface-5.1.0

In [3]:
import datetime

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Load Data
I first load the full CFPB Consumer Complaints Database. The original data is available from the [CFPB website](https://www.consumerfinance.gov/data-research/consumer-complaints/) under the Download the Data section. In the previous script `01_download-and-store.py`, I download this data in CSV format, then convert it to Parquet format and store it in an S3 bucket. 

In [4]:
from pyspark.sql.types import *

# Specify schema 
schema = StructType([
    StructField("date_received", StringType(), True),
    StructField("product", StringType(), True),
    StructField("sub-product", StringType(), True), 
    StructField("issue", StringType(), True),
    StructField("sub-issue", StringType(), True),
    StructField("consumer_complaint_narrative", StringType(), True), 
    StructField("company_public_response", StringType(), True),
    StructField("company", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip_code", StringType(), True), 
    StructField("tags", StringType(), True),
    StructField("consumer_consent_provided?", StringType(), True),
    StructField("submitted_via", StringType(), True), 
    StructField("date_sent_to_company", StringType(), True),
    StructField("company_response_to_consumer", StringType(), True),
    StructField("timely_response?", StringType(), True),
    StructField("consumer_disputed?", StringType(), True), 
    StructField("complaint_id", StringType(), True),
])

# Load data 
data = spark.read.parquet(DATA)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Limit to data after 2013 (methodology change) and valid outcomes  
YEARS = ['2013', '2014', '2015', '2016', '2017', '2018', '2019', '2020']
LABELS = ['Closed with explanation', 
          'Closed with monetary relief', 
          'Closed with non-monetary relief', 
          'Untimely response']

data = data.withColumn('year', data['date_received'].substr(1, 4)) 
data = data.where(data.year.isin(YEARS))
data = data.where(data.company_response_to_consumer.isin(LABELS))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Generate Features and Label Vectors

I created features from the categorical variables through one-hot-encoding the following variables: `product`, `issue`, `company`, `state`.
I also encode the multi-class outcome that I'm predicting (`company_response_to_consumer`). Note that I specify a handler to skip rows in the testing data with one-hot-encoded feature values that weren't seen in the training data. I also encode (rather than drop) missing values, given that missing data in these fields may be itself predictive. 

In [6]:
from pyspark.sql import Window
from pyspark.sql.functions import when
from pyspark.sql import functions as F

from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler 

# Replace missing values in categorical columns with 'Missing' string 
data = data.fillna({'product':'Missing', 
                    'issue': 'Missing', 
                    'state': 'Missing', 
                    'company': 'Missing', 
                    'consumer_complaint_narrative': ''})

# Only encode companies with more than 1000 complaints 
w = Window.partitionBy('company')
data = data.withColumn('company_complaints', F.count('company').over(w))
data = data.withColumn('company_limited', 
                       when(data['company_complaints'] > 1000, data['company']).otherwise('Other'))

# Encode categorical features 
product_indexer = StringIndexer(
    inputCol='product', 
    outputCol='product_idx', 
    handleInvalid='skip'
)
product_onehot = OneHotEncoderEstimator(
    inputCols=['product_idx'], 
    outputCols=['product_dummy']
)

issue_indexer = StringIndexer(
    inputCol='issue', 
    outputCol='issue_idx', 
    handleInvalid='skip'
)
issue_onehot = OneHotEncoderEstimator(
    inputCols=['issue_idx'], 
    outputCols=['issue_dummy']
)

state_indexer = StringIndexer(
    inputCol='state', 
    outputCol='state_idx', 
    handleInvalid='skip'
)
state_onehot = OneHotEncoderEstimator(
    inputCols=['state_idx'], 
    outputCols=['state_dummy']
)

company_indexer = StringIndexer(
    inputCol='company', 
    outputCol='company_idx', 
    handleInvalid='skip'
)
company_onehot = OneHotEncoderEstimator(
    inputCols=['company_idx'], 
    outputCols=['company_dummy']
)

# Assemble features 
assembler = VectorAssembler(
    inputCols=['product_dummy', 'issue_dummy', 'state_dummy', 'company_dummy'], 
    outputCol='features'
)

# Encode label 
label_indexer = StringIndexer(
    inputCol='company_response_to_consumer', 
    outputCol='label'
)

data = data.na.drop(subset=['company_response_to_consumer'])
data = label_indexer.fit(data).transform(data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Resample Data
To address the class imbalance in this data (where over 80% of complaints result in Closed with explanation outcomes), I over-sample the minority classes and under-sample the majority class in the training data. I do this using standard bootstrapping, but I also provide code to do this using synthetic resampling of the minority classes. Note that this requires converting the PySpark RDDs into NumPy arrays or Pandas dataframes. 

In [7]:
# Split into training and testing data 
train_df, test_df = data.randomSplit([0.7, 0.3])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
from pyspark.sql.functions import col
from functools import reduce
from pyspark.sql import DataFrame

FRAC_MAJORITY = 0.5 

# Resample (undersample majority class, undersample majority class)
train_0 = train_df.where(col('label')==0.0).sample(True, FRAC_MAJORITY,  seed=0)
frac_1 = train_0.count() / train_df.where(col('label')==1.0).count() 
frac_2 = train_0.count() / train_df.where(col('label')==2.0).count() 
frac_3 = train_0.count() / train_df.where(col('label')==3.0).count()

train_0 = train_df.where(col('label')==0.0).sample(True, FRAC_MAJORITY,  seed=0)
train_1 = train_df.where(col('label')==1.0).sample(True, frac_1, seed=0)
train_2 = train_df.where(col('label')==2.0).sample(True, frac_2, seed=0)
train_3 = train_df.where(col('label')==3.0).sample(True, frac_3, seed=0)

train_dfs = [train_0, train_1, train_2, train_3]
train_df_resampled = reduce(DataFrame.unionAll, train_dfs)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Use synthetic oversampling - Note: no good PySpark implementations 
# e.g. https://github.com/anathan90/SparkSMOTE, https://github.com/Angkirat/Smote-for-Spark both are issue-prone 

# from imblearn.over_sampling import SMOTE 

# train_X = train_df.select('product_dummy', 'issue_dummy', 'state_dummy', 'company_dummy').toPandas()
# train_y = train_df.select('label').toPandas()

# sm = SMOTE(random_state=0) 
# train_X_resampled, train_y_resampled = sm.fit_sample(train_X, train_y)
# train_pdf_resampled = pd.concat(train_X_resampled, train_y_resampled)

# train_df_resampled = spark.createDataFrame(train_pdf_resampled)

### Train Models 
Finally, I build my machine learning models. I use k-fold cross-validation to identify the best performing model across a variety of classifier and hyperparameter specifications. I build the following types of classifiers: Logistic Regression, Decision Tree, Naive Bayes, Gradient-Boosted Trees, and Support Vector Machines. I iterate over (admittedly small - running out of AWS credits!) parameter grids. 

#### General Setup 

In [9]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import NaiveBayes

from pyspark.ml.classification import OneVsRest

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Build feature engineering pipeline  
pipeline = Pipeline(
    stages=[product_indexer, product_onehot, 
            issue_indexer, issue_onehot, 
            state_indexer, state_onehot,  
            company_indexer, company_onehot, 
            assembler]
)

# Fit feature engineering pipeline 
fitPipeline = pipeline.fit(train_df_resampled)
train_fit = fitPipeline.transform(train_df_resampled)
test_fit = fitPipeline.transform(test_df)

# Create evaluator 
evaluator = MulticlassClassificationEvaluator(
    labelCol='label'
)

# k in k-fold cross validation 
NUM_FOLDS = 3

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Logistic Regression

In [10]:
# LOGISTIC REGRESSION  
lr = LogisticRegression(labelCol='label')
ovr = OneVsRest(classifier=lr)

paramGrid = ((ParamGridBuilder()
  .addGrid(ovr.regParam, [0.1, 1.0]) 
  .build()))

cv = CrossValidator(estimator=ovr, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=evaluator, 
                    numFolds=NUM_FOLDS)

start = datetime.datetime.now()
model = cv.fit(train_fit)
print("Time Elapsed:", datetime.datetime.now() - start)

predictions = model.transform(test_fit)
print(evaluator.getMetricName(), evaluator.evaluate(predictions))
print(model.bestModel.explainParam('regParam'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Time Elapsed: 0:05:19.786501
f1 0.62054165014387
regParam: regularization parameter (>= 0) (default: 0.0, current: 0.1)

#### Decision Tree

In [11]:
# DECISION TREE 
dt = DecisionTreeClassifier(labelCol='label')
ovr = OneVsRest(classifier=dt)

paramGrid = ((ParamGridBuilder()
  .addGrid(ovr.impurity, ['gini', 'entropy'])
  .addGrid(ovr.maxDepth, [10, 20])            
  .build()))

cv = CrossValidator(estimator=ovr, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=evaluator, 
                    numFolds=NUM_FOLDS)

start = datetime.datetime.now()
model = cv.fit(train_fit)
print("Time Elapsed:", datetime.datetime.now() - start)

predictions = model.transform(test_fit)
print(evaluator.getMetricName(), evaluator.evaluate(predictions))
print(model.bestModel.explainParam('impurity'))
print(model.bestModel.explainParam('maxDepth'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Time Elapsed: 1:00:15.282377
f1 0.47619958830219755
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini, current: gini)
maxDepth: Maximum depth of the tree. (Nonnegative) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5, current: 20)

#### Naive Bayes

In [14]:
# NAIVE BAYES 
nb = NaiveBayes(labelCol='label')
ovr = OneVsRest(classifier=nb)

paramGrid = ((ParamGridBuilder()       
  .build()))

cv = CrossValidator(estimator=ovr, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=evaluator, 
                    numFolds=NUM_FOLDS)

start = datetime.datetime.now()
model = cv.fit(train_fit)
print("Time Elapsed:", datetime.datetime.now() - start)

predictions = model.transform(test_fit)
print(evaluator.getMetricName(), evaluator.evaluate(predictions))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Time Elapsed: 0:01:36.720193
f1 0.5051995904019904

#### Gradient-Boosted Trees

In [None]:
# # GRADIENT BOOSTED TREES 
# NOTE: PySpark's OVR support for Gradient Boosted Trees is glitchy 
# https://github.com/eubr-bigsea/citrus/issues/82

# gbt = GBTClassifier(labelCol='label')
# ovr = OneVsRest(classifier=gbt)

# paramGrid = ((ParamGridBuilder()
#   .addGrid(gbt.stepSize, [0.01, 0.1, 0.5])
#   .addGrid(gbt.maxDepth, [10, 20])            
#   .build()))

# cv = CrossValidator(estimator=ovr, 
#                     estimatorParamMaps=paramGrid, 
#                     evaluator=evaluator, 
#                     numFolds=NUM_FOLDS)

# start = datetime.datetime.now()
# model = cv.fit(train_fit)
# print("Time Elapsed:", datetime.datetime.now() - start)

# predictions = model.transform(test_fit)
# print(evaluator.getMetricName(), evaluator.evaluate(predictions))
# print(model.bestModel.explainParam('stepSize'))
# print(model.bestModel.explainParam('maxDepth'))

#### Support Vector Machines

In [15]:
# SUPPORT VECTOR MACHINE 
# ovr = LinearSVC(labelCol='label')
svm = LinearSVC(labelCol='label')
ovr = OneVsRest(classifier=svm)

paramGrid = ((ParamGridBuilder()
  .addGrid(svm.regParam, [0.1])
  .build()))

cv = CrossValidator(estimator=ovr, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=evaluator, 
                    numFolds=NUM_FOLDS)

start = datetime.datetime.now()
model = cv.fit(train_fit)
print("Time Elapsed:", datetime.datetime.now() - start)

predictions = model.transform(test_fit)
print(evaluator.getMetricName(), evaluator.evaluate(predictions))
# print(model.bestModel.explainParam('stepSize'))
# print(model.bestModel.explainParam('maxDepth'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-15:
Traceback (most recent call last):
  File "/emr/notebook-env/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/emr/notebook-env/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/emr/notebook-env/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 9195



Time Elapsed: 0:56:18.537995
f1 0.581622630251788

### Evaluate Models
Finally, I provide a more substantive evaluation of the best performing model. I report precision, recall, and F1 score overall unweighted, by class, and weighted by class. 

In [16]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Instantiate metrics object
predictionAndLabels = predictions.select('prediction', 'label')
metrics = MulticlassMetrics(predictionAndLabels.rdd)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Overall statistics
print("Overall Summary Stats")
print("  Precision = %s" % metrics.precision())
print("  Recall = %s" % metrics.recall())
print("  F1 score = %s" % metrics.fMeasure())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Overall Summary Stats
  Precision = 0.5255003155142883
  Recall = 0.5255003155142883
  F1 score = 0.5255003155142883

In [18]:
# Statistics by class 
labels = predictions.select('label').distinct().collect()
print("Summary Statistics by Class")
for l in labels: 
  print("  Class %s" % l[0])
  print("   Precision = %s" % metrics.precision(l[0]))
  print("   Recall = %s" % metrics.recall(l[0]))
  print("   F1 score = %s" % metrics.fMeasure(l[0]))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Summary Statistics by Class
  Class 1.0
   Precision = 0.2902257271017219
   Recall = 0.6846147282191313
   F1 score = 0.407641693976577
  Class 2.0
   Precision = 0.177631857657276
   Recall = 0.8992065194081064
   F1 score = 0.29666053488043015
  Class 0.0
   Precision = 0.9483585084443453
   Recall = 0.4728680016119395
   F1 score = 0.6310723723862951
  Class 3.0
   Precision = 0.24178549287042778
   Recall = 0.9774436090225563
   F1 score = 0.3876739562624254

In [19]:
# Weighted summary statistics 
print("Weighted Summary Stats")
print("  Weighted Recall = %s" % metrics.weightedRecall)
print("  Weighted precision = %s" % metrics.weightedPrecision)
print("  Weighted F1 score = %s" % metrics.weightedFMeasure())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Weighted Summary Stats
  Weighted Recall = 0.5255003155142883
  Weighted precision = 0.8177752682209127
  Weighted F1 score = 0.5829016511363316