# EMR Notebook Sample

This notebook shows how EMR Notebooks make it easy to do development and analytics with PySpark.

The dataset is an export of the Consumer Financial Protection Bureau (CFPB) complaint database.
See https://www.consumerfinance.gov/ and specifically http://files.consumerfinance.gov/ccdb/complaints.csv.zip

After the session, if you'd like to use this data, please get your OWN copy by unzipping the original into an S3 bucket in your own AWS account.

Setup Notes
- On EMR 5.32+, the cluster needs the Spark, Livy and JupyterEnterpriseGateway packages installed
- To use this notebook, your cluster should have been launched with the nltk-bootstrap.sh script
- Your cluster needs a larger EBS volume; 15 GB is suggested


In [None]:
input_data_in_s3 = "s3://heiwad-transfer/data-sets/cfpb-complaints.csv"

In [None]:
# Direct load data from S3
# https://stackoverflow.com/questions/40413526/reading-csv-files-with-quoted-fields-containing-embedded-commas
df = spark.read.load(input_data_in_s3, # please get your own copy after the session
                     format="csv", sep=",", inferSchema="true", header="true", quote = '"', escape='"')

df.printSchema()
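
The `quote` and `escape` options in the cell above matter because a complaint narrative can contain commas and literal double quotes inside a single quoted field. As a rough local illustration (not part of the Spark job), Python's standard `csv` module uses the same doubled-quote convention by default. The sample row below is made up for the example.

```python
import csv
import io

# Hypothetical sample: the second field holds a comma and a doubled ("") quote.
sample = (
    'Company,Consumer complaint narrative\n'
    'Acme Bank,"They said ""no refund"", then charged me again, twice."\n'
)

# Default dialect: quotechar='"' and doublequote=True
rows = list(csv.reader(io.StringIO(sample)))
header, first = rows[0], rows[1]

print(header)
print(first)
```

If the narrative were split on every comma instead, this row would spill into extra columns and the schema would no longer line up.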

In [None]:
# Try a simple aggregation to find the top companies represented in the data set

res = df.groupby("Company").count().orderBy("count", ascending=False)
res.show(20)

In [None]:
# get just the complaints column and simplify the column name
complaints = df.select("Consumer complaint narrative").withColumnRenamed("Consumer complaint narrative","text")

# Let's sample some of the text for this column

for complaint in complaints.head(8):
    if complaint['text']:
        print('* ' + complaint['text'] + '\n')

# Let's find out what they are complaining about

Let's start by counting the words in the data. Not all words are useful, so we'll filter out the common words that can be omitted from a sentence while still leaving it understandable.

These *stop words* have high counts and aren't very useful for NLP, so we remove them before counting.

The NLTK library, including its stop-word dictionary, was installed by the bootstrap script on all nodes in the cluster.
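
To see why stop words are removed before counting, here is a minimal plain-Python sketch. The tiny stop-word set and the sentence are hypothetical; the notebook itself uses NLTK's English list.

```python
from collections import Counter

# Hypothetical, tiny stop-word list for illustration only.
toy_stop_words = {"the", "a", "to", "and", "my", "i", "on"}

text = "i called the bank and the bank charged a fee on my account"
tokens = text.split()

# Count every token, then count only the tokens that are not stop words.
all_counts = Counter(tokens)
filtered_counts = Counter(t for t in tokens if t not in toy_stop_words)

print(all_counts.most_common(2))
print(filtered_counts.most_common(2))
```

Without filtering, "the" ties with "bank" for the highest count even though it says nothing about the complaint.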

In [None]:
# Note - stopwords dictionary was installed via bootstrap script

from nltk.corpus import stopwords 
stop_words = set(stopwords.words('english'))

# Ignore the placeholder fragments left where PII was suppressed in the data set
stop_words.add('xx')
stop_words.add('xxxx')
stop_words

In [None]:
# Tokenize via regex - very quick, could be better.

import re

def emitWords(row):
    if row['text']:
        words = []
        tokens = re.split(r'\W+', row['text'].lower())  # raw string avoids an invalid-escape warning
        for token in tokens:
            stripped = token.strip("$.,1234567890\\/';{}~!?-")
            if stripped and (stripped not in stop_words):
                words.append(stripped)
        return words
    else:
        return []    

# Test: see how emitWords parses the following sentence (this call runs locally on the leader node)
emitWords({'text':"running. $949 . can't stop. won't stop? runners run on runs"})

In [None]:
# We can apply functions that change the shape of data by applying flatMap on the underlying rdd. This is a 'map-reduce' style operation on Spark
counts = complaints.rdd.flatMap(emitWords) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

top_words = counts.top(15, key=lambda x: x[1])
top_words
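
The three RDD steps above can be mirrored in plain Python to show what each one does. This is only a local sketch: `docs` and `emit_words` are hypothetical stand-ins for the complaint rows and `emitWords`, and nothing here runs on Spark.

```python
from functools import reduce
from itertools import chain

# Hypothetical stand-ins for the rows and for emitWords; no Spark involved.
docs = ["late fee charged", "fee charged twice", "account closed"]

def emit_words(doc):
    return doc.split()

# flatMap: one list of words per document, flattened into a single stream
word_stream = chain.from_iterable(emit_words(d) for d in docs)

# map: pair each word with a count of 1
pairs = ((w, 1) for w in word_stream)

# reduceByKey: add up the values that share a key
def add_pair(acc, pair):
    key, value = pair
    acc[key] = acc.get(key, 0) + value
    return acc

local_counts = reduce(add_pair, pairs, {})

# top(2, key=...): keep the two pairs with the largest counts
top_two = sorted(local_counts.items(), key=lambda x: x[1], reverse=True)[:2]
print(top_two)
```

On the cluster, Spark runs the same logic per partition and merges the partial results, which is why the combining function passed to `reduceByKey` should be associative and commutative.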


EMR Notebooks can install Python packages on the leader node. This is useful for viewing or charting data.

https://aws.amazon.com/blogs/big-data/install-python-libraries-on-a-running-cluster-with-emr-notebooks/


In [None]:
# list installed packages
sc.list_packages()


In [None]:
sc.install_pypi_package("pandas") 
sc.install_pypi_package("matplotlib")

In [None]:
import matplotlib
import pandas as pd
import matplotlib.pyplot as plt 

In [None]:
top_words_pd = pd.DataFrame(top_words,columns=['words','count'])

In [None]:
top_words_pd.sort_values(by='count').plot.barh(x='words', y='count', rot=0,figsize=(10,10))

# Use Jupyter Magic to show the plot
%matplot plt

Try checking the frequency of a word you expected to be common by replacing "happy" in the cell below with any other word.

In [None]:
counts.filter(lambda x: "happy" == x[0]).collect()

## Using custom python libraries installed on the cluster

The previous approach is very simple (a regex tokenizer). Many English words have several forms that mean more or less the same thing (run vs. runs). Reducing each form to a root that is itself a valid word is called lemmatization. We can use an NLP library such as NLTK to do this.

In [None]:
# Apply lemmatization to the keys to combine counts for words that mean the same thing

# language model installed on cluster via bootstrap action


from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

def lemmatize_record(record):
    (word, count) = record
    return (lemmatizer.lemmatize(word),count)


# See lemmatization in action...
words = [('go', 1), ('goes', 1), ('run', 1), ('runs', 1)]
for record in words:
    lemma = lemmatize_record(record)
    print(f"{record[0]} becomes {lemma[0]}")

Apply lemmatization to the word counts, then check whether the top words have changed.
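
As a small local sketch of what combining counts by lemma does, the example below uses a hypothetical lookup table in place of `WordNetLemmatizer`. The words and counts are invented, and real lemmatization depends on the WordNet data installed on the cluster.

```python
from collections import defaultdict

# Hypothetical lookup table standing in for WordNetLemmatizer.lemmatize.
toy_lemmas = {"accounts": "account", "payments": "payment", "fees": "fee"}

def toy_lemmatize_record(record):
    word, count = record
    return (toy_lemmas.get(word, word), count)

# Invented (word, count) pairs in the same shape as the word-count RDD.
raw_counts = [("account", 10), ("accounts", 4), ("payment", 7), ("payments", 3), ("bank", 5)]

merged = defaultdict(int)
for lemma, count in map(toy_lemmatize_record, raw_counts):
    merged[lemma] += count  # same role as reduceByKey(lambda a, b: a + b)

print(dict(merged))
```

Words that share a lemma end up under one key, so their totals can move up the ranking compared with the regex-only counts.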

In [None]:
combined = counts.map(lemmatize_record).reduceByKey(lambda a, b: a + b)
top_words_combined = combined.top(15, key=lambda x: x[1])
top_words_combined_pd = pd.DataFrame(top_words_combined,columns=['words','count'])


# Left join keeps every regex top word; the remaining arguments are pandas defaults
both_pd = pd.merge(
    top_words_pd,
    top_words_combined_pd,
    how="left",
    on="words",
    suffixes=("_regex", "_lemmatized"),
)

both_pd.set_index('words').sort_values(by='count_regex',ascending = True).plot.barh(figsize=(10,10))


%matplot plt

Optional: Specify an S3 bucket in your own account if you'd like to save the results back to Amazon S3.

In [None]:
# Export results back to S3

output_s3_bucket_name = "<bucket_name>"  # just the bucket name
output_path = "complaints"

s3_out = f"s3://{output_s3_bucket_name}/{output_path}"

# Note: saveAsTextFile fails if the output path already exists
combined.saveAsTextFile(s3_out)
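
`saveAsTextFile` writes the string representation of each element, so each output line should look like a Python tuple literal. Assuming that format, a minimal sketch for turning such lines back into `(word, count)` pairs could look like this. The sample lines are invented, and `parse_line` is a hypothetical helper.

```python
import ast

# Hypothetical sample lines, assuming each record was written as str((word, count)).
sample_lines = ["('credit', 120)", "('account', 95)", "('report', 60)"]

def parse_line(line):
    # literal_eval only accepts Python literals, so it does not execute arbitrary code
    word, count = ast.literal_eval(line)
    return word, int(count)

records = [parse_line(line) for line in sample_lines]
print(records)
```

If you plan to reload the results with Spark, converting `combined` to a DataFrame and writing CSV or Parquet may be a more convenient format than text.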
