# 0. Installing PySpark in Google Colab

Install the dependencies (this needs to be done each time you re-open this notebook):

1.   Java 8
2.   Apache Spark with Hadoop
3.   findspark (used to locate the Spark installation on the system)

In [2]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed; releases that are no longer on dlcdn.apache.org are kept at archive.apache.org/dist/spark/)
!wget -q https://dlcdn.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz

# unzip the spark file to the current folder
!tar xf spark-3.5.4-bin-hadoop3.tgz

# point the JAVA_HOME and SPARK_HOME environment variables at the installed folders
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"

# install findspark using pip
!pip install -q findspark
import findspark
findspark.init()


- Mount your Google Drive folder to access files
- Needs to be done once each time you restart your runtime

In [3]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


# 1. Download Amazon Review Dataset

- Run the following cell to download the dataset files and copy them to your Google Drive, so that you keep a permanent copy and don't need to re-download them every time
- You only need to run it the first time
- We will be using the following two files for this task. Details of the dataset can be found at https://cseweb.ucsd.edu/%7Ejmcauley/datasets/amazon_v2/#subsets
  - Grocery_and_Gourmet_Food_5.json.gz is from 5-core review data
  - meta_Grocery_and_Gourmet_Food.json.gz is from metadata under Per-category data
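
The cells in this notebook read uncompressed `.json` files, while the dataset is distributed as `.json.gz`. A minimal sketch of the decompression step, using only the standard library, could look like the following; the helper name `gunzip_file` and the folder in the usage comment are hypothetical and need to be adapted to your own Drive layout.

```python
import gzip
import shutil
from pathlib import Path


def gunzip_file(src, dst=None):
    """Decompress a .gz file and return the path of the decompressed copy."""
    src = Path(src)
    # Default target: the same name without the trailing ".gz" suffix.
    dst = Path(dst) if dst is not None else src.with_suffix("")
    with gzip.open(src, "rb") as f_in, open(dst, "wb") as f_out:
        # Copy in chunks so the whole file never has to fit in memory.
        shutil.copyfileobj(f_in, f_out)
    return dst


# Hypothetical usage (replace <your_folder> with your own Drive folder):
# gunzip_file("/content/gdrive/MyDrive/<your_folder>/Grocery_and_Gourmet_Food_5.json.gz")
```

The compressed original is left in place, so the decompression can be repeated if the `.json` copy is deleted.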

# 2. Amazon Review Analysis

**Task 1: Write a Spark program to find the top 15 products based on their number of reviews and average ratings**

Step 1. Calculate the number of reviews and average ratings for each asin. Use RDD, reduceByKey and map function to accomplish this step. Your RDD should have product asin as the key, and tuple of (#reviews, average_rating) as the value.


In [5]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()  # reuses the running context if this cell is executed again

import json
file_path1 = './gdrive/MyDrive/CS5344_AY2425Sem2_Lab/Grocery_and_Gourmet_Food_5.json'
file_path2 = './gdrive/MyDrive/CS5344_AY2425Sem2_Lab/meta_Grocery_and_Gourmet_Food.json'
# read the json file
data1 = sc.textFile(file_path1)
data2 = sc.textFile(file_path2)

In [7]:
print(data1.take(1))
print(data2.take(1))

['{"overall": 5.0, "verified": true, "reviewTime": "11 19, 2014", "reviewerID": "A1QVBUH9E1V6I8", "asin": "4639725183", "reviewerName": "Jamshed Mathur", "reviewText": "No adverse comment.", "summary": "Five Stars", "unixReviewTime": 1416355200}']
['{"category": ["Grocery & Gourmet Food", "Dairy, Cheese & Eggs", "Cheese", "Gouda"], "tech1": "", "description": ["BEEMSTER GOUDA CHEESE AGED 18/24 MONTHS", "Statements regarding dietary supplements have not been evaluated by the FDA and are not intended to diagnose, treat, cure, or prevent any disease or health condition."], "fit": "", "title": "Beemster Gouda - Aged 18/24 Months - App. 1.5 Lbs", "also_buy": [], "tech2": "", "brand": "Ariola Imports", "feature": [], "rank": "165,181 in Grocery & Gourmet Food (", "also_view": ["B0000D9MYM", "B0000D9MYL", "B00ADHIGBA", "B00H9OX598", "B001LM42GY", "B001LM5TDY"], "main_cat": "Grocery", "similar_item": "", "date": "", "price": "$41.91", "asin": "0681727810", "imageURL": [], "imageURLHighRes": []

In [9]:
# target => (asin, (#reviews, average_rating))

# map => (asin, (1, rating))
def mapper1(data):
  line = json.loads(data)
  asin = line['asin']
  rating = line['overall']
  return (asin, (1, rating))

# reduce => (asin, (#reviews, total_rating))
def reducer1(a, b):
  return (a[0]+b[0], a[1]+b[1])

In [10]:
mapped_data1 = data1.map(mapper1)
reduced_data1 = mapped_data1.reduceByKey(reducer1)
results1 = reduced_data1.mapValues(lambda x: (x[0], x[1]/x[0]))

In [11]:
print(results1.take(10))

[('B000052X2S', (18, 4.555555555555555)), ('B0000CDBPT', (48, 4.5)), ('B0000D9MQV', (7, 4.428571428571429)), ('B0000DHZY1', (24, 4.791666666666667)), ('B0000DID5R', (42, 4.4523809523809526)), ('B0000E5L25', (6, 4.666666666666667)), ('B0000GHNV8', (24, 4.458333333333333)), ('B0000GK068', (19, 4.473684210526316)), ('B0000GHNUE', (22, 4.318181818181818)), ('B0000TW1NU', (5, 5.0))]
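
The reduction carries the pair (count, rating sum) rather than a running average because `reduceByKey` expects a function that is associative and commutative: counts and sums can be merged in any order, whereas averages cannot. The following plain-Python sketch illustrates the idea on a few made-up records; `reduce_by_key` is a hypothetical local stand-in for Spark's `reduceByKey`, not the Spark implementation.

```python
def reduce_by_key(pairs, func):
    """Local stand-in for RDD.reduceByKey: merge the values that share a key."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Made-up (asin, (1, rating)) records, in the shape produced by mapper1.
mapped = [("A", (1, 5.0)), ("B", (1, 3.0)), ("A", (1, 4.0)), ("A", (1, 3.0))]

# Same merge rule as reducer1: add the counts and add the rating sums.
totals = reduce_by_key(mapped, lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Divide once at the end, as the mapValues step does.
averages = {asin: (n, total / n) for asin, (n, total) in totals.items()}
print(averages)  # {'A': (3, 4.0), 'B': (1, 3.0)}
```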


Step 2. Create an RDD where the key is the product asin and the value is the brand name of the product, using the metadata file. Remove any duplicate entries in the RDD.

In [12]:
# target output => (asin, brand)
def mapper2(data):
  line = json.loads(data)
  asin = line['asin']
  brand = line['brand']
  return (asin, brand)

mapped_data2 = data2.map(mapper2)
results2 = mapped_data2.distinct()

In [13]:
print(results2.take(10))

[('4858582000', 'Wagh Bakri'), ('9177121805', 'IKEA'), ('B000096O8Y', ''), ('B0000A0BS5', 'Starbucks'), ('B0000A0MCW', 'Wilton'), ('B0000CAV37', '1001 Coffees'), ('B0000CD07A', ''), ('B0000CD9PL', 'Hancock Gourmet Lobster Company'), ('B0000CDC3J', ''), ('B0000CDE88', 'The Ginger People')]
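
Note that `distinct()` only removes exact duplicate `(asin, brand)` pairs. If the same asin ever appeared with two different brand strings, both pairs would survive and the later join would emit that product twice. Whether the metadata file contains such conflicts is not stated here, so the following is only a sketch, on made-up records, of the difference between the two notions of "duplicate"; the rule of preferring a non-empty brand is an assumption.

```python
# Made-up (asin, brand) records; "B" appears once without and once with a brand.
records = [("A", "KIND"), ("A", "KIND"), ("B", ""), ("B", "Twinings")]

# distinct() semantics: drop exact duplicate pairs only.
distinct_pairs = sorted(set(records))

# One brand per asin: keep the first non-empty brand seen for each key.
one_per_asin = {}
for asin, brand in records:
    if not one_per_asin.get(asin):
        one_per_asin[asin] = brand

print(distinct_pairs)  # [('A', 'KIND'), ('B', ''), ('B', 'Twinings')]
print(one_per_asin)    # {'A': 'KIND', 'B': 'Twinings'}
```

On an RDD, the second variant would correspond to something like `mapped_data2.reduceByKey(lambda a, b: a or b)`.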


Step 3. Join the pair RDD obtained in Step 1 with the RDD created in Step 2 on the product asin.

In [14]:
# Step1: RDD results1 => (asin, (#reviews, average_rating))
# Step2: RDD results2 => (asin, brand)

joined_rdd = results1.join(results2)  #(asin, ((#reviews, average_rating), brand))

In [15]:
for asin, ((reviews, avg_rating), brand) in joined_rdd.take(5):
  print(f"ASIN: {asin}, Reviews: {reviews}, Average Rating: {avg_rating}, Brand: {brand}")

ASIN: B0000CDBPT, Reviews: 48, Average Rating: 4.5, Brand: Magic Seasoning Blends
ASIN: B0000D9MQV, Reviews: 7, Average Rating: 4.428571428571429, Brand: Val de Saone
ASIN: B0000DID5R, Reviews: 42, Average Rating: 4.4523809523809526, Brand: Dave's Gourmet
ASIN: B0000GK068, Reviews: 19, Average Rating: 4.473684210526316, Brand: Pulparindo
ASIN: B0000GHNUE, Reviews: 22, Average Rating: 4.318181818181818, Brand: El Yucateco
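
`join` on pair RDDs is an inner join: an asin that is missing from either side is dropped, and every matching combination of values produces one output record. A plain-Python sketch of that behaviour, with made-up records and a hypothetical helper `inner_join`, is shown below.

```python
def inner_join(left, right):
    """Local stand-in for RDD.join on two lists of (key, value) pairs."""
    right_index = {}
    for key, value in right:
        right_index.setdefault(key, []).append(value)
    # Emit one (key, (left_value, right_value)) per matching combination.
    return [(k, (lv, rv)) for k, lv in left for rv in right_index.get(k, [])]


stats = [("A", (3, 4.0)), ("B", (1, 3.0))]   # (asin, (#reviews, average_rating))
brands = [("A", "KIND"), ("C", "Planters")]  # (asin, brand)

joined = inner_join(stats, brands)
print(joined)  # [('A', ((3, 4.0), 'KIND'))]
```

"B" has no brand record and "C" has no reviews, so only "A" appears in the result.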


Step 4. Take the top 15 entries, sorted by number of reviews in descending order. If multiple entries have the same number of reviews, sort them by average rating in descending order.


In [16]:
# sort by descending: #reviews, avg_rating
# x = joined_rdd: (asin, ((#reviews, average_rating), brand))
sorted_rdd = joined_rdd.sortBy(lambda x: (-x[1][0][0], -x[1][0][1]))
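
Negating both fields turns an ascending sort on the tuple into a descending sort on review count, with ties broken by descending average rating. The sketch below checks this on made-up records in plain Python, and also shows that selecting only the smallest keys gives the same top entries without sorting everything; on an RDD the analogous call would be `takeOrdered(15, key=...)`.

```python
import heapq

# Made-up records in the joined shape: (asin, ((#reviews, average_rating), brand)).
joined = [
    ("A", ((10, 4.5), "x")),
    ("B", ((12, 4.0), "y")),
    ("C", ((10, 4.9), "z")),
    ("D", ((3, 5.0), "w")),
]


def sort_key(record):
    reviews, avg_rating = record[1][0]
    # Ascending order on negated values equals descending order on the originals.
    return (-reviews, -avg_rating)


full_sort_top3 = sorted(joined, key=sort_key)[:3]
heap_top3 = heapq.nsmallest(3, joined, key=sort_key)

print([asin for asin, _ in heap_top3])  # ['B', 'C', 'A']
```

"C" is placed before "A" because both have 10 reviews and "C" has the higher average rating.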

In [17]:
top_15 = sorted_rdd.take(15)

In [18]:
for asin, ((reviews, avg_rating), brand) in top_15:
  print(f"ASIN: {asin}, #reviews: {reviews}, avg rating: {avg_rating}, brand: {brand}")

ASIN: B00BUKL666, #reviews: 7387, avg rating: 4.585352646541221, brand: KIND
ASIN: B008QMX2SG, #reviews: 6228, avg rating: 4.573538856775851, brand: KIND
ASIN: B00D3M2QP4, #reviews: 6221, avg rating: 4.573701977174088, brand: KIND
ASIN: B00R7PWK7W, #reviews: 3387, avg rating: 4.568644818423383, brand: KIND
ASIN: B000X3TPHS, #reviews: 3030, avg rating: 4.756435643564356, brand: YumEarth
ASIN: B000F4DKAI, #reviews: 2922, avg rating: 4.623545516769336, brand: Twinings
ASIN: B0001LO3FG, #reviews: 2922, avg rating: 4.623545516769336, brand: Twinings
ASIN: B00KSN9TME, #reviews: 2637, avg rating: 4.583996966249526, brand: KIND
ASIN: B000U0OUP6, #reviews: 2560, avg rating: 4.55859375, brand: Planters
ASIN: B000E1FZHS, #reviews: 2555, avg rating: 4.55812133072407, brand: Planters
ASIN: B00542YXFW, #reviews: 2455, avg rating: 4.374745417515275, brand: Davidson's Tea
ASIN: B00RW0MZ6S, #reviews: 2168, avg rating: 4.550738007380073, brand: Planters
ASIN: B000Z93FQC, #reviews: 2064, avg rating: 4.72

Step 5: Save your RDD into a file. Print the product asin, #reviews, average rating and brand of your 15 entries. The expected output is:
```
[0] ASIN: B00BUKL666, #reviews: 7387, avg rating: 4.585352646541221, brand: KIND
[1] ASIN: B008QMX2SG, #reviews: 6228, avg rating: 4.573538856775851, brand: KIND
[2] ASIN: B00D3M2QP4, #reviews: 6221, avg rating: 4.573701977174088, brand: KIND
[3] ASIN: B00R7PWK7W, #reviews: 3387, avg rating: 4.568644818423383, brand: KIND
[4] ASIN: B000X3TPHS, #reviews: 3030, avg rating: 4.756435643564356, brand: YumEarth
[5] ASIN: B000F4DKAI, #reviews: 2922, avg rating: 4.623545516769336, brand: Twinings
[6] ASIN: B0001LO3FG, #reviews: 2922, avg rating: 4.623545516769336, brand: Twinings
[7] ASIN: B00KSN9TME, #reviews: 2637, avg rating: 4.583996966249526, brand: KIND
[8] ASIN: B000U0OUP6, #reviews: 2560, avg rating: 4.55859375, brand: Planters
[9] ASIN: B000E1FZHS, #reviews: 2555, avg rating: 4.55812133072407, brand: Planters
[10] ASIN: B00542YXFW, #reviews: 2455, avg rating: 4.374745417515275, brand: Davidson's Tea
[11] ASIN: B00RW0MZ6S, #reviews: 2168, avg rating: 4.550738007380073, brand: Planters
[12] ASIN: B000Z93FQC, #reviews: 2064, avg rating: 4.724806201550388, brand: YS Royal Jelly/Honey Bee
[13] ASIN: B00XA8XWGS, #reviews: 2053, avg rating: 4.590842669264491, brand: Twinings
[14] ASIN: B00XOORKRK, #reviews: 1980, avg rating: 4.543939393939394, brand: Planters
```
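
The expected output prefixes each entry with its index in square brackets. A small sketch of a formatter that produces this layout from records in the joined shape is given below; the helper name `format_top_entries` is hypothetical, and the sample record is the first entry of the expected output.

```python
def format_top_entries(entries):
    """Format (asin, ((#reviews, average_rating), brand)) records as indexed lines."""
    lines = []
    for i, (asin, ((reviews, avg_rating), brand)) in enumerate(entries):
        lines.append(
            f"[{i}] ASIN: {asin}, #reviews: {reviews}, "
            f"avg rating: {avg_rating}, brand: {brand}"
        )
    return lines


sample = [("B00BUKL666", ((7387, 4.585352646541221), "KIND"))]
print("\n".join(format_top_entries(sample)))
# [0] ASIN: B00BUKL666, #reviews: 7387, avg rating: 4.585352646541221, brand: KIND
```

The same lines can be written to a text file with `f.write(line + "\n")` instead of being printed.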






In [32]:
# save the top_15 records as an RDD text file
save_path = './gdrive/MyDrive/CS5344_AY2425Sem2_Lab/Lab-part2_out'  # saveAsTextFile fails if save_path already exists
sc.parallelize(top_15).coalesce(1).saveAsTextFile(save_path)    # coalesce(1) reduces the RDD to a single partition, so a single part file is written
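
Because `saveAsTextFile` refuses to write into an existing directory, re-running the save cell fails unless the previous output is removed first. One possible way to handle this, sketched with the standard library, is to delete the old output directory before saving; `clear_output_dir` is a hypothetical helper, and it removes everything inside the directory, so it should only be pointed at a folder that holds nothing but generated output.

```python
import shutil
from pathlib import Path


def clear_output_dir(path):
    """Delete a previous output directory, if any; return True if one was removed."""
    path = Path(path)
    if path.is_dir():
        shutil.rmtree(path)
        return True
    return False


# Hypothetical usage before saving again:
# clear_output_dir(save_path)
# sc.parallelize(top_15).coalesce(1).saveAsTextFile(save_path)
```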

In [33]:
# alternative to the cell above: write the same records with plain Python file I/O
txt_path = './gdrive/MyDrive/CS5344_AY2425Sem2_Lab/Lab-part2_out/task1_top15.txt'
with open(txt_path, "w") as f:
  for record in top_15:
    f.write(f"{record}\n")

**Task 2: Write a Spark program to compute word counts and find common words in the reviews of each product**

Step 1: Read the reviews from the review file and preprocess the text. Eliminate punctuation and special characters, convert all words to lowercase, and tokenize the text into individual words. Your RDD should have the product asin as key and the list of words as value. You can remove entries with missing reviews.

In [19]:
# target output => (asin, ['word1','word2',...])
import re

def mapper3(data):
  line = json.loads(data)
  asin = line['asin']
  text = line.get('reviewText')   # get() returns None instead of raising KeyError when reviewText is absent

  if not text:    # skip missing or empty reviews
    return None

  words = re.findall(r'\b[a-z]+\b', text.lower())  # lowercase, then keep only runs of ASCII letters

  return (asin, words)


mapped_data3 = data1.map(mapper3).filter(lambda x: x is not None)

In [20]:
print(mapped_data3.take(5))

[('4639725183', ['no', 'adverse', 'comment']), ('4639725183', ['gift', 'for', 'college', 'student']), ('4639725183', ['if', 'you', 'like', 'strong', 'tea', 'this', 'is', 'for', 'you', 'it', 'might', 'even', 'be', 'a', 'little', 'too', 'strong', 'for', 'me']), ('4639725183', ['love', 'the', 'tea', 'the', 'flavor', 'is', 'way', 'better', 'than', 'the', 'regular', 'lipton', 'black', 'tea', 'definetly', 'worth', 'the', 'money']), ('4639725183', ['i', 'have', 'searched', 'everywhere', 'until', 'i', 'browsed', 'amazon', 'and', 'found', 'it', 'this', 'is', 'the', 'tea', 'lipton', 'should', 'be', 'selling', 'on', 'grocery', 'store', 'shelves', 'the', 'stuff', 'i', 'last', 'purchased', 'is', 'just', 'awful', 'and', 'nowhere', 'near', 'as', 'good', 'as', 'i', 'remember'])]
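
The pattern `\b[a-z]+\b` has a few consequences worth knowing: apostrophes and hyphens split a word into separate tokens, pure numbers are dropped, and tokens that mix letters and digits are dropped entirely because no word boundary separates the letters from the digits. The following sketch applies the same pattern to made-up sentences; `tokenize` is a hypothetical helper name.

```python
import re


def tokenize(text):
    """Lowercase the text and return its runs of ASCII letters."""
    return re.findall(r"\b[a-z]+\b", text.lower())


print(tokenize("Don't buy 2 packs; it's GREAT-tasting tea!!"))
# ['don', 't', 'buy', 'packs', 'it', 's', 'great', 'tasting', 'tea']

print(tokenize("b12 vitamin"))
# ['vitamin']
```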


Step 2: Calculate the frequency of each word in each product asin. Your RDD should have tuple (product_asin, word) as key and count of the corresponding word for that product as value. You can use reduceByKey() to accomplish this step.

In [21]:
# target output => ((asin, word), count)
def reducer3(a, b):
  return a+b

shuffled_data3 = mapped_data3.flatMapValues(lambda x: x).map(lambda x: ((x[0], x[1]), 1))
reduced_data3 = shuffled_data3.reduceByKey(reducer3)

In [22]:
print(shuffled_data3.take(5))
print(reduced_data3.take(5))

[(('4639725183', 'no'), 1), (('4639725183', 'adverse'), 1), (('4639725183', 'comment'), 1), (('4639725183', 'gift'), 1), (('4639725183', 'for'), 1)]
[(('4639725183', 'this'), 12), (('4639725183', 'it'), 12), (('4639725183', 'too'), 1), (('4639725183', 'love'), 2), (('4639725183', 'regular'), 7)]
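
The two transformations in this step can be checked on a small made-up input in plain Python: flattening yields one `(asin, word)` pair per word occurrence, and summing ones per pair is the same as counting how often each pair occurs. This is only a local sketch; `Counter` stands in for the `reduceByKey` addition.

```python
from collections import Counter

# Made-up (asin, [words]) records, in the shape produced by mapper3.
tokenized = [
    ("A", ["good", "tea", "good"]),
    ("A", ["tea"]),
    ("B", ["good"]),
]

# flatMapValues: one (asin, word) pair per word occurrence.
pairs = [(asin, word) for asin, words in tokenized for word in words]

# map to ((asin, word), 1) followed by reduceByKey(add) amounts to counting the pairs.
counts = Counter(pairs)

print(counts[("A", "good")], counts[("A", "tea")], counts[("B", "good")])  # 2 2 1
```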


Step 3: Find the top 10 words from reviews of the same product. Your RDD should have product_asin as key and the list containing the tuples (word, count) as value.

In [23]:
# ((asin, word), count) => (asin, (word, count))
mapped_data4 = reduced_data3.map(lambda x: (x[0][0], (x[0][1], x[1])))
reduced_data4 = mapped_data4.groupByKey().mapValues(
    lambda words: sorted(words, key=lambda x: x[1], reverse=True)[:10]    # sort the (word, count) pairs by count, descending, and keep the first 10
)

In [24]:
print(reduced_data4.take(5))

[('B000052X2S', [('of', 11), ('i', 11), ('the', 11), ('a', 8), ('to', 8), ('drops', 8), ('in', 7), ('that', 6), ('is', 6), ('and', 6)]), ('B0000CDBPT', [('the', 87), ('and', 71), ('i', 71), ('a', 44), ('to', 41), ('of', 37), ('it', 29), ('is', 28), ('this', 23), ('are', 20)]), ('B0000D9MQV', [('and', 6), ('was', 5), ('cheese', 4), ('is', 4), ('brie', 4), ('it', 4), ('the', 4), ('taste', 4), ('great', 3), ('i', 3)]), ('B0000DHZY1', [('good', 10), ('vanilla', 7), ('this', 7), ('i', 6), ('product', 6), ('but', 5), ('as', 5), ('is', 5), ('a', 5), ('great', 5)]), ('B0000DID5R', [('a', 123), ('the', 120), ('and', 91), ('it', 88), ('i', 86), ('to', 83), ('this', 81), ('is', 74), ('of', 74), ('you', 59)])]
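
Sorting by count alone leaves the order of tied words up to the order in which `groupByKey` delivers them, so words with equal counts can come out in a different order between runs, and which of several tied words fills the last slots can vary as well. A sketch of a deterministic variant is given below; breaking ties alphabetically is an assumption made for illustration and is not claimed to reproduce any particular reference output.

```python
def top_n_words(word_counts, n=10):
    """Return the n most frequent (word, count) pairs, ties broken alphabetically."""
    return sorted(word_counts, key=lambda wc: (-wc[1], wc[0]))[:n]


sample = [("the", 4), ("tea", 4), ("and", 4), ("is", 2), ("a", 7)]
print(top_n_words(sample, n=3))  # [('a', 7), ('and', 4), ('tea', 4)]
```

In the Spark pipeline, the same key function could replace `key=lambda x: x[1], reverse=True` inside `mapValues`.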


Step 4: Save your RDD into a file. For your reference, the first 10 entries, ordered by product asin, are as follows:
```
[0] 4639725043: [('tea', 73), ('i', 59), ('the', 52), ('it', 39), ('a', 36), ('this', 34), ('and', 33), ('is', 31), ('of', 30), ('to', 22)]
[1] 4639725183: [('the', 30), ('i', 25), ('tea', 24), ('is', 13), ('a', 12), ('it', 12), ('this', 12), ('and', 12), ('lipton', 9), ('to', 9)]
[2] 5463213682: [('the', 12), ('i', 10), ('and', 6), ('coffee', 4), ('not', 4), ('supreme', 3), ('love', 3), ('cafe', 3), ('sugar', 3), ('it', 3)]
[3] 9742356831: [('i', 189), ('the', 179), ('and', 158), ('a', 156), ('it', 140), ('to', 106), ('curry', 93), ('this', 86), ('is', 77), ('in', 72)]
[4] B00004S1C5: [('i', 30), ('the', 29), ('to', 20), ('and', 19), ('a', 15), ('is', 14), ('this', 14), ('it', 14), ('for', 14), ('are', 12)]
[5] B00004W4VD: [('jerky', 9), ('of', 6), ('the', 5), ('and', 5), ('for', 5), ('to', 4), ('a', 4), ('meat', 4), ('you', 3), ('is', 3)]
[6] B000052X2S: [('the', 11), ('i', 11), ('of', 11), ('to', 8), ('a', 8), ('drops', 8), ('in', 7), ('is', 6), ('and', 6), ('for', 6)]
[7] B000052Y74: [('i', 29), ('it', 20), ('gum', 19), ('to', 18), ('and', 18), ('of', 18), ('a', 16), ('the', 16), ('you', 12), ('mouth', 12)]
[8] B00005344V: [('i', 91), ('the', 76), ('it', 63), ('and', 57), ('tea', 53), ('this', 50), ('to', 44), ('a', 41), ('my', 33), ('of', 30)]
[9] B00005BPQ9: [('the', 59), ('i', 51), ('a', 49), ('and', 37), ('to', 34), ('for', 31), ('in', 25), ('they', 22), ('of', 22), ('milk', 20)]
```






In [25]:
sorted_rdd4 = reduced_data4.sortByKey(ascending=True)
print(sorted_rdd4.take(10))

[('4639725043', [('tea', 73), ('i', 59), ('the', 52), ('it', 39), ('a', 36), ('this', 34), ('and', 33), ('is', 31), ('of', 30), ('to', 22)]), ('4639725183', [('the', 30), ('i', 25), ('tea', 24), ('is', 13), ('this', 12), ('it', 12), ('a', 12), ('and', 12), ('to', 9), ('lipton', 9)]), ('5463213682', [('the', 12), ('i', 10), ('and', 6), ('not', 4), ('coffee', 4), ('supreme', 3), ('sugar', 3), ('was', 3), ('love', 3), ('it', 3)]), ('9742356831', [('i', 189), ('the', 179), ('and', 158), ('a', 156), ('it', 140), ('to', 106), ('curry', 93), ('this', 86), ('is', 77), ('in', 72)]), ('B00004S1C5', [('i', 30), ('the', 29), ('to', 20), ('and', 19), ('a', 15), ('this', 14), ('it', 14), ('is', 14), ('for', 14), ('are', 12)]), ('B00004W4VD', [('jerky', 9), ('of', 6), ('and', 5), ('the', 5), ('for', 5), ('meat', 4), ('to', 4), ('a', 4), ('is', 3), ('works', 3)]), ('B000052X2S', [('of', 11), ('i', 11), ('the', 11), ('a', 8), ('to', 8), ('drops', 8), ('in', 7), ('that', 6), ('is', 6), ('and', 6)]), ('B

In [28]:
txt_path = './gdrive/MyDrive/CS5344_AY2425Sem2_Lab/Lab-part2_out/task2_top10.txt'
with open(txt_path, "w") as f:
  for record in sorted_rdd4.take(10):
    f.write(f"{record}\n")

In [29]:
sc.stop()