# Read from CSV file

In [0]:
# Load the CRM sales-details extract, treating the first row as the header,
# and preview it. No inferSchema option is set on this read.
sales_details_csv = "/Volumes/dev_project/bronze/source_systems/source_crm/sales_details.csv"
csv_reader = spark.read.option("header", "true")
df = csv_reader.csv(sales_details_csv)
df.display()

sls_ord_num,sls_prd_key,sls_cust_id,sls_order_dt,sls_ship_dt,sls_due_dt,sls_sales,sls_quantity,sls_price
SO43697,BK-R93R-62,21768,20101229,20110105,20110110,3578,1,3578.0
SO43698,BK-M82S-44,28389,20101229,20110105,20110110,3400,1,3400.0
SO43699,BK-M82S-44,25863,20101229,20110105,20110110,3400,1,3400.0
SO43700,BK-R50B-62,14501,20101229,20110105,20110110,699,1,699.0
SO43701,BK-M82S-44,11003,20101229,20110105,20110110,3400,1,3400.0
SO43702,BK-R93R-44,27645,20101230,20110106,20110111,3578,1,3578.0
SO43703,BK-R93R-62,16624,20101230,20110106,20110111,3578,1,3578.0
SO43704,BK-M82B-48,11005,20101230,20110106,20110111,3375,1,3375.0
SO43705,BK-M82S-38,11011,20101230,20110106,20110111,3400,1,3400.0
SO43706,BK-R93R-48,27621,20101231,20110107,20110112,3578,1,3578.0


# Write it to the Bronze layer

In [0]:
df.write.mode("overwrite").saveAsTable("dev_project.bronze.crm_cust_info")

In [0]:
%sql
select * from dev_project.bronze.crm_cust_info

sls_ord_num,sls_prd_key,sls_cust_id,sls_order_dt,sls_ship_dt,sls_due_dt,sls_sales,sls_quantity,sls_price
SO43697,BK-R93R-62,21768,20101229,20110105,20110110,3578,1,3578.0
SO43698,BK-M82S-44,28389,20101229,20110105,20110110,3400,1,3400.0
SO43699,BK-M82S-44,25863,20101229,20110105,20110110,3400,1,3400.0
SO43700,BK-R50B-62,14501,20101229,20110105,20110110,699,1,699.0
SO43701,BK-M82S-44,11003,20101229,20110105,20110110,3400,1,3400.0
SO43702,BK-R93R-44,27645,20101230,20110106,20110111,3578,1,3578.0
SO43703,BK-R93R-62,16624,20101230,20110106,20110111,3578,1,3578.0
SO43704,BK-M82B-48,11005,20101230,20110106,20110111,3375,1,3375.0
SO43705,BK-M82S-38,11011,20101230,20110106,20110111,3400,1,3400.0
SO43706,BK-R93R-48,27621,20101231,20110107,20110112,3578,1,3578.0


In [0]:
# Bronze ingestion catalogue: one row per CSV extract, giving the source
# system, the file location inside the volume, and the bronze table name.
_CONFIG_FIELDS = ("source", "path", "table")
_CONFIG_ROWS = (
    ("crm", "/Volumes/dev_project/bronze/source_systems/source_crm/cust_info.csv", "crm_cust_info"),
    ("crm", "/Volumes/dev_project/bronze/source_systems/source_crm/prd_info.csv", "crm_prd_info"),
    ("crm", "/Volumes/dev_project/bronze/source_systems/source_crm/sales_details.csv", "crm_sales_details"),
    ("erp", "/Volumes/dev_project/bronze/source_systems/source_erp/CUST_AZ12.csv", "erp_cust_az12"),
    ("erp", "/Volumes/dev_project/bronze/source_systems/source_erp/LOC_A101.csv", "erp_loc_a101"),
    ("erp", "/Volumes/dev_project/bronze/source_systems/source_erp/PX_CAT_G1V2.csv", "erp_px_cat_g1v2"),
)

# Expand each row into a dict with the keys "source", "path" and "table".
INGESTION_CONFIG = [dict(zip(_CONFIG_FIELDS, row)) for row in _CONFIG_ROWS]


In [0]:
# Ingest every extract listed in INGESTION_CONFIG into its own bronze Delta table.
for item in INGESTION_CONFIG:
    target_table = f"dev_project.bronze.{item['table']}"
    print(f"Ingesting {item['source']} → {target_table}")

    # The first row supplies the column names; column types are inferred from the data.
    df = (
        spark.read
             .option("header", "true")
             .option("inferSchema", "true")
             .csv(item["path"])
    )

    # Write each file with its own column names and inferred types. Columns are
    # deliberately NOT renamed to fit whatever schema an existing table has:
    # overwriteSchema makes the overwrite replace a stale table schema instead,
    # so e.g. crm_cust_info keeps its cst_* columns.
    (
        df.write
          .mode("overwrite")
          .format("delta")
          .option("overwriteSchema", "true")
          .saveAsTable(target_table)
    )


Ingesting crm → dev_project.bronze.crm_cust_info
Ingesting crm → dev_project.bronze.crm_prd_info
Ingesting crm → dev_project.bronze.crm_sales_details
Ingesting erp → dev_project.bronze.erp_cust_az12
Ingesting erp → dev_project.bronze.erp_loc_a101
Ingesting erp → dev_project.bronze.erp_px_cat_g1v2
