In [None]:
%run ./01-config

In [None]:
class SetupHelper():
    """Create, validate and tear down the database objects used by this pipeline.

    Relies on notebook-level globals that are not defined in this class:
    ``spark`` and ``dbutils`` (presumably provided by the Databricks runtime)
    and ``Conf`` (presumably defined by the ``./01-config`` notebook pulled in
    with ``%run``) — NOTE(review): confirm.
    """

    def __init__(self, env):
        """Capture storage locations and object names.

        Args:
            env: Name of the catalog that holds the database (stored as
                ``self.catalog``).
        """
        self.landing_zone = Conf.base_dir_data + "/raw"
        self.checkpoint_base = Conf.base_dir_checkpoint + "/checkpoints"
        self.catalog = env
        self.db_name = Conf.db_name
        # Set by create_db(); table-creation methods refuse to run until then.
        self.initialized = False

    def _qualified_db(self):
        """Return the ``catalog.database`` name used in every SQL statement."""
        return f"{self.catalog}.{self.db_name}"

    def _registered_users_bz_ddl(self):
        """Return the CREATE TABLE statement for the registered_users_bz table."""
        return f"""CREATE TABLE IF NOT EXISTS {self._qualified_db()}.registered_users_bz(
                  user_id long,
                  device_id long,
                  mac_address string,
                  registration_timestamp double,
                  load_time timestamp,
                  source_file string
                  )"""

    def create_db(self):
        """Create the database if missing, make it current, and mark the helper initialized."""
        spark.catalog.clearCache()
        print(f"Creating the database {self._qualified_db()}")
        # Must be an f-string: otherwise the literal "{...}" placeholders reach Spark.
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {self._qualified_db()}")
        spark.sql(f"use {self._qualified_db()}")
        self.initialized = True
        print("Done")

    def create_registered_users_bz(self):
        """Create the registered_users_bz table if it does not already exist.

        Raises:
            ReferenceError: if create_db() has not been run yet, to avoid
                creating the table in the session's default database.
        """
        if not self.initialized:
            raise ReferenceError("Application database is not defined. Cannot create table in default database.")
        print(f"Creating registered_users_bz table...", end='')
        spark.sql(self._registered_users_bz_ddl())
        print("Done")

    def setup(self):
        """Create the database and all tables, reporting elapsed wall-clock seconds."""
        import time
        start = int(time.time())
        print(f"\nStarting setup ...")
        self.create_db()
        self.create_registered_users_bz()
        print(f"Setup completed in {int(time.time() - start)} seconds")

    ## validation functions

    def assert_table(self, table_name):
        """Check that a non-temporary table named ``table_name`` exists in the database.

        Raises:
            AssertionError: if no such table is found.
        """
        matches = (
            spark.sql(f"SHOW TABLES IN {self._qualified_db()}")
            .filter(f"isTemporary == false and tableName == '{table_name}'")
            .count()
        )
        # Explicit raise (not `assert`) so the check survives `python -O`.
        if matches != 1:
            raise AssertionError(f"The table {table_name} is missing")
        print(f"Found {table_name} table in {self._qualified_db()}:Success")

    def validate(self):
        """Check that every table created by setup() exists, reporting elapsed seconds."""
        import time
        start = int(time.time())
        print(f"\nStarting setup validation ...")
        self.assert_table("registered_users_bz")
        print(f"Setup validation completed in {int(time.time()) - start} seconds")

    def cleanup(self):
        """Drop the database (CASCADE) if present, then delete the landing-zone and checkpoint directories."""
        databases = spark.sql(f"SHOW DATABASES IN {self.catalog}")
        if databases.filter(f"databaseName == '{self.db_name}'").count() == 1:
            print(f"Dropping the database {self._qualified_db()}...", end='')
            spark.sql(f"DROP DATABASE {self._qualified_db()} CASCADE")
            print("Done")
        print(f"Deleting {self.landing_zone}...", end='')
        # Second argument True = recursive delete.
        dbutils.fs.rm(self.landing_zone, True)
        print("Done LandingZone")
        print(f"Deleting {self.checkpoint_base}...", end='')
        dbutils.fs.rm(self.checkpoint_base, True)
        print("Done")
