In [None]:
import os
import duckdb
from loguru import logger

In [None]:
%run "../resources/config.ipynb"

In [None]:
def make_dir_when_not_exists(path):
    """
    Create the directory `path`, including any missing intermediate directories.

    Does nothing when the directory already exists. `exist_ok=True` replaces the
    previous `os.path.exists` check followed by `os.makedirs`, which could fail with
    FileExistsError if the directory appeared between the check and the creation.

    Raises:
        FileExistsError: if `path` exists but is not a directory (os.makedirs behaviour).
        OSError: for other filesystem errors (e.g. missing permissions).
    """
    os.makedirs(path, exist_ok=True)  # also creates all intermediates


In [None]:
def dataset_loader(connection, dataset_folder: str = dataset_folder, dataset_info: list = dataset_info):
    """
    Load every configured dataset from the landing zone into a DuckDB table.

    For each entry in `dataset_info`, the flat file
    `{dataset_folder}/{dataset_name}/{dataset_name}.{file_format}` is read with
    `SELECT * FROM '<path>'` and stored as a table named after `dataset_name`.
    An existing table with that name is replaced.

    Args:
        connection: DuckDB connection used to execute the statements.
        dataset_folder: root folder of the landing zone. The default is the value of the
            module-level `dataset_folder` at definition time (presumably provided by the
            %run config notebook — confirm).
        dataset_info: iterable of dataset descriptors; each must expose `dataset_name`
            and `file_format` attributes. Default bound at definition time, like above.

    Raises:
        Re-raises any exception from the connection after logging it. Datasets handled
        before the failing one remain loaded.
    """
    for dataset in dataset_info:
        file_path = f'{dataset_folder}/{dataset.dataset_name}/{dataset.dataset_name}.{dataset.file_format}'
        logger.info(f'trying to open flat file at: {file_path}')

        # The path sits inside a SQL single-quoted literal; doubling embedded quotes keeps
        # a path such as ".../O'Brien/..." from terminating the literal early.
        sql_path_literal = file_path.replace("'", "''")
        query = f"""
        CREATE OR REPLACE TABLE {dataset.dataset_name} AS 
        SELECT * FROM '{sql_path_literal}'
        """
        try:
            connection.sql(query)
            logger.info(f'Successfully read flat file at: {file_path} and created table: {dataset.dataset_name}')
        except Exception as e:
            logger.error(f'error while loading file at: {file_path}, with error: {e}')
            raise

In [None]:
def dataset_stager(connection, medallion_zone: str, datasets: list, staging_format: str = staging_format):
    """
    Export in-memory DuckDB tables to files for one medallion zone.

    Each table named in `datasets` is written with DuckDB's COPY statement to
    `{dataset_folder}/output/{medallion_zone}/{table}.{staging_format}`, creating the
    zone directory when it does not exist yet. `dataset_folder` is the module-level
    value, not a parameter.

    Args:
        connection: DuckDB connection that holds the tables.
        medallion_zone: target zone; must be a member of the module-level
            `medallion_zones` (intended to be bronze / silver / gold — defined in config,
            not here). Validated so files are never written to unexpected paths.
        datasets: names of the tables to export.
        staging_format: value passed to COPY's FORMAT option; the default is the
            module-level `staging_format` at definition time.

    Raises:
        ValueError: if `medallion_zone` is not in `medallion_zones`.
        Re-raises any error from directory creation or the COPY after logging it.
    """
    # avoid creation of files at paths we don't want.
    if medallion_zone not in medallion_zones:
        error_message = (
            f"The dataset_stager function was entered with medallion_zone={medallion_zone}, "
            f"which is unsupported. Supported zones are: {medallion_zones}"
        )
        logger.error(error_message)
        raise ValueError(error_message)

    # Directory is the same for every dataset; compute it once.
    destination_dir = f'{dataset_folder}/output/{medallion_zone}/'

    # Save in-memory tables
    for dataset in datasets:
        full_destination_path = f'{destination_dir}{dataset}.{staging_format}'
        try:
            make_dir_when_not_exists(destination_dir)  # When the dir does not exist, we have to make it

            logger.info(f'trying to save table {dataset} as {staging_format} at {full_destination_path}')

            # The path is embedded in a SQL single-quoted literal: double embedded quotes.
            sql_path_literal = full_destination_path.replace("'", "''")
            query = f"""
            COPY (SELECT * FROM {dataset}) TO '{sql_path_literal}' (FORMAT {staging_format})
            """

            connection.execute(query)
            logger.info(f'dataset: {dataset} saved at: {full_destination_path}')

        except Exception as e:
            logger.error(f'error while staging dataset: {dataset} at: {full_destination_path}, with error: {e}')
            raise
            

In [None]:
def clean_airbnb_for_silver(connection, table='airbnb'):
    """
    Build the silver-layer `airbnb_cleaned` table from the raw airbnb table.

    - Keeps only rows whose zipcode matches four digits, a space and two uppercase
      letters (e.g. "1234 AB").
    - Selects zipcode, latitude, longitude and room_type unchanged.
    - Casts accommodates, bedrooms, price and review_scores_value to INTEGER. Plain
      CAST (not TRY_CAST) is used, so a value that cannot be converted makes the whole
      statement fail.

    Any existing `airbnb_cleaned` table is replaced.

    Args:
        connection: DuckDB connection holding the source table.
        table: name of the raw airbnb table to read from.

    Raises:
        Re-raises any exception from the connection after logging it.
    """
    # Doubled braces survive the f-string as the regex quantifiers {4} and {2}.
    query = f"""
    CREATE OR REPLACE TABLE airbnb_cleaned AS 
    SELECT zipcode
    ,latitude
    ,longitude
    ,room_type
    ,CAST(accommodates as int) AS accommodates
    ,CAST(bedrooms as int) AS bedrooms
    ,CAST(price as int) AS price
    ,CAST(review_scores_value as int) AS review_scores_value
    FROM {table}
    WHERE zipcode ~ '^[0-9]{{4}} [A-Z]{{2}}$'
    """

    logger.info('trying to clean airbnb data')
    try:
        connection.sql(query)
        logger.info('succesfully cleaned the airbnb data')
    except Exception as e:
        logger.error(f'Error while cleaning the airbnb data: {e}')
        raise
    


In [None]:
def clean_rentals_for_silver(connection, table='rentals'):
    """
    Build the silver-layer `rentals_cleaned` table from the raw rentals table.

    Output columns, named to line up with the airbnb data:
      zipcode -- postalCode with a space inserted after its 4th character
      price   -- the rent column with every non-digit stripped, cast to INTEGER

    A row is kept only when its reformatted zipcode matches four digits, a space and
    two uppercase letters, AND its rent contains at least one digit. All other rows
    are dropped. Any existing `rentals_cleaned` table is replaced.

    Args:
        connection: DuckDB connection holding the source table.
        table: name of the raw rentals table to read from.

    Raises:
        Re-raises any exception from the connection after logging it.
    """
    logger.info('trying to clean rental data')

    statement = f"""
    CREATE OR REPLACE TABLE rentals_cleaned AS
    SELECT 
    substring(postalCode FROM 1 FOR 4) || ' ' || substring(postalCode FROM 5) AS zipcode
    ,CAST(regexp_replace(rent, '[^0-9]', '', 'g') as int) AS price
    FROM {table}
    WHERE 1=1
    AND substring(postalCode FROM 1 FOR 4) || ' ' || substring(postalCode FROM 5) ~ '^[0-9]{{4}} [A-Z]{{2}}$'
    AND regexp_replace(rent, '[^0-9]', '', 'g') <> ''
    """

    try:
        connection.sql(statement)
        logger.info('succesfully cleaned the rental data')
    except Exception as err:
        logger.error(f'Error while cleaning the rental data: {err}')
        raise

In [None]:
def aggregate_data(connection):
    """
    Combine the cleaned rentals and airbnb data into the `aggregate_data` table.

    Rows from `airbnb_cleaned` and `rentals_cleaned` are stacked with UNION ALL, each
    tagged with a 0/1 source flag, and grouped by zipcode. For each zipcode the result
    holds:
      average_price_airbnb / average_price_rental -- per-source average price, rounded
          to 2 decimals. The two sources are kept separate because their prices are
          not comparable.
      airbnb_count / rental_count -- how many rows from each source fall in that
          zipcode, i.e. how many observations make up each average.

    A zipcode that appears in only one source gets a NULL average for the other source.
    Rows are ordered by average_price_rental, then average_price_airbnb, ascending.
    Any existing `aggregate_data` table is replaced.

    Args:
        connection: DuckDB connection holding `airbnb_cleaned` and `rentals_cleaned`.

    Raises:
        Re-raises any exception from the connection after logging it.
    """
    query = """
    CREATE OR REPLACE TABLE aggregate_data AS
    SELECT
    zipcode
    ,ROUND(AVG(price_airbnb), 2) AS average_price_airbnb
    ,ROUND(AVG(price_rentals), 2) AS average_price_rental
    ,SUM(airbnb_flag) AS airbnb_count
    ,SUM(rental_flag) AS rental_count
    FROM
    (
        SELECT 
        zipcode
        ,price AS price_airbnb 
        ,NULL AS price_rentals
        ,1 AS airbnb_flag
        ,0 AS rental_flag
        FROM 
        airbnb_cleaned

        UNION ALL

        SELECT
        zipcode
        ,NULL AS price_airbnb
        ,price AS price_rentals
        ,0 AS airbnb_flag
        ,1 AS rental_flag
        FROM 
        rentals_cleaned
    ) AS T
    GROUP BY zipcode
    ORDER BY average_price_rental, average_price_airbnb asc
    """

    logger.info('trying to aggregate the rental and airbnb datasets')
    try:
        connection.sql(query)
        logger.info('succesfully aggregated the rental and airbnb datasets')
    except Exception as e:
        # Previously logged "Error while cleaning the rental data", which misreported
        # aggregation failures as a rentals-cleaning problem.
        logger.error(f'Error while aggregating the rental and airbnb datasets: {e}')
        raise