In [19]:
from google.colab import drive
drive.mount('/content/drive')

project_path = "/content/drive/MyDrive/Data_Science_Projects/Customer_Complaint_Trend_Forecasting/"
data_path = project_path + "data/"

!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark --quiet

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.12/dist-packages/pyspark"

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Cleaner").getOrCreate()

print("Spark ready.")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Spark ready.


In [20]:
# Load the raw complaints CSV with a header row and let Spark infer column types.
# NOTE(review): free-text narrative fields may contain embedded newlines/quotes;
# confirm whether the reader also needs multiLine/escape options for this file.
raw_csv_path = data_path + "complaints_raw.csv"
df = spark.read.csv(raw_csv_path, header=True, inferSchema=True)

# Print each column name via repr() so stray whitespace or odd characters show up.
print("RAW COLUMNS:")
for column_name in df.columns:
    print(repr(column_name))

df.printSchema()


RAW COLUMNS:
'Date received'
'Product'
'Sub-product'
'Issue'
'Sub-issue'
'Consumer complaint narrative'
'Company public response'
'Company'
'State'
'ZIP code'
'Tags'
'Consumer consent provided?'
'Submitted via'
'Date sent to company'
'Company response to consumer'
'Timely response?'
'Consumer disputed?'
'Complaint ID'
root
 |-- Date received: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Sub-product: string (nullable = true)
 |-- Issue: string (nullable = true)
 |-- Sub-issue: string (nullable = true)
 |-- Consumer complaint narrative: string (nullable = true)
 |-- Company public response: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZIP code: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Consumer consent provided?: string (nullable = true)
 |-- Submitted via: string (nullable = true)
 |-- Date sent to company: string (nullable = true)
 |-- Company response to consumer: string (nulla

In [21]:
from pyspark.sql.functions import col, trim, length

# Name of the free-text column; later cells reuse `text_col`.
text_col = "Consumer complaint narrative"

# Keep only rows whose narrative is present and not blank after trimming.
narrative = col(text_col)
df = df.where(narrative.isNotNull() & (length(trim(narrative)) > 0))

print("AFTER TEXT DROP:")
for column_name in df.columns:
    print(repr(column_name))


AFTER TEXT DROP:
'Date received'
'Product'
'Sub-product'
'Issue'
'Sub-issue'
'Consumer complaint narrative'
'Company public response'
'Company'
'State'
'ZIP code'
'Tags'
'Consumer consent provided?'
'Submitted via'
'Date sent to company'
'Company response to consumer'
'Timely response?'
'Consumer disputed?'
'Complaint ID'


In [22]:
from pyspark.sql import functions as F

# Switch the session to the legacy date parser before parsing "Date received".
# NOTE(review): the "MM/dd/yy" pattern is assumed to match the raw file's date
# format — confirm against the data, since unparseable rows are dropped below.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Parse the date, drop rows that failed to parse, remove the raw string column,
# then derive calendar parts from the parsed date.
df = (
    df.withColumn("date_received", F.to_date(F.col("Date received"), "MM/dd/yy"))
    .where(F.col("date_received").isNotNull())
    .drop("Date received")
    .withColumn("year", F.year("date_received"))
    .withColumn("month_num", F.month("date_received"))
    .withColumn("day", F.dayofmonth("date_received"))
)

print("AFTER DATE CLEANING:")
for column_name in df.columns:
    print(repr(column_name))


AFTER DATE CLEANING:
'Product'
'Sub-product'
'Issue'
'Sub-issue'
'Consumer complaint narrative'
'Company public response'
'Company'
'State'
'ZIP code'
'Tags'
'Consumer consent provided?'
'Submitted via'
'Date sent to company'
'Company response to consumer'
'Timely response?'
'Consumer disputed?'
'Complaint ID'
'date_received'
'year'
'month_num'
'day'


In [23]:
from pyspark.sql.functions import upper, trim

# Normalise State to a trimmed, upper-case code, then keep only rows whose code
# is present, non-empty, exactly two characters long, and not a known placeholder.
placeholder_states = ["NA", "N/A", "XX", "--", "??", "00"]

df = df.withColumn("State", upper(trim(col("State"))))

state = col("State")
df = df.where(
    state.isNotNull()
    & (length(state) > 0)
    & (length(state) == 2)
    & ~state.isin(placeholder_states)
)

print("AFTER STATE CLEANING:")
for column_name in df.columns:
    print(repr(column_name))


AFTER STATE CLEANING:
'Product'
'Sub-product'
'Issue'
'Sub-issue'
'Consumer complaint narrative'
'Company public response'
'Company'
'State'
'ZIP code'
'Tags'
'Consumer consent provided?'
'Submitted via'
'Date sent to company'
'Company response to consumer'
'Timely response?'
'Consumer disputed?'
'Complaint ID'
'date_received'
'year'
'month_num'
'day'


In [24]:
from pyspark.sql.functions import regexp_replace, when

# Normalise Product: trim + upper-case, blank out placeholder values, turn the
# resulting empty strings into nulls, and finally label every null "UNKNOWN".
product_upper = upper(trim(col("Product")))
product_blanked = regexp_replace(
    product_upper, "^(NA|NONE|N/A|NULL|UNSPECIFIED|\\s*)$", ""
)
product_or_null = when(product_blanked == "", None).otherwise(product_blanked)

df = df.withColumn("Product", product_or_null).na.fill({"Product": "UNKNOWN"})

print("AFTER PRODUCT CLEANING:")
df.select("Product").show(10, truncate=False)


AFTER PRODUCT CLEANING:
+----------------------------------------------------------------------------+
|Product                                                                     |
+----------------------------------------------------------------------------+
|DEBT COLLECTION                                                             |
|MONEY TRANSFER, VIRTUAL CURRENCY, OR MONEY SERVICE                          |
|CREDIT REPORTING OR OTHER PERSONAL CONSUMER REPORTS                         |
|CHECKING OR SAVINGS ACCOUNT                                                 |
|CREDIT REPORTING OR OTHER PERSONAL CONSUMER REPORTS                         |
|CREDIT REPORTING, CREDIT REPAIR SERVICES, OR OTHER PERSONAL CONSUMER REPORTS|
|CREDIT REPORTING, CREDIT REPAIR SERVICES, OR OTHER PERSONAL CONSUMER REPORTS|
|CREDIT REPORTING OR OTHER PERSONAL CONSUMER REPORTS                         |
|DEBT COLLECTION                                                             |
|CREDIT REPORTING OR OTHER P

In [25]:
# Normalise Issue: trim + upper-case, blank out placeholder values, convert the
# resulting empty strings to nulls, then fill every null with "NO_ISSUE".
issue_upper = upper(trim(col("Issue")))
issue_blanked = regexp_replace(issue_upper, "^(NA|NONE|N/A|NULL|\\s*)$", "")
issue_or_null = when(issue_blanked == "", None).otherwise(issue_blanked)

df = df.withColumn("Issue", issue_or_null)
df = df.na.fill({"Issue": "NO_ISSUE"})

print("AFTER ISSUE CLEANING:")
df.select("Issue").show(10, truncate=False)


AFTER ISSUE CLEANING:
+---------------------------------------------------+
|Issue                                              |
+---------------------------------------------------+
|TOOK OR THREATENED TO TAKE NEGATIVE OR LEGAL ACTION|
|FRAUD OR SCAM                                      |
|INCORRECT INFORMATION ON YOUR REPORT               |
|PROBLEM CAUSED BY YOUR FUNDS BEING LOW             |
|IMPROPER USE OF YOUR REPORT                        |
|IMPROPER USE OF YOUR REPORT                        |
|INCORRECT INFORMATION ON YOUR REPORT               |
|INCORRECT INFORMATION ON YOUR REPORT               |
|ATTEMPTS TO COLLECT DEBT NOT OWED                  |
|INCORRECT INFORMATION ON YOUR REPORT               |
+---------------------------------------------------+
only showing top 10 rows



In [26]:
# Normalise Sub-issue the same way as the other label columns: trim + upper-case,
# blank out placeholders, map empty strings to null, fill nulls with "NO_SUB_ISSUE".
sub_issue_blanked = regexp_replace(
    upper(trim(col("Sub-issue"))),
    "^(NA|NONE|N/A|NULL|\\s*)$",
    "",
)

df = (
    df.withColumn(
        "Sub-issue",
        when(sub_issue_blanked == "", None).otherwise(sub_issue_blanked),
    )
    .na.fill({"Sub-issue": "NO_SUB_ISSUE"})
)

print("AFTER SUB-ISSUE CLEANING:")
df.select("Sub-issue").show(10, truncate=False)


AFTER SUB-ISSUE CLEANING:
+---------------------------------------------+
|Sub-issue                                    |
+---------------------------------------------+
|SEIZED OR ATTEMPTED TO SEIZE YOUR PROPERTY   |
|NO_SUB_ISSUE                                 |
|INFORMATION BELONGS TO SOMEONE ELSE          |
|OVERDRAFTS AND OVERDRAFT FEES                |
|REPORTING COMPANY USED YOUR REPORT IMPROPERLY|
|REPORTING COMPANY USED YOUR REPORT IMPROPERLY|
|INFORMATION BELONGS TO SOMEONE ELSE          |
|INFORMATION BELONGS TO SOMEONE ELSE          |
|DEBT WAS PAID                                |
|ACCOUNT INFORMATION INCORRECT                |
+---------------------------------------------+
only showing top 10 rows



In [27]:
# `lower` is not imported by any earlier cell, so import it here; without this
# the cell raises NameError on a fresh kernel (Restart & Run All).
from pyspark.sql.functions import lower

# Noise to strip from the narrative (case-insensitive):
#   x{3,} / \*{3,}          runs of three or more x's or asterisks
#   \bn/?a\b                stand-alone "na" / "n/a" tokens only; the word
#                           boundaries stop the pattern from deleting "na"
#                           inside ordinary words such as "financial"
#   #, [redacted]           literal markers
#   information provided.*  that phrase and the rest of its line
noise_pattern = "(?i)(x{3,}|\\*{3,}|\\bn/?a\\b|#|\\[redacted\\]|information provided.*)"

df = df.withColumn("clean_text", regexp_replace(col(text_col), noise_pattern, ""))

# Lower-case, replace every non-letter/non-whitespace character with a space,
# collapse whitespace runs to a single space, and trim the ends.
df = df.withColumn("clean_text", lower(col("clean_text")))
df = df.withColumn("clean_text", regexp_replace(col("clean_text"), "[^a-zA-Z\\s]", " "))
df = df.withColumn("clean_text", regexp_replace(col("clean_text"), "\\s+", " "))
df = df.withColumn("clean_text", trim(col("clean_text")))

# Keep only rows whose cleaned text is longer than 20 characters.
df = df.filter(length(col("clean_text")) > 20)

print("AFTER CLEAN TEXT:")
df.select("clean_text").show(10, truncate=False)


AFTER CLEAN TEXT:
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [28]:
# Final inspection: column names, schema, and a small sample of rows.
print("FINAL COLUMNS:")
for column_name in df.columns:
    print(repr(column_name))

df.printSchema()
df.show(5, truncate=120)

# Write dates using the legacy rebase mode.
# NOTE(review): this setting is normally only needed when very old dates are
# present — confirm the parsed date_received values fall in a sensible range.
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")

# Persist the cleaned frame as Parquet, replacing any previous output.
cleaned_output_path = f"{data_path}cleaned_data.parquet"
df.write.parquet(cleaned_output_path, mode="overwrite")
print("CLEANED DATA SAVED.")


FINAL COLUMNS:
'Product'
'Sub-product'
'Issue'
'Sub-issue'
'Consumer complaint narrative'
'Company public response'
'Company'
'State'
'ZIP code'
'Tags'
'Consumer consent provided?'
'Submitted via'
'Date sent to company'
'Company response to consumer'
'Timely response?'
'Consumer disputed?'
'Complaint ID'
'date_received'
'year'
'month_num'
'day'
'clean_text'
root
 |-- Product: string (nullable = false)
 |-- Sub-product: string (nullable = true)
 |-- Issue: string (nullable = false)
 |-- Sub-issue: string (nullable = false)
 |-- Consumer complaint narrative: string (nullable = true)
 |-- Company public response: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZIP code: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Consumer consent provided?: string (nullable = true)
 |-- Submitted via: string (nullable = true)
 |-- Date sent to company: string (nullable = true)
 |-- Company response to consumer: string (nullabl