In [0]:
%run "./01_config"

In [0]:
class Bronze:
    """Ingest raw landing-zone files into bronze Delta tables.

    Each ``consume_*`` method starts one structured-streaming query that reads
    a landing-zone directory with the ``cloudFiles`` source and appends the
    rows to a Delta table of the same name in ``<catalog>.<db_name>``.

    The class relies on two names that are not defined in this cell and must
    already be in scope: a ``spark`` session and a ``Config`` class
    (presumably supplied by the ``01_config`` notebook - verify).
    """

    def __init__(self, env):
        """Resolve storage locations from ``Config`` and select the database.

        Args:
            env: Environment name interpolated into the catalog name
                ``sbit_{env}_catalog``.
        """
        self.Conf = Config()
        self.landing_zone = self.Conf.base_dir_data + "/raw"
        self.checkpoint_base = self.Conf.base_dir_checkpoint + "/checkpoints"
        self.catalog = f"sbit_{env}_catalog"
        self.db_name = self.Conf.db_name
        spark.sql(f"USE {self.catalog}.{self.db_name}")

    def _read_landing(self, dataset, schema, file_format, reader_options):
        """Build the cloudFiles streaming reader for one landing-zone dataset.

        Args:
            dataset: Directory name under the landing zone.
            schema: DDL schema string applied to the incoming files.
            file_format: Value for the ``cloudFiles.format`` option.
            reader_options: Extra format-specific reader options (name -> value).

        Returns:
            A streaming DataFrame with ``load_time`` and ``source_file``
            audit columns appended.
        """
        from pyspark.sql import functions as F

        # NOTE(review): Auto Loader documents its per-batch file limit as
        # `cloudFiles.maxFilesPerTrigger`; confirm the un-prefixed key below
        # is honoured before relying on one-file-per-batch behaviour.
        reader = (spark.readStream
                    .format("cloudFiles")
                    .schema(schema)
                    .option("maxFilesPerTrigger", 1)
                    .option("cloudFiles.format", file_format))
        for option_name, option_value in reader_options.items():
            reader = reader.option(option_name, option_value)

        return (reader.load(f"{self.landing_zone}/{dataset}")
                    .withColumn("load_time", F.current_timestamp())
                    .withColumn("source_file", F.col("_metadata.file_path")))

    def _start_ingestion(self, df_stream, table, pool, once, processing_time):
        """Start the append-only Delta writer for one bronze table.

        Args:
            df_stream: Streaming DataFrame to persist.
            table: Unqualified target table name; also used for the
                checkpoint sub-directory and the query name.
            pool: Value assigned to the ``spark.scheduler.pool`` local property.
            once: Truthy -> ``availableNow`` trigger; falsy -> ``processingTime``.
            processing_time: Trigger interval used when ``once`` is falsy.

        Returns:
            The query handle returned by ``toTable``.
        """
        writer = (df_stream.writeStream
                    .format("delta")
                    .option("checkpointLocation", f"{self.checkpoint_base}/{table}")
                    .outputMode("append")
                    .queryName(f"{table}_ingestion_stream"))

        # Must be set before the query is started so it applies to that query.
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)

        # Truthiness (not `== True`) keeps this in step with `consume`, which
        # decides whether to wait on the queries using `if once:`.
        if once:
            writer = writer.trigger(availableNow=True)
        else:
            writer = writer.trigger(processingTime=processing_time)
        return writer.toTable(f"{self.catalog}.{self.db_name}.{table}")

    def consume_user_registration(self, once: bool = True, processing_time: str = "5 seconds"):
        """Stream headered CSV files from ``registered_users_bz`` into the bronze table.

        Args:
            once: Truthy to process what is available and stop; falsy to keep
                running on a ``processing_time`` interval.
            processing_time: Trigger interval for the continuous mode.

        Returns:
            The started streaming query.
        """
        schema = "user_id long, device_id long, mac_address string, registration_timestamp double"
        df_stream = self._read_landing("registered_users_bz", schema, "csv", {"header": "true"})
        return self._start_ingestion(df_stream, "registered_users_bz", "bronze_p2",
                                     once, processing_time)

    def consume_gym_logins(self, once: bool = True, processing_time: str = "5 seconds"):
        """Stream headered CSV files from ``gym_logins_bz`` into the bronze table.

        Args:
            once: Truthy to process what is available and stop; falsy to keep
                running on a ``processing_time`` interval.
            processing_time: Trigger interval for the continuous mode.

        Returns:
            The started streaming query.
        """
        schema = "mac_address string, gym bigint, login double, logout double"
        df_stream = self._read_landing("gym_logins_bz", schema, "csv", {"header": "true"})
        return self._start_ingestion(df_stream, "gym_logins_bz", "bronze_p2",
                                     once, processing_time)

    def consume_kafka_multiplex(self, once: bool = True, processing_time: str = "5 seconds"):
        """Stream multi-line JSON files from ``kafka_multiplex_bz`` into the bronze table.

        Every record is left-joined to the ``date_lookup`` table so the
        lookup's ``date`` and ``week_part`` columns are stored with it;
        records without a matching date keep nulls in those columns.

        Args:
            once: Truthy to process what is available and stop; falsy to keep
                running on a ``processing_time`` interval.
            processing_time: Trigger interval for the continuous mode.

        Returns:
            The started streaming query.
        """
        from pyspark.sql import functions as F

        schema = "key string, value string, topic string, partition bigint, offset bigint, timestamp bigint"
        df_date_lookup = (spark.table(f"{self.catalog}.{self.db_name}.date_lookup")
                               .select("date", "week_part"))

        # `timestamp` is divided by 1000 before the cast, i.e. it is treated
        # as epoch milliseconds when deriving the calendar date to join on.
        event_date = F.to_date((F.col("timestamp") / 1000).cast("timestamp"))
        df_stream = (self._read_landing("kafka_multiplex_bz", schema, "json", {"multiLine": "true"})
                         .join(F.broadcast(df_date_lookup),
                               [event_date == F.col("date")],
                               "left"))

        return self._start_ingestion(df_stream, "kafka_multiplex_bz", "bronze_p1",
                                     once, processing_time)

    def consume(self, once: bool = True, processing_time: str = "5 seconds"):
        """Start all three bronze ingestion queries and report elapsed time.

        When ``once`` is truthy the call blocks until the three queries it
        started have terminated. Only those queries are awaited: unrelated
        streams active in the same session are ignored, and a bronze query
        that failed early still surfaces its error through
        ``awaitTermination`` instead of being skipped.

        Args:
            once: Truthy to process what is available and wait for completion;
                falsy to leave the queries running in the background.
            processing_time: Trigger interval for the continuous mode.
        """
        import time
        start = int(time.time())
        print("\nStarting bronze layer consumption ...")

        queries = [
            self.consume_user_registration(once, processing_time),
            self.consume_gym_logins(once, processing_time),
            self.consume_kafka_multiplex(once, processing_time),
        ]

        if once:
            for query in queries:
                query.awaitTermination()

        print(f"Completed bronze layer consumtion {int(time.time()) - start} seconds")