In [None]:
#
# CONFIGURE RUN-TIME PARAMETERS FOR THIS NOTEBOOK
#
# How the transform step treats NOT NULL warehouse columns that arrive null:
#   True  -> fill the null with a type-specific default and keep the row
#   False -> drop the row
keep_invalid_records = True

# Define source and target details
layer = "bronze"
db_schema = "dbo"
application = "warehouse"
lakehouse_name = "AdventureWorks_Lakehouse"
warehouse_name = "AdventureWorks_Warehouse"


# Define the OneLake folder path
# Replace both placeholders with the GUIDs of your Fabric workspace and Lakehouse.
workspace_id = " [ YOUR ID HERE ] " ## Adv Wrks DE 3 Dev
lakehouse_id = " [ YOUR ID HERE ] " ## AdventureWorks_Lakehouse
folder = f"/Files/{layer}/{application}"
folder_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}{folder}"

# Fail-soft reminder: an unreplaced placeholder only surfaces later as an opaque path error.
if "YOUR ID HERE" in workspace_id or "YOUR ID HERE" in lakehouse_id:
    print("WARNING: workspace_id and/or lakehouse_id still contain placeholder values.")

print("Parameters for this run:")
print(f"\nkeep_invalid_records = {keep_invalid_records}")
print(f"folder_path = {folder_path}")
print(f"\nConfigured to process tables from '{lakehouse_name}' into '{warehouse_name}' database schema '{db_schema}' tables.")

StatementMeta(, ab84e929-63f0-4086-b2a8-eba096a3b544, 3, Finished, Available, Finished)

Paramaters for this run:

keep_invalid_records = True
folder_path = abfss://3ac7ce42-ae74-4e7d-8ac3-5ce8358a30df@onelake.dfs.fabric.microsoft.com/50402dac-ce50-4831-af2b-7d65ca8fe7db/Files/bronze/warehouse

Configured to process tables from 'AdventureWorks_Lakehouse' into 'AdventureWorks_Warehouse' database schema 'dbo' tables.


In [2]:
from pyspark.sql import SparkSession

app_name = "LoadLakehouseToWarehouse"

# Obtain the Spark session used by the rest of the notebook.
# NOTE(review): getOrCreate() returns an already-running session if one exists
# (e.g. one pre-started by the notebook host -- verify); in that case the app
# name set on the builder may not take effect.
session_builder = SparkSession.builder.appName(app_name)
spark = session_builder.getOrCreate()

print(f"Spark session {app_name} has been created successfully.")

StatementMeta(, ab84e929-63f0-4086-b2a8-eba096a3b544, 4, Finished, Available, Finished)

Spark session LoadLakehouseToWarehouse has been created successfully.


In [3]:
from pyspark.sql.functions import col, when, encode, to_date, lit, regexp_replace
from pyspark.sql.types import *

# Regex to remove multi-byte characters
# Matches any character outside the standard ASCII range (0-127); matches are
# deleted, not transliterated (e.g. "é" is dropped entirely).
pattern = r"[^\x00-\x7F]"

# Warehouse numeric types whose NOT NULL nulls are filled with 0 when keeping invalid records.
NUMERIC_FILL_TYPES = (ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)


def _conform_column(df, field, keep_invalid):
    """Clean, cast and null-handle one column so it matches a Warehouse field.

    Args:
        df: Lakehouse DataFrame; must contain a column named ``field.name``.
        field: StructField from the Warehouse table schema.
        keep_invalid: for a NOT NULL field, True fills nulls with a type default,
            False drops rows where the column is null.

    Returns:
        A new DataFrame with the column transformed and cast to ``field.dataType``.
    """
    name, dtype = field.name, field.dataType

    # Data Transformation
    if isinstance(dtype, BinaryType):
        df = df.withColumn(name, encode(col(name), "UTF-8"))
    elif isinstance(dtype, BooleanType):
        # NOTE(review): anything other than "1"/"0"/"True"/"False" -- including NULL --
        # becomes False, so NULLs in nullable boolean columns are not preserved.
        df = df.withColumn(
            name,
            when(col(name) == "1", True)
            .when(col(name) == "0", False)
            .when(col(name) == "True", True)
            .when(col(name) == "False", False)
            .otherwise(False),
        )
    elif isinstance(dtype, StringType):
        df = df.withColumn(name, regexp_replace(col(name), pattern, ""))

    # Data Type Conversion
    df = df.withColumn(name, col(name).cast(dtype))

    # Nullable Transformation (only NOT NULL warehouse columns need handling)
    if field.nullable:
        return df
    if not keep_invalid:
        return df.dropna(subset=[name])
    if isinstance(dtype, StringType):
        return df.fillna({name: "Unknown"})
    if isinstance(dtype, (DateType, TimestampType)):
        return df.fillna({name: "9999-01-01"})
    if isinstance(dtype, NUMERIC_FILL_TYPES):
        return df.fillna({name: 0})
    if isinstance(dtype, BinaryType):
        # fillna only accepts int/float/bool/str replacement values (a bytearray raises),
        # so fill binary nulls with an explicit expression instead.
        return df.withColumn(name, when(col(name).isNull(), lit(bytearray([0]))).otherwise(col(name)))
    return df


def _conform_to_warehouse_schema(lakehouse_df, warehouse_schema, keep_invalid):
    """Apply ``_conform_column`` for every field of the Warehouse schema, in schema order.

    NOTE(review): Lakehouse columns absent from the Warehouse schema are carried
    through unchanged -- confirm the downstream copy step ignores them.
    """
    conformed = lakehouse_df
    for field in warehouse_schema.fields:
        conformed = _conform_column(conformed, field, keep_invalid)
    return conformed


# List all files in the folder
file_list = spark.read.format("binaryFile").load(folder_path).select("path").collect()

# Each CSV file name identifies a table present in both the Lakehouse and the Warehouse:
# conform the Lakehouse table to the Warehouse schema and stage the result.
for file in file_list:
    file_path = file["path"]
    if not file_path.endswith(".csv"):  # only CSV files name tables to process
        continue

    # Table name = file name without its extension (".../DimProduct.csv" -> "DimProduct")
    table_name = file_path.split("/")[-1].rsplit(".", 1)[0]

    # Read data from the corresponding Lakehouse table
    full_lhse_table_name = f"{lakehouse_name}.{db_schema}.{table_name}"
    lakehouse_df = spark.read.format("delta").table(full_lhse_table_name)

    # Read the existing Warehouse table only to obtain its schema
    full_whse_table_name = f"{warehouse_name}.{db_schema}.{table_name}"
    warehouse_df = spark.read.table(full_whse_table_name)

    transformed_df = _conform_to_warehouse_schema(lakehouse_df, warehouse_df.schema, keep_invalid_records)

    # Write the transformed table to a transient Lakehouse table.
    # NOTE: Writing directly to the Warehouse from here returned '403 Forbidden'.
    #       Workaround: write to a transient "slv_" table in the Lakehouse; the
    #       CopyJob artifact then loads the conformed tables into the Warehouse.
    #       An optional next step would be to drop all the transient tables in the Lakehouse.
    transformed_table = f"{lakehouse_name}.{db_schema}.slv_{table_name}"
    spark.sql(f"DROP TABLE IF EXISTS {transformed_table}")
    transformed_df.write.mode("overwrite").saveAsTable(transformed_table)

    # Count the saved table: counting transformed_df would re-run the whole
    # read/transform lineage a second time.
    row_count = spark.read.table(transformed_table).count()
    print(f"Loaded {row_count:,} transformed data rows into {transformed_table}")

StatementMeta(, ab84e929-63f0-4086-b2a8-eba096a3b544, 5, Finished, Available, Finished)

Loaded 776,286 transformed data rows into AdventureWorks_Lakehouse.dbo.slv_FactProductInventory
Loaded 60,855 transformed data rows into AdventureWorks_Lakehouse.dbo.slv_FactResellerSales
Loaded 60,398 transformed data rows into AdventureWorks_Lakehouse.dbo.slv_FactInternetSales
Loaded 606 transformed data rows into AdventureWorks_Lakehouse.dbo.slv_DimProduct
Loaded 296 transformed data rows into AdventureWorks_Lakehouse.dbo.slv_DimEmployee
Loaded 18,484 transformed data rows into AdventureWorks_Lakehouse.dbo.slv_DimCustomer
Loaded 39,409 transformed data rows into AdventureWorks_Lakehouse.dbo.slv_FactFinance
Loaded 14,264 transformed data rows into AdventureWorks_Lakehouse.dbo.slv_FactCurrencyRate
Loaded 11 transformed data rows into AdventureWorks_Lakehouse.dbo.slv_DimSalesTerritory
Loaded 64,515 transformed data rows into AdventureWorks_Lakehouse.dbo.slv_FactInternetSalesReason
Loaded 2,059 transformed data rows into AdventureWorks_Lakehouse.dbo.slv_ProspectiveBuyer
Loaded 3,652 tra

In [4]:
# Shut down the Spark session now that the load is finished.
# NOTE: frees up limited F2 SKU capacity resources
spark.stop()

stop_message = "Spark session has been stopped successfully."
print(stop_message)

StatementMeta(, ab84e929-63f0-4086-b2a8-eba096a3b544, 6, Finished, Available, Finished)

Spark session has been stopped successfully.
