In [1]:
#importing necessary libraries
import os
import time
import urllib.request

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window


In [2]:
# Create (or reuse) the local Spark session used by every cell below.
# SparkSession is already imported in the imports cell at the top of the
# notebook, so it is not imported a second time here.
#
# NOTE(review): the output recorded for this cell is a
# java.lang.NoSuchMethodError raised while the JavaSparkContext is being
# constructed. Presumably the installed pyspark package and the Spark jars on
# the classpath (e.g. via SPARK_HOME) come from different versions -- confirm
# the environment before re-running.
# NOTE(review): the warehouse directory below is a Windows-specific absolute
# path; adjust it when running on another machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("AmazonReviewsSparkAnalytics")
    # Keep the driver on the loopback interface.
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.sql.warehouse.dir", "file:/C:/temp/spark-warehouse")
    .config("spark.ui.enabled", "false")
    .getOrCreate()
)

# Only warnings and errors from Spark are logged.
spark.sparkContext.setLogLevel("WARN")
print("Spark version:", spark.version)






Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.NoSuchMethodError: 'void org.apache.spark.internal.LogKey.$init$(org.apache.spark.internal.LogKey)'
	at org.apache.spark.internal.LogKeys$SPARK_VERSION$.<clinit>(LogKey.scala:763)
	at org.apache.spark.SparkContext.$anonfun$new$1(SparkContext.scala:203)
	at org.apache.spark.internal.LogEntry.cachedMessageWithContext$lzycompute(Logging.scala:102)
	at org.apache.spark.internal.LogEntry.cachedMessageWithContext(Logging.scala:102)
	at org.apache.spark.internal.LogEntry.context(Logging.scala:106)
	at org.apache.spark.internal.Logging.logInfo(Logging.scala:189)
	at org.apache.spark.internal.Logging.logInfo$(Logging.scala:187)
	at org.apache.spark.SparkContext.logInfo(SparkContext.scala:86)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:203)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)


In [None]:
# Downloading the dataset from the GitHub repo
DATASET_URL = "https://raw.githubusercontent.com/bharathkrishna711-lab/Amazon-Review-spark-analytics/main/AmazonProductReviews.csv"


LOCAL_DIR = "data"
LOCAL_PATH = os.path.join(LOCAL_DIR, "AmazonProductReviews.csv")

os.makedirs(LOCAL_DIR, exist_ok=True)

# Download only if not already downloaded.
if not os.path.exists(LOCAL_PATH):
    print("Downloading dataset from GitHub...")
    # Download to a temporary name first and move it into place only once the
    # transfer has finished. Otherwise an interrupted download would leave a
    # truncated file at LOCAL_PATH that the next run treats as "already exists".
    partial_path = LOCAL_PATH + ".part"
    try:
        urllib.request.urlretrieve(DATASET_URL, partial_path)
        os.replace(partial_path, LOCAL_PATH)
    finally:
        # After a successful move the partial file no longer exists; after a
        # failed download the leftover is removed.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    print("Saved to:", LOCAL_PATH)
else:
    print("Dataset already exists at:", LOCAL_PATH)




Downloading dataset from GitHub...
Saved to: data\AmazonProductReviews.csv


#Q1

In [None]:
# Load the downloaded CSV into a Spark DataFrame.
csv_options = {
    "header": "true",
    "inferSchema": "true",
    "multiLine": "true",
    "escape": "\"",
}

csv_reader = spark.read
for option_name, option_value in csv_options.items():
    csv_reader = csv_reader.option(option_name, option_value)

df_raw = csv_reader.csv(LOCAL_PATH)

# Print the number of records loaded.
total_records = df_raw.count()
print("Total records loaded:", total_records)

print("=== Raw Schema ===")
df_raw.printSchema()

print("Columns:", df_raw.columns)


Total records loaded: 1597
=== Raw Schema ===
root
 |-- id: string (nullable = true)
 |-- asins: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- colors: string (nullable = true)
 |-- dateAdded: timestamp (nullable = true)
 |-- dateUpdated: timestamp (nullable = true)
 |-- dimension: string (nullable = true)
 |-- ean: double (nullable = true)
 |-- keys: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- manufacturerNumber: string (nullable = true)
 |-- name: string (nullable = true)
 |-- prices: string (nullable = true)
 |-- reviews.date: timestamp (nullable = true)
 |-- reviews.doRecommend: boolean (nullable = true)
 |-- reviews.numHelpful: integer (nullable = true)
 |-- reviews.rating: integer (nullable = true)
 |-- reviews.sourceURLs: string (nullable = true)
 |-- reviews.text: string (nullable = true)
 |-- reviews.title: string (nullable = true)
 |-- reviews.userCity: string (nullable = true)
 |-- re

#Q2

In [None]:
# Keep only the columns needed for the analysis. The dotted `reviews.*`
# source columns are renamed to underscore names (reviews_date, ...).
review_fields = ["date", "rating", "text", "title", "username"]
renamed_review_columns = [
    F.col(f"`reviews.{field}`").alias(f"reviews_{field}")
    for field in review_fields
]

df = (
    df_raw
    .select("categories", "name", *renamed_review_columns)
    # primary_category = first entry of the comma-separated categories
    .withColumn("primary_category", F.trim(F.split(F.col("categories"), ",").getItem(0)))
    # rating as a double
    .withColumn("rating", F.col("reviews_rating").cast("double"))
    # date derived from the review timestamp column
    .withColumn("review_date", F.to_date(F.col("reviews_date")))
)

# Drop rows whose rating is missing or lies outside the 1-5 range.
rating_present = F.col("rating").isNotNull()
rating_at_least_one = F.col("rating") >= 1.0
rating_at_most_five = F.col("rating") <= 5.0
df_clean = df.filter(rating_present & rating_at_least_one & rating_at_most_five)

print("=== Clean Schema ===")
df_clean.printSchema()

clean_record_count = df_clean.count()
print("Records after cleaning:", clean_record_count)



=== Clean Schema ===
root
 |-- categories: string (nullable = true)
 |-- name: string (nullable = true)
 |-- reviews_date: timestamp (nullable = true)
 |-- reviews_rating: integer (nullable = true)
 |-- reviews_text: string (nullable = true)
 |-- reviews_title: string (nullable = true)
 |-- reviews_username: string (nullable = true)
 |-- primary_category: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- review_date: date (nullable = true)

Records after cleaning: 1177


In [None]:
# Preview the selected and derived columns (this is the frame before the
# rating filter that produces df_clean).
df.show(n=20, truncate=True)

+--------------------+-----------------+-------------------+--------------+--------------------+--------------------+--------------------+----------------+------+-----------+
|          categories|             name|       reviews_date|reviews_rating|        reviews_text|       reviews_title|    reviews_username|primary_category|rating|review_date|
+--------------------+-----------------+-------------------+--------------+--------------------+--------------------+--------------------+----------------+------+-----------+
|Amazon Devices,ma...|Kindle Paperwhite|2015-08-08 05:30:00|             5|I initially had t...|Paperwhite voyage...|          Cristina M|  Amazon Devices|   5.0| 2015-08-08|
|Amazon Devices,ma...|Kindle Paperwhite|2015-09-01 05:30:00|             5|Allow me to prefa...|One Simply Could ...|               Ricky|  Amazon Devices|   5.0| 2015-09-01|
|Amazon Devices,ma...|Kindle Paperwhite|2015-07-20 05:30:00|             4|I am enjoying it ...|Great for those t...|       T

In [None]:
# Expose the cleaned frame to Spark SQL under the view name `reviews`,
# which the spark.sql(...) queries below select from.
df_clean.createOrReplaceTempView("reviews")


#Q3

In [None]:
# Product names with at least 20 reviews, ranked by average rating
# (highest first; ties broken by review count, then by name).
q3_sql = """
SELECT
  name,
  COUNT(*) AS review_count,
  ROUND(AVG(rating), 3) AS avg_rating
FROM reviews
GROUP BY name
HAVING COUNT(*) >= 20
ORDER BY avg_rating DESC, review_count DESC, name ASC
"""
q3 = spark.sql(q3_sql)

q3.show(n=20, truncate=False)


+-----------------------------------------------------+------------+----------+
|name                                                 |review_count|avg_rating|
+-----------------------------------------------------+------------+----------+
|Fire HD 6 Tablet                                     |38          |5.0       |
|Kindle Paperwhite                                    |22          |4.591     |
|Amazon Tap - Alexa-Enabled Portable Bluetooth Speaker|542         |4.533     |
|Kindle Fire HDX 7"                                   |23          |4.391     |
|Amazon Fire TV                                       |44          |4.205     |
|Amazon Premium Headphones                            |77          |4.013     |
|All-New Amazon Fire 7 Tablet Case (7th Generation    |27          |3.778     |
+-----------------------------------------------------+------------+----------+



#Q4

In [None]:
# Top 10 most active reviewers: usernames with the most reviews
# (ties broken alphabetically by username).
q4_sql = """
SELECT
  reviews_username AS username,
  COUNT(*) AS review_count
FROM reviews
GROUP BY reviews_username
ORDER BY review_count DESC, username ASC
LIMIT 10
"""
q4 = spark.sql(q4_sql)

q4.show(n=20, truncate=False)


+---------------+------------+
|username       |review_count|
+---------------+------------+
|A. Younan      |38          |
|Andrew         |23          |
|William Hardin |23          |
|Amazon Customer|17          |
|Victor L.      |15          |
|Earthling1984  |13          |
|NF             |13          |
|Amazon Reviewer|12          |
|Mike W.        |12          |
|D. Miyao       |10          |
+---------------+------------+



#Q5

In [None]:
# Monthly trend of the average rating per primary category (first 40 rows shown).

# Only reviews with a known date can be assigned to a month.
q5_dated_reviews = df_clean.filter(F.col("review_date").isNotNull())
q5_by_month = q5_dated_reviews.withColumn(
    "year_month", F.date_format("review_date", "yyyy-MM")
)

q5_avg_rating = F.round(F.avg("rating"), 3).alias("avg_rating")
q5_review_count = F.count("*").alias("review_count")

q5 = (
    q5_by_month
    .groupBy("primary_category", "year_month")
    .agg(q5_avg_rating, q5_review_count)
    .orderBy("primary_category", "year_month")
)

q5.show(40, truncate=False)


+----------------------------+----------+----------+------------+
|primary_category            |year_month|avg_rating|review_count|
+----------------------------+----------+----------+------------+
|Amazon Devices              |2012-09   |4.5       |4           |
|Amazon Devices              |2012-10   |4.0       |1           |
|Amazon Devices              |2013-10   |4.313     |16          |
|Amazon Devices              |2013-11   |4.167     |6           |
|Amazon Devices              |2013-12   |5.0       |2           |
|Amazon Devices              |2014-07   |5.0       |39          |
|Amazon Devices              |2014-09   |4.0       |1           |
|Amazon Devices              |2014-10   |3.667     |6           |
|Amazon Devices              |2014-11   |5.0       |1           |
|Amazon Devices              |2015-03   |1.0       |1           |
|Amazon Devices              |2015-04   |5.0       |1           |
|Amazon Devices              |2015-06   |5.0       |1           |
|Amazon De

#Q6

In [None]:
# Top 10 product names by the ratio of 5-star to 1-star reviews.
q6 = (
    df_clean
    .groupBy("name")
    .agg(
        # Conditional counts: each matching review contributes 1, others 0.
        F.sum(F.when(F.col("rating") == 5, 1).otherwise(0)).alias("five_star"),
        F.sum(F.when(F.col("rating") == 1, 1).otherwise(0)).alias("one_star"),
        F.count("*").alias("total_reviews")
    )
    # The ratio is undefined without at least one 1-star review.
    .filter(F.col("one_star") > 0)
    .withColumn("ratio_5_to_1", F.round(F.col("five_star") / F.col("one_star"), 3))
    .orderBy(
        F.col("ratio_5_to_1").desc(),
        F.col("five_star").desc(),
        F.col("total_reviews").desc(),
        # Final tie-break on the grouping key so the top 10 is deterministic
        # when products tie on every metric (same convention as Q3, Q4, Q10).
        F.col("name").asc(),
    )
    .select("name", "five_star", "one_star", "ratio_5_to_1", "total_reviews")
)

q6.show(10, truncate=False)


+-----------------------------------------------------------------------------------------+---------+--------+------------+-------------+
|name                                                                                     |five_star|one_star|ratio_5_to_1|total_reviews|
+-----------------------------------------------------------------------------------------+---------+--------+------------+-------------+
|Amazon Tap - Alexa-Enabled Portable Bluetooth Speaker                                    |358      |7       |51.143      |542          |
|Echo Dot (2nd Generation) - Black                                                        |12       |1       |12.0        |13           |
|Amazon Fire TV                                                                           |26       |4       |6.5         |44           |
|Amazon Fire TV Game Controller                                                           |6        |1       |6.0         |11           |
|Amazon 5W USB Official OEM Charge

In [None]:
# Longest review in each primary category, together with its title and the
# length of the review text.

# Length of every review text.
df_len = df_clean.withColumn("review_len", F.length("reviews_text"))

# Within each category, order reviews from longest to shortest.
w = Window.partitionBy("primary_category").orderBy(F.col("review_len").desc())

q7_ranked = df_len.withColumn("rn", F.row_number().over(w))
q7_longest = q7_ranked.filter(F.col("rn") == 1)  # one top-ranked row per category

q7_output_columns = [
    "primary_category",
    "name",
    F.col("reviews_title").alias("review_title"),
    "review_len",
    F.col("reviews_text").alias("review_text"),
]
q7 = q7_longest.select(*q7_output_columns).orderBy(F.col("review_len").desc())

q7.show(truncate=True)


+--------------------+--------------------+--------------------+----------+--------------------+
|    primary_category|                name|        review_title|review_len|         review_text|
+--------------------+--------------------+--------------------+----------+--------------------+
|          Categories|      Amazon Fire TV|This box is a GAM...|     19739|I am not a casual...|
|      Amazon Devices|  Kindle Fire HDX 7"|Excellent 3rd-gen...|     18667|This is the middl...|
|Amazon Devices & ...|Alexa Voice Remot...|Great range, very...|      1925|As other reviewer...|
|         Electronics|All-New Fire HD 8...|Fantastic tablet ...|      1778|Let me start by s...|
|        Kindle Store|     Kindle Keyboard|Worth the money. ...|      1672|The Kindle is my ...|
|Cell Phones & Acc...|Moshi Anti-Glare ...|Moshi's screen pr...|      1379|I like Moshi's an...|
+--------------------+--------------------+--------------------+----------+--------------------+



#Q8

In [None]:
# Year-over-year growth in review counts.

# One row per calendar year with the number of dated reviews in that year.
df_year = (
    df_clean
    .filter(F.col("review_date").isNotNull())
    .withColumn("year", F.year("review_date"))
    .groupBy("year")
    .agg(F.count("*").alias("review_count"))
    .orderBy("year")
)

# A single, unpartitioned window over the yearly rows, ordered by year.
w_year = Window.orderBy("year")

# lag() returns the previous *row*, which is the previous calendar year only
# when no year is missing from df_year. Guard against gaps so that a jump
# such as 2013 -> 2015 is not reported as year-over-year growth.
prev_row_year = F.lag("year").over(w_year)
prev_row_count = F.lag("review_count").over(w_year)
is_consecutive_year = (F.col("year") - prev_row_year) == 1

q8 = (
    df_year
    # NULL for the first year and for any year whose predecessor is missing.
    .withColumn("prev_year_count", F.when(is_consecutive_year, prev_row_count))
    .withColumn(
        "yoy_growth_pct",
        # NULL propagates through the arithmetic when there is no previous year.
        F.round(
            (F.col("review_count") - F.col("prev_year_count")) / F.col("prev_year_count") * 100,
            2,
        ),
    )
)

q8.show(truncate=False)


+----+------------+---------------+--------------+
|year|review_count|prev_year_count|yoy_growth_pct|
+----+------------+---------------+--------------+
|2012|5           |NULL           |NULL          |
|2013|24          |5              |380.0         |
|2014|101         |24             |320.83        |
|2015|18          |101            |-82.18        |
|2016|328         |18             |1722.22       |
|2017|484         |328            |47.56         |
+----+------------+---------------+--------------+



#Q9

In [None]:
# Average rating by review-length bucket: analyzes whether longer reviews tend
# to be more positive or negative. Three buckets are used:
#   Short:  length of review text is less than 50
#   Medium: length of review text is between 50 and 200 (both inclusive)
#   Long:   length of review text is above 200

df_bucket = (
    df_clean
    .withColumn("review_len", F.length("reviews_text"))
    # A review without text has no length (NULL). Such rows match neither the
    # "Short" nor the "Medium" condition and would otherwise be counted as
    # "Long" by a catch-all branch, so they are excluded up front.
    .filter(F.col("review_len").isNotNull())
    .withColumn(
        "len_bucket",
        F.when(F.col("review_len") < 50, "Short")
         .when((F.col("review_len") >= 50) & (F.col("review_len") <= 200), "Medium")
         .when(F.col("review_len") > 200, "Long")
    )
)

q9 = (
    df_bucket
    .groupBy("len_bucket")
    .agg(
        F.count("*").alias("review_count"),
        F.round(F.avg("rating"), 3).alias("avg_rating"),
        F.round(F.avg("review_len"), 1).alias("avg_review_len")
    )
    # Present the buckets in logical order (Short, Medium, Long) instead of
    # alphabetical order.
    .orderBy(
        F.when(F.col("len_bucket") == "Short", 1)
         .when(F.col("len_bucket") == "Medium", 2)
         .otherwise(3)
    )
)

q9.show(truncate=False)


+----------+------------+----------+--------------+
|len_bucket|review_count|avg_rating|avg_review_len|
+----------+------------+----------+--------------+
|Short     |35          |4.571     |23.6          |
|Medium    |447         |4.481     |109.7         |
|Long      |695         |4.271     |1174.6        |
+----------+------------+----------+--------------+



#Q10

In [None]:
# Top 10 product names whose rating dropped the most between the first and
# the last month in which they were reviewed.

# Average rating and number of reviews per product and month.
q10_dated_reviews = df_clean.filter(F.col("review_date").isNotNull())
df_monthly = (
    q10_dated_reviews
    .withColumn("year_month", F.date_format("review_date", "yyyy-MM"))
    .groupBy("name", "year_month")
    .agg(
        F.avg("rating").alias("monthly_avg_rating"),
        F.count("*").alias("monthly_review_count"),
    )
)

# Per-product windows: earliest month first / latest month first.
w_asc = Window.partitionBy("name").orderBy(F.col("year_month").asc())
w_desc = Window.partitionBy("name").orderBy(F.col("year_month").desc())

# Attach the first/last month and their average ratings to every monthly row.
q10_edge_columns = [
    ("first_month", F.first("year_month").over(w_asc)),
    ("first_avg", F.first("monthly_avg_rating").over(w_asc)),
    ("last_month", F.first("year_month").over(w_desc)),
    ("last_avg", F.first("monthly_avg_rating").over(w_desc)),
]
q10_with_edges = df_monthly
for edge_name, edge_expr in q10_edge_columns:
    q10_with_edges = q10_with_edges.withColumn(edge_name, edge_expr)

# Collapse to one row per product and compute the drop (first minus last).
q10_per_product = q10_with_edges.groupBy("name").agg(
    F.first("first_month").alias("first_month"),
    F.first("last_month").alias("last_month"),
    F.first("first_avg").alias("first_avg"),
    F.first("last_avg").alias("last_avg"),
)
df_first_last = q10_per_product.withColumn(
    "rating_drop", F.round(F.col("first_avg") - F.col("last_avg"), 3)
)

q10_ranked = df_first_last.orderBy(F.col("rating_drop").desc(), F.col("name").asc())
q10 = q10_ranked.limit(10)
q10.show(truncate=False)


+-----------------------------------------------------------------------------------------+-----------+----------+------------------+-----------------+-----------+
|name                                                                                     |first_month|last_month|first_avg         |last_avg         |rating_drop|
+-----------------------------------------------------------------------------------------+-----------+----------+------------------+-----------------+-----------+
|Amazon 5W USB Official OEM Charger and Power Adapter for Fire Tablets and Kindle eReaders|2017-03    |2017-07   |4.75              |1.0              |3.75       |
|Kindle Fire HDX 8.9"                                                                     |2013-11    |2015-03   |4.0               |1.0              |3.0        |
|Alexa Voice Remote for Amazon Echo and Echo Dot                                          |2016-05    |2016-12   |3.0               |1.0              |2.0        |
|Kindle for Kids

#Q11

In [None]:
# Select the product at the top of the Q10 ranking (largest rating drop).
q11_top_row = q10.collect()[0]
top_product = q11_top_row["name"]
print("Selected product for analysis:", top_product)

is_top_product = F.col("name") == top_product

# Monthly trend
q11_monthly_trend = df_monthly.filter(is_top_product).orderBy("year_month")
q11_monthly_trend.show(60, truncate=False)

# Recent low reviews (rating of 2 or below, newest first)
q11_low_reviews = (
    df_clean
    .filter(is_top_product & (F.col("rating") <= 2))
    .select("review_date", "rating", "reviews_title", "reviews_text")
    .orderBy(F.col("review_date").desc())
)
q11_low_reviews.show(10, truncate=True)


Selected product for analysis: Amazon 5W USB Official OEM Charger and Power Adapter for Fire Tablets and Kindle eReaders
+-----------------------------------------------------------------------------------------+----------+------------------+--------------------+
|name                                                                                     |year_month|monthly_avg_rating|monthly_review_count|
+-----------------------------------------------------------------------------------------+----------+------------------+--------------------+
|Amazon 5W USB Official OEM Charger and Power Adapter for Fire Tablets and Kindle eReaders|2017-03   |4.75              |4                   |
|Amazon 5W USB Official OEM Charger and Power Adapter for Fire Tablets and Kindle eReaders|2017-04   |2.6666666666666665|3                   |
|Amazon 5W USB Official OEM Charger and Power Adapter for Fire Tablets and Kindle eReaders|2017-05   |4.4               |5                   |
|Amazon 5W USB Offici

The product Amazon 5W USB Official OEM Charger and Power Adapter for Fire Tablets and Kindle eReaders shows a decline in customer satisfaction over time. Monthly average ratings were mostly high between March and June 2017 (4.75 in March and 4.4 in May), although April had already dipped to about 2.67 on only three reviews, indicating generally strong initial performance. However, in July 2017 the average rating dropped sharply to 1.0, signaling a sudden quality or performance issue. Because each month contains only a handful of reviews, this trend should be read as indicative rather than conclusive. Analysis of recent low-rating reviews reveals recurring complaints such as the charger not working, short lifespan, repeated failures, confusion about power specifications, and poor value for money. This pattern suggests a possible manufacturing defect, supplier change, or mismatch between advertised and actual product specifications introduced in later batches. It is recommended that Amazon investigate production or sourcing changes around mid-2017, improve quality checks, and clarify product specifications to restore customer trust and prevent further rating decline.

#Q12 

In [None]:
#identify a query where there is a performance bottleneck in your Spark job and propose and implement optimizations. 
# Compare the execution times before and after optimization, and justify your approach. 

#defining function to check the performance
def time_action(label, fn):
    """Run ``fn`` once, print how long it took, and return its result.

    Args:
        label: Text printed in front of the elapsed time.
        fn: Zero-argument callable to execute and time.

    Returns:
        Whatever ``fn()`` returned.
    """
    started_at = time.perf_counter()
    outcome = fn()
    finished_at = time.perf_counter()
    elapsed = finished_at - started_at
    print(f"{label}: {elapsed:.3f} seconds")
    return outcome


In [None]:
# Before optimization: baseline timing of the Q7 query with the cache cleared.
spark.catalog.clearCache()

df_len_base = df_clean.withColumn("review_len", F.length("reviews_text"))
w_base = Window.partitionBy("primary_category").orderBy(F.col("review_len").desc())

q7_base_ranked = df_len_base.withColumn("rn", F.row_number().over(w_base))
q7_base = (
    q7_base_ranked
    .filter(F.col("rn") == 1)
    .select("primary_category", "name", "reviews_title", "review_len")
)

# NOTE(review): only a single run is timed here, so the figure presumably
# includes one-off start-up cost -- consider repeating the measurement.
time_action("BEFORE optimization (Qvii) count()", q7_base.count)


BEFORE optimization (Qvii) count(): 0.295 seconds


6

In [None]:
# After optimization: cache the cleaned data, keep only the needed columns and
# pre-partition by the window key before ranking.
df_clean.cache().count()  # count() forces the cache to be materialized

df_q7 = (
    df_clean
    .select("primary_category", "name", "reviews_title", "reviews_text")
    # NOTE(review): presumably meant to spare the window its own shuffle --
    # confirm with q7_opt.explain().
    .repartition("primary_category")
    .withColumn("review_len", F.length("reviews_text"))
)

w_opt = Window.partitionBy("primary_category").orderBy(F.col("review_len").desc())

q7_opt_ranked = df_q7.withColumn("rn", F.row_number().over(w_opt))
q7_opt = (
    q7_opt_ranked
    .filter(F.col("rn") == 1)
    .select("primary_category", "name", "reviews_title", "review_len")
)

time_action("AFTER optimization (Qvii) count()", q7_opt.count)


AFTER optimization (Qvii) count(): 0.160 seconds


6