In [1]:
# !pip install pyspark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, schema_of_json, col, from_unixtime, date_format
import json

# Step 1: Initialize Spark session

In [2]:
def create_spark_session(
    app_name="JsonLoader",
    driver_memory="6g",
    local_dir="C:\\Users\\Niresh\\OneDrive\\Documents\\Career\\MS\\NEU\\4th Semester\\MLOps\\Project\\EDA",
):
    """
    Stop the Spark session bound to the notebook-level name ``spark`` (if any)
    and build a new one.

    :param app_name: Value passed to ``SparkSession.builder.appName``.
    :param driver_memory: Value for the ``spark.driver.memory`` setting.
    :param local_dir: Value for the ``spark.local.dir`` setting. The default is
        the original machine-specific path; pass a different directory when
        running elsewhere.
    :return: The SparkSession returned by ``getOrCreate()``.
    """
    # locals() inside a function only holds the function's own variables, so
    # the notebook's `spark` can never be found there. Look it up in the
    # module (notebook) namespace instead.
    existing_session = globals().get("spark")
    if existing_session is not None:
        try:
            existing_session.stop()  # Attempt to stop the existing session
            print("Stopped old spark session")
        except Exception as e:
            # Best effort: a failed stop must not prevent creating a new session
            print(f"Error stopping Spark session: {e}")

    # Create a new Spark session with the requested driver memory and temporary directory
    new_spark = (
        SparkSession.builder
        .appName(app_name)
        .config("spark.driver.memory", driver_memory)
        .config("spark.local.dir", local_dir)
        .getOrCreate()
    )

    print("New Spark session started.")
    return new_spark

# Create or recreate the Spark session and bind it to the notebook-level name
# `spark`, which the later cells pass to the loader and use for spark.conf
spark = create_spark_session()

New Spark session started.


# Step 2: Load JSONL files into DataFrames

In [3]:
def load_jsonl_with_spark(spark, jsonl_file, schema=None):
    """
    Load a large JSONL file into a Spark DataFrame.

    Each line is read as text, parsed with ``from_json`` and the resulting
    struct is flattened into top-level columns.

    :param spark: Spark session.
    :param jsonl_file: Path to the JSONL file.
    :param schema: Optional schema passed straight to ``from_json``. When
        omitted, the schema is inferred from the FIRST line of the file only,
        so fields that are absent from that first record will not appear in
        the result.
    :return: Spark DataFrame with one column per top-level JSON field.
    :raises ValueError: If no schema is given and the file has no lines to
        infer one from.
    """
    # Read the file as text (one row per line, in a column named "value")
    raw_df = spark.read.text(jsonl_file)

    if schema is None:
        # take(1) yields an empty list for an empty file, which lets us raise a
        # clear error instead of an opaque IndexError
        first_rows = raw_df.take(1)
        if not first_rows:
            raise ValueError(
                f"Cannot infer a JSON schema: {jsonl_file!r} contains no lines"
            )
        sample = first_rows[0][0]
        # Round-trip through json so a malformed first line fails here, before
        # any Spark job is planned around a bad schema
        schema = schema_of_json(json.dumps(json.loads(sample)))

    # Parse the JSON strings into a struct column
    parsed_df = raw_df.select(from_json(col("value"), schema).alias("parsed_data"))

    # Flatten the struct into top-level columns
    return parsed_df.select("parsed_data.*")

# Read the reviews file and the product-metadata file with the same loader
reviews_spark_df = load_jsonl_with_spark(
    spark=spark,
    jsonl_file="Home_and_Kitchen.jsonl",
)
metadata_spark_df = load_jsonl_with_spark(
    spark=spark,
    jsonl_file="meta_Home_and_Kitchen.jsonl",
)

# Step 3: Show schema and first few records of metadata and review data

In [4]:
# Print the metadata schema produced by the loader, then preview the first 5 rows
print("Schema:")
metadata_spark_df.printSchema()
metadata_spark_df.show(5)

Schema:
root
 |-- average_rating: double (nullable = true)
 |-- bought_together: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- details: struct (nullable = true)
 |    |-- Best Sellers Rank: struct (nullable = true)
 |    |    |-- Kitchen & Dining: long (nullable = true)
 |    |    |-- Mug Sets: long (nullable = true)
 |    |-- Brand: string (nullable = true)
 |    |-- Capacity: string (nullable = true)
 |    |-- Color: string (nullable = true)
 |    |-- Date First Available: string (nullable = true)
 |    |-- Is Discontinued By Manufacturer: string (nullable = true)
 |    |-- Item Weight: string (nullable = true)
 |    |-- Material: string (nullable = true)
 |    |-- Number of Items: string (nullable = true)
 |    |-- Pattern: string (nullable = true)
 |    |-- Product Care Instructions: string (nullable = true)
 |    |-- Pro

In [5]:
# Print the reviews schema produced by the loader, then preview the first 5 rows
print("Schema:")
reviews_spark_df.printSchema()
reviews_spark_df.show(5)

Schema:
root
 |-- asin: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- parent_asin: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)

+----------+------------+--------------------+-----------+------+--------------------+-------------+--------------------+--------------------+-----------------+
|      asin|helpful_vote|              images|parent_asin|rating|                text|    timestamp|               title|             user_id|verified_purchase|
+----------+------------+--------------------+-----------+------+--------------------+-------------+--------------------+--------------------+-----------------+
|B007WQ9YNO|           1|                  []| B09XWYG6X1|

# Step 4: Process the reviews DataFrame

In [6]:
# 'timestamp' holds epoch milliseconds: scale to seconds and render it as a
# 'yyyy-MM-dd HH:mm:ss' string column named 'review_date_timestamp'
epoch_seconds = col("timestamp") / 1000
review_date_string = date_format(from_unixtime(epoch_seconds), "yyyy-MM-dd HH:mm:ss")
reviews_spark_df = reviews_spark_df.withColumn("review_date_timestamp", review_date_string)

# Keep only reviews dated 2018-01-01 through 2020-12-31 (inclusive) and drop 'images'
in_date_range = col("review_date_timestamp").between(
    "2018-01-01 00:00:00", "2020-12-31 23:59:59"
)
filtered_reviews_df = reviews_spark_df.where(in_date_range).drop("images")

In [7]:
# Preview 5 filtered reviews; truncate=False prints every column value in full
filtered_reviews_df.show(5, truncate=False)

+----------+------------+-----------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Step 5: Perform left join between filtered_reviews_df and metadata_spark_df

In [8]:
# Keep only the join key and the product attributes needed downstream. The
# metadata 'title' is exposed as 'product_name' so it does not collide with the
# review's own 'title' column after the join.
metadata_columns = [
    "parent_asin",
    "main_category",
    "title",
    "categories",
    "price",
    "average_rating",
    "rating_number",
]
metadata_spark_df_renamed = (
    metadata_spark_df
    .select(*metadata_columns)
    .withColumnRenamed("title", "product_name")
)

# Left join on parent_asin: every filtered review is kept, with metadata
# columns attached where a matching product exists
joined_df = filtered_reviews_df.join(metadata_spark_df_renamed, on="parent_asin", how="left")

In [9]:
# Preview 10 joined rows with untruncated values to sanity-check the join output
joined_df.show(10, truncate=False)

+-----------+----------+------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+----------------------------------------------+----------------------------+-----------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------+-----+--------------+-------------+
|parent_asin|asin      |helpful_vote|rating|text                               

In [None]:
# Checking whether the join bloated the data: this count is the baseline to
# compare against the row count of joined_df.

# Get the row count of filtered_reviews_df
row_count = filtered_reviews_df.count()
print(f"Row count of filtered_reviews_df: {row_count}")

# Observed on a previous run: 24,607,644 rows (~24M)

In [None]:
# Get the row count of joined_df (to compare with the filtered_reviews_df count)
row_count = joined_df.count()
print(f"Row count of joined_df: {row_count}")

# Result: there is no issue of data bloating from the join

In [10]:
# Count the joined rows per main_category (one output row per distinct category)
category_counts_df = joined_df.groupBy("main_category").count()

# Print up to 50 categories with untruncated names
category_counts_df.show(50, truncate=False)

# Observed shares of rows:
# 85% from Amazon Home
# ~5% from Tools & Home Improvement
# Everything else is 10%


+----------------------------+--------+
|main_category               |count   |
+----------------------------+--------+
|Audible Audiobooks          |14      |
|Entertainment               |124     |
|Premium Beauty              |453     |
|Computers                   |9013    |
|All Electronics             |102392  |
|Home Audio & Theater        |22929   |
|Pet Supplies                |15502   |
|Toys & Games                |274838  |
|NULL                        |579693  |
|Baby                        |57392   |
|Sports & Outdoors           |178889  |
|Grocery                     |26856   |
|Video Games                 |948     |
|Automotive                  |27609   |
|Handmade                    |29638   |
|Books                       |61      |
|Amazon Home                 |21065410|
|Industrial & Scientific     |320027  |
|Health & Personal Care      |203302  |
|Cell Phones & Accessories   |13612   |
|Arts, Crafts & Sewing       |63756   |
|Software                    |155     |


In [11]:
# Spot-check: do products from an unrelated-looking main_category still
# logically belong to Home & Kitchen?

# Rows whose main_category is exactly 'AMAZON FASHION'
amazon_fashion_df = joined_df.where(col("main_category") == "AMAZON FASHION")

# Unique product names within that category
distinct_product_names = amazon_fashion_df.select("product_name").dropDuplicates()

# Print up to 10 of them in full
distinct_product_names.show(10, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|product_name                                                                                                                                                                                    |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Reusable Shopping Cart Bags and Grocery Organizer Designed for Trolley Carts by Modern Day Living                                                                                               |
|Karelian Heritage Elite Noble Shungite Crystal | Elite Shungite Stone for Water | Elite Shungite Grounding Crystal | Shungite Stone Crystal for Reiki, Mediation, Chakra Balancing              |
|Dirty Laundry Women's Ta

In [12]:
# Same spot-check for rows whose main_category is exactly 'Books'
books_df = joined_df.where(col("main_category") == "Books")

# Unique product names within that category
distinct_product_names = books_df.select("product_name").dropDuplicates()

# Print up to 10 of them in full
distinct_product_names.show(10, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|product_name                                                                                                                                                                                       |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Instruction Manual for Oster Bread Machine Manual (Model: 4812) Reprint                                                                                                                            |
|Instruction Manual for Panasonic Bread Machine Manual (Model: SD-RD250) Reprint                                                                                                                    |
|Instructi

In [13]:
# Seems like all products are relevant to home and kitchen to some degree. So let's retain this proportion/distribution and discard categories (hierarchy) as it's too granular.

# Sampling 1% of the data

In [9]:
from pyspark.sql.functions import col, month

In [10]:
# Step 1: Extract the month number from review_date_timestamp into a new
# 'review_month' column (used below as the sampling stratum)
joined_df = joined_df.withColumn("review_month", month(col("review_date_timestamp")))

In [11]:
# Step 2: Count rows per (review_month, rating) group; the result has one row
# per group with the size in a column named 'count'
grouped_df = joined_df.groupBy("review_month", "rating").count()

In [12]:
# Step 3: Join the grouped counts back so every row carries its group's 'count'
# NOTE(review): in the visible cells 'count' is only dropped again after
# sampling and never read, so this join looks removable - confirm before deleting.
joined_with_count_df = joined_df.join(grouped_df, on=["review_month", "rating"], how="inner")

In [13]:
# Step 4: Sampling fraction applied to every stratum (0.01 = 1% of the data)
sampling_fraction = 0.01

In [14]:
# Step 5: Sample with sampleBy, using 'review_month' as the stratification column.
# grouped_df has one row per (review_month, rating) pair, so each month shows up
# several times; repeated keys collapse in the dict, leaving one entry per month,
# all mapped to the same fraction. Rating is not part of the strata passed here.
month_fractions = dict.fromkeys(
    (group_row['review_month'] for group_row in grouped_df.collect()),
    sampling_fraction,
)

sampled_df = joined_with_count_df.sampleBy(
    col="review_month",
    fractions=month_fractions,
    seed=42,  # fixed seed for reproducibility
)

# Changing the seed yields a different sample - e.g. to train several models on
# different samples and average their parameters/weights for reliability

In [17]:
# Step 6: Show some of the sampled data to verify
sampled_df.cache()  # Mark the sample for caching so later actions can reuse it
sampled_df.show(10, truncate=False)

# This cell might run into memory issues or timeout errors. For out-of-memory errors we can tweak:

# spark = SparkSession.builder \
#     .appName("Sample Data") \
#     .config("spark.driver.memory", "6g") \
#     .config("spark.executor.memory", "4g") \
#     .config("spark.executor.cores", "2") \
#     .getOrCreate()

# To increase timeout settings:
# spark.conf.set("spark.network.timeout", "10000s")
# spark.conf.set("spark.executor.heartbeatInterval", "60s")


+------------+------+-----------+----------+------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------------------------------------------------+----------------------------+-----------------+---------------------+-------------+----------------------------

In [16]:
# Count the sampled rows to confirm the sample is roughly 1% of the joined data
row_count = sampled_df.count()
print(f"Row count of sampled_df: {row_count}")

# Observed: about 246k rows

Row count of sampled_df: 246014


In [21]:
# List (column, type) pairs to find columns that cannot be written to CSV
sampled_df.dtypes

# A column of type array<string> (here: 'categories') can't be saved as CSV

[('review_month', 'int'),
 ('rating', 'double'),
 ('parent_asin', 'string'),
 ('asin', 'string'),
 ('helpful_vote', 'bigint'),
 ('text', 'string'),
 ('timestamp', 'bigint'),
 ('title', 'string'),
 ('user_id', 'string'),
 ('verified_purchase', 'boolean'),
 ('review_date_timestamp', 'string'),
 ('main_category', 'string'),
 ('product_name', 'string'),
 ('categories', 'array<string>'),
 ('price', 'double'),
 ('average_rating', 'double'),
 ('rating_number', 'bigint'),
 ('count', 'bigint')]

In [27]:
# Switch Spark's time parser policy to LEGACY before parsing the date strings
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

from pyspark.sql.functions import year, to_timestamp, concat_ws

# One chained pipeline; each step sees the columns produced by the previous one:
#   1. parse the 'review_date_timestamp' string into a real timestamp
#   2. derive 'year' from that timestamp
#   3. flatten the 'categories' array into a single comma-joined string
#   4. remove the helper 'count' column
sampled_df = (
    sampled_df
    .withColumn(
        "review_date_timestamp",
        to_timestamp(col("review_date_timestamp"), 'yyyy-MM-dd HH:mm:ss'),
    )
    .withColumn("year", year(col("review_date_timestamp")))
    .withColumn("categories", concat_ws(",", col("categories")))
    .drop("count")
)

In [28]:
# Preview 5 rows to check the parsed timestamp, the new 'year' column and the flattened 'categories'
sampled_df.show(5)

+------------+------+-----------+----------+------------+--------------------+-------------+--------------------+--------------------+-----------------+---------------------+-------------+--------------------+--------------------+-----+--------------+-------------+----+
|review_month|rating|parent_asin|      asin|helpful_vote|                text|    timestamp|               title|             user_id|verified_purchase|review_date_timestamp|main_category|        product_name|          categories|price|average_rating|rating_number|year|
+------------+------+-----------+----------+------------+--------------------+-------------+--------------------+--------------------+-----------------+---------------------+-------------+--------------------+--------------------+-----+--------------+-------------+----+
|           2|   1.0| B001DTE5Q2|B001DTE5Q2|           0|Very flimsy hooks...|1581880140993|      Waste of money|AFAHC6E2UT4DJT4E2...|             true|  2020-02-16 00:00:00|  Amazon Home

In [29]:
# Step 5: Split the sample by review year - 2018 through 2019 (inclusive) in
# one DataFrame, 2020 in the other
review_year = col("year")
sampled_2018_2019_df = sampled_df.where(review_year.between(2018, 2019))
sampled_2020_df = sampled_df.where(review_year == 2020)


In [34]:
# Row count of the 2018-2019 split
row_count = sampled_2018_2019_df.count()
print(f"Row count of sampled_2018_2019_df: {row_count}")

# Row count of the 2020 split (row_count is reused for each print)
row_count = sampled_2020_df.count()
print(f"Row count of sampled_2020_df: {row_count}")

Row count of sampled_2018_2019_df: 140781
Row count of sampled_2020_df: 105233


In [33]:
# Step 6: Write 2018-2019 data to CSV
# Convert the Spark DataFrame to a pandas DataFrame
pandas_df = sampled_2018_2019_df.toPandas()

# Save as CSV in the current working directory, without the pandas index column
pandas_df.to_csv("sampled_data_2018_2019.csv", index=False)

# Why pandas: writing directly from Spark seemed to require installing the Hadoop
# binaries, setting up hadoop_home, etc., and writing locally kept throwing errors.
# So the data is converted to pandas and saved from there.


In [35]:
# Step 7: Write 2020 data to CSV (same pandas route as the 2018-2019 export)
pandas_df = sampled_2020_df.toPandas()

# Save as CSV in the current working directory, without the pandas index column
# NOTE(review): this file name follows a different pattern from
# "sampled_data_2018_2019.csv" - confirm downstream readers expect it.
pandas_df.to_csv("sampled_2020_df.csv", index=False)