In [0]:
# NOTEBOOK PARAMETERS
# dbutils.widgets.text(...) only *declares* a text widget and returns None, so
# every parameter is declared first and then read back via dbutils.widgets.get(...).
# Widget values are always strings; flags are converted to bool here, and
# comma-separated column lists are left as strings for the parsing cell below.


def _widget_value(name, default=""):
    """Declare the text widget `name` (with `default`) and return its current value."""
    dbutils.widgets.text(name, default)
    return dbutils.widgets.get(name)


def _widget_flag(name, default="true"):
    """Return True when the text widget `name` holds "true" (case/whitespace tolerant)."""
    return _widget_value(name, default).strip().lower() == "true"


# HISTORIZE TABLE
HISTORIZE = _widget_flag("HISTORIZE", "true")

# SOURCE CONFIG
# No meaningful default exists for these; an empty string is used as the
# widget default because the widget API requires one.
SOURCE_LAKEHOUSE = _widget_value("SOURCE_LAKEHOUSE")
SOURCE_SCHEMA = _widget_value("SOURCE_SCHEMA")
SOURCE_TABLENAME = _widget_value("SOURCE_TABLENAME")

# DESTINATION CONFIG
DESTINATION_LAKEHOUSE = _widget_value("DESTINATION_LAKEHOUSE")
DESTINATION_SCHEMA = _widget_value("DESTINATION_SCHEMA")
DESTINATION_TABLENAME = _widget_value("DESTINATION_TABLENAME")

# NATURAL KEY COLUMNS
# Comma-separated string; split into a list by the parsing cell.
NK_COLUMNS = _widget_value("NK_COLUMNS", "")

# DELTA LOAD CONFIG
IS_DELTA_LOAD = _widget_flag("IS_DELTA_LOAD", "true")
DELTA_LOAD_USE_BROADCAST = _widget_flag("DELTA_LOAD_USE_BROADCAST", "true")

# INCLUDE / EXCLUDES COLUMNS FROM COMPARING
# Comma-separated strings. A blank widget falls back to an empty list, which
# the parsing cell leaves untouched (it only converts str values).
EXCLUDE_COLUMNS_FROM_COMPARING = _widget_value("EXCLUDE_COLUMNS_FROM_COMPARING") or []
INCLUDE_COLUMNS_FROM_COMPARING = _widget_value("INCLUDE_COLUMNS_FROM_COMPARING") or []

# PARTITION BY COLUMNS
# Same convention as above: blank widget -> empty list.
PARTITION_BY_COLUMNS = _widget_value("PARTITION_BY_COLUMNS") or []

In [0]:
# Bring parameters in shape
# Column parameters may arrive as comma-separated strings; convert those to
# lists of trimmed column names. Values that are not strings (e.g. already a
# list) are left untouched.


def _split_columns(raw, dedupe=False):
    """Split a comma-separated string into a list of trimmed, non-empty names.

    Args:
        raw: Comma-separated column names, e.g. "id, name".
        dedupe: When True, repeated names are dropped, keeping the first
            occurrence so the resulting order is deterministic.

    Returns:
        A list of column names; an empty or blank `raw` yields [].
    """
    names = [part.strip() for part in raw.split(",")]
    # "".split(",") returns [""], so blank entries (also from trailing or
    # doubled commas) are filtered out instead of becoming empty column names.
    names = [name for name in names if name]
    if dedupe:
        # dict.fromkeys keeps insertion order, unlike set().
        names = list(dict.fromkeys(names))
    return names


if isinstance(NK_COLUMNS, str):
    NK_COLUMNS = _split_columns(NK_COLUMNS)
    print("NK-COLUMNS: ", NK_COLUMNS)

if isinstance(EXCLUDE_COLUMNS_FROM_COMPARING, str):
    EXCLUDE_COLUMNS_FROM_COMPARING = _split_columns(EXCLUDE_COLUMNS_FROM_COMPARING, dedupe=True)
    print("EXCLUDE_COLUMNS_FROM_COMPARING: ", EXCLUDE_COLUMNS_FROM_COMPARING)

if isinstance(INCLUDE_COLUMNS_FROM_COMPARING, str):
    INCLUDE_COLUMNS_FROM_COMPARING = _split_columns(INCLUDE_COLUMNS_FROM_COMPARING, dedupe=True)
    print("INCLUDE_COLUMNS_FROM_COMPARING: ", INCLUDE_COLUMNS_FROM_COMPARING)

if isinstance(PARTITION_BY_COLUMNS, str):
    PARTITION_BY_COLUMNS = _split_columns(PARTITION_BY_COLUMNS)
    print("PARTITION_BY_COLUMNS: ", PARTITION_BY_COLUMNS)


In [0]:
%run "/Workspace/Users/rico.goerlitz@gmail.com/explore-azure-databricks-project/datahub/databricks/git/100 etl/001 MASTER NOTEBOOK/MASTER_SILVERINGESTION"

In [0]:
from dataclasses import dataclass
from datetime import datetime


# LOGGING
# Wall-clock start; used at the end of the cell to report the run duration.
start_time = datetime.now()
print(f"\nETL STARTED AT:\t\t{start_time}")


# ADD TRANSFORMATIONS TO ETL
# Mapping handed to etl.init(transformations=...). The "{tableName}" key is a
# literal placeholder string (not an f-string) to be replaced per table.
# NOTE(review): presumably keyed by table name with a callable value - confirm
# against the etl implementation.
TRANSFORMATIONS: dict = {
    "{tableName}": lambda: print("Add transformation function (instead of lambda: ...)"),
}


# CONSTANT COLUMNS
# CONSTANT_COLUMNS is passed to etl.init below but is not assigned in any
# visible cell of this notebook. Keep a value provided elsewhere (e.g. by the
# %run notebook); otherwise fall back to an empty list instead of raising
# NameError.
# NOTE(review): assumes etl.init accepts a list here - confirm.
CONSTANT_COLUMNS = globals().get("CONSTANT_COLUMNS", [])


# RUN ETL
# Make sure the destination database exists before ingesting.
# NOTE(review): DESTINATION_LAKEHOUSE is interpolated into the SQL text and
# the location path without quoting or validation; it must be a trusted,
# plain identifier.
spark.sql(f"""
    CREATE DATABASE IF NOT EXISTS {DESTINATION_LAKEHOUSE}
    LOCATION '/mnt/datahub/{DESTINATION_LAKEHOUSE}'
""")

# `etl` is not defined in this notebook; it is expected to come from the
# notebook pulled in via %run.
etl.init(
    spark=spark,
    src_lakehouse=SOURCE_LAKEHOUSE,
    src_schema=SOURCE_SCHEMA,
    src_tablename=SOURCE_TABLENAME,
    dist_lakehouse=DESTINATION_LAKEHOUSE,
    dist_schema=DESTINATION_SCHEMA,
    dist_tablename=DESTINATION_TABLENAME,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=TRANSFORMATIONS,
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_FROM_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS,
)

print("ETL CONFIG:", str(etl))

etl.ingest()


# LOG ETL TIME DURATION
end_time = datetime.now()
etl_time_duration = (end_time - start_time).total_seconds()

print(f"ETL ENDED AT:\t\t{end_time}")
print(f"ETL TIME DURATION:\t{etl_time_duration}s")