In [0]:
show catalogs;

catalog
samples
system
workspace


In [0]:
CREATE VOLUME IF NOT EXISTS workspace.default.my_volume;

In [0]:
%python
from pyspark.sql.functions import to_date, from_unixtime

df = spark.read.option("header", True) \
               .option("inferSchema", True) \
               .option("nullValue", "") \
               .csv("/Volumes/workspace/default/my_volume/events.csv")

In [0]:
%python
from pyspark.sql.functions import col

df = df.filter(
    (col("timestamp").cast("long").isNotNull()) &
    (col("timestamp").cast("long") > 1000000000000)
)

df = df.withColumn(
    "event_date",
    to_date(from_unixtime(col("timestamp").cast("long") / 1000))
)

In [0]:
%python
df.createOrReplaceTempView("events_staging")

## Static Files Loading

In [0]:
%python
df_items_1 = spark.read.option("header", True).csv("/Volumes/workspace/default/my_volume/item_properties_part1.csv")
df_items_1 = df_items_1.withColumn("itemid", col("itemid").cast("int")) \
                        .withColumn("timestamp", col("timestamp").cast("bigint"))
df_items_1.createOrReplaceTempView("items_staging_1")

df_items_2 = spark.read.option("header", True).csv("/Volumes/workspace/default/my_volume/item_properties_part2.csv")
df_items_2 = df_items_2.withColumn("itemid", col("itemid").cast("int")) \
                        .withColumn("timestamp", col("timestamp").cast("bigint"))
df_items_2.createOrReplaceTempView("items_staging_2")

df_category = spark.read.option("header", True).csv("/Volumes/workspace/default/my_volume/category_tree.csv")
df_category = df_category.withColumn("categoryid", col("categoryid").cast("int")) \
                         .withColumn("parentid", col("parentid").cast("int"))
df_category.createOrReplaceTempView("items_category")

## Dimensional tables

In [0]:
%python
print("Dim table creation starts")

In [0]:
CREATE OR REPLACE TABLE workspace.default.dim_item AS
SELECT
    ROW_NUMBER() OVER (ORDER BY itemid) AS item_sk,
    itemid
FROM (
    SELECT DISTINCT itemid FROM events_staging
    WHERE itemid IS NOT NULL
);


num_affected_rows,num_inserted_rows


In [0]:
DESCRIBE workspace.default.dim_item

col_name,data_type,comment
item_sk,int,
itemid,int,


In [0]:
CREATE OR REPLACE TABLE workspace.default.dim_category AS
SELECT
    ROW_NUMBER() OVER (ORDER BY categoryid) AS category_sk,
    categoryid,
    parentid
FROM items_category;


num_affected_rows,num_inserted_rows


In [0]:
DESCRIBE workspace.default.dim_category

col_name,data_type,comment
category_sk,int,
categoryid,int,
parentid,int,


In [0]:
CREATE OR REPLACE TABLE workspace.default.dim_user AS
SELECT
    ROW_NUMBER() OVER (ORDER BY visitorid) AS user_sk,
    visitorid
FROM (
    SELECT DISTINCT visitorid FROM events_staging
    WHERE visitorid IS NOT NULL
);


num_affected_rows,num_inserted_rows


In [0]:
DESCRIBE workspace.default.dim_user

col_name,data_type,comment
user_sk,int,
visitorid,int,


In [0]:
CREATE OR REPLACE TABLE workspace.default.dim_event AS
SELECT
    ROW_NUMBER() OVER (ORDER BY event) AS event_sk,
    event
FROM (
    SELECT DISTINCT event FROM events_staging
    WHERE event IS NOT NULL
);


num_affected_rows,num_inserted_rows


In [0]:
DESCRIBE workspace.default.dim_event

col_name,data_type,comment
event_sk,int,
event,string,


## Creating Fact Table

In [0]:
CREATE TABLE IF NOT EXISTS workspace.default.fact_events (
    timestamp BIGINT,
    event_date DATE,
    event_sk INT,
    user_sk INT,
    item_sk INT,
    transactionid INT
)
USING DELTA
PARTITIONED BY (event_date);

## ETL Pipeline Starts

### Creating ETL metadata

In [0]:
%python
print("ETL starts")

In [0]:
CREATE OR REPLACE TABLE workspace.default.etl_metadata (
    table_name STRING,
    last_loaded_date DATE
);

In [0]:
INSERT INTO workspace.default.etl_metadata VALUES ('fact_events', '2015-06-2');

num_affected_rows,num_inserted_rows
1,1


In [0]:
select * from workspace.default.etl_metadata

table_name,last_loaded_date
fact_events,2015-06-02


### Incremental data loading logic

In [0]:
%python
print("Incremental data loading starts")

In [0]:
%python
last_date_row = spark.sql("""
    SELECT last_loaded_date FROM workspace.default.etl_metadata 
    WHERE table_name = 'fact_events'
""").collect()

if last_date_row:
    last_loaded_date = last_date_row[0]['last_loaded_date']
    print(f"Last loaded date: {last_loaded_date}")
else:
    raise Exception("No metadata found for 'fact_events' in etl_metadata.")


In [0]:
%python
from pyspark.sql.functions import col

try:
    print("Starting ETL job...")

    print(f"Fetching new events after {last_loaded_date}")
    new_events = spark.sql(f"""
        SELECT * FROM events_staging
        WHERE event_date > DATE('{last_loaded_date}')
    """)

    print("Joining new events with dimension tables...")
    new_data = new_events.alias("e") \
        .join(spark.table("workspace.default.dim_event").alias("ev"), col("e.event") == col("ev.event"), "inner") \
        .join(spark.table("workspace.default.dim_user").alias("u"), col("e.visitorid") == col("u.visitorid"), "inner") \
        .join(spark.table("workspace.default.dim_item").alias("i"), col("e.itemid") == col("i.itemid"), "inner") \
        .select(
            col("e.timestamp"),
            col("e.event_date"),
            col("ev.event_sk"),
            col("u.user_sk"),
            col("i.item_sk"),
            col("e.transactionid")
        )

    print("Schema of transformed data:")
    new_data.printSchema()

    count = new_data.count()
    print(f"New records to load: {count}")

    if count > 0:
        print("Writing new data to fact_events table...")
        new_data.write.format("delta") \
            .mode("append") \
            .partitionBy("event_date") \
            .saveAsTable("workspace.default.fact_events")

        max_date = new_data.agg({"event_date": "max"}).collect()[0]["max(event_date)"]
        print(f"Updating metadata with last loaded date: {max_date}")

        spark.sql(f"""
            DELETE FROM workspace.default.etl_metadata
            WHERE table_name = 'fact_events'
        """)
        spark.sql(f"""
            INSERT INTO workspace.default.etl_metadata
            VALUES ('fact_events', DATE('{max_date}'))
        """)

        print("ETL job completed successfully.")
        dbutils.notebook.exit("SUCCESS")
    else:
        print("No new data to load.")
        dbutils.notebook.exit("SUCCESS")

except Exception as e:
    print("ETL job failed.")
    print(str(e))
    dbutils.notebook.exit(f"FAILURE: {str(e)}")


root
 |-- timestamp: long (nullable = true)
 |-- event_date: date (nullable = true)
 |-- event_sk: integer (nullable = true)
 |-- user_sk: integer (nullable = true)
 |-- item_sk: integer (nullable = true)
 |-- transactionid: integer (nullable = true)



## Testing

##### how many events of a type?

In [0]:
SELECT ev.event, COUNT(*) as total_events
FROM workspace.default.fact_events f
JOIN workspace.default.dim_event ev ON f.event_sk = ev.event_sk
GROUP BY ev.event;

event,total_events
addtocart,53916
view,2047961
transaction,17440


##### Items with most views

In [0]:
SELECT item.itemid as item_id, count(*) as viewCount
FROM
workspace.default.fact_events f 
JOIN workspace.default.dim_event ev ON f.event_sk = ev.event_sk and ev.event = 'view'
JOIN workspace.default.dim_item item ON item.item_sk = f.item_sk
GROUP BY item.itemid
ORDER BY viewCount DESC
limit 1

item_id,viewCount
187946,3403


##### Active users by day

In [0]:
SELECT 
  event_date,
  COUNT(DISTINCT user_sk) AS daily_active_users
FROM workspace.default.fact_events
GROUP BY event_date
ORDER BY event_date;

event_date,daily_active_users
2015-06-03,13414
2015-06-04,13088
2015-06-05,12137
2015-06-06,9698
2015-06-07,9423
2015-06-08,12521
2015-06-09,13684
2015-06-10,13995
2015-06-11,12387
2015-06-12,9281
