In [5]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql.window import Window

StatementMeta(, 295f52cc-59a7-4254-a293-6c750133c8b7, 12, Finished, Available, Finished)

In [3]:
# Read bronze data
# Each source table is loaded in a fixed order and unpacked into the names the
# later cells use.
# NOTE(review): llm_responses_bronze and feedback_bronze are not used by any cell
# in this notebook — confirm they are still needed.
(
    responses_bronze,
    questions,
    connections_bronze,
    llm_responses_bronze,
    feedback_bronze,
    feature_rankings_bronze,
) = [
    spark.read.table(table_name)
    for table_name in (
        "responses",
        "new_questions3",
        "Connections",
        "LLMResponses",
        "Feedback",
        "FeatureRankings",
    )
]

StatementMeta(, 295f52cc-59a7-4254-a293-6c750133c8b7, 10, Finished, Available, Finished)

In [6]:
# Silver layer: Deduplicate Connections
# Within each (session_id, event_type) group, rank rows newest-first by
# event_timestamp and keep only the top-ranked row.
# Soft-deleted rows are removed *before* ranking, so the survivor is the newest
# row whose is_deleted equals 0. Rows with a null is_deleted also fail this
# comparison and are dropped.
# NOTE(review): row_number() chooses arbitrarily among rows tied on
# event_timestamp; add a secondary sort column if a deterministic winner matters.
window_spec = (
    Window
    .partitionBy("session_id", "event_type")
    .orderBy(F.desc("event_timestamp"))
)

active_connections = connections_bronze.where(F.col("is_deleted") == 0)
ranked_connections = active_connections.withColumn(
    "row_num", F.row_number().over(window_spec)
)
connections_silver = ranked_connections.where(F.col("row_num") == 1).drop("row_num")

StatementMeta(, 295f52cc-59a7-4254-a293-6c750133c8b7, 13, Finished, Available, Finished)

In [7]:
# Enrich responses with question text
# A left join keeps every response. The question fields (Category, Question,
# options) are null when question_id has no match in the questions table.
# NOTE(review): if questions.id is not unique, a response will be repeated once
# per matching question row — confirm id is a key.
question_match = F.col("r.question_id") == F.col("q.id")
responses_silver = (
    responses_bronze.alias("r")
    .join(questions.alias("q"), on=question_match, how="left")
    .select(
        F.col("r.id"),
        F.col("r.session_id"),
        F.col("r.response_text"),
        "q.Category",
        "q.Question",
        "q.options",
    )
)

StatementMeta(, 295f52cc-59a7-4254-a293-6c750133c8b7, 14, Finished, Available, Finished)

In [10]:
# Write silver tables
# Each DataFrame fully replaces its Delta table. Tables are written in the same
# order as before: connections first, then responses.
for silver_df, silver_table in (
    (connections_silver, "connections_silver"),
    (responses_silver, "responses_silver"),
):
    silver_df.write.saveAsTable(silver_table, format="delta", mode="overwrite")

StatementMeta(, 295f52cc-59a7-4254-a293-6c750133c8b7, 17, Finished, Available, Finished)

In [11]:
# Gold layer example: Session summary
# Produces one row per session present in connections_silver, with columns:
# session_id, session_start, session_end, num_responses, num_features_ranked.
#
# Responses and feature rankings are aggregated per session *before* joining.
# This makes each join 1:1 on session_id. Joining the raw tables first would
# multiply rows per session (events x responses x rankings) before the groupBy.
#
# NOTE(review): connections_silver keeps only the latest event per
# (session_id, event_type), so session_start/session_end are the min/max over
# those latest events, not over every raw event — confirm that is intended.
session_events = (
    connections_silver
    .groupBy("session_id")
    .agg(
        F.min("event_timestamp").alias("session_start"),
        F.max("event_timestamp").alias("session_end"),
    )
)

responses_per_session = (
    responses_silver
    .groupBy("session_id")
    .agg(F.countDistinct("id").alias("num_responses"))
)

# FeatureRankings was already loaded in the bronze-read cell; reuse it instead of
# reading the table a second time.
features_per_session = (
    feature_rankings_bronze
    .groupBy("session_id")
    .agg(F.countDistinct("feature_name").alias("num_features_ranked"))
)

session_summary = (
    session_events
    .join(responses_per_session, "session_id", "left")
    .join(features_per_session, "session_id", "left")
    # A session with no matching responses or rankings gets a count of 0 instead
    # of null. This matches what countDistinct over the joined rows produced.
    .select(
        "session_id",
        "session_start",
        "session_end",
        F.coalesce(F.col("num_responses"), F.lit(0)).alias("num_responses"),
        F.coalesce(F.col("num_features_ranked"), F.lit(0)).alias("num_features_ranked"),
    )
)

StatementMeta(, 295f52cc-59a7-4254-a293-6c750133c8b7, 18, Finished, Available, Finished)

In [12]:
# Write gold table
# Fully replaces the Delta table Gold_Session_Summary with the current
# contents of session_summary.
session_summary.write.saveAsTable(
    "Gold_Session_Summary",
    format="delta",
    mode="overwrite",
)

StatementMeta(, 295f52cc-59a7-4254-a293-6c750133c8b7, 19, Finished, Available, Finished)