In [0]:
from pyspark.sql.functions import current_timestamp
def add_ingestion_date(input_df):
    """Return *input_df* with an extra ``ingestion_date`` column.

    The column is filled with Spark's ``current_timestamp()`` expression.
    ``withColumn`` returns a new DataFrame; the input is not modified.

    Args:
        input_df: Source DataFrame.

    Returns:
        The DataFrame with the ``ingestion_date`` column added (or replaced,
        if a column of that name already exists).
    """
    return input_df.withColumn("ingestion_date", current_timestamp())

In [0]:
def re_arrange_partition_column(input_df, partition_column):
    """Return *input_df* with *partition_column* moved to the last position.

    All other columns keep their original relative order. The partition
    column is appended unconditionally, so the ``select`` will fail if it is
    not actually part of the schema.

    Args:
        input_df: Source DataFrame.
        partition_column: Name of the column to move to the end.

    Returns:
        The re-ordered DataFrame.
    """
    ordered = [name for name in input_df.schema.names if name != partition_column]
    ordered.append(partition_column)
    return input_df.select(ordered)

In [0]:
def incremental_load(input_df, partition_column, db_name, table_name):
    """Load *input_df* into the parquet table ``db_name.table_name``.

    The partition column is first moved to the end of the DataFrame, and the
    session's ``spark.sql.sources.partitionOverwriteMode`` is set to
    ``dynamic``. This setting stays in effect after the call.

    - If the table already exists, the data is written with
      ``insertInto`` in ``overwrite`` mode.
    - Otherwise the table is created with ``saveAsTable``, partitioned by
      *partition_column*.

    Args:
        input_df: Source DataFrame.
        partition_column: Column the table is (or will be) partitioned by.
        db_name: Target database name.
        table_name: Target table name.
    """
    qualified_name = f"{db_name}.{table_name}"
    # NOTE(review): insertInto matches columns by position rather than by
    # name, which is presumably why the partition column is moved last here.
    # Confirm the existing table's column order matches.
    reordered_df = re_arrange_partition_column(input_df, partition_column)
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    # Private JVM catalog handle, kept for compatibility with Spark versions
    # that lack the public `spark.catalog.tableExists`.
    already_exists = spark._jsparkSession.catalog().tableExists(qualified_name)
    if not already_exists:
        (
            reordered_df.write
            .mode("append")
            .partitionBy(partition_column)
            .format("parquet")
            .saveAsTable(qualified_name)
        )
    else:
        reordered_df.write.mode("overwrite").insertInto(qualified_name)


In [0]:
def df_col_to_list(input_df, col_name):
    """Return the distinct values of column *col_name* as a Python list.

    The values are gathered with ``select`` -> ``distinct`` -> ``collect``, so
    every distinct value is pulled to the driver. The list order is whatever
    ``collect`` returns.

    Args:
        input_df: Source DataFrame.
        col_name: Name of the column to read.

    Returns:
        A list containing one entry per distinct value of *col_name*.
    """
    unique_rows = input_df.select(col_name).distinct().collect()
    return [r[col_name] for r in unique_rows]

In [0]:
def merge_delta_table(input_df, db_name, table_name, folder_path, merge_condition, partition_column):
    """Upsert *input_df* into the Delta table ``db_name.table_name``.

    If the table is already registered in the catalog, *input_df* (aliased
    ``src``) is merged into it (aliased ``tgt``) using *merge_condition*:

    - matched target rows are updated with all source columns;
    - unmatched source rows are inserted.

    Otherwise the table is created from *input_df* as a Delta table
    partitioned by *partition_column*.

    Args:
        input_df: Source DataFrame.
        db_name: Target database name.
        table_name: Target table name.
        folder_path: Retained for backward compatibility; no longer used.
            The target table is resolved by its qualified name, so the merge
            always hits the same table whose existence was checked and which
            the create branch registers. Previously it used a separately built
            path that need not match the table's real location.
        merge_condition: SQL condition referencing the ``tgt`` and ``src``
            aliases, e.g. ``"tgt.id = src.id"``.
        partition_column: Column used to partition the table when it is
            first created.
    """
    from delta.tables import DeltaTable

    qualified_name = f"{db_name}.{table_name}"
    # `spark.databricks.*` setting; presumably only meaningful on Databricks
    # runtimes. It is left set on the session after this call.
    spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning", "true")

    # Private JVM catalog handle, kept for compatibility with Spark versions
    # that lack the public `spark.catalog.tableExists`.
    if spark._jsparkSession.catalog().tableExists(qualified_name):
        # Resolve by name (not by a hand-built path) so the merge targets the
        # exact table the existence check found.
        delta_table = DeltaTable.forName(spark, qualified_name)
        (
            delta_table.alias("tgt")
            .merge(input_df.alias("src"), merge_condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        (
            input_df.write
            .mode("append")
            .partitionBy(partition_column)
            .format("delta")
            .saveAsTable(qualified_name)
        )