In [0]:
from pyspark.sql.functions import current_timestamp

In [0]:
def add_ingestion_date(input_df):
    """Return ``input_df`` with an extra ``ingestion_date`` column.

    The new column is populated with ``current_timestamp()``. If a column
    named ``ingestion_date`` already exists it is replaced (``withColumn``
    semantics). The input DataFrame itself is not modified; a new DataFrame
    is returned.

    Args:
        input_df: DataFrame to stamp.

    Returns:
        A new DataFrame with the ``ingestion_date`` column.
    """
    stamped_df = input_df.withColumn("ingestion_date", current_timestamp())
    return stamped_df

In [0]:
def move_column_to_last(df, partition_col):
    """Return ``df`` with ``partition_col`` moved to the last position.

    Args:
        df: the DataFrame object itself (not its name as a string).
        partition_col: name of the column to move, as a string.

    Returns:
        A DataFrame selecting every column in the original order, except that
        (the first occurrence of) ``partition_col`` is moved to the end. If the
        column is not present, a warning is printed and ``df`` is returned
        unchanged.
    """
    column_names = list(df.schema.names)

    if partition_col not in column_names:
        print(f"Warning: Column '{partition_col}' not found in the DataFrame. Returning original DataFrame.")
        return df

    # Only the first occurrence is moved; any later duplicate keeps its place.
    position = column_names.index(partition_col)
    reordered = column_names[:position] + column_names[position + 1:] + [partition_col]
    return df.select(*reordered)
  

In [0]:
def write_to_database(df, db, table_name, partition_col):
    """Overwrite-write ``df`` into the table ``db.table_name``.

    Args:
        df: the DataFrame object itself (not its name as a string).
        db: database name, as a string.
        table_name: table name, as a string.
        partition_col: partition column name, as a string. Only used when
            the table does not exist yet.

    Behaviour:
        * The session conf ``spark.sql.sources.partitionOverwriteMode`` is set
          to ``"dynamic"`` before writing and is not restored afterwards.
        * If the table already exists, ``df`` is written with
          ``insertInto`` in overwrite mode.
        * Otherwise a new Parquet table partitioned by ``partition_col`` is
          created with ``saveAsTable``.

    NOTE(review): ``insertInto`` matches columns by position rather than by
    name (per the PySpark docs). Callers presumably reorder ``df`` first (e.g.
    partition column last) so it lines up with the table; confirm at call sites.
    """
    # Enable dynamic overwrite (session-wide).
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    qualified_name = f"{db}.{table_name}"
    table_already_exists = spark._jsparkSession.catalog().tableExists(qualified_name)

    if table_already_exists:
        df.write.mode("overwrite").insertInto(qualified_name)
        return

    (
        df.write
        .mode("overwrite")
        .partitionBy(str(partition_col))
        .format("parquet")
        .saveAsTable(qualified_name)
    )

In [0]:
def df_column_to_list(input_df, column_name):
    """Return the distinct values of ``column_name`` as a Python list.

    The distinct rows are collected to the driver, so this is only suitable
    for columns with a manageable number of distinct values. No ordering is
    applied to the result.

    Args:
        input_df: source DataFrame.
        column_name: name of the column whose distinct values are wanted.

    Returns:
        A list containing one value per distinct row of the selected column.
    """
    distinct_rows = input_df.select(column_name).distinct().collect()
    return [record[column_name] for record in distinct_rows]

In [0]:
def merge_delta_data(input_df, db_name, table_name, folder_path, merge_condition, partition_column):
    """Upsert ``input_df`` into the Delta table ``db_name.table_name``.

    If the table is already registered in the catalog, a Delta MERGE is run:
    target rows matching ``merge_condition`` are updated with all source
    columns, and unmatched source rows are inserted. Otherwise the table is
    created from ``input_df`` as a Delta table partitioned by
    ``partition_column`` via ``saveAsTable``.

    Args:
        input_df: source DataFrame.
        db_name: database name, as a string.
        table_name: table name, as a string.
        folder_path: kept for backward compatibility; no longer used. The
            merge target used to be loaded from ``f"{folder_path}/{table_name}"``.
            That only matched the catalog table when the database location
            equalled ``folder_path``. The target is now resolved by name, so it
            is always the table the existence check found (and that the create
            branch registers).
        merge_condition: SQL predicate string. It must refer to the target as
            ``tgt`` and the source as ``src`` (e.g. ``"tgt.id = src.id"``).
        partition_column: partition column name, as a string. Only used when
            the table is created.

    Side effects:
        Sets the session conf
        ``spark.databricks.optimizer.dynamicPartitionPruning.enabled`` to
        ``"true"``; it is not restored afterwards.
    """
    spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning.enabled", "true")

    # Imported locally so the rest of the notebook does not require delta.
    from delta.tables import DeltaTable

    qualified_name = f"{db_name}.{table_name}"
    if spark._jsparkSession.catalog().tableExists(qualified_name):
        # Resolve by catalog name, consistent with the existence check above,
        # instead of a path that may point somewhere else.
        target_table = DeltaTable.forName(spark, qualified_name)
        (
            target_table.alias("tgt")
            .merge(input_df.alias("src"), merge_condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        (
            input_df.write
            .mode("overwrite")
            .partitionBy(partition_column)
            .format("delta")
            .saveAsTable(qualified_name)
        )