In [1]:
from functools import partial
from pyspark.sql.dataframe import DataFrame
from delta.tables import *
from pyspark.sql.functions import *

StatementMeta(, 4d9ddd9d-eae3-4399-a9ab-2e3f1fc664dd, 3, Finished, Available)

In [2]:
def _resolve_delta_path(op_type, table_name, delta_path):
    """Return ``delta_path`` if given, else the default lakehouse location for ``table_name``."""
    if delta_path is not None:
        return delta_path
    if op_type not in ('data_copy', 'transformation'):
        raise ValueError(
            f"Cannot derive a default delta_path for type {op_type!r}; "
            "use 'data_copy' or 'transformation', or pass delta_path explicitly."
        )
    # Both operation types share the same lakehouse root.
    root = 'abfss://AWS_AZURE@onelake.dfs.fabric.microsoft.com/AWS_LAKEHOUSE.Lakehouse'
    return root + '/Tables/' + table_name


def _parse_key_columns(key_spec):
    """Split a key spec such as ``'id'``, ``'id,dt'`` or ``'[id, dt]'`` into column names."""
    parts = key_spec.strip().strip('][').split(',')
    # Tolerate whitespace and quotes around each name (e.g. "['id', 'dt']").
    cleaned = [part.strip().strip('\'"') for part in parts]
    return [name for name in cleaned if name]


def _build_merge_sql(target, source, keys, columns, partition=None, partition_value=None):
    """Build the ``MERGE INTO`` statement that upserts ``source`` into ``target``."""
    if not keys:
        raise ValueError("upsert requires at least one key column")
    lowered = [c.lower() for c in columns]
    missing = [k for k in keys if k.lower() not in lowered]
    if missing:
        raise ValueError(f"Key column(s) {missing} not found in DataFrame columns {lowered}")

    on_clause = " and ".join(f"tar.{k} = src.{k}" for k in keys)
    if partition is not None and partition_value is not None:
        # Restrict the match to a single target partition. The value is
        # interpolated verbatim inside single quotes.
        on_clause += f" AND tar.{partition} = '{partition_value}'"

    key_names = {k.lower() for k in keys}
    update_cols = [c for c in lowered if c not in key_names]
    col_list = ", ".join(lowered)

    clauses = [
        f"MERGE INTO {target} AS tar",
        f"USING {source} AS src",
        f"ON {on_clause}",
    ]
    # When every column is a key there is nothing to update; an empty
    # "UPDATE SET" would be invalid SQL, so the clause is omitted.
    if update_cols:
        assignments = ", ".join(f"tar.{c} = src.{c}" for c in update_cols)
        clauses.append(f"WHEN MATCHED THEN UPDATE SET {assignments}")
    clauses.append(f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({col_list})")
    return "\n".join(clauses)


def write_delta_v2(
    type: str,
    df: "DataFrame",
    mode: str,
    targetTable: str,
    dbname: str,
    sourceTable: str = None,
    keyCol: str = None,
    delta_path: str = None,
    partition: str = None,
    partitionRange: list = None,
    upsert_partition_value: str = None,
) -> str:
    """
    Write a DataFrame to a Delta table in the lakehouse.

    The table is looked up in the catalog as ``AWS_LAKEHOUSE.<targetTable>``.
    If it is not found, the DataFrame is written to ``delta_path`` as the
    initial load. Otherwise the write follows ``mode``: a SQL ``MERGE`` for
    'upsert', a full or ``replaceWhere``-scoped overwrite for 'overwrite',
    or a plain append.

    Relies on a module-level ``spark`` session being in scope.

    :param type: Type of operation ('data_copy' or 'transformation'). Only used
        to derive the default path when ``delta_path`` is not given.
    :param df: DataFrame to write to the Delta table.
    :param mode: Write mode ('append', 'overwrite', 'upsert').
    :param targetTable: Name of the target Delta table (without database prefix).
    :param dbname: Database name. NOTE(review): currently unused; the catalog
        prefix is hard-coded to 'AWS_LAKEHOUSE'. Confirm whether that is intended.
    :param sourceTable: Temp-view name the DataFrame is registered under for the
        MERGE; required when upserting into an existing table.
    :param keyCol: Key column(s) for upsert mode, e.g. 'id', 'id,dt' or '[id, dt]'.
    :param delta_path: Path to the Delta table. Defaults to
        ``<lakehouse root>/Tables/<targetTable>``.
    :param partition: Column to partition the Delta table on.
    :param partitionRange: Partition values to replace in overwrite mode. Values
        are interpolated into the predicate as ``str(value)`` without quoting, so
        string-typed partition values must already carry their own quotes.
    :param upsert_partition_value: If given together with ``partition``, the
        MERGE only matches target rows in that partition.
    :return: Delta table path.
    :raises AssertionError: If ``mode`` is not one of the supported modes.
    :raises ValueError: If the default path cannot be derived from ``type``, if
        upsert arguments are missing or name unknown key columns, or if
        ``partitionRange`` is given without ``partition``.
    """
    # Raised explicitly (rather than via ``assert``) so the check survives
    # ``python -O`` while callers still see the same exception type.
    if mode not in ('append', 'overwrite', 'upsert'):
        raise AssertionError("Mode should be either 'append' or 'overwrite', 'upsert'")

    # The default path is built from the bare table name, before it is qualified.
    delta_path = _resolve_delta_path(type, targetTable, delta_path)

    # Qualify the table name for catalog lookups and SQL.
    targetTable = 'AWS_LAKEHOUSE' + '.' + targetTable

    if not spark.catalog._jcatalog.tableExists(targetTable):
        print("write table for the first time: ")
        # 'upsert' is not a Spark save mode. With no existing table every row
        # is an insert, so the initial load is written as an append.
        first_mode = 'append' if mode == 'upsert' else mode
        writer = df.write.format("delta").mode(first_mode)
        if partition is not None:
            writer = writer.option("mergeSchema", "true").partitionBy(partition)
        writer.save(delta_path)
        print(targetTable, delta_path)
        return delta_path

    print("table already exists!")
    if mode == 'upsert':
        print("upsert mode")
        if not sourceTable or not keyCol:
            raise ValueError("upsert mode requires both sourceTable and keyCol")
        df.createOrReplaceTempView(sourceTable)
        merge_sql = _build_merge_sql(
            targetTable,
            sourceTable,
            _parse_key_columns(keyCol),
            df.schema.names,
            partition,
            upsert_partition_value,
        )
        spark.sql(merge_sql)
    elif mode == 'overwrite':
        print("overwrite mode")
        source_df = df
        predicate = None
        if partitionRange:
            if partition is None:
                raise ValueError("partitionRange requires a partition column")
            values = ", ".join(str(v) for v in partitionRange)
            predicate = f"{partition} in ({values})"
            # Only rows inside the replaced range may be written with replaceWhere.
            source_df = df.filter(predicate)
        writer = (
            source_df.write.format("delta")
            .option("mergeSchema", "true")
            .option("overwriteSchema", "true")
            .mode(mode)
        )
        if partition is not None:
            writer = writer.partitionBy(partition)
        if predicate is not None:
            writer = writer.option("replaceWhere", predicate)
        writer.save(delta_path)
    else:
        print("append mode")
        writer = df.write.format("delta").mode(mode)
        if partition is not None:
            writer = writer.partitionBy(partition)
        writer.save(delta_path)
    return delta_path


StatementMeta(, 4d9ddd9d-eae3-4399-a9ab-2e3f1fc664dd, 4, Finished, Available)