In [0]:
# Install the Kaggle CLI used by the `kaggle datasets download` %sh cell below.
# NOTE(review): unpinned; consider `%pip install kaggle==<version>` so the install
# targets this notebook's Python environment and is reproducible.
!pip install kaggle

In [0]:
import os
import getpass

# Configure Kaggle CLI credentials through environment variables.
# SECURITY: never hardcode the API key in the notebook. The source is committed to
# git, so the key that used to be written here is exposed and must be revoked/rotated
# on kaggle.com.
os.environ["KAGGLE_USERNAME"] = "sumeshmajee"
if not os.environ.get("KAGGLE_KEY"):
    # Prompt only when the key is not already provided by the environment.
    os.environ["KAGGLE_KEY"] = getpass.getpass("Paste your Kaggle API key and hit Enter: ")

print("Kaggle credentials configured!")

In [0]:
# Create the target schema; IF NOT EXISTS makes this safe to re-run.
spark.sql("""
CREATE SCHEMA IF NOT EXISTS workspace.ecommerce
""")

In [0]:
# Create the Unity Catalog volume that holds the raw CSVs and the Delta outputs
# written later in this notebook; IF NOT EXISTS makes this safe to re-run.
spark.sql("""
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.ecommerce_data
""")

In [0]:
%sh
# Download the dataset archive from Kaggle into the Unity Catalog volume.
# NOTE(review): relies on KAGGLE_USERNAME / KAGGLE_KEY set via os.environ in the
# Python cell above being visible to this %sh process — confirm on this runtime.
cd /Volumes/workspace/ecommerce/ecommerce_data
kaggle datasets download -d mkechinov/ecommerce-behavior-data-from-multi-category-store

In [0]:
%sh
# Extract the archive in place (-o: overwrite existing files without prompting)
# and list the resulting files with human-readable sizes.
cd /Volumes/workspace/ecommerce/ecommerce_data
unzip -o ecommerce-behavior-data-from-multi-category-store.zip
ls -lh

In [0]:
%sh
# Delete the zip (no longer needed once extracted; -f: no error if already gone)
# and list what remains in the volume.
cd /Volumes/workspace/ecommerce/ecommerce_data
rm -f ecommerce-behavior-data-from-multi-category-store.zip
ls -lh

In [0]:
%restart_python
# Restart the Python process: all Python state from earlier cells (variables, os.environ changes) is cleared.

In [0]:
# November 2019 events. header=True uses the first CSV line as column names (without it
# the header is read as a data row and columns are named _c0, _c1, ...);
# inferSchema=True gives typed columns, matching how `events` is loaded later.
df_n = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", header=True, inferSchema=True)

In [0]:
# October 2019 events. header=True uses the first CSV line as column names (without it
# the header is read as a data row, inflating the count and naming columns _c0, _c1, ...);
# inferSchema=True gives typed columns instead of all-string ones.
df = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv", header=True, inferSchema=True)

In [0]:
# Headline row count for October, followed by the inferred schema.
divider = "=" * 60
total_events = df.count()
print(f"October 2019 - Total Events: {total_events:,}")
for banner_line in ("\n" + divider, "SCHEMA:", divider):
    print(banner_line)
df.printSchema()

In [0]:
# Banner, then the first five October rows with full (untruncated) column values.
divider = "=" * 60
header_lines = ["\n" + divider, "SAMPLE DATA (First 5 rows):", divider]
print("\n".join(header_lines))
df.show(5, truncate=False)

Day 1


In [0]:
# Create a small demo DataFrame. It gets its own name so it does not overwrite `df`,
# which holds the October events loaded above.
products_data = [("iPhone", 999), ("Samsung", 799), ("MacBook", 1299)]
products_df = spark.createDataFrame(products_data, ["product", "price"])
products_df.show()

# Filter expensive products
products_df.filter(products_df.price > 1000).show()


In [0]:
import os
import getpass

# 1. Enter your token securely when prompted (It won't be saved in the file)
print("Paste your GitHub Token below and hit Enter:")
GITHUB_TOKEN = getpass.getpass()

# --- Everything else stays the same ---
USERNAME = "SRMajee"
REPO_NAME = "DataBricks_Challenge"
auth_url = f"https://{GITHUB_TOKEN}@github.com/{USERNAME}/{REPO_NAME}.git"

# 2. Update Remote
!git -c safe.directory='*' remote set-url origin {auth_url}

# 3. Add files (Now the file doesn't contain the secret!)
!git -c safe.directory='*' add .

# 4. Commit
!git -c safe.directory='*' commit -m "Secure push using getpass"

# 5. Push
!git -c safe.directory='*' push origin main

Day 2

In [0]:
# Load data
# October events: first line is the header, column types are inferred from the data.
events = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv")
)


In [0]:

# Basic operations
events.select("event_time", "product_id", "price").show(10)

# count() returns a number; it is only rendered if it is the cell's last expression,
# so print it explicitly.
expensive_count = events.filter("price > 100").count()
print(f"Events with price > 100: {expensive_count:,}")

# Event counts per event type. Grouping by the raw event_time timestamp yields
# roughly one group per second, which is not informative.
events.groupBy("event_type").count().show()

# Top 5 brands by number of events (display the result; it was built but never shown).
top_brands = events.groupBy("brand").count().orderBy("count", ascending=False).limit(5)
top_brands.show()


In [0]:
import os
import getpass

# 1. UNDO the last commit (so we can save it again with the private email)
!git -c safe.directory='*' reset --soft HEAD~1

# 2. SET PRIVATE IDENTITY (Using your GitHub no-reply alias)
# This format (username@users.noreply.github.com) hides your real email
os.environ["GIT_AUTHOR_NAME"] = "Sumesh Majee"
os.environ["GIT_AUTHOR_EMAIL"] = "SRMajee@users.noreply.github.com"
os.environ["GIT_COMMITTER_NAME"] = "Sumesh Majee"
os.environ["GIT_COMMITTER_EMAIL"] = "SRMajee@users.noreply.github.com"

# 3. ASK FOR TOKEN
print("Paste your GitHub Token below and hit Enter:")
GITHUB_TOKEN = getpass.getpass()

# 4. PREPARE URL
USERNAME = "SRMajee"
REPO_NAME = "DataBricks_Challenge"
auth_url = f"https://{GITHUB_TOKEN}@github.com/{USERNAME}/{REPO_NAME}.git"

# 5. COMMIT & PUSH
!git -c safe.directory='*' remote set-url origin {auth_url}
!git -c safe.directory='*' commit -m "Day 2"
!git -c safe.directory='*' push origin main

Day 3

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Top 5 products by revenue (sum of purchase prices).
# Group by product_id once: listing it twice emits two identically named
# product_id columns, which makes the result ambiguous to reference.
revenue = (
    events.filter(F.col("event_type") == "purchase")
    .groupBy("product_id")
    .agg(F.sum("price").alias("revenue"))
    .orderBy(F.desc("revenue"))
    .limit(5)
)

# Running event count per user, ordered by time.
# NOTE: with orderBy and no explicit frame, Spark uses RANGE UNBOUNDED PRECEDING..CURRENT ROW,
# so a user's events with identical event_time share the same count.
window = Window.partitionBy("user_id").orderBy("event_time")
events_with_cumulative = events.withColumn(
    "cumulative_events",
    F.count("*").over(window)
)

# Conversion rate by category via conditional aggregation (instead of pivot).
conversion = (
    events.groupBy("category_code")
    .agg(
        F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchase"),
        F.sum(F.when(F.col("event_type") == "view", 1).otherwise(0)).alias("view"),
    )
    .withColumn(
        "conversion_rate",
        # NULL for categories without views; a bare division by zero raises
        # DIVIDE_BY_ZERO when ANSI mode is enabled.
        F.when(F.col("view") > 0, F.col("purchase") / F.col("view") * 100),
    )
)

display(revenue)
display(events_with_cumulative)
display(conversion)

In [0]:
import os
import getpass

# 1. UNDO the last commit (so we can try the process again from a clean state)
# We do this to ensure the commit and pull happen in the right order.
!git -c safe.directory='*' reset --soft HEAD~1

# 2. SET PRIVATE IDENTITY
os.environ["GIT_AUTHOR_NAME"] = "Sumesh Majee"
os.environ["GIT_AUTHOR_EMAIL"] = "SRMajee@users.noreply.github.com"
os.environ["GIT_COMMITTER_NAME"] = "Sumesh Majee"
os.environ["GIT_COMMITTER_EMAIL"] = "SRMajee@users.noreply.github.com"

# 3. ASK FOR TOKEN
print("Paste your GitHub Token below and hit Enter:")
GITHUB_TOKEN = getpass.getpass()

# 4. PREPARE URL
USERNAME = "SRMajee"
REPO_NAME = "DataBricks_Challenge"
auth_url = f"https://{GITHUB_TOKEN}@github.com/{USERNAME}/{REPO_NAME}.git"

# 5. COMMIT, PULL (WITH FIX), THEN PUSH
!git -c safe.directory='*' remote set-url origin {auth_url}
!git -c safe.directory='*' commit -m "Day 3"

# FIX IS HERE: We added '--no-rebase' to tell Git to use the default merge strategy
!git -c safe.directory='*' pull origin main --no-rebase --no-edit

# NOW PUSH
!git -c safe.directory='*' push origin main

Day 4

In [0]:
# Use a Unity Catalog volume directory path, not a file
volume_path = "/Volumes/workspace/ecommerce/ecommerce_data/events_delta"

# Write DataFrame as Delta table to the directory
events.write.format("delta").mode("overwrite").save(volume_path)

# Register the Delta table in Unity Catalog.
# mode("overwrite"): the default mode (errorifexists) makes a re-run fail because the table exists.
events.write.format("delta").mode("overwrite").saveAsTable("workspace.ecommerce.events_table")

# SQL approach to create a managed Delta table.
# CREATE OR REPLACE keeps the cell re-runnable and in sync with events_table.
spark.sql("""
    CREATE OR REPLACE TABLE workspace.ecommerce.events_delta
    USING DELTA
    AS SELECT * FROM workspace.ecommerce.events_table
""")

# Test schema enforcement: appending rows with unrelated columns should be rejected.
try:
    wrong_schema = spark.createDataFrame([("a","b","c")], ["x","y","z"])
    wrong_schema.write.format("delta").mode("append").save(volume_path)
except Exception as e:
    print(f"Schema enforcement: {e}")
else:
    # Reaching here means the mismatched rows were actually appended to the table.
    print("WARNING: mismatched-schema append succeeded; schema enforcement did NOT trigger")

In [0]:
import os
import getpass

# # 1. UNDO the last commit (so we can try the process again from a clean state)
# # We do this to ensure the commit and pull happen in the right order.
# !git -c safe.directory='*' reset --soft HEAD~1

# 2. SET PRIVATE IDENTITY
os.environ["GIT_AUTHOR_NAME"] = "Sumesh Majee"
os.environ["GIT_AUTHOR_EMAIL"] = "SRMajee@users.noreply.github.com"
os.environ["GIT_COMMITTER_NAME"] = "Sumesh Majee"
os.environ["GIT_COMMITTER_EMAIL"] = "SRMajee@users.noreply.github.com"

# 3. ASK FOR TOKEN
print("Paste your GitHub Token below and hit Enter:")
GITHUB_TOKEN = getpass.getpass()

# 4. PREPARE URL
USERNAME = "SRMajee"
REPO_NAME = "DataBricks_Challenge"
auth_url = f"https://{GITHUB_TOKEN}@github.com/{USERNAME}/{REPO_NAME}.git"

# 5. ADD, COMMIT, PULL, THEN PUSH
!git -c safe.directory='*' remote set-url origin {auth_url}

# --- FIX: ADD FILES FIRST ---
!git -c safe.directory='*' add . 
# ----------------------------

!git -c safe.directory='*' commit -m "Day 4"

# Pull just in case there are remote changes
!git -c safe.directory='*' pull origin main --no-rebase --no-edit

# NOW PUSH
!git -c safe.directory='*' push origin main

Day 5


In [0]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

volume_path = "/Volumes/workspace/ecommerce/ecommerce_data/events_delta"

# MERGE for incremental updates: upsert the November events into the Delta table at volume_path.
deltaTable = DeltaTable.forPath(spark, volume_path)
updates = (
    spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", header=True, inferSchema=True)
    # MERGE fails when several source rows match the same target row, and
    # (user_session, event_time) is not guaranteed unique in the raw events:
    # keep one source row per merge key (same key the silver layer de-duplicates on).
    .dropDuplicates(["user_session", "event_time"])
)

(
    deltaTable.alias("t")
    .merge(
        updates.alias("s"),
        "t.user_session = s.user_session AND t.event_time = s.event_time"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

# Time travel by version.
v0 = spark.read.format("delta").option("versionAsOf", 0).load(volume_path)

# Time travel by timestamp. A hard-coded date such as "2024-01-01" fails whenever it
# predates the table's first commit, so use the earliest commit still in the history.
# The timestamp is formatted by Spark so it is read back in the same session time zone.
earliest_commit_ts = (
    deltaTable.history()
    .orderBy("version")
    .select(F.date_format("timestamp", "yyyy-MM-dd HH:mm:ss.SSS").alias("ts"))
    .first()["ts"]
)
as_of_earliest = spark.read.format("delta") \
    .option("timestampAsOf", earliest_commit_ts).load(volume_path)

# Optimize. Fully qualify the table: an unqualified name resolves against the current
# catalog/schema, which is not necessarily workspace.ecommerce at this point.
# NOTE(review): events_table is the copy registered via saveAsTable, not the path-based
# table merged above — confirm which of the two should be optimized.
spark.sql("OPTIMIZE workspace.ecommerce.events_table ZORDER BY (event_type, user_id)")
spark.sql("VACUUM workspace.ecommerce.events_table RETAIN 168 HOURS")


In [0]:
import os
import getpass

# # 1. UNDO the last commit (so we can try the process again from a clean state)
# # We do this to ensure the commit and pull happen in the right order.
# !git -c safe.directory='*' reset --soft HEAD~1

# 2. SET PRIVATE IDENTITY
os.environ["GIT_AUTHOR_NAME"] = "Sumesh Majee"
os.environ["GIT_AUTHOR_EMAIL"] = "SRMajee@users.noreply.github.com"
os.environ["GIT_COMMITTER_NAME"] = "Sumesh Majee"
os.environ["GIT_COMMITTER_EMAIL"] = "SRMajee@users.noreply.github.com"

# 3. ASK FOR TOKEN
print("Paste your GitHub Token below and hit Enter:")
GITHUB_TOKEN = getpass.getpass()

# 4. PREPARE URL
USERNAME = "SRMajee"
REPO_NAME = "DataBricks_Challenge"
auth_url = f"https://{GITHUB_TOKEN}@github.com/{USERNAME}/{REPO_NAME}.git"

# 5. ADD, COMMIT, PULL, THEN PUSH
!git -c safe.directory='*' remote set-url origin {auth_url}

# --- FIX: ADD FILES FIRST ---
!git -c safe.directory='*' add . 
# ----------------------------

!git -c safe.directory='*' commit -m "Day 5"

# Pull just in case there are remote changes
!git -c safe.directory='*' pull origin main --no-rebase --no-edit

# NOW PUSH
!git -c safe.directory='*' push origin main

Day 6


In [0]:
from pyspark.sql import functions as F

# Medallion pipeline over the November events; each layer is a Delta directory in the volume.
RAW_CSV_PATH = "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv"
BRONZE_PATH = "/Volumes/workspace/ecommerce/ecommerce_data/bronze_events"
SILVER_PATH = "/Volumes/workspace/ecommerce/ecommerce_data/silver_events"
GOLD_PATH = "/Volumes/workspace/ecommerce/ecommerce_data/gold_products"

# BRONZE: raw ingestion, price cast to double, stamped with the load time.
raw = spark.read.csv(RAW_CSV_PATH, header=True, inferSchema=True)
raw = raw.withColumn("price", F.col("price").cast("double"))
(
    raw.withColumn("ingestion_ts", F.current_timestamp())
    .write.format("delta")
    .mode("overwrite")
    .save(BRONZE_PATH)
)

# SILVER: keep prices in (0, 10000), de-duplicate, derive event_date and a price tier.
bronze = spark.read.format("delta").load(BRONZE_PATH)
silver = (
    bronze.filter(F.col("price") > 0)
    .filter(F.col("price") < 10000)
    .dropDuplicates(["user_session", "event_time"])
    .withColumn("event_date", F.to_date("event_time"))
    .withColumn(
        "price_tier",
        F.when(F.col("price") < 10, "budget")
        .when(F.col("price") < 50, "mid")
        .otherwise("premium"),
    )
)
silver.write.format("delta").mode("overwrite").save(SILVER_PATH)

# GOLD: per-product funnel. views / purchases = number of distinct users with that event.
# F.col("user_id") is required: F.when(cond, "user_id") treats "user_id" as a string
# literal, so countDistinct could only ever return 0 or 1.
silver = spark.read.format("delta").load(SILVER_PATH)
product_perf = (
    silver.groupBy("product_id")
    .agg(
        F.countDistinct(F.when(F.col("event_type") == "view", F.col("user_id"))).alias("views"),
        F.countDistinct(F.when(F.col("event_type") == "purchase", F.col("user_id"))).alias("purchases"),
        F.sum(F.when(F.col("event_type") == "purchase", F.col("price"))).alias("revenue"),
    )
    .withColumn(
        "conversion_rate",
        F.when(F.col("views") > 0, F.col("purchases") / F.col("views") * 100).otherwise(None),
    )
)
product_perf.write.format("delta").mode("overwrite").save(GOLD_PATH)


In [0]:
import os
import getpass

# # 1. UNDO the last commit (so we can try the process again from a clean state)
# # We do this to ensure the commit and pull happen in the right order.
# !git -c safe.directory='*' reset --soft HEAD~1

# 2. SET PRIVATE IDENTITY
os.environ["GIT_AUTHOR_NAME"] = "Sumesh Majee"
os.environ["GIT_AUTHOR_EMAIL"] = "SRMajee@users.noreply.github.com"
os.environ["GIT_COMMITTER_NAME"] = "Sumesh Majee"
os.environ["GIT_COMMITTER_EMAIL"] = "SRMajee@users.noreply.github.com"

# 3. ASK FOR TOKEN
print("Paste your GitHub Token below and hit Enter:")
GITHUB_TOKEN = getpass.getpass()

# 4. PREPARE URL
USERNAME = "SRMajee"
REPO_NAME = "DataBricks_Challenge"
auth_url = f"https://{GITHUB_TOKEN}@github.com/{USERNAME}/{REPO_NAME}.git"

# 5. ADD, COMMIT, PULL, THEN PUSH
!git -c safe.directory='*' remote set-url origin {auth_url}

# --- FIX: ADD FILES FIRST ---
!git -c safe.directory='*' add . 
# ----------------------------

!git -c safe.directory='*' commit -m "Day 6"

# Pull just in case there are remote changes
!git -c safe.directory='*' pull origin main --no-rebase --no-edit

# NOW PUSH
!git -c safe.directory='*' push origin main

Day 7


In [0]:
from pyspark.sql import functions as F

# Parameterized medallion job; the widgets act as job parameters when this notebook runs as a Job task.
# source_path is the RAW CSV read by the bronze layer (the old default pointed at the
# bronze Delta output directory, which cannot be parsed as CSV).
# NOTE(review): an existing widget from an earlier session may keep its old value —
# run dbutils.widgets.remove("source_path") once if so.
dbutils.widgets.text("source_path", "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv")
dbutils.widgets.dropdown("layer", "bronze", ["bronze","silver","gold"])

# Use parameters
source = dbutils.widgets.get("source_path")
layer = dbutils.widgets.get("layer")

BRONZE_PATH = "/Volumes/workspace/ecommerce/ecommerce_data/bronze_events"
SILVER_PATH = "/Volumes/workspace/ecommerce/ecommerce_data/silver_events"
GOLD_PATH = "/Volumes/workspace/ecommerce/ecommerce_data/gold_products"


def run_layer(layer_name):
    """Run one medallion layer, overwriting its Delta output.

    Args:
        layer_name: "bronze" (CSV at `source` -> BRONZE_PATH), "silver"
            (BRONZE_PATH -> SILVER_PATH) or "gold" (SILVER_PATH -> GOLD_PATH).

    Raises:
        ValueError: for any other layer name (previously ignored silently).
    """
    if layer_name == "bronze":
        # Bronze: raw CSV, price cast to double, stamped with the load time.
        df = spark.read.csv(source, header=True, inferSchema=True)
        df = df.withColumn("price", F.col("price").cast("double"))
        df = df.withColumn("ingestion_ts", F.current_timestamp())
        df.write.format("delta").mode("overwrite").save(BRONZE_PATH)
    elif layer_name == "silver":
        # Silver: keep prices in (0, 10000), de-duplicate, derive event_date and price tier.
        bronze = spark.read.format("delta").load(BRONZE_PATH)
        silver = bronze.filter(F.col("price") > 0) \
            .filter(F.col("price") < 10000) \
            .dropDuplicates(["user_session", "event_time"]) \
            .withColumn("event_date", F.to_date("event_time")) \
            .withColumn("price_tier",
                F.when(F.col("price") < 10, "budget")
                 .when(F.col("price") < 50, "mid")
                 .otherwise("premium"))
        silver.write.format("delta").mode("overwrite").save(SILVER_PATH)
    elif layer_name == "gold":
        # Gold: distinct viewers/purchasers per product. F.col("user_id") is required —
        # a bare "user_id" string inside F.when is a literal, capping the counts at 1.
        silver = spark.read.format("delta").load(SILVER_PATH)
        product_perf = silver.groupBy("product_id") \
            .agg(
                F.countDistinct(F.when(F.col("event_type")=="view", F.col("user_id"))).alias("views"),
                F.countDistinct(F.when(F.col("event_type")=="purchase", F.col("user_id"))).alias("purchases"),
                F.sum(F.when(F.col("event_type")=="purchase", F.col("price"))).alias("revenue")
            ).withColumn(
                "conversion_rate",
                F.when(F.col("views") > 0, F.col("purchases")/F.col("views")*100).otherwise(None)
            )
        product_perf.write.format("delta").mode("overwrite").save(GOLD_PATH)
    else:
        raise ValueError(f"Unknown layer: {layer_name!r}; expected 'bronze', 'silver' or 'gold'")


# Execute the selected layer (the function was previously defined but never called,
# so a scheduled run of this notebook did nothing).
run_layer(layer)

# To schedule this notebook to run daily at 2 AM:
# 1. In Databricks, go to "Jobs" > "Create Job".
# 2. Select this notebook as the task.
# 3. Set the schedule to "Daily" and choose "2:00 AM" as the start time.
# 4. Save the job.

# UI: Create Databricks Job
# Task 1: bronze_layer (notebook, layer=bronze)
# Task 2: silver_layer (notebook, layer=silver, depends on Task 1)
# Task 3: gold_layer (notebook, layer=gold, depends on Task 2)
# Schedule: Daily at 2 AM
# Use the Databricks Jobs UI to configure these tasks and dependencies.


In [0]:
import os
import getpass

# # 1. UNDO the last commit (so we can try the process again from a clean state)
# # We do this to ensure the commit and pull happen in the right order.
# !git -c safe.directory='*' reset --soft HEAD~1

# 2. SET PRIVATE IDENTITY
os.environ["GIT_AUTHOR_NAME"] = "Sumesh Majee"
os.environ["GIT_AUTHOR_EMAIL"] = "SRMajee@users.noreply.github.com"
os.environ["GIT_COMMITTER_NAME"] = "Sumesh Majee"
os.environ["GIT_COMMITTER_EMAIL"] = "SRMajee@users.noreply.github.com"

# 3. ASK FOR TOKEN
print("Paste your GitHub Token below and hit Enter:")
GITHUB_TOKEN = getpass.getpass()

# 4. PREPARE URL
USERNAME = "SRMajee"
REPO_NAME = "DataBricks_Challenge"
auth_url = f"https://{GITHUB_TOKEN}@github.com/{USERNAME}/{REPO_NAME}.git"

# 5. ADD, COMMIT, PULL, THEN PUSH
!git -c safe.directory='*' remote set-url origin {auth_url}

# --- FIX: ADD FILES FIRST ---
!git -c safe.directory='*' add . 
# ----------------------------

!git -c safe.directory='*' commit -m "Day 7"

# Pull just in case there are remote changes
!git -c safe.directory='*' pull origin main --no-rebase --no-edit

# NOW PUSH
!git -c safe.directory='*' push origin main

Day 8

In [0]:
%sql
-- Fully qualified: this cell runs before `USE CATALOG ecommerce`, so an unqualified
-- gold.products would resolve against whichever catalog is current.
DROP TABLE IF EXISTS ecommerce.gold.products;

In [0]:
%sql
-- Create structure (idempotent)
CREATE CATALOG IF NOT EXISTS ecommerce;
USE CATALOG ecommerce;
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;

-- Register tables as MANAGED (no LOCATION clause), copied from each layer's Delta path.
-- CREATE OR REPLACE on every layer so a re-run refreshes all three from the latest
-- pipeline output; with IF NOT EXISTS, bronze/silver stayed frozen at their first load
-- while gold was replaced, leaving the layers inconsistent.
CREATE OR REPLACE TABLE bronze.events
USING DELTA
AS
SELECT * FROM delta.`/Volumes/workspace/ecommerce/ecommerce_data/bronze_events`;
CREATE OR REPLACE TABLE silver.events
USING DELTA
AS
SELECT * FROM delta.`/Volumes/workspace/ecommerce/ecommerce_data/silver_events`;
CREATE OR REPLACE TABLE gold.products
USING DELTA
AS
SELECT * FROM delta.`/Volumes/workspace/ecommerce/ecommerce_data/gold_products`;
-- Controlled view: top 100 products by revenue among those with purchases > 10
CREATE OR REPLACE VIEW gold.top_products AS
SELECT product_id, revenue, conversion_rate
FROM gold.products
WHERE purchases > 10
ORDER BY revenue DESC LIMIT 100;

In [0]:
import os
import getpass

# # 1. UNDO the last commit (so we can try the process again from a clean state)
# # We do this to ensure the commit and pull happen in the right order.
# !git -c safe.directory='*' reset --soft HEAD~1

# 2. SET PRIVATE IDENTITY
os.environ["GIT_AUTHOR_NAME"] = "Sumesh Majee"
os.environ["GIT_AUTHOR_EMAIL"] = "SRMajee@users.noreply.github.com"
os.environ["GIT_COMMITTER_NAME"] = "Sumesh Majee"
os.environ["GIT_COMMITTER_EMAIL"] = "SRMajee@users.noreply.github.com"

# 3. ASK FOR TOKEN
print("Paste your GitHub Token below and hit Enter:")
GITHUB_TOKEN = getpass.getpass()

# 4. PREPARE URL
USERNAME = "SRMajee"
REPO_NAME = "DataBricks_Challenge"
auth_url = f"https://{GITHUB_TOKEN}@github.com/{USERNAME}/{REPO_NAME}.git"

# 5. ADD, COMMIT, PULL, THEN PUSH
!git -c safe.directory='*' remote set-url origin {auth_url}

# --- FIX: ADD FILES FIRST ---
!git -c safe.directory='*' add . 
# ----------------------------

!git -c safe.directory='*' commit -m "Day 8"

# Pull just in case there are remote changes
!git -c safe.directory='*' pull origin main --no-rebase --no-edit

# NOW PUSH
!git -c safe.directory='*' push origin main

Day 9

In [0]:
%sql
-- 1. Overall performance metrics across all products (replaces the category funnel).
-- NULLIF makes the rate NULL instead of raising DIVIDE_BY_ZERO (ANSI mode) when there are no views.
-- NOTE(review): if views/purchases are per-product distinct-user counts, these sums count
-- a user once per product touched, not once overall.
SELECT
  SUM(views) AS total_views,
  SUM(purchases) AS total_purchases,
  SUM(revenue) AS total_revenue,
  ROUND(SUM(purchases) * 100.0 / NULLIF(SUM(views), 0), 2) AS global_conversion_rate
FROM ecommerce.gold.products;

-- 2. Top 10 revenue generators (replaces daily revenue).
-- gold.products has no date column, so rank products instead.
SELECT
  product_id,
  revenue,
  purchases,
  views
FROM ecommerce.gold.products
ORDER BY revenue DESC
LIMIT 10;

-- 3. Top converting products (high efficiency), filtered for noise.
SELECT
  product_id,
  views,
  purchases,
  conversion_rate
FROM ecommerce.gold.products
WHERE views > 50  -- Filter to avoid 1 view / 1 purchase anomalies
ORDER BY conversion_rate DESC
LIMIT 10;

In [0]:
import os
import getpass

# # 1. UNDO the last commit (so we can try the process again from a clean state)
# # We do this to ensure the commit and pull happen in the right order.
# !git -c safe.directory='*' reset --soft HEAD~1

# 2. SET PRIVATE IDENTITY
os.environ["GIT_AUTHOR_NAME"] = "Sumesh Majee"
os.environ["GIT_AUTHOR_EMAIL"] = "SRMajee@users.noreply.github.com"
os.environ["GIT_COMMITTER_NAME"] = "Sumesh Majee"
os.environ["GIT_COMMITTER_EMAIL"] = "SRMajee@users.noreply.github.com"

# 3. ASK FOR TOKEN
print("Paste your GitHub Token below and hit Enter:")
GITHUB_TOKEN = getpass.getpass()

# 4. PREPARE URL
USERNAME = "SRMajee"
REPO_NAME = "DataBricks_Challenge"
auth_url = f"https://{GITHUB_TOKEN}@github.com/{USERNAME}/{REPO_NAME}.git"

# 5. ADD, COMMIT, PULL, THEN PUSH
!git -c safe.directory='*' remote set-url origin {auth_url}

# --- FIX: ADD FILES FIRST ---
!git -c safe.directory='*' add . 
# ----------------------------

!git -c safe.directory='*' commit -m "Day 9"

# Pull just in case there are remote changes
!git -c safe.directory='*' pull origin main --no-rebase --no-edit

# NOW PUSH
!git -c safe.directory='*' push origin main

Day 10


In [0]:
%python
import time

# 1. Ensure context
spark.sql("USE CATALOG ecommerce")
spark.sql("USE SCHEMA gold")

# 2. Setup: Create the table for optimization (if not exists)
spark.sql("""
  CREATE OR REPLACE TABLE gold.products_opt
  USING DELTA
  AS SELECT * FROM gold.products
""")

# 3. Optimize
# Serverless handles file management, but ZORDER is still useful for layout
print("--- Optimizing (Z-Ordering) ---")
spark.sql("OPTIMIZE gold.products_opt ZORDER BY (product_id, revenue)")

# 4. Get a valid ID for benchmarking (so we don't get Count: 0)
# We fetch one existing product_id to test with
valid_id_row = spark.sql("SELECT product_id FROM gold.products_opt LIMIT 1").collect()

if valid_id_row:
    target_id = valid_id_row[0]['product_id']
    print(f"\n--- Benchmarking for Product ID: {target_id} ---")

    start = time.time()
    count = spark.sql(f"SELECT * FROM gold.products_opt WHERE product_id = {target_id}").count()
    end = time.time()

    print(f"✅ Records found: {count}")
    print(f"⏱️ Time taken: {end - start:.4f}s")
else:
    print("⚠️ Table is empty. Cannot run benchmark.")

# Note: .cache() is removed because Serverless Compute caches hot data automatically.

In [0]:
import os
import getpass

# # 1. UNDO the last commit (so we can try the process again from a clean state)
# # We do this to ensure the commit and pull happen in the right order.
# !git -c safe.directory='*' reset --soft HEAD~1

# 2. SET PRIVATE IDENTITY
os.environ["GIT_AUTHOR_NAME"] = "Sumesh Majee"
os.environ["GIT_AUTHOR_EMAIL"] = "SRMajee@users.noreply.github.com"
os.environ["GIT_COMMITTER_NAME"] = "Sumesh Majee"
os.environ["GIT_COMMITTER_EMAIL"] = "SRMajee@users.noreply.github.com"

# 3. ASK FOR TOKEN
print("Paste your GitHub Token below and hit Enter:")
GITHUB_TOKEN = getpass.getpass()

# 4. PREPARE URL
USERNAME = "SRMajee"
REPO_NAME = "DataBricks_Challenge"
auth_url = f"https://{GITHUB_TOKEN}@github.com/{USERNAME}/{REPO_NAME}.git"

# 5. ADD, COMMIT, PULL, THEN PUSH
!git -c safe.directory='*' remote set-url origin {auth_url}

# --- FIX: ADD FILES FIRST ---
!git -c safe.directory='*' add . 
# ----------------------------

!git -c safe.directory='*' commit -m "Day 10"

# Pull just in case there are remote changes
!git -c safe.directory='*' pull origin main --no-rebase --no-edit

# NOW PUSH
!git -c safe.directory='*' push origin main

Day 11

In [0]:
%python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1. Descriptive stats
print("--- Price Stats ---")
events.describe(["price"]).show()

# 2. Hypothesis: weekday vs weekend conversion
# FIX: Changed 'event_date' to 'event_time'
weekday = events.withColumn("is_weekend", 
    F.dayofweek("event_time").isin([1,7])) 

print("--- Weekend Activity ---")
weekday.groupBy("is_weekend", "event_type").count().show()

# 3. Correlation
# NOTE: Skipped because 'conversion_rate' is not in the column list you provided.
# If you need this, you must calculate conversion_rate per user/product first.
# print(events.stat.corr("price", "conversion_rate")) 

# 4. Feature engineering
# FIX: Changed 'event_date' to 'event_time'
features = events.withColumn("hour", F.hour("event_time")) \
    .withColumn("day_of_week", F.dayofweek("event_time")) \
    .withColumn("price_log", F.log(F.col("price")+1)) \
    .withColumn("time_since_first_view",
        # FIX: Cast both to 'long' to perform subtraction in seconds
        F.col("event_time").cast("long") - 
        F.min("event_time").over(Window.partitionBy("user_id")).cast("long")
    )

print("--- Feature Engineering Sample ---")
features.select("user_id", "event_time", "time_since_first_view").show(5)

In [0]:
import os
import getpass

# # 1. UNDO the last commit (so we can try the process again from a clean state)
# # We do this to ensure the commit and pull happen in the right order.
# !git -c safe.directory='*' reset --soft HEAD~1

# 2. SET PRIVATE IDENTITY
os.environ["GIT_AUTHOR_NAME"] = "Sumesh Majee"
os.environ["GIT_AUTHOR_EMAIL"] = "SRMajee@users.noreply.github.com"
os.environ["GIT_COMMITTER_NAME"] = "Sumesh Majee"
os.environ["GIT_COMMITTER_EMAIL"] = "SRMajee@users.noreply.github.com"

# 3. ASK FOR TOKEN
print("Paste your GitHub Token below and hit Enter:")
GITHUB_TOKEN = getpass.getpass()

# 4. PREPARE URL
USERNAME = "SRMajee"
REPO_NAME = "DataBricks_Challenge"
auth_url = f"https://{GITHUB_TOKEN}@github.com/{USERNAME}/{REPO_NAME}.git"

# 5. ADD, COMMIT, PULL, THEN PUSH
!git -c safe.directory='*' remote set-url origin {auth_url}

# --- FIX: ADD FILES FIRST ---
!git -c safe.directory='*' add . 
# ----------------------------

!git -c safe.directory='*' commit -m "Day 11"

# Pull just in case there are remote changes
!git -c safe.directory='*' pull origin main --no-rebase --no-edit

# NOW PUSH
!git -c safe.directory='*' push origin main