## Assignment 2
### Group No: 29

### Group Member Names:
| Name              | ID           | Contribution |
|-------------------|--------------|--------------|
| Mohit Goyal       | 2024DA04070  | 100%         |
| Vishal L          | 2024DA04071  | 100%         |
| Saran Kumar R     | 2024DA04072  | 100%         |


## Initialize Spark Session and Load Data

1. Data loading: Load the dataset into a Spark DataFrame with schema inference as appropriate. Print the schema and the number of records loaded.

In [None]:
import io
import json
import requests

from pyspark.sql import SparkSession, functions as F, types as T, Window

spark = (
    SparkSession.builder
    .appName('AmazonProductReviewsAnalysis')
    .getOrCreate()
)

dataset_url = 'https://raw.githubusercontent.com/MohitGoyal19/Amazon-Product-Review-Analytics/refs/heads/main/data/AmazonProductReviews.csv'
data = io.BytesIO(requests.get(dataset_url).content)
text = data.getvalue().decode('utf-8').replace('""', '\\"')
rdd = spark.sparkContext.parallelize(text.splitlines())

df = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('sep', ',')
    .option('quote', '"')
    .option('escape', '\\')
    .csv(rdd)
)

print('DataFrame Schema: ', end='')
df.printSchema()

df.show(20)
print('Total number of records:', df.count())

DataFrame Schema: root
 |-- id: string (nullable = true)
 |-- asins: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- colors: string (nullable = true)
 |-- dateAdded: timestamp (nullable = true)
 |-- dateUpdated: timestamp (nullable = true)
 |-- dimension: string (nullable = true)
 |-- ean: double (nullable = true)
 |-- keys: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- manufacturerNumber: string (nullable = true)
 |-- name: string (nullable = true)
 |-- prices: string (nullable = true)
 |-- reviews.date: timestamp (nullable = true)
 |-- reviews.doRecommend: boolean (nullable = true)
 |-- reviews.numHelpful: integer (nullable = true)
 |-- reviews.rating: integer (nullable = true)
 |-- reviews.sourceURLs: string (nullable = true)
 |-- reviews.text: string (nullable = true)
 |-- reviews.title: string (nullable = true)
 |-- reviews.userCity: string (nullable = true)
 |-- reviews.userProvince: string (
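
A note on the quote handling in the loading cell: it rewrites every doubled quote (`""`) as a backslash-escaped quote before parsing. The sketch below uses Python's standard `csv` module on a made-up sample line (not taken from the dataset) to show why that rewrite needs care. A doubled quote inside a quoted field is the standard CSV escape, but an empty quoted field is also written `""`, so a blanket text replacement changes it as well.

```python
import csv

# Hypothetical sample line: a quoted field with doubled quotes, then an empty quoted field.
sample = '1,"He said ""hi""",""'

# The csv module treats "" inside a quoted field as one literal quote character.
parsed = next(csv.reader([sample]))
print(parsed)

# The blanket replacement from the loading cell also rewrites the empty field.
rewritten = sample.replace('""', '\\"')
print(rewritten)
```

Spark's CSV reader also accepts `"` as the value of its `escape` option, which may make the text rewrite unnecessary. That alternative was not tried in this notebook, so treat it as a suggestion only.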

## Data Cleansing

2. Data cleansing: Make necessary changes in Schema, as required for the given analytical queries. Create a new column with name “primary_category” and populate it with the first category present in the ‘categories’ column. Drop rows where rating is missing or outside the valid range (1–5). List the modified schema and find the number of records after dropping the rows

In [None]:
df_clean = (
    df
    .withColumnRenamed('reviews.rating', 'rating')
    .withColumnRenamed('reviews.date', 'review_date')
    .withColumnRenamed('reviews.text', 'review_text')
    .withColumnRenamed('reviews.title', 'review_title')
    .withColumnRenamed('reviews.username', 'reviewer_username')
    .withColumn('categories', F.split(F.col('categories'), ',')) # parse as list of strings
    .withColumn('primary_category', F.when(F.size('categories') > 0, F.trim(F.col('categories').getItem(0))))
    .select('name', 'categories', 'primary_category','rating', 'review_date', 'review_text', 'review_title', 'reviewer_username')
    .filter((F.col('rating')>=1) & (F.col('rating')<=5) & F.col('rating').isNotNull())
)

print('DataFrame Schema: ', end='')
df_clean.printSchema()
print('Number of records after cleaning:', df_clean.count())


DataFrame Schema: root
 |-- name: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- primary_category: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- reviewer_username: string (nullable = true)

Number of records after cleaning: 1177


3. Top Products by Average Rating with Minimum Reviews: Find product names with at least 20 reviews and rank them by average rating

In [None]:
(
    df_clean
    .groupBy(F.col('name'))
    .agg(F.count(F.col('rating')).alias('num_reviews'), F.avg(F.col('rating')).alias('avg_rating'))
    .filter(F.col('num_reviews')>=20)
    .orderBy(F.col('avg_rating').desc())
).show()

+--------------------+-----------+------------------+
|                name|num_reviews|        avg_rating|
+--------------------+-----------+------------------+
|    Fire HD 6 Tablet|         38|               5.0|
|   Kindle Paperwhite|         22| 4.590909090909091|
|Amazon Tap - Alex...|        542| 4.533210332103321|
|  Kindle Fire HDX 7"|         23| 4.391304347826087|
|      Amazon Fire TV|         44| 4.204545454545454|
|Amazon Premium He...|         77| 4.012987012987013|
|All-New Amazon Fi...|         27|3.7777777777777777|
+--------------------+-----------+------------------+



4. Most Active Reviewers (by Review Count): List top 10 most active reviewers.

In [None]:
(
    df_clean
    .groupBy(F.col('reviewer_username'))
    .agg(F.count(F.col('rating')).alias('num_reviews'))
    .orderBy(F.col('num_reviews').desc())
).show(10)

+-----------------+-----------+
|reviewer_username|num_reviews|
+-----------------+-----------+
|        A. Younan|         38|
|   William Hardin|         23|
|           Andrew|         23|
|  Amazon Customer|         17|
|        Victor L.|         15|
|    Earthling1984|         13|
|               NF|         13|
|  Amazon Reviewer|         12|
|          Mike W.|         12|
|         D. Miyao|         10|
+-----------------+-----------+
only showing top 10 rows


5. Show how ratings evolve over time per category: Show the Monthly Trend of Average Ratings per primary_category of product. Show 40 rows.

In [None]:
(
    df_clean.filter(F.col('review_date').isNotNull())
    .groupBy(F.col('primary_category'), F.month(F.col('review_date')).alias('review_month'), F.year(F.col('review_date')).alias('review_year'))
    .agg(F.avg(F.col('rating')).alias('avg_rating'))
    .withColumn('review_period', F.concat(F.lit('review_'), F.col('review_year'), F.lit('_'), F.lpad(F.col('review_month'), 2, '0')))
    .withColumn('rn', F.dense_rank().over(Window.orderBy(F.col('review_period').desc())))
    .filter(F.col('rn')<41)
    .orderBy(F.col('review_period').asc())
    .groupBy(F.col('primary_category'))
    .pivot('review_period')
    .agg(F.first('avg_rating'))
).show()

+--------------------+--------------+--------------+--------------+-----------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+------------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+------------------+------------------+--------------+--------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+--------------+
|    primary_category|review_2012_09|review_2012_10|review_2013_10|   review_2013_11|review_2013_12|review_2014_04|review_2014_05|review_2014_06|review_2014_07|review_2014_08|review_2014_09|    review_2014_10|review_2014_11|review_2014_12|review_2015_03|review_2015_04|review_2015_05|review_2015_06|review_2015_07|review_

6. Find products loved by some and hated by few: List top 10 Product names by the Ratio of 5-Star to 1-Star Reviews.

In [None]:
(
    df_clean
    .groupBy(F.col('name'))
    .agg(F.count(F.when(F.col('rating')==5, F.lit(1))).alias('five_star_reviews'), F.count(F.when(F.col('rating')==1, F.lit(1))).alias('one_star_reviews'))
    .orderBy(F.when(F.col('one_star_reviews')!=0, F.col('five_star_reviews')/F.col('one_star_reviews')).otherwise(F.lit(float('inf'))).desc(), F.col('five_star_reviews').desc())
).show(10)

+--------------------+-----------------+----------------+
|                name|five_star_reviews|one_star_reviews|
+--------------------+-----------------+----------------+
|Amazon Premium He...|               39|               0|
|    Fire HD 6 Tablet|               38|               0|
|   Kindle Paperwhite|               15|               0|
|All-New Amazon Ki...|               12|               0|
| Amazon Echo - Black|                9|               0|
|All-New Amazon Ki...|                9|               0|
|  Kindle Fire HDX 7"|                9|               0|
|Amazon Echo Dot C...|                8|               0|
|Amazon Echo Dot C...|                8|               0|
|Amazon Echo Dot C...|                8|               0|
+--------------------+-----------------+----------------+
only showing top 10 rows


7. Longest Review Texts per Category: List the longest review in each primary category along with review title and length of review text. List it in the order of review length. For this question, result can be displayed with “truncate=True” as the reviews can be very long.

In [None]:
(
    df_clean
    .withColumn('review_length', F.length(F.col('review_text')))
    .withColumn('rn', F.row_number().over(Window.partitionBy(F.col('primary_category')).orderBy(F.col('review_length').desc())))
    .filter(F.col('rn')==1)
    .select('primary_category', 'review_title', 'review_text', 'review_length')
    .orderBy(F.col('review_length').desc())
).show(truncate=True)

+--------------------+--------------------+--------------------+-------------+
|    primary_category|        review_title|         review_text|review_length|
+--------------------+--------------------+--------------------+-------------+
|          Categories|This box is a GAM...|I am not a casual...|        19739|
|      Amazon Devices|Excellent 3rd-gen...|This is the middl...|        18667|
|Amazon Devices & ...|Great range, very...|As other reviewer...|         1925|
|         Electronics|Fantastic tablet ...|Let me start by s...|         1778|
|        Kindle Store|Worth the money. ...|The Kindle is my ...|         1672|
|Cell Phones & Acc...|Moshi's screen pr...|I like Moshi's an...|         1379|
+--------------------+--------------------+--------------------+-------------+



8. Growth in review volume: Show the Year-over-Year Growth in Review Counts.

In [None]:
(
    df_clean.filter(F.col('review_date').isNotNull())
    .groupBy(F.year(F.col('review_date')).alias('review_year'))
    .agg(F.count(F.col('rating')).alias('num_reviews'))
    .withColumn('review_period', F.concat(F.lit('year_'), F.col('review_year')))
    .orderBy(F.col('review_period').asc())
    .groupBy()
    .pivot('review_period')
    .agg(F.first('num_reviews'))
).show()

+---------+---------+---------+---------+---------+---------+
|year_2012|year_2013|year_2014|year_2015|year_2016|year_2017|
+---------+---------+---------+---------+---------+---------+
|        5|       24|      101|       18|      328|      484|
+---------+---------+---------+---------+---------+---------+
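
The table above lists review counts per year but not the growth itself. As a minimal sketch, the percentage change between consecutive years can be derived from those counts. It is written in plain Python so it runs without a Spark session; in PySpark the same comparison would typically use `F.lag` over a window ordered by year. The helper name `yoy_growth` is hypothetical.

```python
# Review counts per year, copied from the pivoted output above.
counts = {2012: 5, 2013: 24, 2014: 101, 2015: 18, 2016: 328, 2017: 484}

def yoy_growth(counts_by_year):
    """Return {year: percent change versus the previous year present in the data}."""
    years = sorted(counts_by_year)
    growth = {}
    for prev, curr in zip(years, years[1:]):
        before, after = counts_by_year[prev], counts_by_year[curr]
        # Assumes the earlier year has a non-zero count, as it does here.
        growth[curr] = (after - before) / before * 100.0
    return growth

growth = yoy_growth(counts)
for year, pct in growth.items():
    print(f"{year}: {pct:+.1f}%")
```

The first year (2012) has no predecessor, so it gets no growth value.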



9. Average Rating by Review Length buckets: Analyze whether longer reviews tend to be more positive or negative. Use the 3 buckets given below:
- Short: length of review text is less than 50.
- Medium: length of review text is between 50 and 200 (both inclusive)
- Long: length of review text is above 200

In [None]:
(
    df_clean
    .withColumn(
        'review_length_category', F.when(
            F.length(F.col('review_text'))<50, F.lit('Short')
        ).when(
            F.length(F.col('review_text'))>200, F.lit('Long')
        ).otherwise(F.lit('Medium'))
    ).groupBy(F.col('review_length_category'))
    .agg(F.avg(F.col('rating')).alias('avg_rating'))
    .orderBy(F.col('review_length_category').asc())
).show()

+----------------------+-----------------+
|review_length_category|       avg_rating|
+----------------------+-----------------+
|                  Long|4.270503597122302|
|                Medium|4.480984340044743|
|                 Short|4.571428571428571|
+----------------------+-----------------+



Long reviews are slightly less positive (average rating 4.27) than medium (4.48) and short (4.57) reviews, so the average rating falls as review length grows. The differences are small, however: about 0.3 stars between the short and long buckets.
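
The bucket boundaries can be checked with a small plain-Python sketch of the same rule. The function name `length_bucket` and the handling of missing text are assumptions. In the Spark query above, a null `review_text` makes both length comparisons null, so such a row would fall through to `otherwise` and be counted as Medium; whether the dataset contains null review texts is not shown here.

```python
def length_bucket(review_text):
    """Classify a review by text length: Short (<50), Medium (50-200 inclusive), Long (>200)."""
    if review_text is None:
        return None  # assumption: keep missing text out of the three buckets
    n = len(review_text)
    if n < 50:
        return 'Short'
    if n > 200:
        return 'Long'
    return 'Medium'

# Boundary checks: 50 and 200 characters both belong to Medium.
for n in (49, 50, 200, 201):
    print(n, length_bucket('x' * n))
```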

10. Products with Declining Ratings Over Time: Identify the top 10 product names whose ratings have dropped maximum. Use monthly average rating (first and last averages) of a product to identify its rating drop.

In [None]:
(
    df_clean.filter(F.col('review_date').isNotNull())
    .groupBy(F.col('name'), F.month(F.col('review_date')).alias('review_month'), F.year(F.col('review_date')).alias('review_year'))
    .agg(F.avg(F.col('rating')).alias('avg_rating'))
    .withColumn('review_period', F.concat(F.lit('review_'), F.col('review_year'), F.lit('_'), F.lpad(F.col('review_month'), 2, '0')))
    .withColumn('rn_first_average', F.row_number().over(Window.partitionBy(F.col('name')).orderBy(F.col('review_period').asc())))
    .withColumn('rn_last_average', F.row_number().over(Window.partitionBy(F.col('name')).orderBy(F.col('review_period').desc())))
    .filter((F.col('rn_first_average')==1) | (F.col('rn_last_average')==1))
    .withColumn('review_period', F.when(F.col('rn_first_average')==1, F.lit('first_average')).otherwise(F.lit('last_average')))
    .groupBy(F.col('name'))
    .pivot('review_period')
    .agg(F.first(F.col('avg_rating')))
    .orderBy(F.when(F.col('first_average').isNotNull() & F.col('last_average').isNotNull(), F.col('first_average')-F.col('last_average')).desc())
).show(10)

+--------------------+------------------+-----------------+
|                name|     first_average|     last_average|
+--------------------+------------------+-----------------+
|Amazon 5W USB Off...|              4.75|              1.0|
|Kindle Fire HDX 8.9"|               4.0|              1.0|
|Alexa Voice Remot...|               3.0|              1.0|
|Kindle for Kids B...|               5.0|              3.0|
|Replacement Remot...|               3.0|              1.0|
|All-New Amazon Fi...|3.8333333333333335|              2.5|
|Amazon Kindle Oas...|               5.0|              4.0|
|Certified Refurbi...|               4.0|              3.0|
|Moshi Anti-Glare ...|               2.5|              2.0|
|Amazon Tap - Alex...|               5.0|4.545454545454546|
+--------------------+------------------+-----------------+
only showing top 10 rows
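
The output above shows the first and last monthly averages but not the drop used for ranking. As a small sketch, the drop (first average minus last average) can be recomputed from the displayed rows. Product names are kept truncated exactly as printed.

```python
# (name as displayed, first_average, last_average), copied from the output above.
rows = [
    ('Amazon 5W USB Off...', 4.75, 1.0),
    ('Kindle Fire HDX 8.9"', 4.0, 1.0),
    ('Alexa Voice Remot...', 3.0, 1.0),
    ('Kindle for Kids B...', 5.0, 3.0),
    ('Replacement Remot...', 3.0, 1.0),
    ('All-New Amazon Fi...', 3.8333333333333335, 2.5),
    ('Amazon Kindle Oas...', 5.0, 4.0),
    ('Certified Refurbi...', 4.0, 3.0),
    ('Moshi Anti-Glare ...', 2.5, 2.0),
    ('Amazon Tap - Alex...', 5.0, 4.545454545454546),
]

# Rating drop = first monthly average minus last monthly average.
drops = [(name, round(first - last, 4)) for name, first, last in rows]
for name, drop in drops:
    print(f"{name:<22} {drop:.4f}")
```

Three products tie at a drop of 2.0; the query does not define a tie-breaker, so their relative order may vary between runs.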


11. Products with Declining Ratings Over Time - Analysis: Based on the results of the above question (top products whose ratings dropped the most), select one product, do a data-driven analysis of the decline, and provide your findings and recommendations (max 150 words).

In [None]:
(
    df_clean.filter(F.col('review_date').isNotNull() & (F.col('name')=='All-New Amazon Fire HD 8 Tablet Case (7th Generation'))
    .orderBy(F.col('review_date').desc())
).show()

+--------------------+--------------------+----------------+------+-------------------+--------------------+--------------------+-----------------+
|                name|          categories|primary_category|rating|        review_date|         review_text|        review_title|reviewer_username|
+--------------------+--------------------+----------------+------+-------------------+--------------------+--------------------+-----------------+
|All-New Amazon Fi...|[Amazon Devices, ...|  Amazon Devices|     2|2017-07-25 00:00:00|I have purchased ...|Cover material is...|           TerriG|
|All-New Amazon Fi...|[Amazon Devices, ...|  Amazon Devices|     3|2017-07-25 00:00:00|I like the case f...|          Yes and no|          Dimples|
|All-New Amazon Fi...|[Amazon Devices, ...|  Amazon Devices|     2|2017-07-13 00:00:00|1. While a great ...|Worth it to buy t...|          Darlene|
|All-New Amazon Fi...|[Amazon Devices, ...|  Amazon Devices|     3|2017-07-08 00:00:00|The case does wha...|Pric

The product received several 4- and 5-star ratings in June 2017, but in July 2017 the ratings dropped to 2 and 3 stars, with customers complaining about lint collecting on the cover and the cover blocking the microSD card slot. The reviews suggest that it looks like a great product at first glance but gets in the way of other tasks.

Recommendation: improve the cover material and keep the microSD card slot accessible, so that the product holds up better in long-term use.

#### Final Task

12. After implementing all the above analytical queries, identify a query where there is a performance bottleneck in your Spark job and propose and implement optimizations. Compare the execution times before and after optimization, and justify your approach

Answer: The most resource-inefficient query was task 10. Initially, we grouped the data by month and built two DataFrames, one holding each product's first monthly average and the other its last, and then joined them on name. The join added a performance overhead, which we removed by keeping a single DataFrame and applying two window functions partitioned by name (one ordered by ascending review period, one by descending) to find both the first and last averages. This optimization reduced the execution time by about 1.5 seconds.
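
The notebook does not show how the two timings were measured. One possible approach, sketched here under that caveat, is a small helper that times a zero-argument callable with `time.perf_counter` and keeps the best of several runs. The names `time_action` and `query_df` are hypothetical.

```python
import time

def time_action(action, repeats=3):
    """Run a zero-argument callable several times and return the best wall-clock time in seconds."""
    best = float('inf')
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        best = min(best, time.perf_counter() - start)
    return best

# Stand-in workload so the sketch runs anywhere. In the notebook this would be
# something like lambda: query_df.count(), because Spark only executes a plan
# when an action such as count() or show() is called.
elapsed = time_action(lambda: sum(range(100_000)))
print(f"best of 3: {elapsed:.4f} s")
```

Timing the join-based and window-based versions with the same helper, on the same session, gives a like-for-like comparison. Note that repeated runs may benefit from caching.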