In [None]:
# Baixi Guo, Jason Limfueco, Laura Ngo, Pooja Panchal

In [5]:
import pyspark
from pyspark.sql import SparkSession

Let's set up our spark session and rationale before we attend to uploading and partitiioning our data.
It‚Äôs a collection of Amazon customer reviews and metadata for Sports category products sold in the U.S.,
with star ratings, review text, and related fields, and it is part of a larger Amazon US customer reviews dataset spanning many product types.

In [6]:
# Initializing our spark
"""
Formula == 
Executor instances = Total Cores - 1
Executor memory = (Total Memory - Driver Memory) / Executor Instances

Executor instances = 32 ‚àí 1 = 31
Executor memory = (128 ‚àí 2) / 31 ‚âà 4.06GB ‚Üí 4GB per executor

NOTE:
The execution size was the driving factor for this set based on previous experience \
working with social media analysis.
Given the 54GB size of the Amazon dataset, this configuration provides:

-High parallelism (31 concurrent tasks)
-Sufficient executor memory to reduce shuffle spill
-Balanced memory distribution to prevent executor OOM during aggregations

The 4GB executor size was chosen to provide adequate memory headroom for groupBy and \
aggregation operations without creating excessively large JVM heaps, \
which can increase garbage collection overhead.
"""
spark = (
    SparkSession.builder
    .appName('amazon_set')
    .config('spark.driver.memory', '2g')
    .config('spark.executor.instances', '31')
    .config('spark.executor.memory', '4g')
    .config('spark.executor.cores', '1')
    .getOrCreate()
)

In [7]:
# CHECK IF OUR SPARK SHOWS SOME SORT OF ACTIVE EXECUTOR SET UP
import requests
import pandas as pd # RECALL we're going to use spark not pandas

# Get the active Spark Context and URL
sc = spark.sparkContext
url = f"{sc.uiWebUrl}/api/v1/applications/{sc.applicationId}/executors"

# Fetch the executor data from the API
response = requests.get(url)
executors = response.json()

# Format into a readable DataFrame
df = pd.DataFrame(executors)[['id', 'totalCores', 'maxMemory', 'activeTasks', 'isActive']]
df['maxMemory_GB'] = (df['maxMemory'] / (1024**3)).round(2)
df

Unnamed: 0,id,totalCores,maxMemory,activeTasks,isActive,maxMemory_GB
0,driver,32,1099746508,0,True,1.02


In [8]:
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType
)

In [9]:
import kagglehub
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Download dataset (cached by kagglehub)
base_dir = kagglehub.dataset_download('cynthiarempel/amazon-us-customer-reviews-dataset')
print('Downloaded to:', base_dir)

# Schema (recommended)
schema = StructType([
    StructField('marketplace', StringType(), True),
    StructField('customer_id', StringType(), True),
    StructField('review_id', StringType(), True),
    StructField('product_id', StringType(), True),
    StructField('product_parent', StringType(), True),
    StructField('product_title', StringType(), True),
    StructField('product_category', StringType(), True),
    StructField('star_rating', IntegerType(), True),
    StructField('helpful_votes', IntegerType(), True),
    StructField('total_votes', IntegerType(), True),
    StructField('vine', StringType(), True),
    StructField('verified_purchase', StringType(), True),
    StructField('review_headline', StringType(), True),
    StructField('review_body', StringType(), True),
    StructField('review_date', StringType(), True),
])

pattern = f'file:{base_dir}/amazon_reviews_us_*_v*.tsv'

reviews_df = (
    spark.read
        .option('header', 'true')
        .option('sep', '\t')
        .schema(schema)
        .csv(pattern)
        .withColumn('source_file', F.input_file_name())
        .withColumn('category', F.regexp_extract('source_file', r'amazon_reviews_us_([^/]+?)_v', 1))
        .filter(F.col('category') != 'multilingual')
        .drop('source_file')
)

reviews_df.select('category', 'star_rating', 'product_id').show(10, truncate=False)

Downloaded to: /home/jlimfueco/.cache/kagglehub/datasets/cynthiarempel/amazon-us-customer-reviews-dataset/versions/9
+--------+-----------+----------+
|category|star_rating|product_id|
+--------+-----------+----------+
|Apparel |4          |B01KL6O72Y|
|Apparel |5          |B01ID3ZS5W|
|Apparel |5          |B01I497BGY|
|Apparel |5          |B01HDXFZK6|
|Apparel |5          |B01G6MBEBY|
|Apparel |5          |B01FWRXN0Y|
|Apparel |5          |B01EXNH1HE|
|Apparel |4          |B01E7OL09O|
|Apparel |5          |B01DXHX81O|
|Apparel |3          |B01DDULIJK|
+--------+-----------+----------+
only showing top 10 rows



In [10]:
print('Partitions:', reviews_df.rdd.getNumPartitions())

reviews_df.groupBy('category') \
    .count() \
    .orderBy(F.desc('count')) \
    .show(10, truncate=False)

Partitions: 384
+----------------------+-------+
|category              |count  |
+----------------------+-------+
|Wireless              |9002021|
|PC                    |6908554|
|Apparel               |5906333|
|Health_Personal_Care  |5331449|
|Beauty                |5115666|
|Digital_Ebook_Purchase|5101693|
|Video_DVD             |5069140|
|Mobile_Apps           |5033376|
|Toys                  |4864249|
|Sports                |4850360|
+----------------------+-------+
only showing top 10 rows



In [11]:
reviews_df.printSchema()
print('master:', spark.sparkContext.master)
print('partitions:', reviews_df.rdd.getNumPartitions())

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- category: string (nullable = false)

master: local[*]
partitions: 384


In [12]:
# Let's... see how big this data is!
reviews_df.count()

102899354

In [13]:
reviews_df.describe().show()

+-------+-----------+--------------------+--------------+--------------------+-------------------+--------------------+--------------------+-----------------+------------------+-----------------+---------+-----------------+--------------------+--------------------+-----------+---------+
|summary|marketplace|         customer_id|     review_id|          product_id|     product_parent|       product_title|    product_category|      star_rating|     helpful_votes|      total_votes|     vine|verified_purchase|     review_headline|         review_body|review_date| category|
+-------+-----------+--------------------+--------------+--------------------+-------------------+--------------------+--------------------+-----------------+------------------+-----------------+---------+-----------------+--------------------+--------------------+-----------+---------+
|  count|  102899354|           102899354|     102899354|           102899354|          102899354|           102899354|           102897

In [15]:
# Let's play with the data. What are our most popular products?

top_categories = (
    reviews_df
    .groupBy('product_category')
    .agg(F.count('*').alias('review_count'))
    .orderBy(F.desc('review_count'))
    .limit(10)
)

top_categories.show(truncate=False)

+----------------------+------------+
|product_category      |review_count|
+----------------------+------------+
|Wireless              |9001881     |
|PC                    |6908551     |
|Apparel               |5906322     |
|Health & Personal Care|5331215     |
|Beauty                |5115452     |
|Digital_Ebook_Purchase|5101676     |
|Video DVD             |5069136     |
|Mobile_Apps           |5033376     |
|Toys                  |4864243     |
|Sports                |4849563     |
+----------------------+------------+



In [17]:
# How many sports products do we have?
sports_count = (
    reviews_df
    .filter(F.col('product_category') == 'Sports')
    .count()
)

print(f"Total Sports reviews: {sports_count}")

Total Sports reviews: 4849563


In [18]:
# How about if we check for the unique sports products by product_id

distinct_sports_products = (
    reviews_df
    .filter(F.col('product_category') == 'Sports')
    .select('product_id')
    .distinct()
    .count()
)

print(f'Distinct Sports products: {distinct_sports_products}')

Distinct Sports products: 1046129
