In [1]:
from pyspark.sql import functions as F

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1581621892773_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
input_bucket = 's3://joonbigdataanalytics'
complaint_input_path = '/Data/Customer_Complaints.json'

customer_complaints = spark.read.json(input_bucket + complaint_input_path)

# Show some basic information aggregated across the complaints dataset to validate 
# the data has loaded correctly 
# First: Show date ranges
customer_complaints.agg(F.min("Date received"), F.max("Date received")).show()
# Next: Show product types
customer_complaints.select("Product").distinct().show()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+------------------+
|min(Date received)|max(Date received)|
+------------------+------------------+
|        01/01/2012|        12/31/2018|
+------------------+------------------+

+--------------------+
|             Product|
+--------------------+
|     Debt collection|
|    Virtual currency|
|         Payday loan|
|     Money transfers|
|Money transfer, v...|
|Checking or savin...|
|            Mortgage|
|        Prepaid card|
|Credit card or pr...|
|    Credit reporting|
|       Consumer Loan|
|Credit reporting,...|
|         Credit card|
|Bank account or s...|
|Vehicle loan or l...|
|Other financial s...|
|        Student loan|
|Payday loan, titl...|
+--------------------+

In [4]:
# Load the zip code geo data
zip_geo_input_path = '/Data/us-zip-code-latitude-and-longitude.csv'
zip_geo = spark.read.csv(input_bucket + zip_geo_input_path, header = True, multiLine=True, inferSchema=True, sep=";")

# display some representative data to verify it loaded correctly
zip_geo.agg(F.min("Timezone"), F.max("Timezone")).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+-------------+
|min(Timezone)|max(Timezone)|
+-------------+-------------+
|          -10|           -4|
+-------------+-------------+

In [5]:
# Load the tax data by zip code
tax_data_input_path = '/Data/15zpallagi.csv'
tax_data = spark.read.csv(input_bucket + tax_data_input_path, header = True, multiLine=True, inferSchema=True)

# display some representative data to verify it loaded correctly
tax_data.agg(F.min("N1"), F.max("N1")).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------+
|min(N1)|  max(N1)|
+-------+---------+
|    0.0|6231480.0|
+-------+---------+

In [6]:
# Feature Engineering: Reduce the 18 possible product categories down 
# to a more manageable 7 different categories of product
# Build up the map of translations
mapping = {
    "Debt collection": "DebtCollection",
    "Virtual currency": "NonBank",
    "Payday loan": "Loans",
    "Money transfers": "Transfers",
    "Money transfer, virtual currency, or money service": "Transfers",
    "Checking or savings account": "BankAccount",
    "Mortgage": "BankAccount",
    "Prepaid card": "CreditCard",
    "Credit card or prepaid card": "CreditCard",
    "Credit reporting": "CreditService",
    "Consumer Loan": "Loans",
    "Credit reporting, credit repair services, or other personal consumer reports": "CreditService",
    "Credit card": "CreditCard",
    "Bank account or service": "BankAccount",
    "Vehicle loan or lease": "Loans",
    "Other financial service": "NonBank",
    "Student loan": "Loans",
    "Payday loan, title loan, or personal loan": "Loans"
}
# Select out the required information
complaint_subset = customer_complaints.select(["Company", "Complaint ID", \
                                              "Date received", "Issue", "Product", \
                                              "Sub-issue", "Sub-product", "ZIP code"])
# Remap the product values to the reduced set
complaint_count_src = complaint_subset.replace(to_replace=mapping, subset=['Product'])
# Then group by the zip code (which will roll up the complaints per product per zip), 
# and at the same time pivot the data so that 7 possible rows of data per ZIP become 
# one row with 7 columns of summed values (and null where that category didn't appear in the zip)
zipcode_complaint_count = complaint_count_src.groupBy('ZIP code').pivot('Product').count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Data in the tax file is banded by adjusted gross income brackets. 
# Roll the individual banded numbers of returns and total income into a sum per zip
rollup_tax = tax_data.select(F.col("zipcode"), F.col("N1").alias("NumReturns"), \
                             F.col("A02650").alias("TotalIncome")).groupBy('zipcode').sum()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Join zip code geo data into the complaint count by zip dataset
complaints = zipcode_complaint_count.join(zip_geo.select(["Zip","City","State", \
                                                          "Latitude","Longitude"]), \
                                          F.col("ZIP code") == F.col("Zip"), 'inner')
# Add the tax data to the complaint counts
complaints = complaints.join(rollup_tax.select(F.col("zipcode"), F.col("sum(NumReturns)"). \
                                               alias("NumReturns"), F.col("sum(TotalIncome)").
                                               alias("TotalIncome")), F.col("ZIP code") == F.col("zipcode"),\
                             'inner')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
save_name = "/Data/ComplaintStats"
# combine the complaints dataset into a single partition, and save it to the 
# ComplaintStats folder in the S3 bucket where we are storing complaints data
complaints.repartition(1).write.json(input_bucket + save_name)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…