# In [0]:
from typing import List, Optional

from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import coalesce, col, concat_ws, lit, sha2

def _quote_identifier(name: str) -> str:
    """Wrap *name* in backticks for Spark SQL, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def _as_column_list(cols) -> List[str]:
    """Normalize None, a single column name, or an iterable of names to a list."""
    if cols is None:
        return []
    if isinstance(cols, str):
        # A bare string would otherwise be splatted into single characters.
        return [cols]
    return list(cols)


def upsert_with_hashstring(
    spark: SparkSession,
    df: DataFrame,
    target_path: str,
    unique_cols: List[str],
    partition_cols: Optional[List[str]] = None,
    hash_col_name: str = "hashstring",
    register_table: bool = True,
    dry_run: bool = False,
    verbose: bool = False,
    null_placeholder: Optional[str] = None,
) -> None:
    """
    Insert rows of a DataFrame into a Delta table, skipping rows whose hash already exists.

    A SHA-256 hash of ``unique_cols`` (joined with ``"||"``) is stored in
    ``hash_col_name`` and used as the deduplication key. Rows that share a hash
    within the incoming batch are collapsed to one row (which one is kept is
    not defined). If ``target_path`` is already a Delta table, the batch is
    merged insert-only: rows whose hash is already present are left untouched,
    nothing is updated or deleted. Otherwise the batch is written as a new
    Delta table with mode ``overwrite``.

    Args:
        spark (SparkSession): Spark session
        df (DataFrame): Source DataFrame to upsert
        target_path (str): Path to Delta table
        unique_cols (List[str]): Columns to use to build hashstring; must not
            be empty. A single string is treated as one column name.
        partition_cols (List[str], optional): Columns to partition by. Only
            applied when the table is first written; ignored for a merge into
            an existing table.
        hash_col_name (str): Name of the hash column (default 'hashstring')
        register_table (bool): Whether to register table in metastore. The
            table name is the last segment of ``target_path``.
        dry_run (bool): If True, do not perform write, just log actions
        verbose (bool): If True, print detailed logging and schema
        null_placeholder (str, optional): When None (default), NULL key values
            are skipped by ``concat_ws``, so ``('a', NULL)`` and
            ``(NULL, 'a')`` produce the same hash. When set, every key column
            is cast to string and NULLs are replaced by this token before
            hashing. Hashes for rows containing NULL keys then differ from
            those written with the default, so choose one setting per table.

    Raises:
        ValueError: If ``unique_cols`` is empty, or if ``register_table`` is
            True and no table name can be derived from ``target_path``.
    """
    key_cols = _as_column_list(unique_cols)
    if not key_cols:
        # concat_ws with no inputs yields the same string for every row, which
        # would give the whole batch one identical hash.
        raise ValueError("unique_cols must contain at least one column name")
    part_cols = _as_column_list(partition_cols)

    # Step 1: Add hash column for deduplication
    if null_placeholder is None:
        hash_inputs = key_cols
    else:
        hash_inputs = [
            coalesce(col(name).cast("string"), lit(null_placeholder))
            for name in key_cols
        ]
    df_hashed = df.withColumn(
        hash_col_name, sha2(concat_ws("||", *hash_inputs), 256)
    ).dropDuplicates([hash_col_name])
    # Without dropDuplicates, an insert-only MERGE (and the initial write)
    # would insert every source row that shares a new hash.

    if verbose:
        print(f"🧠 Generated hashstring from columns: {unique_cols}")
        df_hashed.select(hash_col_name).show(truncate=False)
        df_hashed.printSchema()
        print(f"🗂️ Target Path: {target_path}")

    # Step 2: Perform dry-run preview
    if dry_run:
        print("🚫 Dry run mode — skipping write operation.")
        return

    # Derive and validate the table name before writing so a bad path cannot
    # leave data written but unregistered.
    table_name = target_path.rstrip("/").split("/")[-1] if register_table else ""
    if register_table and not table_name:
        raise ValueError(
            f"cannot derive a table name from target_path {target_path!r}"
        )

    # Step 3: If Delta table exists, perform merge (deduplication)
    if DeltaTable.isDeltaTable(spark, target_path):
        if verbose:
            print("🔁 Delta table exists — performing MERGE (upsert).")

        hash_ident = _quote_identifier(hash_col_name)
        DeltaTable.forPath(spark, target_path).alias("target").merge(
            df_hashed.alias("source"),
            f"target.{hash_ident} = source.{hash_ident}",
        ).whenNotMatchedInsertAll().execute()
    else:
        if verbose:
            print("🆕 No Delta table found — writing new Delta table.")

        writer = df_hashed.write.format("delta").mode("overwrite")
        if part_cols:
            writer = writer.partitionBy(*part_cols)
        writer.save(target_path)

    # Step 4: Register table in metastore (optional)
    if register_table:
        table_ident = _quote_identifier(table_name)
        # Escape for a single-quoted SQL literal; assumes Spark's default
        # backslash-escape string parsing.
        location = target_path.replace("\\", "\\\\").replace("'", "\\'")
        # NOTE(review): DROP + CREATE re-points the name at target_path on
        # every call. If a *managed* table with the same name already exists,
        # dropping it may delete its data — confirm this is intended.
        spark.sql(f"DROP TABLE IF EXISTS {table_ident}")
        spark.sql(f"""
            CREATE TABLE {table_ident}
            USING DELTA
            LOCATION '{location}'
        """)
        if verbose:
            print(f"📚 Table registered: {table_name}")
