In this notebook we derive the following insights from the Amazon Review Data (2018):
1. Price of products <br>
2. Price of products per category <br>
3. Number of reviews per product <br>
4. The highest-rated products across price, brand and category, and across combinations of these

In [None]:
!pip install pyspark



In [None]:
import json
import math
import re
from itertools import product

from pyspark import SparkConf, SparkContext


In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the shared project folder used for DATA_PATH and
# RESULTS_PATH lives under this mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Root of the shared project folder on the mounted Google Drive
PROJECT_PATH = '/content/drive/Shareddrives/CIE/BigData_Final_Project'
DATA_PATH = PROJECT_PATH + '/Data'        # input: the 'meta' and 'reviews' data read below
RESULTS_PATH = PROJECT_PATH + '/Results'  # output: folders written by saveAsTextFile

In [None]:
# Create the local Spark context, or reuse it when this cell is re-run:
# constructing a second SparkContext while one is active raises, so getOrCreate
# keeps the cell idempotent under Restart & Run All / re-execution.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local").setAppName("FinalProject"))

### Price of Products

In [None]:
def product_price(line):
  """Map one metadata JSON line to ``(asin, price, title)``.

  ``price`` is the leading dollar amount of the record's ``price`` field,
  normalised to ``'$<digits>.<cents>'`` with thousands separators removed
  (``'$1,299.99'`` -> ``'$1299.99'``) so it can be parsed with ``float``.
  It is ``''`` when the field is missing or does not start with a dollar
  amount, so the caller can filter the record out. A missing ``title`` maps
  to ``''``.
  """
  record = json.loads(line)
  # Raw string: '\$' / '\d' in a normal string are invalid escape sequences.
  # The first alternative accepts comma-grouped amounts, which '\$\d+\.' rejected.
  matched = re.match(r'\$(\d{1,3}(?:,\d{3})+|\d+)\.(\d{2})', record.get('price') or '')
  title = record.get('title', '')
  if matched:
    price = '$' + matched.group(1).replace(',', '') + '.' + matched.group(2)
    return (record['asin'], price, title)
  return (record['asin'], '', title)

# Read the metadata as (asin, price, title) triples, keep only products with a
# well-formed dollar price, and sort them by ascending numeric price.
# (Raw regex string: '\$' / '\d' in a normal string are invalid escape sequences.)
productPrice = sc.textFile(DATA_PATH + "/meta").map(product_price)\
              .filter(lambda x: re.match(r'^\$\d+\.\d{2}', x[1]))\
              .sortBy(lambda x: float(x[1][1:]))
productPrice.saveAsTextFile(RESULTS_PATH + "/ProductPricewithName")  # Save output as text file

In [None]:
# Preview the two cheapest products (productPrice is sorted by ascending price)
productPrice.take(2)

[('045122261X', '$0.96', 'In the Blood (The Darkwing Chronicles, Book 4)'),
 ('0594451647',
  '$1.46',
  'Barnes &amp; Noble HDTV Adapter Kit for NOOK HD and NOOK HD+')]

### Price of Products per Category

In [None]:
def product_price_category(line):
  """Map one metadata JSON line to ``(category, (asin, price, title))`` pairs.

  One pair is produced for every entry in the record's ``category`` list.
  ``price`` is the leading dollar amount of the ``price`` field, normalised to
  ``'$<digits>.<cents>'`` with thousands separators removed. Records without a
  dollar price produce an empty list, so ``flatMap`` emits nothing for them.
  (The previous version emitted a fake ``('cat', …)`` record instead.)
  """
  record = json.loads(line)
  # Raw string avoids invalid escape sequences; the first alternative accepts "1,299".
  matched = re.match(r'\$(\d{1,3}(?:,\d{3})+|\d+)\.(\d{2})', record.get('price') or '')
  if not matched:
    return []
  price = '$' + matched.group(1).replace(',', '') + '.' + matched.group(2)
  entry = (record['asin'], price, record.get('title', ''))
  # One (category, product) pair for each category this product is listed under
  return [(category, entry) for category in record.get('category', [])]
# Read the metadata and emit one (category, (asin, price, title)) pair per category.
# Keep only well-formed dollar prices (raw regex string: '\$' / '\d' in a normal
# string are invalid escape sequences), group the products by category, and sort
# each category's products by ascending numeric price.
productPriceCategory = sc.textFile(DATA_PATH + "/meta").flatMap(product_price_category)\
                       .filter(lambda x: re.match(r'\$\d+\.\d{2}', x[1][1])).groupByKey()\
                       .mapValues(lambda x: sorted(x, key=lambda y: float(y[1][1:])))
productPriceCategory.saveAsTextFile(RESULTS_PATH + "/ProductPriceCategorywithName")  # Save output as text file

In [None]:
# Preview one (category, [(asin, price, title), ...]) group; its products are sorted by ascending price
productPriceCategory.take(1)

[('Electronics',
  [('045122261X', '$0.96', 'In the Blood (The Darkwing Chronicles, Book 4)'),
   ('0594451647',
    '$1.46',
    'Barnes &amp; Noble HDTV Adapter Kit for NOOK HD and NOOK HD+'),
   ('9512089130',
    '$1.48',
    'Black Label Bag SX-70 Polaroid Two Compartment Pouch Compatible with Instax'),
   ('1059844575',
    '$1.54',
    'SlimPort sli44532 Adapter for LG G3 Smartphone MyDP/Micro-USB to HDMI Adapter Connects Any MyDP Enabled Mobile Device and Play Content to Any HDTV or HDMI Enabled Device, Black'),
   ('1059999102',
    '$1.54',
    'SlimPort Adapter for LG G Pad 8.3 Tablet, MyDP/Micro-USB to HDMI Adapter Connects Any MyDP Enabled Mobile Device and Play Content to Any HDTV or HDMI Enabled Device, Black'),
   ('9573213893',
    '$1.54',
    'Dell Latitude E6420 E6430 E6430s E6320 E6330 DVD-RW Drive w/Faceplate &amp; caddy'),
   ('9573213877',
    '$1.54',
    'Dell Latitude E4200, E4300, E5400, E6320 E6420 E6520 E6400, E6430 E6500 Precision M2400, M4400, M6400 Exte

### Number of Reviews per Product

In [None]:
def review(line):
  """Return the product ID (``asin``) of one review JSON line; used to count reviews per product."""
  return json.loads(line)['asin']
# Read the review files and keep only the product ID (asin) of each review
reviews = sc.textFile(DATA_PATH + "/reviews").map(review)
# Count reviews per product: emit (asin, 1) and sum the ones per key.
# Cached because the average-rating cell joins against it.
productCount = (reviews
                .map(lambda asin: (asin, 1))
                .reduceByKey(lambda left, right: left + right)
                .cache())
# Save the counts, most-reviewed products first
(productCount
 .sortBy(lambda pair: pair[1], ascending=False)
 .saveAsTextFile(RESULTS_PATH + "/ProductReviewCount"))

### Average overall rating of each product, sorted by rating, to find the highest-rated products across price, category and brand

In [None]:
def product_rating(line):
  """Parse one review JSON line and return ``(asin, overall)``."""
  review_obj = json.loads(line)
  return review_obj['asin'], review_obj['overall']
# Sum the 'overall' ratings per product (this reads the review files a second time)
productRating = (sc.textFile(DATA_PATH + "/reviews")
                   .map(product_rating)
                   .reduceByKey(lambda total, score: total + score))
# Average rating = rating sum / review count (productCount), rounded to two decimals
productAvgRating = (productRating
                      .join(productCount)
                      .mapValues(lambda sum_count: round(sum_count[0] / sum_count[1], 2))
                      .cache())

In [None]:
def mapping_group(value):
  """Sort a group of ``(asin, (key, rating))`` records by rating, highest first.

  The list is sorted in place and the same list object is returned, so the
  function can be passed straight to ``mapValues``.
  """
  value.sort(key=lambda record: record[1][1], reverse=True)
  return value

In [None]:
def myround(x, base=50):
    """Round ``x`` to the nearest multiple of ``base``, rounding halves up.

    Used to bucket prices. With the default ``base=50``, the ``0`` bucket holds
    prices below 25, the ``50`` bucket holds prices in [25, 75), and so on.

    The built-in ``round`` uses round-half-to-even, which sends exact midpoints
    alternately down and up (25 -> 0, 75 -> 100, 125 -> 100).
    ``math.floor(x / base + 0.5)`` rounds every midpoint up consistently and,
    like ``round``, returns an ``int`` (so an int ``base`` gives an int result).
    """
    return base * math.floor(x / base + 0.5)

1. Across price ranges (prices bucketed to the nearest multiple of $50)

In [None]:
def product_price_bucket(line):
  """Map one metadata JSON line to ``(asin, price_bucket)``.

  The record's leading dollar amount (thousands separators allowed) is rounded
  with ``myround`` (default base 50) and formatted as ``'$<n>.00'``. Records
  without a dollar price get ``''`` and are filtered out below.

  The function gets its own name so it no longer shadows ``product_price`` from
  the "Price of Products" cell. It returns a 2-tuple because ``join`` treats each
  record as ``(key, value)``; the former third element (``title``) was silently
  discarded and never reached the saved output.
  """
  record = json.loads(line)
  # Raw string avoids invalid escape sequences; the first alternative accepts "1,299"
  matched = re.match(r'\$(\d{1,3}(?:,\d{3})+|\d+)\.(\d{2})', record.get('price') or '')
  if matched:
    amount = float(matched.group(1).replace(',', '') + '.' + matched.group(2))
    return (record['asin'], '$' + str(myround(amount)) + '.00')
  return (record['asin'], '')
# Read the metadata and keep only products that have a dollar price bucket
productPriceBucket = sc.textFile(DATA_PATH + "/meta").map(product_price_bucket)\
                       .filter(lambda x: x[1] != '')
# Join with the average rating -> (asin, (price_bucket, avg_rating)), group by bucket,
# and order each bucket's products by rating, highest first (mapping_group).
# No sortBy before groupBy: the shuffle does not preserve that order and
# mapping_group re-sorts every group anyway.
priceRatings = productPriceBucket.join(productAvgRating).groupBy(lambda x: x[1][0])\
                                 .mapValues(list).mapValues(mapping_group)
priceRatings.saveAsTextFile(RESULTS_PATH + "/PriceRatingSynergy")  # Saving output as text file

In [None]:
# Preview up to two price buckets, each with its products ranked by average rating
priceRatings.take(2)

[('$0.00',
  [('0312331363', ('$0.00', 5.0)),
   ('0578040069', ('$0.00', 5.0)),
   ('0060009810', ('$0.00', 4.71)),
   ('0545105668', ('$0.00', 4.69)),
   ('0312171048', ('$0.00', 4.62)),
   ('0594459451', ('$0.00', 4.54)),
   ('0446576476', ('$0.00', 4.44)),
   ('0446697192', ('$0.00', 3.93)),
   ('0060786817', ('$0.00', 3.67)),
   ('0151004714', ('$0.00', 3.58)),
   ('0380812916', ('$0.00', 3.55)),
   ('0375503757', ('$0.00', 3.38)),
   ('0375505458', ('$0.00', 3.17))])]

2. Across categories

In [None]:
def product_category(line):
  """Return one ``(asin, category)`` pair for each category listed in a metadata JSON line."""
  meta = json.loads(line)
  return [(meta['asin'], category) for category in meta['category']]
# Turn every metadata line into (asin, category) pairs, one per category
productCategory = sc.textFile(DATA_PATH + "/meta").flatMap(product_category)
# join -> (asin, (category, avg_rating)); group the records by category and order
# each category's products by average rating, highest first (mapping_group)
categoryRatings = (productCategory
                   .join(productAvgRating)
                   .groupBy(lambda record: record[1][0])
                   .mapValues(list)
                   .mapValues(mapping_group))
categoryRatings.saveAsTextFile(RESULTS_PATH + "/CategoryRatingsSynergy")  # Saving output as text file

3. Across brands

In [None]:
def product_brand(line):
  """Map one metadata JSON line to ``(asin, brand)``.

  ``brand`` is ``''`` when the record has no ``brand`` field, instead of
  raising ``KeyError`` and failing the whole Spark job.
  """
  record = json.loads(line)
  return (record['asin'], record.get('brand', ''))
# Read the metadata and map each product to (asin, brand)
productBrand = sc.textFile(DATA_PATH + "/meta").map(product_brand)
# Drop products without a brand; otherwise every unbranded product would be lumped
# into a single '' "brand" group.
# Join with the average rating -> (asin, (brand, avg_rating)), group by brand, and
# for each brand sort its products by average rating, highest first (mapping_group).
brandRatings = productBrand.filter(lambda x: isinstance(x[1], str) and x[1].strip() != '')\
                           .join(productAvgRating).groupBy(lambda x: x[1][0])\
                           .mapValues(list).mapValues(mapping_group)
brandRatings.saveAsTextFile(RESULTS_PATH + "/BrandRatingsSynergy")  # Saving output as text file

### Products per category with their average rating, sorted by ascending price (price × category)

In [None]:
def product_price_categories(line):
  """Map one metadata JSON line to ``(asin, (price, categories))``.

  ``price`` is the record's leading dollar amount, normalised to
  ``'$<digits>.<cents>'`` with thousands separators removed. ``categories`` is
  a copy of the record's ``category`` list (empty if the field is missing).
  Records without a dollar price map to ``(asin, ('*', []))`` so the caller's
  regex filter drops them.
  """
  record = json.loads(line)
  # Raw string avoids invalid escape sequences; the first alternative accepts "1,299"
  matched = re.match(r'\$(\d{1,3}(?:,\d{3})+|\d+)\.(\d{2})', record.get('price') or '')
  if matched:
    price = '$' + matched.group(1).replace(',', '') + '.' + matched.group(2)
    return (record['asin'], (price, list(record.get('category', []))))
  return (record['asin'], ('*', []))
def flatenning_category(x):
  """Expand ``((price, [categories]), rating)`` into one ``(price, category, rating)``
  triple per category, in the order the categories are listed."""
  price, categories, rating = x[0][0], x[0][1], x[1]
  return [(price, category, rating) for category in categories]
def categorical_prices_rating(line):
  """Re-key ``(asin, [(price, category, rating), ...])`` as a list of
  ``(category, (asin, price, rating))`` pairs so the category becomes the key."""
  return [(triple[1], (line[0], triple[0], triple[2])) for triple in line[1]]
# Read the metadata as (asin, (price, [categories])) and keep only well-formed dollar
# prices (raw regex string: '\$' / '\d' in a normal string are invalid escape sequences)
productPrice = sc.textFile(DATA_PATH + "/meta").map(product_price_categories)\
                .filter(lambda x: re.match(r'\$\d+\.\d{2}', x[1][0]))
# Join with the average rating -> (asin, ((price, [categories]), avg_rating)), flatten
# to one (price, category, rating) triple per category, re-key by category, group,
# and sort each category's products by ascending numeric price.
categoryPriceRating = productPrice.join(productAvgRating).mapValues(flatenning_category)\
                                  .flatMap(categorical_prices_rating).groupByKey()\
                                  .mapValues(lambda x: sorted(x, key=lambda y: float(y[1][1:])))
categoryPriceRating.saveAsTextFile(RESULTS_PATH + "/CategoryPriceRatingwithPrice")  # Save output as text files

### Products per brand with their average rating, sorted by ascending price (price × brand)

In [None]:
def product_price_brand(line):
  """Map one metadata JSON line to ``(asin, (price, brand))``.

  Like the other price cells, ``price`` is the leading dollar amount of the
  record's ``price`` field, normalised to ``'$<digits>.<cents>'`` with thousands
  separators removed (e.g. ``'$10.99 - $15.99'`` -> ``'$10.99'``). It is ``''``
  when the field does not start with a dollar amount, so the caller's filter
  drops the record. A missing ``brand`` maps to ``''``.
  """
  record = json.loads(line)
  # Raw string avoids invalid escape sequences; the first alternative accepts "1,299"
  matched = re.match(r'\$(\d{1,3}(?:,\d{3})+|\d+)\.(\d{2})', record.get('price') or '')
  price = '$' + matched.group(1).replace(',', '') + '.' + matched.group(2) if matched else ''
  return (record['asin'], (price, record.get('brand', '')))
def mapping_line(line):
  """Re-key a joined record ``(asin, ((price, brand), rating))`` as
  ``(brand, (asin, price, rating))`` so the brand becomes the key."""
  (price, brand), rating = line[1]
  return (brand, (line[0], price, rating))
# Read the metadata as (asin, (price, brand)). Keep only products with a well-formed
# dollar price (raw regex string: '\$' / '\d' in a normal string are invalid escape
# sequences) and a non-empty brand (unbranded products would otherwise be lumped
# together under a '' brand key).
productPriceBrand = sc.textFile(DATA_PATH + "/meta").map(product_price_brand)\
                      .filter(lambda x: re.match(r'\$\d+\.\d{2}', x[1][0]))\
                      .filter(lambda x: isinstance(x[1][1], str) and x[1][1].strip() != '')
# Join with the average rating, re-key as (brand, (asin, price, rating)), group by
# brand, and sort each brand's products by ascending *numeric* price. Comparing the
# price strings would order '$100.00' before '$20.00'. The number is taken from the
# leading dollar amount, which the filter above guarantees is present.
brandPriceRating = productPriceBrand.join(productAvgRating).map(mapping_line).groupByKey()\
                                    .mapValues(lambda x: sorted(
                                        x, key=lambda y: float(re.match(r'\$(\d+\.\d{2})', y[1]).group(1))))
brandPriceRating.saveAsTextFile(RESULTS_PATH + "/BrandPriceRatingwithPrice")  # Save output as text file