<a href="https://colab.research.google.com/github/francoarenas91/Big-Data-Analytics-final-project/blob/main/exploratory_FFA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
! pip install pyspark -q

In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as fn
from pyspark.sql.functions import col, when, expr, split, size, length, regexp_replace, year
import matplotlib.pyplot as plt

import pandas as pd
import os

In [None]:
# get data
!wget https://files.consumerfinance.gov/ccdb/complaints.csv.zip -O complaints.csv.zip
!unzip -o complaints.csv.zip -d /content/


--2024-03-16 22:09:32--  https://files.consumerfinance.gov/ccdb/complaints.csv.zip
Resolving files.consumerfinance.gov (files.consumerfinance.gov)... 13.226.34.117, 13.226.34.127, 13.226.34.108, ...
Connecting to files.consumerfinance.gov (files.consumerfinance.gov)|13.226.34.117|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 751905510 (717M) [binary/octet-stream]
Saving to: ‘complaints.csv.zip’


2024-03-16 22:09:36 (179 MB/s) - ‘complaints.csv.zip’ saved [751905510/751905510]

Archive:  complaints.csv.zip
  inflating: /content/complaints.csv  


In [None]:
# Start (or reuse) the Spark session used throughout the notebook
spark = (
    SparkSession.builder
    .appName("Consumer Complaints")
    .getOrCreate()
)

# CSV parsing options: multiLine lets a quoted field span several lines, and the
# double quote is used both as the quote character and as its own escape.
csv_options = dict(
    header=True,
    multiLine=True,
    quote='"',
    escape='"',
    inferSchema=True,
)

# Load the extracted complaints file and preview the first rows
df = spark.read.csv("/content/complaints.csv", **csv_options)

df.show(5)

+-------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----------------------+--------------------+-----+--------+----+--------------------------+-------------+--------------------+----------------------------+----------------+------------------+------------+
|Date received|             Product|         Sub-product|               Issue|           Sub-issue|Consumer complaint narrative|Company public response|             Company|State|ZIP code|Tags|Consumer consent provided?|Submitted via|Date sent to company|Company response to consumer|Timely response?|Consumer disputed?|Complaint ID|
+-------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----------------------+--------------------+-----+--------+----+--------------------------+-------------+--------------------+----------------------------+----------------+------------------+------------

In [None]:
# Freeze the dataset at a fixed date: the source file keeps growing, and a static
# snapshot keeps model training consistent between runs.
cutoff_date = "2024-01-01"
received_before_cutoff = col("Date received") < fn.lit(cutoff_date)
df = df.filter(received_before_cutoff)
# df.show(5)


Save and load a downsized DataFrame for quick testing

In [None]:
# df.limit(1000).write.parquet("/content/complaints_limited.parquet",mode="overwrite")
# df=spark.read.parquet("/content/complaints_limited.parquet")

# df.count()

Data Cleaning

In [None]:
# Number of whitespace-separated tokens in each narrative.
# The pattern is a raw string so that "\s" reaches Spark's regex engine untouched
# (in a normal string literal "\s" is an invalid Python escape sequence).
narrative_word_count = size(split(col("Consumer complaint narrative"), r"\s+"))

# Keep complaints that have a narrative, remove exact duplicate rows, and keep
# only narratives with more than 1 and fewer than 800 tokens.
df_filtered = (
    df.filter(col("Consumer complaint narrative").isNotNull())
    .dropDuplicates()
    .filter((narrative_word_count > 1) & (narrative_word_count < 800))
)

In [None]:
# step 1: split the "Consumer Loan" product in two according to its Sub-product
vehicle_sub_products = ["Vehicle loan", "Vehicle lease"]
is_consumer_loan = col("Product") == "Consumer Loan"
is_vehicle_sub_product = col("Sub-product").isin(vehicle_sub_products)

df_filtered = df_filtered.withColumn(
    "Product",
    # vehicle-related consumer loans
    when(is_consumer_loan & is_vehicle_sub_product, "Vehicle loan or lease")
    # every other consumer loan with a known Sub-product
    .when(is_consumer_loan & ~is_vehicle_sub_product, "Payday loan, title loan, or personal loan")
    # all remaining rows keep their current Product
    .otherwise(col("Product")),
)

# step 2: redistribute "Bank account or service" complaints according to Sub-product
is_bank_account = col("Product") == "Bank account or service"
bank_sub_product = col("Sub-product")
deposit_sub_products = ["Checking account", "Savings account", "(CD) Certificate of deposit"]

bank_account_product = (
    when(is_bank_account & bank_sub_product.isin(deposit_sub_products),
         "Checking or savings account")
    .when(is_bank_account & (bank_sub_product == "Other bank product/service"),
          "Other financial service")
    .when(is_bank_account & (bank_sub_product == "Cashing a check without an account"),
          "Money transfer, virtual currency, or money service")
    # rows that match none of the cases above are left unchanged
    .otherwise(col("Product"))
)

df_filtered = df_filtered.withColumn("Product", bank_account_product)

# step 3: collapse the remaining Product labels onto one fixed set of categories

# Labels that are already in their final form and are kept as they are
allowed_values = [
    "Credit reporting, credit repair services, or other personal consumer reports",
    "Debt collection",
    "Mortgage",
    "Credit card or prepaid card",
    "Checking or savings account",
    "Student loan",
    "Money transfer, virtual currency, or money service",
    "Vehicle loan or lease",
    "Payday loan, title loan, or personal loan",
    "Other financial service",
    "Debt or credit management"
]

# Other labels present in the data -> the category they are folded into
product_renames = {
    "Credit card": "Credit card or prepaid card",
    "Prepaid card": "Credit card or prepaid card",
    "Payday loan": "Payday loan, title loan, or personal loan",
    "Payday loan, title loan, personal loan, or advance loan": "Payday loan, title loan, or personal loan",
    "Money transfers": "Money transfer, virtual currency, or money service",
    "Virtual currency": "Money transfer, virtual currency, or money service",
    "Credit reporting": "Credit reporting, credit repair services, or other personal consumer reports",
    "Credit reporting or other personal consumer reports": "Credit reporting, credit repair services, or other personal consumer reports",
}

# Build the CASE expression: allowed labels pass through, known labels are renamed,
# and anything else falls into "Other financial service".
product_expr = when(col("Product").isin(allowed_values), col("Product"))
for old_label, new_label in product_renames.items():
    product_expr = product_expr.when(col("Product") == old_label, new_label)
product_expr = product_expr.otherwise("Other financial service")

df_filtered = df_filtered.withColumn("Product", product_expr)

# Discard the two categories that are excluded from the analysis
excluded_products = ["Other financial service", "Debt or credit management"]
df_filtered = df_filtered.filter(~col("Product").isin(excluded_products))

In [None]:
# Columns carried forward; any row with a null in one of them is dropped
selected_columns = [
    'Product',
    'Sub-issue',
    'Consumer complaint narrative',
    'Company',
    'ZIP code',
    'Date sent to company',
]
df_selected = df_filtered.select(*selected_columns).na.drop()
# df_selected.show()

In [None]:
# df_selected.groupBy("Product").count().orderBy('count', ascending=False).show(truncate=False)

Save cleaned dataset to file to avoid repeating the process

In [None]:
# Persist the cleaned data and read it straight back from the parquet file
cleaned_dataset_path = "/content/cleaned_dataset.parquet"
df_selected.write.mode("overwrite").parquet(cleaned_dataset_path)
df_selected = spark.read.parquet(cleaned_dataset_path)

# Feature engineering

List of top words

In [None]:
# from pyspark.ml.feature import Tokenizer, RegexTokenizer
# from pyspark.sql.functions import col, explode, desc
# from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

# top_words=10

# regexTokenizer = RegexTokenizer(inputCol="Consumer complaint narrative", outputCol="words", pattern="\\W+")
# stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered_words")


# words = regexTokenizer.transform(df_selected)
# words= stopwordsRemover.transform(words)


# exploded_words = words.withColumn("words", explode(col("filtered_words")))
# top_words=exploded_words\
#                         .select("words")\
#                         .groupBy("words")\
#                         .count()\
#                         .orderBy(desc("count")).limit(top_words).rdd.map(lambda row: row.words).collect()

# # top_words = word_count.limit(10).select("words").rdd.map(lambda row: row.word).collect()
# top_words


Vectorize the columns with IDF


In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml import Pipeline

# Stage 1: turn each narrative into a list of tokens
tokenizer = Tokenizer(outputCol="words", inputCol="Consumer complaint narrative")

# Stage 2: drop stop words from the token list
remover = StopWordsRemover(outputCol="filtered", inputCol="words")

# Stage 3: term counts over a limited vocabulary
cv_settings = {
    "vocabSize": 100,  # the size of the vocabulary
    "minDF": 2.0,      # in how many documents must a vocabulary word appear
}
cv = CountVectorizer(inputCol="filtered", outputCol="rawFeatures", **cv_settings)

# Stage 4: re-weight the raw counts with inverse document frequency
idf = IDF(outputCol="word_features", inputCol="rawFeatures")

# The four text stages chained together as a standalone pipeline
text_stages = [tokenizer, remover, cv, idf]
wordVecPipeline = Pipeline(stages=text_stages)

# Fit the pipeline to your data
# df_vectorized = wordVecPipeline.fit(df_selected).transform(df_selected)


# Show the vectorized text
# df_vectorized.show()

NOTE: Word2Vec captures more nuance, but is much more computationally intensive

In [None]:
# # sin and cos transformation for day and month
# from pyspark.sql.functions import col, to_date, dayofmonth, month, sin, cos
# from math import pi

# max_days_expr = when(col("Month").isin(1, 3, 5, 7, 8, 10, 12), 31) \
#     .when(col("Month").isin(4, 6, 9, 11), 30) \
#     .when(col("Month") == 2, 28)

# df_dates = df_vectorized.withColumn("Month", month(col("Date sent to company")))\
#        .withColumn("Day", dayofmonth(col("Date sent to company")))\
#        .withColumn("NormalizedDay", col("Day") / max_days_expr)\
#        .withColumn("Month_sin", sin(col("Month") * (2 * pi) / 12)) \
#        .withColumn("Month_cos", cos(col("Month") * (2 * pi) / 12)) \
#        .withColumn("Day_sin", sin(col("NormalizedDay") * 2 * pi)) \
#        .withColumn("Day_cos", cos(col("NormalizedDay") * 2 * pi))

# # df_dates.show()

Create custom transformer for this date encoding

In [None]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param, Params
from pyspark.sql.functions import col, month, dayofmonth, sin, cos, when
from math import pi

class DateFeatureTransformer(Transformer, Params):
    """
    A custom Transformer that adds date-related features to a DataFrame.

    From the date column named by ``inputCol`` it appends these columns:

    * ``Month`` and ``Day``: month number and day of month.
    * ``NormalizedDay``: ``Day`` divided by the number of days in that month.
    * ``Month_sin`` / ``Month_cos``: sine and cosine of ``Month * 2*pi / 12``.
    * ``Day_sin`` / ``Day_cos``: sine and cosine of ``NormalizedDay * 2*pi``.

    Parameters
    ----------
    inputCol : str, optional
        Name of the date column to read.
    outputCols : list, optional
        Stored as a Param and retrievable through ``getOutputCols()``, but not
        consulted by ``_transform``: the appended columns always use the fixed
        names listed above.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCols=None):
        super(DateFeatureTransformer, self).__init__()
        # Params are declared on the instance, then given the constructor
        # arguments as their default values.
        self.inputCol = Param(self, "inputCol", "")
        self.outputCols = Param(self, "outputCols", "")
        self._setDefault(inputCol=inputCol, outputCols=outputCols)

        if inputCol is not None:
            self.setInputCol(inputCol)
        if outputCols is not None:
            self.setOutputCols(outputCols)

    def setInputCol(self, value):
        """
        Sets the value of inputCol.
        """
        return self._set(inputCol=value)

    def setOutputCols(self, value):
        """
        Sets the value of outputCols.
        """
        return self._set(outputCols=value)

    def getInputCol(self):
        """
        Gets the value of inputCol or its default value.
        """
        return self.getOrDefault(self.inputCol)

    def getOutputCols(self):
        """
        Gets the value of outputCols or its default value.
        """
        return self.getOrDefault(self.outputCols)

    def _transform(self, df):
        """
        Returns ``df`` with the seven date feature columns appended.
        """
        # Imported locally so the class does not depend on names imported in other cells
        from pyspark.sql.functions import last_day

        date_col = col(self.getInputCol())

        # Actual number of days in the month of each date. Taking the day of the
        # month's last date handles leap years; a hard-coded 28 for February
        # would push NormalizedDay above 1 on Feb 29.
        days_in_month = dayofmonth(last_day(date_col))

        # Apply transformations
        df_transformed = df.withColumn("Month", month(date_col)) \
            .withColumn("Day", dayofmonth(date_col)) \
            .withColumn("NormalizedDay", col("Day") / days_in_month) \
            .withColumn("Month_sin", sin(col("Month") * (2 * pi) / 12)) \
            .withColumn("Month_cos", cos(col("Month") * (2 * pi) / 12)) \
            .withColumn("Day_sin", sin(col("NormalizedDay") * 2 * pi)) \
            .withColumn("Day_cos", cos(col("NormalizedDay") * 2 * pi))

        return df_transformed

# Date-encoding stage, reading the date on which the complaint was sent to the company
date_feature_names = [
    "Month", "Day", "NormalizedDay",
    "Month_sin", "Month_cos",
    "Day_sin", "Day_cos",
]
date_transf = DateFeatureTransformer(outputCols=date_feature_names,
                                     inputCol="Date sent to company")



One hot encode company

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Map each company name to a numeric index.
# handleInvalid='keep' sends companies that were not seen during fit to one extra
# index instead of dropping their rows ('skip'), so a fitted model never silently
# removes rows from the data it transforms (e.g. the test set).
string_indexer = StringIndexer(inputCol = 'Company',
                               outputCol = 'Company_indexed',
                               handleInvalid = 'keep')

# One-hot encode the company index
ohe = OneHotEncoder(inputCol = 'Company_indexed',
                    outputCol = 'Company_ohe')

# Both company stages chained together as a standalone pipeline
ohePipeline = Pipeline(stages=[string_indexer,ohe])

# df_ohe = ohePipeline.fit(df_dates).transform(df_dates)

# df_ohe.show()

Assemble all inputs

In [None]:
from pyspark.ml.feature import VectorAssembler

# Gather the text, date and company features into a single vector column.
# No scaling stage is added: the inputs are treated as already normalized.
feature_columns = ['word_features', 'Month_sin', 'Month_cos', 'Day_sin', 'Day_cos', "Company_ohe"]
vector_assembler = VectorAssembler(outputCol='features', inputCols=feature_columns)


# df_model=vector_assembler.transform(df_binary)

# df_model.show()

Putting all transformations together in a pipeline, and splitting into train and test sets before fitting it, to avoid data leakage

In [None]:
# Binary target: 1 for credit-reporting complaints, 0 for every other product.
# The column is referenced by its exact name "Product" so the lookup does not
# depend on Spark's case-insensitive column resolution.
df_binary = df_selected.withColumn("binary_y",
                              when(col("Product") == "Credit reporting, credit repair services, or other personal consumer reports", 1).otherwise(0))

# Split before fitting any stage so the test rows do not influence the fitted stages
train_df, test_df = df_binary.randomSplit([0.8, 0.2], seed=42)

# Full feature-engineering pipeline: text -> date -> company -> assembled vector
pipeline=Pipeline(stages=[tokenizer,remover,cv,idf, date_transf, string_indexer,ohe,vector_assembler])

# Fit on the training set only, then apply the fitted stages to both sets
feature_eng= pipeline.fit(train_df)
train_transformed = feature_eng.transform(train_df)
test_transformed = feature_eng.transform(test_df)


Save the transformed train and test sets to file and read them back, to avoid re-executing all the feature engineering

In [None]:
# Write each transformed set to parquet and reload it from disk
train_parquet_path = "/content/train_transformed.parquet"
test_parquet_path = "/content/test_transformed.parquet"

train_transformed.write.mode("overwrite").parquet(train_parquet_path)
train_transformed = spark.read.parquet(train_parquet_path)

test_transformed.write.mode("overwrite").parquet(test_parquet_path)
test_transformed = spark.read.parquet(test_parquet_path)

Try random forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Random forest on the assembled feature vector.
# A fixed seed (the same value used for the train/test split) makes the fitted
# model, and therefore the reported metrics, reproducible between runs.
rf = RandomForestClassifier(featuresCol="features",
                            labelCol="binary_y",
                            numTrees=20,
                            maxDepth=4,
                            maxBins=32,
                            seed=42)

rf_model = rf.fit(train_transformed)

# Score both sets with the fitted model
train_predictions = rf_model.transform(train_transformed)

test_predictions = rf_model.transform(test_transformed)

# Area under the ROC curve, stated explicitly rather than left to the default
evaluator = BinaryClassificationEvaluator(labelCol="binary_y",
                                          metricName="areaUnderROC")

train_roc_auc = evaluator.evaluate(train_predictions)
test_roc_auc = evaluator.evaluate(test_predictions)
print(f"Train ROC AUC: {train_roc_auc}")
print(f"Test ROC AUC: {test_roc_auc}")

Train ROC AUC: 0.8951796292680188
Test ROC AUC: 0.8945488017632424
