In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window

In [None]:
# 🔧 SOLUTION 1: Check if Spark session exists, create if needed
def get_spark_session():
    """Return the active SparkSession, creating one named "Module-5" if needed.

    A session built here requests the spark-bigquery connector through
    ``spark.jars.packages``.

    Returns:
        SparkSession: the currently active session when
        ``SparkSession.getActiveSession()`` yields one, otherwise the session
        returned by ``SparkSession.builder ... .getOrCreate()``.

    Raises:
        Exception: whatever ``getOrCreate()`` raises if the session cannot be
        built; the failure is propagated instead of being retried blindly.
    """
    def _build_session():
        # Single definition of the builder configuration, so the app name and
        # connector coordinates cannot drift between code paths.
        return (
            SparkSession.builder
            .appName("Module-5")
            .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.23.2")
            .getOrCreate()
        )

    try:
        active_session = SparkSession.getActiveSession()
    except Exception:
        # Probing for an active session failed; fall back to building one.
        # `Exception` (not a bare `except:`) lets KeyboardInterrupt and
        # SystemExit propagate instead of triggering another session start.
        active_session = None

    if active_session is not None:
        return active_session
    return _build_session()

In [None]:
# Get or create the Spark session: get_spark_session() returns the active
# session when one exists and builds a new one otherwise.
spark = get_spark_session()

In [None]:
# Report which Spark session the rest of the notebook is running against.
print("✅ Spark Session Status:")
_ctx = spark.sparkContext
print(f"   App Name: {_ctx.appName}")
print(f"   Master: {_ctx.master}")
print(f"   Spark Version: {spark.version}")
# The JVM-side SparkContext reports whether it has been stopped; invert it
# to show an "active" flag.
_stopped = _ctx._jsc.sc().isStopped()
print(f"   Active: {not _stopped}")

In [None]:
# Define the base GCS bucket path.
# Keep the trailing slash: dataset names are appended directly to this prefix
# when the Parquet files are read.
gcs_bucket_path = "gs://retail-order-data-bucket/output/"

In [None]:
# Read each cleaned Parquet dataset under `gcs_bucket_path` into its own DataFrame.
def _read_clean_parquet(relative_path):
    """Read the Parquet dataset at ``gcs_bucket_path`` + ``relative_path``."""
    return spark.read.parquet(f"{gcs_bucket_path}{relative_path}")


# Customers
customers_df = _read_clean_parquet("customers_clean_df.parquet")
# Geolocation
geolocation_df = _read_clean_parquet("geolocation_clean_df.parquet/")
# Order items
order_items_df = _read_clean_parquet("order_items_clean_df.parquet/")
# Order payments
order_payments_df = _read_clean_parquet("order_payments_clean_df.parquet/")
# Orders
orders_df = _read_clean_parquet("orders_clean_df.parquet/")
# Sellers
sellers_df = _read_clean_parquet("sellers_clean_df.parquet/")
# Order reviews (note the dataset name is "orders_reviews", plural)
order_reviews_df = _read_clean_parquet("orders_reviews_clean_df.parquet/")
# Products
products_df = _read_clean_parquet("products_clean_df.parquet/")


### Data Serving Layer

In [None]:
# --- 1. Pre-process geolocation data ---
# Average lat/lng per zip code prefix so each prefix maps to a single
# coordinate pair.
avg_geolocation_df = (
    geolocation_df
    .groupBy("geolocation_zip_code_prefix")
    .agg(
        avg("geolocation_lat").alias("avg_lat"),
        avg("geolocation_lng").alias("avg_lng"),
    )
)

# --- 2-5. Build the combined frame as one chain of INNER joins ---
# Every join is an inner join, so a row survives only if it has a match in
# each joined table. Aliases (o, c, op, orv, oi, p, s, geo_cust, geo_seller)
# are used to qualify columns in the join conditions and the final select.
combined_df = (
    orders_df.alias("o")
    # 2. Core order data: customers, payments, reviews.
    .join(customers_df.alias("c"), col("o.customer_id") == col("c.customer_id"), how="inner")
    .join(order_payments_df.alias("op"), col("o.order_id") == col("op.order_id"), how="inner")
    .join(order_reviews_df.alias("orv"), col("o.order_id") == col("orv.order_id"), how="inner")
    # 3. Order items with their products and sellers.
    .join(order_items_df.alias("oi"), col("o.order_id") == col("oi.order_id"), how="inner")
    .join(products_df.alias("p"), col("oi.product_id") == col("p.product_id"), how="inner")
    .join(sellers_df.alias("s"), col("oi.seller_id") == col("s.seller_id"), how="inner")
    # 4. Customer coordinates, matched on the customer's zip code prefix.
    .join(
        avg_geolocation_df.alias("geo_cust"),
        col("c.customer_zip_code_prefix") == col("geo_cust.geolocation_zip_code_prefix"),
        how="inner",
    )
    # Rename right away so the seller geolocation join below can bring in its
    # own avg_lat/avg_lng without a name clash.
    .withColumnRenamed("avg_lat", "customer_lat")
    .withColumnRenamed("avg_lng", "customer_lng")
    # 5. Seller coordinates, matched on the seller's zip code prefix.
    .join(
        avg_geolocation_df.alias("geo_seller"),
        col("s.seller_zip_code_prefix") == col("geo_seller.geolocation_zip_code_prefix"),
        how="inner",
    )
    .withColumnRenamed("avg_lat", "seller_lat")
    .withColumnRenamed("avg_lng", "seller_lng")
)

# --- 6. Select and rename columns for the final output ---
# Columns are picked explicitly, qualified by alias, so the result has no
# duplicate column names (several inputs share order_id, customer_id, etc.).
_order_cols = [
    col("o.order_id"),
    col("o.order_status"),
    col("o.order_purchase_timestamp"),
    col("o.order_approved_at"),
    col("o.order_delivered_carrier_date"),
    col("o.order_delivered_customer_date"),
    col("o.order_estimated_delivery_date"),
]

_customer_cols = [
    col("c.customer_id"),
    col("c.customer_unique_id"),
    col("c.customer_zip_code_prefix"),
    col("c.customer_city"),
    col("c.customer_state"),
    col("customer_lat"),  # renamed avg_lat from the geo_cust join
    col("customer_lng"),  # renamed avg_lng from the geo_cust join
]

_order_item_cols = [
    col("oi.order_item_id"),
    col("oi.product_id"),
    col("oi.price").alias("item_price"),
    col("oi.freight_value").alias("item_freight_value"),
    col("oi.total_value").alias("item_total_value"),  # total_value from order_items
    col("oi.shipping_limit_date"),
    col("oi.shipping_date"),
    col("oi.shipping_year"),
    col("oi.shipping_month"),
]

_product_cols = [
    col("p.product_category_name"),
    col("p.product_name_lenght"),
    col("p.product_description_lenght"),
    col("p.product_photos_qty"),
    col("p.product_weight_g"),
    col("p.product_length_cm"),
    col("p.product_height_cm"),
    col("p.product_width_cm"),
    col("p.product_weight_kg"),
    col("p.product_category_clean"),
]

_seller_cols = [
    col("s.seller_id"),
    col("s.seller_zip_code_prefix"),
    col("s.seller_city"),
    col("s.seller_state"),
    col("seller_lat"),  # renamed avg_lat from the geo_seller join
    col("seller_lng"),  # renamed avg_lng from the geo_seller join
]

_payment_cols = [
    col("op.payment_sequential"),
    col("op.payment_type"),
    col("op.payment_installments"),
    col("op.payment_value").alias("order_payment_value"),  # renamed for clarity
    col("op.installment_value"),
]

_review_cols = [
    col("orv.review_id"),
    col("orv.review_score"),
    col("orv.review_comment_title"),
    col("orv.review_comment_message"),
    col("orv.review_creation_date"),
    col("orv.review_answer_timestamp"),
]

# Section order here defines the column order of the output table.
final_combined_df = combined_df.select(
    *_order_cols,
    *_customer_cols,
    *_order_item_cols,
    *_product_cols,
    *_seller_cols,
    *_payment_cols,
    *_review_cols,
)

In [None]:
# Set your temp GCS bucket.
# This value is passed as the "temporaryGcsBucket" option of the BigQuery write below.
temp_gcs_bucket = "temp-buck-111"  # just the bucket name, without gs://

In [None]:
# Write the combined DataFrame to BigQuery in "overwrite" mode, passing
# `temp_gcs_bucket` as the connector's temporaryGcsBucket option.
# NOTE(review): `bigquery_table` is not assigned in any visible cell of this
# notebook. It must be defined before this cell runs (presumably as
# "project.dataset.table" — confirm); otherwise the NameError is raised here.
try:
    (
        final_combined_df.write
        .format("bigquery")
        .option("temporaryGcsBucket", temp_gcs_bucket)
        .option("table", bigquery_table)
        .mode("overwrite")
        .save()
    )
except Exception as e:
    print(f"❌ Error writing to BigQuery: {e}")
    # Re-raise after reporting: a failed write (including the undefined-name
    # case above) must fail the cell rather than let "Run All" continue as if
    # the table had been written.
    raise
else:
    print(f"✅ Successfully wrote combined data to BigQuery table: {bigquery_table}")

In [None]:
# Stop the Spark session obtained at the top of the notebook; cells above
# that use `spark` need a new session before they can be re-run.
spark.stop()