In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd
import os

### Creating a BigQuery Dataset
(pyspark_env) PS C:\Users\David\my_dbt_project> bq mk --dataset --location=US ga4-ecommerce-project-1:pyspark_output
>>
Dataset 'ga4-ecommerce-project-1:pyspark_output' successfully created.

### Creating a Temporary GCS Bucket (staging area for BigQuery writes)
(pyspark_env) PS C:\Users\David\my_dbt_project> gsutil mb -l US gs://ga4-ecommerce-project-1-temp
Creating gs://ga4-ecommerce-project-1-temp/...


In [2]:
def get_event_param(params_col, param_key, value_type):
    """
    Pull one typed value out of a GA4-style ``event_params`` column.

    First looks up ``param_key`` on ``params_col``, then selects the
    ``value_type`` field (e.g. ``'string_value'`` or ``'int_value'``) from
    the struct that lookup returns.

    NOTE(review): this assumes ``params_col`` is a MAP keyed by parameter
    name. If the column is instead an ARRAY<STRUCT<key, value>> (as in the
    raw BigQuery GA4 export), a string lookup will not work — confirm the
    schema before relying on this helper.
    """
    param_struct = params_col.getItem(param_key)
    return param_struct.getItem(value_type)

def write_to_gcs_for_dbt(df, table_name, gcs_bucket, format="parquet"):
    """
    Write a DataFrame to GCS so dbt can read it as an external source.

    The data lands at ``gs://<gcs_bucket>/pyspark_output/<table_name>`` and
    replaces anything already there (``mode("overwrite")``).

    Args:
        df: Spark DataFrame to write.
        table_name: Folder name under ``pyspark_output/``.
        gcs_bucket: Bucket name, without the ``gs://`` prefix.
        format: ``"parquet"`` (default), ``"csv"`` (written with a header
            row) or ``"json"``; matched case-insensitively.

    Returns:
        The ``gs://`` path that was written.

    Raises:
        ValueError: If ``format`` is not one of the supported values.
    """
    output_path = f"gs://{gcs_bucket}/pyspark_output/{table_name}"
    fmt = str(format).lower()

    # Validate before touching df.write: an unknown format must fail loudly
    # rather than write nothing and still return a path as if it succeeded.
    if fmt not in ("parquet", "csv", "json"):
        raise ValueError(
            f"Unsupported format {format!r}; expected 'parquet', 'csv' or 'json'"
        )

    writer = df.write.mode("overwrite")
    if fmt == "parquet":
        writer.parquet(output_path)
    elif fmt == "csv":
        writer.option("header", "true").csv(output_path)
    else:
        writer.json(output_path)

    return output_path

def run_dq_check(df, description, condition):
    """
    Fail fast when ``df`` contains rows matching ``condition``.

    ``condition`` selects *bad* data: every row it matches is a violation.
    With no violations a PASSED line is printed. Otherwise the violation
    count and up to five offending rows are printed, and an exception is
    raised.

    Args:
        df: Spark DataFrame to inspect.
        description: Human-readable name of the check, used in messages.
        condition: Anything ``df.filter`` accepts (Column or SQL string).

    Raises:
        Exception: If at least one row matches ``condition``.
    """
    violations = df.filter(condition)
    violation_count = violations.count()

    if violation_count <= 0:
        print(f"PASSED DQ CHECK: {description}")
        return

    print(f"FAILED DQ CHECK: {description}")
    print(f"Found {violation_count} invalid rows. Showing examples:")
    violations.show(5)
    raise Exception(f"Data quality check failed: {description}")

def write_local_for_dbt(df, table_name, format="parquet"):
    """
    Write a DataFrame under ``./data/<table_name>`` for dbt to consume.

    Any existing output at that path is replaced (``mode("overwrite")``).

    Args:
        df: Spark DataFrame to write.
        table_name: Folder name under ``./data``.
        format: ``"parquet"`` (default), ``"csv"`` (written with a header
            row) or ``"json"``; matched case-insensitively.

    Returns:
        The relative output path, e.g. ``"./data/dim_users"``.

    Raises:
        ValueError: If ``format`` is not one of the supported values.
    """
    output_path = f"./data/{table_name}"
    fmt = str(format).lower()

    # Validate before touching df.write so a typo fails loudly instead of
    # silently falling back to parquet.
    if fmt not in ("parquet", "csv", "json"):
        raise ValueError(
            f"Unsupported format {format!r}; expected 'parquet', 'csv' or 'json'"
        )

    writer = df.write.mode("overwrite")
    if fmt == "parquet":
        writer.parquet(output_path)
    elif fmt == "csv":
        writer.option("header", "true").csv(output_path)
    else:
        writer.json(output_path)

    print(f"Data written to: {output_path}")
    return output_path

In [3]:
# --- Environment --------------------------------------------------------------
# Point Spark at the local JDK 17 install.
# NOTE(review): SPARK_SKIP_JAVA_VERSION_CHECK is presumably meant to bypass a
# Java version check — confirm your Spark build actually reads this variable.
os.environ['JAVA_HOME'] = r"C:/Program Files/Eclipse Adoptium/jdk-17.0.16.8-hotspot"
os.environ['SPARK_SKIP_JAVA_VERSION_CHECK'] = '1'

# --- Key configurations -------------------------------------------------------
project_id = 'ga4-ecommerce-project-1'

# Maven coordinates for the connectors Spark should download.
bq_package = "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.37.0"
gcs_package = "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.20"
inject_package = "javax.inject:javax.inject:1"

# BigQuery connector + javax.inject only (no GCS connector).
# NOTE(review): not referenced by any cell in this notebook.
connector_package = ",".join([bq_package, inject_package])

# Locally downloaded connector jars.
# NOTE(review): not referenced by any cell in this notebook; the session is
# configured from `all_packages` instead.
gcs_jar_path = "C:/Users/David/spark_jar/gcs-connector-hadoop3-2.2.20-shaded.jar"
bq_jar_path = "C:/Users/David/spark_jar/spark-bigquery-with-dependencies_2.12-0.37.0.jar"

# Comma-separated list of every package above; passed to spark.jars.packages.
all_packages = ",".join([bq_package, gcs_package, inject_package])

In [4]:
print("Initializing SparkSession...")

# Credentials file handed to the Hadoop GCS connector under both the
# google.cloud.auth.* and fs.gs.* config keys.
# NOTE(review): the filename suggests gcloud application-default credentials
# rather than a service-account key — confirm the connector accepts it.
CREDENTIALS_FILE = "C:/Users/David/AppData/Roaming/gcloud/application_default_credentials.json"

# (key, value) pairs applied to the session builder, in order.
SPARK_CONF = [
    ("spark.jars.packages", all_packages),
    ("spark.hadoop.google.cloud.auth.service.account.json.keyfile", CREDENTIALS_FILE),
    ("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
    ("spark.hadoop.fs.gs.auth.service.account.enable", "true"),
    ("spark.hadoop.fs.gs.auth.service.account.json.keyfile", CREDENTIALS_FILE),
]

session_builder = SparkSession.builder.appName("BigQueryAndGcsApp")
for conf_key, conf_value in SPARK_CONF:
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

# GA4 events flattened in BigQuery: event_params are unnested into scalar
# columns and nested records are projected to top-level columns. {START}/{END}
# are filled with YYYYMMDD table suffixes.
BASE_SELECT = """
WITH base AS (
  SELECT
    TO_HEX(MD5(CONCAT(user_pseudo_id, '_', CAST(event_timestamp AS STRING), '_', event_name))) AS event_id,
    event_date,
    event_timestamp,
    event_name,
    user_pseudo_id,
    (SELECT ep.value.int_value FROM UNNEST(event_params) ep WHERE ep.key = 'ga_session_id') AS ga_session_id,
    (SELECT ep.value.string_value FROM UNNEST(event_params) ep WHERE ep.key = 'page_location') AS page_location,
    (SELECT ep.value.string_value FROM UNNEST(event_params) ep WHERE ep.key = 'page_referrer') AS page_referrer,
    (SELECT ep.value.int_value FROM UNNEST(event_params) ep WHERE ep.key = 'engagement_time_msec') AS engagement_time_msec,
    (SELECT ep.value.string_value FROM UNNEST(event_params) ep WHERE ep.key = 'gclid') AS gclid,
    (SELECT ep.value.string_value FROM UNNEST(event_params) ep WHERE ep.key = 'utm_source') AS utm_source,
    (SELECT ep.value.string_value FROM UNNEST(event_params) ep WHERE ep.key = 'utm_medium') AS utm_medium,
    (SELECT ep.value.string_value FROM UNNEST(event_params) ep WHERE ep.key = 'utm_campaign') AS utm_campaign,
    (SELECT ep.value.string_value FROM UNNEST(event_params) ep WHERE ep.key = 'utm_term') AS utm_term,
    (SELECT ep.value.string_value FROM UNNEST(event_params) ep WHERE ep.key = 'utm_content') AS utm_content,
    ecommerce.transaction_id,
    event_value_in_usd,
    traffic_source.source,
    traffic_source.medium,
    traffic_source.name AS campaign_name,
    geo.continent,
    geo.country,
    geo.region,
    geo.city,
    device.category,
    device.operating_system,
    device.web_info.browser,
    device.mobile_brand_name,
    items,
    user_id,
    user_first_touch_timestamp
  FROM
    `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_*`
  WHERE _TABLE_SUFFIX BETWEEN '{START}' AND '{END}'
)

SELECT * FROM base
"""

# Inclusive (start, end) _TABLE_SUFFIX windows, one BigQuery query per month.
months = [
    ('20201101', '20201130'),
    ('20201201', '20201231'),
    ('20210101', '20210131')
]

# Options shared by every BigQuery read. Query results are materialized into
# `spark_bq_temp` before Spark reads them.
# NOTE(review): the dataset created at the top of the notebook is
# `pyspark_output`; confirm `spark_bq_temp` exists in the project.
BQ_READ_OPTIONS = {
    "parentProject": project_id,
    "viewsEnabled": "true",
    "materializationProject": project_id,
    "materializationDataset": "spark_bq_temp",
    "readDataFormat": "AVRO",
}

# Output partitions per monthly parquet — adjust for your machine.
MONTH_PARTITIONS = 8
GA4_LOCAL_DIR = "C:/Users/David/my_dbt_project/ga4_data"


def _snapshot_month(start, end):
    """
    Query one month of GA4 events, print a min/max timestamp + row-count
    sanity check, and save the result as local parquet (overwriting).

    Returns the parquet path that was written.
    """
    month_events = (
        spark.read.format("bigquery")
        .options(**BQ_READ_OPTIONS)
        .option("query", BASE_SELECT.format(START=start, END=end))
        .load()
    )

    # quick sanity check
    month_events.select(
        F.min("event_timestamp").alias("min_ts"),
        F.max("event_timestamp").alias("max_ts"),
        F.count("*").alias("rows"),
    ).show(truncate=False)

    month_path = f"{GA4_LOCAL_DIR}/ga4_{start}_{end}.parquet"
    month_events.repartition(MONTH_PARTITIONS).write.mode("overwrite").parquet(month_path)
    return month_path


paths = [_snapshot_month(start, end) for start, end in months]

df = spark.read.parquet(*paths)

print("SparkSession initialized and data loaded successfully.")

Initializing SparkSession...
+----------------+----------------+-------+
|min_ts          |max_ts          |rows   |
+----------------+----------------+-------+
|1604188804579566|1606780799060669|1472712|
+----------------+----------------+-------+

+----------------+----------------+-------+
|min_ts          |max_ts          |rows   |
+----------------+----------------+-------+
|1606780800730722|1609459163197109|1612725|
+----------------+----------------+-------+

+----------------+----------------+-------+
|min_ts          |max_ts          |rows   |
+----------------+----------------+-------+
|1609459208136569|1612137595412363|1210147|
+----------------+----------------+-------+

SparkSession initialized and data loaded successfully.


In [None]:
# Recompute event_id in Spark as md5("<user_pseudo_id>_<event_timestamp>_<event_name>").
# NOTE(review): BASE_SELECT already builds an event_id in BigQuery, and this
# replaces it. Spark's concat_ws skips NULL inputs, whereas BigQuery CONCAT
# yields NULL, so the two ids can differ when any input is NULL.
df = df.withColumn(
    'event_id',
    F.md5(F.concat_ws('_', 'user_pseudo_id', 'event_timestamp', 'event_name')),
)

In [None]:
# One row per user_pseudo_id.
# ignorenulls=True makes first() return a non-null user_id whenever any of the
# user's events carries one. Without it, the value comes from whichever row
# Spark sees first and can be NULL even though other rows have a user_id.
# NOTE(review): first() is still order-dependent if a user has several
# distinct non-null user_ids.
dim_users = df.groupBy("user_pseudo_id").agg(
    F.first("user_id", ignorenulls=True).alias("user_id"),
    F.min("user_first_touch_timestamp").alias("user_first_touch_timestamp"),
)

In [None]:
# dim_users primary key: user_pseudo_id must be populated ...
run_dq_check(
    dim_users,
    "user_pseudo_id in dim_users should not be null",
    F.col("user_pseudo_id").isNull(),
)

# ... and unique. Any key appearing on more than one row is a violation.
duplicate_user_ids = dim_users.groupBy("user_pseudo_id").count().where(F.col("count") > 1)
run_dq_check(duplicate_user_ids, "user_pseudo_id in dim_users should be unique", F.lit(True))

In [None]:
# Persist dim_users as local parquet under ./data for dbt.
write_local_for_dbt(dim_users, table_name="dim_users")

In [None]:
# Optional: load dim_users straight into BigQuery (pyspark_output dataset),
# staging through the temporary GCS bucket. Disabled by default; the local
# parquet write above is the active path. This is guarded code rather than a
# bare string literal, which Jupyter would echo back as the cell's output.
WRITE_DIM_USERS_TO_BQ = False
if WRITE_DIM_USERS_TO_BQ:
    (
        dim_users.write
        .format("bigquery")
        .option("table", f"{project_id}.pyspark_output.dim_users")
        .option("temporaryGcsBucket", "ga4-ecommerce-project-1-temp")
        .mode("overwrite")
        .save()
    )

In [None]:
# Optional: export dim_users to GCS for dbt. Disabled by default.
# NOTE(review): the bucket created at the top of the notebook is
# 'ga4-ecommerce-project-1-temp'; confirm 'ga4-ecommerce-project-1' exists.
EXPORT_DIM_USERS_TO_GCS = False
if EXPORT_DIM_USERS_TO_GCS:
    write_to_gcs_for_dbt(dim_users, "dim_users", 'ga4-ecommerce-project-1', format="parquet")

In [None]:
# One row per event.
# `df` is loaded from BASE_SELECT, which already unnests event_params and
# projects ecommerce / traffic_source / geo / device fields into top-level
# columns (BigQuery names `geo.country` as `country`, and so on). The raw
# nested columns (`event_params`, `geo.*`, `device.*`, ...) therefore do not
# exist in `df`, so select the flattened columns directly.
FACT_EVENT_COLUMNS = [
    # keys and timing
    'event_id', 'event_date', 'event_timestamp', 'event_name', 'user_pseudo_id',
    # values unnested from event_params
    'ga_session_id', 'page_location', 'page_referrer', 'engagement_time_msec',
    'gclid', 'utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content',
    # ecommerce / value
    'transaction_id', 'event_value_in_usd',
    # traffic_source (traffic_source.name is aliased campaign_name in BASE_SELECT)
    'source', 'medium', 'campaign_name',
    # geo
    'continent', 'country', 'region', 'city',
    # device
    'category', 'operating_system', 'browser', 'mobile_brand_name',
]
fact_events = df.select(*FACT_EVENT_COLUMNS)

In [None]:
# Earliest event_date present in fact_events.
fact_events.agg(F.min('event_date')).show()

In [None]:
# Pull the whole fact table onto the driver as a pandas DataFrame for ad-hoc
# inspection.
# NOTE(review): this collects every row (about 4.3M according to the monthly
# sanity-check output) into driver memory. Consider `.limit(...)` or a
# Spark-side aggregation if the driver runs short of memory.
tst = fact_events.toPandas()

In [None]:
# Distinct event names in the pandas copy.
tst.event_name.unique()

In [None]:
# First ten rows of the pandas copy.
tst.iloc[:10]

In [None]:
# Print the column names and types of fact_events as a tree.
fact_events.printSchema()

In [None]:
# Required keys must be populated.
for key_col, check_description in [
    ("event_id", "event_id should not be null"),
    ("user_pseudo_id", "user_pseudo_id should not be null"),
]:
    run_dq_check(fact_events, check_description, F.col(key_col).isNull())

# event_id must identify exactly one row.
# NOTE(review): event_id hashes (user, timestamp, name); distinct events sharing
# all three would collide and fail this check.
duplicate_event_ids = fact_events.groupBy("event_id").count().where(F.col("count") > 1)
run_dq_check(duplicate_event_ids, "event_id should be unique", F.lit(True))

# Every event's user must exist in dim_users; left_anti keeps the non-matches.
orphaned_events = fact_events.join(dim_users, on="user_pseudo_id", how="left_anti")
run_dq_check(orphaned_events, "All events must have a valid user", F.lit(True))

In [None]:
# Optional: export fact_events to GCS for dbt. Disabled by default.
# NOTE(review): the GCS table name is "fct_events" while the local export uses
# "fact_events"; align them if dbt sources expect one name. Also confirm the
# bucket 'ga4-ecommerce-project-1' exists (the notebook creates '...-temp').
EXPORT_FACT_EVENTS_TO_GCS = False
if EXPORT_FACT_EVENTS_TO_GCS:
    write_to_gcs_for_dbt(fact_events, "fct_events", 'ga4-ecommerce-project-1', format="parquet")

In [None]:
# Persist fact_events as local parquet under ./data for dbt.
write_local_for_dbt(fact_events, table_name="fact_events")

In [None]:
# Optional: load fact_events straight into BigQuery (pyspark_output dataset),
# staging through the temporary GCS bucket. Disabled by default; the local
# parquet write above is the active path. This is guarded code rather than a
# bare string literal, which Jupyter would echo back as the cell's output.
WRITE_FACT_EVENTS_TO_BQ = False
if WRITE_FACT_EVENTS_TO_BQ:
    (
        fact_events.write
        .format("bigquery")
        .option("table", f"{project_id}.pyspark_output.fact_events")
        .option("temporaryGcsBucket", "ga4-ecommerce-project-1-temp")
        .mode("overwrite")
        .save()
    )

In [None]:
# Item attributes, one row per item_id, taken from the exploded `items` arrays.
# NOTE(review): dropDuplicates keeps an arbitrary row per item_id. If an item's
# name/brand/category differs across events, which version lands here is not
# deterministic.
ITEM_ATTRIBUTE_FIELDS = [
    "item_id", "item_name", "item_brand", "item_variant",
    "item_category", "item_category2", "item_category3",
    "item_category4", "item_category5",
]
dim_items = (
    df.select(F.explode("items").alias("items"))
    .select(*[F.col(f"items.{field}").alias(field) for field in ITEM_ATTRIBUTE_FIELDS])
    .dropDuplicates(["item_id"])
)

In [None]:
# dim_items primary key: item_id must be populated ...
run_dq_check(
    dim_items,
    "item_id in dim_items should not be null",
    F.col("item_id").isNull(),
)

# ... and unique. Any item_id appearing on more than one row is a violation.
duplicate_item_ids = dim_items.groupBy("item_id").count().where(F.col("count") > 1)
run_dq_check(duplicate_item_ids, "item_id in dim_items should be unique", F.lit(True))

In [None]:
# Optional: export dim_items to GCS for dbt. Disabled by default.
# NOTE(review): confirm the bucket 'ga4-ecommerce-project-1' exists (the
# notebook creates 'ga4-ecommerce-project-1-temp').
EXPORT_DIM_ITEMS_TO_GCS = False
if EXPORT_DIM_ITEMS_TO_GCS:
    write_to_gcs_for_dbt(dim_items, "dim_items", 'ga4-ecommerce-project-1', format="parquet")

In [None]:
# Persist dim_items as local parquet under ./data for dbt.
write_local_for_dbt(dim_items, table_name="dim_items")

In [None]:
# Optional: load dim_items straight into BigQuery (pyspark_output dataset),
# staging through the temporary GCS bucket. Disabled by default; the local
# parquet write above is the active path. This is guarded code rather than a
# bare string literal, which Jupyter would echo back as the cell's output.
WRITE_DIM_ITEMS_TO_BQ = False
if WRITE_DIM_ITEMS_TO_BQ:
    (
        dim_items.write
        .format("bigquery")
        .option("table", f"{project_id}.pyspark_output.dim_items")
        .option("temporaryGcsBucket", "ga4-ecommerce-project-1-temp")
        .mode("overwrite")
        .save()
    )

In [None]:
# Event/item bridge: one row per element of each event's `items` array.
# NOTE(review): there is no item-position column, so an event that lists the
# same item_id twice yields two rows with the same (event_id, item_id).
EVENT_ITEM_FIELDS = [
    'item_id', 'quantity', 'price', 'price_in_usd',
    'item_revenue_in_usd', 'item_revenue', 'item_refund_in_usd', 'item_refund',
    'coupon', 'affiliation', 'location_id',
    'item_list_id', 'item_list_name', 'item_list_index',
    'promotion_id', 'promotion_name', 'creative_name', 'creative_slot',
]
fact_event_items = (
    df.select('event_id', F.explode('items').alias('items'))
    .select('event_id', *[F.col(f'items.{field}').alias(field) for field in EVENT_ITEM_FIELDS])
)

In [None]:
# Pull the whole event/item bridge onto the driver as pandas for ad-hoc
# inspection.
# NOTE(review): this collects every exploded item row into driver memory;
# consider `.limit(...)` if the driver runs short of memory.
tst_2 = fact_event_items.toPandas()

In [None]:
# First ten rows of the pandas bridge copy.
tst_2.iloc[:10]

In [None]:
# Number of item rows where price * quantity is non-null.
fact_event_items.filter((F.col('price') * F.col('quantity')).isNotNull()).count()

In [None]:
# Number of item rows that carry a price.
fact_event_items.where(F.col('price').isNotNull()).count()

In [None]:
# Bridge-table keys must be populated.
for key_col, check_description in [
    ("event_id", "event_id in bridge table should not be null"),
    ("item_id", "item_id in bridge table should not be null"),
]:
    run_dq_check(fact_event_items, check_description, F.col(key_col).isNull())

# Referential integrity: left_anti keeps bridge rows with no matching parent.
orphaned_items_by_event = fact_event_items.join(fact_events, on="event_id", how="left_anti")
run_dq_check(orphaned_items_by_event, "All items must link to a valid event", F.lit(True))

orphaned_items_by_item = fact_event_items.join(dim_items, on="item_id", how="left_anti")
run_dq_check(orphaned_items_by_item, "All event items must link to a valid item in dim_items", F.lit(True))

In [None]:
# Persist the event/item bridge as local parquet under ./data for dbt.
write_local_for_dbt(fact_event_items, table_name="fact_event_items")

In [None]:
# Optional: export the event/item bridge to GCS for dbt. Disabled by default.
# NOTE(review): the GCS table name is "fct_event_items" while the local export
# uses "fact_event_items"; align them if dbt sources expect one name. Also
# confirm the bucket 'ga4-ecommerce-project-1' exists (the notebook creates
# '...-temp').
EXPORT_FACT_EVENT_ITEMS_TO_GCS = False
if EXPORT_FACT_EVENT_ITEMS_TO_GCS:
    write_to_gcs_for_dbt(fact_event_items, "fct_event_items", 'ga4-ecommerce-project-1', format="parquet")

In [None]:
# Optional: load the event/item bridge straight into BigQuery (pyspark_output
# dataset), staging through the temporary GCS bucket. Disabled by default; the
# local parquet write above is the active path. This is guarded code rather
# than a bare string literal, which Jupyter would echo back as the cell's
# output.
WRITE_FACT_EVENT_ITEMS_TO_BQ = False
if WRITE_FACT_EVENT_ITEMS_TO_BQ:
    (
        fact_event_items.write
        .format("bigquery")
        .option("table", f"{project_id}.pyspark_output.fact_event_items")
        .option("temporaryGcsBucket", "ga4-ecommerce-project-1-temp")
        .mode("overwrite")
        .save()
    )