In [0]:
%run "../Configuration/config file"

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

class UpsertCls():
    """Micro-batch handlers intended to be passed to ``DataStreamWriter.foreachBatch``.

    Each handler registers the incoming micro-batch as a temporary view and then
    runs SQL text (supplied by the caller) that reads from that view.

    Attributes:
        query: SQL statement executed by ``upsert``; it is expected to read from
            the view named ``view_name``.
        view_name: name of the temporary view the micro-batch is registered under.
        delete_query: optional ``str.format`` template used by ``multiUpsert``.
            Positional field ``{0}`` receives a comma-separated list of integer
            ``user_id`` values, e.g. ``"... WHERE user_id IN ({0})"``.
    """

    def __init__(self, merge_query, view_name, delete_query=None):
        self.query = merge_query
        self.view_name = view_name
        self.delete_query = delete_query

    def upsert(self, df, batch_id):
        """Expose ``df`` as the temp view ``self.view_name`` and run ``self.query``.

        Args:
            df: micro-batch DataFrame.
            batch_id: micro-batch id supplied by ``foreachBatch`` (unused).
        """
        df.createOrReplaceTempView(self.view_name)
        # The view lives in the session attached to the micro-batch DataFrame, so
        # the statement is run through that same session (public API rather than
        # the private ``_jdf`` py4j handle).
        df.sparkSession.sql(self.query)

    def multiUpsert(self, df, batch_id):
        """Apply a micro-batch whose rows carry an ``update_type`` column.

        Rows are processed in this order:
          1. ``"new"``    -> ``upsert``
          2. ``"delete"`` -> ``delete_query`` formatted with the distinct user ids
          3. ``"update"`` -> only the latest row per ``user_id`` (by ``timestamp``
             descending) is passed to ``upsert``

        Args:
            df: micro-batch DataFrame with at least ``update_type``, ``user_id``
                and ``timestamp`` columns.
            batch_id: micro-batch id supplied by ``foreachBatch``.

        Raises:
            ValueError: the batch contains delete rows but no ``delete_query``
                was configured, or a ``user_id`` cannot be converted to ``int``.
        """
        # Columns are referenced through the DataFrame itself so this class only
        # needs the names imported in its own cell (Window, rank).
        def rows_of(update_type):
            return df.filter(df["update_type"] == update_type).drop("update_type")

        df_new_user = rows_of("new")
        df_update_user = rows_of("update")
        df_delete_user = rows_of("delete")

        if df_new_user.count() > 0:
            self.upsert(df_new_user, batch_id)

        if df_delete_user.count() > 0:
            if self.delete_query is None:
                raise ValueError("multiUpsert received delete rows but no delete_query was configured")
            collected = df_delete_user.select("user_id").distinct().collect()
            # collect() returns Row objects; formatting them directly would splice
            # "[Row(user_id=...)]" into the SQL text. Extract the ids, coerce them
            # to int so only numeric literals ever reach the statement, and sort
            # them so the generated SQL is deterministic.
            user_ids = sorted({int(row[0]) for row in collected if row[0] is not None})
            if user_ids:
                id_list = ", ".join(str(user_id) for user_id in user_ids)
                df.sparkSession.sql(self.delete_query.format(id_list))

        # Guarded like the other branches so an empty update set does not trigger
        # a no-op MERGE.
        if df_update_user.count() > 0:
            window_spec = Window.partitionBy("user_id").orderBy(df_update_user["timestamp"].desc())
            ranked = df_update_user.withColumn('rank', rank().over(window_spec))
            final_df = ranked.where(ranked["rank"] == 1).drop("rank")
            self.upsert(final_df, batch_id)
    


In [0]:
import time
from pyspark.sql.functions import cast, col, to_date, from_unixtime, to_timestamp, from_json, round, when, min, max, broadcast, date_diff, current_date, floor


class Silver():
    """Builds and starts the streaming queries that populate the silver (``sl_*``) tables.

    Every ``get_sl_*`` method reads a source table as a stream, shapes it, and
    starts a ``foreachBatch`` query that runs a MERGE through ``UpsertCls``.

    Common parameters of the ``get_sl_*`` methods:
        version: value passed as the ``startingVersion`` read option.
        once: when truthy the query is started with an ``availableNow`` trigger,
            otherwise with a ``processingTime`` trigger.
        processingtime: interval used for the ``processingTime`` trigger.
    """

    # Seconds to wait before starting a query that reads another silver table
    # when running in ``once`` mode.
    # NOTE(review): presumably this gives the upstream availableNow queries time
    # to populate their tables; awaiting those queries would be more reliable.
    UPSTREAM_SETTLE_SECONDS = 60

    def __init__(self):
        obj_conf = ConfigModule()
        self.checkpoint_dir = f"{obj_conf.base_checkpoint_path}/checkpoint"
        self.catalog = obj_conf.environment
        self.schema = obj_conf.db_name

    # ------------------------------------------------------------------ helpers

    def _fqn(self, table_name):
        """Return ``<catalog>.<schema>.<table_name>``."""
        return f"{self.catalog}.{self.schema}.{table_name}"

    def _announce(self, table_name):
        """Print the progress line for the stream feeding ``table_name`` (no newline)."""
        print(f'Streaming for {self._fqn(table_name)} table...', end='')

    def _stream_table(self, table_name, version):
        """Open ``table_name`` as a streaming source starting at ``version``, ignoring deletes."""
        return (spark.readStream
                .option('startingVersion', version)
                .option('ignoreDeletes', True)
                .table(self._fqn(table_name)))

    def _profile_delete_query(self):
        """Return the ``str.format`` template that deletes user profiles by id."""
        # The braces are doubled so a literal ``{0}`` survives this f-string and
        # can be filled in later by ``str.format``; a single ``{0}`` would be
        # evaluated right here as the integer 0. ``IN (...)`` is used so more
        # than one id can be supplied in a single statement.
        return f"DELETE FROM {self._fqn('sl_user_profile')} WHERE user_id IN ({{0}})"

    def _start(self, stream_df, query_name, checkpoint_name, batch_fn, once, processingtime, settle_seconds=0):
        """Start a ``foreachBatch`` query on ``stream_df`` and print ``Started.``.

        Args:
            stream_df: streaming DataFrame to write.
            query_name: name given to the streaming query.
            checkpoint_name: directory name under ``self.checkpoint_dir``.
            batch_fn: callable ``(df, batch_id)`` handed to ``foreachBatch``.
            once: truthy -> ``availableNow`` trigger; falsy -> ``processingTime``.
            processingtime: interval for the ``processingTime`` trigger.
            settle_seconds: pause applied before starting, in ``once`` mode only.
        """
        # foreachBatch is the sink, so no output format is configured here.
        writer = stream_df.writeStream.queryName(query_name)
        writer = writer.option('checkpointLocation', f'{self.checkpoint_dir}/{checkpoint_name}')
        writer = writer.outputMode('update')
        writer = writer.foreachBatch(batch_fn)
        if once:
            if settle_seconds:
                time.sleep(settle_seconds)
            writer.trigger(availableNow=True).start()
        else:
            writer.trigger(processingTime=processingtime).start()
        print('Started.')

    # ------------------------------------------------------------------ streams

    def get_sl_users(self, version=0, once=True, processingtime='15 seconds'):
        """Stream ``bz_registered_users`` into ``sl_users`` (insert-only on user_id + device_id)."""
        self._announce('sl_users')
        read_df = self._stream_table('bz_registered_users', version)
        updated_df = (read_df
                      .select("user_id", "device_id", "mac_address",
                              to_timestamp(col("registration_timestamp").cast('long')).alias("registration_timestamp"))
                      .dropDuplicates(["user_id", "device_id"]))
        merge_query = f""" MERGE INTO {self._fqn('sl_users')} AS target USING users_view AS source 
                            ON source.user_id=target.user_id AND source.device_id=target.device_id 
                            WHEN NOT MATCHED THEN INSERT *;"""
        obj_upsertcls = UpsertCls(merge_query, 'users_view')
        self._start(updated_df, 'sl_users_stream', 'cp_sl_users',
                    obj_upsertcls.upsert, once, processingtime)

    def get_sl_gym_logins(self, version=0, once=True, processingtime='15 seconds'):
        """Stream ``bz_gym_logins`` into ``sl_gym_logins``.

        A matching (mac_address, gym, login) row only has its ``logout`` moved
        forward; unmatched rows are inserted.
        """
        self._announce('sl_gym_logins')
        read_df = self._stream_table('bz_gym_logins', version)
        updated_df = (read_df
                      .select("mac_address", "gym",
                              to_timestamp(col("login").cast('long')).alias("login"),
                              to_timestamp(col("logout").cast('long')).alias("logout"))
                      .dropDuplicates(["mac_address", "gym", "login", "logout"]))
        merge_query = f""" MERGE INTO {self._fqn('sl_gym_logins')} AS target 
                            USING gym_logins_view AS source 
                            ON source.mac_address=target.mac_address AND source.gym=target.gym AND source.login=target.login
                            WHEN MATCHED AND source.logout > target.login AND source.logout > target.logout
                            THEN UPDATE SET target.logout=source.logout
                            WHEN NOT MATCHED THEN INSERT *;"""
        obj_upsertcls = UpsertCls(merge_query, 'gym_logins_view')
        self._start(updated_df, 'sl_gym_logins_stream', 'cp_sl_gym_logins',
                    obj_upsertcls.upsert, once, processingtime)

    def get_sl_user_profile(self, version=0, once=True, processingtime='15 seconds'):
        """Stream ``user_info`` messages from ``bz_kafka_multiplex`` into ``sl_user_profile``.

        The JSON ``value`` is parsed, the address struct is flattened, and each
        micro-batch is handed to ``UpsertCls.multiUpsert`` together with the
        MERGE statement and the delete template.
        """
        self._announce('sl_user_profile')
        user_profile_schema = "user_id bigint, update_type string, timestamp double, dob string, sex string, gender string, first_name string, last_name string, address struct<street_address:string, city:string, state:string, zip:bigint>"
        merge_query = f"""MERGE INTO {self._fqn('sl_user_profile')} AS target
                        USING user_profile_view AS source ON source.user_id=target.user_id
                        WHEN MATCHED THEN UPDATE SET  
                        target.timestamp=source.timestamp, 
                        target.dob=source.dob, 
                        target.sex=source.sex, 
                        target.gender=source.gender, 
                        target.first_name=source.first_name, 
                        target.last_name=source.last_name, 
                        target.street_address=source.street_address, 
                        target.city=source.city, 
                        target.state=source.state, 
                        target.zip=source.zip
                        WHEN NOT MATCHED THEN INSERT *;"""
        delete_query = self._profile_delete_query()
        read_df = self._stream_table('bz_kafka_multiplex', version).where("topic='user_info'")
        schema_df = (read_df
                     .select(from_json(col("value"), user_profile_schema).alias('a'))
                     .select("a.*")
                     .select("user_id", "update_type",
                             to_timestamp(col("timestamp")).alias("timestamp"),
                             to_date(col("dob"), "MM/dd/yyyy").alias("dob"),
                             "sex", "gender", "first_name", "last_name",
                             "address.street_address", "address.city", "address.state", "address.zip"))
        updated_df = schema_df.dropDuplicates(["user_id", "update_type", "timestamp"])
        obj_upsertcls = UpsertCls(merge_query, 'user_profile_view', delete_query=delete_query)
        self._start(updated_df, 'sl_user_profile_stream', 'cp_sl_user_profile',
                    obj_upsertcls.multiUpsert, once, processingtime)

    def get_sl_heart_rate(self, version=0, once=True, processingtime='15 seconds'):
        """Stream ``bpm`` messages from ``bz_kafka_multiplex`` into ``sl_heart_rate`` (insert-only)."""
        self._announce('sl_heart_rate')
        read_df = self._stream_table('bz_kafka_multiplex', version).where("topic = 'bpm'")
        heart_rate_schema = "device_id long, time double, heartrate double"
        schema_df = (read_df
                     .select(from_json(col("value"), heart_rate_schema).alias('a'))
                     .select("a.*")
                     .select("device_id",
                             to_timestamp(col("time")).alias("time"),
                             round(col("heartrate"), 4).alias("heart_rate")))
        updated_df = schema_df.dropDuplicates(["device_id", "time"])
        merge_query = f""" MERGE INTO {self._fqn('sl_heart_rate')} AS target 
                            USING heart_rate_view AS source 
                            ON source.device_id=target.device_id AND source.time=target.time
                            WHEN NOT MATCHED THEN INSERT *;"""
        obj_upsertcls = UpsertCls(merge_query, 'heart_rate_view')
        self._start(updated_df, 'sl_heart_rate_stream', 'cp_sl_heart_rate',
                    obj_upsertcls.upsert, once, processingtime)

    def get_sl_workout_session(self, version=0, once=True, processingtime='15 seconds'):
        """Stream ``workout`` messages from ``bz_kafka_multiplex`` into ``sl_workout_session`` (insert-only)."""
        self._announce('sl_workout_session')
        read_df = self._stream_table('bz_kafka_multiplex', version).where("topic = 'workout'")
        workout_schema = """user_id long, workout_id long, timestamp double, 
                            action string, session_id long"""
        schema_df = (read_df
                     .select(from_json(col("value"), workout_schema).alias('a'))
                     .select("a.*")
                     .select("user_id", "workout_id",
                             to_timestamp(col("timestamp")).alias("timestamp"),
                             "action", "session_id"))
        updated_df = schema_df.dropDuplicates(["user_id", "timestamp"])
        merge_query = f""" MERGE INTO {self._fqn('sl_workout_session')} AS target 
                            USING workout_session_view AS source 
                            ON source.user_id=target.user_id AND source.timestamp=target.timestamp
                            WHEN NOT MATCHED THEN INSERT (user_id, workout_id, timestamp, action, session_id)
                            VALUES (source.user_id, source.workout_id, source.timestamp, source.action, source.session_id);"""
        obj_upsertcls = UpsertCls(merge_query, 'workout_session_view')
        self._start(updated_df, 'sl_workout_session_stream', 'cp_sl_workout_session',
                    obj_upsertcls.upsert, once, processingtime)

    def get_sl_complete_workout(self, version=0, once=True, processingtime='15 seconds'):
        """Stream ``sl_workout_session`` into ``sl_complete_workout``.

        Start/stop events are collapsed into one row per
        (user_id, workout_id, session_id) with ``start_time`` (earliest "start")
        and ``end_time`` (latest "stop").
        """
        print(f"Streaming for {self._fqn('sl_complete_workout')} table...", end='')
        cw_merge_query = f"""MERGE INTO {self._fqn('sl_complete_workout')} AS target
                                    USING complete_workout_view AS source ON 
                                    source.user_id=target.user_id AND source.session_id=target.session_id AND 
                                    source.workout_id=target.workout_id
                                    WHEN MATCHED AND source.end_time > target.start_time AND source.end_time > target.end_time THEN UPDATE SET target.end_time=source.end_time
                                    WHEN NOT MATCHED THEN INSERT *"""
        # The read option was previously misspelled 'ignoreDeleted' and therefore
        # had no effect; the shared helper uses 'ignoreDeletes'.
        read_df = self._stream_table('sl_workout_session', version)
        updated_df = (read_df
                      .withColumn("start_time", when(col("action") == "start", col("timestamp")))
                      .withColumn("end_time", when(col("action") == "stop", col("timestamp")))
                      .groupBy("user_id", "workout_id", "session_id")
                      .agg(min("start_time").alias("start_time"), max("end_time").alias("end_time"))
                      .select("user_id", "workout_id", "session_id", "start_time", "end_time"))
        obj_upsertcls = UpsertCls(cw_merge_query, 'complete_workout_view')
        self._start(updated_df, 'sl_complete_workout_stream', 'cp_sl_complete_workout',
                    obj_upsertcls.upsert, once, processingtime,
                    settle_seconds=self.UPSTREAM_SETTLE_SECONDS)

    def get_sl_user_bins(self, version=0, once=True, processingtime='15 seconds'):
        """Stream ``sl_users`` joined with ``sl_user_profile`` into ``sl_user_bins``.

        ``sl_users`` is read as a stream and left-joined to a batch read of
        ``sl_user_profile`` that carries a derived ``age`` bucket. Missing
        age / gender / city / state values become ``"Unknown"``.
        """
        print(f"Streaming for {self._fqn('sl_user_bins')} table...", end='')
        merge_query = f"""MERGE INTO {self._fqn('sl_user_bins')} AS target
                            USING user_bins_view AS source ON 
                            source.user_id=target.user_id
                            WHEN MATCHED THEN UPDATE SET *
                            WHEN NOT MATCHED THEN INSERT *;
                        """
        # Previously misspelled 'ignoreDeleted' (silently ignored).
        register_user_df = self._stream_table('sl_users', version)

        # Age in whole years, approximated as days / 365.
        age_count = col("age_count")
        # (exclusive lower bound, inclusive upper bound, label)
        # The original chain contained an unsatisfiable `>60 AND <=60` branch
        # labelled "56-60", so ages 61-75 fell through to "invalid age"; they
        # now get their own "61-75" bucket.
        age_ranges = [
            (17, 25, "18-25"),
            (25, 30, "26-30"),
            (30, 35, "31-35"),
            (35, 40, "36-40"),
            (40, 45, "41-45"),
            (45, 50, "46-50"),
            (50, 55, "51-55"),
            (55, 60, "56-60"),
            (60, 75, "61-75"),
            (75, 100, "above 75"),
        ]
        age_bucket = when(age_count < 18, "under 18")
        for lower, upper, label in age_ranges:
            age_bucket = age_bucket.when((age_count > lower) & (age_count <= upper), label)
        # Anything else (null dob, negative or > 100) is reported as invalid.
        age_bucket = age_bucket.otherwise("invalid age")

        # NOTE(review): this is a batch read; 'startingVersion' / 'ignoreDeletes'
        # are streaming-source options and presumably have no effect here. They
        # are kept (with the option name spelled correctly) pending confirmation.
        profile_df = (spark.read
                      .option('startingVersion', version)
                      .option('ignoreDeletes', True)
                      .table(self._fqn('sl_user_profile'))
                      .withColumn("age_count", floor(date_diff(current_date(), to_date("dob")) / 365))
                      .withColumn("age", age_bucket))
        combine_df = (register_user_df
                      .join(profile_df, on="user_id", how="left")
                      .fillna({"age": "Unknown", "gender": "Unknown", "city": "Unknown", "state": "Unknown"})
                      .select("user_id", "device_id", "age", "gender", "city", "state"))
        obj_upsertcls = UpsertCls(merge_query, 'user_bins_view')
        self._start(combine_df, 'sl_user_bin_stream', 'cp_sl_user_bin',
                    obj_upsertcls.upsert, once, processingtime,
                    settle_seconds=self.UPSTREAM_SETTLE_SECONDS)

    def launcher(self, once=True, processingtime='15 seconds'):
        """Start every silver stream in dependency order, then block until they terminate."""
        self.get_sl_users(once=once, processingtime=processingtime)
        self.get_sl_gym_logins(once=once, processingtime=processingtime)
        self.get_sl_user_profile(once=once, processingtime=processingtime)
        self.get_sl_heart_rate(once=once, processingtime=processingtime)
        self.get_sl_workout_session(once=once, processingtime=processingtime)
        # The last two read silver tables written by the streams above.
        self.get_sl_complete_workout(once=once, processingtime=processingtime)
        self.get_sl_user_bins(once=once, processingtime=processingtime)
        for stream in spark.streams.active:
            stream.awaitTermination()



#obj = Silver()
#obj.launcher()

In [0]:
class SilverTestSuite():
    """Row-count checks against the silver-layer tables."""

    def __init__(self):
        conf = ConfigModule()
        self.catalog, self.schema = conf.environment, conf.db_name

    def assert_fn(self, table_name, filter, expected_count):
        """Assert that ``table_name`` holds ``expected_count`` rows matching ``filter``.

        Args:
            table_name: table name inside ``<catalog>.<schema>``.
            filter: SQL predicate placed after ``where``.
            expected_count: row count the query must return.

        Raises:
            AssertionError: the actual count differs from ``expected_count``.
        """
        qualified_name = f"{self.catalog}.{self.schema}.{table_name}"
        print(f'Testing Silver layer - {qualified_name} table...', end='')
        count_rows = spark.sql(f"select count(*) from {qualified_name} where {filter}").collect()
        actual_count = count_rows[0][0]
        assert actual_count==expected_count, f"Test case failed, actual count is {actual_count}"
        print('Test Passed.')

    def testcases(self):
        """Run the fixed list of (table, predicate, expected row count) checks in order."""
        expectations = (
            ('sl_users', 'true', 5),
            ('sl_gym_logins', 'gym==1', 2),
            ('sl_user_profile', "true", 2),
            ('sl_heart_rate', "true", 5),
            ('sl_workout_session', "true", 2),
            ('sl_complete_workout', "user_id=12474", 1),
            ('sl_user_bins', "user_id in (12140, 12474)", 2),
        )
        for table_name, predicate, expected in expectations:
            self.assert_fn(table_name, predicate, expected)

#obj = SilverTestSuite()
#obj.testcases()