In [0]:
%run ./01-config

In [0]:
class SetupHelper():
    """Create, validate and tear down the application's database objects.

    The helper issues Spark SQL DDL against ``<catalog>.<db_name>`` and relies
    on the notebook-provided globals ``spark``, ``dbutils`` and ``Config``
    (the latter is expected to come from the ``01-config`` notebook).

    Typical usage is ``setup()`` followed by ``validate()``; ``cleanup()``
    removes everything again.
    """

    # Every table/view that `setup` creates, in the order `validate` checks them.
    _EXPECTED_OBJECTS = (
        "registered_users_bz",
        "gym_logins_bz",
        "kafka_multiplex_bz",
        "users",
        "gym_logs",
        "user_profiles",
        "heart_rate",
        "workouts",
        "completed_workouts",
        "workout_bpm",
        "user_bins",
        "date_lookup",
        "workout_bpm_summary",
        "gym_summary",
    )

    def __init__(self, env):
        """Capture target locations from ``Config``.

        Args:
            env: Name of the catalog in which the database is created.
        """
        conf = Config()
        self.landing_zone = conf.base_data_dir + "/raw"
        self.checkpoint_base = conf.base_data_dir + "/checkpoint"
        self.catalog = env
        self.db_name = conf.db_name
        # Flipped to True by create_db(); every create_* method requires it.
        self.initialized = False

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #
    def _fqn(self, object_name):
        """Return ``<catalog>.<db_name>.<object_name>``."""
        return f"{self.catalog}.{self.db_name}.{object_name}"

    def _run_ddl(self, label, ddl):
        """Run one DDL statement with progress output.

        Args:
            label: Text shown in the ``Creating <label>...`` progress line.
            ddl: Complete SQL statement to execute.

        Raises:
            ReferenceError: If ``create_db`` has not been run yet, so that
                objects are never created in whatever database is current.
        """
        if not self.initialized:
            raise ReferenceError("Application database is not defined. Can not create table in default database.")
        print(f"Creating {label}...", end='')
        spark.sql(ddl)
        print("done")

    # ------------------------------------------------------------------ #
    # Database
    # ------------------------------------------------------------------ #
    def create_db(self):
        """Create the database if needed, make it current, and mark the helper initialized."""
        spark.catalog.clearCache()
        print(f"Creating database {self.catalog}.{self.db_name}...", end = '')
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {self.catalog}.{self.db_name}")
        spark.sql(f"USE {self.catalog}.{self.db_name}")
        self.initialized = True
        print("done")

    # ------------------------------------------------------------------ #
    # Tables with the `_bz` suffix
    # ------------------------------------------------------------------ #
    def create_registered_users(self):
        """Create (or replace) the ``registered_users_bz`` table.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        # NOTE(review): `registeration_timestamp` looks like a misspelling of
        # `registration_timestamp`; kept as-is because code outside this file
        # may read/write the column under its current name -- confirm.
        self._run_ddl("registered users_bz_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("registered_users_bz")}
            (user_id LONG,
             device_id LONG,
             mac_address STRING,
             registeration_timestamp DOUBLE,
             load_time TIMESTAMP,
             source_file STRING)""")

    def create_gym_logins(self):
        """Create (or replace) the ``gym_logins_bz`` table.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        self._run_ddl("gym_logins_bz_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("gym_logins_bz")}
            (mac_address STRING,
             gym BIGINT,
             login DOUBLE,
             logout DOUBLE,
             load_time TIMESTAMP,
             source_file STRING)""")

    def create_kafka_multiplex(self):
        """Create (or replace) ``kafka_multiplex_bz``, partitioned by topic and week_part.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        self._run_ddl("kafka_multiplex_bz_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("kafka_multiplex_bz")}
            (key STRING,
             value STRING,
             topic STRING,
             partition BIGINT,
             offset BIGINT,
             timestamp BIGINT,
             date date,
             week_part STRING,
             load_time TIMESTAMP,
             source_file STRING)
            PARTITIONED BY (topic, week_part)""")

    # ------------------------------------------------------------------ #
    # Remaining tables
    # ------------------------------------------------------------------ #
    def create_users(self):
        """Create (or replace) the ``users`` table.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        # NOTE(review): `regiseteration_timestamp` looks like a misspelling
        # (and differs from the spelling used in registered_users_bz); kept
        # unchanged to avoid breaking external readers/writers -- confirm.
        self._run_ddl("users_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("users")}
            (user_id BIGINT,
             device_id BIGINT,
             mac_address STRING,
             regiseteration_timestamp TIMESTAMP)
            """)

    def create_gym_logs(self):
        """Create (or replace) the ``gym_logs`` table.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        self._run_ddl("gym_logs_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("gym_logs")}
            (mac_address STRING,
             gym BIGINT,
             login TIMESTAMP,
             logout TIMESTAMP)
            """)

    def create_user_profiles(self):
        """Create (or replace) the ``user_profiles`` table.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        self._run_ddl("user_profile_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("user_profiles")}
            (user_id BIGINT,
             dob DATE,
             sex STRING,
             gender STRING,
             first_name STRING,
             last_name STRING,
             street_address STRING,
             city STRING,
             state STRING,
             zip INT,
             updated TIMESTAMP)
            """)

    def create_heart_rate(self):
        """Create (or replace) the ``heart_rate`` table.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        self._run_ddl("heart_rate_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("heart_rate")}
            (device_id LONG,
             time TIMESTAMP,
             heart_rate DOUBLE,
             valid BOOLEAN
            )
            """)

    def create_user_bins(self):
        """Create (or replace) the ``user_bins`` table.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        self._run_ddl("user_bins_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("user_bins")}
            (user_id BIGINT,
             age STRING,
             gender STRING,
             city STRING,
             state STRING
            )
            """)

    def create_workouts(self):
        """Create (or replace) the ``workouts`` table.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        # NOTE(review): `sessoin_id` looks like a typo for `session_id`, the
        # name used by completed_workouts and workout_bpm; kept unchanged
        # because writers elsewhere may rely on the current name -- confirm.
        self._run_ddl("workouts_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("workouts")}
            (user_id INT,
             workout_id INT,
             time TIMESTAMP,
             action STRING,
             sessoin_id INT
            )
            """)

    def create_completed_workouts(self):
        """Create (or replace) the ``completed_workouts`` table.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        self._run_ddl("completed_workouts_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("completed_workouts")}
            (user_id INT,
             workout_id INT,
             session_id INT,
             start_time TIMESTAMP,
             end_time TIMESTAMP
            )
            """)

    def create_workout_bpm(self):
        """Create (or replace) the ``workout_bpm`` table.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        self._run_ddl("workout_bpm_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("workout_bpm")}
            (user_id INT,
             workout_id INT,
             session_id INT,
             start_time TIMESTAMP,
             end_time TIMESTAMP,
             time TIMESTAMP,
             heart_rate DOUBLE
            )
            """)

    def create_date_lookup(self):
        """Create (or replace) the ``date_lookup`` table.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        self._run_ddl("date_lookup_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("date_lookup")}
            (date DATE,
             week INT,
             year INT,
             month INT,
             dayofweek INT,
             dayofmonth INT,
             dayofyear INT,
             week_part STRING
            )
            """)

    def create_workout_bpm_summary(self):
        """Create (or replace) the ``workout_bpm_summary`` table.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        self._run_ddl("workout_bpm_summary_table", f"""
            CREATE OR REPLACE TABLE {self._fqn("workout_bpm_summary")}
            (workout_id INT,
             session_id INT,
             user_id INT,
             age STRING,
             gender STRING,
             city STRING,
             state STRING,
             min_bpm DOUBLE,
             avg_bpm DOUBLE,
             max_bpm DOUBLE,
             num_recordings BIGINT
            )
            """)

    # ------------------------------------------------------------------ #
    # View
    # ------------------------------------------------------------------ #
    def create_gym_summary(self):
        """Create (or replace) the ``gym_summary`` view.

        The view joins ``gym_logs`` to completed workouts (resolved to a MAC
        address through ``users``) whose start falls inside the gym visit, and
        reports minutes spent in the gym and minutes spent exercising.

        Raises:
            ReferenceError: If the database has not been created yet.
        """
        # minutes_in_gym is the whole visit (logout - login); the source tables
        # are fully qualified so the view does not depend on the current schema.
        self._run_ddl("gym_summary Gold View", f"""
            CREATE OR REPLACE VIEW {self._fqn("gym_summary")} AS
            SELECT
                to_date(login::timestamp) as date,
                gym, l.mac_address, workout_id, session_id,
                round((logout::long - login::long)/60, 2) as minutes_in_gym,
                round((end_time::long - start_time::long)/60, 2) as minutes_exercising
            FROM {self._fqn("gym_logs")} l
            JOIN (
                SELECT mac_address, workout_id, session_id, start_time, end_time
                FROM {self._fqn("completed_workouts")} w
                INNER JOIN {self._fqn("users")} u ON w.user_id = u.user_id
            ) w
            ON l.mac_address = w.mac_address
            AND w.start_time BETWEEN l.login AND l.logout
            ORDER BY date, gym, l.mac_address, session_id
            """)

    # ------------------------------------------------------------------ #
    # Orchestration
    # ------------------------------------------------------------------ #
    def setup(self):
        """Create the database and then every table and the view, reporting elapsed seconds."""
        import time
        start = int(time.time())
        print(f"\nStarting setup...", end='')
        self.create_db()
        # gym_summary is created last, after the tables it selects from.
        creators = (
            self.create_registered_users,
            self.create_gym_logins,
            self.create_kafka_multiplex,
            self.create_users,
            self.create_gym_logs,
            self.create_user_profiles,
            self.create_heart_rate,
            self.create_workouts,
            self.create_completed_workouts,
            self.create_workout_bpm,
            self.create_user_bins,
            self.create_date_lookup,
            self.create_workout_bpm_summary,
            self.create_gym_summary,
        )
        for create in creators:
            create()
        print(f"Setup completed in {int(time.time()) - start} seconds.")

    def assert_table(self, table_name):
        """Check that ``table_name`` is listed exactly once in the database.

        Args:
            table_name: Unqualified table or view name.

        Raises:
            AssertionError: If ``SHOW TABLES`` does not list the name exactly once.
        """
        matches = spark.sql(f"SHOW TABLES IN {self.catalog}.{self.db_name}") \
                       .filter(f"tableName = '{table_name}'") \
                       .count()
        # Raised explicitly (rather than via `assert`) so the check still runs
        # when Python is started with -O.
        if matches != 1:
            raise AssertionError(f"Table {table_name} does not exist.")
        print(f"Found {table_name} table in {self.catalog}.{self.db_name} : Success!")

    def validate(self):
        """Verify that every object created by ``setup`` exists, reporting elapsed seconds.

        Raises:
            AssertionError: On the first missing object.
        """
        import time
        start = int(time.time())
        print(f"\nValidating tables...", end='')
        for object_name in self._EXPECTED_OBJECTS:
            self.assert_table(object_name)
        print(f"Validated tables in {int(time.time()) - start} seconds.")

    def cleanup(self):
        """Drop the database (if ``SHOW DATABASES`` lists it) and remove the landing-zone and checkpoint directories."""
        listed = spark.sql(f"SHOW DATABASES IN {self.catalog}") \
                      .filter(f"databaseName == '{self.db_name}'") \
                      .count()
        if listed == 1:
            print(f"Dropping {self.catalog}.{self.db_name} database...", end='')
            spark.sql(f"DROP DATABASE {self.catalog}.{self.db_name} CASCADE")
            print("done")
        print(f"Deleting {self.landing_zone}...", end='')
        dbutils.fs.rm(self.landing_zone, True)
        print("done")
        print(f"Deleteing {self.checkpoint_base} ....", end='')
        dbutils.fs.rm(self.checkpoint_base, True)
        print("done")
        