In [0]:
%load_ext autoreload
%autoreload 2
# NOTE(review): the bare string below is an expression statement with no effect;
# presumably it was meant to be used as a Spark conf key — confirm or remove.
"spark.databricks.sql.dsv2.unique.enabled"

transformation
- table
  - source
  - default
  - keys
  - columns
    - transformations

In [0]:
import sys
from pyspark.sql import DataFrame
# Extend the module search path with the workspace folder
# (presumably where meta_driven_transformations lives — confirm).
sys.path.append('/Workspace/databricks_bootcamp/silver/')

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from typing import Any
from pyspark.sql.types import IntegerType
from functools import reduce
from pyspark.sql.window import Window
from pyspark.sql.functions import xxhash64
from delta.tables import DeltaTable


In [0]:
import importlib

# Module that exposes the TRANSFORMATION metadata object.
import meta_driven_transformations
# importlib.reload(meta_driven_transformations)

# Metadata passed as `meta_data` to the transformation functions, which index it by table name.
TRANSFORMATION = meta_driven_transformations.TRANSFORMATION


In [0]:
def default_transformations(df: DataFrame, df_name: str, meta_data: dict[str, Any]) -> DataFrame:
    """Trim and lowercase every column when the table config has a "defaults" key.

    Every column is lowercased, even customer ids, because mixed-case ids are
    not wanted and choosing upper case would be just as arbitrary as lower.

    Args:
        df: Input frame.
        df_name: Key of the table's entry in ``meta_data``.
        meta_data: Table-keyed transformation metadata.

    Returns:
        The normalized frame, or ``df`` unchanged when the table config has
        no "defaults" key.
    """
    if "defaults" not in meta_data[df_name]:
        return df

    normalized = []
    for name in df.columns:
        normalized.append(F.lower(F.trim(F.col(name))).alias(name))
    return df.select(*normalized)

In [0]:
def rename_columns(df: DataFrame, df_name: str, meta_data: dict[str, Any]) -> DataFrame:
    """Select the configured source columns under their target names.

    For each entry in ``meta_data[df_name]["columns"]`` the column named by
    the entry's "source" value is selected and aliased to the entry's key.
    Columns not listed in the metadata are not part of the result.
    """
    column_specs = meta_data[df_name]["columns"]

    renamed = []
    for target_name, spec in column_specs.items():
        renamed.append(F.col(spec["source"]).alias(target_name))

    return df.select(*renamed)

In [0]:
def handle_nulls_and_empty_strings(df: DataFrame, df_name: str, meta_data: dict[str, Any]) -> DataFrame:
    """Replace null and empty-string values with a per-type default.

    The default depends on the column's "cast" entry in the metadata:

    * no (or empty) "cast" entry -> ``"unknown"``
    * "int" in the cast entry    -> ``"0"``
    * "date" in the cast entry   -> the cast entry's ``"default_value"``

    A missing or falsy "cast" is treated as "no cast", matching ``casting``.

    Args:
        df: Input frame; must contain every column listed in the metadata.
        df_name: Key of the table's entry in ``meta_data``.
        meta_data: Table-keyed transformation metadata.

    Returns:
        A frame holding the metadata columns, ordered as: uncast columns,
        int columns, then date columns grouped by default value.

    Raises:
        ValueError: If a cast entry names neither "int" nor "date".
        KeyError: If a date cast entry has no "default_value".
    """
    table_configs = meta_data[df_name]

    def fill_blank(name: str, default: str):
        # Same expression for every type; only the substituted default differs.
        column = F.col(name)
        return (
            F.when(column.isNull() | (column == ""), default)
            .otherwise(column)
            .alias(name)
        )

    string_cols: list[str] = []
    int_cols: list[str] = []
    # default value -> date columns that share it
    date_cols: dict[str, list[str]] = {}

    for col_key, col_val in table_configs["columns"].items():
        cast_types = col_val.get("cast")
        if not cast_types:
            string_cols.append(col_key)
        elif "int" in cast_types:
            int_cols.append(col_key)
        elif "date" in cast_types:
            date_cols.setdefault(cast_types["default_value"], []).append(col_key)
        else:
            raise ValueError(f"Type {cast_types} not supported for column {col_key}")

    expr = [fill_blank(name, "unknown") for name in string_cols]
    expr += [fill_blank(name, "0") for name in int_cols]
    for default_date, names in date_cols.items():
        expr += [fill_blank(name, default_date) for name in names]

    return df.select(*expr)

In [0]:
def apply_enum_aliases(df: DataFrame, df_name: str, meta_data: dict[str, Any]) -> DataFrame:
    """Collapse alias values onto their canonical enum value.

    For every metadata column with a non-empty "map" entry
    (``{canonical_value: [aliases, ...]}``), values found in an alias list
    are replaced by the canonical value and all other values become
    ``"unknown"``. Columns without a "map" entry, or with an empty one,
    pass through unchanged.

    NOTE: "validation"/"enum" entries in the metadata are not enforced here.
    The previous version built a quarantine frame from them but discarded it,
    so that dead work was removed.

    Args:
        df: Input frame; must contain every column listed in the metadata.
        df_name: Key of the table's entry in ``meta_data``.
        meta_data: Table-keyed transformation metadata.

    Returns:
        A frame with the metadata columns, in metadata order.
    """
    table_configs = meta_data[df_name]

    exprs = []
    for col_key, col_val in table_configs["columns"].items():
        alias_map = col_val.get("map")

        if alias_map:
            mapped = None
            for desired_val, possible_vals in alias_map.items():
                cond = F.col(col_key).isin(possible_vals)
                # The first branch starts the CASE expression; later ones extend it.
                if mapped is None:
                    mapped = F.when(cond, F.lit(desired_val))
                else:
                    mapped = mapped.when(cond, F.lit(desired_val))
            normalized_col = mapped.otherwise(F.lit("unknown"))
        else:
            normalized_col = F.col(col_key)

        exprs.append(normalized_col.alias(col_key))

    return df.select(*exprs)

In [0]:
def casting(df: DataFrame, df_name: str, meta_data:dict[str, Any]) -> DataFrame:
    """Cast each metadata column to the type named in its "cast" entry.

    * no (or empty) "cast" entry -> column is passed through unchanged
    * "int" in the cast entry    -> ``cast("int")``
    * "date" in the cast entry   -> ``try_to_date`` with the format stored
      under the entry's "date" key

    Args:
        df: Input frame; must contain every column listed in the metadata.
        df_name: Key of the table's entry in ``meta_data``.
        meta_data: Table-keyed transformation metadata.

    Returns:
        A frame with the metadata columns, in metadata order.

    Raises:
        ValueError: If a cast entry names neither "int" nor "date". Such a
            column used to be dropped from the output silently.
    """
    table_configs = meta_data[df_name]
    expr = []

    for col_key, col_val in table_configs["columns"].items():
        cast_spec = col_val.get("cast")

        if not cast_spec:
            expr.append(F.col(col_key))
        elif "int" in cast_spec:
            expr.append(F.col(col_key).cast("int").alias(col_key))
        elif "date" in cast_spec:
            expr.append(F.try_to_date(F.col(col_key), cast_spec["date"]).alias(col_key))
        else:
            # Fail loudly rather than lose the column from the select list.
            raise ValueError(f"Type {cast_spec} not supported for column {col_key}")

    return df.select(*expr)

In [0]:
def handle_hyphon(df: DataFrame, df_name: str, meta_data: dict[str, Any]) -> DataFrame:
    """Apply the per-column hyphen rules from the metadata.

    * "replace_hyphon" truthy: each run of whitespace and/or hyphens becomes
      a single underscore.
    * otherwise "remove_hyphon" truthy: every hyphen is removed.
    * otherwise the column is passed through unchanged.

    Returns a frame with the metadata columns, in metadata order.
    """
    column_specs = meta_data[df_name]["columns"]

    def cleaned(name, spec):
        source = F.col(name)
        if spec.get("replace_hyphon", False):
            return F.regexp_replace(source, r"[\s\-]+", "_").alias(name)
        if spec.get("remove_hyphon"):
            return F.regexp_replace(source, "-", "").alias(name)
        return source

    return df.select(*[cleaned(name, spec) for name, spec in column_specs.items()])

In [0]:
# table = "silver_crm_sales_info"
# df = default_transformations(df, table,TRANSFORMATION) 
# df = rename_columns(df, table, TRANSFORMATION)
# df = handle_nulls_and_empty_strings(df, table, TRANSFORMATION)
# df = apply_enum_aliases(df, table, TRANSFORMATION)
# df = casting(df, table, TRANSFORMATION)
# df = handle_hyphon(df, table, TRANSFORMATION)

In [0]:
def data_augmentation(df, df_name, meta_data):
    """Recompute zero-valued columns from two other columns.

    Columns named in the table's "data_augmentation_columns" list carry an
    "expression" of the form ``[left_column, operator, right_column]`` in
    their column metadata. Where such a column equals 0 it is replaced by the
    rounded result of the expression cast to int, unless either operand is
    0, in which case the existing value is kept (also cast to int). Rows
    where the column is not 0 are left as they are.

    Returns ``df`` unchanged when the table lists no augmentation columns.
    """
    table = meta_data[df_name]
    augmented_cols_list = table.get("data_augmentation_columns", [])
    if not augmented_cols_list:
        return df

    arithmetic = {
        "+": lambda a, b: a + b,
        "-": lambda a, b: a - b,
        "*": lambda a, b: a * b,
        "/": lambda a, b: a / b,
    }

    def repaired(name):
        operation = table.get("columns").get(name).get("expression")
        left = F.col(operation[0])
        right = F.col(operation[2])
        symbol = operation[1]

        current = F.col(name)
        # The int cast applies to the whole when/otherwise expression.
        recomputed = (
            F.when((left == 0) | (right == 0), current)
            .otherwise(F.round(arithmetic[symbol](left, right)))
            .cast("int")
        )
        return F.when(current == 0, recomputed).otherwise(current).alias(name)

    selected = [
        repaired(name) if name in augmented_cols_list else F.col(name)
        for name in df.columns
    ]
    return df.select(*selected)

In [0]:
def intelligent_key_preparation(preprocessed_tables, meta_data):
    """Split composite ("intelligent") keys into substring-derived columns.

    For each table whose metadata has a truthy "intelligent_key" mapping,
    every mapping entry ``new_name -> {"source": col, "substr": (start, length)}``
    produces a column ``new_name`` holding that substring of ``col``. The
    column named by the mapping's own "intelligent_key" entry's "source" is
    dropped, and all remaining columns follow the derived ones. Tables
    without the mapping are returned as-is.

    Returns a dict of table name -> frame.
    """
    prepared = {}

    for df_name, df in preprocessed_tables.items():
        key_specs = meta_data[df_name].get("intelligent_key")
        if not key_specs:
            prepared[df_name] = df
            continue

        # The composite column that gets replaced by its parts.
        composite_source = key_specs.get("intelligent_key")["source"]
        kept = [F.col(name) for name in df.columns if name != composite_source]

        derived = []
        for new_column_name, spec in key_specs.items():
            start, length = spec["substr"]
            derived.append(
                F.substring(F.col(spec["source"]), start, length).alias(new_column_name)
            )

        prepared[df_name] = df.select(*(derived + kept))

    return prepared

In [0]:
def surrogate_key_addition(intelligent_tables, meta_data):
    """Maintain the surrogate-key lookup tables and join the keys onto each table.

    For every surrogate key declared in a table's "surrogate_key" metadata
    (``{surrogate_key_name: natural_key_column}``):

    1. the lookup table
       ``bike_data_lakehouse.surrogate_keys.surrogate_key_<key>_table`` is
       created if missing;
    2. the distinct natural-key values of all tables declaring that key are
       hashed with ``xxhash64`` and merged in (insert-only, so existing
       rows are never rewritten);
    3. the lookup table is left-joined (broadcast) onto every table that
       declares the key.

    Any key name found in the metadata is handled, not a fixed list. Tables
    without a "surrogate_key" entry pass through unchanged.

    Args:
        intelligent_tables: Dict of table name -> frame.
        meta_data: Table-keyed transformation metadata.

    Returns:
        Dict of table name -> frame with the surrogate key columns added.
    """
    # surrogate key name -> names of the tables that declare it
    tables_by_key = {}
    for table_name in intelligent_tables:
        for key in meta_data[table_name].get("surrogate_key", {}):
            tables_by_key.setdefault(key, []).append(table_name)

    sk_frames = {}
    for key, tables in tables_by_key.items():
        sk_table = f"bike_data_lakehouse.surrogate_keys.surrogate_key_{key}_table"

        # The natural-key column name is taken from the first declaring table;
        # every other table sharing this key must use the same column name.
        column_name = meta_data[tables[0]]["surrogate_key"][key]

        create_table_ddl = f"""
        CREATE TABLE IF NOT EXISTS {sk_table} (
            {column_name} STRING NOT NULL,
            {key} LONG NOT NULL
        )
        """
        spark.sql(create_table_ddl)

        natural_keys = reduce(
            lambda left, right: left.union(right),
            [intelligent_tables[t].select(column_name) for t in tables],
        ).distinct()

        df_with_sk = natural_keys.withColumn(key, xxhash64(F.col(column_name)))

        (
            DeltaTable.forName(spark, sk_table)
            .alias("t")
            .merge(
                df_with_sk.alias("s"),
                f"t.{column_name} = s.{column_name}"
            )
            .whenNotMatchedInsertAll()
            .execute()
        )

        # Read after the merge so the new keys are visible to the join below.
        sk_frames[key] = spark.read.table(sk_table)

    surrogate_tables = {}
    for table_name, data_frame in intelligent_tables.items():
        for key, column_name in meta_data[table_name].get("surrogate_key", {}).items():
            data_frame = data_frame.join(
                F.broadcast(sk_frames[key]),
                on=[column_name],
                how="left"
            )
        surrogate_tables[table_name] = data_frame

    return surrogate_tables

In [0]:
def second_nf_configuration(surrogate_tables, meta_data):
    """Drop rows whose primary key is duplicated or contains "unknown".

    A primary-key combination is considered malformed when it occurs more
    than once in the table or when any of its columns equals ``"unknown"``.
    All rows carrying a malformed key are removed.

    The removal is a null-safe ``left_anti`` join, so the malformed keys are
    never collected to the driver and keys containing nulls are matched the
    same way ``groupBy`` groups them.

    Only designed for crm_customer_info (per the original author).

    Args:
        surrogate_tables: Dict of table name -> frame.
        meta_data: Table-keyed metadata; ``meta_data[name]["primary_key"]``
            is a dict whose keys are the primary-key column names.

    Returns:
        Dict of table name -> frame without the malformed rows.
    """
    normalized_tables = {}

    for df_name, df in surrogate_tables.items():
        primary_key = list(meta_data[df_name]["primary_key"].keys())

        unknown_condition = reduce(
            lambda a, b: a | b,
            [F.col(col) == F.lit("unknown") for col in primary_key],
        )
        malformed_condition = (F.col("duplicate_rows") > 1) | unknown_condition

        # The key columns are renamed so the join condition below cannot be
        # ambiguous: this frame is derived from ``df`` itself.
        malformed_keys = (
            df.groupBy(*primary_key)
            .agg(F.count("*").alias("duplicate_rows"))
            .filter(malformed_condition)
            .select(*[F.col(col).alias(f"__malformed_{col}") for col in primary_key])
        )

        same_key = reduce(
            lambda a, b: a & b,
            [F.col(col).eqNullSafe(F.col(f"__malformed_{col}")) for col in primary_key],
        )

        # TODO: upload the removed rows to a quarantine table.
        normalized_tables[df_name] = df.join(malformed_keys, on=same_key, how="left_anti")

    return normalized_tables
    

In [0]:
def create_table(normalized_tables, meta_data):
    """Recreate each silver table from its frame.

    For every entry the existing table ``bike_data_lakehouse.silver.<name>``
    is dropped and the frame is written with ``saveAsTable``. If the name
    is still listed by ``SHOW TABLES`` after the drop, the write is skipped
    and a message is printed.

    The table schema comes from the frame itself. The previous version also
    assembled a CREATE TABLE statement with primary-key/length constraints
    from ``meta_data`` but never executed it; that dead code was removed.
    TODO: enforce the metadata constraints explicitly if they are required.

    Args:
        normalized_tables: Dict of table name -> frame to persist.
        meta_data: Table-keyed metadata. Currently unused; kept so callers
            do not have to change.
    """
    catalog = "bike_data_lakehouse"
    schema = "silver"

    for df_name, df in normalized_tables.items():
        full_name = f"{catalog}.{schema}.{df_name}"

        spark.sql(f"DROP TABLE IF EXISTS {full_name}")

        existing_tables = spark.sql(f"SHOW TABLES IN {catalog}.{schema}").collect()
        existing_table_names = {row.tableName for row in existing_tables}

        if df_name not in existing_table_names:
            df.write.mode("append").saveAsTable(full_name)
        else:
            print(f"ERRROR - failed to write: this table {df_name} already exists in schema")


In [0]:
import meta_driven_transformations
# Re-execute the metadata module so edits made to it since the first import are picked up.
importlib.reload(meta_driven_transformations)

# Rebind the metadata after the reload.
TRANSFORMATION = meta_driven_transformations.TRANSFORMATION
# spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")

def clean_data_tables(TRANSFORMATION:dict[str, Any]):
    """Load every bronze table and run the column-level cleaning steps.

    Each table listed in ``bike_data_lakehouse.bronze`` is read and passed
    through, in order: default transformations, column renaming, null and
    empty-string handling, hyphen handling, and casting. The metadata entry
    used for a bronze table ``<name>`` is ``silver_<name>``.

    Args:
        TRANSFORMATION: Table-keyed transformation metadata.

    Returns:
        Dict of ``silver_<name>`` -> cleaned frame.
    """
    print("cleaning_data")
    incoming_schema = "bike_data_lakehouse.bronze."
    table_objects = spark.sql("SHOW TABLES IN bike_data_lakehouse.bronze")
    # SHOW TABLES can also list session temporary views; those are not bronze
    # tables and cannot be read from the bronze schema, so skip them.
    table_list = [
        row.tableName
        for row in table_objects.collect()
        if not row.isTemporary
    ]
    clean_data = {}

    for table in table_list:
        print(table)
        df = spark.read.table(f"{incoming_schema}{table}")
        silver_table_key = f"silver_{table}"
        df = default_transformations(df, silver_table_key, TRANSFORMATION)
        df = rename_columns(df, silver_table_key, TRANSFORMATION)
        df = handle_nulls_and_empty_strings(df, silver_table_key, TRANSFORMATION)
        df = handle_hyphon(df, silver_table_key, TRANSFORMATION)
        df = casting(df, silver_table_key, TRANSFORMATION)
        clean_data[silver_table_key] = df

    return clean_data

clean_data = clean_data_tables(TRANSFORMATION)

def preprocess_data_tables(TRANSFORMATION:dict[str, Any], clean_data) -> dict[str, Any]:
    """Apply enum alias mapping and data augmentation to every cleaned table.

    Args:
        TRANSFORMATION: Table-keyed transformation metadata.
        clean_data: Dict of table name -> cleaned frame.

    Returns:
        Dict of table name -> preprocessed frame.
    """
    print("preprocessing data")
    result = {}

    for name, frame in clean_data.items():
        print(name)
        aliased = apply_enum_aliases(frame, name, TRANSFORMATION)
        result[name] = data_augmentation(aliased, name, TRANSFORMATION)

    return result

preprocessed_tables = preprocess_data_tables(TRANSFORMATION, clean_data)

def silver_table_upload(TRANSFORMATION:dict[str, Any], preprocessed_tables) -> None:
    """Run the key-preparation stages and write the resulting silver tables.

    Stages, in order: intelligent key preparation, surrogate key addition,
    2NF configuration (which also deduplicates), then table creation.
    """
    print("uploading")
    stages = (
        intelligent_key_preparation,
        surrogate_key_addition,
        second_nf_configuration,  # also deduplication
    )

    tables = preprocessed_tables
    for stage in stages:
        tables = stage(tables, TRANSFORMATION)

    create_table(tables, TRANSFORMATION)

silver_table_upload(TRANSFORMATION, preprocessed_tables)
# 1. Column Standardization (Rename Columns)
# 2. Default Transformations (basic string cleanup)
# 3. Structural Cleaning (trim, whitespace normalization, hyphen handling)
# 4. Null & Empty Handling
# 5. Type Casting
# 6. Enum / Domain Mapping
# 7. Data Validation & Constraint Enforcement (open question: skip this and just save the data?)
# 8. Business Key Identification
# 9. Intelligent Key Preparation
# 10. Surrogate Key Generation
# 11. Data Augmentation (derived columns)
# 12. 2NF Normalization
# 13. Final Quality Checks
# 14. Table Creation (Silver)

In [0]:
%sql
-- Preview the first 10 rows of the silver CRM product table.
SELECT *
FROM bike_data_lakehouse.silver.silver_crm_product_info
LIMIT 10

clean_tables{table name: dataframe}  
surrogate_key_dict{new_column_name, table_name}  
column name [natural key]  
dfs = df with natural key
bunch of unions  
surrogate_tables {surrogate_key_name: unioned_data_Frame}

grab a table  
- what surrogate keys does it have
join the original table with each corresponding surrogate key table (possibly more than 2)


In [0]:
# for table_name, data_frame in clean_tables.items():
#     for surrogate_key, column_name in meta_data[table_name]["surrogate_key"].items():
#         path = f"bike_data_lakehouse.surrogate_keys.surrogate_key_{surrogate_key}_table"
#         sk_table = spark.read.table(path) # could optimize to only read the table once
#         result = data_frame.join(
#             F.broadcast(sk_table),
#             on=[column_name],
#             how="left"
#         )

#         # sk_table = spark.read.table("surrogate_keys.surrogate_key_{key}_table")
#         print(surrogate_keys)
#         # data_frame.join()


column_name = meta_data[table]["surrogate_key"][key]  
df1.join(df2, on = column_name, how = "outer").join(df3, on = column_name, how = "outer").select(column_name.alias(key))

we look at all the tables
if it has a surrogate then we join all the tables that have this surrogate key condition on the value of the surrogate key.items()
we then make a hash based on the id we chose

In [0]:
# rewrite the entire data frames... by using joins...
# use decorators (logging and timing)
# make hard writes to the table

In [0]:
%sql
-- List silver sales rows whose dim_sales_customer_id is not exactly 5 characters long.
SELECT *
FROM bike_data_lakehouse.silver.silver_crm_sales_info
WHERE LENGTH(dim_sales_customer_id) <> 5

In [0]:
%skip
# NOTE(review): `%skip` presumably keeps this cell from executing — confirm.
# When run, the line below sets spark.sql.storeAssignmentPolicy to ANSI for the session.
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")

In [0]:
# NOTE(review): scratch cell. `table_name` is not referenced by the statement
# below, and `blah type` looks like placeholder DDL rather than a real column
# definition — confirm whether this cell can be deleted.
table_name = "a"

spark.sql(f"""
          CREATE TABLE IF NOT EXISTS blah (
              blah type
          )
""")

['so61548', 'so61560', 'so61561', 'so61562', 'so61636', 'so67551', 'so68588', 'so68745', 'so68745', 'so68889']  
['ca_1098', 'hl_u509', 'cl_9009', 'fe_6654', 'bc_m005', 'bc_m005', 'pk_7098', 'hl_u509', 'gl_h102_l', 'tt_r982']

In [0]:
%sql
-- Show every row of the silver CRM sales table.
SELECT *
FROM bike_data_lakehouse.silver.silver_crm_sales_info

select 
  "pk".alias("int key")
    separated keys
  other columns

In [0]:
%sql
-- Find bronze sales rows whose product key contains RA-H123.
SELECT *
FROM bike_data_lakehouse.bronze.crm_sales_info
WHERE sls_prd_key LIKE "%RA-H123%"

In [0]:
%sql
-- Show every row of the bronze ERP product-category table.
SELECT *
FROM bike_data_lakehouse.bronze.erp_product_category_g1_v2

In [0]:
# separate intelligent keys into whatever