# Read Source Files and Write Them to the Bronze Layer

In [0]:
# Ingestion manifest: one entry per raw CSV export to load.
#   source -> label of the originating system (used in the progress message)
#   path   -> full path of the CSV file to read
#   table  -> unqualified target table name; the loader qualifies it with
#             the "workspace.bronze" schema
_SOURCE_SYSTEMS_ROOT = "/Volumes/workspace/bronze/source_systems"

INGESTION_CONFIG = [
    {
        "source": system,
        "path": f"{_SOURCE_SYSTEMS_ROOT}/{folder}/{file_name}",
        "table": target,
    }
    for system, folder, file_name, target in (
        # CRM
        ("crm", "source_crm", "cust_info.csv", "crm_cust_info_raw"),
        ("crm", "source_crm", "sales_details.csv", "crm_sales_details_raw"),
        ("crm", "source_crm", "prd_info.csv", "crm_prd_info_raw"),
        # ERP
        ("ERP", "source_erp", "CUST_AZ12.csv", "erp_CUST_AZ12_raw"),
        ("ERP", "source_erp", "LOC_A101.csv", "erp_LOC_A101_raw"),
        ("ERP", "source_erp", "PX_CAT_G1V2.csv", "erp_PX_CAT_G1V2_raw"),
    )
]

In [0]:
%python 

from pyspark.sql.utils import AnalysisException  

def TableName(table):
    """
    Return True if Spark can resolve the table ``table``, otherwise False.

    Despite its name, this is an existence check: it calls ``spark.table`` on
    the given name and treats an ``AnalysisException`` as "table does not
    exist".

    Args:
        table: table name to look up, e.g. "workspace.bronze.crm_cust_info_raw".

    Returns:
        True if ``spark.table(table)`` succeeds; False if it raises
        ``AnalysisException``. Any other exception propagates.
    """
    try:
        # Look up the name passed in, not whatever global happens to be set.
        spark.table(table)
    except AnalysisException:
        return False
    return True

def path_exists(path):
    """
    Return True if ``path`` can be listed with ``dbutils.fs.ls``, else False.

    This is a best-effort probe: any failure of the listing (not only a
    missing path) is reported as False.

    Args:
        path: file or directory path to probe.

    Returns:
        True when ``dbutils.fs.ls(path)`` succeeds; False when it raises.
    """
    try:
        dbutils.fs.ls(path)
    except Exception:
        # NOTE(review): the exact exception type dbutils raises for a missing
        # path is not visible here (presumably a wrapped JVM error), so catch
        # Exception. Unlike a bare `except:`, this still lets
        # KeyboardInterrupt / SystemExit propagate.
        return False
    return True


# Load every configured CSV into its bronze Delta table, skipping tables that
# already exist and source files that cannot be found.
for item in INGESTION_CONFIG:
    # Build the fully-qualified target name once; it is reused for the
    # existence check, the progress message and the write so they can't drift.
    table_name = f"workspace.bronze.{item['table']}"

    # Existing tables are left as they are (no reload).
    if TableName(table_name):
        print(f"{table_name} already loaded, loading skipped.\n")
        continue

    # Skip entries whose source file cannot be found.
    if not path_exists(item["path"]):
        print(f"file does not exist. {item['path']} skipped")
        continue

    print(f"Reading file {item['source']} -> {table_name}")

    # header: first row holds column names; inferSchema: infer column types
    # instead of reading every column as a string.
    df = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(item["path"])
    )

    # Write to the bronze layer. Because existing tables were skipped above,
    # "overwrite" effectively creates the table here.
    df.write.mode("overwrite").format("delta").saveAsTable(table_name)