# raw_to_stg

Moves data from the raw zone to the staging zone and applies the necessary transformations to the tables. The target staging catalog in Unity Catalog can be changed by setting the **stg_catalog_name** job parameter.

In [None]:
import sys, os
from datetime import datetime, timedelta

# Put the parent directory on the import path so the shared `lib` package resolves.
sys.path.append(os.path.abspath("../"))

# Job parameters are read through the notebook widgets.
_get_param = dbutils.widgets.get

# run_date is expected as YYYY-MM-DD; start_date is the day before it.
run_date = datetime.strptime(_get_param("run_date"), '%Y-%m-%d')
start_date = run_date + timedelta(days=-1)

# Secret scope, then the control, raw and staging catalog names.
secret_scope = _get_param("secret_scope")
ctr_catalog_name = _get_param("ctr_catalog_name")
raw_catalog_name = _get_param("raw_catalog_name")
stg_catalog_name = _get_param("stg_catalog_name")

We must define a dictionary called **transformations_dict** with the following structure:
- The keys are tuples composed of the names of the source schema and table.
- The values are lists of tuples, each tuple composed of the new table name (i.e. the name that will be used in the staging zone) and a **transformation function**. A single source table may produce several staging tables by listing several tuples.

The **transformation function** is a function that transforms the source table. It receives a Spark DataFrame as its argument and returns the transformed DataFrame. If a table needs no transformation between the raw and staging zones, the transformation function may be set to *None*.

All tables will be renamed and transformed according to the values specified in the dictionary. Tables that are not in the dictionary **will not be moved** to the staging zone.

Besides the transformations defined in the transformation function, the following transformations will subsequently be applied to the tables:
1. If the entry for the source table in the **data_ingestion** control table has a non-NULL **stg_primary_key**, the primary key columns will be renamed to the values of this column. Note that the number of comma-separated items in stg_primary_key must equal the number of columns that make up the table's primary key.
2. All column names will conform to the snake_case naming convention.

In [None]:
from lib.transformations import sales

# Sales tables copied to staging unchanged: source table name -> staging table name.
_sales_passthrough_tables = {
    "CountryRegionCurrency":        "country_region_currencies",
    "Currency":                     "currencies",
    "CurrencyRate":                 "currency_rates",
    "Customer":                     "customers",
    "PersonCreditCard":             "person_credit_cards",
    "SalesOrderHeaderSalesReason":  "order_header_sales_reasons",
    "SalesPersonQuotaHistory":      "sales_person_quota_history",
    "SalesReason":                  "sales_reasons",
    "SalesTaxRate":                 "tax_rates",
    "SalesTerritory":               "territories",
    "SalesTerritoryHistory":        "territory_history",
    "ShoppingCartItem":             "shopping_cart_items",
    "SpecialOfferProduct":          "special_offer_products",
}

# Sales tables that need a custom transformation. A source table may fan out into
# several staging tables, each produced by its own transformation function.
_sales_transformed_tables = {
    # NOTE(review): "credits_cards" looks like a typo for "credit_cards". Renaming it would
    # write to a different staging table, so confirm with downstream consumers first.
    "CreditCard":       [("credits_cards", sales.transform_credit_cards)],
    "SalesOrderDetail": [("order_details", sales.transform_order_details)],
    "SalesOrderHeader": [("order_headers", sales.transform_order_headers)],
    "SalesPerson":      [("sales_people", sales.transform_sales_people)],
    "SpecialOffer":     [("special_offers", sales.transform_special_offers)],
    "Store":            [
        ("stores", sales.transform_stores),
        ("store_demographics", sales.transform_store_demographics),
    ],
}

# (schema, table) -> [(staging table name, transformation function or None), ...]
transformations_dict = {
    **{("Sales", source): [(target, None)] for source, target in _sales_passthrough_tables.items()},
    **{("Sales", source): targets for source, targets in _sales_transformed_tables.items()},
}

In [0]:
from lib.el import DeltaDataLoader
from lib.logging import Log, Logger
from lib.naming import pascal_to_snake
from lib.transformations import column_names_to_snakecase, column_names_renamed

def _split_columns(value):
    """Split a comma-separated column list read from the control table.

    All spaces are removed before splitting. Returns None when ``value`` is None (NULL).
    """
    if value is None:
        return None
    return value.replace(" ", "").split(",")


def _identity(df):
    """Passthrough transformation used when a table needs no custom transform."""
    return df


# Selects the tables that will be ingested
df_data_ingestion = (spark.read
    .table(f"{ctr_catalog_name}.loading.data_ingestion")
    .select(
        "schema_name",
        "table_name",
        "primary_key",
        "stg_primary_key",
        "filter",
        "selected")
    .where("active = true")
)

logs = []
processing_has_failed = False
start_date_filter = f"TO_DATE('{start_date.strftime('%Y-%m-%d')}', 'yyyy-MM-dd')"

try:
    for row in df_data_ingestion.collect():
        schema_name = row["schema_name"]
        table_name = row["table_name"]

        # Tables without an entry in transformations_dict are never moved to staging,
        # so skip them before parsing their metadata or building a loader.
        transformations = transformations_dict.get((schema_name, table_name))
        if transformations is None:
            continue

        selected = _split_columns(row["selected"])
        primary_key = [pascal_to_snake(pk) for pk in _split_columns(row["primary_key"])]

        # If stg_primary_key is defined in loading.data_ingestion, use it; otherwise
        # the original primary key is kept.
        if row["stg_primary_key"] is None:
            stg_primary_key = None
        else:
            stg_primary_key = [pascal_to_snake(pk) for pk in _split_columns(row["stg_primary_key"])]

        data_loader = DeltaDataLoader(
            schema_name=schema_name,
            table_name=table_name,
            primary_key=primary_key if stg_primary_key is None else stg_primary_key,
            selected=selected,
            catalog_name=raw_catalog_name
        )

        schema_name_new = pascal_to_snake(schema_name)

        for table_name_new, transform_fn in transformations:
            log = Log(
                target_catalog_name=stg_catalog_name,
                target_schema_name=schema_name_new,
                target_table_name=table_name_new,
                source_catalog_name=raw_catalog_name,
                source_schema_name=schema_name,
                source_table_name=table_name
            )

            try:
                table_full_name = f"{stg_catalog_name}.{schema_name_new}.{table_name_new}"
                on_transform = _identity if transform_fn is None else transform_fn

                # Load incrementally only when the target table already exists AND a filter
                # is configured; otherwise extract the full source table. A NULL filter
                # previously raised AttributeError on every run after the first.
                if row["filter"] is not None and spark.catalog.tableExists(table_full_name):
                    incremental_filter = row["filter"].replace(":start_date", start_date_filter)
                else:
                    incremental_filter = None

                # Extract from the raw zone, apply the custom and default transformations,
                # then load into the staging zone.
                data_loader \
                    .extract(incremental_filter) \
                    .apply(on_transform) \
                    .apply(column_names_to_snakecase) \
                    .apply(lambda df: column_names_renamed(df, primary_key, stg_primary_key)) \
                    .load_into(table_full_name)

                log.movements = data_loader.df.count()

            except Exception as e:
                # Record the failure and continue with the remaining tables; the run is
                # failed once all tables have been attempted.
                log.error = repr(e)
                processing_has_failed = True

            logs.append(log)
finally:
    # Persist whatever was collected even if an unexpected error aborted the loop,
    # so the ingestion log reflects the tables that were already processed.
    logger = Logger(f"{ctr_catalog_name}.loading.data_ingestion_log")
    logger.log(logs)

if processing_has_failed:
    raise RuntimeError("Processing of one or more tables has failed. Check the data ingestion log for further info.")