# TF-IDF with PySpark: Investigating the consumer financial complaint database

## John Snow Labs Spark NLP package
See [the Spark NLP Quick Start guide](https://nlp.johnsnowlabs.com/docs/en/quickstart) and/or [the quick start in the Spark NLP GitHub README](https://github.com/JohnSnowLabs/spark-nlp).

I used parts of each. Some highlights:

If you haven't already installed PySpark (note: PySpark version 2.4.4 is the only supported version):
$ conda install pyspark==2.4.4
$ conda install -c johnsnowlabs spark-nlp

If you already have PySpark, make sure to install spark-nlp in the same channel as PySpark. In my case PySpark is in my conda-forge channel, so I used:
$ conda install -c johnsnowlabs spark-nlp --channel conda-forge

I already had PySpark installed and set up for use with Jupyter notebooks. If you don't, you may need to set some additional environment variables in the terminal. The second quick start guide mentions them but the first does not, so I'm not certain they are required:

export SPARK_HOME=/path/to/your/spark/folder

export PYSPARK_PYTHON=python3

export PYSPARK_DRIVER_PYTHON=jupyter

export PYSPARK_DRIVER_PYTHON_OPTS=notebook

In [None]:
# if you don't need to access the SparkSession:
# import sparknlp
# sparknlp.start()

Since I need the SparkSession to load the data from my parquet file, I'll start a "custom" SparkSession:

In [1]:
from pyspark.sql import SparkSession

# start spark session configured for spark nlp
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Spark NLP') \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.3.5") \
    .getOrCreate()

# set to pandas-like output
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [2]:
# get the list of stopwords from nltk
from nltk.corpus import stopwords

eng_stopwords = stopwords.words('english')
eng_stopwords.append('xxxx')
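
Appending `'xxxx'` to the stop-word list only removes tokens that are exactly `xxxx`. The word counts further down still contain variants such as `xxxxb` and `lemonxxxx`. A minimal sketch of one way to drop those as well, assuming that any run of four or more `x` characters is redaction residue (the `drop_redacted` helper and the regular expression are my own illustration, not part of Spark NLP or NLTK):

```python
import re

# Assumption: four or more consecutive "x" characters mark a redacted detail.
REDACTION = re.compile(r"x{4,}")


def drop_redacted(tokens):
    """Return only the tokens that contain no redaction marker."""
    return [tok for tok in tokens if not REDACTION.search(tok.lower())]


sample = ["xxxx", "xxxxb", "lemonxxxx", "equifax", "dispute"]
kept = drop_redacted(sample)
print(kept)
```

A filter like this would have to run on the finished token lists, after the pipeline, since the stop-word cleaner itself matches whole words only.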

### Set up pipeline

In [5]:
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import (Tokenizer, Normalizer, 
                                LemmatizerModel, StopWordsCleaner)
from pyspark.ml import Pipeline

Start with the DocumentAssembler, then see the annotator [docs](https://nlp.johnsnowlabs.com/docs/en/annotators) for the available annotators. Convert back to human-readable form at the end with a Finisher.
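
To make the stage order concrete, here is a rough plain-Python sketch of what the stages used in this notebook do to a single string. It does not use Spark NLP at all: the function names, the tiny lemma dictionary, and the short stop-word set are made up for illustration, and the real annotators are considerably more careful.

```python
import re

# Hypothetical stand-ins: the pretrained lemmatizer uses a full dictionary
# and the NLTK stop-word list is much longer than this.
TOY_LEMMAS = {"accounts": "account", "disputed": "dispute", "was": "be"}
TOY_STOPWORDS = {"the", "be", "on", "my", "xxxx"}


def tokenize(text):
    """Split on whitespace (a greatly simplified tokenizer)."""
    return text.split()


def normalize(tokens):
    """Lowercase each token and strip everything except letters."""
    cleaned = (re.sub(r"[^a-z]", "", tok.lower()) for tok in tokens)
    return [tok for tok in cleaned if tok]


def lemmatize(tokens):
    """Map each token to its dictionary form when one is known."""
    return [TOY_LEMMAS.get(tok, tok) for tok in tokens]


def remove_stopwords(tokens):
    """Drop the tokens that appear in the stop-word set."""
    return [tok for tok in tokens if tok not in TOY_STOPWORDS]


def run_pipeline(text):
    """Apply the four steps in the same order as the Spark NLP pipeline."""
    return remove_stopwords(lemmatize(normalize(tokenize(text))))


result = run_pipeline("The accounts on my report: DISPUTED, XXXX was wrong!")
print(result)  # ['account', 'report', 'dispute', 'wrong']
```

Note that the stop words are removed after lemmatization, so `was` is dropped because its lemma `be` is in the stop-word set.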

In [6]:
documentAssembler = DocumentAssembler() \
    .setInputCol('consumer_complaint_narrative') \
    .setOutputCol('document')

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

# note normalizer defaults to changing all words to lowercase.
# Use .setLowercase(False) to maintain input case.
normalizer = Normalizer() \
    .setInputCols(['token']) \
    .setOutputCol('normalized') \
    .setLowercase(True)

# note that the lemmatizer needs a dictionary, so I used the pre-trained
# model (which defaults to English)
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(['normalized']) \
    .setOutputCol('lemma')

stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(['lemma']) \
    .setOutputCol('clean_lemma') \
    .setCaseSensitive(False) \
    .setStopWords(eng_stopwords)

# finisher converts tokens to human-readable output
finisher = Finisher() \
    .setInputCols(['clean_lemma']) \
    .setCleanAnnotations(False)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


#### Defining the pipeline

In [7]:
pipeline = Pipeline() \
    .setStages([
        documentAssembler,
        tokenizer,
        normalizer,
        lemmatizer,
        stopwords_cleaner,
        finisher
    ])

## Import data and apply pipeline

Note: here is how to get this parquet file:

Data set: https://catalog.data.gov/dataset/consumer-complaint-database

My transformation from CSV to parquet: https://github.com/allisonhonold/spark-blog/blob/master/pyspark_blog.ipynb

In [8]:
# import data (parquet files store their own schema, so the CSV-style
# inferSchema and header options are not needed)
df = spark.read.parquet('../data/consumer_complaints.parquet')

In [9]:
# select Equifax complaints that include a narrative, as a test
data = df.filter((df['company'] == 'EQUIFAX, INC.')
                 & df['consumer_complaint_narrative'].isNotNull())

In [10]:
# transform text with the pipeline
equifax = pipeline.fit(data).transform(data)

In [11]:
equifax.columns

['date_received',
 'product',
 'sub-product',
 'issue',
 'sub-issue',
 'consumer_complaint_narrative',
 'company_public_response',
 'company',
 'state',
 'zip_code',
 'tags',
 'consumer_consent_provided?',
 'submitted_via',
 'date_sent_to_company',
 'company_response_to_consumer',
 'timely_response?',
 'consumer_disputed?',
 'complaint_id',
 'document',
 'token',
 'normalized',
 'lemma',
 'clean_lemma',
 'finished_clean_lemma']

In [13]:
equifax.select('finished_clean_lemma')

finished_clean_lemma
"[address, never, ..."
"[pay, satisfied, ..."
"[send, equifax, m..."
"[since, fill, ide..."
"[massive, equifax..."
"[accident, multip..."
"[keep, breach, se..."
"[numerous, accoun..."
"[ks, dispute, col..."
"[number, inquries..."


In [14]:
# expand the "finished_clean_lemma" column so that the words are not in a list
from pyspark.sql.functions import explode, col

equifax_words = equifax.withColumn("exploded_text", explode(col("finished_clean_lemma")))

In [15]:
equifax_words.columns

['date_received',
 'product',
 'sub-product',
 'issue',
 'sub-issue',
 'consumer_complaint_narrative',
 'company_public_response',
 'company',
 'state',
 'zip_code',
 'tags',
 'consumer_consent_provided?',
 'submitted_via',
 'date_sent_to_company',
 'company_response_to_consumer',
 'timely_response?',
 'consumer_disputed?',
 'complaint_id',
 'document',
 'token',
 'normalized',
 'lemma',
 'clean_lemma',
 'finished_clean_lemma',
 'exploded_text']

In [16]:
counts = equifax_words.groupby('exploded_text').count()
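
The explode-then-count step is the Spark version of flattening every complaint's token list into one long list and counting each word. A small plain-Python sketch of the same idea, using made-up token lists rather than the real complaint data:

```python
from collections import Counter
from itertools import chain

# Made-up stand-ins for the per-complaint token lists.
docs = [
    ["equifax", "dispute", "account"],
    ["dispute", "report"],
    ["account", "dispute"],
]

# "explode": one entry per (complaint, word) pair
exploded = list(chain.from_iterable(docs))

# "groupby(...).count()": how many times each word occurs overall
toy_counts = Counter(exploded)
print(toy_counts.most_common(2))  # [('dispute', 3), ('account', 2)]
```

These are counts over all of a company's complaints combined, so each company is treated as one large document.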

In [17]:
counts_pd = counts.toPandas()

In [18]:
counts_pd

| | exploded_text | count |
| --- | --- | --- |
| 0 | online | 4300 |
| 1 | still | 10241 |
| 2 | hope | 811 |
| 3 | tarnish | 27 |
| 4 | transaction | 2362 |
| ... | ... | ... |
| 28068 | ftcgovequifax | 1 |
| 28069 | msr | 1 |
| 28070 | relegate | 2 |
| 28071 | celebration | 1 |


In [19]:
counts_pd.shape

(28073, 2)

In [20]:
dict(zip(counts_pd['exploded_text'], counts_pd['count']))

{'online': 4300,
 'still': 10241,
 'hope': 811,
 'tarnish': 27,
 'transaction': 2362,
 'enablement': 77,
 'randomly': 61,
 'requirement': 647,
 'input': 119,
 'hud': 27,
 'degrade': 76,
 'recognize': 726,
 'elevate': 12,
 'backdate': 31,
 'priority': 61,
 'everyday': 59,
 'medicare': 23,
 'smartconnect': 1,
 'interaction': 27,
 'travel': 92,
 'implore': 16,
 'dbt': 1,
 'uptodate': 17,
 'responed': 7,
 'trail': 148,
 'electrical': 5,
 'deleterious': 7,
 'dnt': 1,
 'thwart': 7,
 'athree': 1,
 'alady': 1,
 'premeditated': 2,
 'likelihood': 16,
 'treid': 1,
 'barrier': 44,
 'ransom': 5,
 'countermeasure': 1,
 'art': 13,
 'stateowned': 3,
 'confidentiality': 13,
 'xxxxb': 1,
 'xxxxxxxxwith': 7,
 'persist': 43,
 'indigenous': 2,
 'resolvednothing': 1,
 'jewelry': 17,
 'socialsecurity': 4,
 'imitation': 2,
 'historys': 2,
 'indicator': 23,
 'isp': 2,
 'lemonxxxx': 1,
 'understad': 1,
 'beem': 1,
 'unmodified': 2,
 'currentcreditcardopenequifaxxxxx': 1,
 'xxxxxxxxwei': 1,
 'yearsif': 4,
 'remo

In [21]:
companies = ['EQUIFAX, INC.',
 'Experian Information Solutions Inc.',
 'TRANSUNION INTERMEDIATE HOLDINGS, INC.',
 'BANK OF AMERICA, NATIONAL ASSOCIATION',
 'WELLS FARGO & COMPANY',
 'JPMORGAN CHASE & CO.',
 'CITIBANK, N.A.',
 'CAPITAL ONE FINANCIAL CORPORATION',
 'Navient Solutions, LLC.',
 'Ocwen Financial Corporation',
 'SYNCHRONY FINANCIAL',
 'NATIONSTAR MORTGAGE',
 'U.S. BANCORP',
 'AMERICAN EXPRESS COMPANY',
 'Ditech Financial LLC',
 'PNC Bank N.A.',
 'ENCORE CAPITAL GROUP INC.',
 'PORTFOLIO RECOVERY ASSOCIATES INC',
 'DISCOVER BANK',
 'TD BANK US HOLDING COMPANY']

In [22]:
from pyspark.sql.functions import explode, col

# initialize {company: {word counts}} dictionary
company_complaint_word_counts_dict = {company: {} for company in companies}

for company in companies:
    print(company)
    # get complaint narratives
    company_df = df.filter((df['company'] == company)
                           & df['consumer_complaint_narrative'].isNotNull())
    data = company_df.select('consumer_complaint_narrative')

    # process narratives into counts dictionary
    clean_data = pipeline.fit(data).transform(data)
    clean_data_exploded = clean_data.withColumn("exploded_text", explode(col("finished_clean_lemma")))
    counts = clean_data_exploded.groupby('exploded_text').count().toPandas()
    counts_dict = dict(zip(counts['exploded_text'], counts['count']))
    
    # add counts to dictionary
    company_complaint_word_counts_dict[company] = counts_dict

EQUIFAX, INC.
Experian Information Solutions Inc.
TRANSUNION INTERMEDIATE HOLDINGS, INC.
BANK OF AMERICA, NATIONAL ASSOCIATION
WELLS FARGO & COMPANY
JPMORGAN CHASE & CO.
CITIBANK, N.A.
CAPITAL ONE FINANCIAL CORPORATION
Navient Solutions, LLC.
Ocwen Financial Corporation
SYNCHRONY FINANCIAL
NATIONSTAR MORTGAGE
U.S. BANCORP
AMERICAN EXPRESS COMPANY
Ditech Financial LLC
PNC Bank N.A.
ENCORE CAPITAL GROUP INC.
PORTFOLIO RECOVERY ASSOCIATES INC
DISCOVER BANK
TD BANK US HOLDING COMPANY


## TF-IDF

In [23]:
def term_frequency(BoW_dict):
    tot_words = sum(BoW_dict.values())
    freq_dict = {word: BoW_dict[word]/tot_words for word in BoW_dict.keys()}
    return freq_dict

In [24]:
from math import log

def inverse_document_frequency(list_of_dicts):
    """Return {word: log(N / (1 + number of dicts containing the word))}."""
    tot_docs = len(list_of_dicts)
    words = set(w for w_dict in list_of_dicts for w in w_dict)
    idf_dict = {}
    for word in words:
        doc_count = sum(1 for w_dict in list_of_dicts if word in w_dict)
        idf_dict[word] = log(float(tot_docs) / (1.0 + doc_count))
    return idf_dict

In [25]:
def tf_idf(list_of_dicts):
    """Return one {word: tf * idf} dict per input dict, over the shared vocabulary."""
    words = set(w for w_dict in list_of_dicts for w in w_dict)
    tf_idf_dicts = []
    idfs = inverse_document_frequency(list_of_dicts)
    for w_dict in list_of_dicts:
        # look up missing words with .get() instead of adding zero counts to
        # w_dict, so the caller's dictionaries are left unmodified
        tf = term_frequency(w_dict)
        tf_idf_dicts.append({word: tf.get(word, 0) * idfs[word] for word in words})
    return tf_idf_dicts
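
Written out, the three functions above compute the following, where $n_{w,c}$ is the count of word $w$ in the combined complaints for company $c$ and $N$ is the number of companies. The symbols are my own notation; the formulas are read directly off the code, which uses the natural log.

$$
\mathrm{tf}(w, c) = \frac{n_{w,c}}{\sum_{w'} n_{w',c}}
$$

$$
\mathrm{idf}(w) = \ln\!\left(\frac{N}{1 + \left|\{\, c : n_{w,c} > 0 \,\}\right|}\right)
$$

$$
\mathrm{tfidf}(w, c) = \mathrm{tf}(w, c) \cdot \mathrm{idf}(w)
$$

One consequence of the $1 +$ in the denominator: a word used by $N - 1$ companies gets $\mathrm{idf} = \ln(1) = 0$, and a word used by all $N$ companies gets a slightly negative score, $\ln\!\left(N / (N + 1)\right)$.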

In [27]:
list_of_word_dicts = [company_complaint_word_counts_dict[company] for company in companies]
tf_idf_by_company_list = tf_idf(list_of_word_dicts)
tf_idf_by_company_dict = {c: tf_dict for c, tf_dict in zip(companies, tf_idf_by_company_list)}
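
To see how these scores behave, here is a toy example with three made-up "companies". It is a self-contained sketch: `toy_tf_idf` is a compact re-implementation of the same formulas so the block runs on its own, and the counts are invented.

```python
from math import log


def toy_tf_idf(count_dicts):
    """Score every word for every dict with tf * log(N / (1 + df))."""
    n_docs = len(count_dicts)
    vocab = set().union(*count_dicts)
    idf = {
        w: log(n_docs / (1.0 + sum(w in d for d in count_dicts)))
        for w in vocab
    }
    scores = []
    for d in count_dicts:
        total = sum(d.values())
        scores.append({w: d.get(w, 0) / total * idf[w] for w in vocab})
    return scores


toy_counts = [
    {"credit": 4, "reseller": 1},
    {"credit": 3, "mortgage": 2},
    {"credit": 5, "loan": 5},
]
toy_scores = toy_tf_idf(toy_counts)

for word, score in sorted(toy_scores[0].items(), key=lambda kv: -kv[1]):
    print(f"{word:10s} {score: .4f}")
```

For the first toy company, `reseller` (used by that company only) gets the highest score, words it never uses score zero, and `credit` (used by all three) ends up negative. That is why the counts of words with a score above zero, printed at the end of this notebook, are smaller than each company's full vocabulary.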

## What's unique about each of our top companies (in terms of most complaints)?

In [28]:
import heapq
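
`heapq.nlargest` picks the top-scoring keys of a dictionary without sorting the whole thing, and it returns them from highest to lowest score. A quick sketch with made-up scores:

```python
import heapq

# Made-up scores, only to show the call pattern.
example_scores = {"reseller": 0.27, "tobe": 0.12, "accuser": 0.07, "credit": -0.05}

# Iterating a dict yields its keys; key=example_scores.get ranks them by value.
top_two = heapq.nlargest(2, example_scores, key=example_scores.get)
top_pairs = [(word, example_scores[word]) for word in top_two]
print(top_pairs)  # [('reseller', 0.27), ('tobe', 0.12)]
```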

In [40]:
from nltk.corpus import words

# a set makes the membership test below fast; a list is scanned on every lookup
eng_words = set(words.words())

for company in companies[0:10]:
    print("\n", company)
    tf_idf_dict = tf_idf_by_company_dict[company]
    # nlargest returns the 100 highest-scoring (word, score) pairs,
    # already sorted from highest to lowest
    t100 = heapq.nlargest(100, tf_idf_dict.items(), key=lambda x: x[1])

    # print the first 10 of those that appear in the NLTK English word list
    counter = 0
    for word, tfidf in t100:
        if word in eng_words:
            counter += 1
            print(word, tfidf)
            if counter == 10:
                break


 EQUIFAX, INC.
reseller 0.0002688113448994784
tobe 0.00011921055656363865
accuser 6.927943339562246e-05
reinsertion 6.873224017786062e-05
certifiably 6.287826619016647e-05
certifiable 5.514274765382813e-05
runner 4.9931123168016185e-05
compliantly 4.7578859720163194e-05
counteraction 4.0577767192584324e-05
rejectable 3.6200064296811736e-05

 Experian Information Solutions Inc.
reseller 0.00027501965154673653
tobe 0.00014687622941775372
accuser 8.571715639653474e-05
certifiably 7.710990986287158e-05
certifiable 6.866771862714405e-05
runner 6.281202094071141e-05
reinsertion 5.664692062940039e-05
compliantly 5.587557352215855e-05
counteraction 4.989434077751539e-05
rejectable 4.4865729243365284e-05

 TRANSUNION INTERMEDIATE HOLDINGS, INC.
reseller 0.0003042826362853188
tobe 0.00018786806000262364
tu 0.0001447630541930031
accuser 0.0001098803905320385
certifiably 9.867512105377535e-05
certifiable 8.755074575739848e-05
compliantly 7.476791926979552e-05
reinsertion 7.393924034451927e-05
cou

In [44]:
# count the words with a tf-idf score above zero for each company
for company in companies:
    tf_idf_dict = tf_idf_by_company_dict[company]
    n_words = sum(1 for v in tf_idf_dict.values() if v > 0)
    print(f"{company}'s # of words: {n_words}")

EQUIFAX, INC.'s # of words: 24241
Experian Information Solutions Inc.'s # of words: 22881
TRANSUNION INTERMEDIATE HOLDINGS, INC.'s # of words: 21200
BANK OF AMERICA, NATIONAL ASSOCIATION's # of words: 19820
WELLS FARGO & COMPANY's # of words: 21038
JPMORGAN CHASE & CO.'s # of words: 19433
CITIBANK, N.A.'s # of words: 17386
CAPITAL ONE FINANCIAL CORPORATION's # of words: 13188
Navient Solutions, LLC.'s # of words: 13078
Ocwen Financial Corporation's # of words: 10713
SYNCHRONY FINANCIAL's # of words: 9969
NATIONSTAR MORTGAGE's # of words: 9990
U.S. BANCORP's # of words: 8568
AMERICAN EXPRESS COMPANY's # of words: 8114
Ditech Financial LLC's # of words: 6520
PNC Bank N.A.'s # of words: 5405
ENCORE CAPITAL GROUP INC.'s # of words: 3876
PORTFOLIO RECOVERY ASSOCIATES INC's # of words: 3699
DISCOVER BANK's # of words: 6392
TD BANK US HOLDING COMPANY's # of words: 5094
