# SIADS 516: Homework 3

- **Dr. Chris Teplovs**, School of Information, University of Michigan
- **Kris Steinhoff**, School of Information, University of Michigan


This homework assignment builds on the Spark DataFrame material we covered in class.

You will be using a compressed version of the Yelp Academic Dataset. The dataset is provided in the `assets/data/yelp_academic` directory of your workspace, so you should not need to download it again if you're working in the Coursera-hosted notebook environment.

You might want to refer to the lecture companion notebooks (in `resources/lecture_notebooks/`, or via Coursera as "Ungraded Lab: Spark Core Demo" and "Ungraded Lab: Spark SQL Demo") for hints about which libraries to import.

You will notice that there are a **lot** of reviews. While you develop your solution, you might want to work on a small sample (e.g. using Spark's `sample()` function) to keep the dataset manageable.

In [1]:
# The AutograderHelper class provides methods used by the autograder.
from autograder_helper import AutograderHelper

In [2]:
# Autograder cell. This cell is worth 0 points.
# This cell has hidden code used to configure the autograder.

In [3]:
from pyspark.sql import SparkSession 

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('My First Spark application') \
    .getOrCreate()
sc = spark.sparkContext

In [4]:
# Load the Yelp JSON files into DataFrames:
user = spark.read.json('../../assets/data/yelp_academic/yelp_academic_dataset_user.json.gz')
review = spark.read.json('../../assets/data/yelp_academic/yelp_academic_dataset_review.json.gz')
checkin = spark.read.json('../../assets/data/yelp_academic/yelp_academic_dataset_checkin.json.gz')

## -- COOL COMPLIMENTS --

Determine how many users have received more than 5000 "cool" compliments.

- Create a variable `user_count` (an integer) that contains the number of users with more than 5000 "cool" compliments (using the `compliment_cool` field).

In [5]:
# YOUR CODE HERE
# raise NotImplementedError()

user_count=user.filter(user["compliment_cool"]>5000).count()

In [6]:
assert type(user_count) == int, "The user_count variable should be an integer."

In [7]:
# Autograder cell. This cell is worth 2 points (out of 20). This cell contains hidden tests.

## -- USEFUL POSITIVE REVIEWS --

Determine the top 5 most useful positive reviews.

- Create a variable `top_5_useful_positive`. This should be a PySpark DataFrame
- For this question a "positive review" is one with 4 or 5 stars
- The DataFrame should be ordered by `useful` (most useful first) and contain 5 rows
- The DataFrame should have these columns:
    - `review_id`
    - `useful`
    - `stars`

In [8]:
# YOUR CODE HERE
# raise NotImplementedError()

# Positive reviews have 4 or 5 stars; keep the 5 with the most "useful" votes.
top_5_useful_positive = (
    review
    .filter(review["stars"] >= 4)
    .sort("useful", ascending=False)
    .limit(5)
    .select("review_id", "useful", "stars")
)

In [9]:
import pyspark

assert type(top_5_useful_positive) == pyspark.sql.dataframe.DataFrame, \
    "The top_5_useful_positive variable should be a Spark DataFrame."

submitted = AutograderHelper.parse_spark_dataframe(top_5_useful_positive)

In [10]:
# Autograder cell. This cell is worth 1 point (out of 20). This cell does not contain hidden tests.
# This cell deliberately includes answers to provide guidance on how this question is graded.

assert len(submitted) == 5, \
    "The result must have 5 rows."

top_useful_review_id = "1lGXlyq4MALOMx17vpBcoQ"
assert submitted["review_id"][0] == top_useful_review_id, \
    f'The first row should have review_id "{top_useful_review_id}" (this review has the most "useful" ratings)'

In [11]:
# Autograder cell. This cell is worth 4 points (out of 20). This cell contains hidden tests.

## -- CHECKINS --

Determine what hours of the day most checkins occur.

- Create a variable `hours_by_checkin_count`. This should be a PySpark DataFrame
- The DataFrame should be ordered by `count` (most checkins first) and contain 24 rows
- The DataFrame should have these columns:
    - `hour` (the hour of the day as an integer, 0-23)
    - `count` (the number of checkins that occurred in that hour)
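
Before writing the Spark version, the hour-extraction logic can be checked in plain Python (no Spark). The sketch below does this on a few hand-written strings. The strings are hypothetical and follow the comma-separated "YYYY-MM-DD HH:MM:SS" layout that the solution code below assumes. `checkin_hours` and `hour_counts` are illustrative names.

```python
from collections import Counter

# Hypothetical checkin fields in the comma-separated "YYYY-MM-DD HH:MM:SS" layout.
toy_checkin_dates = [
    "2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 01:10:02",
    "2017-01-03 01:55:40, 2018-02-11 19:05:00",
]

def checkin_hours(date_field):
    """Yield the hour (0-23) of every timestamp in one comma-separated date field."""
    for stamp in date_field.split(","):
        time_part = stamp.strip().split(" ")[1]  # "HH:MM:SS"
        yield int(time_part.split(":")[0])

hour_counts = Counter(h for field in toy_checkin_dates for h in checkin_hours(field))

# most_common() lists the busiest hours first, like sorting by `count` descending.
print(hour_counts.most_common())
```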


In [12]:
# YOUR CODE HERE
# raise NotImplementedError()

from pyspark.sql.functions import udf, explode
from pyspark.sql.types import ArrayType, StringType, IntegerType

# `date` holds one comma-separated string of "YYYY-MM-DD HH:MM:SS" timestamps per business.
# Split it into an array and explode it into one row per checkin.
datesplit = udf(lambda x: x.split(','), ArrayType(StringType()))
df_explode = checkin.select('business_id', explode(datesplit('date')).alias('checkin_date'))

# Remove the whitespace that follows each comma.
datestrip = udf(lambda x: x.strip(), StringType())
df_strip = df_explode.select('business_id', datestrip('checkin_date').alias('checkin_date'))

def hoursplit(timestamp):
    """Return the hour (0-23) of a "YYYY-MM-DD HH:MM:SS" string, or None if it cannot be parsed."""
    parts = timestamp.split(" ")
    if len(parts) < 2:
        return None
    hour = parts[1].split(":")[0]
    # Return None (not the raw string) so the IntegerType UDF produces a proper null.
    return int(hour) if hour.isdecimal() else None

hoursplit_int = udf(hoursplit, IntegerType())
df_hour = df_strip.select('business_id', hoursplit_int('checkin_date').alias('hour'))

# Count checkins per hour, busiest hour first.
hours_by_checkin_count = df_hour.groupby('hour').count().sort('count', ascending=False)

In [13]:
assert type(hours_by_checkin_count) == pyspark.sql.dataframe.DataFrame, \
    "The hours_by_checkin_count variable should be a Spark DataFrame."

submitted = AutograderHelper.parse_spark_dataframe(hours_by_checkin_count)

In [14]:
# Autograder cell. This cell is worth 1 point (out of 20). This cell does not contain hidden tests.

assert len(submitted) == 24, \
    "The hours_by_checkin_count DataFrame must have 24 rows."

assert submitted["hour"][0] == 1, \
    'The first row should have hour 1'

In [15]:
# Autograder cell. This cell is worth 4 points (out of 20). This cell contains hidden tests.

## -- COMMON USEFUL WORDS --

Write a function that takes a Spark DataFrame as a parameter and returns a Spark DataFrame of the 50 most common words from *useful* reviews and their counts.

- A "useful review" has 10 or more "useful" ratings.
- Convert the text to lower case.
- Use the provided `splitter()` function in a UDF to split the text into individual words.
- Exclude the words in the provided `STOP_WORDS` set.
- Returned DataFrame should have these columns:
    - `word`
    - `count`

In [16]:
import re

def splitter(text):
    WORD_RE = re.compile(r"[\w']+")
    return WORD_RE.findall(text)


STOP_WORDS = {
    "a", "about", "above", "after", "again", "against", "aint", "all", "also", "although", "am", "an", "and", "any",
    "are", "as", "at", "be", "because", "been", "before", "being", "below", "between", "both", "but", "by", "can",
    "check", "checked", "could", "did", "do", "does", "doing", "don", "down", "during", "each", "few", "for", "from",
    "further", "get", "go", "got", "had", "has", "have", "having", "he", "her", "here", "hers", "herself", "him",
    "himself", "his", "how", "however", "i", "i'd", "if", "i'm", "in", "into", "is", "it", "its", "it's", "itself",
    "i've", "just", "me", "more", "most", "my", "myself", "no", "nor", "not", "now", "of", "off", "on", "once", "one",
    "online", "only", "or", "other", "our", "ours", "ourselves", "out", "over", "own", "paid", "place", "s", "said", 
    "same", "service", "she", "should", "so", "some", "such", "t", "than", "that", "the", "their", "theirs", "them",
    "themselves", "then", "there", "these", "they", "this", "those", "through", "to", "too", "under", "until", "up",
    "us", "very", "was", "we", "went", "were", "we've", "what", "when", "where", "which", "while", "who", "whom",
    "why", "will", "with", "would", "you", "your", "yours", "yourself", "yourselves",
}

def common_useful_words(reviews, limit=50):
    # YOUR CODE HERE
    # raise NotImplementedError()
    from pyspark.sql.functions import explode, lower, udf
    from pyspark.sql.types import ArrayType, StringType

    # A "useful review" has 10 or more "useful" ratings.
    useful_reviews = reviews.filter(reviews["useful"] >= 10)

    # Lower-case the text, split it with the provided splitter() in a UDF,
    # and explode the resulting array into one row per word.
    splitter_udf = udf(splitter, ArrayType(StringType()))
    df_words = useful_reviews.select(explode(splitter_udf(lower('text'))).alias('word'))

    # Drop stop words, count each remaining word and keep the `limit` most common.
    df_filtered_words = df_words.filter(~df_words.word.isin(STOP_WORDS))
    most_common = (
        df_filtered_words
        .groupby('word')
        .count()
        .sort('count', ascending=False)
        .limit(limit)
    )

    return most_common

Now we'll run it on the `review` DataFrame:

In [17]:
common_useful_words_counts = common_useful_words(review)

In [18]:
assert type(common_useful_words_counts) == pyspark.sql.dataframe.DataFrame, \
    "The common_useful_words_counts variable should be a Spark DataFrame."

submitted = AutograderHelper.parse_spark_dataframe(common_useful_words_counts)

In [19]:
# Autograder cell. This cell is worth 2 points (out of 20). This cell does not contain hidden tests.

assert len(submitted) == 50, \
    "The common_useful_words_counts DataFrame must have 50 rows."

assert submitted["word"][0] == 'like', \
    'The first row should have word "like"'

assert submitted["count"][0] == 101251, \
    'The first row should have count 101251'

In [20]:
# Autograder cell. This cell is worth 6 points (out of 20). This cell contains hidden tests.