In [0]:
from pyspark.sql.functions import current_timestamp

def add_ingestion_date(input_df):
    """Return ``input_df`` with an added ``ingestion_date`` column.

    The column is populated with the ``current_timestamp()`` expression.
    The input DataFrame is not modified; ``withColumn`` yields a new one.

    Args:
        input_df: DataFrame to extend.

    Returns:
        The DataFrame produced by ``withColumn('ingestion_date', ...)``.
    """
    return input_df.withColumn('ingestion_date', current_timestamp())

In [0]:
def re_arrange_partition_column(partition_column, df):
    """Return ``df`` with ``partition_column`` selected as the last column.

    All other columns keep the relative order they have in
    ``df.schema.names``.

    Args:
        partition_column: Name of the column to move to the end.
        df: DataFrame whose columns are re-ordered.

    Returns:
        The result of ``df.select`` over the re-ordered list of column names.
    """
    # Every column except the partition column, in schema order.
    leading_columns = [name for name in df.schema.names if name != partition_column]
    # The partition column is always appended, even if the schema lacked it.
    ordered_columns = leading_columns + [partition_column]
    return df.select(ordered_columns)

In [0]:
def overwrite_partition(df, database_name, table_name, partition_column):
    """Write ``df`` into ``database_name.table_name`` in overwrite mode.

    If the table already exists the data is written with ``insertInto``;
    otherwise a new parquet table partitioned by ``partition_column`` is
    created with ``saveAsTable``.

    Side effect: sets ``spark.sql.sources.partitionOverwriteMode`` to
    ``dynamic`` on the active ``spark`` session.

    Args:
        df: DataFrame to write.
        database_name: Database that holds the target table.
        table_name: Name of the target table.
        partition_column: Column used for partitioning; it is moved to the
            last position before writing.
    """
    # NOTE(review): insertInto resolves columns by position per the PySpark
    # docs, which is presumably why the partition column is moved last first.
    reordered_df = re_arrange_partition_column(partition_column, df)

    spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

    target_table = f"{database_name}.{table_name}"
    table_exists = spark.catalog.tableExists(target_table)

    writer = reordered_df.write.mode("overwrite")
    if table_exists:
        writer.insertInto(target_table)
    else:
        # First load: create the table, partitioned by the given column.
        writer.partitionBy(partition_column).format("parquet").saveAsTable(target_table)


In [0]:
def df_column_to_list(input_df, column_name):
    """Collect the distinct values of one column into a Python list.

    Note that ``collect()`` brings every distinct row back to the caller.

    Args:
        input_df: DataFrame to read from.
        column_name: Name of the column whose distinct values are wanted.

    Returns:
        A list with one entry per distinct row of ``column_name``, in the
        order returned by ``collect()``.
    """
    return [
        record[column_name]
        for record in input_df.select(column_name).distinct().collect()
    ]


In [0]:
def merge_delta_data(input_df, database_name, table_name, folder_path, merge_condition, partition_column):
    """Upsert ``input_df`` into a Delta table, creating the table if needed.

    When ``database_name.table_name`` exists, the Delta table at
    ``{folder_path}/{table_name}`` is merged with ``input_df`` (aliased
    ``src``; the target is aliased ``tgt``) on ``merge_condition``: matched
    rows are updated with all source columns and unmatched rows inserted.
    When it does not exist, ``input_df`` is saved as a new Delta table
    partitioned by ``partition_column``.

    Side effect: sets ``spark.databricks.optimizer.dynamicPartitionPruning``
    to ``"true"`` on the active ``spark`` session.

    Args:
        input_df: DataFrame holding the rows to merge or write.
        database_name: Database that holds the target table.
        table_name: Name of the target table.
        folder_path: Folder under which the table's data is expected.
        merge_condition: Join condition for the merge, written against the
            ``tgt`` and ``src`` aliases.
        partition_column: Partition column used only when the table is
            first created.
    """
    spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true")

    # Imported before the existence check, so a missing delta package fails
    # on either path.
    from delta.tables import DeltaTable

    qualified_name = f"{database_name}.{table_name}"

    if not spark.catalog.tableExists(qualified_name):
        # First load: create the Delta table from the incoming data.
        (
            input_df.write
            .mode("overwrite")
            .partitionBy(partition_column)
            .format("delta")
            .saveAsTable(qualified_name)
        )
        return

    # NOTE(review): existence is checked by metastore name but the table is
    # loaded by path; this assumes the table's storage location is
    # {folder_path}/{table_name} — confirm against the database location.
    target_table = DeltaTable.forPath(spark, f"{folder_path}/{table_name}")
    (
        target_table.alias("tgt")
        .merge(input_df.alias("src"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
