In [None]:
"""
Ignite 2025 - Fabric Delta Tables Builder (PySpark)

This script loads JSON data from the Lakehouse Files, flattens nested structures,
joins session metadata with VTT analysis, and creates Delta tables in the Lakehouse.

Workflow:
1. Load sessions_metadata.json from metadata folder
2. Load sessions_analysis_full.json from analysis folder
3. Flatten nested structures (speakers, tags, topics)
4. Left join metadata with analysis
5. Create dimension and fact tables
6. Save as Delta tables in Lakehouse Tables section
"""

from datetime import datetime

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

# ------------------------------------------------------------------------------
# Configuration
# ------------------------------------------------------------------------------
# In Fabric, Lakehouse files are addressed with relative paths that start with
# 'Files/'. Both input documents live under BASE_PATH.
BASE_PATH = "Files/Ignite2025_All"
METADATA_FILE = f"{BASE_PATH}/metadata/sessions_metadata.json"
ANALYSIS_FILE = f"{BASE_PATH}/analysis/sessions_analysis_full.json"

# getOrCreate() hands back the notebook's active session when one exists.
spark = SparkSession.builder.getOrCreate()

# Banner: rule, title, rule -- one per output line.
print("\n".join(("=" * 80, "Ignite 2025 - Fabric Delta Tables Builder (PySpark)", "=" * 80)))


# ==============================================================================
# STEP 1: LOAD AND FLATTEN SESSIONS METADATA
# ==============================================================================
print("\nüìÇ STEP 1: Loading sessions metadata...")

# multiLine lets Spark parse JSON records that span several lines
# (e.g. a pretty-printed array) instead of expecting JSON Lines.
df_sessions_raw = (
    spark.read
    .option("multiLine", "true")
    .json(METADATA_FILE)
)

print(f"‚úÖ Loaded {df_sessions_raw.count()} sessions from metadata")
print(f"   Columns: {', '.join(df_sessions_raw.columns)}")

# Scalar session attributes copied through unchanged, in output order.
_SESSION_SCALAR_COLUMNS = [
    "session_id",
    "session_code",
    "title",
    "description",
    "level",
    "session_type",
    "duration_minutes",
    "start_time",
    "end_time",
    "speaker_names",
    "location",
    "venue",
    "room",
    "slide_deck_url",
    "has_slides",
    "video_url",
    "has_video",
    "captions_url",
    "extracted_at",
]

# Nested arrays are kept (under a *_array name) so the dimension and bridge
# steps can explode them later.
_SESSION_NESTED_COLUMNS = {
    "speakers": "speakers_array",
    "tags": "tags_array",
    "topics": "topics_array",
}

df_sessions = df_sessions_raw.select(
    *_SESSION_SCALAR_COLUMNS,
    *[F.col(source).alias(target) for source, target in _SESSION_NESTED_COLUMNS.items()],
)

# Convert the timestamp string columns in place (same name, same position),
# using Spark's default to_timestamp parsing (no explicit format).
_SESSION_TIMESTAMP_COLUMNS = {"start_time", "end_time", "extracted_at"}
df_sessions = df_sessions.select(
    *[
        F.to_timestamp(column).alias(column) if column in _SESSION_TIMESTAMP_COLUMNS else F.col(column)
        for column in df_sessions.columns
    ]
)

print(f"‚úÖ Flattened sessions metadata: {df_sessions.count()} rows")


# ==============================================================================
# STEP 2: LOAD AND FLATTEN ANALYSIS DATA
# ==============================================================================
print("\nüìÇ STEP 2: Loading analysis data...")

# Same multi-line JSON reading as the metadata file.
df_analysis_raw = spark.read.option("multiLine", "true").json(ANALYSIS_FILE)

print(f"‚úÖ Loaded {df_analysis_raw.count()} analyzed sessions")

# List-valued analysis fields. Each is collapsed into one '|'-delimited string
# for the fact table; a null source value stays null.
_ANALYSIS_LIST_COLUMNS = (
    "key_topics",
    "microsoft_features_mentioned",
    "new_announcements",
    "demos_described",
    "best_practices",
    "key_quotes",
    "action_items",
)

df_analysis = df_analysis_raw.select(
    "session_code",
    F.col("session_title").alias("analyzed_title"),
    "summary",
    "target_audience",
    "technical_level",
    # Parsed to a timestamp directly in the projection (same name and position).
    F.to_timestamp("analyzed_at").alias("analyzed_at"),
    *[
        F.when(F.col(list_column).isNotNull(), F.concat_ws("|", F.col(list_column))).alias(list_column)
        for list_column in _ANALYSIS_LIST_COLUMNS
    ],
)

# A session counts as analysed only when its summary is present and non-empty.
_summary_present = F.col("summary").isNotNull() & (F.col("summary") != "")
df_analysis = df_analysis.withColumn(
    "has_analysis",
    F.when(_summary_present, True).otherwise(False),
)

print(f"‚úÖ Flattened analysis data: {df_analysis.count()} rows")


# ==============================================================================
# STEP 3: LEFT JOIN TO CREATE UNIFIED TABLE
# ==============================================================================
print("\nüîó STEP 3: Creating unified table (left join)...")

# Keep every metadata session; analysis columns are null where no analysis row
# shares the session_code.
df_unified = df_sessions.join(
    df_analysis,
    on="session_code",
    how="left"
)

# Sessions without an analysis row get has_analysis = False rather than null.
df_unified = df_unified.fillna({"has_analysis": False})

# df_unified feeds every dimension, bridge and fact table and many count()
# actions below. Cache it so the two JSON files are not re-read and re-joined
# for each of those actions.
df_unified = df_unified.cache()

# One aggregation pass instead of three separate count() jobs.
_coverage = df_unified.agg(
    F.count(F.lit(1)).alias("total_rows"),
    F.coalesce(F.sum(F.col("has_analysis").cast("long")), F.lit(0)).alias("analysed_rows"),
).first()
_unified_row_count = _coverage["total_rows"]
sessions_with_analysis = _coverage["analysed_rows"]
# has_analysis is never null after fillna, so every remaining row is False.
sessions_without_analysis = _unified_row_count - sessions_with_analysis

print(f"‚úÖ Unified table created: {_unified_row_count} rows")
print(f"   Sessions with analysis: {sessions_with_analysis}")
print(f"   Sessions without analysis: {sessions_without_analysis}")


# ==============================================================================
# STEP 4: CREATE DIMENSION TABLES
# ==============================================================================

# ------------------------------------------------------------------------------
# Dim_Date
# ------------------------------------------------------------------------------
print("\nüìÖ Creating Dim_Date...")

# Each session contributes both its start and end instant; null instants are dropped.
df_dates = (
    df_unified
    .select(F.explode(F.array("start_time", "end_time")).alias("datetime"))
    .where(F.col("datetime").isNotNull())
)

# One row per calendar date, keyed by a yyyyMMdd string.
_instant = F.col("datetime")
dim_date = (
    df_dates
    .select(
        F.date_format(_instant, "yyyyMMdd").alias("date_key"),
        F.to_date(_instant).alias("date"),
        F.year(_instant).alias("year"),
        F.month(_instant).alias("month"),
        F.date_format(_instant, "MMMM").alias("month_name"),
        F.dayofmonth(_instant).alias("day"),
        F.date_format(_instant, "EEEE").alias("day_of_week"),
        F.quarter(_instant).alias("quarter"),
    )
    .distinct()
    .orderBy("date_key")
)

print(f"‚úÖ Dim_Date created: {dim_date.count()} rows")

# ------------------------------------------------------------------------------
# Dim_Session (with surrogate key)
# ------------------------------------------------------------------------------
print("\nüìä Creating Dim_Session...")

# Session attributes carried into the dimension. session_code is listed first
# because it leads the surrogate-key ordering below.
_dim_session_attrs = [
    "session_code",
    "session_id",
    "title",
    "description",
    "duration_minutes",
    "start_time",
    "end_time",
    "has_slides",
    "has_video",
    "has_analysis",
    "slide_deck_url",
    "video_url",
]

# Surrogate keys come from row_number() over a total order on the de-duplicated
# attributes. distinct() makes the rows unique in these columns, so the order
# (and therefore every key) is identical each time this lazy DataFrame is
# recomputed: for the Delta write, the bridge joins and the fact join.
# monotonically_increasing_id() gave no such guarantee, because its values
# depend on partition IDs, and it also leaves large gaps between partitions.
# A window without partitionBy runs on a single partition, which is fine for a
# small dimension like this.
_session_key_window = Window.orderBy(*_dim_session_attrs)

dim_session = (
    df_unified.select(*_dim_session_attrs)
    .distinct()
    .withColumn("session_key", F.row_number().over(_session_key_window).cast("long"))
    .withColumn("start_date_key", F.date_format("start_time", "yyyyMMdd"))
    .withColumn("end_date_key", F.date_format("end_time", "yyyyMMdd"))
    # Final column order: surrogate key first.
    .select(
        "session_key",
        "session_id",
        "session_code",
        "title",
        "description",
        "duration_minutes",
        "start_time",
        "end_time",
        "start_date_key",
        "end_date_key",
        "has_slides",
        "has_video",
        "has_analysis",
        "slide_deck_url",
        "video_url",
    )
)

print(f"‚úÖ Dim_Session created: {dim_session.count()} rows")

# ------------------------------------------------------------------------------
# Dim_SessionType
# ------------------------------------------------------------------------------
print("\nüß© Creating Dim_SessionType...")

# One row per non-null session_type. Keys are row_number() over the type name,
# so they are dense (1..N) and identical on every recomputation. This matters
# because the fact table joins this DataFrame in a different Spark action than
# the one that writes it. monotonically_increasing_id() depended on
# partitioning and gave no such guarantee.
dim_session_type = (
    df_unified.select("session_type")
    .where(F.col("session_type").isNotNull())
    .distinct()
    .withColumn(
        "session_type_key",
        F.row_number().over(Window.orderBy("session_type")).cast("long"),
    )
)

print(f"‚úÖ Dim_SessionType created: {dim_session_type.count()} rows")

# ------------------------------------------------------------------------------
# Dim_SessionLevel
# ------------------------------------------------------------------------------
print("\nüß© Creating Dim_SessionLevel...")

# One row per non-null session level (source column `level`). Keys are
# row_number() over the level value: dense and stable across recomputation,
# unlike the partition-dependent monotonically_increasing_id().
dim_session_level = (
    df_unified.select(F.col("level").alias("session_level"))
    .where(F.col("session_level").isNotNull())
    .distinct()
    .withColumn(
        "session_level_key",
        F.row_number().over(Window.orderBy("session_level")).cast("long"),
    )
)

print(f"‚úÖ Dim_SessionLevel created: {dim_session_level.count()} rows")

# ------------------------------------------------------------------------------
# Dim_Location
# ------------------------------------------------------------------------------
print("\nüß© Creating Dim_Location...")

# A (venue, room, location) combination becomes a member when at least one of
# its parts is non-null and non-empty.
_location_has_value = (
    (F.col("venue").isNotNull() & (F.col("venue") != ""))
    | (F.col("room").isNotNull() & (F.col("room") != ""))
    | (F.col("location").isNotNull() & (F.col("location") != ""))
)

# Keys are row_number() over all three columns. That is a total order on the
# distinct rows, so keys are dense and identical on every recomputation.
# monotonically_increasing_id() depended on partitioning.
dim_location = (
    df_unified.select("venue", "room", "location")
    .distinct()
    .where(_location_has_value)
    .withColumn(
        "location_key",
        F.row_number().over(Window.orderBy("venue", "room", "location")).cast("long"),
    )
)

print(f"‚úÖ Dim_Location created: {dim_location.count()} rows")

# ------------------------------------------------------------------------------
# Dim_Speaker (unique speakers only)
# ------------------------------------------------------------------------------
print("\nüë§ Creating Dim_Speaker...")

# Explode the per-session speaker structs into (name, title, company) rows.
_speaker_rows = (
    df_unified.select(F.explode_outer("speakers_array").alias("speaker"))
    .select(
        F.col("speaker.fullName").alias("speaker_name"),
        F.col("speaker.title").alias("speaker_title"),
        F.col("speaker.company").alias("speaker_company"),
    )
    .where(F.col("speaker_name").isNotNull())
)

# Keep exactly one row per speaker_name. Bridge_SessionSpeaker resolves
# speaker_key by name, so if the same fullName appears with different
# title/company values, a plain distinct() would give one person several rows
# and keys and fan the bridge out. max() over the (title, company) struct keeps
# one source record's values deterministically.
#
# Keys are row_number() over speaker_name: dense and stable across
# recomputation. monotonically_increasing_id() could change between the action
# that writes this table and the one that builds the bridge.
dim_speaker = (
    _speaker_rows.groupBy("speaker_name")
    .agg(F.max(F.struct("speaker_title", "speaker_company")).alias("_attrs"))
    .withColumn(
        "speaker_key",
        F.row_number().over(Window.orderBy("speaker_name")).cast("long"),
    )
    .select("speaker_key", "speaker_name", "_attrs.speaker_title", "_attrs.speaker_company")
)

print(f"‚úÖ Dim_Speaker created: {dim_speaker.count()} rows")

# ------------------------------------------------------------------------------
# Dim_Tag (unique tags only)
# ------------------------------------------------------------------------------
print("\nüè∑Ô∏è Creating Dim_Tag...")

# One row per distinct non-null tag string. Keys are row_number() over the tag
# name: dense and stable across recomputation, unlike the partition-dependent
# monotonically_increasing_id().
dim_tag = (
    df_unified.select(F.explode_outer("tags_array").alias("tag_name"))
    .where(F.col("tag_name").isNotNull())
    .distinct()
    .withColumn("tag_key", F.row_number().over(Window.orderBy("tag_name")).cast("long"))
)

print(f"‚úÖ Dim_Tag created: {dim_tag.count()} rows")

# ------------------------------------------------------------------------------
# Dim_Topic (unique topics only)
# ------------------------------------------------------------------------------
print("\nüè∑Ô∏è Creating Dim_Topic...")

# A topic's name is its displayValue, falling back to logicalValue when the
# display value is null.
dim_topic = (
    df_unified.select(F.explode_outer("topics_array").alias("topic"))
    .select(
        F.coalesce(F.col("topic.displayValue"), F.col("topic.logicalValue")).alias("topic_name")
    )
    .where(F.col("topic_name").isNotNull())
    .distinct()
    # Dense keys that are stable across recomputation (row_number over the name),
    # unlike the partition-dependent monotonically_increasing_id().
    .withColumn("topic_key", F.row_number().over(Window.orderBy("topic_name")).cast("long"))
)

print(f"‚úÖ Dim_Topic created: {dim_topic.count()} rows")


# ==============================================================================
# STEP 5: CREATE BRIDGE TABLES (Many-to-Many)
# ==============================================================================

# ------------------------------------------------------------------------------
# Bridge_SessionSpeaker
# ------------------------------------------------------------------------------
print("\nüîó Creating Bridge_SessionSpeaker...")

# (session_code, speaker name) pairs. explode() emits nothing for null or empty
# arrays, and speakers without a fullName are filtered out.
_session_speaker_pairs = (
    df_unified
    .select("session_code", F.explode("speakers_array").alias("speaker"))
    .select("session_code", F.col("speaker.fullName").alias("speaker_name"))
    .where(F.col("speaker_name").isNotNull())
)

# Swap both natural keys for surrogate keys; inner joins drop unmatched pairs.
bridge_session_speaker = (
    _session_speaker_pairs
    .join(dim_session.select("session_key", "session_code"), "session_code")
    .join(dim_speaker.select("speaker_key", "speaker_name"), "speaker_name")
    .select("session_key", "speaker_key")
    .distinct()
)

print(f"‚úÖ Bridge_SessionSpeaker created: {bridge_session_speaker.count()} rows")

# ------------------------------------------------------------------------------
# Bridge_SessionTag
# ------------------------------------------------------------------------------
print("\nüîó Creating Bridge_SessionTag...")

# (session_code, tag) pairs; explode() skips null/empty arrays, null tags are dropped.
_session_tag_pairs = (
    df_unified
    .select("session_code", F.explode("tags_array").alias("tag_name"))
    .where(F.col("tag_name").isNotNull())
)

# Resolve to surrogate keys (inner joins) and de-duplicate.
bridge_session_tag = (
    _session_tag_pairs
    .join(dim_session.select("session_key", "session_code"), "session_code")
    .join(dim_tag.select("tag_name", "tag_key"), "tag_name")
    .select("session_key", "tag_key")
    .distinct()
)

print(f"‚úÖ Bridge_SessionTag created: {bridge_session_tag.count()} rows")

# ------------------------------------------------------------------------------
# Bridge_SessionTopic
# ------------------------------------------------------------------------------
print("\nüîó Creating Bridge_SessionTopic...")

# (session_code, topic name) pairs, where the topic name is displayValue with
# logicalValue as the fallback; unnamed topics are dropped.
_session_topic_pairs = (
    df_unified
    .select("session_code", F.explode("topics_array").alias("topic"))
    .select(
        "session_code",
        F.coalesce(F.col("topic.displayValue"), F.col("topic.logicalValue")).alias("topic_name"),
    )
    .where(F.col("topic_name").isNotNull())
)

# Resolve to surrogate keys (inner joins) and de-duplicate.
bridge_session_topic = (
    _session_topic_pairs
    .join(dim_session.select("session_key", "session_code"), "session_code")
    .join(dim_topic.select("topic_name", "topic_key"), "topic_name")
    .select("session_key", "topic_key")
    .distinct()
)

print(f"‚úÖ Bridge_SessionTopic created: {bridge_session_topic.count()} rows")


# ==============================================================================
# STEP 6: CREATE FACT TABLE WITH SURROGATE KEYS
# ==============================================================================
print("\nüìà Creating Fact_SessionAnalysis...")

# Distinct session/analysis records, still carrying the natural keys used to
# look up each dimension's surrogate key.
fact_base = df_unified.select(
    "session_code",
    "session_type",
    F.col("level").alias("session_level"),
    "venue",
    "room",
    "location",
    "start_time",
    "has_analysis",
    "summary",
    "key_topics",
    "microsoft_features_mentioned",
    "new_announcements",
    "demos_described",
    "best_practices",
    "target_audience",
    "technical_level",
    "key_quotes",
    "action_items",
    "analyzed_at"
).distinct()

# session_key
fact_base = fact_base.join(
    dim_session.select("session_key", "session_code"),
    on="session_code",
    how="left"
)

# start_date_key uses the same yyyyMMdd format as Dim_Date.date_key.
fact_base = fact_base.withColumn(
    "start_date_key",
    F.date_format("start_time", "yyyyMMdd")
)

# session_type_key
fact_base = fact_base.join(dim_session_type, on="session_type", how="left")

# session_level_key
fact_base = fact_base.join(dim_session_level, on="session_level", how="left")

# location_key. Dim_Location keeps combinations where only some of
# venue/room/location are populated, so the lookup must treat null == null as
# a match. A plain equi-join (on=[...]) never matches nulls and would leave
# location_key null for exactly those sessions. Dim_Location is distinct on the
# three columns, so at most one dimension row matches.
_fact = fact_base.alias("fact")
_loc = dim_location.alias("loc")
fact_base = _fact.join(
    _loc,
    on=[F.col(f"fact.{c}").eqNullSafe(F.col(f"loc.{c}")) for c in ("venue", "room", "location")],
    how="left",
).select("fact.*", "loc.location_key")


def _pipe_list_size(col_name):
    """Return a Column counting the '|'-separated items in *col_name* (0 if null or "").

    The guard against "" is needed because concat_ws() over an empty array
    yields "", and split("", "|") returns [""]; without it an empty list would
    be counted as one item.
    NOTE(review): items that themselves contain '|' are over-counted. Counting
    with size() on the source arrays before they are joined would be exact.
    """
    value = F.col(col_name)
    return (
        F.when(value.isNotNull() & (value != ""), F.size(F.split(value, "\\|")))
         .otherwise(0)
    )


# Final fact columns plus per-row item-count metrics.
fact_analysis = (
    fact_base.select(
        "session_key",
        "start_date_key",
        "session_type_key",
        "session_level_key",
        "location_key",
        "has_analysis",
        "summary",
        "key_topics",
        "microsoft_features_mentioned",
        "new_announcements",
        "demos_described",
        "best_practices",
        "target_audience",
        "technical_level",
        "key_quotes",
        "action_items",
        "analyzed_at"
    )
    .withColumn("key_topics_count", _pipe_list_size("key_topics"))
    .withColumn("features_count", _pipe_list_size("microsoft_features_mentioned"))
    .withColumn("announcements_count", _pipe_list_size("new_announcements"))
    .withColumn("best_practices_count", _pipe_list_size("best_practices"))
    .withColumn("action_items_count", _pipe_list_size("action_items"))
)

print(f"‚úÖ Fact_SessionAnalysis created: {fact_analysis.count()} rows")


# ==============================================================================
# STEP 7: SAVE AS DELTA TABLES IN LAKEHOUSE
# ==============================================================================
print("\nüíæ Saving tables to Lakehouse (Delta format)...")

tables = {
    "Dim_Date": dim_date,
    "Dim_Session": dim_session,
    "Dim_SessionType": dim_session_type,
    "Dim_SessionLevel": dim_session_level,
    "Dim_Location": dim_location,
    "Dim_Speaker": dim_speaker,
    "Dim_Tag": dim_tag,
    "Dim_Topic": dim_topic,
    "Bridge_SessionSpeaker": bridge_session_speaker,
    "Bridge_SessionTag": bridge_session_tag,
    "Bridge_SessionTopic": bridge_session_topic,
    "Fact_SessionAnalysis": fact_analysis,
}

# The unified table is also saved for reference, after the star-schema tables.
# It goes through the same code path so the write options cannot drift apart.
_tables_to_save = [*tables.items(), ("Unified_Sessions", df_unified)]

for table_name, df in _tables_to_save:
    print(f"\n   üíæ Saving {table_name}...")

    # Overwrite the table, and its schema, if it already exists.
    (
        df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(table_name)
    )

    # Report the row count of the table as persisted, read back from the
    # Lakehouse, rather than re-running the DataFrame's whole lineage.
    print(f"   ‚úÖ {table_name} saved: {spark.table(table_name).count()} rows")


# ==============================================================================
# SUMMARY
# ==============================================================================
print("\n" + "=" * 80)
print("‚úÖ DELTA TABLES CREATION COMPLETE")
print("=" * 80)


def _saved_row_count(name):
    """Return the row count of the saved Lakehouse table *name*.

    Reading the persisted table avoids re-running each DataFrame's lineage and
    reports what was actually written.
    """
    return spark.table(name).count()


_unified_saved_rows = _saved_row_count("Unified_Sessions")

print("\nüìä Tables created in Lakehouse:")
print(f"  ‚Ä¢ Unified_Sessions: {_unified_saved_rows:,} rows √ó {len(df_unified.columns)} columns")
# `tables` lists the star-schema tables in the order they were saved.
for _summary_table in tables:
    print(f"  ‚Ä¢ {_summary_table}: {_saved_row_count(_summary_table):,} rows")

# Guard against division by zero when the metadata file yielded no sessions.
_coverage_pct = (
    sessions_with_analysis / _unified_saved_rows * 100 if _unified_saved_rows else 0.0
)

print("\nüìä Analysis Coverage:")
print(f"  ‚Ä¢ Sessions with analysis: {sessions_with_analysis:,} ({_coverage_pct:.1f}%)")
print(f"  ‚Ä¢ Sessions without analysis: {sessions_without_analysis:,}")

print("\nüîó Power BI Relationships (Star Schema):")
print("  ‚ñ™ Direct to Fact (1:*):")
print("    ‚Ä¢ Dim_Session[session_key] ‚Üí Fact_SessionAnalysis[session_key]")
print("    ‚Ä¢ Dim_Date[date_key] ‚Üí Fact_SessionAnalysis[start_date_key]")
print("    ‚Ä¢ Dim_SessionType[session_type_key] ‚Üí Fact_SessionAnalysis[session_type_key]")
print("    ‚Ä¢ Dim_SessionLevel[session_level_key] ‚Üí Fact_SessionAnalysis[session_level_key]")
print("    ‚Ä¢ Dim_Location[location_key] ‚Üí Fact_SessionAnalysis[location_key]")
print("  ‚ñ™ Many-to-Many via Bridges (1:*):")
print("    ‚Ä¢ Dim_Speaker[speaker_key] ‚Üí Bridge_SessionSpeaker[speaker_key]")
print("    ‚Ä¢ Dim_Tag[tag_key] ‚Üí Bridge_SessionTag[tag_key]")
print("    ‚Ä¢ Dim_Topic[topic_key] ‚Üí Bridge_SessionTopic[topic_key]")
print("    ‚Ä¢ Dim_Session[session_key] ‚Üí Bridge_SessionSpeaker[session_key]")
print("    ‚Ä¢ Dim_Session[session_key] ‚Üí Bridge_SessionTag[session_key]")
print("    ‚Ä¢ Dim_Session[session_key] ‚Üí Bridge_SessionTopic[session_key]")

print("\n‚ú® Done! All tables are now available in the Lakehouse Tables section.")

StatementMeta(, cb5b91d7-3ef1-43d8-ad19-a39fed23476a, 5, Finished, Available, Finished)

Ignite 2025 - Fabric Delta Tables Builder (PySpark)

📂 STEP 1: Loading sessions metadata...
✅ Loaded 1090 sessions from metadata
   Columns: captions_url, description, duration_minutes, end_time, extracted_at, has_slides, has_video, learning_path, level, location, room, session_code, session_id, session_type, slide_deck_url, speaker_names, speakers, start_time, tags, title, topics, venue, video_url
✅ Flattened sessions metadata: 1090 rows

📂 STEP 2: Loading analysis data...
✅ Loaded 498 analyzed sessions
✅ Flattened analysis data: 498 rows

🔗 STEP 3: Creating unified table (left join)...
✅ Unified table created: 1090 rows
   Sessions with analysis: 498
   Sessions without analysis: 592

📅 Creating Dim_Date...
✅ Dim_Date created: 5 rows

📊 Creating Dim_Session...
✅ Dim_Session created: 1090 rows

🧩 Creating Dim_SessionType...
✅ Dim_SessionType created: 8 rows

🧩 Creating Dim_SessionLevel...
✅ Dim_SessionLevel created: 5 rows

üß© Creating Dim_Loca