## Transform: CountryRegionCurrency

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import json
from delta.tables import DeltaTable
from pyspark.sql.functions import lit
from concurrent.futures import ThreadPoolExecutor

In [3]:
# Load the pipeline configuration. JSON is UTF-8 by specification, so the
# encoding is pinned rather than left to the platform default.
with open('config.json', 'r', encoding='utf-8') as f:
    config = json.load(f)

# Names derived from the "general" section of the configuration.
DATABASE = config["general"]["database"]
SCHEMA = config["general"]["schema"]
# Raw and staging catalogs are both prefixed with the configured user.
RAW_CATALOG = f'{config["general"]["user"]}_raw'
STG_CATALOG = f'{config["general"]["user"]}_stg'

# Per-table settings keyed by raw table name, plus the list of those names.
tables_dict = config["tables"]
tables_list = list(tables_dict)

In [4]:
# Rebuild the staging schema from scratch: drop it together with everything it
# contains (CASCADE), then recreate it empty.
# NOTE(review): because the schema is dropped on every run of this cell, staging
# tables do not survive between runs, so the merge branch in load_data would
# only be reached if a table is created earlier in the same run — confirm that
# a full reload is the intended behavior.
cmd = f"DROP SCHEMA IF EXISTS {STG_CATALOG}.{SCHEMA} CASCADE"
spark.sql(cmd)
create_schema = f"CREATE SCHEMA IF NOT EXISTS {STG_CATALOG}.{SCHEMA}"
spark.sql(create_schema)


In [5]:
def read_raw_df(table_name):
    """Read one table from the raw catalog.

    Switches the session to RAW_CATALOG, then looks the table up by its
    fully qualified name ``RAW_CATALOG.SCHEMA.table_name``.

    Args:
        table_name: Name of the table inside the raw schema.

    Returns:
        Whatever ``spark.table`` returns for the qualified table name.
    """
    spark.sql(f"USE CATALOG {RAW_CATALOG}")
    source_table = f"{RAW_CATALOG}.{SCHEMA}.{table_name}"
    return spark.table(source_table)

In [6]:
def save_stg_df(stg_df, table_name):
    """Append a DataFrame to its staging Delta table.

    Switches the session to STG_CATALOG, makes sure the staging schema
    exists, and appends ``stg_df`` to ``STG_CATALOG.SCHEMA.<table_name>_stg``.

    Args:
        stg_df: DataFrame to persist.
        table_name: Base table name; the ``_stg`` suffix is added here.
    """
    spark.sql(f"USE CATALOG {STG_CATALOG}")
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {STG_CATALOG}.{SCHEMA}")

    # Configure the writer first, then resolve the destination name.
    delta_writer = stg_df.write.format("delta").mode("append")
    target_table = f"{STG_CATALOG}.{SCHEMA}.{table_name}_stg"
    delta_writer.saveAsTable(target_table)


In [7]:
def rename_columns(df, columns_map):
    """Keep only the mapped columns and rename them in a single projection.

    All aliases are applied in one ``select`` so that renames cannot
    interfere with each other. Renaming sequentially with
    ``withColumnRenamed`` breaks when an alias equals another source column
    (e.g. ``{"a": "b", "b": "c"}`` would end up with two columns named "c").

    Args:
        df: Source DataFrame; must contain every key of ``columns_map``.
        columns_map: Mapping of original column name -> new column name.
            The output column order follows the mapping's iteration order.

    Returns:
        A DataFrame holding only the mapped columns under their new names.
    """
    return df.select(
        [df[original].alias(alias) for original, alias in columns_map.items()]
    )

In [8]:

def insert_origin_columns(df, schema):
    """Add constant lineage columns describing where the rows came from.

    Args:
        df: DataFrame to annotate.
        schema: Value written to the ``source_departament`` column. The
            argument itself is used rather than the global SCHEMA, so the
            function honors what the caller passes.

    Returns:
        ``df`` with two extra literal columns: ``source_name`` (always
        "adventure_works") and ``source_departament`` (``schema``).
    """
    return (
        df.withColumn("source_name", lit("adventure_works"))
          .withColumn("source_departament", lit(schema))
    )

## Load tables into staging (applied to every configured table, starting with CountryRegionCurrency)

In [None]:
def load_data(table):
    """Load one raw table into its staging Delta table (create or upsert).

    Reads the raw table, keeps and renames the columns listed under
    ``stg_columns`` in the table's config, adds the origin columns, and then
    either creates the staging table or merges into it on the configured
    primary key (update matched rows, insert unmatched ones).

    Args:
        table: Key of the table in ``tables_dict`` (the raw table name).

    Raises:
        ValueError: If the staging table exists but the config declares no
            primary key columns to merge on.
    """
    print("Runing to table:", table)
    raw_df = read_raw_df(table)

    # take(1) fetches at most one row, which is enough to detect an empty
    # table without counting every row.
    if not raw_df.take(1):
        print('Nothing to update')
        return

    table_config = tables_dict[table]
    stg_df = rename_columns(raw_df, table_config["stg_columns"])
    stg_df = insert_origin_columns(stg_df, SCHEMA)
    stg_table_name = f"{STG_CATALOG}.{SCHEMA}.{table_config['stg_name']}_stg"

    if not spark.catalog.tableExists(stg_table_name):
        # Create the Delta table if it doesn't exist
        stg_df.write.format("delta").saveAsTable(stg_table_name)
        return

    primary_key_columns = table_config["primary_key"]
    # A bare string would otherwise be iterated character by character.
    if isinstance(primary_key_columns, str):
        primary_key_columns = [primary_key_columns]
    if not primary_key_columns:
        raise ValueError(f"No primary key configured for table {table!r}; cannot merge")

    merge_condition = " AND ".join(
        f"target.{key_col} = source.{key_col}" for key_col in primary_key_columns
    )

    # Perform the merge operation if the table exists
    delta_table = DeltaTable.forName(spark, stg_table_name)
    (
        delta_table.alias("target")
        .merge(stg_df.alias("source"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [10]:
# Load every configured table, up to 8 at a time.
with ThreadPoolExecutor(max_workers=8) as executor:
    # executor.map only re-raises a worker's exception when its result is
    # retrieved from the returned iterator. Consuming it with list() makes a
    # failed load_data call surface here instead of being silently dropped.
    list(executor.map(load_data, tables_list))

CountryRegionCurrency
CreditCard
Currency
CurrencyRate
Customer
PersonCreditCard
SalesOrderDetail
SalesOrderHeader
SalesOrderHeaderSalesReason
SalesPerson
SalesPersonQuotaHistory
SalesReason
SalesTaxRate
SalesTerritory
SalesTerritoryHistory
ShoppingCartItem
SpecialOffer
SpecialOfferProduct
Store
