In [0]:
from pyspark.sql.functions import to_date
from pyspark.sql.functions import col

**CREATE CATALOG AND DATABASE**

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS web_catalog
MANAGED LOCATION 'abfss://unity-catalog-storage@dbstoragedlwvnba4d5k72.dfs.core.windows.net/2141168493853526';

CREATE DATABASE IF NOT EXISTS web_catalog.web_database;


**BRONZE TABLE: RAW GA4 EVENTS**

In [0]:
raw_data = (
    spark.read.format("csv")
    .option("header", True)
    #.option("dateFormat", "YYYY-MM-dd")
    .option("inferSchema", True)
    .load("abfss://dbfs-container@joyonuoha.dfs.core.windows.net/external/ga4_ingestion_output.csv")
    .withColumnRenamed("date", "event_date")
)



# change the date format from integer to date
raw_data = raw_data.withColumn("event_date", to_date(col("event_date"), "yyyyMMdd"))

display(raw_data)

In [0]:
%sql
CREATE TABLE IF NOT EXISTS web_catalog.web_database.bronze (
    event_date date,
    country string,
    deviceCategory string,
    browser string,
    eventName string,
    activeUsers integer,
    engagedSessions integer,
    engagementRate double,
    averageSessionDuration double,
    userEngagementDuration integer,
    bounceRate double,
    eventsPerSession double,
    screenPageViews integer,
    screenPageViewsPerSession double,
    newUsers integer
)
USING DELTA

In [0]:
raw_data.write.format("delta").mode("append").saveAsTable("web_catalog.web_database.bronze")

**SILVER TRANSFORMATION**

In [0]:
%sql
-- Silver Layer: Cleaned and enriched data

CREATE OR REPLACE TABLE web_catalog.web_database.bricks_silver AS
SELECT 
    event_date,
    country,
    deviceCategory,
    browser,
    eventName,
    activeUsers,
    engagedSessions,
    engagementRate,
    averageSessionDuration,
    userEngagementDuration,
    bounceRate,
    eventsPerSession,
    screenPageViews,
    screenPageViewsPerSession,
    newUsers,
    current_timestamp() AS ingestion_time
FROM web_catalog.web_database.bronze
WHERE eventName IS NOT NULL
  AND engagedSessions IS NOT NULL
  AND isnan(engagementRate) = false;

**GOLD TRANSFORMATION**

In [0]:
%sql
-- Gold Layer: Aggregated business metrics

CREATE OR REPLACE TABLE web_catalog.web_database.bricks_gold AS
SELECT 
    country,
    browser,
    COUNT(DISTINCT engagedSessions) AS total_engaged_sessions,
    AVG(engagementRate) AS avg_engagement_rate,
    AVG(averageSessionDuration) AS avg_session_duration,
    AVG(eventsPerSession) AS avg_events_per_session,
    current_date() AS report_date
FROM web_catalog.web_database.bricks_silver
GROUP BY country, browser;
