In [0]:
%run "./01_config"

In [0]:
class Upserter:
    """foreachBatch sink that MERGEs each streaming micro-batch into a table.

    Each micro-batch is registered as a temporary view named ``temp_view_name``
    and ``merge_query`` (which is expected to read from that view) is executed
    in the micro-batch's own SparkSession, i.e. the session that owns the view.
    """

    def __init__(self, merge_query, temp_view_name):
        """
        Args:
            merge_query: SQL statement executed once per micro-batch.
            temp_view_name: name under which each micro-batch is registered as
                a temporary view before ``merge_query`` runs.
        """
        self.merge_query = merge_query
        self.temp_view_name = temp_view_name

    def upsert(self, df_micro_batch, batch_id):
        """Register ``df_micro_batch`` as the temp view, then run the MERGE.

        The signature matches what ``DataStreamWriter.foreachBatch`` passes;
        ``batch_id`` is not used.
        """
        df_micro_batch.createOrReplaceTempView(self.temp_view_name)
        # Prefer the public ``DataFrame.sparkSession`` property (PySpark >= 3.3):
        # it also works on Spark Connect / shared-access clusters, where the
        # private ``_jdf`` JVM handle is unavailable. Older runtimes have no such
        # property, so fall back to the original ``_jdf`` route there.
        session = getattr(df_micro_batch, "sparkSession", None)
        if session is not None:
            session.sql(self.merge_query)
        else:
            df_micro_batch._jdf.sparkSession().sql(self.merge_query)

In [0]:
class Silver:
    """Silver-layer streaming upserts for the Fitbit pipeline.

    Every ``upsert_*`` method starts one Structured Streaming query that reads a
    Delta table in ``{catalog}.{db_name}``, transforms it, and MERGEs each
    micro-batch into a silver table through ``Upserter``. Shared arguments:

    * ``once`` -- True: ``availableNow`` trigger (process available data, then
      stop); False: run with a ``processingTime`` trigger.
    * ``processing_time`` -- trigger interval used when ``once`` is False.
    * ``startingVersion`` -- Delta table version the source stream starts from.

    Each ``upsert_*`` method returns the started ``StreamingQuery``.
    """

    # Watermark delay applied on the event-time column before per-minute
    # (or per-second) de-duplication in every aggregating stream.
    _WATERMARK_DELAY = "30 seconds"

    # Bronze tables scanned to build the user_list / date_list dimension tables.
    _DIMENSION_SOURCE_TABLES = (
        "calories_min_bz",
        "heartrate_sec_bz",
        "intensities_min_bz",
        "mets_min_bz",
        "sleep_min_bz",
        "steps_min_bz",
        "weight_daily_bz",
    )

    def __init__(self, env):
        """Resolve storage paths from ``Config`` and switch to the env's schema.

        Args:
            env: environment suffix; the catalog used is ``fitbit_{env}_catalog``.
        """
        self.Conf = Config()
        self.landing_zone = self.Conf.landing + 'landing_zone'
        self.checkpoint_base = self.Conf.checkpoint + 'checkpoints'
        self.initial = self.Conf.medallion + "initial"
        self.bronze = self.Conf.medallion + "bronze"
        self.silver = self.Conf.medallion + "silver"
        self.gold = self.Conf.medallion + "gold"
        self.catalog = f"fitbit_{env}_catalog"
        self.db_name = self.Conf.db_name
        # NOTE(review): stored but not applied to any stream in this class.
        self.maxFilesPerTrigger = self.Conf.maxFilesPerTrigger
        spark.sql(f"USE {self.catalog}.{self.db_name}")

    # ------------------------------------------------------------------ helpers

    def _build_merge_query(self, table, view, keys, insert_only=False):
        """Return a MERGE statement that upserts rows of ``view`` into ``table``.

        Args:
            table: unqualified target table name inside ``{catalog}.{db_name}``.
            view: temp view holding the micro-batch (the MERGE source).
            keys: columns that must all be equal for a source row to match a
                target row; must match the grain the source is aggregated at,
                otherwise several source rows can hit one target row.
            insert_only: if True, matched target rows are left untouched (no
                ``WHEN MATCHED`` clause is emitted).

        Raises:
            ValueError: if ``keys`` is empty.
        """
        if not keys:
            raise ValueError("MERGE requires at least one key column")
        on_clause = " and ".join(f"a.{key} = b.{key}" for key in keys)
        matched_clause = "" if insert_only else "WHEN MATCHED THEN UPDATE SET *\n"
        return (
            f"MERGE INTO {self.catalog}.{self.db_name}.{table} a\n"
            f"USING {view} b\n"
            f"ON {on_clause}\n"
            f"{matched_clause}"
            "WHEN NOT MATCHED THEN INSERT *\n"
        )

    def _make_upserter(self, table, keys, insert_only=False):
        """Build the foreachBatch ``Upserter`` that MERGEs into ``table``.

        The micro-batch view is always named ``f"{table}_delta"``.
        """
        view = f"{table}_delta"
        return Upserter(self._build_merge_query(table, view, keys, insert_only), view)

    def _read_table(self, table_name, startingVersion):
        """Open a streaming read of ``{catalog}.{db_name}.{table_name}``."""
        return (spark.readStream
            .option("startingVersion", startingVersion)
            .option("ignoreDeletes", True)
            .table(f"{self.catalog}.{self.db_name}.{table_name}")
        )

    def _union_dimension_column(self, column, startingVersion):
        """Stream ``column`` from every dimension source table, unioned together."""
        # Project to the single column before the union to keep rows narrow.
        streams = [
            self._read_table(table_name, startingVersion).select(column)
            for table_name in self._DIMENSION_SOURCE_TABLES
        ]
        df_union = streams[0]
        for stream in streams[1:]:
            df_union = df_union.union(stream)
        return df_union

    # ---------------------------------------------------------- silver layer 1

    def upsert_calories_daily_sl(self, once=True, processing_time="15 seconds", startingVersion=0):
        """Sum per-minute calories into one row per (user_id, date)."""
        from pyspark.sql import functions as F
        data_upserter = self._make_upserter("calories_daily_sl", ["user_id", "date"])

        df_delta = (self._read_table("calories_min_bz", startingVersion)
            .selectExpr("user_id", "activity_minute", "calories", "date")
            .withWatermark("activity_minute", self._WATERMARK_DELAY)
            .dropDuplicates(["user_id", "activity_minute"])
            .groupBy("user_id", "date")
            .agg(F.sum("calories").alias("daily_calories"))
            .select("user_id", "daily_calories", "date")
        )
        return self._write_stream_update(df_delta, data_upserter, "calories_daily_sl", "calories_daily_sl_upsert_stream", "silver_p1", once, processing_time)

    def upsert_heartrate_min_sl(self, once=True, processing_time="15 seconds", startingVersion=0):
        """Roll per-second heart-rate readings up to one row per (user_id, minute)."""
        from pyspark.sql import functions as F
        data_upserter = self._make_upserter("heartrate_min_sl", ["user_id", "activity_minute"])

        df_delta = (self._read_table("heartrate_sec_bz", startingVersion)
            .selectExpr("user_id", "time", "value", "date")
            .withWatermark("time", self._WATERMARK_DELAY)
            .dropDuplicates(["user_id", "time"])
            .withColumn("activity_minute", F.date_trunc("minute", F.col("time")))
            .select("user_id", "activity_minute", "value", "date")
            .groupBy("user_id", "activity_minute", "date")
            .agg(F.avg("value").alias("avg_heartrate"),
                 F.max("value").alias("max_heartrate"))
            .withColumn("timeKey", F.date_format("activity_minute", 'HH:mm:ss'))
            .select("user_id", "activity_minute", "avg_heartrate", "max_heartrate", "date", "timeKey")
        )
        return self._write_stream_update(df_delta, data_upserter, "heartrate_min_sl", "heartrate_min_sl_upsert_stream", "silver_p1", once, processing_time)

    def upsert_intensities_daily_sl(self, once=True, processing_time="15 seconds", startingVersion=0):
        """Count minutes per intensity level (0-3) for each (user_id, date)."""
        from pyspark.sql import functions as F
        data_upserter = self._make_upserter("intensities_daily_sl", ["user_id", "date"])

        df_delta = (self._read_table("intensities_min_bz", startingVersion)
            .selectExpr("user_id", "activity_minute", "intensity", "date")
            .withWatermark("activity_minute", self._WATERMARK_DELAY)
            .dropDuplicates(["user_id", "activity_minute"])
            .groupBy("user_id", "date")
            .agg(F.sum(F.when(F.col("intensity") == 0, 1).otherwise(0)).alias("sedentary_minutes"),
                 F.sum(F.when(F.col("intensity") == 1, 1).otherwise(0)).alias("lightly_active_minutes"),
                 F.sum(F.when(F.col("intensity") == 2, 1).otherwise(0)).alias("fairly_active_minutes"),
                 F.sum(F.when(F.col("intensity") == 3, 1).otherwise(0)).alias("very_active_minutes"))
            .select("user_id", "sedentary_minutes", "lightly_active_minutes", "fairly_active_minutes", "very_active_minutes", "date")
        )
        return self._write_stream_update(df_delta, data_upserter, "intensities_daily_sl", "intensities_daily_sl_upsert_stream", "silver_p1", once, processing_time)

    def upsert_sleep_daily_sl(self, once=True, processing_time="15 seconds", startingVersion=0):
        """Count asleep / restless / awake minutes per (user_id, date, log_id).

        The MERGE key includes ``log_id`` to match the aggregation grain: with
        only (user_id, date), two sleep logs on the same day would make several
        source rows match one target row and the MERGE would fail.
        """
        from pyspark.sql import functions as F
        data_upserter = self._make_upserter("sleep_daily_sl", ["user_id", "date", "log_id"])

        df_delta = (self._read_table("sleep_min_bz", startingVersion)
            .selectExpr("user_id", "activity_minute", "value", "log_id", "date")
            .withWatermark("activity_minute", self._WATERMARK_DELAY)
            .dropDuplicates(["user_id", "activity_minute"])
            .groupBy("user_id", "date", "log_id")
            # NOTE(review): "Restless_minuts" is misspelled, but MERGE ... SET *
            # matches columns by name, so presumably the target table uses this
            # exact name; renaming it requires a schema change.
            .agg(F.sum(F.when(F.col("value") == 1, 1).otherwise(0)).alias("asleep_minutes"),
                 F.sum(F.when(F.col("value") == 2, 1).otherwise(0)).alias("Restless_minuts"),
                 F.sum(F.when(F.col("value") == 3, 1).otherwise(0)).alias("awake_minutes"))
            .withColumn("total_minutes_in_bed", F.col("asleep_minutes") + F.col("Restless_minuts") + F.col("awake_minutes"))
            .select("user_id", "total_minutes_in_bed", "asleep_minutes", "Restless_minuts", "awake_minutes", "log_id", "date")
        )
        return self._write_stream_update(df_delta, data_upserter, "sleep_daily_sl", "sleep_daily_sl_upsert_stream", "silver_p1", once, processing_time)

    def upsert_steps_daily_sl(self, once=True, processing_time="15 seconds", startingVersion=0):
        """Sum per-minute steps into one row per (user_id, date)."""
        from pyspark.sql import functions as F
        data_upserter = self._make_upserter("steps_daily_sl", ["user_id", "date"])

        df_delta = (self._read_table("steps_min_bz", startingVersion)
            .selectExpr("user_id", "activity_minute", "steps", "date")
            .withWatermark("activity_minute", self._WATERMARK_DELAY)
            .dropDuplicates(["user_id", "activity_minute"])
            .groupBy("user_id", "date")
            .agg(F.sum("steps").alias("total_steps"))
            .select("user_id", "total_steps", "date")
        )
        return self._write_stream_update(df_delta, data_upserter, "steps_daily_sl", "steps_daily_sl_upsert_stream", "silver_p1", once, processing_time)

    # ---------------------------------------------------------- silver layer 2

    def upsert_heartrate_daily_sl(self, once=True, processing_time="15 seconds", startingVersion=0):
        """Aggregate minute-level heart rate (heartrate_min_sl) per (user_id, date)."""
        from pyspark.sql import functions as F
        data_upserter = self._make_upserter("heartrate_daily_sl", ["user_id", "date"])

        # NOTE(review): heartrate_min_sl is itself written with MERGE ... UPDATE
        # SET *. Delta streaming sources reject commits that rewrite existing rows
        # unless ignoreChanges / skipChangeCommits is set; ignoreDeletes alone does
        # not cover updates. Confirm this read survives runs that update minutes.
        df_delta = (self._read_table("heartrate_min_sl", startingVersion)
            .selectExpr("user_id", "activity_minute", "avg_heartrate", "max_heartrate", "date")
            .withWatermark("activity_minute", self._WATERMARK_DELAY)
            .dropDuplicates(["user_id", "activity_minute"])
            .groupBy("user_id", "date")
            .agg(F.avg("avg_heartrate").alias("avg_heartrate"),
                 F.max("max_heartrate").alias("max_heartrate"))
            .select("user_id", "avg_heartrate", "max_heartrate", "date")
        )
        return self._write_stream_update(df_delta, data_upserter, "heartrate_daily_sl", "heartrate_daily_sl_upsert_stream", "silver_p2", once, processing_time)

    # ---------------------------------------------------------- silver layer 3

    def upsert_user_list(self, once=True, processing_time="15 seconds", startingVersion=0):
        """Insert every user_id seen in any bronze source table into user_list.

        Existing users are never updated (insert-only MERGE).
        """
        data_upserter = self._make_upserter("user_list", ["user_id"], insert_only=True)
        df_final = self._union_dimension_column("user_id", startingVersion).dropDuplicates(["user_id"])
        return self._write_stream_update(df_final, data_upserter, "user_list", "user_list_upsert_stream", "silver_p3", once, processing_time)

    def upsert_date_list(self, once=True, processing_time="15 seconds", startingVersion=0):
        """Insert every date seen in any bronze source table into date_list.

        Existing dates are never updated (insert-only MERGE).
        """
        data_upserter = self._make_upserter("date_list", ["date"], insert_only=True)
        df_final = self._union_dimension_column("date", startingVersion).dropDuplicates(["date"])
        return self._write_stream_update(df_final, data_upserter, "date_list", "date_list_upsert_stream", "silver_p3", once, processing_time)

    # ---------------------------------------------------------------- plumbing

    def _write_stream_update(self, df, upserter, path, query_name, pool, once, processing_time):
        """Start ``df`` as an update-mode stream that hands each batch to ``upserter``.

        Args:
            df: streaming DataFrame to write.
            upserter: ``Upserter`` whose ``upsert`` is the foreachBatch callback.
            path: checkpoint sub-directory under ``checkpoint_base``.
            query_name: name given to the streaming query.
            pool: scheduler pool set as a local property before ``start()``.
            once: True -> ``availableNow`` trigger; False -> ``processingTime``.
            processing_time: trigger interval when ``once`` is False.

        Returns:
            The started ``StreamingQuery``.
        """
        stream_writer = (df.writeStream
            .foreachBatch(upserter.upsert)
            .outputMode("update")
            .option("checkpointLocation", f"{self.checkpoint_base}/{path}")
            .queryName(query_name)
        )
        # Must be set in the starting thread before start() to take effect.
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
        if once:
            return stream_writer.trigger(availableNow=True).start()
        return stream_writer.trigger(processingTime=processing_time).start()

    def _await_queries(self, once):
        """In ``once`` mode, block until every active stream in the session ends.

        Note this waits on *all* ``spark.streams.active`` queries, including any
        not started by this class.
        """
        if once:
            for stream in spark.streams.active:
                stream.awaitTermination()

    def upsert(self, once=True, processing_time="5 seconds"):
        """Run all silver streams in dependency order, printing elapsed time.

        Layer 1 reads bronze tables; layer 2 reads heartrate_min_sl produced by
        layer 1; layer 3 builds the user/date dimension lists. In ``once`` mode
        each layer is awaited before the next starts.
        """
        import time
        start = int(time.time())
        print(f"\nExecuting silver layer upsert ...")

        self.upsert_calories_daily_sl(once, processing_time)
        self.upsert_heartrate_min_sl(once, processing_time)
        self.upsert_intensities_daily_sl(once, processing_time)
        self.upsert_sleep_daily_sl(once, processing_time)
        self.upsert_steps_daily_sl(once, processing_time)
        self._await_queries(once)
        print(f"Completed silver layer 1 upsert {int(time.time()) - start} seconds")

        self.upsert_heartrate_daily_sl(once, processing_time)
        self._await_queries(once)
        print(f"Completed silver layer 2 upsert {int(time.time()) - start} seconds")

        self.upsert_user_list(once, processing_time)
        self.upsert_date_list(once, processing_time)
        self._await_queries(once)
        print(f"Completed silver layer 3 upsert {int(time.time()) - start} seconds")