This homework assignment builds on the Spark DataFrame material we covered in class.

You will be using a compressed version of the Yelp Academic Dataset.  The data set is provided in the `assets/data/yelp_academic` directory of your workspace, so you should not need to download it again if you're working in the Coursera-hosted notebook environment.

You might want to refer to the lecture companion notebooks (in `resources/lecture_notebooks/` or equivalently via Coursera as "Ungraded Lab: Spark Core Demo" and "Ungraded Lab: Spark SQL Demo") for hints about libraries to import, etc.

You will notice that there are a **lot** of reviews.  You might want to work off a small sample (e.g. use the `sample()` function in Spark) so that you have a reduced-size dataset while you're developing your solution.
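
As a rough illustration of what sampling by fraction does, here is a plain-Python sketch (not Spark code). It assumes Bernoulli-style sampling, where each row is kept independently with probability `fraction`; as far as the PySpark documentation describes it, `DataFrame.sample(fraction=..., seed=...)` behaves similarly and does not guarantee an exact row count. The helper name `bernoulli_sample` and the toy rows are hypothetical.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)  # a fixed seed makes the sample reproducible
    return [row for row in rows if rng.random() < fraction]


# Hypothetical stand-in for the rows of a large DataFrame.
rows = list(range(10_000))
sampled = bernoulli_sample(rows, fraction=0.01, seed=42)

# The sample size is close to, but usually not exactly, fraction * len(rows).
print(len(sampled))
```

In the notebook itself, something like `review.sample(fraction=0.01, seed=42)` would play the same role; the fraction and seed values here are only examples.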

In [None]:
# The AutograderHelper class provides methods used by the autograder.
from autograder_helper import AutograderHelper

In [None]:
# Autograder cell. This cell is worth 0 points.
# This cell has hidden code used to configure the autograder.

In [None]:
import pyspark  # needed by the later `pyspark.sql.dataframe.DataFrame` type checks
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master("local[*]")
    .appName("My First Spark application")
    .getOrCreate()
)
sc = spark.sparkContext
sc._conf.set("spark.default.parallelism", 2)

In [None]:
# Set up some DataFrames:
user = spark.read.json(
    "../../assets/data/yelp_academic/yelp_academic_dataset_user.json.gz"
)
review = spark.read.json(
    "../../assets/data/yelp_academic/yelp_academic_dataset_review.json.gz"
)
checkin = spark.read.json(
    "../../assets/data/yelp_academic/yelp_academic_dataset_checkin.json.gz"
)

## Question 1 -- Cool Compliments

Determine how many users have received more than 5000 "cool" compliments.

- Create a variable `user_count` (an integer) which contains the number of users with more than 5000 "cool" compliments (using the `compliment_cool` field).

In [None]:
# Filter users with more than 5000 "cool" compliments
cool_users = user.filter(user.compliment_cool > 5000)

# Count the number of such users
user_count = cool_users.count()

# Print the result
print(f"Number of users with more than 5000 'cool' compliments: {user_count}")

# raise NotImplementedError()

# Output
```
Number of users with more than 5000 'cool' compliments: 79
```

In [None]:
# Ensure user_count is an integer
assert isinstance(user_count, int), "The user_count variable should be an integer."

In [None]:
# Autograder cell. This cell is worth 2 points (out of 20). This cell contains hidden tests.

Let's solve this by filtering the users who have received more than 5000 "cool" compliments and then counting them.

### Steps:
1. **Filter Users**: Filter the `user` DataFrame to include only users with more than 5000 "cool" compliments.
2. **Count Users**: Count the number of users who meet this criterion.
3. **Assign to Variable**: Assign the count to the variable `user_count`.

### Explanation:
- **Filter Users**: The `filter` method is used to select users with `compliment_cool` greater than 5000.
- **Count Users**: The `count` method is used to get the number of users who meet the criterion.
- **Assign to Variable**: The result is assigned to the variable `user_count`.

Run this code to get the number of users with more than 5000 "cool" compliments.

## Question 2 -- Useful Positive Reviews

Determine the top 5 most useful positive reviews.

- Create a variable `top_5_useful_positive`. This should be a PySpark DataFrame
- For this question a "positive review" is one with 4 or 5 stars
- The DataFrame should be ordered by `useful` and contain 5 rows
- The DataFrame should have these columns (in this order):
    - `review_id`
    - `useful`
    - `stars`

In [None]:
from pyspark.sql.functions import col

# Filter positive reviews (4 or 5 stars)
positive_reviews = review.filter((col("stars") == 4) | (col("stars") == 5))

# Sort by "useful" in descending order
sorted_positive_reviews = positive_reviews.orderBy(col("useful").desc())

# Select the top 5 reviews
top_5_useful_positive = sorted_positive_reviews.select("review_id", "useful", "stars").limit(5)

# Show the result
top_5_useful_positive.show()

# Ensure the result is a DataFrame and has the correct columns
assert type(top_5_useful_positive) == pyspark.sql.dataframe.DataFrame, "The top_5_useful_positive variable should be a Spark DataFrame."
assert top_5_useful_positive.columns == ["review_id", "useful", "stars"], "The columns are not in the correct order."

# Autograder cell
submitted = AutograderHelper.parse_spark_dataframe(top_5_useful_positive)
assert len(submitted) == 5, "The result must have 5 rows."
top_useful_review_id = "1lGXlyq4MALOMx17vpBcoQ"
assert submitted["review_id"][0] == top_useful_review_id, f'The first row should have review_id "{top_useful_review_id}" (this review has the most "useful" ratings)'


# Output
```
+--------------------+------+-----+
|           review_id|useful|stars|
+--------------------+------+-----+
|1lGXlyq4MALOMx17v...|   358|  5.0|
|gAUkgn4dTO-R2n5LB...|   278|  5.0|
|5S985RjfmDJYsJvUt...|   244|  4.0|
|0nr6SQFKpR6JCYl1z...|   241|  5.0|
|-hRpmcavsC0UDI_Qs...|   235|  4.0|
+--------------------+------+-----+
```

Let's solve this by filtering the reviews to include only positive reviews (4 or 5 stars), then sorting them by the "useful" column, and finally selecting the top 5.

### Steps:
1. **Filter Positive Reviews**: Filter the `review` DataFrame to include only reviews with 4 or 5 stars.
2. **Sort by Useful**: Sort the filtered reviews by the "useful" column in descending order.
3. **Select Top 5**: Select the top 5 reviews.
4. **Select Required Columns**: Ensure the DataFrame has the columns `review_id`, `useful`, and `stars` in the correct order.

### Explanation:
- **Filter Positive Reviews**: The `filter` method is used to select reviews with 4 or 5 stars.
- **Sort by Useful**: The `orderBy` method sorts the reviews by the "useful" column in descending order.
- **Select Top 5**: The `limit` method selects the top 5 reviews.
- **Select Required Columns**: The `select` method ensures the DataFrame has the columns `review_id`, `useful`, and `stars` in the correct order.

Run this code to get the top 5 most useful positive reviews.
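
The filter, sort, limit, and select steps can also be checked by hand on a few rows. The following is a plain-Python sketch of the same logic, not Spark code; the records in `toy_reviews` are made up for illustration and do not come from the dataset.

```python
# Hypothetical toy records standing in for rows of the `review` DataFrame.
toy_reviews = [
    {"review_id": "a", "useful": 12, "stars": 5.0},
    {"review_id": "b", "useful": 40, "stars": 2.0},  # not positive, dropped
    {"review_id": "c", "useful": 7, "stars": 4.0},
    {"review_id": "d", "useful": 30, "stars": 4.0},
    {"review_id": "e", "useful": 1, "stars": 5.0},
    {"review_id": "f", "useful": 25, "stars": 5.0},
    {"review_id": "g", "useful": 3, "stars": 4.0},
]

# 1. Keep positive reviews (4 or 5 stars).
positive = [r for r in toy_reviews if r["stars"] in (4.0, 5.0)]

# 2. Sort by "useful", largest first.
ranked = sorted(positive, key=lambda r: r["useful"], reverse=True)

# 3 and 4. Take the first five rows and keep the columns in the required order.
top_5 = [(r["review_id"], r["useful"], r["stars"]) for r in ranked[:5]]

print(top_5)
```

Note that the 2-star review is excluded even though it has the highest `useful` value, which is the behaviour the question asks for.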

## Question 3 -- Checkins

Determine during which hours of the day most checkins occur.

- Create a variable `hours_by_checkin_count`. This should be a PySpark DataFrame
- The DataFrame should be ordered by `count` and contain 24 rows
- The DataFrame should have these columns (in this order):
    - `hour` (the hour of the day as an integer, the hour after midnight being `0`)
    - `count` (the number of checkins that occurred in that hour)
- Hour `1` (the time between 1:00 AM and 2:00 AM) should be the first entry in the results once they are sorted by the number of checkins that occurred in each hour.


Note that the `date` column in the `checkin` data is a string with multiple date times in it. You'll need to split that string before parsing.
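
To make the shape of the problem concrete, here is a plain-Python sketch of splitting one such string and extracting the hour from each piece. It assumes the timestamps are separated by a comma and a space and follow the `YYYY-MM-DD HH:MM:SS` layout; the sample string and the helper name `checkin_hours` are hypothetical, so check a few real rows before relying on this format.

```python
from datetime import datetime

# Assumed layout of a single `date` value (made-up timestamps).
raw_dates = "2016-04-26 19:49:16, 2016-08-30 18:36:57, 2017-01-01 01:05:00"


def checkin_hours(date_string):
    """Split a multi-timestamp string and return the hour of each timestamp."""
    hours = []
    for piece in date_string.split(","):
        # strip() removes the space left behind after splitting on ","
        stamp = datetime.strptime(piece.strip(), "%Y-%m-%d %H:%M:%S")
        hours.append(stamp.hour)
    return hours


print(checkin_hours(raw_dates))  # [19, 18, 1]
```

In Spark the same idea is usually expressed with `split`, `explode`, and `hour` from `pyspark.sql.functions`, so that each timestamp ends up on its own row.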

In [None]:
from pyspark.sql.functions import explode, split, hour, col

# Split the date column into individual timestamps and explode the array
checkin_exploded = checkin.withColumn("timestamp", explode(split(col("date"), ",")))

# Extract the hour from the timestamp
checkin_with_hour = checkin_exploded.withColumn("hour", hour(col("timestamp")))

# Count the number of check-ins for each hour
hourly_checkin_counts = checkin_with_hour.groupBy("hour").count()

# Sort the results by count in descending order
sorted_hourly_checkin_counts = hourly_checkin_counts.orderBy(col("count").desc())

# Select the required columns
hours_by_checkin_count = sorted_hourly_checkin_counts.select("hour", "count")

# Show the result
hours_by_checkin_count.show(24)

# raise NotImplementedError()

# Output
```
+----+-------+
|hour|  count|
+----+-------+
|   1|1561788|
|  19|1502271|
|   0|1491176|
|   2|1411255|
|  20|1350195|
|  23|1344117|
|  18|1272108|
|  22|1257437|
|  21|1238808|
|   3|1078939|
|  17|1006102|
|  16| 852076|
|   4| 747453|
|  15| 617830|
|   5| 485129|
|  14| 418340|
|   6| 321764|
|  13| 270145|
|   7| 231417|
|  12| 178910|
|   8| 151065|
|  11| 111769|
|   9| 100568|
|  10|  88486|
+----+-------+
```

In [None]:
# Ensure the result is a DataFrame and has the correct columns
assert (
    type(hours_by_checkin_count) == pyspark.sql.dataframe.DataFrame
), "The hours_by_checkin_count variable should be a Spark DataFrame."

assert hours_by_checkin_count.columns == [
    "hour",
    "count",
], "The columns are not in the correct order."

submitted = AutograderHelper.parse_spark_dataframe(hours_by_checkin_count)

In [None]:
# Autograder cell. This cell is worth 1 point (out of 20). This cell does not contain hidden tests.

assert len(submitted) == 24, "The hours_by_checkin_count DataFrame must have 24 rows."

assert submitted["hour"][0] == 1, "The first row should have hour 1"
assert submitted["hour"][1] == 19, "The second row should have hour 19"

In [None]:
# Autograder cell. This cell is worth 4 points (out of 20). This cell contains hidden tests.

Let's solve this by extracting the hour from the check-in timestamps, counting the number of check-ins for each hour, and then sorting the results.

### Steps:
1. **Extract Hours**: Extract the hour from the check-in timestamps.
2. **Count Check-ins by Hour**: Count the number of check-ins for each hour.
3. **Sort by Count**: Sort the results by the count of check-ins in descending order.
4. **Select Required Columns**: Ensure the DataFrame has the columns `hour` and `count` in the correct order.

### Explanation:
- **Extract Hours**: The `explode` and `split` functions are used to split the date column into individual timestamps, and the `hour` function extracts the hour from each timestamp.
- **Count Check-ins by Hour**: The `groupBy` and `count` methods are used to count the number of check-ins for each hour.
- **Sort by Count**: The `orderBy` method sorts the results by the count of check-ins in descending order.
- **Select Required Columns**: The `select` method ensures the DataFrame has the columns `hour` and `count` in the correct order.

Run this code to get the distribution of check-ins by hour. 
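
The counting and sorting steps can be sanity-checked on a tiny input. This plain-Python sketch uses a made-up list of hours (`toy_hours`) rather than the dataset, and breaks ties by hour so the order is deterministic; sorting on `count` alone leaves the order of tied rows unspecified.

```python
from collections import Counter

# Hypothetical hours, as they might look after exploding and parsing timestamps.
toy_hours = [1, 19, 1, 0, 19, 1, 23]

# Equivalent of groupBy("hour").count()
counts = Counter(toy_hours)

# Equivalent of ordering by count descending, with hour as a tie-breaker.
ordered = sorted(counts.items(), key=lambda pair: (-pair[1], pair[0]))

print(ordered)  # [(1, 3), (19, 2), (0, 1), (23, 1)]
```

Only hours that actually occur appear in the result, so a 24-row output relies on every hour being present in the data.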

## Question 4 -- Common Words in Useful Reviews

Write a function that takes a Spark DataFrame as a parameter and returns a Spark DataFrame of the 50 most common words from *useful* reviews and their counts.

- A "useful review" has 10 or more "useful" ratings.
- Convert the text to lower case.
- Use the provided `splitter()` function in a UDF to split the text into individual words.
- Exclude the words in the provided `STOP_WORDS` set.
- Returned DataFrame should have these columns (in this order):
    - `word`
    - `count`
- Returned DataFrame should be sorted by `count` in descending order.
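
Before wrapping `splitter()` in a UDF, it can help to see what it returns for a single string. The sketch below repeats the regular expression used by the provided function and applies it to a made-up sentence; the compiled pattern is moved to module level here only for brevity.

```python
import re

WORD_RE = re.compile(r"[\w']+")


def splitter(text):
    """Return runs of word characters and apostrophes found in `text`."""
    return WORD_RE.findall(text)


# Hypothetical review text, lower-cased first as the question requires.
tokens = splitter("Don't miss it: 5 stars, GREAT food!".lower())

print(tokens)  # ["don't", 'miss', 'it', '5', 'stars', 'great', 'food']
```

Because `\w` also matches digits, tokens such as `5` count as words, and contractions such as `don't` stay in one piece.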

In [None]:
import re


def splitter(text):
    WORD_RE = re.compile(r"[\w']+")
    return WORD_RE.findall(text)

STOP_WORDS = { 
    "a", 
    "about", 
    "above", 
    "after", 
    "again", 
    "against", 
    "aint", 
    "all", 
    "also", 
    "although", 
    "am", 
    "an", 
    "and", 
    "any", 
    "are", 
    "as", 
    "at", 
    "be", 
    "because", 
    "been", 
    "before", 
    "being", 
    "below", 
    "between", 
    "both", 
    "but", 
    "by", 
    "can", 
    "check", 
    "checked", 
    "could", 
    "did", 
    "do", 
    "does", 
    "doing", 
    "don", 
    "down", 
    "during", 
    "each", 
    "few", 
    "for", 
    "from", 
    "further", 
    "get", 
    "go", 
    "got", 
    "had", 
    "has", 
    "have", 
    "having", "he", "her", "here", "hers", "herself", "him", "himself", "his", 
    "how", "however", "i", "i'd", "if", "i'm", "in", "into", "is", "it", "its", 
    "it's", "itself", "i've", 
    "just", 
    "me", 
    "more", 
    "most", 
    "my", 
    "myself", 
    "no", 
    "nor", 
    "not", 
    "now", 
    "of", 
    "off", 
    "on", 
    "once", 
    "one", 
    "online", 
    "only", 
    "or", 
    "other", 
    "our", 
    "ours", 
    "ourselves", 
    "out", 
    "over", 
    "own", 
    "paid", 
    "place", 
    "s", 
    "said", 
    "same", 
    "service", 
    "she", 
    "should", 
    "so", 
    "some", 
    "such", 
    "t", 
    "than", 
    "that", 
    "the", 
    "their", 
    "theirs", 
    "them", 
    "themselves", 
    "then", 
    "there", 
    "these", 
    "they", 
    "this", 
    "those", 
    "through", 
    "to", 
    "too", 
    "under", 
    "until", 
    "up", 
    "us", 
    "very", 
    "was", 
    "we", 
    "went", 
    "were", 
    "we've", 
    "what", 
    "when", 
    "where", 
    "which", 
    "while", 
    "who", 
    "whom", 
    "why", 
    "will", 
    "with", 
    "would", 
    "you", 
    "your", 
    "yours", 
    "yourself", 
    "yourselves", 
    }

In [None]:
from pyspark.sql.functions import col, lower, udf
from pyspark.sql.types import ArrayType, StringType

# Define the splitter UDF
splitter_udf = udf(splitter, ArrayType(StringType()))

def common_useful_words(reviews, limit=50):
    # Filter useful reviews (10 or more useful ratings)
    useful_reviews = reviews.filter(col("useful") >= 10)
    
    # Convert text to lower case and split into words
    words = useful_reviews.withColumn("words", splitter_udf(lower(col("text"))))
    
    # Explode the words into individual rows
    exploded_words = words.selectExpr("explode(words) as word")
    
    # Exclude stop words
    filtered_words = exploded_words.filter(~col("word").isin(STOP_WORDS))
    
    # Count the occurrences of each word
    word_counts = filtered_words.groupBy("word").count()
    
    # Sort by count in descending order
    sorted_word_counts = word_counts.orderBy(col("count").desc())
    
    # Keep only the `limit` most common words (50 by default)
    most_common = sorted_word_counts.limit(limit)
    
    return most_common

# Output
```
+----------+------+
|      word| count|
+----------+------+
|      like|101251|
|      time| 86124|
|      good| 83486|
|      back| 71308|
|      food| 65281|
|      even| 58499|
|    really| 57687|
|     don't| 56146|
|     great| 55402|
|      well| 48297|
|    didn't| 45751|
|     first| 43738|
|    people| 42768|
|      know| 40954|
|     never| 40741|
|         2| 39573|
|      told| 39350|
|       day| 38164|
|      came| 38098|
|      much| 37227|
|         5| 37005|
|       two| 36824|
|      make| 36542|
|       see| 36153|
|      made| 35590|
|       way| 35177|
|      nice| 35041|
|      come| 34448|
|     going| 33635|
|      take| 33453|
|       new| 33279|
|         3| 33193|
|    little| 32951|
|      want| 32752|
|     order| 32097|
|     right| 31696|
|experience| 31680|
|     still| 31566|
|       try| 30273|
|     since| 30141|
|    around| 29654|
|   another| 29154|
|      room| 28892|
|      best| 28462|
|       say| 28215|
|     asked| 28180|
|      menu| 27998|
|     vegas| 27351|
|restaurant| 27341|
|     think| 27321|
+----------+------+
```

Now we'll run it on the `review` DataFrame.

In [None]:
common_useful_words_counts = common_useful_words(review)

In [None]:
assert (
    type(common_useful_words_counts) == pyspark.sql.dataframe.DataFrame
), "The common_useful_words_counts variable should be a Spark DataFrame."

assert common_useful_words_counts.columns == [
    "word",
    "count",
], "The columns are not in the correct order."

submitted = AutograderHelper.parse_spark_dataframe(common_useful_words_counts)

In [None]:
# Autograder cell. This cell is worth 2 points (out of 20). This cell does not contain hidden tests.

assert (
    len(submitted) == 50
), "The common_useful_words_counts DataFrame must have 50 rows."

assert submitted["word"][0] == "like", 'The first row should have word "like"'

assert submitted["count"][0] == 101251, "The first row should have count 101251"

In [None]:
# Autograder cell. This cell is worth 6 points (out of 20). This cell contains hidden tests.

Let's solve this by filtering the reviews to include only useful reviews (10 or more useful ratings), splitting the text into individual words, excluding stop words, and then counting the most common words.

### Steps:
1. **Filter Useful Reviews**: Filter the `review` DataFrame to include only reviews with 10 or more useful ratings.
2. **Convert Text to Lower Case**: Convert the review text to lower case.
3. **Split Text into Words**: Use the provided `splitter` function to split the text into individual words.
4. **Exclude Stop Words**: Exclude the words in the provided `STOP_WORDS` set.
5. **Count Words**: Count the occurrences of each word.
6. **Sort by Count**: Sort the results by count in descending order.
7. **Select Top 50**: Select the top 50 most common words.

### Explanation:
- **Filter Useful Reviews**: The `filter` method is used to select reviews with 10 or more useful ratings.
- **Convert Text to Lower Case**: The `lower` function converts the review text to lower case.
- **Split Text into Words**: The `splitter` UDF splits the text into individual words.
- **Exclude Stop Words**: The `filter` method excludes words in the `STOP_WORDS` set.
- **Count Words**: The `groupBy` and `count` methods count the occurrences of each word.
- **Sort by Count**: The `orderBy` method sorts the results by count in descending order.
- **Select Top 50**: The `limit` method selects the top 50 most common words.

Run this code to get the 50 most common words from useful reviews. 
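
For a quick check of the whole pipeline without Spark, the steps can be mirrored in plain Python on a few made-up reviews. The names `common_useful_words_py`, `TOY_STOP_WORDS`, and `toy_reviews` are hypothetical, the stop-word set is a small subset chosen for the example, and ties are broken alphabetically, which the Spark version does not promise.

```python
import re
from collections import Counter

WORD_RE = re.compile(r"[\w']+")
TOY_STOP_WORDS = {"the", "was", "and", "a", "it"}  # hypothetical subset

toy_reviews = [
    {"useful": 12, "text": "The food was good and the service was good"},
    {"useful": 3, "text": "Good good good"},  # fewer than 10 ratings, ignored
    {"useful": 10, "text": "Good food, bad parking"},
]


def common_useful_words_py(reviews, stop_words, limit=50, min_useful=10):
    """Count non-stop words in reviews with at least `min_useful` ratings."""
    counts = Counter()
    for review_row in reviews:
        if review_row["useful"] < min_useful:
            continue
        for word in WORD_RE.findall(review_row["text"].lower()):
            if word not in stop_words:
                counts[word] += 1
    # Sort by count descending, then alphabetically, and keep `limit` rows.
    return sorted(counts.items(), key=lambda pair: (-pair[1], pair[0]))[:limit]


result = common_useful_words_py(toy_reviews, TOY_STOP_WORDS, limit=3)
print(result)  # [('good', 3), ('food', 2), ('bad', 1)]
```

The review with only 3 "useful" ratings contributes nothing, even though it repeats the most common word.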