### Add ingestion date column to each data frame using current timestamp

In [0]:
from pyspark.sql.functions import current_timestamp

def add_ingestion_date(input_df):
    """Return ``input_df`` with an added ``ingestion_date`` column.

    The new column is populated with ``current_timestamp()``. The result of
    ``withColumn`` is returned; ``input_df`` is not reassigned here.
    """
    return input_df.withColumn("ingestion_date", current_timestamp())

### Make a list of the distinct values in a column

In [0]:
def df_column_to_list(input_df, column_name):
    """Collect the distinct values of one column into a Python list.

    Args:
        input_df: Object exposing ``select(...).distinct().collect()``
            (presumably a Spark DataFrame - confirm against callers).
        column_name: Name of the column to read; also used to index each
            collected row.

    Returns:
        A list with one entry per collected row, in the order returned by
        ``collect()``.
    """
    single_column = input_df.select(column_name)
    distinct_rows = single_column.distinct().collect()
    return [record[column_name] for record in distinct_rows]

### Create a Delta Lake table or insert/update records to an existing Delta Lake table

In [0]:
def merge_delta_data(input_df, db_name, table_name, folder_path, merge_condition, partition_column):
    """Upsert ``input_df`` into a Delta table, creating the table on first use.

    If ``db_name.table_name`` is already registered in the catalog, the Delta
    table loaded from ``folder_path/table_name`` is merged with ``input_df``:
    target rows matching ``merge_condition`` are updated with all source
    columns, and unmatched source rows are inserted. Otherwise ``input_df`` is
    saved as a new Delta table named ``db_name.table_name``, partitioned by
    ``partition_column``.

    Args:
        input_df: Source DataFrame; aliased as ``src`` for the merge.
        db_name: Database part of the table identifier.
        table_name: Table part of the identifier; also the last segment of
            the path used to load an existing table.
        folder_path: Parent folder of the existing table (merge branch only).
        merge_condition: Condition handed to ``merge``; the target is aliased
            as ``tgt`` and the source as ``src``.
        partition_column: Column passed to ``partitionBy`` (create branch only).

    Relies on a ``spark`` session object being available in the enclosing
    scope; it is not defined inside this function.
    """
    # Turn on dynamic partition pruning. Presumably this lets a merge condition
    # that includes the partition column skip unrelated partitions instead of
    # scanning the whole table - confirm against the callers' merge conditions.
    spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning", "true")

    from delta.tables import DeltaTable

    qualified_name = f"{db_name}.{table_name}"
    table_is_registered = spark._jsparkSession.catalog().tableExists(qualified_name)

    # First load: no table registered yet, so create it from the incoming data.
    if not table_is_registered:
        writer = input_df.write.mode("overwrite").partitionBy(partition_column).format("delta")
        writer.saveAsTable(qualified_name)
        return

    # NOTE(review): existence is checked by catalog name but the table is loaded
    # by path; this assumes the table lives at folder_path/table_name - confirm.
    target_table = DeltaTable.forPath(spark, f"{folder_path}/{table_name}")
    merge_builder = target_table.alias("tgt").merge(input_df.alias("src"), merge_condition)
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()