In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

class BronzeLayerIngestion:
    def __init__(self):
        self.catalog = 'dev'
        self.db = 'swatch_raw'
        self.device_registration_table = 'device_registration'
        self.gym_attendance_table = 'gym_attendance'
        self.bpm_table = 'bpm'
        self.workout_table = 'workout_session'
        self.user_profile_table = 'user_profile'
        self.bootstrap_server = 'pkc-619z3.us-east1.gcp.confluent.cloud:9092'
        self.security_protocol = 'SASL_SSL'
        self.jaas_module = 'org.apache.kafka.common.security.plain.PlainLoginModule'
        self.api_key = 'PWCWCEVJQBUXSSNB'
        self.api_secret = 'kvR0VmvqnZFW2+8VktiegzT2kI4+JIy6aPybyMj6jVXuwhp5c34kbctYyUucGmnM'
        self.bpm_topic = 'bpm2'
        self.user_profile_topic = 'user_profile'
        self.workout_topic = 'workout_session'


In [0]:
# Ingest the devices into bronze layer (raw)
class DeviceIngestion(BronzeLayerIngestion):
    def __init__(self):
        super().__init__()
    
    def get_device_schema(self):
        schema = StructType([
                StructField('user_id', StringType(),True),
                StructField('device_id', LongType(),True),
                StructField('mac_address', StringType(),True),
                StructField('registration_timestamp', LongType(),True)
                            ])
        return schema
    
    def get_raw_data(self):
        raw_df = (spark
                  .readStream
                  .format('csv')
                  .option("delimiter", ",")
                  .schema(self.get_device_schema())
                  .option('header',True)
                  .load('/Volumes/dev/swatch_raw/device_registration_landing_zone/'))
        return raw_df
    
    def persist_devices(self, raw_df):
        devices_streaming_query = (raw_df
                                   .writeStream
                                   .format("delta")
                                   .queryName("bronze_layer_device_registration")
                                   .option("maxFilesPerTrigger",10)
                                   .option('checkpointLocation','/Volumes/dev/swatch_raw/device_registration_checkpoint')
                                   .outputMode("append")
                                   .trigger(processingTime='10 seconds')
                                   .toTable(f"{self.catalog}.{self.db}.{self.device_registration_table}")
                                   )
        return devices_streaming_query
    
    def start_stream(self):
        devices_streaming_query = self.persist_devices(self.get_raw_data())
        return devices_streaming_query


In [0]:
devices = DeviceIngestion()
devices_streaming_query=devices.start_stream()

In [0]:
%sql
select * from dev.swatch_raw.device_registration;

user_id,device_id,mac_address,registration_timestamp
12474,118440,4c:c5:9f:cb:13:bd,1678451168
12140,141687,ae:ec:f6:48:ca:f7,1678451529
13559,109290,36:1f:d9:d3:e8:0d,1678451631
11745,190960,14:cd:d6:db:70:f6,1678451681
12227,114131,57:24:ac:8c:75:ea,1678452028
14633,175406,1d:69:69:75:d0:aa,1678538830
14508,102558,df:f9:dc:5e:e2:a8,1678539025
13937,143442,dd:96:be:e9:1e:f4,1678539037
14232,172965,dd:45:d2:37:a8:0e,1678539087
15149,177966,de:c0:cd:a7:71:f4,1678539102


In [0]:
devices_streaming_query.stop()

In [0]:
class GymSessionIngestion(BronzeLayerIngestion):
    def __init__(self):
        super().__init__()

    def get_schema(self):
        schema = StructType([
            StructField("mac_address", StringType(), True),
            StructField("gym_id", StringType(), True),
            StructField("login_time", LongType(), True),
            StructField("logout_time", LongType(), True)
        ])
        return schema
    
    def read_raw_data(self):
        raw_df = (spark
                  .readStream
                  .format("csv")
                  .option("header", True)
                  .option("delimiter", ",")
                  .schema(self.get_schema())
                  .load("/Volumes/dev/swatch_raw/gym_attendance_landing_zone")
                  )
        return raw_df
    
    def persist_data(self,raw_df):
        gym_streaming_query = (raw_df
                               .writeStream
                               .format("delta")
                               .queryName("bronze_layer_gym_session")
                               .option("checkpointLocation","/Volumes/dev/swatch_raw/gym_attendance_checkpoint")
                               .outputMode("append")
                               .option("maxFilePerTrigger",10)
                               .trigger(processingTime='5 seconds')
                                .toTable(f"{self.catalog}.{self.db}.{self.gym_attendance_table}")
                               )
        return gym_streaming_query
    
    def start_stream(self):
        gym_streaming_query = self.persist_data(self.read_raw_data())
        return gym_streaming_query

In [0]:
gym_attendance = GymSessionIngestion()
gym_streaming_query=gym_attendance.start_stream()

In [0]:
gym_streaming_query.stop()

In [0]:
%sql
select * from dev.swatch_raw.gym_attendance;

mac_address,gym_id,login_time,logout_time
4c:c5:9f:cb:13:bd,5,1678521600,1678526100
ae:ec:f6:48:ca:f7,1,1678522500,1678525200
36:1f:d9:d3:e8:0d,3,1678522500,1678527000
14:cd:d6:db:70:f6,5,1678523400,1678527600
57:24:ac:8c:75:ea,1,1678524000,1678528500
36:1f:d9:d3:e8:0d,3,1678561200,1678564800
14:cd:d6:db:70:f6,5,1678562400,1678565700
57:24:ac:8c:75:ea,5,1678562880,1678567200
1d:69:69:75:d0:aa,1,1678608000,1678611000
df:f9:dc:5e:e2:a8,1,1678608000,1678611600


In [0]:

# Ingest the raw data from the kafka
# Apply the Dedup to make sure we only save the kafka record once in the table 
# Kakfa provide the atleast once gurantee.

class BPMIngestion(BronzeLayerIngestion):
    def __init__(self):
        super().__init__()

    def get_schema(self):
        schema = StructType([
            StructField("device_id", LongType(), True),
            StructField("heartrate", DoubleType(), True),
            StructField("time", LongType(), True)
        ])
        return schema
    
    def load_raw_data(self):
        raw_df = (spark
                  .readStream
                  .format("kafka")
                  .option("kafka.bootstrap.servers",self.bootstrap_server)
                  .option("kafka.security.protocol",self.security_protocol)
                  .option("kafka.sasl.mechanism", "PLAIN")
                  .option("kafka.sasl.jaas.config", f"{self.jaas_module} required username='{self.api_key}' password='{self.api_secret}';")
                  .option("maxOffsetsPerTrigger",100)
                  .option("subscribe",self.bpm_topic)
                  .load()
                  )
        return raw_df
    
    def format_raw_data(self,raw_df):
        formatted_df = (raw_df
                     .select(from_json(col("value").cast("string"), self.get_schema()).alias("value"))
                     .selectExpr("value.*")
                     )
        #remove_dup = formatted_df.dropDuplicates(["device_id","time"])
        return formatted_df
    
    def upsert(self,df,batch_id):
        df.createOrReplaceTempView("raw_bpm")
        merge_statement = f"""
        MERGE into {self.catalog}.{self.db}.{self.bpm_table} t
        USING raw_bpm s
        on (s.device_id = t.device_id and t.time = s.time)
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """

        df._jdf.sparkSession().sql(merge_statement)

    
    def persist_data(self,formatted_df):
        bpm_streaming_query = (formatted_df
                               .writeStream
                               .queryName("bronze_layer_bpm_ingestion")
                               .format("delta")
                               .option("checkpointLocation","/Volumes/dev/swatch_raw/bpm_checkpoint")
                               .outputMode("update")
                               .foreachBatch(self.upsert)
                               .start()
                               )
        return bpm_streaming_query
    
    def start_bpm_stream(self):
        return self.persist_data(self.format_raw_data(self.load_raw_data()))
    

In [0]:
bpm = BPMIngestion()
bpm_streaming_query = bpm.start_bpm_stream()

In [0]:
bpm_streaming_query.stop()

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 642, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/sql/utils.py", line 132, in call
    raise e
  File "/databricks/spark/python/pyspark/sql/utils.py", line 129, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/root/.ipykernel/1246/command-4121416081749373-3513267630", line 49, in upsert
    df._jdf.sparkSession().sql(merge_statement)
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1355, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/errors/exceptions/captured.py", line 255, in deco
    re

In [0]:
%sql
select * from dev.swatch_raw.bpm;


device_id,heartrate,time
175406,36.62929070359733,1678665457
175406,41.902535832782526,1678599619
175406,44.5946420681194,1678606235
175406,69.76540886392135,1678664344
175406,47.49380526293744,1678621030
118440,63.933349753602016,1678652991
118440,47.75329990677812,1678593521
118440,62.575923914097615,1678672866
175406,94.1332832270105,1678626908
175406,51.74745320944342,1678652793


In [0]:
class UserProfileIngestion(BronzeLayerIngestion):

    def __init__(self):
        super().__init__()

    def get_schema(self):
        schema = StructType([
            StructField("user_id", StringType(), True),
            StructField("update_type", StringType(), True),
            StructField("timestamp", LongType(), True),
            StructField("dob", StringType(), True),
            StructField("sex", StringType(), True),
            StructField("gender", StringType(), True),
            StructField("first_name", StringType(), True),
            StructField("last_name", StringType(), True),
            StructField("address", StructType([
                 StructField("street_address", StringType(), True),
                 StructField("city", StringType(), True),
                 StructField("state", StringType(), True),
                 StructField("zip", StringType(), True)
            ]), True)
        ])
        return schema
    
    def load_raw_data(self):
        raw_df = (spark
                  .readStream
                  .format("kafka")
                  .option("kafka.bootstrap.servers",self.bootstrap_server)
                  .option("kafka.security.protocol",self.security_protocol)
                  .option("kafka.sasl.mechanism", "PLAIN")
                  .option("kafka.sasl.jaas.config", f"{self.jaas_module} required username='{self.api_key}' password='{self.api_secret}';")
                  .option("maxOffsetsPerTrigger",100)
                  .option("subscribe",self.user_profile_topic)
                  .load()
                  )
        return raw_df
    
    def format_raw_data(self,raw_df):
        formatted_df = (raw_df
                     .select(from_json(col("value").cast("string"), self.get_schema()).alias("value"))
                     .selectExpr("value.*")
                     )
        return formatted_df
    
    # Remove the duplicates from the stream
    def upsert(self,df,batch_id):
        df.createOrReplaceTempView("raw_user_profile")
        merge_statement = f"""
        MERGE into {self.catalog}.{self.db}.{self.user_profile_table} t
        USING raw_user_profile s
        on (s.user_id = t.user_id and t.timestamp = s.timestamp and t.update_type=s.update_type)
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """

        df._jdf.sparkSession().sql(merge_statement)

    
    def persist_data(self,formatted_df):
        user_profile_streaming_query = (formatted_df
                               .writeStream
                               .queryName("bronze_layer_user_profile_ingestion")
                               .format("delta")
                               .option("checkpointLocation","/Volumes/dev/swatch_raw/user_profile_checkpoint")
                               .outputMode("update")
                               .trigger(processingTime='5 seconds')
                               .foreachBatch(self.upsert)
                               .start()
                               )
        return user_profile_streaming_query
    
    def start_user_profile_stream(self):
        return self.persist_data(self.format_raw_data(self.load_raw_data()))
    
    


In [0]:
user_profile = UserProfileIngestion()
user_profile_streaming_query = user_profile.start_user_profile_stream()

In [0]:
user_profile_streaming_query.stop()

In [0]:
%sql
select * from dev.swatch_raw.user_profile;

user_id,update_type,timestamp,dob,sex,gender,first_name,last_name,address
12474,new,1678451168,07/25/1939,M,M,Matthew,Phillips,"List(02648 Wilkins Cliffs Suite 998, San Fernando, CA, 91340)"
13559,new,1678451631,03/06/1980,F,F,Victoria,Smith,"List(88788 Dawson Lodge, Los Angeles, CA, 90065)"
11745,new,1678451681,06/29/1955,F,F,Shannon,Reyes,"List(3105 Bowers Expressway, Long Beach, CA, 90808)"
13559,update,1678451650,03/06/1980,F,F,Victoria,Smith,"List(634 Acevedo Mountain, Santa Monica, CA, 90405)"
13937,new,1678539037,04/26/1982,M,M,Matthew,Johnson,"List(9231 Edward Throughway Suite 072, Toluca Lake, CA, 91610)"
14232,update,1678539098,01/04/1979,M,M,Edward,Smith,"List(41444 Noble Cape Suite 390, North Hollywood, CA, 91606)"
15149,new,1678539102,03/30/1972,M,M,Cameron,Vasquez,"List(95932 Gary Ridges, Los Angeles, CA, 90018)"
12140,new,1678451529,02/02/1999,M,M,Robert,Clark,"List(68994 Steven Vista, Pearblossom, CA, 93553)"
12227,new,1678452028,12/11/1949,F,F,Courtney,Sheppard,"List(47754 Angela Plaza Apt. 135, Los Angeles, CA, 90010)"
12140,update,1678451625,02/02/1999,M,M,Robert,Castillo,"List(68994 Steven Vista, Pearblossom, CA, 93553)"


In [0]:
class WorkoutIngestion(BronzeLayerIngestion):

    def __init__(self):
        super().__init__()

    def get_schema(self):
        schema = StructType([
            StructField("user_id", StringType(), True),
            StructField("workout_id", IntegerType(), True),
            StructField("timestamp", LongType(), True),
            StructField("action", StringType(), True),
            StructField("session_id", IntegerType(), True)
        ])
        return schema
    
    def load_raw_data(self):
        raw_df = (spark
                  .readStream
                  .format("kafka")
                  .option("kafka.bootstrap.servers",self.bootstrap_server)
                  .option("kafka.security.protocol",self.security_protocol)
                  .option("kafka.sasl.mechanism", "PLAIN")
                  .option("kafka.sasl.jaas.config", f"{self.jaas_module} required username='{self.api_key}' password='{self.api_secret}';")
                  .option("maxOffsetsPerTrigger",100)
                  .option("subscribe",self.workout_topic)
                  .load()
                  )
        return raw_df
    
    def format_raw_data(self,raw_df):
        formatted_df = (raw_df
                     .select(from_json(col("value").cast("string"), self.get_schema()).alias("value"))
                     .selectExpr("value.*")
                     )
        return formatted_df
    
    def upsert(self,df,batch_id):
        df.createOrReplaceTempView("raw_workout")
        merge_statement = f"""
        MERGE into {self.catalog}.{self.db}.{self.workout_table} t
        USING raw_workout s
        on (s.user_id = t.user_id and t.timestamp = s.timestamp and t.workout_id=s.workout_id)
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """

        df._jdf.sparkSession().sql(merge_statement)

    
    def persist_data(self,formatted_df):
        workout_streaming_query = (formatted_df
                                        .writeStream
                                        .queryName("bronze_layer_workout_ingestion")
                                        .format("delta")
                                        .option("checkpointLocation","/Volumes/dev/swatch_raw/workout_session_checkpoint")
                                        .outputMode("update")
                                        .trigger(processingTime='5 seconds')
                                        .foreachBatch(self.upsert)
                                        .start()
                                  )
        return workout_streaming_query
    
    def start_workout_stream(self):
        return self.persist_data(self.format_raw_data(self.load_raw_data()))
    
    


In [0]:
workout = WorkoutIngestion()
workout_streaming_query = workout.start_workout_stream()

In [0]:
workout_streaming_query.stop()

In [0]:
%sql
select * from dev.swatch_raw.workout_session;

user_id,workout_id,timestamp,action,session_id
12227,1,1678528200,stop,1
12227,1,1678524300,start,1
12474,1,1678521900,start,1
13559,1,1678561500,start,2
12227,1,1678563120,start,2
12227,1,1678566900,stop,2
11745,1,1678565400,stop,2
13559,1,1678522800,start,1
14508,1,1678608300,start,1
13937,1,1678649400,stop,2
