In [0]:
from datetime import datetime
from pyspark.sql.functions import lit

def copy_city_tables(city, source_schema='some_catalog.airbnb_data',
                     target_schema='some_catalog.airbnb_bronze'):
    """Bronze load: copy one city's raw tables and stamp them with a load time.

    Every non-temporary table in ``source_schema`` whose name starts with the
    city's prefix is read, given a ``load_timestamp`` column, and written to a
    table of the same name in ``target_schema`` (overwriting any existing one).

    Args:
        city: City name, e.g. ``'berlin'`` or ``'stockholm'`` (case-insensitive).
            Stockholm tables use the short prefix ``'stock'``; any other city
            uses its own lowercased name as the prefix.
        source_schema: Fully qualified schema to read from.
        target_schema: Fully qualified schema to write to; created if missing.

    Returns:
        List of fully qualified target table names that were written.

    Raises:
        ValueError: If ``city`` is empty (an empty prefix would match every table).
    """
    city_key = city.strip().lower()
    if not city_key:
        raise ValueError('city must be a non-empty string')
    # Unknown cities use their own name rather than silently falling back to
    # Stockholm's tables.
    prefix = {'berlin': 'berlin', 'stockholm': 'stock'}.get(city_key, city_key)

    # Create target schema if not exists
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_schema}")

    # listTables also returns session temporary views; those do not live in
    # source_schema and cannot be read as `<schema>.<name>`, so skip them.
    tables = [
        t.name
        for t in spark.catalog.listTables(source_schema)
        if not t.isTemporary and t.name.lower().startswith(prefix)
    ]

    # One timestamp for the whole run, so tables loaded together share it.
    load_ts = lit(datetime.now())
    written = []
    for table in tables:
        target_table = f'{target_schema}.{table}'
        (
            spark.table(f'{source_schema}.{table}')
            .withColumn('load_timestamp', load_ts)
            .write.mode('overwrite')
            .saveAsTable(target_table)
        )
        written.append(target_table)
    return written

In [0]:
# Bronze: land Berlin's raw tables in the bronze schema.
copy_city_tables(city='berlin')

In [0]:
# Bronze: land Stockholm's raw tables in the bronze schema.
copy_city_tables(city='stockholm')

In [0]:
from pyspark.sql.functions import col

def process_and_copy_city_tables(city):
    """Clean one city's bronze tables and write them to the silver schema.

    NOTE(review): a later cell defines ``process_and_copy_city_tables`` again,
    so this definition is shadowed once that cell runs. Consider deleting
    this cell.

    For every non-temporary table in ``some_catalog.airbnb_bronze`` whose name
    starts with the city's prefix, this function:
    - drops exact duplicate rows;
    - drops rows where every value is null;
    - drops rows with a null key in listing/review tables;
    - casts a ``date`` column to DateType;
    - overwrites the same-named table in ``some_catalog.airbnb_silver``.

    Args:
        city: City name, e.g. ``'berlin'`` or ``'stockholm'`` (case-insensitive).

    Returns:
        List of fully qualified target table names that were written.

    Raises:
        ValueError: If ``city`` is empty (an empty prefix would match every table).
    """
    city_key = city.strip().lower()
    if not city_key:
        raise ValueError('city must be a non-empty string')
    # Stockholm tables use the short prefix 'stock'; other cities use their name.
    prefix = {'berlin': 'berlin', 'stockholm': 'stock'}.get(city_key, city_key)
    source_schema = 'some_catalog.airbnb_bronze'
    target_schema = 'some_catalog.airbnb_silver'
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_schema}")
    # Skip temporary views: they are not addressable as `<schema>.<name>`.
    tables = [
        t.name
        for t in spark.catalog.listTables(source_schema)
        if not t.isTemporary and t.name.lower().startswith(prefix)
    ]
    written = []
    for table in tables:
        target_table = f'{target_schema}.{table}'
        # Remove duplicate rows, then rows where all values are null.
        df = spark.table(f'{source_schema}.{table}').dropDuplicates().na.drop(how='all')
        # Remove rows with a null key. Only filter on a column that exists:
        # review tables may have no `id`, in which case `listing_id` is used.
        table_key = table.lower()
        if table_key.endswith(('listing', 'listings')):
            key_candidates = ('id',)
        elif table_key.endswith(('review', 'reviews')):
            key_candidates = ('id', 'listing_id')
        else:
            key_candidates = ()
        key = next((c for c in key_candidates if c in df.columns), None)
        if key is not None:
            df = df.filter(col(key).isNotNull())
        # Cast a `date` column (any casing) to DateType under its real name.
        for field in df.schema.fields:
            if field.name.lower() == 'date':
                df = df.withColumn(field.name, col(field.name).cast('date'))
        df.write.mode('overwrite').saveAsTable(target_table)
        written.append(target_table)
    return written

In [0]:
from pyspark.sql.functions import col

def process_and_copy_city_tables(city, source_schema='some_catalog.airbnb_bronze',
                                 target_schema='some_catalog.airbnb_silver',
                                 date_columns=('date',)):
    """Silver load: clean one city's bronze tables and write them to silver.

    For every non-temporary table in ``source_schema`` whose name starts with
    the city's prefix, this function:
    - drops exact duplicate rows;
    - drops rows in which every column is null;
    - for listing tables (name ends in ``listing``/``listings``), drops rows
      with a null ``id``;
    - for review tables (``review``/``reviews``), drops rows with a null
      ``id``, or with a null ``listing_id`` when there is no ``id`` column;
    - casts columns named in ``date_columns`` (matched case-insensitively)
      to DateType;
    - overwrites the same-named table in ``target_schema`` with the result.

    Args:
        city: City name, e.g. ``'berlin'`` or ``'stockholm'`` (case-insensitive).
            Stockholm tables use the short prefix ``'stock'``; any other city
            uses its own lowercased name as the prefix.
        source_schema: Fully qualified schema to read from.
        target_schema: Fully qualified schema to write to; created if missing.
        date_columns: Column names to cast to DateType when present.

    Returns:
        List of fully qualified target table names that were written.

    Raises:
        ValueError: If ``city`` is empty (an empty prefix would match every table).
    """
    city_key = city.strip().lower()
    if not city_key:
        raise ValueError('city must be a non-empty string')
    prefix = {'berlin': 'berlin', 'stockholm': 'stock'}.get(city_key, city_key)
    wanted_dates = {name.lower() for name in date_columns}

    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_schema}")
    # Skip temporary views: they are not addressable as `<schema>.<name>`.
    tables = [
        t.name
        for t in spark.catalog.listTables(source_schema)
        if not t.isTemporary and t.name.lower().startswith(prefix)
    ]

    written = []
    for table in tables:
        target_table = f'{target_schema}.{table}'
        df = spark.table(f'{source_schema}.{table}').dropDuplicates().na.drop(how='all')

        # Choose the key column whose nulls are dropped: the first candidate
        # that actually exists in this table (reviews may only have listing_id).
        table_key = table.lower()
        if table_key.endswith(('listing', 'listings')):
            key_candidates = ('id',)
        elif table_key.endswith(('review', 'reviews')):
            key_candidates = ('id', 'listing_id')
        else:
            key_candidates = ()
        key = next((c for c in key_candidates if c in df.columns), None)
        if key is not None:
            df = df.filter(col(key).isNotNull())

        # Cast under the column's real name so mixed-case names are handled
        # even when spark.sql.caseSensitive is enabled.
        # NOTE(review): with ANSI mode off, unparseable strings become null.
        for field in df.schema.fields:
            if field.name.lower() in wanted_dates:
                df = df.withColumn(field.name, col(field.name).cast('date'))

        df.write.mode('overwrite').saveAsTable(target_table)
        written.append(target_table)
    return written


In [0]:
# Silver: clean Berlin's bronze tables into the silver schema.
process_and_copy_city_tables(city='berlin')

In [0]:
# Silver: clean Stockholm's bronze tables into the silver schema.
process_and_copy_city_tables(city='stockholm')

In [0]:
def copy_city_tables_to_gold(city, source_schema='some_catalog.airbnb_silver',
                             target_schema='some_catalog.airbnb_gold'):
    """Gold load: copy one city's silver tables to the gold schema unchanged.

    Every non-temporary table in ``source_schema`` whose name starts with the
    city's prefix is written, as-is, to a table of the same name in
    ``target_schema`` (overwriting any existing one).

    NOTE(review): no gold-level transformation (aggregation, joins) is applied
    yet. This is a straight copy of silver.

    Args:
        city: City name, e.g. ``'berlin'`` or ``'stockholm'`` (case-insensitive).
            Stockholm tables use the short prefix ``'stock'``; any other city
            uses its own lowercased name as the prefix.
        source_schema: Fully qualified schema to read from.
        target_schema: Fully qualified schema to write to; created if missing.

    Returns:
        List of fully qualified target table names that were written.

    Raises:
        ValueError: If ``city`` is empty (an empty prefix would match every table).
    """
    city_key = city.strip().lower()
    if not city_key:
        raise ValueError('city must be a non-empty string')
    prefix = {'berlin': 'berlin', 'stockholm': 'stock'}.get(city_key, city_key)

    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_schema}")
    # Skip temporary views: they are not addressable as `<schema>.<name>`.
    tables = [
        t.name
        for t in spark.catalog.listTables(source_schema)
        if not t.isTemporary and t.name.lower().startswith(prefix)
    ]

    written = []
    for table in tables:
        target_table = f'{target_schema}.{table}'
        spark.table(f'{source_schema}.{table}').write.mode('overwrite').saveAsTable(target_table)
        written.append(target_table)
    return written

In [0]:
# Gold: publish Berlin's silver tables to the gold schema.
copy_city_tables_to_gold(city='berlin')

In [0]:
# Gold: publish Stockholm's silver tables to the gold schema.
copy_city_tables_to_gold(city='stockholm')