# Big Data and Cloud Computing - Final Project

## Initial Data Loading

### Author:
Alen Pavlovic

The University of Chicago

In [1]:
# Scratch arithmetic cell; the recorded output is 1.
# NOTE(review): presumably a kernel smoke test - confirm, or remove from the final notebook.
2*3-5

1

In [2]:
import os
import subprocess
import datetime
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import functions as F
from pyspark.sql.types import *

In [3]:
# Display settings: show up to 100 rows, and never truncate the column list
# or the contents of wide cells when pandas renders a frame.
for option_name, option_value in (
    ('display.max_rows', 100),
    ('display.max_columns', None),
    ('display.max_colwidth', None),
):
    pd.set_option(option_name, option_value)

In [4]:
# Enable eager evaluation, so that a DataFrame left as the last expression of a
# cell is rendered as a table in REPLs that support it (no explicit .show()).
# NOTE(review): `spark` is never created in this notebook - presumably the
# kernel injects a ready-made SparkSession; confirm before running elsewhere.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [5]:
# GCS path to the data
# (source location; the code in this notebook reads its 'reviews_parquet' and
# 'meta_parquet' subfolders)
gcs_folder = 'gs://msca-bdp-data-open/final_project_reviews'

# Intermediate storage bucket - apavlovic
# (every processed / dev dataset produced by this notebook is written under
# this prefix)
intermediate_bucket = 'gs://msca-bdp-students-bucket/shared_data/apavlovic/final_project'

In [6]:
# ---------------------------------------------------
# 1. CHECK DATA SIZE IN GCS
# ---------------------------------------------------

def check_folder_size(folder_path):
    """Print the total size of a GCS folder as reported by ``gsutil du -s -h``.

    Args:
        folder_path: GCS URL (e.g. ``gs://bucket/prefix``) to measure.

    Returns:
        None. The size line(s), or a failure message, are printed.
    """
    # Pass an argument list instead of a shell string so that spaces or shell
    # metacharacters in folder_path cannot be reinterpreted by a shell.
    cmd = ['gsutil', 'du', '-s', '-h', folder_path]

    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except OSError as exc:
        # e.g. gsutil is not installed or not on PATH
        print(f'Could not run gsutil: {exc}')
        return

    if result.returncode != 0:
        # stderr is captured separately so that an error message is never
        # printed with the "Total directory size" label.
        print(f'Failed to check size of {folder_path}: {result.stderr.strip()}')
        return

    for line in result.stdout.splitlines():
        if line.strip():
            print(f'Total directory size: {line}')

In [7]:
# Report the size of the source folder before loading anything from it.
print("Checking data size in GCS bucket...")
check_folder_size(gcs_folder)

Checking data size in GCS bucket...
Total directory size: 75.75 GiB    gs://msca-bdp-data-open/final_project_reviews



In [8]:
# ---------------------------------------------------
# 2. BASIC DATA LOADING
# ---------------------------------------------------

def _load_parquet_sample(subfolder, label, sample_fraction):
    """Read one parquet dataset under ``gcs_folder`` and optionally sample it.

    Shared implementation for load_reviews_sample / load_meta_sample.

    Args:
        subfolder: Name of the parquet folder directly under ``gcs_folder``.
        label: Human-readable dataset name used in the progress message.
        sample_fraction: Fraction of rows to keep. Values >= 1.0 return the
            full dataset unsampled.

    Returns:
        A Spark DataFrame: the (cached) sample, or the full dataset.

    Raises:
        ValueError: If ``sample_fraction`` is negative.
    """
    if sample_fraction < 0:
        raise ValueError(f"sample_fraction must be non-negative, got {sample_fraction}")

    print(f"Loading {sample_fraction*100}% sample of {label}...")

    # GCS URLs always use '/', so build the path explicitly instead of with
    # os.path.join, which uses the local OS path separator.
    df_full = spark.read.parquet(f"{gcs_folder.rstrip('/')}/{subfolder}")

    if sample_fraction < 1.0:
        # The fixed seed keeps the sample reproducible. Caching lets the count
        # below materialise the sample once, so later exploration actions reuse
        # it instead of rescanning and resampling the whole source each time.
        df_sample = df_full.sample(fraction=sample_fraction, seed=42).cache()
        print(f"Sample size: {df_sample.count():,} records")
        return df_sample

    print(f"Full dataset size: {df_full.count():,} records")
    return df_full


def load_reviews_sample(sample_fraction=0.001):
    """Load a small sample of the reviews data for exploration.

    Args:
        sample_fraction: Fraction of review rows to keep (default 0.1%).
            Values >= 1.0 load the full dataset.

    Returns:
        Spark DataFrame read from the 'reviews_parquet' folder.
    """
    return _load_parquet_sample('reviews_parquet', 'reviews data', sample_fraction)


def load_meta_sample(sample_fraction=0.01):
    """Load a small sample of the metadata for exploration.

    Args:
        sample_fraction: Fraction of metadata rows to keep (default 1%).
            Values >= 1.0 load the full dataset.

    Returns:
        Spark DataFrame read from the 'meta_parquet' folder.
    """
    return _load_parquet_sample('meta_parquet', 'metadata', sample_fraction)

In [9]:
# Load the exploration samples using each loader's default fraction
# (0.001 for reviews, 0.01 for metadata).
reviews_sample = load_reviews_sample()
meta_sample = load_meta_sample()

Loading 0.1% sample of reviews data...


                                                                                

Sample size: 64,380 records
Loading 1.0% sample of metadata...




Sample size: 43,305 records


                                                                                

In [10]:
# Uncomment one of the lines below to preview a sampled DataFrame inline:
#reviews_sample
#meta_sample

In [11]:
# ---------------------------------------------------
# 3. EXPLORE DATA STRUCTURE
# ---------------------------------------------------

# Print the schema of each sampled DataFrame under its own banner.
for schema_banner, sampled_frame in (
    ("\n--- Reviews Data Schema ---", reviews_sample),
    ("\n--- Metadata Schema ---", meta_sample),
):
    print(schema_banner)
    sampled_frame.printSchema()


--- Reviews Data Schema ---
root
 |-- asin: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)


--- Metadata Schema ---
root
 |-- author: struct (nullable = true)
 |    |-- about: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- avatar: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- bought_together: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- main_category: string (nullable = true)
 |-- parent_asin: string (nullable = true)

In [12]:
# Print few rows
print("\n--- Sample Reviews Data ---")
reviews_sample.limit(5).show()

print("\n--- Sample Metadata ---")
meta_sample.limit(5).show()


--- Sample Reviews Data ---


                                                                                

+----------+------------+-----------+------+--------------------+-------------+--------------------+--------------------+-----------------+
|      asin|helpful_vote|parent_asin|rating|                text|    timestamp|               title|             user_id|verified_purchase|
+----------+------------+-----------+------+--------------------+-------------+--------------------+--------------------+-----------------+
|B09CBNY1SB|           2| B09CBNY1SB|   3.0|[[VIDEOID:3f9ba83...|1658512339071|           It's Ok🧡|AGCFOUKPOYBQJFN4F...|            false|
|B00I3N6AM8|           0| B00I3N6AM8|   3.0|I have received t...|1461374668000|Reasonable Price ...|AFDOQTCP7SI36QPDC...|            false|
|B01KI40G3I|           0| B01LX6I5X0|   5.0|           Love it!!|1523381799799|          Five Stars|AEKIAS5INUOBSFLUH...|             true|
|B0058K1MJ0|           4| B0058K1MJ0|   5.0|I don't understan...|1621392460645|Why are they do e...|AE3G4X4NUNDMZESTV...|            false|
|B08R7P97F6|         

[Stage 9:>                                                          (0 + 1) / 1]

+------+--------------+---------------+--------------------+--------------------+--------------+-----------+-----+-------------+--------------+--------+--------------------+
|author|average_rating|bought_together|          categories|         description| main_category|parent_asin|price|rating_number|         store|subtitle|               title|
+------+--------------+---------------+--------------------+--------------------+--------------+-----------+-----+-------------+--------------+--------+--------------------+
|  NULL|           4.4|           NULL|[Beauty & Persona...|[Wow your friends...|    All Beauty| B0C1P4S34H| 5.99|           16|    Fawalyanle|    NULL|Personalized Temp...|
|  NULL|           4.1|           NULL|[Beauty & Persona...|[The Healthy You ...|          NULL| B08769W6BC| NULL|           31|   Healthy You|    NULL|Massage Face Pill...|
|  NULL|           4.3|           NULL|[Beauty & Persona...|[One Smart Cookie...|          NULL| B06X16GZCB| NULL|            7|Pe

                                                                                

In [13]:
# ---------------------------------------------------
# 4. BASIC DATA EXPLORATION
# ---------------------------------------------------

print("\n--- Missing Values in Reviews ---")

# A single aggregation job returns the row total followed by one null count
# per column (in column order). The total does not change between columns, so
# there is no need to recount it - or to launch a separate job - per column.
_null_stats = reviews_sample.agg(
    F.count(F.lit(1)),
    *[F.sum(F.col(column_name).isNull().cast("int")) for column_name in reviews_sample.columns]
).first()

total_count = _null_stats[0]
for col, null_count in zip(reviews_sample.columns, _null_stats[1:]):
    null_count = null_count or 0  # SUM over zero rows yields NULL
    # Guard against an empty sample instead of dividing by zero.
    null_pct = null_count / total_count * 100 if total_count else 0.0
    print(f"Column '{col}': {null_count:,} nulls ({null_pct:.2f}%)")


--- Missing Values in Reviews ---


                                                                                

Column 'asin': 0 nulls (0.00%)


                                                                                

Column 'helpful_vote': 0 nulls (0.00%)


                                                                                

Column 'parent_asin': 0 nulls (0.00%)


                                                                                

Column 'rating': 0 nulls (0.00%)


                                                                                

Column 'text': 0 nulls (0.00%)


                                                                                

Column 'timestamp': 0 nulls (0.00%)


                                                                                

Column 'title': 0 nulls (0.00%)


                                                                                

Column 'user_id': 0 nulls (0.00%)




Column 'verified_purchase': 0 nulls (0.00%)


                                                                                

In [14]:
print("\n--- Review Date Distribution ---")

# `timestamp` is divided by 1000 before from_unixtime, i.e. it is treated as
# epoch milliseconds; the result is truncated to a calendar date.
reviews_sample_with_date = reviews_sample.withColumn(
    "review_date", F.to_date(F.from_unixtime(F.col("timestamp") / 1000)))

# Number of sampled reviews per calendar year, oldest year first.
date_dist = (
    reviews_sample_with_date
    .groupBy(F.year(F.col("review_date")).alias("year"))
    .count()
    .sort("year")
)

date_dist.show()


--- Review Date Distribution ---




+----+-----+
|year|count|
+----+-----+
|2004|    1|
|2005|   11|
|2006|    6|
|2007|   11|
|2008|   37|
|2009|   60|
|2010|  105|
|2011|  232|
|2012|  529|
|2013| 1486|
|2014| 2545|
|2015| 4157|
|2016| 4946|
|2017| 5164|
|2018| 5840|
|2019| 8162|
|2020| 8875|
|2021| 9447|
|2022| 8938|
|2023| 3828|
+----+-----+



                                                                                

In [15]:
# ---------------------------------------------------
# 5. CREATE HELPER FUNCTIONS FOR FULL ANALYSIS
# ---------------------------------------------------

def create_intermediate_folder():
    """Check whether the intermediate GCS location already holds objects.

    GCS has a flat namespace: a "folder" is only a shared object-name prefix
    and comes into existence when the first object is written under it, and
    gsutil has no ``mkdir`` command. This function therefore only reports the
    state of ``intermediate_bucket``; it never claims to have created it.

    Returns:
        None. The outcome is printed.
    """
    try:
        # Argument list (no shell), so the URL is passed to gsutil verbatim.
        listing = subprocess.run(
            ['gsutil', 'ls', intermediate_bucket],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except OSError as e:
        # e.g. gsutil is not installed or not on PATH
        print(f"Error checking/creating bucket: {str(e)}")
        return

    if listing.returncode == 0:
        print(f"Intermediate bucket already exists: {intermediate_bucket}")
        return

    # A non-zero exit means the prefix has no objects yet or could not be
    # accessed; surface gsutil's own message so the two cases can be told apart.
    print(
        "Intermediate location has no objects yet or is not accessible "
        f"(GCS creates prefixes implicitly on first write): {intermediate_bucket}"
    )
    gsutil_detail = listing.stderr.strip()
    if gsutil_detail:
        print(f"  gsutil reported: {gsutil_detail}")

In [16]:
def process_full_reviews_data():
    """Process the full reviews dataset with basic transformations.

    Adds ``review_date`` (derived from the epoch-millisecond ``timestamp``),
    ``review_year`` and ``review_month``, drops rows whose ``asin``,
    ``user_id`` or ``review_date`` is null, and writes the result as parquet
    to ``{intermediate_bucket}/reviews_processed`` (overwriting any previous
    output).

    Returns:
        Spark DataFrame backed by the parquet output that was just written.
    """
    output_path = f"{intermediate_bucket}/reviews_processed"

    # GCS URLs always use '/', so build the path explicitly instead of with
    # os.path.join, which uses the local OS path separator.
    df_reviews = spark.read.parquet(f"{gcs_folder}/reviews_parquet")

    # `timestamp` is in milliseconds; from_unixtime expects seconds.
    df_reviews_processed = df_reviews.withColumn(
        "review_date",
        F.to_date(F.from_unixtime(F.col("timestamp") / 1000))
    )

    # Year and month columns for time analysis
    df_reviews_processed = df_reviews_processed.withColumn("review_year", F.year("review_date"))
    df_reviews_processed = df_reviews_processed.withColumn("review_month", F.month("review_date"))

    # Filter out records with null essential fields
    df_reviews_clean = df_reviews_processed.filter(
        (F.col("asin").isNotNull()) &
        (F.col("user_id").isNotNull()) &
        (F.col("review_date").isNotNull())
    )

    df_reviews_clean.write.mode("overwrite").parquet(output_path)

    # Count (and return) the saved output rather than the lazy pipeline:
    # counting `df_reviews_clean` would re-run the whole transformation over
    # the raw source a second time, and so would every later use of it.
    df_reviews_saved = spark.read.parquet(output_path)
    print(f"Processed {df_reviews_saved.count():,} review records and saved to {output_path}")

    return df_reviews_saved

In [17]:
def process_full_reviews_data_v2():
    """Enhanced version with better partitioning.

    Replaces the epoch-millisecond ``timestamp`` column with a proper
    timestamp, adds ``year`` and ``month``, drops rows whose ``parent_asin``,
    ``user_id`` or ``timestamp`` is null, and writes the result as parquet
    partitioned by ``year`` to ``{intermediate_bucket}/reviews_processed_v2``
    (overwriting any previous output).

    Returns:
        Spark DataFrame backed by the parquet output that was just written,
        with the same column order as the transformed data.
    """
    output_path = f"{intermediate_bucket}/reviews_processed_v2"

    # GCS URLs always use '/', so build the path explicitly instead of with
    # os.path.join, which uses the local OS path separator.
    df_reviews = spark.read.parquet(f"{gcs_folder}/reviews_parquet")

    # The source `timestamp` is epoch *milliseconds*. Converting a number to a
    # timestamp interprets it as *seconds*, so scale it first; otherwise the
    # derived timestamp/year/month are wrong. Dividing yields a double, which
    # keeps the sub-second part.
    review_time = (F.col("timestamp") / 1000).cast("timestamp")
    df_reviews_processed = (
        df_reviews
        .withColumn("timestamp", review_time)
        .withColumn("year", F.year("timestamp"))
        .withColumn("month", F.month("timestamp"))
    )

    # Filter out nulls
    df_reviews_clean = df_reviews_processed.filter(
        (F.col("parent_asin").isNotNull()) &
        (F.col("user_id").isNotNull()) &
        (F.col("timestamp").isNotNull())
    )

    # Co-locate each year's rows before the partitioned write
    df_reviews_clean = df_reviews_clean.repartition(200, "year")

    df_reviews_clean.write \
        .mode("overwrite") \
        .partitionBy("year") \
        .parquet(output_path)

    # Count (and return) the saved output rather than the lazy pipeline, which
    # would re-run the transformation and the shuffle over the raw source.
    # Reading a partitioned dataset moves the partition column to the end, so
    # restore the original column order.
    df_reviews_saved = spark.read.parquet(output_path).select(*df_reviews_clean.columns)

    print(f"Processed {df_reviews_saved.count():,} review records")
    print(f"Saved to {intermediate_bucket}/reviews_processed_v2 partitioned by year")

    return df_reviews_saved

In [18]:
def create_summary_statistics():
    """Summarise the processed reviews and metadata.

    Reads ``reviews_processed`` and ``meta_processed`` from
    ``intermediate_bucket``, prints row counts, distinct reviewer/product
    counts, the review date range and the 20 most frequent ``main_category``
    values, and saves the full category distribution as a single-file parquet
    dataset under ``category_distribution``.

    Returns:
        dict with keys ``total_reviews``, ``total_products``,
        ``unique_reviewers``, ``unique_products`` and ``date_range``.
    """
    print("Creating summary statistics...")

    # Inputs produced by the processing functions
    reviews_df = spark.read.parquet(f"{intermediate_bucket}/reviews_processed")
    meta_df = spark.read.parquet(f"{intermediate_bucket}/meta_processed")

    # Row totals
    review_count = reviews_df.count()
    meta_count = meta_df.count()

    # Date range: millisecond timestamps -> calendar dates, then min / max
    review_day = F.from_unixtime(F.col("timestamp")/1000).cast("date")
    per_review_dates = reviews_df.select(
        review_day.alias("earliest_review"),
        review_day.alias("latest_review"),
    )
    date_stats = per_review_dates.agg(
        F.min("earliest_review").alias("earliest_review"),
        F.max("latest_review").alias("latest_review"),
    ).collect()[0]

    # Products per main category, most common first (evaluated lazily below)
    category_counts = (
        meta_df
        .groupBy("main_category")
        .count()
        .orderBy(F.col("count").desc())
    )

    # Distinct reviewers and reviewed products
    unique_reviewers = reviews_df.select("user_id").distinct().count()
    unique_products = reviews_df.select("parent_asin").distinct().count()

    first_day = date_stats['earliest_review']
    last_day = date_stats['latest_review']
    summary = {
        "total_reviews": review_count,
        "total_products": meta_count,
        "unique_reviewers": unique_reviewers,
        "unique_products": unique_products,
        "date_range": f"{first_day} to {last_day}",
    }

    print("\n=== DATA SUMMARY ===")
    for stat_name, stat_value in summary.items():
        # Integers get thousands separators; everything else prints as-is.
        if isinstance(stat_value, int):
            print(f"{stat_name}: {stat_value:,}")
        else:
            print(f"{stat_name}: {stat_value}")

    print("\n=== CATEGORIES ===")
    category_counts.show(20)

    # Persist the category list for later use
    category_writer = category_counts.coalesce(1).write.mode("overwrite")
    category_writer.parquet(f"{intermediate_bucket}/category_distribution")

    return summary

In [19]:
def process_full_meta_data():
    """Process the full metadata dataset with basic transformations.

    Adds a numeric ``price_clean`` column (``price`` with any ``$`` removed,
    cast to float), drops rows whose ``parent_asin`` is null, and writes the
    result as parquet to ``{intermediate_bucket}/meta_processed`` (overwriting
    any previous output). ``categories`` is left untouched.

    Returns:
        Spark DataFrame backed by the parquet output that was just written.
    """
    output_path = f"{intermediate_bucket}/meta_processed"

    # GCS URLs always use '/', so build the path explicitly instead of with
    # os.path.join, which uses the local OS path separator.
    df_meta = spark.read.parquet(f"{gcs_folder}/meta_parquet")

    # Clean up the price field.
    # NOTE(review): values that are not plain numbers after removing '$'
    # (e.g. with thousands separators) would become NULL - verify against
    # the actual `price` contents.
    df_meta_processed = df_meta.withColumn(
        "price_clean",
        F.regexp_replace(F.col("price"), "\\$", "").cast("float")
    )

    # Ensure parent_asin is not null
    df_meta_clean = df_meta_processed.filter(F.col("parent_asin").isNotNull())

    df_meta_clean.write.mode("overwrite").parquet(output_path)

    # Count (and return) the saved output rather than the lazy pipeline:
    # counting `df_meta_clean` would re-run the transformation over the raw
    # source a second time, and so would every later use of it.
    df_meta_saved = spark.read.parquet(output_path)
    print(f"Processed {df_meta_saved.count():,} metadata records and saved to {output_path}")

    return df_meta_saved

In [20]:
# Check the intermediate storage location before any processed data is written to it.
create_intermediate_folder()

Intermediate bucket already exists: gs://msca-bdp-students-bucket/shared_data/apavlovic/final_project


In [21]:
# Run the full reviews preprocessing; this writes 'reviews_processed' under
# intermediate_bucket and keeps the returned DataFrame.
reviews_processed = process_full_reviews_data()



Processed 64,679,785 review records and saved to gs://msca-bdp-students-bucket/shared_data/apavlovic/final_project/reviews_processed


                                                                                

In [22]:
# Run the full metadata preprocessing; this writes 'meta_processed' under
# intermediate_bucket and keeps the returned DataFrame.
meta_processed = process_full_meta_data()



Processed 4,320,533 metadata records and saved to gs://msca-bdp-students-bucket/shared_data/apavlovic/final_project/meta_processed


                                                                                

In [23]:
def create_dev_dataset(sample_fraction=0.01):
    """Create a smaller development dataset for faster iteration.

    Samples ``reviews_processed`` and saves the sample as ``reviews_dev``
    (10 files), then saves the ``meta_processed`` rows whose ``parent_asin``
    appears in that saved sample as ``meta_dev`` (5 files). Both outputs live
    under ``intermediate_bucket`` and overwrite any previous version.

    Args:
        sample_fraction: Fraction of processed reviews to keep (default 1%).

    Returns:
        None. The record counts of both outputs are printed.
    """
    print(f"Creating {sample_fraction*100}% development dataset...")

    reviews_dev_path = f"{intermediate_bucket}/reviews_dev"
    meta_dev_path = f"{intermediate_bucket}/meta_dev"

    # Sample reviews (fixed seed for reproducibility) and save them first.
    reviews_df = spark.read.parquet(f"{intermediate_bucket}/reviews_processed")
    reviews_df.sample(fraction=sample_fraction, seed=42) \
        .repartition(10) \
        .write.mode("overwrite").parquet(reviews_dev_path)

    # Work from the saved sample from here on: the product list is then
    # guaranteed to match exactly what is in reviews_dev, and the sample is
    # not re-evaluated from the full dataset for the join and the counts.
    reviews_dev = spark.read.parquet(reviews_dev_path)

    # Get corresponding products
    sampled_asins = reviews_dev.select("parent_asin").distinct()
    meta_df = spark.read.parquet(f"{intermediate_bucket}/meta_processed")
    meta_df.join(sampled_asins, on="parent_asin", how="inner") \
        .repartition(5) \
        .write.mode("overwrite").parquet(meta_dev_path)
    meta_dev = spark.read.parquet(meta_dev_path)

    print("Dev dataset created:")
    print(f"  - Reviews: {reviews_dev.count():,} records")
    print(f"  - Products: {meta_dev.count():,} records")

In [24]:
def validate_processed_data():
    """Print quick sanity checks for the saved ``reviews_processed`` data.

    Reports potential duplicates (rows sharing user, product and timestamp),
    null counts for the main columns, the rating distribution, and a few raw
    timestamps next to their human-readable conversion.

    Returns:
        None. All results are printed.
    """
    print("Validating processed data...")

    reviews_df = spark.read.parquet(f"{intermediate_bucket}/reviews_processed")

    # Duplicates: rows that share the columns expected to identify a review
    identity_columns = ["user_id", "parent_asin", "timestamp"]
    total_records = reviews_df.count()
    unique_records = reviews_df.dropDuplicates(identity_columns).count()
    duplicate_records = total_records - unique_records

    print(f"Total records: {total_records:,}")
    print(f"Unique records (by user/product/time): {unique_records:,}")
    print(f"Potential duplicates: {duplicate_records:,}")

    # Null counts for every checked column, computed in a single pass
    checked_columns = ["parent_asin", "user_id", "rating", "timestamp", "text", "title"]
    null_count_exprs = [
        F.sum(F.col(column_name).isNull().cast("int")).alias(column_name)
        for column_name in checked_columns
    ]
    null_counts = reviews_df.select(null_count_exprs).collect()[0]

    print("\nNull counts per column:")
    for column_name in checked_columns:
        print(f"  {column_name}: {null_counts[column_name]:,}")

    # Rating distribution
    print("\nRating distribution:")
    rating_distribution = reviews_df.groupBy("rating").count().orderBy("rating")
    rating_distribution.show()

    # Raw millisecond timestamps beside their readable form, to verify the conversion
    print("\nSample timestamps:")
    readable_date = F.from_unixtime(F.col("timestamp")/1000).alias("readable_date")
    reviews_df.select(F.col("timestamp"), readable_date).show(5, truncate=False)

In [25]:
# Summarise the processed datasets; requires that both 'reviews_processed' and
# 'meta_processed' have already been written under intermediate_bucket.
summary_stats = create_summary_statistics()

Creating summary statistics...


                                                                                


=== DATA SUMMARY ===
total_reviews: 64,679,785
total_products: 4,320,533
unique_reviewers: 22,789,619
unique_products: 4,320,211
date_range: 1998-01-19 to 2023-09-13

=== CATEGORIES ===


                                                                                

+--------------------+-------+
|       main_category|  count|
+--------------------+-------+
|          Automotive|1719649|
|Cell Phones & Acc...|1051457|
|          All Beauty| 739608|
|                NULL| 249444|
|Tools & Home Impr...|  81103|
|         Amazon Home|  79311|
|Health & Personal...|  78383|
|Industrial & Scie...|  69175|
|      AMAZON FASHION|  68949|
|     All Electronics|  54489|
|      Premium Beauty|  34910|
|   Sports & Outdoors|  27718|
|           Computers|  11031|
|     Car Electronics|   7797|
|               Books|   6249|
|      Camera & Photo|   6139|
|     Office Products|   5269|
|        Toys & Games|   5246|
|Home Audio & Theater|   4447|
|Arts, Crafts & Se...|   4356|
+--------------------+-------+
only showing top 20 rows



In [26]:
# Run the duplicate / null / rating / timestamp checks on 'reviews_processed'.
validate_processed_data()

Validating processed data...


                                                                                

Total records: 64,679,785
Unique records (by user/product/time): 63,968,446
Potential duplicates: 711,339


                                                                                


Null counts per column:
  parent_asin: 0
  user_id: 0
  rating: 0
  timestamp: 0
  text: 0
  title: 0

Rating distribution:


                                                                                

+------+--------+
|rating|   count|
+------+--------+
|   1.0| 7945517|
|   2.0| 3378242|
|   3.0| 4444637|
|   4.0| 7235328|
|   5.0|41676061|
+------+--------+


Sample timestamps:
+-------------+-------------------+
|timestamp    |readable_date      |
+-------------+-------------------+
|1453819207000|2016-01-26 14:40:07|
|1600341347791|2020-09-17 11:15:47|
|1607280179211|2020-12-06 18:42:59|
|1609835116809|2021-01-05 08:25:16|
|1520895879064|2018-03-12 23:04:39|
+-------------+-------------------+
only showing top 5 rows



In [27]:
# Build the dev datasets from a 10% sample of the processed reviews
# (overrides the function's 1% default).
create_dev_dataset(sample_fraction=0.1)  

Creating 10.0% development dataset...


                                                                                

Dev dataset created:


                                                                                

  - Reviews: 6,468,523 records




  - Products: 1,424,997 records


                                                                                

In [28]:
# Scratch arithmetic cell; the recorded output is 2.
# NOTE(review): presumably a "notebook finished" marker - confirm, or remove from the final notebook.
1+1

2