### Consumer Finances Protection Bureau (CFPB) 
#### Consumer Complaint Database

This program gets the complaint database (full dataset) from the web
and performs some data cleaning and transformation as well as basic univariate data exploration in pyspark.  It then creates a dataframe and exports it into a CSV file that can be used for further analysis.  

This is more of an exercise with spark. The full complaints dataset is reasonably small (just over 500MB as of the end of April 2018) so it can be processed in a more traditional way. But this exercise shows the potential of parallel processing as well as challenges that come with it. 
 

This version: April 2018

In [1]:
import pandas as pd
import numpy as np
import time
from datetime import datetime
beg_time = time.clock()

In [2]:
import sys
#stdout = sys.stdout
#reload(sys)
#sys.setdefaultencoding('utf-8')
#sys.stdout = stdout

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StopWordsRemover

In [None]:
#spark.stop()

In [4]:
#https://rea.tech/how-we-optimize-apache-spark-apps/
confM = pyspark.SparkConf().setAll([('spark.executor.memory', '10g'), \
                                    ('spark.executor.cores', '5'), \
                                    ('spark.executor.instances', '25'), \
                                    ('spark.default.parallelism', '100'), \
                                    ('spark.yarn.executor.memoryOverhead', '1g'), \
                                    ('spark.submit.deployMode', 'cluster')])
                                    #('spark.cores.max', '3'), \
                                    #('spark.driver.memory','4g')])

In [5]:
spark = SparkSession.builder \
    .appName("CFPBETL") \
    .config(conf = confM) \
    .getOrCreate() 

In [6]:
spark.sparkContext.getConf().getAll()

[(u'spark.eventLog.enabled', u'true'),
 (u'spark.submit.deployMode', u'cluster'),
 (u'spark.driver.extraLibraryPath',
  u'/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native'),
 (u'spark.executor.cores', u'5'),
 (u'spark.driver.appUIAddress',
  u'http://ip-10-0-0-248.us-east-2.compute.internal:4040'),
 (u'spark.blacklist.decommissioning.timeout', u'1h'),
 (u'spark.yarn.appMasterEnv.SPARK_PUBLIC_DNS', u'$(hostname -f)'),
 (u'spark.executor.instances', u'25'),
 (u'spark.executor.extraJavaOptions',
  u"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'"),
 (u'spark.yarn.executor.memoryOverhead', u'1g'),
 (u'spark.eventLog.dir', u'hdfs:///var/log/spark/apps'),
 (u'spark.sql.hive.metastore.sharedPrefixes',
  u'com.amazonaws.services.dynamodbv2'),
 (u'spark.sql.warehouse.dir', u'hdfs:///user/spark/warehouse'),
 (u'spark.serializer.o

In [7]:
# Create a pandas dataframe, then convert it into a spark dataframe
#url = "https://data.consumerfinance.gov/api/views/s6ew-h6mp/rows.csv?accessType=DOWNLOAD"
#Comp = pd.read_csv(url)#.sample(frac=0.01)
# Save original dataset into a CSV file so that we don't have to read from the web 
# each time we run the program
#Comp.to_csv("complaints.csv", index = False)
Comp = pd.read_csv("complaints.csv")#.sample(frac=0.20)
Comp.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1019719 entries, 0 to 1019718
Data columns (total 18 columns):
Date received                   1019719 non-null object
Product                         1019719 non-null object
Sub-product                     784549 non-null object
Issue                           1019719 non-null object
Sub-issue                       524369 non-null object
Consumer complaint narrative    275265 non-null object
Company public response         315727 non-null object
Company                         1019719 non-null object
State                           1007508 non-null object
ZIP code                        1003161 non-null object
Tags                            140966 non-null object
Consumer consent provided?      487479 non-null object
Submitted via                   1019719 non-null object
Date sent to company            1019719 non-null object
Company response to consumer    1019716 non-null object
Timely response?                1019719 non-null obje

In [8]:
# First, remove spaces and non-alphanumeric characters from column names
# so that it will be easy to use them in SparkSQL
Comp.columns=Comp.columns.str.replace('?','')
Comp.columns=Comp.columns.str.replace('-','')
Comp.columns=Comp.columns.str.replace(' ','')

display(Comp)

Unnamed: 0,Datereceived,Product,Subproduct,Issue,Subissue,Consumercomplaintnarrative,Companypublicresponse,Company,State,ZIPcode,Tags,Consumerconsentprovided,Submittedvia,Datesenttocompany,Companyresponsetoconsumer,Timelyresponse,Consumerdisputed,ComplaintID
0,03/12/2014,Mortgage,Other mortgage,"Loan modification,collection,foreclosure",,,,M&T BANK CORPORATION,MI,48382,,,Referral,03/17/2014,Closed with explanation,Yes,No,759217
1,10/01/2016,Credit reporting,,Incorrect information on credit report,Account status,I have outdated information on my credit repor...,Company has responded to the consumer and the ...,"TRANSUNION INTERMEDIATE HOLDINGS, INC.",AL,352XX,,Consent provided,Web,10/05/2016,Closed with explanation,Yes,No,2141773
2,10/17/2016,Consumer Loan,Vehicle loan,Managing the loan or lease,,I purchased a new car on XXXX XXXX. The car de...,,"CITIZENS FINANCIAL GROUP, INC.",PA,177XX,Older American,Consent provided,Web,10/20/2016,Closed with explanation,Yes,No,2163100
3,06/08/2014,Credit card,,Bankruptcy,,,,AMERICAN EXPRESS COMPANY,ID,83854,Older American,,Web,06/10/2014,Closed with explanation,Yes,Yes,885638
4,09/13/2014,Debt collection,Credit card,Communication tactics,Frequent or repeated calls,,,"CITIBANK, N.A.",VA,23233,,,Web,09/13/2014,Closed with explanation,Yes,Yes,1027760
5,11/13/2013,Mortgage,Conventional adjustable mortgage (ARM),"Loan servicing, payments, escrow account",,,,U.S. BANCORP,MN,48322,,,Phone,11/20/2013,Closed with monetary relief,Yes,No,596562
6,06/16/2015,Debt collection,Medical,Improper contact or sharing of info,Contacted employer after asked not to,,Company believes it acted appropriately as aut...,California Accounts Service,CA,92111,,Consent not provided,Web,06/19/2015,Closed with explanation,Yes,No,1422680
7,06/15/2015,Credit reporting,,Credit reporting company's investigation,Inadequate help over the phone,An account on my credit report has a mistaken ...,Company chooses not to provide a public response,Experian Information Solutions Inc.,VA,224XX,,Consent provided,Web,06/15/2015,Closed with explanation,Yes,No,1420702
8,11/13/2015,Mortgage,Other mortgage,"Loan modification,collection,foreclosure",,,Company believes it acted appropriately as aut...,"Aldridge Pite, LLP",CA,93101,,,Referral,12/10/2015,Closed with explanation,Yes,Yes,1654890
9,10/21/2014,Mortgage,Conventional fixed mortgage,"Loan modification,collection,foreclosure",,,,OCWEN LOAN SERVICING LLC,FL,32714,Older American,,Web,10/21/2014,Closed with explanation,Yes,No,1079567


$ $
#### Missing values
$ $

There are many missing values in the dataset. Convert those into a category by recoding them as xxxxx This makes working with them easier (GroupBy, etc), since treatment of missing values can vary.

This replacement is done on the pandas dataframe, before creating the spark dataframe.
$ $

In [9]:
# Creating a spark dataframe:
# Directly reading from the CSV with the CFPB URL didn't work, even while I tried many varions like
# df = (sql.read.format("csv").options(header="true").load(url))

# The code below helps with "merge" error that was given for direct spark DF creation
# This may have to do with missing values
for col in Comp.columns:
    if ((Comp[col].dtypes != np.int64) & (Comp[col].dtypes != np.float64)):
        Comp[col] = Comp[col].fillna('xxxxx')

df = spark.createDataFrame(Comp)

In [10]:
df.printSchema()

root
 |-- Datereceived: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Subproduct: string (nullable = true)
 |-- Issue: string (nullable = true)
 |-- Subissue: string (nullable = true)
 |-- Consumercomplaintnarrative: string (nullable = true)
 |-- Companypublicresponse: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZIPcode: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Consumerconsentprovided: string (nullable = true)
 |-- Submittedvia: string (nullable = true)
 |-- Datesenttocompany: string (nullable = true)
 |-- Companyresponsetoconsumer: string (nullable = true)
 |-- Timelyresponse: string (nullable = true)
 |-- Consumerdisputed: string (nullable = true)
 |-- ComplaintID: long (nullable = true)



In [11]:
df.count()

1019719

In [12]:
df.show(n=10)

+------------+----------------+--------------------+--------------------+--------------------+--------------------------+---------------------+--------------------+-----+-------+--------------+-----------------------+------------+-----------------+-------------------------+--------------+----------------+-----------+
|Datereceived|         Product|          Subproduct|               Issue|            Subissue|Consumercomplaintnarrative|Companypublicresponse|             Company|State|ZIPcode|          Tags|Consumerconsentprovided|Submittedvia|Datesenttocompany|Companyresponsetoconsumer|Timelyresponse|Consumerdisputed|ComplaintID|
+------------+----------------+--------------------+--------------------+--------------------+--------------------------+---------------------+--------------------+-----+-------+--------------+-----------------------+------------+-----------------+-------------------------+--------------+----------------+-----------+
|  03/12/2014|        Mortgage|      Other 

In [13]:
# Make sure there are no ComplaintID duplicates. Then remove this feature.
df = df.dropDuplicates(['ComplaintID'])
df = df.drop('ComplaintID')
#df.count()

$ $
#### Product and Sub-product
$ $

In [14]:
# SQL: Counts of Product + Subproduct sorted in a descending order of count
# groupBy also works similarly for these 
df.createOrReplaceTempView("Prod")

ProdDF = spark.sql("SELECT Product, Subproduct, count(*) as cnt \
                    FROM Prod \
                    GROUP BY Product, Subproduct \
                    ORDER BY cnt DESC")
#ProdDF.show()
#ProdDF.count()
ProdDF.coalesce(1).write.csv(path="Products.csv", mode="append", header = "true", sep='\t')

$ $

Spark: The product - sub-product table above is written into a tab-delimited file first. A new, combined product classification is created . Any subcategory that has more than 10,000 observations is kept as a separate product. Smaller sub-categories are folded into a generic "other" category.
  
Note: Spark creates a directory named Products.csv instead of a file under HDFS (/user/hadoop/Products.csv). Hadoop datanode tool (50070) has a "Browse the file system" tool that can be used to view/download the output. There will be a single CSV file in that directory that will need to be recoded. I have manually recoded Product + Subproduct combinations into a single, combined categorical variable using a spreadsheet. The code below tham imports the recoded file back into this environment.
 
$ $


In [15]:
# This code reads the updated product CSV file into a spark dataframe
Prod = spark.read.format("csv").option("header", "true").load("Products_recoded.csv")
#Prod.show()

In [16]:
# Merge the new (product+sub-product) variable with the main data frame
# SQL version 
df.createOrReplaceTempView("P1")
Prod.createOrReplaceTempView("P2")

df = spark.sql("SELECT a.*, b.Prod \
                    FROM P1 a  LEFT JOIN P2 b\
                    ON a.Product == b.Product AND a.Subproduct == b.Subproduct")
#df.show()


$ $
#### Recode Product and Sub-product
$ $


 
 
By combining Product and Sub-product, we get a new categorical feature with about 20 categories. All but four of these categories have more than 10,000 observations.

$ $


In [17]:
# Here is the new variable with counts
df.groupBy('Prod').count().orderBy('count', ascending=False).show()

+--------------------+------+
|                Prod| count|
+--------------------+------+
|    Credit reporting|248970|
|Debt collection -...|145241|
|    Mortgage - other|117762|
|Credit card or pr...|115419|
|Mortgage - conven...| 70616|
|Bank account or s...| 59045|
|Bank account or s...| 45744|
|Debt collection -...| 28700|
|Mortgage - FHA mo...| 28192|
|Mortgage - conven...| 25381|
|Student loan - no...| 22283|
|Debt collection -...| 21205|
|Consumer loan - v...| 17782|
|Student loan - fe...| 16556|
|Consumer loan - o...| 13825|
|Mortgage - home e...| 11625|
|     Money transfers| 11047|
|Payday loan, titl...|  9825|
|Other financial s...|  6476|
|Student loan - other|  3937|
+--------------------+------+
only showing top 20 rows



In [18]:
# Since we no longer need the two original features, we can delete them. 
df = df.drop('Product','Subproduct')
df.printSchema()

root
 |-- Datereceived: string (nullable = true)
 |-- Issue: string (nullable = true)
 |-- Subissue: string (nullable = true)
 |-- Consumercomplaintnarrative: string (nullable = true)
 |-- Companypublicresponse: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZIPcode: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Consumerconsentprovided: string (nullable = true)
 |-- Submittedvia: string (nullable = true)
 |-- Datesenttocompany: string (nullable = true)
 |-- Companyresponsetoconsumer: string (nullable = true)
 |-- Timelyresponse: string (nullable = true)
 |-- Consumerdisputed: string (nullable = true)
 |-- Prod: string (nullable = true)



$ $

#### Issue and Sub-issue

$ $

These two features are combined into one in a manner similar to Product and Sub-product. There are many more categories here, though.
$ $


In [19]:
df.createOrReplaceTempView("Iss")

IssDF = spark.sql("SELECT Issue, Subissue, count(*) as cnt \
                    FROM Iss \
                    GROUP BY Issue, Subissue \
                    ORDER BY cnt DESC")

In [20]:
IssDF.count()
IssDF.coalesce(1).write.csv(path="Issues.csv", mode="append", header = "true", sep='\t')

$ $

Spark: The product - sub-product table above is written into a tab-delimited file first. A new, combined product classification is created . Any subcategory that has more than 10,000 observations is kept as a separate product. Smaller sub-categories are folded into a generic "other" category.

$ $


In [21]:
# This code reads the updated Issues CSV file into a spark dataframe
Iss = spark.read.format("csv").option("header", "true").load("Issues_recoded.csv")

In [22]:
# Merge the new (product+sub-product) variable with the main data frame
df.createOrReplaceTempView("P1")
Iss.createOrReplaceTempView("P2")

df = spark.sql("SELECT a.*, b.Iss \
                    FROM P1 a  LEFT JOIN P2 b\
                    ON a.Issue == b.Issue AND a.Subissue == b.Subissue")

In [23]:
df.groupBy('Iss').count().orderBy('count', ascending=False).show()

+--------------------+------+
|                 Iss| count|
+--------------------+------+
|               Other|136567|
|Loan modification...|112314|
|Loan servicing, p...| 77337|
|Incorrect informa...| 59564|
|Account opening, ...| 37961|
|Incorrect informa...| 37057|
|Cont'd attempts c...| 36743|
|Incorrect informa...| 32385|
|Deposits and with...| 22851|
|Problem with a cr...| 22069|
|Disclosure verifi...| 21820|
|Improper use of m...| 21060|
|Attempts to colle...| 20338|
|Dealing with my l...| 18503|
|Communication tac...| 17623|
|Managing the loan...| 17324|
|Application, orig...| 17233|
|Cont'd attempts c...| 16632|
|Struggling to pay...| 15163|
|    Billing disputes| 15136|
+--------------------+------+
only showing top 20 rows



In [24]:
# Remove the original two variables since they are now recoded
df = df.drop('Issue','Subissue')
df.printSchema()

root
 |-- Datereceived: string (nullable = true)
 |-- Consumercomplaintnarrative: string (nullable = true)
 |-- Companypublicresponse: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZIPcode: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Consumerconsentprovided: string (nullable = true)
 |-- Submittedvia: string (nullable = true)
 |-- Datesenttocompany: string (nullable = true)
 |-- Companyresponsetoconsumer: string (nullable = true)
 |-- Timelyresponse: string (nullable = true)
 |-- Consumerdisputed: string (nullable = true)
 |-- Prod: string (nullable = true)
 |-- Iss: string (nullable = true)



### Consumer complaint narrative  
$ $  
This section largely draws from my previous work. It first cleans the narrative by stripping stop words, punctuation marks etc. Then, it counts the positive and negative words using the Loughran - McDonald finance sentiment dictionary. The following are recorded:    
  
* 'Clean' narrative: an array of non-stop words and its length (word count)
* Positive and negative word arrays and their lengths
* Percent sentiment: Difference in Positive and Negative word array lengths divided by the sum of those two numbers. By construction, this number will be between 100% and -100%.
 
$ $


In [25]:
positives = open('LoughranMcDonald_Positive.csv', "r").readlines()
positive = [pos.strip().lower().split(',')[0] for pos in positives]
negatives = open('LoughranMcDonald_Negative.csv', "r").readlines()
negative = [neg.strip().lower().split(',')[0] for neg in negatives]

In [26]:
# Transform the narrative: (1) lowercase, (2) remove punctuation, (3) split into words, 
# and (4) remove stopwords 
# Also, record the length of the narrative after removing stopwords

df = df.withColumn('Nr', lower(regexp_replace(df.Consumercomplaintnarrative,'[^\w\s]',' ')))
df = df.withColumn('Spl', split('Nr', "\s+"))

remover = StopWordsRemover(inputCol='Spl', outputCol='Narr')
remover.loadDefaultStopWords('english')
df = remover.transform(df)
df = df.withColumn('NarrLength', size('Narr'))

In [27]:
# Right now, the same function works on the same narrative twice
# one to find the positive words, and another to find the negative words
# There may be an easier way to do this
def inListPos(narrative):
    x = [ ]
    for word in narrative:
        if word in (positive):
            x.append(word)        
    return x
def inListNeg(narrative):
    x = [ ]
    for word in narrative:
        if word in (negative):
            x.append(word)
    return x
pUDF = udf(inListPos,ArrayType(StringType()))
nUDF = udf(inListNeg,ArrayType(StringType()))
df = df.withColumn('Pos',pUDF(df['Narr']))
df = df.withColumn('Neg',nUDF(df['Narr']))
df = df.withColumn('PosLength',size('Pos'))
df = df.withColumn('NegLength',size('Neg'))
df = df.withColumn('PSent',(df.PosLength -  df.NegLength) / (df.PosLength + df.NegLength))

In [28]:
# Here is how the narrative looks after all the transformations
df.filter(df.Consumercomplaintnarrative != 'xxxxx')\
  .select('Consumercomplaintnarrative','Nr','Spl',\
          'Narr', 'NarrLength','Pos','PosLength','Neg','NegLength', 'PSent').show(n=50)

+--------------------------+--------------------+--------------------+--------------------+----------+--------------------+---------+--------------------+---------+-------------------+
|Consumercomplaintnarrative|                  Nr|                 Spl|                Narr|NarrLength|                 Pos|PosLength|                 Neg|NegLength|              PSent|
+--------------------------+--------------------+--------------------+--------------------+----------+--------------------+---------+--------------------+---------+-------------------+
|      I am the Trustee ...|i am the trustee ...|[i, am, the, trus...|[trustee, sister,...|        98|          [superior]|        1|[questions, quest...|        5|-0.6666666666666666|
|      This dispute cent...|this dispute cent...|[this, dispute, c...|[dispute, centers...|       349|[outstanding, sat...|        8|[dispute, unlawfu...|       19|-0.4074074074074074|
|      Bank of America r...|bank of america r...|[bank, of, americ...|[bank

In [29]:
df = df.drop('Consumercomplaintnarrative','Nr','Spl')

### Company public response

$ $

This feature is largely unpopulated (73%) or gets the standard "no public response" from either the company (6%) or CFPB (14%). This field can be dropped.


In [30]:
df.createOrReplaceTempView("Resp")

RespDF = spark.sql("SELECT Companypublicresponse, count(*) as cnt \
                    FROM Resp \
                    GROUP BY Companypublicresponse \
                    ORDER BY cnt DESC")
RespDF.show()

+---------------------+------+
|Companypublicresponse|   cnt|
+---------------------+------+
|                xxxxx|703992|
| Company has respo...|196521|
| Company chooses n...| 52473|
| Company believes ...| 47846|
| Company believes ...|  4548|
| Company disputes ...|  4149|
| Company believes ...|  3345|
| Company believes ...|  3030|
| Company can't ver...|  1919|
| Company believes ...|  1843|
| Company believes ...|    53|
+---------------------+------+



In [31]:
df = df.drop('Companypublicresponse')

### Companies
$ $

50 companies account for 75% of the complaints even though there are 4300 companies in the dataset. Even among the top companies, the 15 most-complained about companies make up 60% of the complaints.

$ $

In [32]:
# Companies in descending order of complaint frequency 
# Obtain percent frequencies
df.createOrReplaceTempView('TView1')

DF1 = spark.sql("SELECT Company, count(*) as Cnt \
                 FROM TView1 \
                 GROUP BY Company \
                 ORDER BY Cnt DESC")

DF1.createOrReplaceTempView('TView2')

DF2 = spark.sql("SELECT Company, Cnt, \
                 Cnt/(SELECT SUM(Cnt) from TView2 Temp1) Perc from TView2")\
           .collect()
C = spark.createDataFrame(DF2)

In [33]:
# https://stackoverflow.com/questions/46574860/how-can-i-obtain-percentage-frequencies-in-pyspark
# Suggested solution
df.createOrReplaceTempView('Comp')

CompDF = spark.sql("SELECT Company,cnt/(SELECT SUM(cnt) from (SELECT Company, count(*) as cnt\
    FROM Comp GROUP BY Company ORDER BY cnt DESC) temp_tab) sum_freq from\
    (SELECT Company, count(*) as cnt FROM Comp GROUP BY Company ORDER BY cnt DESC)").collect()
C = spark.createDataFrame(CompDF)
C.show()

+--------------------+--------------------+
|             Company|            sum_freq|
+--------------------+--------------------+
|       EQUIFAX, INC.| 0.08164209944112054|
|BANK OF AMERICA, ...| 0.07278573803175188|
|Experian Informat...| 0.07079205153576623|
|TRANSUNION INTERM...| 0.06445010831415321|
|WELLS FARGO & COM...|0.060940317871884316|
|JPMORGAN CHASE & CO.|0.050150090368032765|
|      CITIBANK, N.A.|0.040793591175608185|
|CAPITAL ONE FINAN...| 0.02630332473946254|
|OCWEN LOAN SERVIC...|0.025761999138978482|
|Navient Solutions...| 0.02377419661691113|
| NATIONSTAR MORTGAGE|0.018155001524929906|
| SYNCHRONY FINANCIAL| 0.01675363507005361|
|        U.S. BANCORP|0.014442213982479487|
|Ditech Financial LLC|0.012666234521471111|
|AMERICAN EXPRESS ...| 0.01050975808041235|
|       PNC Bank N.A.|0.009887037507391743|
|ENCORE CAPITAL GR...|0.009397687009852714|
|       DISCOVER BANK|0.008154207188450936|
|HSBC NORTH AMERIC...|0.007659953379313321|
|TD BANK US HOLDIN...|0.00763739

### State and ZIP
$ $
Complaints distrubution is also skewed heavily towards states with larger population. State and ZIP code features are dropped from the analysis.
$ $


In [34]:
# States in descending order of complaint frequency 
# Obtain percent frequencies also

df.createOrReplaceTempView('TView1')

DF1 = spark.sql("SELECT State, count(*) as Cnt \
                 FROM TView1 \
                 GROUP BY State \
                 ORDER BY Cnt DESC")

DF1.createOrReplaceTempView('TView2')

DF2 = spark.sql("SELECT State, Cnt, \
                 Cnt/(SELECT SUM(Cnt) from TView2 Temp1) Perc from TView2")\
           .collect()
C = spark.createDataFrame(DF2)
C.show()

+-----+------+--------------------+
|State|   Cnt|                Perc|
+-----+------+--------------------+
|   CA|142992| 0.14022686642104346|
|   FL| 97460| 0.09557534967966666|
|   TX| 82779| 0.08117824616389417|
|   NY| 68499|  0.0671743882383284|
|   GA| 51345| 0.05035210680589457|
|   NJ| 39141| 0.03838410385606231|
|   IL| 38848|0.038096769796385085|
|   PA| 35327|0.034643857768659796|
|   VA| 31058| 0.03045741032578583|
|   OH| 30818| 0.03022205136905363|
|   MD| 30630|0.030037686852946742|
|   NC| 30492| 0.02990235545282573|
|   MI| 24778|0.024298850957959986|
|   AZ| 22279|0.021848175820985978|
|   WA| 20482|0.020085925632453646|
|   MA| 18774|0.018410954390376173|
|   CO| 17041| 0.01671146659030576|
|   TN| 16576|0.016255458611637127|
|   SC| 14316|0.014039161769075598|
|   MO| 13763|0.013496855506271825|
+-----+------+--------------------+
only showing top 20 rows



In [35]:
# Drop Zip code from the dataset
df = df.drop('ZIPcode')

### Tags

In [36]:
# Tags 
# Obtain percent frequencies also
# Most of the tags are blank
df.createOrReplaceTempView('TView1')

DF1 = spark.sql("SELECT Tags, count(*) as Cnt \
                 FROM TView1 \
                 GROUP BY Tags \
                 ORDER BY Cnt DESC")

DF1.createOrReplaceTempView('TView2')

DF2 = spark.sql("SELECT Tags, Cnt, \
                 Cnt/(SELECT SUM(Cnt) from TView2 Temp1) Perc from TView2")\
           .collect()
C = spark.createDataFrame(DF2)
C.show()

+--------------------+------+--------------------+
|                Tags|   Cnt|                Perc|
+--------------------+------+--------------------+
|               xxxxx|878753|  0.8617599554387042|
|      Older American| 68592| 0.06726558983406213|
|       Servicemember| 61220| 0.06003614721310479|
|Older American, S...| 11154|0.010938307514128892|
+--------------------+------+--------------------+



In [37]:
df = df.drop('Tags')

### Consumer consent provided?

In [38]:
# Counts and percent frequencies also
df.createOrReplaceTempView('TView1')

DF1 = spark.sql("SELECT Consumerconsentprovided, count(*) as Cnt \
                 FROM TView1 \
                 GROUP BY Consumerconsentprovided \
                 ORDER BY Cnt DESC")
DF1.createOrReplaceTempView('TView2')

DF2 = spark.sql("SELECT Consumerconsentprovided, Cnt, \
                 Cnt/(SELECT SUM(Cnt) from TView2 Temp1) Perc from TView2")\
           .collect()
C = spark.createDataFrame(DF2)
C.show()

+-----------------------+------+-------------------+
|Consumerconsentprovided|   Cnt|               Perc|
+-----------------------+------+-------------------+
|                  xxxxx|532240| 0.5219477130464373|
|       Consent provided|275265| 0.2699420134370351|
|   Consent not provided|198728| 0.1948850614728175|
|                  Other| 12506|0.01226416297038694|
|      Consent withdrawn|   980|9.61049073323141E-4|
+-----------------------+------+-------------------+



In [39]:
# For the majority of cases, whether consent is provided is unknown
df = df.na.replace(['xxxxx','Consent withdrawn'], ['Other','Other'], 'Consumerconsentprovided')

### Submitted via

In [40]:
# Counts and percent frequencies
df.createOrReplaceTempView('TView1')

DF1 = spark.sql("SELECT Submittedvia, count(*) as Cnt \
                 FROM TView1 \
                 GROUP BY Submittedvia \
                 ORDER BY Cnt DESC")
DF1.createOrReplaceTempView('TView2')

DF2 = spark.sql("SELECT Submittedvia, Cnt, \
                 Cnt/(SELECT SUM(Cnt) from TView2 Temp1) Perc from TView2")\
           .collect()
C = spark.createDataFrame(DF2)
C.show()

+------------+------+--------------------+
|Submittedvia|   Cnt|                Perc|
+------------+------+--------------------+
|         Web|730749|  0.7166180094712367|
|    Referral|151027|  0.1481064881599735|
|       Phone| 63295| 0.06207102152651858|
| Postal mail| 59203|0.058058151314234606|
|         Fax| 15082|0.014790349105979195|
|       Email|   363|3.559804220574491...|
+------------+------+--------------------+



### Company response to consumer

In [41]:
# Counts and percent frequencies
df.createOrReplaceTempView('TView1')

DF1 = spark.sql("SELECT Companyresponsetoconsumer, count(*) as Cnt \
                 FROM TView1 \
                 GROUP BY Companyresponsetoconsumer \
                 ORDER BY Cnt DESC")

DF1.createOrReplaceTempView('TView2')

DF2 = spark.sql("SELECT Companyresponsetoconsumer, Cnt, \
                 Cnt/(SELECT SUM(Cnt) from TView2 Temp1) Perc from TView2")\
           .collect()
C = spark.createDataFrame(DF2)
C.show()

+-------------------------+------+--------------------+
|Companyresponsetoconsumer|   Cnt|                Perc|
+-------------------------+------+--------------------+
|     Closed with expla...|782205|  0.7670789697946199|
|     Closed with non-m...|122951| 0.12057341287158521|
|     Closed with monet...| 62121| 0.06091972396317025|
|     Closed without re...| 17868|0.017522474328712127|
|                   Closed| 17611| 0.01727044411254473|
|              In progress|  6672|0.006542978997155099|
|       Closed with relief|  5304|0.005201432943781571|
|        Untimely response|  4984|0.004887621001471974|
|                    xxxxx|     3|2.941986959152472...|
+-------------------------+------+--------------------+



### Timely response?

In [42]:
# Counts and percent frequencies
df.createOrReplaceTempView('TView1')

DF1 = spark.sql("SELECT Timelyresponse, count(*) as Cnt \
                 FROM TView1 \
                 GROUP BY Timelyresponse \
                 ORDER BY Cnt DESC")
#DF1.show()
DF1.createOrReplaceTempView('TView2')

DF2 = spark.sql("SELECT Timelyresponse, Cnt, \
                 Cnt/(SELECT SUM(Cnt) from TView2 Temp1) Perc from TView2")\
           .collect()
C = spark.createDataFrame(DF2)
C.show()

+--------------+------+-------------------+
|Timelyresponse|   Cnt|               Perc|
+--------------+------+-------------------+
|           Yes|992040| 0.9728562476525395|
|            No| 27679|0.02714375234746043|
+--------------+------+-------------------+



### Consumer disputed?

In [43]:
# Counts and percent frequencies
df.createOrReplaceTempView('TView1')

DF1 = spark.sql("SELECT Consumerdisputed, count(*) as Cnt \
                 FROM TView1 \
                 GROUP BY Consumerdisputed \
                 ORDER BY Cnt DESC")
#DF1.show()
DF1.createOrReplaceTempView('TView2')

DF2 = spark.sql("SELECT Consumerdisputed, Cnt, \
                 Cnt/(SELECT SUM(Cnt) from TView2 Temp1) Perc from TView2")\
           .collect()
C = spark.createDataFrame(DF2)
C.show()

+----------------+------+-------------------+
|Consumerdisputed|   Cnt|               Perc|
+----------------+------+-------------------+
|              No|620184| 0.6081910800916723|
|           xxxxx|251157|0.24630020623328583|
|             Yes|148378|0.14550871367504184|
+----------------+------+-------------------+



In [44]:
# For about 7.5% of observations, the answer is blank. recode as Other
df = df.na.replace(['xxxxx'], ['Other'], 'Consumerdisputed')
#df.show()

$ $
#### Dates: Date received and Date sent to company
$ $

In [45]:
from pyspark.sql.functions import col, udf

In [46]:
# https://stackoverflow.com/questions/38080748/convert-pyspark-string-to-date-format
func =  udf (lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())

In [47]:
df = df.withColumn('DateRec', func(col('Datereceived')))
df = df.withColumn('DateSent', func(col('Datesenttocompany')))

In [48]:
# The original, string-typed dates are no longer necessary
df = df.drop('Datereceived','Datesenttocompany')

In [49]:
df = df.withColumn('DateDiff', datediff(df.DateSent, df.DateRec))

In [50]:
df = df.withColumn('Yr', year(df.DateRec))
df = df.withColumn('Mon', month(df.DateRec))

In [51]:
# Not surprisingly, how a complaint was submitted  
# has an effect on how long it takes CFPB to submit it
# group by column 'A' and 'B' then performing some function here
group_df = df.groupby(['Submittedvia'])
df_grouped = group_df.agg(avg("DateDiff").alias("avgC"), 
                          count("DateDiff").alias("countC"))
df_grouped.show() # print out the spark dataframe

+------------+-----------------+------+
|Submittedvia|             avgC|countC|
+------------+-----------------+------+
|       Phone|4.503625878821392| 63295|
|         Fax|5.519957565309641| 15082|
|       Email|7.900826446280992|   363|
|    Referral|5.894674462182259|151027|
| Postal mail|7.814891137273449| 59203|
|         Web|2.709548695927056|730749|
+------------+-----------------+------+



In [52]:
df.groupby(['DateDiff']) \
  .agg(avg("DateDiff").alias("avgC"), count("DateDiff").alias("countC")) \
  .orderBy("DateDiff") \
  .show()

+--------+----+------+
|DateDiff|avgC|countC|
+--------+----+------+
|      -1|-1.0|  7052|
|       0| 0.0|578081|
|       1| 1.0|100349|
|       2| 2.0| 62993|
|       3| 3.0| 50051|
|       4| 4.0| 46486|
|       5| 5.0| 41398|
|       6| 6.0| 30536|
|       7| 7.0| 19561|
|       8| 8.0|  8757|
|       9| 9.0|  5187|
|      10|10.0|  4415|
|      11|11.0|  3761|
|      12|12.0|  3669|
|      13|13.0|  3617|
|      14|14.0|  3533|
|      15|15.0|  3077|
|      16|16.0|  2238|
|      17|17.0|  2189|
|      18|18.0|  1971|
+--------+----+------+
only showing top 20 rows



In [53]:
# Fold the extreme values (<0 and >8 days) 
df = df.withColumn('DtDiff', when(df.DateDiff > 8, 8).otherwise(when(df.DateDiff < 0, 0).otherwise(df.DateDiff)))

In [54]:
df = df.drop('DateDiff')
df.groupby(['DtDiff']) \
  .agg(count("DtDiff").alias("countC")) \
  .orderBy("DtDiff") \
  .show()

+------+------+
|DtDiff|countC|
+------+------+
|     0|585133|
|     1|100349|
|     2| 62993|
|     3| 50051|
|     4| 46486|
|     5| 41398|
|     6| 30536|
|     7| 19561|
|     8| 83212|
+------+------+



In [55]:
df.groupby(['Yr']) \
  .agg(count("Yr").alias("countC")) \
  .orderBy("Yr") \
  .show()

+----+------+
|  Yr|countC|
+----+------+
|2011|  2536|
|2012| 72373|
|2013|108218|
|2014|153053|
|2015|168515|
|2016|191491|
|2017|243000|
|2018| 80533|
+----+------+



In [56]:
df.groupby(['Mon']) \
  .agg(count("Mon").alias("countC")) \
  .orderBy("Mon") \
  .show()

+---+------+
|Mon|countC|
+---+------+
|  1| 96692|
|  2| 91758|
|  3|103344|
|  4| 87603|
|  5| 76469|
|  6| 77550|
|  7| 81845|
|  8| 84503|
|  9| 86875|
| 10| 82065|
| 11| 73819|
| 12| 77196|
+---+------+



In [57]:
# CSV doesn't support arrays
df = df.withColumn('Narrative', concat_ws(" ", 'Narr'))
df = df.withColumn('PosWords', concat_ws(" ", 'Pos'))
df = df.withColumn('NegWords', concat_ws(" ", 'Neg'))

In [58]:
df = df.drop('Narr','Pos','Neg')

In [59]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Consumerconsentprovided: string (nullable = true)
 |-- Submittedvia: string (nullable = true)
 |-- Companyresponsetoconsumer: string (nullable = true)
 |-- Timelyresponse: string (nullable = true)
 |-- Consumerdisputed: string (nullable = true)
 |-- Prod: string (nullable = true)
 |-- Iss: string (nullable = true)
 |-- NarrLength: integer (nullable = false)
 |-- PosLength: integer (nullable = false)
 |-- NegLength: integer (nullable = false)
 |-- PSent: double (nullable = true)
 |-- DateRec: date (nullable = true)
 |-- DateSent: date (nullable = true)
 |-- Yr: integer (nullable = true)
 |-- Mon: integer (nullable = true)
 |-- DtDiff: integer (nullable = true)
 |-- Narrative: string (nullable = false)
 |-- PosWords: string (nullable = false)
 |-- NegWords: string (nullable = false)



In [60]:
# A final look at the transformed data frame
df.show()

+--------------------+-----+-----------------------+------------+-------------------------+--------------+----------------+--------------------+--------------------+----------+---------+---------+-----+----------+----------+----+---+------+---------+--------+--------+
|             Company|State|Consumerconsentprovided|Submittedvia|Companyresponsetoconsumer|Timelyresponse|Consumerdisputed|                Prod|                 Iss|NarrLength|PosLength|NegLength|PSent|   DateRec|  DateSent|  Yr|Mon|DtDiff|Narrative|PosWords|NegWords|
+--------------------+-----+-----------------------+------------+-------------------------+--------------+----------------+--------------------+--------------------+----------+---------+---------+-----+----------+----------+----+---+------+---------+--------+--------+
|BANK OF AMERICA, ...|   OR|                  Other|         Web|     Closed with non-m...|           Yes|              No|Mortgage - conven...|Loan servicing, p...|         1|        0|       

In [61]:
df.write.option("header", "true").csv("compFinal")

In [62]:
# This will consolidate all output files into a single CSV file
# The merge will result in duplicate header rows
# The resulting CSV file will have to be filtered
!hadoop fs -getmerge -nl compFinal ComplaintsFinal.csv

In [63]:
print time.clock() - beg_time, "seconds"

173.13 seconds
