Connect to storage


In [0]:
# Storage account name and key are read from a Databricks secret scope rather than
# being written into the notebook.
secret_scope_name = 'adls-access-scope'
storage_account_name = dbutils.secrets.get(scope=secret_scope_name, key='storage-account-name')
storage_account_key = dbutils.secrets.get(scope=secret_scope_name, key='storage-account-key')
adls_container_name = 'data'

# Register the account key in the Spark session config under the per-account
# `fs.azure.account.key.<account>.dfs.core.windows.net` property.
spark.conf.set(f'fs.azure.account.key.{storage_account_name}.dfs.core.windows.net', storage_account_key)

# `abfss` is the TLS-secured variant of the ABFS scheme; using it makes encrypted
# transport explicit instead of relying on driver defaults.
adls_base_uri = f'abfss://{adls_container_name}@{storage_account_name}.dfs.core.windows.net/m07sparksql'
expedia_input_path = f'{adls_base_uri}/expedia'
hotel_weather_input_path = f'{adls_base_uri}/hotel-weather'


Verify that the source paths exist and are accessible

In [0]:
# Fail fast, before any Spark read, if a source location cannot be listed.
# Each path is checked on its own so the message names the dataset that failed.
source_paths = {
    'expedia': expedia_input_path,
    'hotel-weather': hotel_weather_input_path,
}
for dataset_name, source_path in source_paths.items():
    try:
        dbutils.fs.ls(source_path)
    except Exception as e:
        print(f"Error: Source files not found/Access issue occurred for '{dataset_name}': {e}")
        # Re-raise so the notebook run stops instead of continuing with missing inputs.
        raise

Import data

In [0]:
# Expedia source is read with the Avro reader. No reader options are set:
# "header" is a CSV reader option and has no effect on Avro input.
expedia_df = spark.read.format("avro").load(expedia_input_path)
# Hotel/weather source is read as Parquet.
hotel_weather_df = spark.read.parquet(hotel_weather_input_path)

Encrypt PII fields

In [0]:
%run "../utils/encryption_helper"

In [0]:
%run "../utils/configuration"

In [0]:
# EncryptionHelper and common_pii_fields are not defined in this notebook; presumably
# they are provided by the notebooks pulled in through the %run cells above.
encryption_helper = EncryptionHelper(dbutils)

try:
    # Both frames are rebound in a single tuple assignment: the right-hand side is
    # fully evaluated first, so if either call raises, neither name is changed and
    # the cell can be re-run without feeding an already-encrypted frame back in.
    expedia_df, hotel_weather_df = (
        encryption_helper.encrypt_dataframe(expedia_df, common_pii_fields['expedia']),
        encryption_helper.encrypt_dataframe(hotel_weather_df, common_pii_fields['hotel_weather']),
    )
except Exception as e:
    print(f'Error during encryption: {e}')
    # Re-raise so nothing downstream runs on unencrypted data.
    raise

Store raw data


In [0]:
# Target table name -> DataFrame to persist, written in this order.
bronze_tables = {
    "bronze.expedia_raw": expedia_df,
    "bronze.hotel_weather_raw": hotel_weather_df,
}

# Save each frame as a Delta table, overwriting any existing table contents.
for table_name, raw_df in bronze_tables.items():
    delta_writer = raw_df.write.mode("overwrite").format("delta")
    delta_writer.saveAsTable(table_name)