In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, ArrayType
from delta.tables import DeltaTable

In [0]:
# getOrCreate() hands back the already-active session when there is one,
# otherwise it builds a new one with this application name.
spark = (
    SparkSession.builder
    .appName("Mixpanel to Delta")
    .getOrCreate()
)

In [0]:
# Load a single raw Mixpanel export file (.json.gz, one JSON object per line
# since multiLine is off).
# NOTE(review): "inferSchema" is a CSV option; the JSON reader infers the schema
# on its own, so the option below has no effect.
raw_events_path = "/Volumes/workspace/dashtoon_data/data/part-00000-tid-717492462703940002-1f7f4721-a439-4329-9e81-fd25cf4b9e97-412-1-c000.json.gz"
df = (
    spark.read.format("json")
    .option("multiLine", "false")
    .option("inferSchema", "true")
    .load(raw_events_path)
)

# `time` is treated as Unix seconds by from_unixtime; keep only the calendar date.
df = df.withColumn(
    "event_date",
    to_date(from_unixtime(col("time"), "yyyy-MM-dd")),
)

# Show result with all columns
df.show(truncate=False)

In [0]:
from pyspark.sql.functions import year, month, dayofmonth

# Derive the columns the Delta write partitions by (year, month, day) from
# event_date, added in that order.
for partition_col, date_part_fn in (
    ("year", year),
    ("month", month),
    ("day", dayofmonth),
):
    df = df.withColumn(partition_col, date_part_fn(col("event_date")))


In [0]:
# Preview 10 rows, including the derived event_date / year / month / day columns.
df.show(10)

In [0]:
# Inspect the schema inferred from the raw JSON before any column-name cleaning.
df.printSchema()

In [0]:
# Define Column Name Cleaning Functions
# Functions to clean column names with invalid characters
import re

# Characters Delta refuses in column names: space , ; { } ( ) newline tab =
_INVALID_COLUMN_CHARS = re.compile(r'[ ,;{}\(\)\n\t=]')


def clean_column_name(name):
    """Return `name` with each character Delta rejects replaced by '_'."""
    return _INVALID_COLUMN_CHARS.sub('_', name)


def clean_schema(schema, prefix=""):
    """Return a copy of `schema` whose field names are cleaned at every depth.

    Recurses into struct fields and into arrays whose elements are structs;
    every other field keeps its data type and nullability and only gets a new
    name. `prefix` is threaded through the recursion as a dotted path but is
    not used for naming.
    """
    def _clean_field(field):
        new_name = clean_column_name(field.name)
        dtype = field.dataType
        child_prefix = prefix + new_name + "."
        if isinstance(dtype, StructType):
            dtype = clean_schema(dtype, child_prefix)
        elif isinstance(dtype, ArrayType) and isinstance(dtype.elementType, StructType):
            dtype = ArrayType(clean_schema(dtype.elementType, child_prefix), dtype.containsNull)
        return StructField(new_name, dtype, field.nullable)

    return StructType([_clean_field(f) for f in schema.fields])


# Clean Top-Level Column Names
print("Cleaning column names...")
# Create a clean schema (cleaned recursively). The next cell only uses its
# top-level field names to rename columns; nested names are not applied there.
clean_schema_struct = clean_schema(df.schema)

In [0]:
# Pair up (raw name, cleaned name) for every top-level column whose name
# actually changes, then apply the renames one by one.
column_mappings = [
    (before.name, after.name)
    for before, after in zip(df.schema, clean_schema_struct)
    if before.name != after.name
]
for old_name, new_name in column_mappings:
    df = df.withColumnRenamed(old_name, new_name)

In [0]:
# Clean Properties Struct and Handle Duplicates
# Give every field of the `properties` struct a cleaned name that is unique
# case-insensitively (Delta treats column names case-insensitively).
#
# The renamed struct is built with a positional struct-to-struct CAST instead of
# selecting `properties.<name>` field by field: with Spark's default
# case-insensitive resolution, selecting `properties.Foo` while `foo` also
# exists fails as an ambiguous reference -- the very case handled here.
# Field order is preserved and `properties` keeps its column position.


def _unique_property_names(field_names):
    """Return cleaned, case-insensitively unique names aligned with `field_names`.

    A name whose cleaned, lower-cased form is unique keeps its cleaned
    spelling. Names that collide (case-insensitively, after cleaning) become
    `<lower-cased cleaned name>_<n>` with n = 1, 2, ...; any suffix that would
    clash with another field's final name is skipped.
    """
    cleaned = [clean_column_name(name) for name in field_names]
    keys = [name.lower() for name in cleaned]
    key_counts = {}
    for key in keys:
        key_counts[key] = key_counts.get(key, 0) + 1

    # Reserve non-colliding names first so generated suffixes cannot reuse them.
    taken = {key for key in keys if key_counts[key] == 1}
    last_suffix = {}
    unique_names = []
    for clean_name, key in zip(cleaned, keys):
        if key_counts[key] == 1:
            unique_names.append(clean_name)
            continue
        suffix = last_suffix.get(key, 0)
        while True:
            suffix += 1
            candidate = f"{key}_{suffix}"  # key is already lower-case
            if candidate not in taken:
                break
        last_suffix[key] = suffix
        taken.add(candidate)
        unique_names.append(candidate)
    return unique_names


props_field = df.schema["properties"] if "properties" in df.columns else None
if props_field is not None and isinstance(props_field.dataType, StructType):
    print("Found properties struct, cleaning nested column names...")
    props_schema = props_field.dataType

    # First, print all property field names for debugging
    print("All property fields:")
    for i, field in enumerate(props_schema.fields):
        print(f"  {i}: {field.name}")

    new_names = _unique_property_names([f.name for f in props_schema.fields])
    for field, new_name in zip(props_schema.fields, new_names):
        if new_name != clean_column_name(field.name):
            print(f"  Renaming: properties.`{field.name}` -> {new_name}")
        elif new_name != field.name:
            print(f"  Cleaning: properties.`{field.name}` -> {new_name}")

    # Same field types/nullability, new names; CAST maps fields by position.
    clean_props_type = StructType([
        StructField(new_name, field.dataType, field.nullable)
        for field, new_name in zip(props_schema.fields, new_names)
    ])
    print(f"Creating new properties struct with {len(new_names)} unique fields")
    df = df.withColumn("properties", col("properties").cast(clean_props_type))
elif props_field is not None:
    print(f"properties column is {props_field.dataType.simpleString()}, not a struct; leaving it unchanged")

# Print a few rows to verify structure
print("Sample data after cleaning:")
df.select("event_name", "properties.*").show(2, truncate=True)

In [0]:
delta_table_path = "dbfs:/Volumes/workspace/dashtoon_data/data/mixpanel_events_data"
delta_table_name = "workspace.dashtoon_data.mixpanel_events"

# delta_table_path = "dbfs:/Volumes/workspace/dashtoon_data/data/incremental_data"
# delta_table_name = "workspace.dashtoon_data.incremental_data"

# Keys intentionally match the keyword parameters of DataFrameWriter.save().
write_options = {
    "format": "delta",
    "partitionBy": ["year", "month", "day"],
    "mode": "append"
}

# Write Data to Delta Format inside the Volume
# NOTE(review): mode "append" is not idempotent -- re-running this cell on the
# same input file writes the same events into the table again.
print("Writing to Delta table...")
df.write.save(delta_table_path, mergeSchema="true", **write_options)

print(f"Successfully processed Mixpanel data into Delta format at: {delta_table_path}")

In [0]:
# Also store the cleaned events as a managed table. Unlike the path-based write
# above (append, partitioned), this replaces the table contents on every run.
df.write.format("delta").mode("overwrite").saveAsTable(delta_table_name)

In [0]:
# Re-check the schema after top-level and properties-struct name cleaning.
df.printSchema()

In [0]:
# Import necessary libraries
from pyspark.sql import functions as F
from pyspark.sql.types import StructType


def _flat_property_name(field_name):
    """Map a `properties` field name to its flattened `prop_`-prefixed column name.

    `$x` -> prop_dollar_x, `payload.x` -> prop_payload_x,
    `payload?x` -> prop_payload_q_x, anything else -> prop_<name>. In the
    remaining part `.` becomes `_` and `?` becomes `_q_`.
    """
    def _normalise(text):
        return text.replace('.', '_').replace('?', '_q_')

    if field_name.startswith('$'):
        return f"prop_dollar_{_normalise(field_name[1:])}"
    if field_name.startswith('payload.'):
        return f"prop_payload_{_normalise(field_name[len('payload.'):])}"
    if field_name.startswith('payload?'):
        return f"prop_payload_q_{_normalise(field_name[len('payload?'):])}"
    return f"prop_{_normalise(field_name)}"


# Function to flatten the properties struct with better name handling
def flatten_mixpanel_df(df):
    """
    Flatten the `properties` struct of a Mixpanel DataFrame into top-level columns.

    All other columns are kept as they are. Each field of `properties` becomes
    a column named by `_flat_property_name`. That mapping is lossy (`a.b` and
    `a_b` both give `prop_a_b`; `$x` and `dollar_x` both give `prop_dollar_x`),
    so a name that is already taken, compared case-insensitively as Delta
    does, gets a `_2`, `_3`, ... suffix. This keeps the saved table free of
    duplicate columns.

    Returns `df` unchanged when it has no struct-typed `properties` column.
    """
    if "properties" not in df.columns:
        return df
    props_type = df.schema["properties"].dataType
    if not isinstance(props_type, StructType):
        return df

    # Keep original top-level columns
    base_columns = [c for c in df.columns if c != "properties"]
    used = {c.lower() for c in base_columns}

    properties_cols = []
    for field in props_type.fields:
        base_name = _flat_property_name(field.name)
        column_name, n = base_name, 1
        while column_name.lower() in used:
            n += 1
            column_name = f"{base_name}_{n}"
        used.add(column_name.lower())
        # A backtick inside a quoted identifier is escaped by doubling it.
        quoted = field.name.replace("`", "``")
        properties_cols.append(F.col(f"properties.`{quoted}`").alias(column_name))

    # Select all columns (base + flattened properties); quote base names too so
    # a '.' in a top-level name is not read as struct access.
    base_cols = [F.col("`" + c.replace("`", "``") + "`") for c in base_columns]
    return df.select(*base_cols, *properties_cols)

# Use the enhanced function: one top-level `prop_*` column per properties field.
flattened_df = flatten_mixpanel_df(df)

In [0]:
# Write to Delta table
# Replaces the flattened events table on every run (mode "overwrite").
flattened_df.write.format("delta").mode("overwrite").saveAsTable("workspace.dashtoon_data.flattened_mixpanel_events")

In [0]:
# Inspect the flattened column list (base columns followed by prop_* columns).
flattened_df.printSchema()

In [0]:
# Load mixpanel_events DataFrame from Delta Table -- the path-based table that
# the earlier write cell appends to.
MIXPANEL_EVENTS_PATH = "dbfs:/Volumes/workspace/dashtoon_data/data/mixpanel_events_data"
mixpanel_events_df = spark.read.load(MIXPANEL_EVENTS_PATH, format="delta")

# Expose it to the SQL queries below under the name `mixpanel_events`.
mixpanel_events_df.createOrReplaceTempView("mixpanel_events")

In [0]:
#User Profile Query Db
# One row per user_id from the `mixpanel_events` view:
# - device/geo attributes
# - acquisition fields (AppsFlyer values, falling back to event properties)
# - first seen time
# - first opened and first activated show
# - engagement counts
# - subscription flags
#
# Correctness notes:
# * appsflyer_data is collapsed to ONE row per user. A user with several
#   appsFlyerInstall events used to multiply every joined event row, which
#   inflated the non-distinct COUNT behind total_ads_seen.
# * Reel keys are CONCAT(showId, '|', reelId) so that e.g. ('1','23') and
#   ('12','3') are no longer counted as the same reel. CONCAT still yields NULL
#   when either part is NULL, as before.
# * subscription_events / first_show_events are joined on rn = 1, i.e. at most
#   one row per user each.
query = """
WITH subscription_events AS (
  SELECT 
    user_id,
    time,
    event_name AS sub_state,
    ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY time DESC) AS rn
  FROM mixpanel_events
  WHERE event_name LIKE '%subscription%'
),

first_show_events AS (
  SELECT
    user_id,
    properties.showName AS first_show,
    ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY time) AS rn
  FROM mixpanel_events
  WHERE event_name IN ('showOpen', 'reelOpen')
),

appsflyer_data AS (
  SELECT 
    user_id,
    MAX(properties.media_source) AS media_source,
    MAX(properties.campaign) AS campaign,
    MAX(properties.`payload.adset`) AS adset
  FROM mixpanel_events
  WHERE event_name = 'appsFlyerInstall'
  GROUP BY user_id
)

SELECT 
  ue.user_id,
  MIN(ue.distinct_id) AS distinct_id,
  MAX(properties.email) AS user_email,
  MAX(properties.`$os`) AS os,
  MAX(properties.`$city`) AS city,
  MAX(properties.`$model`) AS model,
  MAX(properties.`$region`) AS country_code,
  MAX(CASE WHEN af.adset IS NULL THEN properties.`payload.adset` ELSE af.adset END) AS adset,
  MAX(CASE WHEN af.campaign IS NULL THEN properties.campaign ELSE af.campaign END) AS campaign,
  MAX(CASE WHEN af.media_source IS NULL THEN properties.media_source ELSE af.media_source END) AS media_source,
  MIN(ue.time) AS first_user_time,
  MAX(fs.first_show) AS first_show,
  MIN(CASE WHEN event_name = 'show1ActivatedLevel1' THEN properties.showName END) AS first_activated_show,
  COUNT(DISTINCT CASE WHEN event_name = 'reelFinish' THEN CONCAT(properties.showId, '|', properties.reelId) END) AS total_reels_seen,
  COUNT(DISTINCT CASE WHEN event_name = 'reelFinish' THEN properties.showId END) AS total_show_seen,
  COUNT(CASE WHEN event_name = 'adPaid' THEN ue.user_id END) AS total_ads_seen,
  COUNT(DISTINCT sub.sub_state) AS is_subscriber,
  COUNT(DISTINCT CASE WHEN sub.sub_state IN ('subscriptionPurchased', 'subscriptionRenewed') THEN ue.user_id END) AS is_active_subscriber
FROM 
  mixpanel_events ue
LEFT JOIN 
  subscription_events sub ON ue.user_id = sub.user_id AND sub.rn = 1
LEFT JOIN 
  first_show_events fs ON ue.user_id = fs.user_id AND fs.rn = 1
LEFT JOIN 
  appsflyer_data af ON ue.user_id = af.user_id
GROUP BY 
  ue.user_id
"""

# Execute the Query
user_profile_df = spark.sql(query)

In [0]:
#Extra 
# Write to Delta table
# Replaces the user profile table on every run. The ARPU, watch-time, crosswalk
# and show-detail queries below read it back by table name.
user_profile_df.write.format("delta").mode("overwrite").saveAsTable("workspace.dashtoon_data.new_user_profile")


In [0]:
%sql
-- Spot-check a few rows of the user profile table written above.
select * from workspace.dashtoon_data.new_user_profile limit 10 ;

In [0]:
%sql
-- Column names and types of the user profile table.
DESCRIBE TABLE workspace.dashtoon_data.new_user_profile;

In [0]:
%sql
-- Sample properties.revenue values on adPaid events (the field the ads ARPU
-- query sums).
SELECT DISTINCT event_name, properties.revenue
FROM mixpanel_events
WHERE event_name = 'adPaid'
LIMIT 10;


In [0]:
#Ads arpu Db Query
# Cohort ad ARPU. Users are grouped by acquisition day (date of
# first_user_time). For each day offset N = DATEDIFF(DAY, first_user_time,
# event time), DN = ad revenue on offset N / number of users in the cohort.
#
# The adPaid filter sits in the JOIN condition rather than in WHERE. Filtering
# the right-hand table in WHERE silently turns the LEFT JOIN into an inner join,
# so the denominator only counted users who had seen an ad. Users without ad
# revenue now stay in the cohort with diff = NULL and contribute 0 to every DN.
# aq_timeline is a DATE, consistent with the watch-time query.
query="""
WITH ad_revenue AS (
  SELECT 
    u.user_id,
    TO_DATE(FROM_UNIXTIME(u.first_user_time)) AS aq_timeline,
    DATEDIFF(DAY, FROM_UNIXTIME(u.first_user_time), FROM_UNIXTIME(ue.time)) AS diff,
    SUM(CAST(ue.properties.revenue AS DOUBLE)) AS ads_revenue
  FROM 
    workspace.dashtoon_data.new_user_profile u
  LEFT JOIN 
    mixpanel_events ue
    ON u.user_id = ue.user_id
    AND ue.event_name = 'adPaid'
  WHERE 
    u.first_user_time IS NOT NULL
  GROUP BY 
    1, 2, 3
)

SELECT
  aq_timeline,
  ROUND(SUM(CASE WHEN diff = 0 THEN ads_revenue END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D0,
  ROUND(SUM(CASE WHEN diff = 1 THEN ads_revenue END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D1,
  ROUND(SUM(CASE WHEN diff = 2 THEN ads_revenue END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D2,
  ROUND(SUM(CASE WHEN diff = 3 THEN ads_revenue END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D3,
  ROUND(SUM(CASE WHEN diff = 4 THEN ads_revenue END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D4,
  ROUND(SUM(CASE WHEN diff = 5 THEN ads_revenue END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D5,
  ROUND(SUM(CASE WHEN diff = 6 THEN ads_revenue END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D6,
  ROUND(SUM(CASE WHEN diff = 7 THEN ads_revenue END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D7,
  ROUND(SUM(CASE WHEN diff = 14 THEN ads_revenue END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D14,
  ROUND(SUM(CASE WHEN diff = 28 THEN ads_revenue END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D28,
  ROUND(SUM(CASE WHEN diff = 60 THEN ads_revenue END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D60,
  ROUND(SUM(CASE WHEN diff = 90 THEN ads_revenue END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D90
FROM 
  ad_revenue
GROUP BY 
  1
ORDER BY 
  1 DESC
"""

result_df = spark.sql(query)
result_df.show(10,truncate=False)

In [0]:
#Average watch time Db Query
# Cohort average foreground watch time. Users are grouped by acquisition date.
# For each day offset N = DATEDIFF(DAY, first_user_time, event time),
# DN = summed timeSpent on offset N / number of users in the cohort.
#
# The event filter sits in the JOIN condition, not WHERE. Filtering the
# right-hand table in WHERE turned the LEFT JOIN into an inner join, so users
# who never watched were dropped from the denominator. timeSpent is cast to
# DOUBLE so fractional values are not truncated per event before summing.
query="""
WITH watch_time AS (
  SELECT 
    u.user_id,
    TO_DATE(FROM_UNIXTIME(u.first_user_time)) AS aq_timeline,    
    DATEDIFF(DAY, FROM_UNIXTIME(u.first_user_time), FROM_UNIXTIME(ue.time)) AS diff,
    SUM(CAST(ue.properties.timeSpent AS DOUBLE)) AS timeSpent
  FROM 
    workspace.dashtoon_data.new_user_profile u
  LEFT JOIN 
    mixpanel_events ue
    ON u.user_id = ue.user_id
    AND ue.event_name IN ('reelForegroundWatchTime')
  WHERE 
    u.first_user_time IS NOT NULL -- users without a first event have no cohort
  GROUP BY 
    1, 2, 3
)

SELECT
  aq_timeline,
  ROUND(SUM(CASE WHEN diff = 0 THEN timeSpent END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D0,
  ROUND(SUM(CASE WHEN diff = 1 THEN timeSpent END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D1,
  ROUND(SUM(CASE WHEN diff = 2 THEN timeSpent END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D2,
  ROUND(SUM(CASE WHEN diff = 3 THEN timeSpent END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D3,
  ROUND(SUM(CASE WHEN diff = 4 THEN timeSpent END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D4,
  ROUND(SUM(CASE WHEN diff = 5 THEN timeSpent END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D5,
  ROUND(SUM(CASE WHEN diff = 6 THEN timeSpent END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D6,
  ROUND(SUM(CASE WHEN diff = 7 THEN timeSpent END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D7,
  ROUND(SUM(CASE WHEN diff = 14 THEN timeSpent END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D14,
  ROUND(SUM(CASE WHEN diff = 28 THEN timeSpent END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D28,
  ROUND(SUM(CASE WHEN diff = 60 THEN timeSpent END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D60,
  ROUND(SUM(CASE WHEN diff = 90 THEN timeSpent END)/NULLIF(COUNT(DISTINCT user_id), 0), 2) AS D90
FROM 
  watch_time
GROUP BY 
  1
ORDER BY 
  1 DESC
"""

result_df = spark.sql(query)
result_df.show(10,truncate=False)

In [0]:
# Crosswalk Show DB Query
# For each first-activated show (show1), the top 3 OTHER shows (show2), ranked
# by the number of distinct users whose first activated show is show1 and who
# have any event carrying show2's name.
# - show1 itself is excluded from its own crosswalk; otherwise it would usually
#   rank first.
# - Ties on `users` are broken by show name so the top-3 list is deterministic.
# NOTE(review): despite its name, show_2_activations is not filtered to
# activation events -- any event with a showName counts. Confirm intent.
# The commented [[...]] lines are optional-filter placeholders left from the
# original dashboard query.
query="""
WITH show_2_activations AS (
  SELECT
    user_id,
    event_name,
    properties.showName,
    time
  FROM 
    mixpanel_events
),

ft AS (
  SELECT 
    u.first_activated_show AS show1,
    s2.showName AS show2,
    COUNT(DISTINCT s2.user_id) AS users
  FROM 
    workspace.dashtoon_data.new_user_profile u
  LEFT JOIN 
    show_2_activations s2 ON s2.user_id = u.user_id
  WHERE 
    s2.showName IS NOT NULL
    AND s2.showName <> u.first_activated_show
    -- [[and u.country_code = {{country_code}}]]
    -- [[and u.os = {{os}}]]
  GROUP BY 
    1, 2
)

SELECT *
FROM (
  SELECT
    show1 AS first_show_activated,
    show2 AS top_cross_walk_shows,
    ROW_NUMBER() OVER (PARTITION BY show1 ORDER BY users DESC, show2) AS rn
  FROM 
    ft
  WHERE 
    show1 IS NOT NULL
    -- [[and show1 = {{show_1}}]]
)
WHERE 
  rn <= 3
"""

result_df = spark.sql(query)
result_df.show(10,truncate=False)


In [0]:
#Overall Show details
# Per-show (showName, showId) funnel for users whose first_user_time is on or
# after 2025-02-01. Columns:
# - showOpen users, any-reelOpen users, users who opened reelSequence 1
# - show*Activated users, and their ratio to showOpen users
# - show*ActivatedLevel1 users, and their ratio to showOpen users
# - active subscribers
# - adPaid revenue
# NOTE(review): the cutoff date is hard-coded in the query. The LEFT JOIN
# behaves as an inner join because the WHERE clause filters on ue columns.
query="""
SELECT 
  ue.properties.showName,
  ue.properties.showId,
  COUNT(DISTINCT CASE WHEN ue.event_name = 'showOpen' THEN u.user_id END) AS showOpen_users,
  COUNT(DISTINCT CASE WHEN ue.event_name = 'reelOpen' THEN u.user_id END) AS any_reelOpen_users,
  COUNT(DISTINCT CASE WHEN ue.event_name = 'reelOpen' AND CAST(ue.properties.reelSequence AS INT) = 1 THEN u.user_id END) AS `1st_reelOpen_users`,
  COUNT(DISTINCT CASE WHEN ue.event_name LIKE 'show%Activated' THEN u.user_id END) AS activated_users,
  COUNT(DISTINCT CASE WHEN ue.event_name LIKE 'show%Activated' THEN u.user_id END) / 
    NULLIF(COUNT(DISTINCT CASE WHEN ue.event_name = 'showOpen' THEN u.user_id END), 0) AS `activation%`,
  COUNT(DISTINCT CASE WHEN ue.event_name LIKE 'show%ActivatedLevel1' THEN u.user_id END) AS L1_activated_users,
  COUNT(DISTINCT CASE WHEN ue.event_name LIKE 'show%ActivatedLevel1' THEN u.user_id END) / 
    NULLIF(COUNT(DISTINCT CASE WHEN ue.event_name = 'showOpen' THEN u.user_id END), 0) AS `L1_activation%`,
  COUNT(DISTINCT CASE WHEN u.is_active_subscriber = 1 THEN u.user_id END) AS active_subscribers,
  SUM(CASE WHEN ue.event_name = 'adPaid' THEN ue.properties.revenue END) AS ads_revenue
FROM 
  workspace.dashtoon_data.new_user_profile u
LEFT JOIN 
  mixpanel_events ue ON u.user_id = ue.user_id
WHERE 
  TO_TIMESTAMP(u.first_user_time) >= TO_TIMESTAMP('2025-02-01')
  AND ue.properties.showId IS NOT NULL
  AND ue.properties.showName IS NOT NULL
GROUP BY 
  ue.properties.showName,
  ue.properties.showId
ORDER BY 
  ue.properties.showName
"""

result_df = spark.sql(query)
result_df.show(10,truncate=False)