# GA4 Sweaty Betty Gold Layer Processing <br>
## Product Revenue<br>
This notebook reads the raw GA4 event data and processes it into the Product Revenue table.


In [None]:
import concurrent.futures
from delta import *
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, LongType, DoubleType, BooleanType, MapType,IntegerType
from pyspark.sql.functions import *
from functools import reduce
from pyspark.sql.dataframe import DataFrame
import pyspark.sql.functions as F
import json
import base64
from datetime import datetime,timedelta
from time import sleep
# With DYNAMIC mode an overwrite replaces only the partitions present in the
# data being written rather than the whole table.
spark.conf.set("spark.sql.sources.partitionOverwriteMode","DYNAMIC")
# Enables automatic schema merging for Delta writes/merges, so new source
# columns are added to the target table instead of failing the operation.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")
from azure.storage.blob import BlobServiceClient
from pyspark.sql.functions import max as spark_max

In [None]:
%run /utils/common_functions

In [None]:
# Storage account name: the text between '@' and the first '.' of raw_adls_path.
# raw_adls_path is not defined in this notebook — presumably it comes from the
# %run of /utils/common_functions; verify there.
account_name = raw_adls_path.split('@')[1].split('.')[0]
# Location of the BigQuery datasets/tables JSON file under the raw GA4 folder
# (not referenced again in the visible part of this notebook).
json_blob_path =f"{raw_adls_path}/GA4_SweatyBetty/bigquery_datasets_tables.json"
#base_folder = "GA4_SweatyBetty"
# Folder, relative to raw_adls_path, that holds the daily events_YYYYMMDD tables.
base_folder = "GA4_SweatyBetty/analytics_292120381/events"
# Container that receives the gold-layer output.
gold_container = 'gold'

In [None]:
# Product Revenue
# Target folder of the Product Revenue Delta table inside the gold container.
product_revenue_target_folder = '/GA4-SweatyBetty/product_revenue/'
# Full abfss:// location of the table, in the storage account derived from raw_adls_path.
# NOTE(review): the folder's leading '/' produces a '//' right after the host
# in this path — confirm the storage driver normalises it as intended.
product_revenue_delta_table_path = f"abfss://{gold_container}@{account_name}.dfs.core.windows.net/{product_revenue_target_folder}"

In [None]:
# Product Revenue
# Echo the resolved target folder and Delta path so they appear in the run output.
print(f"Product Revenue Target Folder: {product_revenue_target_folder}")
print(f"Product Revenue Delta Table Path: {product_revenue_delta_table_path}")

# Variables for Currency Exchange

In [None]:
# Container holding the currency exchange (FX rate) source table.
silver_container = 'silver'
# Folder of the FX rates Delta table inside the silver container.
currency_exchange_source_folder = '/SAP/BW/FxRates/'
# Full abfss:// location of the FX rates table, in the storage account derived from raw_adls_path.
# NOTE(review): the folder's leading '/' produces a '//' right after the host
# in this path — confirm the storage driver normalises it as intended.
currency_exchange_delta_table_path = f"abfss://{silver_container}@{account_name}.dfs.core.windows.net/{currency_exchange_source_folder}"
print(currency_exchange_delta_table_path)

In [None]:
# Read the Currency Delta table from the specified path.
# The cells below use its CalendarDay, FromCurrency, ToCurrency and FxRate columns.
df_currency_exchange = spark.read.format("delta").load(currency_exchange_delta_table_path)

In [None]:
# Step 1: Average FxRate per currency pair and calendar month
# (CalendarDay is truncated to the first day of its month).
df_monthly_avg = df_currency_exchange.groupBy(
    F.col("FromCurrency"),
    F.col("ToCurrency"),
    F.trunc(F.col("CalendarDay"), "month").alias("month_start"),
).agg(F.avg(F.col("FxRate")).alias("exchange_rate_amount"))

# Step 2: Rename to the exchange-rate schema, attach the validity window
# (valid_from = first day of the month, valid_to = first day of the next
# month) and keep only the rates that convert into GBP.
transformed_exchange_df = df_monthly_avg.select(
    F.col("FromCurrency").alias("base_currency_code"),
    F.col("ToCurrency").alias("target_currency_code"),
    F.col("exchange_rate_amount"),
    F.col("month_start").alias("valid_from"),
    F.add_months(F.col("month_start"), 1).alias("valid_to"),
).filter(F.col("target_currency_code") == "GBP")

In [None]:
# Name under which the GBP-filtered monthly rates are handed to the product revenue pipeline.
exchange_rate_df = transformed_exchange_df

# Transform

In [None]:
from pyspark.sql.functions import col, md5, concat_ws, coalesce, lit, from_unixtime, to_date, explode_outer, row_number, min as min_, when
from pyspark.sql.window import Window


In [None]:
# 1. Add session_hkey
def add_session_hkey(df):
    """
    Return `df` with an additional `session_hkey` column.

    The key is the MD5 hex digest of user_pseudo_id, ga_session_id and
    ga_session_number joined with "-". Each part is cast to string and a
    null part is replaced by "" so it still occupies its position.

    Parameters:
    - df: events DataFrame with `user_pseudo_id` and the
      `event_params.ga_session_id` / `event_params.ga_session_number` fields
    """
    key_sources = (
        "user_pseudo_id",
        "event_params.ga_session_id.int_value",
        "event_params.ga_session_number.int_value",
    )
    key_parts = [
        coalesce(col(source).cast("string"), lit(""))
        for source in key_sources
    ]
    return df.withColumn("session_hkey", md5(concat_ws("-", *key_parts)))

In [None]:
# 2. Explode items array

def explode_items(df):
    """
    Return `df` with one row per element of the `items` array.

    The element is exposed as a new `item` column. `explode_outer` is used,
    so an event whose `items` is null or empty is kept with a null `item`.

    Parameters:
    - df: events DataFrame with an `items` array column
    """
    item_column = explode_outer(col("items"))
    return df.withColumn("item", item_column)

In [None]:
#This function avoids duplicates, but product_index does not match
#3 compute_product_metrics with Unique

from pyspark.sql.functions import col, from_unixtime, min as min_, row_number, date_format, sha2, concat_ws
from pyspark.sql.window import Window

def compute_product_metrics(df):
    """
    Build the per-product columns of the Product Revenue table.

    Expects the exploded events DataFrame: one `item` struct per row (see
    explode_items) plus a `session_hkey` column (see add_session_hkey).

    Parameters:
    - df: exploded GA4 events DataFrame

    Returns:
    - DataFrame restricted to the product revenue columns, one row per event
      item that has a non-null `item.item_id`
    """
    # Filter out records where product_sku (item.item_id) is null. This also
    # removes the null-item rows that explode_outer keeps for item-less events.
    df = df.filter(col("item.item_id").isNotNull())

    # event_timestamp is scaled by 1,000,000 before conversion, i.e. it is
    # handled as microseconds since the epoch.
    df = df.withColumn("event_timestamp_ts", from_unixtime(col("event_timestamp") / 1000000).cast("timestamp"))

    # Add visit_key (the GA4 session id)
    df = df.withColumn("visit_key", col("event_params.ga_session_id.int_value"))

    # Compute visit_start_time as the earliest event of the session.
    # A session is identified by user AND session id — the same pair used for
    # hit_number below and hashed into session_hkey. Partitioning by visit_key
    # alone would blend sessions of different users that share a
    # ga_session_id into one start time.
    window_vs = Window.partitionBy("user_pseudo_id", "visit_key")
    df = df.withColumn("visit_start_time", min_("event_timestamp_ts").over(window_vs))
    df = df.withColumn("visit_start_time", date_format(col("visit_start_time"), "yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp"))

    # Compute hit_number: running row number within the session.
    # event_name and item_id are tie-breakers so that rows sharing an
    # event_timestamp (e.g. several items of one event) are numbered the same
    # way on every run. compute_unique_key hashes hit_number into the merge
    # key, so an arbitrary order among ties could change keys between runs.
    window_hit = Window.partitionBy("user_pseudo_id", "visit_key").orderBy(
        col("event_timestamp"),
        col("event_name"),
        col("item.item_id"),
    )
    df = df.withColumn("hit_number", row_number().over(window_hit))

    # New product_index logic: position of the row among all rows that share
    # the same event_timestamp_ts, ordered by item id and engagement time.
    window_index = Window.partitionBy("event_timestamp_ts") \
                         .orderBy(col("event_timestamp_ts").asc(),
                                  col("item.item_id").asc(),
                                  col("event_params.engagement_time_msec.int_value").cast("int").asc())
    df = df.withColumn("product_index", row_number().over(window_index))

    # Generate event_product_unique_key and event_unique_key
    df = df.withColumn("event_product_unique_key", concat_ws("_", col("event_name"), col("item.item_id")))
    df = df.withColumn("event_unique_key", col("event_params.event_id.string_value"))

    # Generate unique_key (equivalent to dbt_utils.generate_surrogate_key).
    # Note: transform_product_revenue later replaces this column with the
    # value produced by compute_unique_key.
    df = df.withColumn("unique_key", sha2(
        concat_ws("||",
            col("session_hkey"),
            col("user_pseudo_id"),
            col("visit_start_time").cast("string"),
            col("event_product_unique_key"),
            col("event_unique_key")
        ), 256
    ))

    # Select required columns
    return df.select(
        "event_timestamp",
        "user_pseudo_id",
        "session_hkey",
        "user_id",
        "visit_key",
        "visit_start_time",
        "hit_number",
        col("event_name").alias("ecommerceaction_actiontype"),
        col("ecommerce.transaction_id").alias("transaction_id"),
        col("geo.country").alias("country"),
        "product_index",
        col("item.item_id").alias("product_sku"),
        col("item.quantity").alias("product_quantity"),
        col("event_params.currency.string_value").alias("transaction_currency"),
        col("item.item_revenue").alias("local_product_revenue"),
        col("item.item_revenue_in_usd").alias("product_revenue_usd"),
        col("event_timestamp_ts"),
        "unique_key"
    )


In [None]:
# 4. Join with exchange rate to compute product_revenue_gbp
def join_exchange_rates(df, exchange_rate_df):
    """
    Left-join the exchange rates and add `product_revenue_gbp`.

    A rate row matches when its base currency equals the row's
    transaction_currency, its target currency is GBP, and visit_start_time
    falls in the half-open window [valid_from, valid_to).

    Parameters:
    - df: product metrics DataFrame with `transaction_currency`,
      `visit_start_time` and `local_product_revenue`
    - exchange_rate_df: rates with `base_currency_code`,
      `target_currency_code`, `exchange_rate_amount`, `valid_from`, `valid_to`

    Returns:
    - `df` with the exchange-rate columns appended plus `product_revenue_gbp`
    """
    # valid_to is exclusive: this notebook builds it as the first day of the
    # following month, which is also the next window's valid_from. An inclusive
    # upper bound would match two rate rows for a visit starting exactly on a
    # month boundary and duplicate the product row.
    rate_applies = (
        (df.transaction_currency == exchange_rate_df.base_currency_code)
        & (df.visit_start_time >= exchange_rate_df.valid_from)
        & (df.visit_start_time < exchange_rate_df.valid_to)
        & (exchange_rate_df.target_currency_code == "GBP")
    )

    df = df.join(exchange_rate_df, rate_applies, how="left").withColumn(
        "product_revenue_gbp",
        # GBP revenue is taken as-is. For other currencies a missing rate
        # falls back to 1, i.e. the local amount is passed through unconverted.
        when(col("transaction_currency") == "GBP", col("local_product_revenue"))
        .otherwise(col("local_product_revenue") * coalesce(col("exchange_rate_amount"), lit(1)))
    )
    return df

In [None]:

from pyspark.sql.functions import md5, concat_ws, coalesce, col, lit

def compute_unique_key(df):
    """
    Return `df` with `unique_key` set to an MD5 hash of the row's identity.

    The hashed parts are session_hkey, visit_key, ecommerceaction_actiontype,
    event_timestamp_ts and hit_number, joined with "-". Each part is cast to
    string and a null part is replaced by "". An existing `unique_key`
    column is overwritten.

    Parameters:
    - df: DataFrame carrying the five columns listed above
    """
    identity_columns = (
        "session_hkey",
        "visit_key",
        "ecommerceaction_actiontype",
        "event_timestamp_ts",
        "hit_number",
    )
    hashed_parts = [
        coalesce(col(name).cast("string"), lit(""))
        for name in identity_columns
    ]
    return df.withColumn("unique_key", md5(concat_ws("-", *hashed_parts)))


In [None]:
# 6. Apply incremental filter if needed
def filter_incremental(df, last_loaded_datetime):
    """
    Keep only rows loaded strictly after `last_loaded_datetime`.

    Parameters:
    - df: DataFrame with a `load_datetime` column
    - last_loaded_datetime: cut-off value; rows equal to it are excluded
    """
    loaded_after_cutoff = col("load_datetime") > lit(last_loaded_datetime)
    return df.where(loaded_after_cutoff)

In [None]:
# 7. Full transformation pipeline
def transform_product_revenue(df_events, exchange_rate_df):
    """
    Run the complete Product Revenue transformation on raw events.

    Steps, in order: add session_hkey, explode items, compute the product
    metrics, join the exchange rates, then compute the final unique_key.

    Parameters:
    - df_events: raw events DataFrame
    - exchange_rate_df: exchange rates used for the GBP conversion
    """
    with_session_key = add_session_hkey(df_events)
    per_item = explode_items(with_session_key)
    product_metrics = compute_product_metrics(per_item)
    with_gbp_revenue = join_exchange_rates(product_metrics, exchange_rate_df)
    return compute_unique_key(with_gbp_revenue)


# Function to Save to Delta
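Note: `hit_number` is not fully deterministic — rows that tie in its window ordering are numbered arbitrarily. Because `unique_key` includes `hit_number`, re-processing the same data can produce duplicate records in the merged table.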

In [None]:
# Save and Merge Function - New Logic
from delta.tables import DeltaTable

def save_to_delta_merge(df, spark, delta_table_path: str):
    """
    Upsert `df` into the Delta table at `delta_table_path`, keyed on 'unique_key'.

    When no Delta table exists at the path yet, the DataFrame is written as
    the initial table. Otherwise rows whose unique_key already exists are
    updated with all source columns and the remaining rows are inserted.

    Parameters:
    - df: DataFrame to save
    - spark: SparkSession
    - delta_table_path: path to the Delta table
    """
    # First load: nothing to merge into, so create the table from df.
    if not DeltaTable.isDeltaTable(spark, delta_table_path):
        df.write.format("delta").mode("overwrite").save(delta_table_path)
        return

    target_table = DeltaTable.forPath(spark, delta_table_path).alias("target")
    upsert = target_table.merge(
        source=df.alias("source"),
        condition="target.unique_key = source.unique_key",
    )
    upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()


# Process a Date Range

In [None]:
#Process data for Day before Yesterday or a Date Range
from datetime import datetime, timedelta

def process_and_save_product_revenue_date_range(
    raw_adls_path: str,
    base_folder: str,
    delta_base_path: str,
    exchange_rate_df,
    spark,
    start_date: str = None,
    end_date: str = None
):
    """
    Process and save product revenue tables over a date range.

    Each day's `events_YYYYMMDD` table is read, transformed and merged into
    the Delta table independently. A failing day is reported and does not
    stop the remaining days.

    Parameters:
    - raw_adls_path: Base path to raw data in ADLS (with or without trailing '/')
    - base_folder: Subfolder for event tables
    - delta_base_path: Path to write delta tables
    - exchange_rate_df: Pre-loaded DataFrame with exchange rates
    - spark: SparkSession
    - start_date: Start of date range, inclusive (format: YYYYMMDD)
    - end_date: End of date range, inclusive (format: YYYYMMDD)

    When neither date is given, the day before yesterday (UTC) is processed.
    When only one date is given, that single day is processed.

    Returns:
    - List of table names (events_YYYYMMDD) that failed; empty when all succeeded

    Raises:
    - ValueError: if a supplied date does not match the YYYYMMDD format
    """
    from datetime import timezone

    if not start_date and not end_date:
        # Compute day before yesterday in UTC
        day_before_yesterday = (datetime.now(timezone.utc) - timedelta(days=2)).strftime("%Y%m%d")
        start_date = end_date = day_before_yesterday
    else:
        # A single supplied date selects that one day instead of being ignored.
        start_date = start_date or end_date
        end_date = end_date or start_date

    start = datetime.strptime(start_date, "%Y%m%d")
    end = datetime.strptime(end_date, "%Y%m%d")

    # Normalise separators so the read path is valid whether or not
    # raw_adls_path ends with '/' (the original concatenation required it).
    source_root = "/".join(
        part for part in (raw_adls_path.rstrip("/"), base_folder.strip("/")) if part
    )
    delta_table_path = delta_base_path

    failed_tables = []
    current = start
    while current <= end:
        date_str = current.strftime("%Y%m%d")
        table_name = f"events_{date_str}"
        print(f"Processing table: {table_name}")

        try:
            full_path = f"{source_root}/{table_name}"
            df_events = spark.read.parquet(full_path)

            df_processed = transform_product_revenue(df_events, exchange_rate_df)

            save_to_delta_merge(df_processed, spark, delta_table_path)
            print(f"Saved to {delta_table_path}")

        except Exception as e:
            # Best effort per day: report, remember the failure and carry on.
            print(f"Error processing {table_name}: {e}")
            failed_tables.append(table_name)

        current += timedelta(days=1)

    if failed_tables:
        print(f"Failed tables ({len(failed_tables)}): {', '.join(failed_tables)}")

    return failed_tables


# Call Product Revenue for Day before yesterday 

In [None]:

# No start_date/end_date is passed, so the function falls back to the day before yesterday (UTC).
process_and_save_product_revenue_date_range(
    raw_adls_path=raw_adls_path,
    base_folder=base_folder,
    delta_base_path=product_revenue_delta_table_path,
    exchange_rate_df=exchange_rate_df,
    spark=spark
)


In [None]:
# Backfill template: remove the surrounding triple quotes to process a specific
# date range. Dates are YYYYMMDD strings and both ends are inclusive.
'''
process_and_save_product_revenue_date_range(
    raw_adls_path=raw_adls_path,
    base_folder=base_folder,
    delta_base_path=product_revenue_delta_table_path,
    exchange_rate_df=exchange_rate_df,
    spark=spark,
    start_date="20250525",
    end_date="20250525"
)'''
