In [0]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DateType
import time

# Define tables, schemas, and CSV paths
tables = {
    "erp_cust_az12": {
        "path": "/Volumes/sales-crm-erp/bronze/raw_csv_files/source_erp/CUST_AZ12.csv",
        "schema": StructType([
            StructField("CID", StringType(), True),
            StructField("BDATE", DateType(), True),
            StructField("GEN", StringType(), True)
        ])
    },
    "crm_sales_details": {
        "path": "/Volumes/sales-crm-erp/bronze/raw_csv_files/source_crm/sales_details.csv",
        "schema": StructType([
            StructField("sls_ord_num", StringType(), True),
            StructField("sls_prd_key", StringType(), True),
            StructField("sls_cust_id", LongType(), True),
            StructField("sls_order_dt", LongType(), True),
            StructField("sls_ship_dt", LongType(), True),
            StructField("sls_due_dt", LongType(), True),
            StructField("sls_sales", LongType(), True),
            StructField("sls_quantity", LongType(), True),
            StructField("sls_price", LongType(), True)
        ])
    },
    "erp_px_cat_g1v2": {
        "path": "/Volumes/sales-crm-erp/bronze/raw_csv_files/source_erp/PX_CAT_G1V2.csv",
        "schema": StructType([
            StructField("ID", StringType(), True),
            StructField("CAT", StringType(), True),
            StructField("SUBCAT", StringType(), True),
            StructField("MAINTENANCE", StringType(), True)
        ])
    },
    "crm_prd_info": {
        "path": "/Volumes/sales-crm-erp/bronze/raw_csv_files/source_crm/prd_info.csv",
        "schema": StructType([
            StructField("prd_id", LongType(), True),
            StructField("prd_key", StringType(), True),
            StructField("prd_nm", StringType(), True),
            StructField("prd_cost", LongType(), True),
            StructField("prd_line", StringType(), True),
            StructField("prd_start_dt", DateType(), True),
            StructField("prd_end_dt", DateType(), True)
        ])
    },
    "crm_cust_info": {
        "path": "/Volumes/sales-crm-erp/bronze/raw_csv_files/source_crm/cust_info.csv",
        "schema": StructType([
            StructField("cst_id", LongType(), True),
            StructField("cst_key", StringType(), True),
            StructField("cst_firstname", StringType(), True),
            StructField("cst_lastname", StringType(), True),
            StructField("cst_marital_status", StringType(), True),
            StructField("cst_gndr", StringType(), True),
            StructField("cst_create_date", DateType(), True)
        ])
    },
    "erp_loc_a101": {
        "path": "/Volumes/sales-crm-erp/bronze/raw_csv_files/source_erp/LOC_A101.csv",
        "schema": StructType([
            StructField("CID", StringType(), True),
            StructField("CNTRY", StringType(), True)
        ])
    }
}

try:
    print("="*50)
    print("Loading Bronze Layer")
    print("="*50)

    batch_start = time.time()

    for table_name, info in tables.items():
        path = info["path"]
        schema = info["schema"]
        
        print(f">> Truncating Table: bronze.{table_name}")
        spark.sql(f"TRUNCATE TABLE `sales-crm-erp`.bronze.{table_name}")
        
        print(f">> Inserting Data Into: bronze.{table_name}")
        start_time = time.time()
        
        df = spark.read.option("header", True).schema(schema).csv(path)
        df.write.format("delta").mode("overwrite").saveAsTable(f"`sales-crm-erp`.bronze.{table_name}")
        
        end_time = time.time()
        print(f">> Load Duration: {end_time - start_time:.2f} seconds")
        print(">> -------------")

    batch_end = time.time()
    print("="*50)
    print("Loading Bronze Layer is Completed")
    print(f"   - Total Load Duration: {batch_end - batch_start:.2f} seconds")
    print("="*50)

except Exception as e:
    print("="*50)
    print("ERROR OCCURRED DURING LOADING BRONZE LAYER")
    print(f"Error Message: {str(e)}")
    print("="*50)


Loading Bronze Layer
>> Truncating Table: bronze.erp_cust_az12
>> Inserting Data Into: bronze.erp_cust_az12
>> Load Duration: 2.12 seconds
>> -------------
>> Truncating Table: bronze.crm_sales_details
>> Inserting Data Into: bronze.crm_sales_details
>> Load Duration: 2.51 seconds
>> -------------
>> Truncating Table: bronze.erp_px_cat_g1v2
>> Inserting Data Into: bronze.erp_px_cat_g1v2
>> Load Duration: 2.09 seconds
>> -------------
>> Truncating Table: bronze.crm_prd_info
>> Inserting Data Into: bronze.crm_prd_info
>> Load Duration: 2.04 seconds
>> -------------
>> Truncating Table: bronze.crm_cust_info
>> Inserting Data Into: bronze.crm_cust_info
>> Load Duration: 2.13 seconds
>> -------------
>> Truncating Table: bronze.erp_loc_a101
>> Inserting Data Into: bronze.erp_loc_a101
>> Load Duration: 2.15 seconds
>> -------------
Loading Bronze Layer is Completed
   - Total Load Duration: 15.62 seconds
