In [0]:
%pip install --upgrade pip databricks-sdk==0.74.0


In [0]:
%restart_python

In [0]:
from databricks.sdk.version import __version__
print(f"Databricks SDK version: {__version__}")


In [0]:
catalog = "main"
schema = "dbdemos_aibi_cme_marketing_campaign"

In [0]:
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}")

In [0]:
spark.sql(f"""
CREATE VOLUME IF NOT EXISTS {catalog}.{schema}.raw_data
""")

In [0]:
current_user = spark.sql("SELECT session_user() AS current_user").collect()[0].current_user

In [0]:
import zipfile

zip_path = f"/Workspace/Users/{current_user}/UIUC_Workshop/data.zip"
extract_dir = f"/Volumes/{catalog}/{schema}/raw_data"

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)

files = dbutils.fs.ls(f"{extract_dir}")
display(files)

In [0]:
import re

volume_path = f"/Volumes/{catalog}/{schema}/raw_data"
files = dbutils.fs.ls(volume_path)
json_files = [f.path for f in files if f.path.endswith('.json')]

for file_path in json_files:
    file_name = file_path.split('/')[-1]
    table_name = re.sub(r'\.json$', '', file_name)
    df = spark.read.json(file_path)
    df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{table_name}")

In [0]:
%sql
CREATE OR REPLACE FUNCTION main.dbdemos_aibi_cme_marketing_campaign.compute_cost_per_click(
  cam_id INT COMMENT 'Campaign ID, e.g., 168',
  start_dt STRING COMMENT 'Start date for analysis, e.g., 2024-01-01',
  end_dt STRING COMMENT 'End date for analysis, e.g., 2024-01-31'
)
RETURNS TABLE
COMMENT 'Returns cost, total unique clicks, and cost_per_click for a single campaign'
RETURN
SELECT
  c.campaign_id, c.campaign_name, c.cost,
  COALESCE(COUNT(DISTINCT CASE WHEN e.event_type = 'click' THEN e.contact_id END), 0) AS unique_clicks,
  CASE
    WHEN COUNT(DISTINCT CASE WHEN e.event_type = 'click' THEN e.contact_id END) = 0 
    THEN c.cost
    ELSE c.cost / COUNT(DISTINCT CASE WHEN e.event_type = 'click' THEN e.contact_id END)
  END AS cost_per_click
FROM main.dbdemos_aibi_cme_marketing_campaign.campaigns c
LEFT JOIN main.dbdemos_aibi_cme_marketing_campaign.events e ON c.campaign_id = e.campaign_id 
  AND e.event_date >= start_dt AND e.event_date <= end_dt
WHERE c.campaign_id = cam_id
GROUP BY c.campaign_id, c.campaign_name, c.cost;

CREATE OR REPLACE FUNCTION main.dbdemos_aibi_cme_marketing_campaign.compute_spam_ratio(
  cam_id INT COMMENT 'Campaign ID, e.g., 168',
  start_dt STRING COMMENT 'Start date for analysis, e.g., 2024-01-01',
  end_dt STRING COMMENT 'End date for analysis, e.g., 2024-01-31'
)
RETURNS TABLE
COMMENT 'Returns total_delivered, total_spam, and spam_ratio for a single campaign'
RETURN
SELECT
  c.campaign_id, c.campaign_name,
  SUM(CASE WHEN e.event_type = 'delivered' THEN 1 ELSE 0 END) AS total_delivered,
  SUM(CASE WHEN e.event_type = 'spam' THEN 1 ELSE 0 END) AS total_spam,
  CASE
    WHEN SUM(CASE WHEN e.event_type = 'delivered' THEN 1 ELSE 0 END) = 0 THEN 0.0
    ELSE (SUM(CASE WHEN e.event_type = 'spam' THEN 1 ELSE 0 END) * 1.0) / 
         SUM(CASE WHEN e.event_type = 'delivered' THEN 1 ELSE 0 END)
  END AS spam_ratio
FROM main.dbdemos_aibi_cme_marketing_campaign.campaigns c
LEFT JOIN main.dbdemos_aibi_cme_marketing_campaign.events e ON c.campaign_id = e.campaign_id 
  AND e.event_date >= start_dt AND e.event_date <= end_dt
WHERE c.campaign_id = cam_id
GROUP BY c.campaign_id, c.campaign_name;

In [0]:
import requests
import pprint
import json
import time
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Get the SQL warehouse ID
for wh in w.warehouses.list():
    wh_id = wh.id

# Find list of current Genie spaces
current_spaces = [s.title for s in w.genie.list_spaces().as_shallow_dict()['spaces']]

# Load the serialized definition of the Genie space
with open('marketing_campaign_genie_space.json', 'r') as file:
    genie_space_json = json.load(file)

if genie_space_json['title'] not in current_spaces:
    w.genie.create_space(title=genie_space_json['title'], description=genie_space_json['description'], warehouse_id=wh_id, serialized_space=genie_space_json['serialized_space'])
    print('Genie space created!')
else:
    print('Genie space already exists!')
