In [0]:
import json
from datetime import datetime, timedelta
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
import sys

# Reset the notebook widgets so stale definitions from earlier runs do not linger.
# NOTE(review): Databricks' widget documentation warns against creating widgets in
# the same cell that removes them — confirm this cell behaves as intended on the
# runtime in use.
dbutils.widgets.removeAll()


def _text_widget(name, default_value, label):
    """Create a text widget, read its current value back, echo it, and return it.

    Args:
        name: Widget name, also used as the label in the echoed line.
        default_value: Value the widget is created with.
        label: Display label of the widget (prefixed with its ordering number).

    Returns:
        The value returned by ``dbutils.widgets.get`` for this widget.
    """
    dbutils.widgets.text(name, default_value, label)
    current_value = dbutils.widgets.get(name)
    # Same output as the f-string debug form: "<name> = <repr of value>".
    print(f"{name} = {current_value!r}")
    return current_value


brewdat_library_version = _text_widget(
    "brewdat_library_version", "v1.1.5", "01 - brewdat_library_version"
)
target_database = _text_widget(
    "target_database", "gld_maz_logistics_warehouse", "02 - target_database"
)
target_table = _text_widget("target_table", "maz_materials_plus", "03 - target_table")
target_zone = _text_widget("target_zone", "maz", "04 - target_zone")
target_business_domain = _text_widget(
    "target_business_domain", "masterdata", "05 - target_business_domain"
)
target_subzone = _text_widget("target_subzone", "copecac", "06 - target_subzone")
target_product = _text_widget("target_product", "item", "07 - target_product")

# Default interval: yesterday through today, formatted as YYYY-MM-DD.
date_start = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
date_end = (datetime.now()).strftime("%Y-%m-%d")

data_interval_start = _text_widget(
    "data_interval_start", date_start, "08 - data_interval_start"
)
data_interval_end = _text_widget("data_interval_end", date_end, "09 - data_interval_end")

# Names of the tables read later with spark.read.table(); both default to "".
mchb = _text_widget("mchb", "", "10 - mchb")
mard = _text_widget("mard", "", "11 - mard")

In [0]:
# Make the selected BrewDat Library version importable, then hand it the
# notebook's dbutils so the library modules can use it.
brewdat_library_path = f"/Workspace/Repos/brewdat_library/{brewdat_library_version}"
sys.path.append(brewdat_library_path)
from brewdat.data_engineering import (
    common_utils,
    lakehouse_utils,
    transform_utils,
    write_utils,
)

common_utils.set_global_dbutils(dbutils)

# To read a module's documentation interactively, uncomment:
# help(transform_utils)

In [0]:
%run "../set_project_context"

In [0]:
# NOTE(review): this cell only displays the current working directory, and the
# following cell only evaluates a bare string literal; neither result is used by
# any later cell shown here. Both look like leftover debugging — confirm and remove.
import os
os.getcwd()

In [0]:
"../set_project_context"

In [0]:
# Grant this session service-principal access to the silver/gold storage account,
# using the secret stored in the AKV-backed secret scope.
common_utils.configure_spn_access_for_adls(
    spn_secret_name=spn_secret_name,
    spn_client_id=spn_client_id,
    key_vault_name=key_vault_name,
    storage_account_names=[adls_silver_gold_storage_account_name],
)

# Build the gold-layer folder for the target table.
# Every parameter listed here must be a non-empty value before a path is built.
params_list = [
    lakehouse_gold_root,
    target_zone,
    target_business_domain,
    target_subzone,
    target_product,
    target_table,
]

for param in params_list:
    if param is None or len(param) == 0:
        raise ValueError("Location would contain null or empty values.")

# Library-side validation of the individual path components.
lakehouse_utils.assert_valid_zone(target_zone)
lakehouse_utils.assert_valid_business_domain(target_business_domain)
lakehouse_utils.assert_valid_folder_name(target_table)

# NOTE(review): target_subzone and target_product are validated above but are not
# part of the path below — confirm that is intended.
target_location = "/".join(
    [
        lakehouse_gold_root,
        "data",
        target_zone,
        target_business_domain,
        f"gld_{target_zone}_{target_business_domain}_materials",
        target_table,
    ]
).lower()

print(f"{target_location = }")

In [0]:
# Adaptive Query Execution (AQE)
# AQE is enabled by default in Databricks Runtime 7.3 LTS and above, so the
# master switch below is left commented out.
# https://docs.databricks.com/optimizations/aqe.html#adaptive-query-execution
# spark.conf.set("spark.databricks.optimizer.adaptive.enabled", True)
aqe_settings = {
    "spark.databricks.adaptive.autoOptimizeShuffle.minPartitionNumber": 10000,
    "spark.databricks.adaptive.autoOptimizeShuffle.enabled": True,
    "spark.databricks.adaptive.skewJoin.spillProof.enabled": True,
}
for conf_key, conf_value in aqe_settings.items():
    spark.conf.set(conf_key, conf_value)

# Disk cache
# Several tables are read more than once, so caching is an option. Workloads are
# usually small, though, so it may not bring a considerable improvement.
# If workloads grow, consider enabling the disk cache; there are also worker
# instance types optimized for caching.
# https://docs.databricks.com/optimizations/disk-cache.html
# spark.conf.set("spark.databricks.io.cache.enabled", "true")

In [0]:
import os

# Resolve the workspace environment from the ENVIRONMENT variable and refuse to
# continue when it is missing or not one of the supported values.
environment = os.getenv("ENVIRONMENT")
if environment not in ["dev", "qa", "prod"]:
    raise Exception(
        "This Databricks Workspace does not have necessary environment variables."
        " Contact the admin team to set up the global init script and restart your cluster."
    )

# Catalog name for the validated environment: brewdat_uc_maz_dev / _qa / _prod.
uc_name = f"brewdat_uc_maz_{environment}"

# Source schemas per subzone, as (masterdata schema, supply schema).
source_schemas_by_subzone = {
    'copecac': ('slv_maz_masterdata_sap_pr3', 'slv_maz_supply_sap_pr3'),
    'mx': ('slv_maz_masterdata_sap_pr0', 'slv_maz_supply_sap_pr0'),
}

# An unsupported subzone used to leave both schema variables unassigned, which
# surfaced as a NameError in the print below. Fail with an explicit message instead.
if target_subzone not in source_schemas_by_subzone:
    raise ValueError(
        f"Unsupported target_subzone {target_subzone!r}; "
        f"expected one of {sorted(source_schemas_by_subzone)}."
    )
mtda_source_schema, supply_source_schema = source_schemas_by_subzone[target_subzone]

print(f"{mtda_source_schema=}\n{supply_source_schema=}\n{uc_name=}")

In [0]:
# NOTE(review): this unconditionally replaces the environment-derived uc_name set
# in the previous cell, so every catalog read below targets the prod catalog even
# when ENVIRONMENT is dev or qa. Confirm this is intentional; otherwise remove it.
uc_name='brewdat_uc_maz_prod'

In [0]:
try:
    # The two stock tables are read from widget-supplied table names whose default
    # is "". Fail fast with an explicit message instead of letting
    # spark.read.table("") raise an opaque error. The ValueError is raised inside
    # the try so it is reported through the same exit path as any other failure.
    for widget_name, widget_value in (("mchb", mchb), ("mard", mard)):
        if widget_value is None or not widget_value.strip():
            raise ValueError(
                f"Widget '{widget_name}' is empty; set it to the name of the "
                f"table to read for {widget_name}."
            )

    # Shared pieces for the catalog reads below.
    catalog_table_prefix = f"{uc_name}.{mtda_source_schema}.{target_subzone}"
    not_deleted = F.col("op_ind") != "D"
    # presumably 'S' is the language key of the descriptions wanted — verify
    language_is_s = F.col("spras") == 'S'

    # Widget-supplied table; op_ind is kept on this one after filtering.
    copecac_mchb = (
        spark.read.table(mchb)
        .select(
            "matnr", "werks", "lgort", "charg", "clabs", "cspem", "cinsm", "cretm",
            "ceinm", "cumlm", "cvmum", "cvmin", "cvmei", "cvmsp", "cvmre", "cvmla",
            "ersda", "laeda", "op_ind",
        )
        .filter(not_deleted)
        .alias("copecac_mchb")
    )

    # Widget-supplied table
    copecac_mard = (
        spark.read.table(mard)
        .select(
            "matnr", "werks", "lgort", "lvorm", "labst", "speme", "insme", "retme",
            "einme", "op_ind",
        )
        .filter(not_deleted)
        .drop("op_ind")
        .alias("copecac_mard")
    )

    # Catalog Table
    copecac_mara = (
        spark.read.table(f"{catalog_table_prefix}_mara")
        .select("matnr", "meins", "mtart", "matkl", "op_ind")
        .filter(not_deleted)
        .drop("op_ind")
        .alias("copecac_mara")
    )

    # Catalog Table
    copecac_makt = (
        spark.read.table(f"{catalog_table_prefix}_makt")
        .select("matnr", "maktx", "spras", "op_ind")
        .filter(not_deleted)
        .filter(language_is_s)
        .drop("op_ind")
        .drop("spras")
        .alias("copecac_makt")
    )

    # Catalog Table
    copecac_t001 = (
        spark.read.table(f"{catalog_table_prefix}_t001")
        .select("waers", "bukrs", "spras", "op_ind")
        .filter(not_deleted)
        .filter(language_is_s)
        .drop("op_ind")
        .drop("spras")
        .alias("copecac_t001")
    )

    # Catalog Table
    copecac_t001l = (
        spark.read.table(f"{catalog_table_prefix}_t001l")
        .select("werks", "lgort", "lgobe", "op_ind")
        .filter(not_deleted)
        .drop("op_ind")
        .alias("copecac_t001l")
    )

    # Catalog Table
    # NOTE(review): unlike every other source, this read does not filter out rows
    # with op_ind == "D" even though op_ind is selected and dropped. Behavior is
    # kept as-is here — confirm whether the filter was omitted by mistake.
    copecac_t001w = (
        spark.read.table(f"{catalog_table_prefix}_t001w")
        .select("werks", "vkorg", "name1", "spras", "op_ind", "land1")
        .filter(language_is_s)
        .drop("op_ind")
        .drop("spras")
        .alias("copecac_t001w")
    )

    # Catalog Table
    copecac_tvko = (
        spark.read.table(f"{catalog_table_prefix}_tvko")
        .select("vkorg", "bukrs", "op_ind")
        .filter(not_deleted)
        .drop("op_ind")
        .alias("copecac_tvko")
    )

    # Catalog Table
    copecac_t006a = (
        spark.read.table(f"{catalog_table_prefix}_t006a")
        .select("mseh3", "msehi", "spras", "op_ind")
        .filter(not_deleted)
        .filter(language_is_s)
        .drop("op_ind")
        .drop("spras")
        .alias("copecac_t006a")
    )

    # Attach land1 from t001w to each stock table by plant (werks). The other
    # t001w columns are dropped again so only land1 is added, and the result is
    # re-aliased under the stock table's own name.
    copecac_mchb = (
        copecac_mchb.join(
            copecac_t001w,
            (F.trim(F.col("copecac_mchb.werks")) == F.trim(F.col("copecac_t001w.werks"))),
            "left",
        )
        .drop(copecac_t001w['werks'], copecac_t001w['name1'], copecac_t001w['vkorg'])
        .alias("copecac_mchb")
    )

    copecac_mard = (
        copecac_mard.join(
            copecac_t001w,
            (F.trim(F.col("copecac_mard.werks")) == F.trim(F.col("copecac_t001w.werks"))),
            "left",
        )
        .drop(copecac_t001w['werks'], copecac_t001w['name1'], copecac_t001w['vkorg'])
        .alias("copecac_mard")
    )

except Exception:
    common_utils.exit_with_last_exception()

In [0]:
try:
    def _trimmed(qualified_name):
        """Return the whitespace-trimmed column for an alias-qualified column name."""
        return F.trim(F.col(qualified_name))

    # Output column name -> expression over the joined, aliased source tables.
    # Insertion order matters: it is the column order of the final select.
    cols_inventory = {
        "country_code": _trimmed("copecac_mard.land1"),
        "company_code": _trimmed("copecac_tvko.bukrs"),
        "sales_organization_code": _trimmed("copecac_tvko.vkorg"),
        "plant_code": _trimmed("copecac_mard.werks"),
        "plant_name": _trimmed("copecac_t001w.name1"),
        "storage_location_code": _trimmed("copecac_mard.lgort"),
        "storage_location_name": _trimmed("copecac_t001l.lgobe"),
        "product_code": _trimmed("copecac_mard.matnr"),
        # Strips leading zeros from the raw (untrimmed) material number.
        "product_short_code": F.regexp_replace(
            F.col("copecac_mard.matnr"), r"^[0]*", ""
        ),
        "product_name": _trimmed("copecac_makt.maktx"),
        "product_type_code": _trimmed("copecac_mara.mtart"),
        "product_group": _trimmed("copecac_mara.matkl"),
        "sales_uom_code": _trimmed("copecac_t006a.mseh3"),
        # Taken as-is, without trimming.
        "batch_code": F.col("copecac_mchb.charg"),
        "product_deletion_flag": _trimmed("copecac_mard.lvorm"),
        "currency": _trimmed("copecac_t001.waers"),
    }

except Exception:
    common_utils.exit_with_last_exception()

In [0]:
try:
    def _trim_eq(left_name, right_name):
        """Equality of two alias-qualified columns after trimming both sides."""
        return F.trim(F.col(left_name)) == F.trim(F.col(right_name))

    # Ordered left joins applied to copecac_mard: (right-hand table, join condition).
    left_join_plan = [
        (
            copecac_mchb,
            _trim_eq("copecac_mchb.matnr", "copecac_mard.matnr")
            & _trim_eq("copecac_mchb.werks", "copecac_mard.werks")
            & _trim_eq("copecac_mchb.lgort", "copecac_mard.lgort")
            & _trim_eq("copecac_mchb.land1", "copecac_mard.land1"),
        ),
        (copecac_mara, _trim_eq("copecac_mara.matnr", "copecac_mard.matnr")),
        (copecac_makt, _trim_eq("copecac_mard.matnr", "copecac_makt.matnr")),
        (copecac_t001w, _trim_eq("copecac_mard.werks", "copecac_t001w.werks")),
        (copecac_tvko, _trim_eq("copecac_t001w.vkorg", "copecac_tvko.vkorg")),
        (copecac_t001, _trim_eq("copecac_tvko.bukrs", "copecac_t001.bukrs")),
        (
            copecac_t001l,
            _trim_eq("copecac_mard.werks", "copecac_t001l.werks")
            & _trim_eq("copecac_mard.lgort", "copecac_t001l.lgort"),
        ),
        (
            copecac_t006a,
            # Unit codes are compared upper-cased as well as trimmed.
            F.trim(F.upper(F.col("copecac_mara.meins")))
            == F.trim(F.upper(F.col("copecac_t006a.msehi"))),
        ),
    ]

    inventory_joined = copecac_mard
    for right_table, join_condition in left_join_plan:
        inventory_joined = inventory_joined.join(right_table, join_condition, "left")

    # Add the output columns, then keep only those, in the mapping's order.
    df_inventory = inventory_joined.withColumns(cols_inventory).select(
        list(cols_inventory.keys())
    )

except Exception:
    common_utils.exit_with_last_exception()

In [0]:
# display(df_inventory.filter(F.col("product_code") == "000000000000303845"))

In [0]:
# Columns used as the record key for deduplication, the null fill, and the write.
key_columns = [
    "country_code",
    "product_code",
    "plant_code",
    "storage_location_code",
    "batch_code",
]

In [0]:
try:
    # Deduplicate on the key, add audit columns, fill missing key values with
    # 'NA', then keep only rows whose country_code is neither 'BO' nor 'NA'.
    inventory_deduplicated = transform_utils.deduplicate_records(
        df=df_inventory,
        key_columns=key_columns,
    )
    inventory_audited = transform_utils.create_or_replace_audit_columns(
        inventory_deduplicated
    )
    inventory_keys_filled = inventory_audited.na.fill(value='NA', subset=key_columns)
    df_final_products = inventory_keys_filled.filter(
        (F.col("country_code") != 'BO') & (F.col("country_code") != 'NA')
    )
except Exception:
    common_utils.exit_with_last_exception()

In [0]:
# Load type per subzone; load_type stays unassigned for any other subzone.
# NOTE(review): the write call in the next cell passes the literal
# 'OVERWRITE_TABLE' and has load_type commented out, so this value is currently
# unused — confirm which behavior is intended for 'copecac'.
if target_subzone == 'copecac':
    load_type = 'APPEND_ALL'
elif target_subzone == 'mx':
    load_type = 'OVERWRITE_TABLE'

In [0]:
# Arguments for the gold-table write. The load type is fixed to
# 'OVERWRITE_TABLE' here; the per-subzone load_type variable is not used
# (previously: load_type=load_type).
write_options = dict(
    df=df_final_products,
    location=target_location,
    database_name=target_database,
    table_name=target_table,
    key_columns=key_columns,
    load_type='OVERWRITE_TABLE',
    schema_evolution_mode=write_utils.SchemaEvolutionMode.OVERWRITE_SCHEMA,
    bad_record_handling_mode=write_utils.BadRecordHandlingMode.REJECT,
    auto_broadcast_join_threshold=-1,
    enable_vacuum=False,
    enable_caching=False,
)
results = write_utils.write_delta_table(**write_options)
print(results)

In [0]:
# End the notebook run, handing the write results object to the BrewDat exit helper.
common_utils.exit_with_object(results)