In [0]:
# DBFS paths, all under /mnt/formula1dlsiddhi, one per storage container.
# Container holding the raw input data.
raw_container_path = "dbfs:/mnt/formula1dlsiddhi/raw"
# Container holding the processed data.
processed_container_path = "dbfs:/mnt/formula1dlsiddhi/processed"
# Container holding the presentation data.
presentation_container_path = "dbfs:/mnt/formula1dlsiddhi/presentation"

In [0]:
from pyspark.sql.functions import current_timestamp
def ingest_current_timestamp(input_df):
    """Return ``input_df`` with an ``ingest_date`` column added.

    Args:
        input_df: DataFrame to stamp.

    Returns:
        The result of ``input_df.withColumn("ingest_date", current_timestamp())``.
    """
    ingest_ts = current_timestamp()
    stamped_df = input_df.withColumn("ingest_date", ingest_ts)
    return stamped_df

In [0]:
def rearrange_partition_column(input_df, partition_column):
    """Re-select ``input_df`` so that ``partition_column`` is the last column.

    Every other column keeps its original relative order. The partition
    column is appended unconditionally, so if it is not present in the
    schema the final ``select`` is what decides how that is handled.

    Args:
        input_df: DataFrame whose columns should be reordered.
        partition_column: Name of the column to move to the end.

    Returns:
        The result of ``input_df.select(...)`` with the reordered column list.
    """
    leading_columns = [
        name for name in input_df.schema.names if name != partition_column
    ]
    return input_df.select(leading_columns + [partition_column])

In [0]:
def overwrite_partition(input_df, db_name, table_name, partition_column):
    """Write ``input_df`` into ``db_name.table_name`` in overwrite mode.

    The partition column is first moved to the end of the column list via
    ``rearrange_partition_column``. If the table already exists, the data is
    written with ``insertInto``; otherwise the table is created as a parquet
    table partitioned by ``partition_column``.

    Side effect: sets ``spark.sql.sources.partitionOverwriteMode`` to
    ``dynamic`` on the session and does not restore the previous value.

    Args:
        input_df: DataFrame to write.
        db_name: Database (schema) name of the target table.
        table_name: Name of the target table.
        partition_column: Column the table is partitioned by.
    """
    reordered_df = rearrange_partition_column(input_df, partition_column)

    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    qualified_name = f"{db_name}.{table_name}"
    table_exists = spark._jsparkSession.catalog().tableExists(qualified_name)
    overwrite_writer = reordered_df.write.mode("overwrite")

    if not table_exists:
        # First load: create the table, partitioned by the requested column.
        overwrite_writer.format("parquet").partitionBy(f"{partition_column}").saveAsTable(qualified_name)
        return

    # Table already there: insert into it using the reordered columns.
    overwrite_writer.insertInto(qualified_name)


In [0]:
from delta.tables import DeltaTable
def mergeTable(input_df, db_name, table_name, merge_condition, partition_column):
    """Upsert ``input_df`` into the Delta table ``db_name.table_name``.

    If the table exists, ``input_df`` (aliased ``source``) is merged into it
    (aliased ``target``) on ``merge_condition``: matched rows are updated with
    all source columns and unmatched rows are inserted. If the table does not
    exist, it is created from ``input_df`` as a Delta table partitioned by
    ``partition_column``.

    Args:
        input_df: DataFrame holding the rows to upsert.
        db_name: Database (schema) name of the target table.
        table_name: Name of the target table.
        merge_condition: Join condition for the merge; it is evaluated with
            the aliases ``target`` and ``source`` in scope.
        partition_column: Column to partition by; only used when the table is
            created, not on the merge path.
    """
    qualified_name = f"{db_name}.{table_name}"

    if not spark._jsparkSession.catalog().tableExists(qualified_name):
        # First load: nothing to merge into yet, so create the table.
        (
            input_df.write
            .mode("overwrite")
            .format("delta")
            .partitionBy(f"{partition_column}")
            .saveAsTable(qualified_name)
        )
        return

    target_table = DeltaTable.forName(spark, qualified_name)
    upsert = (
        target_table.alias("target")
        .merge(input_df.alias("source"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
    )
    upsert.execute()

In [0]:
# Exit the notebook, passing "success..." as its exit value.
# NOTE(review): if this notebook is pulled into others with %run rather than
# dbutils.notebook.run, confirm that exiting here does not also stop the
# including notebook.
dbutils.notebook.exit("success...")