Ingestion bronze layer script

In [0]:
from pyspark.sql.functions import current_timestamp, lit
from pyspark.sql.utils import AnalysisException
# Ingestion manifest for the bronze layer.
# Each tuple below is (source system, CSV path, bronze table name); the
# comprehension expands it into the dict shape the ingestion loop reads
# ("source", "path", "table").
table_to_ingest = [
    {"source": source_system, "path": csv_path, "table": bronze_table}
    for source_system, csv_path, bronze_table in (
        (
            "crm",
            "/Volumes/workspace/bronze/source_systems/source_crm/cust_info.csv",
            "crm_cust_info",
        ),
        (
            "crm",
            "/Volumes/workspace/bronze/source_systems/source_crm/prd_info.csv",
            "crm_prd_info",
        ),
        (
            "crm",
            "/Volumes/workspace/bronze/source_systems/source_crm/sales_details.csv",
            "crm_sales_details",
        ),
        (
            "erp",
            "/Volumes/workspace/bronze/source_systems/source_erp/CUST_AZ12.csv",
            "erp_cust_az12",
        ),
        (
            "erp",
            "/Volumes/workspace/bronze/source_systems/source_erp/LOC_A101.csv",
            "erp_loc_a101",
        ),
        (
            "erp",
            "/Volumes/workspace/bronze/source_systems/source_erp/PX_CAT_G1V2.csv",
            "erp_px_cat_g1v2",
        ),
    )
]

# Load every configured CSV into its bronze Delta table.
# Each entry is processed independently: a failure is reported and the loop
# continues with the next table (best-effort ingestion, nothing is re-raised).
failed_tables = []  # table names whose ingestion raised, for the final summary

for item in table_to_ingest:
    target_table = f"workspace.bronze.{item['table']}"
    # Use single quotes inside the f-string: reusing the enclosing double
    # quotes is a SyntaxError on Python versions before 3.12.
    print(f"Ingesting {item['source']} from {item['path']} into {target_table}")

    try:
        # Read the source CSV; the first row is the header and column types
        # are inferred from the data.
        df_raw = (
            spark.read.option("header", "true")
            .option("inferschema", "true")
            .csv(item["path"])
        )
        # Stamp every row with the time of this ingestion run.
        df_bronze = df_raw.withColumn("bronze_ingestion_timestamp", current_timestamp())
        # Replace the bronze table contents with the freshly read data.
        (
            df_bronze.write
            .mode("overwrite")
            .format("delta")
            .option("mergeSchema", "true")  # Allows new columns to be added automatically in the future
            .saveAsTable(target_table)
        )

        print(f"Successfull ingestion of : {target_table}")

    except AnalysisException as e:
        # NOTE(review): this handler wraps the write as well as the read, so
        # the "could not find or read" wording may be misleading when the
        # failure comes from saveAsTable (e.g. a schema conflict) - confirm.
        failed_tables.append(item["table"])
        print(f"ERROR: Could not find or read {item['path']}. Details: {e}")
    except Exception as e:
        # Top-level boundary: report and move on so one bad table does not
        # stop the remaining ingestions.
        failed_tables.append(item["table"])
        print(f"ERROR: Unexpected issue with {item['table']}: {e}")

# Surface failures once more at the end so they are not lost in the per-table
# output above.
if failed_tables:
    print(f"Ingestion finished with {len(failed_tables)} failed table(s): {', '.join(failed_tables)}")



