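# Playground

Local scratch notebook for trying out the `fabricengineer` silver-layer ingestion services (currently `SilverIngestionSCD2Service`) against a local Spark + Delta session with a mocked `notebookutils`.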

## Initialize globals

In [1]:
import sys
import os
import io
import shutil
import time

from uuid import uuid4
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

sys.path.append("../src")

from fabricengineer.transform.mlv.mlv import MaterializedLakeView
from fabricengineer.transform.silver import (
    SilverIngestionInsertOnlyService,
    SilverIngestionSCD2Service
)
from fabricengineer.transform.silver.utils import (
    LakehouseTable,
    get_mock_table_path
)
from fabricengineer.logging import TimeLogger, logger

# Annotation-only declarations: they record the expected types but bind no
# value, so referencing `mlv` / `timer` raises NameError until they are assigned.
# NOTE(review): neither is assigned in the visible cells — presumably set later; confirm.
mlv: MaterializedLakeView
timer: TimeLogger

In [2]:
class NotebookUtilsFSMock:
    """Local-filesystem stand-in for the ``notebookutils.fs`` API.

    Relative paths are resolved against the current working directory, so
    e.g. ``"Files/x.txt"`` maps to ``<cwd>/Files/x.txt``.
    """

    def _get_path(self, file: str) -> str:
        """Resolve ``file`` against the current working directory.

        An absolute ``file`` is returned unchanged (``os.path.join`` semantics).
        """
        return os.path.join(os.getcwd(), file)

    def exists(self, path: str) -> bool:
        """Return True if ``path`` (resolved via ``_get_path``) exists."""
        return os.path.exists(self._get_path(path))

    def put(
        self,
        file: str,
        content: str,
        overwrite: bool = False
    ) -> None:
        """Write ``content`` as UTF-8 text to ``file``, creating parent directories.

        Args:
            file: Target path, resolved via ``_get_path``.
            content: Text to write.
            overwrite: Replace an existing file when True.

        Raises:
            FileExistsError: If the file already exists and ``overwrite`` is False.
        """
        path = self._get_path(file)
        os.makedirs(os.path.dirname(path), exist_ok=True)

        if os.path.exists(path) and not overwrite:
            raise FileExistsError(f"File {path} already exists and overwrite is set to False.")
        # Explicit encoding: the default depends on the platform locale, which
        # would make the written bytes differ between machines.
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)


class NotebookUtilsMock:
    """Minimal local stand-in for the notebook-runtime ``notebookutils`` object.

    Only the ``fs`` attribute is provided, backed by ``NotebookUtilsFSMock``.
    """

    def __init__(self):
        filesystem = NotebookUtilsFSMock()
        self.fs = filesystem

# Local Spark session with the Delta Lake SQL extension and catalog enabled,
# so Delta tables can be used without a Fabric runtime.
builder = (
    SparkSession.builder
    .appName("TestSession")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip adds the delta-spark package to the session
# (see "io.delta#delta-spark_2.12 added as a dependency" in the cell output).
# Names assigned at the top level of a cell are already notebook globals, so
# no `global` statement is needed for later cells to see them.
spark: SparkSession = configure_spark_with_delta_pip(builder).getOrCreate()

notebookutils = NotebookUtilsMock()

25/08/06 15:03:38 WARN Utils: Your hostname, MacBook-Air-von-Enrico.local resolves to a loopback address: 127.0.0.1; using 192.168.0.7 instead (on interface en0)
25/08/06 15:03:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/enricogoerlitz/.ivy2/cache
The jars for the packages stored in: /Users/enricogoerlitz/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-52c667fe-2ef5-4f09-9930-519a2942dfac;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central


:: loading settings :: url = jar:file:/Users/enricogoerlitz/opt/miniconda3/envs/py312/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 89ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-52c667fe-2ef5-4f09-9930-519a2942dfac
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/3ms)
25/08/06 15:03:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-ja

In [3]:
import io
import logging
from typing import Any, Callable
from contextlib import contextmanager

@contextmanager
def capture_logs(logger: logging.Logger):
    """Temporarily attach an in-memory handler to ``logger`` and yield its buffer.

    Records handled by the logger are formatted as
    ``[dd.mm.YYYY HH:MM:SS,mmm] [LEVEL] filename message`` and written to the
    yielded ``io.StringIO``. On exit the handler is detached and closed; the
    buffer itself stays open and readable.

    Args:
        logger: Logger whose records should be captured.

    Yields:
        io.StringIO: Buffer receiving the formatted log lines.
    """
    log_stream = io.StringIO()
    handler = logging.StreamHandler(log_stream)
    # Accept every level at the handler. The logger's own level (and any
    # filters) still decide which records reach this handler at all.
    handler.setLevel(logging.DEBUG)
    # time.strftime has no %f directive, so milliseconds are taken from the
    # record's msecs attribute instead of the date format.
    formatter = logging.Formatter(
        "[%(asctime)s,%(msecs)03d] [%(levelname)s] %(filename)s %(message)s",
        "%d.%m.%Y %H:%M:%S",
    )
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    try:
        yield log_stream
    finally:
        logger.removeHandler(handler)
        # StreamHandler.close() does not close the underlying stream, so the
        # caller can still read log_stream after the context exits.
        handler.close()

def sniff_logs(logger: logging.Logger, fn: Callable[[], Any]) -> tuple[Any, list[str]]:
    """Call ``fn`` while capturing the output of ``logger``.

    Returns:
        A ``(result, lines)`` tuple: the value returned by ``fn`` and the
        captured log output split into individual lines.
    """
    with capture_logs(logger) as buffer:
        outcome = fn()
    captured_text = buffer.getvalue()
    return outcome, captured_text.splitlines()


In [4]:
import os
import shutil


def cleanup_fs():
    """Delete local scratch output so each run starts from a clean slate.

    Removes ``Files`` and ``tmp`` in the current working directory (resolved
    through the ``notebookutils`` mock) plus ``../tmp`` and ``../Files``.
    Missing paths are skipped. Directories are removed recursively; a plain
    file or symlink at one of these names is unlinked instead, where
    ``shutil.rmtree`` would raise.
    """
    rm_paths = [
        notebookutils.fs._get_path("Files"),
        notebookutils.fs._get_path("tmp"),
        "../tmp",
        "../Files",
    ]
    for path in rm_paths:
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path)
        elif os.path.lexists(path):
            # Stale file or (possibly dangling) symlink: rmtree would fail on it.
            os.remove(path)

cleanup_fs()

## SilverIngestionSCD2Service

In [5]:
# Bronze source and silver destination share schema and table name;
# only the lakehouse differs.
src_table = LakehouseTable(lakehouse="BronzeLakehouse", schema="schema", table="tablescd2")
dest_table = LakehouseTable(lakehouse="SilverLakehouse", schema=src_table.schema, table=src_table.table)

etl = SilverIngestionSCD2Service()
etl.init(
    # session, tables and mock mode
    spark_=spark,
    source_table=src_table,
    destination_table=dest_table,
    is_testing_mock=True,
    # key and column handling
    nk_columns=["id"],
    constant_columns=[],
    include_comparing_columns=None,
    exclude_comparing_columns=None,
    transformations={},
    partition_by_columns=None,
    # load behaviour
    is_delta_load=False,
    delta_load_use_broadcast=True,
    historize=True,
)

# Display the service's string form (its effective configuration, per the saved output).
str(etl)

"{'historize': True, 'is_delta_load': False, 'delta_load_use_broadcast': True, 'src_table_path': 'BronzeLakehouse.schema.tablescd2', 'dist_table_path': 'SilverLakehouse.schema.tablescd2', 'nk_columns': ['id'], 'include_comparing_columns': [], 'exclude_comparing_columns': {'ROW_UPDATE_DTS', 'ROW_IS_CURRENT', 'ROW_LOAD_DTS', 'NK', 'id', 'ROW_DELETE_DTS', 'PK'}, 'transformations': {}, 'constant_columns': [], 'partition_by': [], 'pk_column': 'PK', 'nk_column': 'NK', 'nk_column_concate_str': '_', 'row_load_dts_column': 'ROW_LOAD_DTS', 'row_update_dts_column': 'ROW_UPDATE_DTS', 'row_delete_dts_column': 'ROW_DELETE_DTS', 'dw_columns': ['PK', 'NK', 'ROW_IS_CURRENT', 'ROW_UPDATE_DTS', 'ROW_DELETE_DTS', 'ROW_LOAD_DTS']}"

In [None]:
from pyspark.sql import functions as F, types as T

# Set to True to append the additional rows (ids 11-13) to the bronze fixture.
# Replaces the previously commented-out rows; False reproduces the 10-row data.
INCLUDE_EXTRA_ROWS = False

schema = T.StructType([
    T.StructField("id", T.IntegerType(), False),
    T.StructField("name", T.StringType(), False),
    T.StructField("department_id", T.IntegerType(), False),
    T.StructField("created_at", T.StringType(), False),
    T.StructField("updated_at", T.StringType(), False),
])

data = [
    (1, "Alice", 1, "2023-01-01", "2023-01-01"),
    (2, "Bob", 2, "2023-01-01", "2023-01-01"),
    (3, "Charlie", 3, "2023-01-01", "2023-01-01"),
    (4, "David", 1, "2023-01-01", "2023-01-01"),
    (5, "Eve", 2, "2023-01-01", "2023-01-01"),
    (6, "Frank", 3, "2023-01-01", "2023-01-01"),
    (7, "Grace", 1, "2023-01-01", "2023-01-01"),
    (8, "Heidi", 2, "2023-01-01", "2023-01-01"),
    (9, "Ivan", 3, "2023-01-01", "2023-01-01"),
    (10, "Judy", 1, "2023-01-01", "2023-01-01"),
]
if INCLUDE_EXTRA_ROWS:
    data = data + [
        (11, "Judy-2", 1, "2023-01-01", "2023-01-01"),
        (12, "Judy-3", 1, "2023-01-01", "2023-01-01"),
        (13, "Judy-4", 1, "2023-01-01", "2023-01-01"),
    ]

# Dates arrive as ISO strings; convert both audit columns to timestamps.
df_bronze = (
    spark.createDataFrame(data, schema)
    .withColumn("created_at", F.to_timestamp("created_at"))
    .withColumn("updated_at", F.to_timestamp("updated_at"))
)

df_bronze.show(truncate=False)

# Write the fixture as parquet to the mock location of the configured source
# table, replacing any previous snapshot.
bronze_path = get_mock_table_path(etl._src_table)
df_bronze.write.format("parquet").mode("overwrite").save(bronze_path)

+---+-------+-------------+-------------------+-------------------+
|id |name   |department_id|created_at         |updated_at         |
+---+-------+-------------+-------------------+-------------------+
|1  |Alice  |1            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|2  |Bob    |2            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|3  |Charlie|3            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|4  |David  |1            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|5  |Eve    |2            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|6  |Frank  |3            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|7  |Grace  |1            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|8  |Heidi  |2            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|9  |Ivan   |3            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|10 |Judy   |1            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|11 |Judy-2 |1            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|12 |Judy-3 |1            |2023-01-01 00:00:00|2

25/08/06 15:03:42 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers


In [None]:
# Run the ingestion, then display the silver table ordered by id and load time
# (both ascending).
etl.ingest()
df_silver = etl.read_silver_df()
df_silver.sort("id", "ROW_LOAD_DTS").show(truncate=False)