#### Importing required libraries

In [0]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import to_date, current_timestamp
from datetime import date
import shutil
import os
from typing import List, Dict


In [0]:
%sql
DROP TABLE IF EXISTS inshort_cata.gold.content;
DROP TABLE IF EXISTS inshort_cata.gold.content_stg;
DROP TABLE IF EXISTS inshort_cata.gold.events;
DROP TABLE IF EXISTS inshort_cata.gold.events_stg;
DROP TABLE IF EXISTS inshort_cata.gold.users;
DROP TABLE IF EXISTS inshort_cata.gold.users_stg;

#### Delta lake tables creation based on parquet file uploaded to respective volumes in silver layer:

**Content:** /Volumes/inshort_cata/silver/content_data

**Events:** /Volumes/inshort_cata/silver/events_data

**User:** /Volumes/inshort_cata/silver/user_data

In [0]:
# Time of execution for this code: ~ 3 mins
def create_managed_table_from_volume(
    volume_path: str,
    table_name: str,
    catalog: str = "inshort_cata",
    schema: str = "silver",
    zorder_cols: list = None,
    optimize_after: bool = True
) -> bool:
    """
    Create optimized managed Delta table from volume path.
    
    Args:
        volume_path: Path to volume containing parquet files
        table_name: Target table name
        catalog: Unity Catalog name
        schema: Schema name
        zorder_cols: Columns for Z-ORDER optimization
        optimize_after: Enable OPTIMIZE and Z-ORDER
        
    Returns:
        bool: True if successful
    """
    full_table_name = f"{catalog}.{schema}.{table_name}"
    volume_path = volume_path.rstrip('/')
    
    try:
        # Validate volume contents
        files = dbutils.fs.ls(volume_path)
        parquet_files = [f for f in files if f.name.lower().endswith('.parquet')]
        
        if not parquet_files:
            return False
        
        # Drop existing table
        spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")
        
        # Create managed Delta table
        df = spark.read.parquet(volume_path)
        df.write.format("delta").mode("overwrite").saveAsTable(full_table_name)
        
        # Verify creation
        row_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {full_table_name}").collect()[0]['cnt']
        
        # Apply optimizations
        if optimize_after:
            spark.sql(f"OPTIMIZE {full_table_name}")
            
            zorder_columns = zorder_cols or ["deviceid"] if "deviceid" in [c.lower() for c in df.columns] else ["_id"]
            if zorder_columns:
                zorder_clause = "ZORDER BY (" + ", ".join(zorder_columns) + ")"
                spark.sql(f"OPTIMIZE {full_table_name} {zorder_clause}")
            
            # Set Delta table properties
            spark.sql(f"""
                ALTER TABLE {full_table_name} 
                SET TBLPROPERTIES (
                    'delta.autoOptimize.optimizeWrite' = 'true',
                    'delta.autoOptimize.autoCompact' = 'true',
                    'delta.tuneFileSizesForRewrites' = 'true'
                )
            """)
        
        return True
        
    except Exception:
        return False


def create_all_tables() -> dict:
    """
    Create all production tables with optimal configuration.
    
    Returns:
        dict: Status of each table creation
    """
    table_configs = [
        {
            "path": "/Volumes/inshort_cata/silver/user_data",
            "name": "users",
            "zorder": ["deviceid", "district", "install_date"]
        },
        {
            "path": "/Volumes/inshort_cata/silver/events_data", 
            "name": "events",
            "zorder": ["deviceid", "content_id"]
        },
        {
            "path": "/Volumes/inshort_cata/silver/content_data",
            "name": "content", 
            "zorder": ["_id", "author"]
        }
    ]
    
    results = {}
    for config in table_configs:
        success = create_managed_table_from_volume(
            config["path"], 
            config["name"], 
            zorder_cols=config["zorder"]
        )
        results[config["name"]] = success
    
    return results


# Production usage
if __name__ == "__main__":
    # Create individual tables
    create_managed_table_from_volume(
        "/Volumes/inshort_cata/silver/user_data", 
        "users"
    )
    
    create_managed_table_from_volume(
        "/Volumes/inshort_cata/silver/events_data", 
        "events"
    )
    
    create_managed_table_from_volume(
        "/Volumes/inshort_cata/silver/content_data", 
        "content"
    )
    
    # Bulk creation with status tracking
    creation_status = create_all_tables()

Cross-verification of silver layer tables just loaded from files uploaded to different table specific Volumes

In [0]:
%sql
-- One query to be run at a time for cross-checking proper data loading , Everything is looking normal till now

select count(1) from inshort_cata.silver.content;
--select count(1) from inshort_cata.silver.events;
--select count(1) from inshort_cata.silver.users;
    

count(1)
113520


#### Add create statements here for gold layer: Reading streams of silver data and simultaneuosly put it in gold

##### Using Silver layer table for schema replication and adding a couple of more columns for SCD-2 tracking like effective_from, effective_to and is_active

Creating empty final gold tables which will have SCD - 2 data for maintaining history which is to be used in DWH reports in the longer run:

In [0]:
def ensure_table_with_scd2(
    source_table: str,
    target_table: str,
    scd2_cols: Dict[str, str] = None,
    tbl_properties: Dict[str, str] = None,
    zorder_cols: List[str] = None
):
    scd2_cols = scd2_cols or {
        "effective_from": "DATE",
        "effective_to": "DATE",
        "is_active": "BOOLEAN"
    }
    tbl_properties = tbl_properties or {
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
    zorder_cols = zorder_cols or list(scd2_cols.keys())

    if not spark.catalog.tableExists(target_table):
        empty_df = spark.table(source_table).limit(0)
        empty_df.write.format("delta").mode("overwrite").saveAsTable(target_table)

    existing_cols = [field.name for field in spark.table(target_table).schema.fields]
    cols_to_add = [
        f"{col} {dtype}" for col, dtype in scd2_cols.items() if col not in existing_cols
    ]
    if cols_to_add:
        spark.sql(f"ALTER TABLE {target_table} ADD COLUMNS ({', '.join(cols_to_add)})")

    props_str = ", ".join([f"'{k}' = '{v}'" for k, v in tbl_properties.items()])
    spark.sql(f"ALTER TABLE {target_table} SET TBLPROPERTIES ({props_str})")

    zorder_clause = f"ZORDER BY ({', '.join(zorder_cols)})"
    spark.sql(f"OPTIMIZE {target_table} {zorder_clause}")

# Tables details
tables_config = [
    {
        "source": "inshort_cata.silver.users",
        "target": "inshort_cata.gold.users",
        "zorder": ["deviceid", "district", "install_dt"]
    },
    {
        "source": "inshort_cata.silver.events",
        "target": "inshort_cata.gold.events",
        "zorder": ["deviceid", "content_id"]
    },
    {
        "source": "inshort_cata.silver.content",
        "target": "inshort_cata.gold.content",
        "zorder": ["_id", "author"]
    }
]

for config in tables_config:
    ensure_table_with_scd2(
        source_table=config["source"],
        target_table=config["target"],
        zorder_cols=config["zorder"]
    )

In [0]:
%sql
--select count(1) from inshort_cata.gold.content;
--select count(1) from inshort_cata.gold.events;
--select count(1) from inshort_cata.gold.users;

-- Entire data count is and shoould be 0

count(1)
0


##### Populating gold layer's staging tables with suffix "_stg" which can be used for evaluation or any other explicit transformation and then it can be MERGED to final gold layer table containing the cleanest form of data for advanced analytics and reporting

In [0]:
import shutil
import os
from pyspark.sql.functions import current_date, lit
from datetime import date

def populate_scd_staging_tables(
    tables: list,
    silver_prefix: str,
    gold_prefix: str,
    checkpoint_base: str
):
    for table in tables:
        source_table = f"{silver_prefix}.{table}"
        target_table = f"{gold_prefix}.{table}_stg"
        checkpoint_path = f"{checkpoint_base}/{table}_stg"

        # Remove previous checkpoint to avoid state conflicts
        if os.path.exists(checkpoint_path):
            shutil.rmtree(checkpoint_path)

        # Stream data from silver, add SCD2 columns, and write to gold staging
        (
            spark.readStream
            .table(source_table)
            .select(
                "*",
                current_date().alias("effective_from"),
                lit(date(9999, 12, 31)).alias("effective_to"),
                lit(True).alias("is_active")
            )
            .writeStream
            .format("delta")
            .option("checkpointLocation", checkpoint_path)
            .option("mergeSchema", "true")
            .trigger(once=True)
            .toTable(target_table)
        ).awaitTermination()

tables_list = ['users', 'events', 'content']
populate_scd_staging_tables(
    tables=tables_list,
    silver_prefix="inshort_cata.silver",
    gold_prefix="inshort_cata.gold",
    checkpoint_base="/Volumes/inshort_cata/gold/gold_data_checkpoint"
)


In [0]:
%sql
--select count(1) from inshort_cata.gold.content_stg;
--select count(1) from inshort_cata.gold.events_stg;
--select count(1) from inshort_cata.gold.users_stg;

-- Cross verification of staging table in gold layer: Everything looks fine

count(1)
42508


#### Implement SCD-2 for all 3 tables where source is "_stg" table of gold layer and then put the incremental data in gold based on MERGE: WIP

At this point we have data only in silver and gold layer's staging environment, not in final gold tables which will be used for advanced analyics and reporting whose source of truth will be from main datawarehouse.

In [0]:
def execute_scd2_merge_pipeline():
    """
    Execute complete SCD2 MERGE pipeline for all tables.
    
    Returns:
        dict: Success status per table
    """
    tables_list = ['users', 'events', 'content']
    results = {}
    
    for table in tables_list:
        try:
            staging_table = f"inshort_cata.gold.{table}_stg"
            target_table = f"inshort_cata.gold.{table}"
            
            # Verify staging table exists and has data
            row_count = spark.sql(f"SELECT COUNT(*) FROM {staging_table}").collect()[0][0]
            
            if row_count == 0:
                results[table] = False
                continue
            
            # Table-specific MERGE logic
            merge_sql = _generate_merge_sql(table, staging_table, target_table)
            spark.sql(merge_sql)
            
            # Post-merge optimization
            spark.sql(f"OPTIMIZE {target_table} ZORDER BY (effective_from, is_active)")
            
            # Verify post-merge active records
            active_count = spark.sql(f"""
                SELECT COUNT(*) 
                FROM {target_table} 
                WHERE is_active = TRUE
            """).collect()[0][0]
            
            results[table] = True
            
        except Exception:
            results[table] = False
    
    return results


def _generate_merge_sql(table: str, staging_table: str, target_table: str) -> str:
    """Generate table-specific SCD2 MERGE SQL."""
    
    table_configs = {
        'users': {
            'pk_cols': ['deviceid'],
            'business_cols': ['lang', 'district', 'platform', 'install_dt', 'campaign_id']
        },
        'content': {
            'pk_cols': ['_id'],
            'business_cols': ['createdAt', 'newsLanguage', 'categories', 'author']
        },
        'events': {
            'pk_cols': ['deviceid', 'content_id', 'eventtimestamp'],
            'business_cols': ['timespent', 'eventname']
        }
    }
    
    config = table_configs[table]
    pk_condition = ' AND '.join([f'tgt.{col} = src.{col}' for col in config['pk_cols']])
    change_condition = ' OR '.join([f'(tgt.{col} <> src.{col} OR (tgt.{col} IS NULL) <> (src.{col} IS NULL))' 
                                   for col in config['business_cols']])
    insert_cols = ', '.join(config['pk_cols'] + config['business_cols'] + ['effective_from', 'effective_to', 'is_active'])
    insert_values = ', '.join([f'src.{col}' for col in config['pk_cols'] + config['business_cols']] + 
                             ['current_date()', "DATE '9999-12-31'", 'TRUE'])
    
    return f"""
    MERGE INTO {target_table} AS tgt
    USING {staging_table} AS src
    ON tgt.is_active = TRUE AND {pk_condition}
    WHEN MATCHED AND ({change_condition}) THEN
        UPDATE SET
            tgt.effective_to = current_date(),
            tgt.is_active = FALSE
    WHEN NOT MATCHED THEN
        INSERT ({insert_cols})
        VALUES ({insert_values})
    """


# Production execution
if __name__ == "__main__":
    merge_results = execute_scd2_merge_pipeline()



In [0]:
print(merge_results)

{'users': True, 'events': True, 'content': True}


Checking for normal counts for 1st incremental SCD-2 load and duplicate checks as well for each of the final gold layer tables:

In [0]:
%sql
select deviceid, count(deviceid) from inshort_cata.gold.users group by deviceid
having count(deviceid) > 1
order by 2 desc; -- No duplicates
-- select count(1) from inshort_cata.gold.users;

deviceid,count(deviceid)


In [0]:
%sql
-- select count(1) from inshort_cata.gold.content; -- 340560

select _id, count(_id) from inshort_cata.gold.content
group by _id
having count(_id) > 1
order by 2 desc; -- No duplicates


_id,count(_id)


In [0]:
%sql
select count(1) from inshort_cata.gold.events;-- 82440945

count(1)
82440945


#### Data loading along with SCD-2 implemented successfully!!