In [None]:
def bigquery_transform(df):
    """Derive the daily, per-session and purchase aggregates from the event stream.

    Args:
        df: Event DataFrame. The aggregations read the columns ``timestamp``,
            ``user_id``, ``session_id``, ``device``, ``event_type``,
            ``page_url``, ``order_id`` and ``total_amount``. Presumably a
            Structured Streaming DataFrame (a watermark is applied) --
            confirm against the caller.

    Returns:
        A tuple ``(daily_df, session_df, purchase_df)``:

        * ``daily_df`` -- one row per ``date`` with ``total_users``,
          ``total_sessions``, ``most_common_device``, ``most_common_event``.
        * ``session_df`` -- one row per ``(session_id, user_id)`` with
          ``start_time``, ``stop_time``, ``total_events``,
          ``total_page_visits``, ``most_common_event_type``,
          ``most_common_device``.
        * ``purchase_df`` -- one row per ``date`` (purchase events only) with
          ``total_orders``, ``total_revenue``, ``average_order_value``.
    """
    # Same lateness tolerance for all three aggregates.
    watermark_delay = "10 minutes"

    # NOTE(review): none of the aggregations below group on the event-time
    # column itself (or a window over it), so the watermark may not actually
    # bound the aggregation state -- confirm whether that is intended.

    # Daily Metrics:
    daily_df = (
        df.withWatermark("timestamp", watermark_delay)
        .withColumn("date", to_date(df.timestamp))
        .groupBy("date")
        .agg(
            approx_count_distinct(df.user_id).alias("total_users"),
            approx_count_distinct(df.session_id).alias("total_sessions"),
            mode("device").alias("most_common_device"),
            mode("event_type").alias("most_common_event"),
        )
    )

    # Session Level Metrics:
    session_df = (
        df.withWatermark("timestamp", watermark_delay)
        .groupBy(df.session_id, df.user_id)
        .agg(
            min(df.timestamp).alias("start_time"),
            max(df.timestamp).alias("stop_time"),
            count("*").alias("total_events"),
            approx_count_distinct(df.page_url).alias("total_page_visits"),
            mode("event_type").alias("most_common_event_type"),
            mode("device").alias("most_common_device"),
        )
    )

    # Purchase & Revenue Analysis:
    purchase_df = (
        df.filter(df.event_type == "purchase")
        .withWatermark("timestamp", watermark_delay)
        .withColumn("date", to_date(df.timestamp))
        .groupBy("date")
        .agg(
            # Exact distinct aggregations (countDistinct) are rejected on
            # streaming DataFrames, so the order count is approximate, like
            # the other distinct counts in this function.
            approx_count_distinct(df.order_id).alias("total_orders"),
            sum(df.total_amount).alias("total_revenue"),
            avg(df.total_amount).alias("average_order_value"),
        )
    )

    return daily_df, session_df, purchase_df


In [None]:
def bigquery_writer(daily_df, session_df, purchase_df):
    """Start one streaming query per aggregate, appending each micro-batch to BigQuery.

    The target project and dataset are read from the ``BQ_proj_id`` and
    ``BQ_dataset_id`` environment variables. The aggregates are written to
    the tables ``daily_engagement``, ``clickstream_sessions`` and
    ``purchase_metrics`` respectively.

    Args:
        daily_df: Streaming DataFrame written to ``daily_engagement``.
        session_df: Streaming DataFrame written to ``clickstream_sessions``.
        purchase_df: Streaming DataFrame written to ``purchase_metrics``.

    Returns:
        A tuple ``(query_daily, query_session, query_purchase)`` holding the
        objects returned by ``start()`` for each stream, in that order.

    Raises:
        ValueError: If ``BQ_proj_id`` or ``BQ_dataset_id`` is unset or empty.
    """
    project_id = os.getenv("BQ_proj_id")
    dataset_id = os.getenv("BQ_dataset_id")
    if not project_id or not dataset_id:
        # Fail early instead of building a table id like "None.None.<table>".
        raise ValueError(
            "BQ_proj_id and BQ_dataset_id environment variables must be set"
        )

    def write_to_bq(df, epoch_id, table_name):
        """Append one micro-batch to ``<project>.<dataset>.<table_name>``."""
        writer = df.write.format("bigquery").option(
            "table", f"{project_id}.{dataset_id}.{table_name}"
        )
        # Only partition tables whose batches actually carry a "date" column;
        # asking for a partition field the batch lacks would make the load
        # fail (presumably the case for the session-level aggregate).
        if "date" in df.columns:
            writer = writer.option("partitionField", "date")
        writer.mode("append").save()

    def start_query(stream_df, table_name):
        """Attach the BigQuery sink to ``stream_df`` and start the query."""
        # NOTE(review): "update" output mode combined with an appending sink
        # re-emits a row each time its aggregate changes -- confirm that
        # downstream consumers deduplicate or expect this.
        return (
            stream_df.writeStream
            .foreachBatch(
                lambda df, epoch_id: write_to_bq(df, epoch_id, table_name)
            )
            .outputMode("update")
            .start()
        )

    session_table = "clickstream_sessions"
    daily_table = "daily_engagement"
    purchase_table = "purchase_metrics"

    query_daily = start_query(daily_df, daily_table)
    query_session = start_query(session_df, session_table)
    query_purchase = start_query(purchase_df, purchase_table)

    return query_daily, query_session, query_purchase