# Homework 4: Big Data

This homework assignment builds on the in-class work we did with Spark.
You will be using the [Yelp Academic Dataset](https://www.kaggle.com/yelp-dataset/yelp-dataset) and focusing primarily on the text of the reviews (i.e. the reviews.json.gz file).

**We suggest that you work in groups to make a plan to tackle this homework assignment.**

Here are the two questions that comprise the assignment:

1. List the 50 most common non-stopword words that are unique to *positive* reviews.
2. List the 50 most common non-stopword words that are unique to *negative* reviews.

As an example, consider the following two reviews:

* Positive: The meal was great, and the service was the best we ever experienced.
* Negative: The meal was awful.  It was the worst thing we ever experienced.

Assume our stopwords are {'the', 'was', 'and', 'we', 'it'}.

* Positive unique: {'great', 'service', 'best'}

* Negative unique: {'awful', 'worst', 'thing'}

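Words such as 'meal', 'ever', and 'experienced' are not stopwords, but they occur in both reviews, so they are unique to neither. In this example, each unique word occurs just once, so the concept of "top 50" doesn't make sense.  For your data, you'll need to count the number of times each unique word occurs.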
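
The toy example above can be reproduced in plain Python. This is only a minimal sketch and not Spark code; the helper names `tokenize` and `unique_word_counts` and the simple regex tokenizer are assumptions made for illustration.

```python
import re
from collections import Counter

# Stopwords from the toy example above
STOPWORDS = {'the', 'was', 'and', 'we', 'it'}

def tokenize(text):
    """Lowercase the text, keep alphabetic runs only, and drop stopwords."""
    return [w for w in re.findall(r"[a-z]+", text.lower()) if w not in STOPWORDS]

def unique_word_counts(own_reviews, other_reviews):
    """Count words that occur in own_reviews but never in other_reviews."""
    own = Counter(w for r in own_reviews for w in tokenize(r))
    other = {w for r in other_reviews for w in tokenize(r)}
    return Counter({w: c for w, c in own.items() if w not in other})

positive = ["The meal was great, and the service was the best we ever experienced."]
negative = ["The meal was awful.  It was the worst thing we ever experienced."]

pos_unique = unique_word_counts(positive, negative)
neg_unique = unique_word_counts(negative, positive)
print(sorted(pos_unique))
print(sorted(neg_unique))
```

On a real corpus, `pos_unique.most_common(50)` would then give the 50 most frequent words under this definition of "unique".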

Because this is the final homework assignment in this course, we are leaving it up to you to operationalize most of the details.  For example, you will need to determine what constitutes a positive or a negative review.

**You should take care to document your work, preferably using markdown blocks. In-code commenting is also a good idea.**

You will also need to generate a list of stopwords.  Neither spaCy nor NLTK is available on AWS EMR, so you'll need to be creative in how you get a good list of stopwords into Spark.
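
One possible way to get a stopword list without spaCy or NLTK is to keep it in a plain text file, one word per line, and read it into a set. The sketch below assumes that file layout; the function name `load_stopwords` and the sample contents are hypothetical. A list built this way could then be handed to Spark, for example through the `stopWords` parameter of `StopWordsRemover`.

```python
import io

def load_stopwords(lines):
    """Build a lowercase stopword set from an iterable of lines.

    Blank lines and lines starting with '#' are skipped.
    """
    words = set()
    for line in lines:
        word = line.strip().lower()
        if word and not word.startswith("#"):
            words.add(word)
    return words

# Stand-in for a real text file (hypothetical contents); any iterable of
# lines, such as an open file handle, would work the same way.
fake_file = io.StringIO(u"# tiny example list\nthe\nwas\nand\nThe\n\nwe\nit\n")
stopwords = load_stopwords(fake_file)
print(sorted(stopwords))
```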

Finally, you will notice that there are a **lot** of reviews.  You might want to work off a small sample (e.g. use the rdd.sample() function in Spark) so that you have a reduced-size dataset while you're developing your solution.
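
As a rough mental model of sampling by fraction without replacement, each row can be thought of as being kept independently with probability equal to the fraction, so the sample size is approximate rather than exact, and a fixed seed makes the sample repeatable. The plain-Python sketch below illustrates that idea only; `bernoulli_sample` is a hypothetical helper, not a Spark function, and Spark's own random number generation differs.

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(100000))
small = bernoulli_sample(rows, 0.01, seed=81)
again = bernoulli_sample(rows, 0.01, seed=81)

# The size is close to, but usually not exactly, fraction * len(rows)
print(len(small))
# The same seed gives the same sample
print(small == again)
```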

### REMEMBER TO TERMINATE YOUR AWS CLUSTER(S) WHEN YOU'RE DONE (OR WHEN YOU TAKE A BREAK)!

Please download your work in HTML and IPYNB formats and submit both to Canvas.

This section has all of the import statements I need, plus some additional ones.

In [1]:
from pyspark.sql import SQLContext
from pyspark import *
from pyspark.sql import Row
import csv
from pyspark.ml.feature import StopWordsRemover
import re
import pyspark.sql.functions as f
from pyspark.sql.functions import col, split, udf, concat, lit

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1555962736889_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
## Importing the spark dataframe 
df = spark.read.json('s3://umsi-data-science/data/yelp/review.json.gz')

In [4]:
## printed schema of data for future reference
df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)

In [8]:
## Took a sample to make sure my method works before using on whole dataset 
sample = df.sample(False,0.0005,81)


Question 1: List the 50 most common non-stopword words that are unique to *positive* reviews.

In [6]:
## Got a count of the sample to make sure it was a reasonable enough size 
sample.count()

2634

There are 2634 rows in the sample data. 

In [22]:
## Take the sample and cast the text column to array<string> to work with the stopword remover in pyspark 
samp = sample.withColumn("text", split(col("text"), " ").cast("array<string>"))

## Removes the stopwords in the column
remover = StopWordsRemover(inputCol="text", outputCol="text2")
temp2 = remover.transform(samp)



In [24]:
## Overwrites the "text2" column by joining the array of remaining words back into a single space-separated string
temp3 = temp2.withColumn("text2", f.concat_ws(" ", "text2"))


In [34]:
## This code removes every character that is not a letter, digit, or space (so punctuation is dropped but digits are kept)
## and sets the words in text2 to lowercase
temp4 = temp3.select(f.lower(f.regexp_replace(col("text2"), "[^0-9a-zA-Z ]", "")).alias('text2'))
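
One consequence of the order of these steps is worth noting: because stopwords are removed before punctuation is stripped, a token such as "it." does not match the stopword "it" and survives, which may explain why words like 'it', 'me', and 'i' show up in the final lists. The plain-Python sketch below imitates both orders; the tiny stopword set and the function names are assumptions for illustration and do not reproduce Spark's behavior exactly.

```python
import re

# Assumed tiny subset of a stopword list, for illustration only
STOPWORDS = {"it", "was", "the", "i", "me"}

def stop_then_clean(text):
    """Split, drop stopwords (case-insensitively), then strip punctuation."""
    kept = [w for w in text.split(" ") if w.lower() not in STOPWORDS]
    return [re.sub(r"[^0-9a-zA-Z]", "", w).lower() for w in kept]

def clean_then_stop(text):
    """Strip punctuation and lowercase first, then drop stopwords."""
    cleaned = re.sub(r"[^0-9a-zA-Z ]", "", text).lower()
    return [w for w in cleaned.split(" ") if w and w not in STOPWORDS]

review = "The waiter ignored me. I hated it."
print(stop_then_clean(review))
print(clean_then_stop(review))
```

In the first ordering, "me." and "it." slip past the stopword check and come out as "me" and "it"; in the second ordering they are removed.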


In [67]:
## I found this code at https://stackoverflow.com/questions/48927271/count-number-of-words-in-a-spark-dataframe
## It splits the text of each row into words (one row per word),
## counts the number of occurrences of each word, and sorts by that count in descending order

temp5 = temp4.withColumn('word', f.explode(f.split(f.col('text2'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)

In [71]:
## Removes the empty-string "word" (''), which was always the most common token
temp6 = temp5.filter(temp5["word"] != '')
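
The empty-string token most likely comes from splitting on a single space: wherever two spaces end up next to each other (for example after punctuation between two spaces has been stripped), the split yields an empty string. Python's `str.split(" ")` behaves similarly in this respect, so the effect can be illustrated without Spark. The sample rows below are made up.

```python
from collections import Counter

# Made-up rows containing a double space and a trailing space
rows = ["great food  great service", "food was cold "]

# Splitting on a single space yields '' wherever two spaces are adjacent
# or a row ends with a space.
tokens = [tok for row in rows for tok in row.split(" ")]
counts = Counter(tokens)
print(counts[""])

# Dropping the empty token, as the filter on word != '' does
counts_no_empty = Counter(tok for tok in tokens if tok != "")
print(counts_no_empty.most_common(2))
```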

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)

+-----+-----+
| word|count|
+-----+-----+
|place| 1338|
| food| 1314|
| good| 1278|
|great| 1091|
| like| 1004|
+-----+-----+
only showing top 5 rows

Next, I wrapped the steps that worked on the sample above into a function.
At this point I realized that the two lists must not share words, so I added an optional parameter that is used to exclude words already present in the other list.

In [100]:
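def top50words(d, d2=None):
    """Return {word: count} for the most frequent non-stopword words in d['text'].

    If d2 (a {word: count} dict from an earlier call) is given, words that
    appear in d2 are dropped from the result.
    """
    # Split each review into an array of words so StopWordsRemover can process it
    samp = d.withColumn("text", split(col("text"), " ").cast("array<string>"))
    remover = StopWordsRemover(inputCol="text", outputCol="text2")
    temp2 = remover.transform(samp)
    # Re-join the remaining words, strip punctuation, and lowercase
    temp3 = temp2.withColumn("text2", f.concat_ws(" ", "text2"))
    temp4 = temp3.select(f.lower(f.regexp_replace(col("text2"), "[^0-9a-zA-Z ]", "")).alias('text2'))
    # One row per word, then count occurrences and sort by frequency
    temp5 = temp4.withColumn('word', f.explode(f.split(f.col('text2'), ' ')))\
        .groupBy('word')\
        .count()\
        .sort('count', ascending=False)
    # Drop the empty-string token
    temp6 = temp5.filter(temp5["word"] != '')
    temp6 = temp6.select(col("word"), col("count").alias("occurrences"))
    if d2 is not None:
        ## Take the top 100 so that at least 50 words remain after removing
        ## those already in d2. Note: this only excludes words in d2 (the other
        ## top-50 list), not every word used in the other set of reviews.
        counts = {x.word: x.occurrences for x in temp6.take(100)}
        return {k: v for k, v in counts.items() if k not in d2}
    ## No exclusion list: return the top 50 as-is
    return {x.word: x.occurrences for x in temp6.take(50)}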
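
Note that excluding only the words in the other list's top 50 is a looser notion of "unique" than excluding every word that appears anywhere in the other class. The plain-Python sketch below contrasts the two on made-up counts; both function names are hypothetical, and the first one only roughly mirrors what `top50words` does.

```python
from collections import Counter

def top_excluding_top(own, other, n):
    """Exclude only words that are in the other class's top n."""
    other_top = {w for w, _ in other.most_common(n)}
    return [(w, c) for w, c in own.most_common() if w not in other_top][:n]

def top_strictly_unique(own, other, n):
    """Exclude every word that appears at all in the other class."""
    return [(w, c) for w, c in own.most_common() if w not in other][:n]

# Made-up word counts for illustration
pos = Counter({"great": 9, "food": 8, "friendly": 5, "place": 4})
neg = Counter({"rude": 9, "food": 7, "manager": 6, "place": 1})

# 'place' is outside the positive top 3, so the looser rule keeps it
print(top_excluding_top(neg, pos, 3))
# The stricter rule drops it because positive reviews also use it
print(top_strictly_unique(neg, pos, 3))
```

The stricter version needs the full vocabulary of the other class, which is more expensive on the whole dataset; which definition to use is a judgment call that should be documented.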

I'll be running the code on a sample because there have been issues with running it on the whole dataset.

In [101]:
## Here I'm splitting the positive reviews from the negative ones based on how many stars they received
## I defined a positive review as 5 stars and a negative review as 1 star so that there should be a drastic difference in words
df_samp = df.sample(False,0.05,20)
p = df_samp[df_samp['stars'] == 5]
n = df_samp[df_samp['stars'] == 1]
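
The star thresholds are a design choice. A small plain-Python sketch of the labeling rule, with the cutoffs exposed as parameters, makes it easy to see what a looser definition (for example 4 stars and up as positive, 2 and below as negative) would change. The function name `label_review` and its parameters are hypothetical.

```python
def label_review(stars, pos_min=5, neg_max=1):
    """Label a review by its star rating; ratings in between are left unlabeled."""
    if stars >= pos_min:
        return "positive"
    if stars <= neg_max:
        return "negative"
    return None

ratings = [5, 4, 3, 2, 1]
# Strict rule used in this notebook: only 5-star and 1-star reviews are labeled
print([label_review(s) for s in ratings])
# Looser alternative: 4-5 stars positive, 1-2 stars negative
print([label_review(s, pos_min=4, neg_max=2) for s in ratings])
```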

In [102]:
## Running function on positive reviews 
pos_top_50 = top50words(p)

In [104]:
## Running function on negative reviews, excluding words already in the positive list
neg_top_50 = top50words(n, pos_top_50)


In [111]:
## Shows the top 50 positive words, sorted by count in descending order
pos = sorted(pos_top_50.items(), key=lambda x: x[1], reverse=True)[:50]
for x in pos:
    print(x)

(u'great', 62077)
(u'place', 50696)
(u'food', 42881)
(u'good', 38560)
(u'service', 34456)
(u'time', 33214)
(u'get', 28510)
(u'one', 28087)
(u'best', 28051)
(u'like', 27597)
(u'back', 26883)
(u'go', 24888)
(u'love', 24564)
(u'really', 23327)
(u'amazing', 23236)
(u'always', 22898)
(u'also', 22656)
(u'friendly', 20584)
(u'staff', 19602)
(u'definitely', 17883)
(u'well', 17856)
(u'it', 17652)
(u'us', 17419)
(u'recommend', 17377)
(u'nice', 17334)
(u'delicious', 16855)
(u'even', 16676)
(u'got', 16308)
(u'first', 14588)
(u'try', 14328)
(u'made', 13797)
(u'experience', 13740)
(u'new', 13109)
(u'come', 13052)
(u'everything', 12956)
(u'make', 12921)
(u'awesome', 12357)
(u'went', 12238)
(u'restaurant', 12233)
(u'came', 12210)
(u'every', 12012)
(u'little', 11954)
(u'ever', 11935)
(u'never', 11838)
(u'vegas', 11615)
(u'much', 11500)
(u'work', 11251)
(u'fresh', 11161)
(u'going', 11130)
(u'day', 11065)

In [110]:
## Shows the top 50 negative words, sorted by count in descending order
neg = sorted(neg_top_50.items(), key=lambda x: x[1], reverse=True)[:50]
for x in neg:
    print(x)

(u'said', 13489)
(u'told', 12944)
(u'asked', 9623)
(u'order', 9129)
(u'minutes', 8650)
(u'people', 8272)
(u'customer', 7519)
(u'know', 7194)
(u'called', 7178)
(u'ordered', 7045)
(u'another', 7016)
(u'bad', 6418)
(u'take', 6340)
(u'manager', 6325)
(u'car', 6317)
(u'took', 6311)
(u'give', 6157)
(u'way', 6046)
(u'still', 6029)
(u'two', 5978)
(u'2', 5977)
(u'call', 5901)
(u'want', 5864)
(u'money', 5756)
(u'left', 5391)
(u'worst', 5302)
(u'say', 5195)
(u'me', 5162)
(u'again', 5050)
(u'room', 4979)
(u'wait', 4901)
(u'better', 4879)
(u'wanted', 4868)
(u'see', 4835)
(u'rude', 4814)
(u'business', 4661)
(u'around', 4594)
(u'i', 4579)
(u'last', 4545)
(u'since', 4481)
(u'right', 4458)
(u'3', 4456)
(u'times', 4450)
(u'table', 4399)
(u'here', 4390)
(u'horrible', 4248)
(u'phone', 4197)
(u'pay', 4172)
(u'next', 4146)
(u'nothing', 4111)