In [None]:
def load_csv_to_spark(url: str):
    """Read the CSV at ``url`` with pandas and return it as a Spark DataFrame.

    The file is first parsed by ``pd.read_csv`` and the resulting pandas
    frame is then handed to ``spark.createDataFrame``; both ``pd`` and
    ``spark`` are taken from the enclosing module scope.
    """
    pandas_frame = pd.read_csv(url)
    spark_frame = spark.createDataFrame(pandas_frame)
    return spark_frame

def table_exists(path: str) -> bool:
    """Report whether ``path`` holds a Delta table.

    Delegates to ``DeltaTable.isDeltaTable`` using the module-level
    ``spark`` session and returns its answer unchanged.
    """
    is_delta = DeltaTable.isDeltaTable(spark, path)
    return is_delta

def create_delta_table(df, path: str, table_name: str):
    """Write ``df`` as a Delta table at ``path`` and register it by name.

    The data is saved in Delta format using ``overwrite`` mode, after which a
    ``CREATE TABLE IF NOT EXISTS`` statement binds ``table_name`` to that
    location. A confirmation message is printed at the end.
    """
    # Step 1: persist the DataFrame (any data already at `path` is overwritten).
    delta_writer = df.write.format("delta").mode("overwrite")
    delta_writer.save(path)

    # Step 2: register the location under the requested table name.
    register_stmt = (
        f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{path}'"
    )
    spark.sql(register_stmt)

    print(f"Table '{table_name}' created at '{path}'.")

def merge_delta_table(df, path: str, merge_keys: list):
    """Merge new data into an existing Delta table based on key columns.

    Rows of ``df`` that match a target row on every key column update that
    row with all source columns (``whenMatchedUpdateAll``); source rows with
    no match are inserted (``whenNotMatchedInsertAll``).

    Args:
        df: DataFrame used as the merge source (aliased ``src``).
        path: Location of the existing Delta table (aliased ``tgt``).
        merge_keys: Column names compared for equality between target and
            source. A single column name passed as a plain string is treated
            as a one-element list.

    Raises:
        ValueError: If ``merge_keys`` contains no column names.
    """
    # A bare string would otherwise be iterated character by character,
    # producing a condition such as "tgt.i = src.i AND tgt.d = src.d".
    keys = [merge_keys] if isinstance(merge_keys, str) else list(merge_keys)

    # Validate before touching the table: an empty key list would yield an
    # empty condition string, which is never a meaningful merge condition.
    if not keys:
        raise ValueError("merge_keys must contain at least one column name.")

    merge_condition = " AND ".join(f"tgt.{k} = src.{k}" for k in keys)

    delta_table = DeltaTable.forPath(spark, path)

    (delta_table.alias("tgt")
     .merge(
         source=df.alias("src"),
         condition=merge_condition
     )
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())

    print(f"Data merged into existing Delta table at '{path}'.")