In [0]:
%load_ext autoreload
%autoreload 2
from IPython.display import (
    display, 
    HTML,
)
from databricks.connect import DatabricksSession
from pyspark.dbutils import DBUtils

from library.logger_provider import LoggerProvider
from library.qa.bronze_to_silver_test_data import BronzeToSilverTestData

In [0]:
# DBTITLE 1,Debug
# dbutils.widgets.text("schema_bronze_name", "bloomberg")
# dbutils.widgets.text("table_bronze_name", "ld_currency")
# dbutils.widgets.text("schema_silver_name", "bloomberg")
# dbutils.widgets.text("table_silver_name", "ld_currency")
# dbutils.widgets.text("list_pk_cols", "SECURITIES")
# dbutils.widgets.text("list_order_by_cols", "LAST_UPDATE_DT, px_last")
# dbutils.widgets.text("datatypes_definition_file", '/resources/schemas/bloomberg/ld_currency.json')

In [0]:
# Runtime handles: Spark session obtained through Databricks Connect,
# the notebook utilities bound to that session, and the project logger.
spark = DatabricksSession.builder.getOrCreate()
dbutils = DBUtils(spark)
logger = LoggerProvider.get_logger()


def _parse_column_list(raw_value):
    """Turn a comma-separated widget value into a clean list of column names.

    Each entry is stripped of surrounding whitespace and lower-cased. Blank
    entries (empty widget, trailing or doubled commas) are dropped so that an
    empty string is never handed downstream as a column name.

    Args:
        raw_value: Comma-separated string, e.g. "LAST_UPDATE_DT, px_last".

    Returns:
        List of normalised column names; empty when no name was supplied.
    """
    return [
        token.strip().lower()
        for token in raw_value.split(",")
        if token.strip()
    ]


container_adsl_name = "quality-assurance"
# NOTE(review): this notebook drives BronzeToSilverTestData, yet the layer
# label reads "staging_to_onpremises" - confirm it is not a copy-paste leftover.
step_layers = "staging_to_onpremises"
path_schema_json = dbutils.widgets.get("datatypes_definition_file")

# Unity Catalog: bronze
# NOTE(review): catalog names are hard-coded to the dev environment and are not
# referenced by the later cells visible in this notebook - confirm they are needed.
catalog_bronze = 'dev_bronze'
schema_bronze_name = dbutils.widgets.get("schema_bronze_name")
table_bronze_name = dbutils.widgets.get("table_bronze_name")

# Unity Catalog: silver
catalog_silver = 'dev_silver'
schema_silver_name = dbutils.widgets.get("schema_silver_name")
table_silver_name = dbutils.widgets.get("table_silver_name")

# Column lists arrive as comma-separated widget strings; keep the raw values
# and expose the normalised lists used to build the tester.
pk_cols = dbutils.widgets.get("list_pk_cols")
list_pk_cols = _parse_column_list(pk_cols)
order_by_cols = dbutils.widgets.get("list_order_by_cols")
list_order_by_cols = _parse_column_list(order_by_cols)

In [0]:
## Transform Bronze to Silver

In [0]:
# Gather every constructor argument in one mapping so the complete test
# configuration can be read in a single place before the tester is built.
tester_config = dict(
    container_adsl_name=container_adsl_name,
    step_layers=step_layers,
    schema_bronze_name=schema_bronze_name,
    table_bronze_name=table_bronze_name,
    schema_silver_name=schema_silver_name,
    table_silver_name=table_silver_name,
    list_pk_cols=list_pk_cols,
    list_order_by_cols=list_order_by_cols,
    path_schema_json=path_schema_json,
)
data_tester = BronzeToSilverTestData(**tester_config)

# Log the run parameters first, then run the tester's own parameter validation.
data_tester.log_execution_parameters()
data_tester.validate_parameters()

In [0]:
# Ask the tester for its two prepared inputs; the call returns a pair that is
# unpacked into the bronze frame and the silver frame used by the cells below.
df_bronze, df_silver = data_tester.get_and_prepare_data()

In [0]:
# Produce the transformed bronze frame that is later compared against silver.
# The call takes no arguments, so it presumably works on data the tester loaded
# in the previous cell - keep the cell order (TODO confirm).
df_transformed_bronze = data_tester.apply_transformations()

In [0]:
# Extra options forwarded to the custom tests; only the schema definition
# path is supplied.
kwargs = dict(path_schema_json=path_schema_json)

# NOTE(review): here silver is the "expected" frame and transformed bronze the
# "observed" one, while the data-expectations cell further down passes them the
# other way round - confirm which orientation each method requires.
dict_result_custom_tests = data_tester.execute_custom_data_tests(
    df_observed=df_transformed_bronze,
    df_expected=df_silver,
    kwargs=kwargs,
)

In [0]:
# Run the data expectations, feeding in the outcome of the custom tests.
# NOTE(review): transformed bronze is the "expected" frame here and silver the
# "observed" one - the reverse of the custom-tests cell above. Confirm that the
# difference in orientation is intentional.
validation_result = data_tester.execute_data_expectations(
    df_expected=df_transformed_bronze,
    df_observed=df_silver,
    dict_result_custom_tests=dict_result_custom_tests,
)

In [0]:
# Render the validation outcome as HTML and show it inline; the same markup
# is reused by the cell that stores the report.
html_result_gx = data_tester.generate_results_html(validation_result)
rendered_report = HTML(html_result_gx)
display(rendered_report)

In [0]:
# Destination details are read back from the tester itself rather than from
# the notebook variables.
# NOTE(review): schema_name / table_name are not set in this notebook -
# presumably the tester derives them; confirm they point at the intended target.
report_destination = dict(
    container_name=data_tester.container_adsl_name,
    step_layers=data_tester.step_layers,
    schema_target_name=data_tester.schema_name,
    table_target_name=data_tester.table_name,
)

# Store the HTML report produced earlier at that destination.
data_tester.save_report_tests_azure_storage(
    html_result_gx=html_result_gx,
    **report_destination,
)

In [0]:
# Build the final report from the validation result and hand it back as the
# notebook's exit value.
# NOTE(review): dbutils.notebook.exit is documented to take a string value -
# confirm display_results returns one. Keep this as the last cell, since
# exiting ends the notebook run.
report = data_tester.display_results(validation_result)
dbutils.notebook.exit(report)