# Merge all 6 categories into one dataset

In [1]:
import glob
import os
import shutil
import zipfile
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length, lit, size, when

In [3]:
# Build (or reuse) the Spark session shared by every cell below.
# Case-sensitive column resolution is switched on and Parquet schema merging
# is switched off for all reads in this notebook.
spark = (
    SparkSession.builder
    .appName("Amazon Reviews Local Merge")
    .config("spark.driver.memory", "12g")
    .config("spark.executor.memory", "8g")
    .config("spark.sql.caseSensitive", "true")
    .config("spark.sql.parquet.mergeSchema", "false")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

print("Spark Session created successfully!")
print(f"  Version: {spark.version}")
# getOrCreate() can hand back an already-running session, so report the value
# the session actually holds instead of echoing a hardcoded literal.
print(f"  caseSensitive: {spark.conf.get('spark.sql.caseSensitive')}\n")

Spark Session created successfully!
  Version: 3.5.5
  caseSensitive: true



In [5]:
# Locations used by the rest of the notebook (relative to the notebook's
# working directory):
#   LOCAL_EXTRACT_PATH - where the input archives are unpacked, one folder per category
#   TEMP_OUTPUT_DIR    - scratch area for the Parquet output before archiving
#   FINAL_OUTPUT_DIR   - destination of the final .zip archives
LOCAL_EXTRACT_PATH = "../data/temp_unzipped_data"
TEMP_OUTPUT_DIR = "../data/temp_output"
FINAL_OUTPUT_DIR = "../data/final_merged_data"

# Only the scratch area is wiped; extracted data and earlier final archives
# are left in place.
if os.path.exists(TEMP_OUTPUT_DIR):
    shutil.rmtree(TEMP_OUTPUT_DIR)

# The extraction directory is created here as well, so the message below is
# true and a later os.listdir(LOCAL_EXTRACT_PATH) cannot fail on a missing
# directory when no archive has been unpacked yet.
os.makedirs(LOCAL_EXTRACT_PATH, exist_ok=True)
os.makedirs(TEMP_OUTPUT_DIR, exist_ok=True)
os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)

print("✓ Working directories created:")
print(f"  Unzipped data: {LOCAL_EXTRACT_PATH}")
print(f"  Temporary files: {TEMP_OUTPUT_DIR}")
print(f"  Final archives: {FINAL_OUTPUT_DIR}\n")

✓ Working directories created:
  Unzipped data: data/temp_unzipped_data
  Temporary files: data/temp_output
  Final archives: data/final_merged_data



In [6]:
def read_parquet_safe(spark, path):
    """Read a Parquet dataset and drop columns whose name is repeated.

    Args:
        spark: Session object; only ``spark.read.parquet`` is used.
        path: Location handed unchanged to ``spark.read.parquet``.

    Returns:
        The loaded DataFrame. If a column name occurs more than once, only the
        first occurrence of each name is kept, in the original column order;
        otherwise the DataFrame is returned untouched.
    """
    df = spark.read.parquet(path)
    names = df.columns

    # Single pass: remember where each name first appears and which names repeat.
    first_index = {}
    duplicates = []
    for index, name in enumerate(names):
        if name not in first_index:
            first_index[name] = index
        elif name not in duplicates:
            duplicates.append(name)

    if not duplicates:
        return df

    print(f"Duplicate columns found: {duplicates}")

    # Selecting a repeated name by string is ambiguous to Spark, so give every
    # column a unique positional placeholder first, then keep the first
    # occurrence of each name and alias it back to its real name.
    placeholders = [f"__pos_{index}__" for index in range(len(names))]
    positional = df.toDF(*placeholders)
    df = positional.select(
        [positional[placeholders[index]].alias(name) for name, index in first_index.items()]
    )
    print(f"Duplicates removed, remaining: {len(df.columns)} columns")

    return df

In [7]:
def select_safe(df, target_cols):
    """Project ``df`` onto the requested columns that it actually has.

    Args:
        df: Object exposing ``columns`` and ``select``.
        target_cols: Column names to keep, in the desired output order.

    Returns:
        ``df.select(...)`` over the names from ``target_cols`` that are present
        in ``df.columns`` (order of ``target_cols`` preserved), or ``None``
        when none of them is present.
    """
    existing = frozenset(df.columns)

    picked = []
    for name in target_cols:
        if name in existing:
            picked.append(name)

    # Nothing usable: signal it to the caller instead of selecting zero columns.
    if not picked:
        return None
    return df.select(picked)

In [8]:
# Columns kept from each category's metadata Parquet (regression dataset).
target_meta_cols = [
    "parent_asin", "title", "main_category",
    "average_rating", "rating_number",
    "features", "description",
    "price", "store",
]

# Columns kept from each category's reviews Parquet (classification dataset).
target_nlp_cols = ["rating", "title", "text", "verified_purchase", "parent_asin"]

### DATA LOADING AND PROCESSING

In [None]:
# Unpack every .zip found in INPUT_DIR into its own folder under
# LOCAL_EXTRACT_PATH. The folder name is the archive name without its
# "_cleaned.zip" / ".zip" suffix.
INPUT_DIR = '../data/cleaned'
# Sorted so archives are processed (and logged) in the same order on every run.
zip_files = sorted(f for f in os.listdir(INPUT_DIR) if f.endswith('.zip'))

# Names of the categories whose archive was extracted without error.
extracted_folders = []

for zip_file in zip_files:
    path_to_zip = os.path.join(INPUT_DIR, zip_file)

    # Strip only the trailing suffix; str.replace would also remove a ".zip"
    # that happens to appear in the middle of the file name.
    category_name = zip_file
    for suffix in ("_cleaned.zip", ".zip"):
        if category_name.endswith(suffix):
            category_name = category_name[:-len(suffix)]
            break

    extract_to = os.path.join(LOCAL_EXTRACT_PATH, category_name)
    os.makedirs(extract_to, exist_ok=True)

    print(f"Unzip {zip_file} --> {extract_to} ...")

    # Best effort: one unreadable archive is reported and skipped so the
    # remaining archives are still extracted.
    try:
        with zipfile.ZipFile(path_to_zip, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        extracted_folders.append(category_name)
        print("   -> OK")
    except Exception as e:
        print(f"   !!! ERROR {zip_file}: {e}")

In [None]:
# Per-category DataFrames collected for the two output datasets.
meta_dfs = []
review_dfs = []


def _load_category_frame(kind, path, target_cols, label_col, category, sink):
    """Read one Parquet source, keep the target columns, tag it and collect it.

    Args:
        kind: Label used in log messages ("Meta" or "Reviews").
        path: Parquet file or directory to read.
        target_cols: Columns to keep (those missing from the source are skipped).
        label_col: Name of the constant column that stores ``category``.
        category: Category folder name written into ``label_col``.
        sink: List that receives the resulting DataFrame.

    Any exception is reported and swallowed so one bad category does not stop
    the loop over the others.
    """
    try:
        print(f"  Reading {kind}: {os.path.basename(path)}")
        frame = select_safe(read_parquet_safe(spark, path), target_cols)
        if frame is None:
            print(f"{kind}: no columns needed")
            return
        frame = frame.withColumn(label_col, lit(category))
        # Collected before counting: a failing count() is reported below but
        # the frame stays in the list.
        sink.append(frame)
        print(f"{kind}: {frame.count()} records")
    except Exception as e:
        print(f"ERROR {kind} {category}: {e}")


# Sorted so categories are loaded (and later unioned) in a stable order.
categories = sorted(
    d for d in os.listdir(LOCAL_EXTRACT_PATH)
    if os.path.isdir(os.path.join(LOCAL_EXTRACT_PATH, d))
)

for cat in categories:
    cat_path = os.path.join(LOCAL_EXTRACT_PATH, cat)

    found_meta = glob.glob(f"{cat_path}/**/*meta*.parquet", recursive=True)
    found_reviews = glob.glob(f"{cat_path}/**/*review*.parquet", recursive=True)

    if found_meta:
        # Shortest match, ties broken alphabetically: a deterministic choice
        # instead of whichever entry glob happens to list first.
        meta_path = min(sorted(found_meta), key=len)
        _load_category_frame(
            "Meta", meta_path, target_meta_cols, "main_category_label", cat, meta_dfs
        )
    else:
        print(f" NOT FOUND Meta {cat}!")

    if found_reviews:
        review_path = min(sorted(found_reviews), key=len)
        # A match whose path contains "part-" is read through its parent directory.
        if "part-" in review_path:
            review_path = os.path.dirname(review_path)
        _load_category_frame(
            "Reviews", review_path, target_nlp_cols, "category_label", cat, review_dfs
        )
    else:
        print(f" NOT FOUND Reviews {cat}!")

### CREATING DATASETS

In [None]:
# Build the regression dataset: union all metadata frames, keep rows with a
# positive price (one row per parent_asin), write Parquet and archive it.
if meta_dfs:
    print(f"\nMerging {len(meta_dfs)} Meta datasets...")
    # Columns missing from one category are filled with nulls by the union.
    full_meta = reduce(
        lambda merged, nxt: merged.unionByName(nxt, allowMissingColumns=True),
        meta_dfs,
    )

    if "price" in full_meta.columns:
        full_meta = (
            full_meta
            .withColumn("price", col("price").cast("float"))
            .where(col("price").isNotNull() & (col("price") > 0))
            .dropDuplicates(["parent_asin"])
        )

    print(f"Saving Regression data ({full_meta.count()} rows)...")
    reg_path = TEMP_OUTPUT_DIR + "/regression_price"
    full_meta.write.parquet(reg_path, mode="overwrite")

    print("Archiving...")
    # Zip the Parquet directory next to itself, then copy the archive to the
    # final output directory.
    shutil.make_archive(reg_path, 'zip', reg_path)
    shutil.copy(reg_path + ".zip", FINAL_OUTPUT_DIR + "/regression_price.zip")

In [None]:
# Build the classification dataset: union all review frames, keep usable rows,
# derive a 3-class label from the rating, write Parquet and archive it.
if review_dfs:
    print(f"\nMerging {len(review_dfs)} Review datasets...")
    # Columns missing from one category are filled with nulls by the union.
    full_reviews = reduce(lambda x, y: x.unionByName(y, allowMissingColumns=True), review_dfs)

    # Keep only rows usable as a labelled example: non-empty text AND a known
    # rating. Without the rating check, a null rating fails both comparisons
    # below and would silently receive label 2 through otherwise().
    full_reviews = full_reviews.filter(
        col("text").isNotNull()
        & (length(col("text")) > 0)
        & col("rating").isNotNull()
    )

    # Label mapping: rating <= 2 -> 0, rating == 3 -> 1, anything else -> 2.
    full_reviews = full_reviews.withColumn("label",
        when(col("rating") <= 2, 0)
        .when(col("rating") == 3, 1)
        .otherwise(2)
    )

    print(f"Saving Classification data ({full_reviews.count()} rows)...")
    nlp_path = f"{TEMP_OUTPUT_DIR}/classification_reviews"
    full_reviews.write.mode("overwrite").parquet(nlp_path)

    print("Archiving...")
    # Zip the Parquet directory next to itself, then copy the archive to the
    # final output directory.
    shutil.make_archive(nlp_path, 'zip', nlp_path)
    shutil.copy(f"{nlp_path}.zip", f"{FINAL_OUTPUT_DIR}/classification_reviews.zip")

**META:**

root

 |-- parent_asin: string (nullable = true)

 |-- title: string (nullable = true)

 |-- main_category: string (nullable = true)

 |-- average_rating: double (nullable = true)

 |-- rating_number: long (nullable = true)

 |-- features: array (nullable = true)

 |    |-- element: string (containsNull = true)

 |-- description: array (nullable = true)

 |    |-- element: string (containsNull = true)

 |-- price: float (nullable = true)

 |-- store: string (nullable = true)
 
 |-- main_category_label: string (nullable = false)

 699,283 records

**REVIEWS:**

root

 |-- rating: double (nullable = true)

 |-- title: string (nullable = true)

 |-- text: string (nullable = true)

 |-- verified_purchase: boolean (nullable = true)

 |-- parent_asin: string (nullable = true)

 |-- category_label: string (nullable = false)
 
 |-- label: integer (nullable = false)

 35,202,489 records
