# Bronze Layer


### Create Catalog

In [0]:
-- Create the project catalog; this is a no-op when it already exists.
CREATE CATALOG IF NOT EXISTS capstone;

### - Use Catalog
### - Create Schemas

In [0]:
-- Make `capstone` the current catalog so the schema names below resolve inside it.
USE CATALOG capstone;

-- One schema per layer; each statement is a no-op when the schema already exists.
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;

Check whether any _Volumes_ already exist in the bronze schema.

In [0]:
-- List the volumes that exist in the bronze schema of the capstone catalog.
SHOW VOLUMES IN capstone.bronze;

If none exist, create the required _Volume_.

In [0]:
-- Create the volume that holds the raw source files; this is a no-op when it already exists.
CREATE VOLUME IF NOT EXISTS capstone.bronze.raw_files;

Test the volume

In [0]:
%python
# Smoke-test the volume: read one file from it and show the first rows.
test_path = "/Volumes/capstone/bronze/raw_files/diseases/ICD10codes.csv"
preview_reader = spark.read.option("header", True)
preview_reader.csv(test_path).limit(5).display()

Test: ingest the first table

In [0]:
%python
#import needed functions
from pyspark.sql.functions import current_timestamp, lit, col

# Location of the ICD-10 code file inside the raw-files volume.
path = "/Volumes/capstone/bronze/raw_files/diseases/ICD10codes.csv"

# header=False is essential here: the first line of the file is loaded as a data row.
# inferSchema=False keeps every column as a string.
reader = spark.read.option("header", False).option("inferSchema", False)
df_raw = reader.csv(path)

# Replace the default column names with positional names col_1 ... col_N.
new_cols = ["col_" + str(position) for position in range(1, len(df_raw.columns) + 1)]

# Rename the columns, then append the two audit columns (load time, data origin).
df = df_raw.toDF(*new_cols)
df = df.withColumn("ingestion_timestamp", current_timestamp())
df = df.withColumn("source_system", lit("CDC/ICD"))

# Persist as a Delta table, replacing any previous contents.
(df.write
   .mode("overwrite")
   .format("delta")
   .saveAsTable("capstone.bronze.diseases_icd10_raw"))

print(" Created table: capstone.bronze.diseases_icd10_raw")

Check the contents of the bronze table

In [0]:
USE CATALOG capstone;

-- Row count of the ICD-10 bronze table.
SELECT COUNT(*) AS cnt
FROM bronze.diseases_icd10_raw;

-- Preview of the first few rows.
SELECT *
FROM bronze.diseases_icd10_raw
LIMIT 5;

Test: ingest the second table

In [0]:
%python
# import needed functions
from pyspark.sql.functions import current_timestamp, lit
# Source file for the chronic disease indicators dataset.
path = "/Volumes/capstone/bronze/raw_files/diseases/U.S._Chronic_Disease_Indicators_20260223.csv"

# Read with the first line as the header and every column kept as a string.
raw_indicators = spark.read.option("header", True).option("inferSchema", False).csv(path)

# Append the audit columns: load time and originating system.
df = raw_indicators.withColumn("ingestion_timestamp", current_timestamp())
df = df.withColumn("source_system", lit("CDC/CDI"))

# Persist as a Delta table, replacing any previous contents.
bronze_table_writer = df.write.mode("overwrite").format("delta")
bronze_table_writer.saveAsTable("capstone.bronze.diseases_chronic_indicators_raw")

# Confirmation message.
print(" Created table: capstone.bronze.diseases_chronic_indicators_raw")

Check that the table was created

In [0]:

-- Select the catalog explicitly, as the other verification cells do, so the
-- relative `bronze.` names below do not depend on an earlier cell having run.
USE CATALOG capstone;

-- Row count of the chronic disease indicators bronze table.
SELECT COUNT(*) AS cnt FROM bronze.diseases_chronic_indicators_raw;
-- Preview of the first few rows.
SELECT * FROM bronze.diseases_chronic_indicators_raw LIMIT 5;

### Ingest all tables into the bronze layer

In [0]:
%python
# Import functions for the audit columns
from pyspark.sql.functions import current_timestamp, lit

# Function to ingest CSV files
def bronze_csv(path, table, source_system, header=True):
    """Load a CSV file into a Delta table and return the number of rows stored.

    Args:
        path: Location of the CSV file to read.
        table: Name of the Delta table to write; existing contents are overwritten.
        source_system: Label stored in the ``source_system`` audit column.
        header: Whether the first line of the file is a header row.

    Returns:
        Row count of ``table`` after the write.
    """
    df = (spark.read
          .option("header", header)      # Treat the first line as column names when True
          .option("inferSchema", False)  # No type inference: every column stays a string
          .csv(path)
          .withColumn("ingestion_timestamp", current_timestamp())  # Audit column: load time
          .withColumn("source_system", lit(source_system))         # Audit column: data origin
    )
    df.write.mode("overwrite").format("delta").saveAsTable(table)
    # Count the saved table rather than `df`: `df.count()` would re-run the read
    # and parse the whole source file a second time just to report a number.
    return spark.table(table).count()

# Function to ingest JSON files
def bronze_json(path, table, source_system, multi_line=False):
    """Load a JSON file into a Delta table and return the number of rows stored.

    Args:
        path: Location of the JSON file to read.
        table: Name of the Delta table to write; existing contents are overwritten.
        source_system: Label stored in the ``source_system`` audit column.
        multi_line: Set to True for a file that holds a JSON document spanning
            several lines instead of one JSON object per line. Defaults to
            False, which is the reader's standard line-delimited mode.

    Returns:
        Row count of ``table`` after the write.
    """
    df = (spark.read
          .option("multiLine", multi_line)
          .json(path)
          .withColumn("ingestion_timestamp", current_timestamp())  # Audit column: load time
          .withColumn("source_system", lit(source_system))         # Audit column: data origin
    )
    df.write.mode("overwrite").format("delta").saveAsTable(table)
    # Count the saved table rather than `df`: `df.count()` would re-run the read
    # and parse the whole source file a second time just to report a number.
    return spark.table(table).count()

# Base directory containing raw datasets
base = "/Volumes/capstone/bronze/raw_files"

# ==========================================
# DATASET CONFIGURATION
# ==========================================
# Each entry includes:
# - File format (csv/json)
# - File path
# - Target Delta table name
# - Source system
# - Header indicator (if applicable)
# ==========================================


# One dict per source file. Keys: "fmt" (csv/json), "path" (file location),
# "table" (target Delta table), "src" (value for the source_system column),
# and, for CSV entries, "header" (whether the file has a header row).
datasets = [
    # ---------------- Diseases ----------------
    {"fmt":"csv",  "path":f"{base}/diseases/ICD10codes.csv",
     "table":"capstone.bronze.diseases_icd10_raw", "src":"CDC/ICD", "header":False},

    {"fmt":"csv",  "path":f"{base}/diseases/U.S._Chronic_Disease_Indicators_20260223.csv",
     "table":"capstone.bronze.diseases_chronic_indicators_raw", "src":"CDC/CDI", "header":True},

    {"fmt":"json", "path":f"{base}/diseases/drug-label-0013-of-0013.json",
     "table":"capstone.bronze.diseases_drug_label_raw", "src":"FDA/LABEL"},

    {"fmt":"csv",  "path":f"{base}/diseases/icd9to10dictionary.csv",
     "table":"capstone.bronze.icd9to10_dictionary_raw", "src":"ICD/MAP", "header":True},

    # ---------------- Geography ----------------
    # NOTE(review): the original note here read "this is a .txt, not a csv -> left out
    # for now (or read as text further down)". The entry below is a .csv, so the note
    # presumably refers to a different geography file that is not listed - confirm.
    {"fmt":"csv",  "path":f"{base}/geography/ACSDT5Y2023.B01003-Data.csv",
     "table":"capstone.bronze.geography_population_raw", "src":"CENSUS/ACS", "header":True},

    # ---------------- Insurance ----------------
    {"fmt":"csv", "path":f"{base}/Insurance/benefits-and-cost-sharing-puf.csv",
     "table":"capstone.bronze.insurance_benefits_raw", "src":"CMS", "header":True},

    {"fmt":"csv", "path":f"{base}/Insurance/plan-attributes-puf.csv",
     "table":"capstone.bronze.insurance_plan_attributes_raw", "src":"CMS", "header":True},

    # ---------------- Medications (OpenFDA style) ----------------
    {"fmt":"csv", "path":f"{base}/medications/DrugReviews.csv",
     "table":"capstone.bronze.medication_reviews_raw", "src":"DRUG_REVIEWS", "header":True},

    {"fmt":"csv", "path":f"{base}/medications/exclusivity.csv",
     "table":"capstone.bronze.openfda_exclusivity_raw", "src":"OPENFDA", "header":True},

    {"fmt":"csv", "path":f"{base}/medications/package.csv",
     "table":"capstone.bronze.openfda_package_raw", "src":"OPENFDA", "header":True},

    {"fmt":"csv", "path":f"{base}/medications/patent.csv",
     "table":"capstone.bronze.openfda_patent_raw", "src":"OPENFDA", "header":True},

    {"fmt":"csv", "path":f"{base}/medications/product.csv",
     "table":"capstone.bronze.openfda_product_raw", "src":"OPENFDA", "header":True},

    {"fmt":"csv", "path":f"{base}/medications/products.csv",
     "table":"capstone.bronze.openfda_products_raw", "src":"OPENFDA", "header":True},

    # ---------------- Pharmacies ----------------
    {"fmt":"csv", "path":f"{base}/pharmacies/Pharmacies.csv",
     "table":"capstone.bronze.pharmacies_raw", "src":"PHARMACY_DIR", "header":True},
]

Run the ingestion and report the errors and the tables that failed to ingest

In [0]:
%python
# Activate the catalog and make sure the Bronze schema exists before loading.
for statement in ("USE CATALOG capstone", "CREATE SCHEMA IF NOT EXISTS bronze"):
    spark.sql(statement)

# results: (table name, row count) for every successful load.
# errors:  (table name, path, truncated error message) for every failed load.
results = []
errors = []

# Load each configured dataset; a failure is recorded and the loop moves on.
for d in datasets:
    try:
        fmt = d["fmt"]
        if fmt not in ("csv", "json"):
            raise ValueError("Unsupported format")

        if fmt == "json":
            rows = bronze_json(d["path"], d["table"], d["src"])
        else:
            # CSV entries may omit "header"; it then defaults to True.
            rows = bronze_csv(d["path"], d["table"], d["src"], header=d.get("header", True))

        results.append((d["table"], rows))
        print(f"{d['table']} | rows={rows}")

    except Exception as e:
        # Keep only the first 200 characters of the error message.
        reason = str(e)[:200]
        errors.append((d["table"], d["path"], reason))
        print(f"FAILED {d['table']}")
        print("   ", d["path"])
        print("   ", reason)

# Last expression of the cell: shows both lists as the cell output.
results, errors

### Fixing the errors
> First, drop the affected tables

In [0]:
-- Remove the bronze tables that are rebuilt in the cells below.
-- IF EXISTS makes every statement safe to run when the table is absent.
USE CATALOG capstone;

-- ICD code tables
DROP TABLE IF EXISTS bronze.diseases_icd10_raw;
DROP TABLE IF EXISTS bronze.icd9to10_dictionary_raw;

-- OpenFDA tables
DROP TABLE IF EXISTS bronze.openfda_package_raw;
DROP TABLE IF EXISTS bronze.openfda_product_raw;
DROP TABLE IF EXISTS bronze.openfda_products_raw;

-- Drug label tables: the JSON-based table and its plain-text counterpart
DROP TABLE IF EXISTS bronze.diseases_drug_label_raw;
DROP TABLE IF EXISTS bronze.diseases_drug_label_raw_text;

### Fixing the errors
- #### Re-ingest the tables

In [0]:
%python
from pyspark.sql.functions import current_timestamp, lit

    
# bronze_raw_safe reads a raw CSV file without assuming any header structure,
# assigns generic column names,
# adds metadata columns,
# and saves the result as a Delta table.

# The CSV is read completely raw (no header, no schema inference).
def bronze_raw_safe(path: str, table: str, source_system: str):
    """Load a CSV file into a Delta table without relying on its header row.

    The file is read with ``header=False``, so its first line is stored as an
    ordinary data row, and the columns are named positionally
    (``col_1`` ... ``col_N``) instead of taking names from the file.

    Args:
        path: Location of the CSV file to read.
        table: Name of the Delta table to write; existing contents are overwritten.
        source_system: Label stored in the ``source_system`` audit column.

    Returns:
        Row count of ``table`` after the write.
    """
    df_raw = (spark.read
              .option("header", False)       # First line is data, not column names
              .option("inferSchema", False)  # No type inference: every column stays a string
              .csv(path))

    # Positional names, independent of anything in the file.
    new_cols = [f"col_{i+1}" for i in range(len(df_raw.columns))]

    # Rename the columns and add the audit metadata.
    df = (df_raw.toDF(*new_cols)
          .withColumn("ingestion_timestamp", current_timestamp())
          .withColumn("source_system", lit(source_system)))

    # Save as a Delta table (overwrite is used during development).
    df.write.mode("overwrite").format("delta").saveAsTable(table)

    # Count the saved table rather than `df`: `df.count()` would re-run the read
    # and parse the whole source file a second time just to report a number.
    row_count = spark.table(table).count()
    print(f" {table} | rows={row_count}")
    return row_count
# ------------------------------------------
# Base directory containing raw Bronze files
# ------------------------------------------
# This is the root location where all raw datasets
# are stored before ingestion into Delta tables.
base = "/Volumes/capstone/bronze/raw_files"

In [0]:
%python
# Files re-ingested through bronze_raw_safe, in load order:
# (path relative to `base`, target table, source system).
raw_safe_loads = [
    # ICD-10 codes
    ("diseases/ICD10codes.csv", "capstone.bronze.diseases_icd10_raw", "CDC/ICD"),
    # ICD-9 -> ICD-10 dictionary
    ("diseases/icd9to10dictionary.csv", "capstone.bronze.icd9to10_dictionary_raw", "ICD/MAP"),
    # OpenFDA files that failed in the earlier run
    ("medications/package.csv", "capstone.bronze.openfda_package_raw", "OPENFDA"),
    ("medications/product.csv", "capstone.bronze.openfda_product_raw", "OPENFDA"),
    ("medications/products.csv", "capstone.bronze.openfda_products_raw", "OPENFDA"),
]
for relative_path, target_table, origin in raw_safe_loads:
    bronze_raw_safe(f"{base}/{relative_path}", target_table, origin)

# Drug label JSON: stored as plain text (raw lines) for now, as a temporary measure.
label_lines = spark.read.text(f"{base}/diseases/drug-label-0013-of-0013.json")
df_text = label_lines.withColumn("ingestion_timestamp", current_timestamp())
df_text = df_text.withColumn("source_system", lit("FDA/LABEL"))

text_writer = df_text.write.mode("overwrite").format("delta")
text_writer.saveAsTable("capstone.bronze.diseases_drug_label_raw_text")
print("capstone.bronze.diseases_drug_label_raw_text")

In [0]:
USE CATALOG capstone;

-- List every table now present in the bronze schema.
SHOW TABLES IN bronze;

-- Row counts for a sample of the re-ingested tables.
SELECT COUNT(*)
FROM bronze.diseases_icd10_raw;

SELECT COUNT(*)
FROM bronze.icd9to10_dictionary_raw;

SELECT COUNT(*)
FROM bronze.openfda_package_raw;

SELECT COUNT(*)
FROM bronze.diseases_drug_label_raw_text;