### BRONZE INGESTION PIPELINE  
**RAW GCS → BRONZE LAYER (DELTA TABLES)**

---

#### Purpose
This notebook implements the **BRONZE layer ingestion** for the Data Lakehouse.

It reads raw data files (e.g., CSV) from **Google Cloud Storage (GCS)**, performs minimal parsing, and writes them in **Delta Lake** format to the **BRONZE layer** in the medallion architecture.

---

#### Actions Performed
- Reads raw source data from GCS buckets (CRM and ERP systems)
- Converts raw CSV data into Delta format
- Saves the raw copies, unchanged apart from parsing, in the BRONZE folder of the Data Lake
- Records **audit logs** of ingestion (table name, record counts, status)

---

#### Characteristics of BRONZE Layer
- Immutable historical raw ingested data
- Schema inferred at load time, then validated against an expected schema loaded from a JSON config
- Minimal transformations (parsing only, no business logic)
- Provides consistent, structured Delta tables for downstream **SILVER layer**

---

#### Audit / Lineage
- Ingestion metadata is logged as a Delta table (`_bronze_audit_logs`)
- Includes table name, source path, load timestamp, record counts, and status

---

#### Parameters
- Paths to raw data sources (GCS)
- Target BRONZE Delta location in GCS

---

#### Usage Notes
- Designed to be **idempotent** for the data tables (overwrite mode for each Delta table); the audit log is appended to on every run
- Configurable list of source files in notebook (easy to add/remove sources)
- Intended for **regular batch schedule** or **triggered orchestration**

---

#### Example Usage
- Databricks interactive notebook
- Databricks job scheduled for nightly ingestion

In [0]:
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType
from pyspark.sql.functions import to_date, to_timestamp

In [0]:
# Raw Data
datalake_crm_path = 'gs://my-bucket-deep/Data-Lake/source_crm/'
datalake_erp_path = 'gs://my-bucket-deep/Data-Lake/source_erp/'

# Bronze layer path
bronze_path = 'gs://my-bucket-deep/Medallion/bronze'

#### Config Section

In [0]:
# Configuration: list of data sources

bronze_sources = [
    {
        "name": "crm_cust_info",
        "path": f"{datalake_crm_path}cust_info.csv",
        "format": "csv",
        "options": {"header": True, "inferSchema": True}
    },
    {
        "name": "crm_prd_info",
        "path": f"{datalake_crm_path}prd_info.csv",
        "format": "csv",
        "options": {"header": True, "inferSchema": True}
    },
    {
        "name": "crm_sales_details",
        "path": f"{datalake_crm_path}sales_details.csv",
        "format": "csv",
        "options": {"header": True, "inferSchema": True}
    },
    {
        "name": "erp_cust_az12",
        "path": f"{datalake_erp_path}CUST_AZ12.csv",
        "format": "csv",
        "options": {"header": True, "inferSchema": True}
    },
    {
        "name": "erp_loc_a101",
        "path": f"{datalake_erp_path}LOC_A101.csv",
        "format": "csv",
        "options": {"header": True, "inferSchema": True}
    },
    {
        "name": "erp_px_cat_g1v2",
        "path": f"{datalake_erp_path}PX_CAT_G1V2.csv",
        "format": "csv",
        "options": {"header": True, "inferSchema": True}
    }
]
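
The six entries above differ only in name and file path, so the list could also be generated by a small helper. The sketch below is one possible way to do that. `make_csv_source`, `CSV_OPTIONS`, and `generated_sources` are hypothetical names that do not appear in the notebook, and the two base paths are repeated only to keep the block self-contained.

```python
# Hypothetical helper (not part of the notebook): builds one source entry
# in the same shape as the hand-written dictionaries in bronze_sources.
datalake_crm_path = 'gs://my-bucket-deep/Data-Lake/source_crm/'
datalake_erp_path = 'gs://my-bucket-deep/Data-Lake/source_erp/'

CSV_OPTIONS = {"header": True, "inferSchema": True}

def make_csv_source(prefix, base_path, filename):
    """Return a source config dict for one CSV file."""
    # Table name = system prefix + lower-cased file name without extension.
    stem = filename.rsplit(".", 1)[0].lower()
    return {
        "name": f"{prefix}_{stem}",
        "path": f"{base_path}{filename}",
        "format": "csv",
        "options": dict(CSV_OPTIONS),  # copy so entries do not share one dict
    }

generated_sources = (
    [make_csv_source("crm", datalake_crm_path, f)
     for f in ("cust_info.csv", "prd_info.csv", "sales_details.csv")]
    + [make_csv_source("erp", datalake_erp_path, f)
       for f in ("CUST_AZ12.csv", "LOC_A101.csv", "PX_CAT_G1V2.csv")]
)
```

Adding a source then means adding one file name to the relevant tuple, at the cost of the table name being derived from the file name rather than chosen freely.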


#### Expected Schema

In [0]:
schema_df = spark.read.json('gs://my-bucket-deep/Data-Lake/config/expected_schemas.json')

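`spark.read.json` returns a DataFrame, but the validation code needs a plain Python dictionary keyed by table name, so the same file is read again with the GCS client and parsed with `json.loads`.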

In [0]:
# Expected schema config
from google.cloud import storage
import json

# config in GCS
bucket_name = "my-bucket-deep"
blob_path = "Data-Lake/config/expected_schemas.json"

# Initialize client
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_path)

# Download as string
content = blob.download_as_text()

# Parse JSON
expected_schemas_raw = json.loads(content)

In [0]:
type_map = {
    "StringType": StringType(),
    "IntegerType": IntegerType(),
    "DateType": DateType(),
    "TimestampType": TimestampType()
}

expected_schemas = {}

for table, fields in expected_schemas_raw.items():
    struct_fields = [StructField(f['name'], type_map[f['type']], True) for f in fields]
    expected_schemas[table] = StructType(struct_fields)

print("expected_schemas loaded:", list(expected_schemas.keys()))

expected_schemas loaded: ['crm_cust_info', 'crm_prd_info', 'crm_sales_details', 'erp_loc_a101', 'erp_cust_az12', 'erp_px_cat_g1v2']
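
The loop above implies a particular shape for `expected_schemas.json`: a top-level object keyed by table name, where each value is a list of objects with `name` and `type` keys, and `type` is one of the keys of `type_map`. The sketch below checks that shape in plain Python before any Spark types are built. The sample column names and the `find_unknown_types` helper are illustrative assumptions; the actual contents of the config file are not shown in the notebook.

```python
import json

# Type names accepted by the notebook's type_map.
SUPPORTED_TYPES = {"StringType", "IntegerType", "DateType", "TimestampType"}

# Assumed sample config; the real file's columns are not shown in the notebook.
sample_config = json.loads("""
{
  "crm_cust_info": [
    {"name": "cst_id", "type": "IntegerType"},
    {"name": "cst_create_date", "type": "DateType"}
  ],
  "erp_loc_a101": [
    {"name": "cid", "type": "StringType"},
    {"name": "cntry", "type": "VarcharType"}
  ]
}
""")

def find_unknown_types(raw_schemas):
    """Return (table, column, type) triples whose type is not supported."""
    problems = []
    for table, fields in raw_schemas.items():
        for field in fields:
            if field["type"] not in SUPPORTED_TYPES:
                problems.append((table, field["name"], field["type"]))
    return problems

unknown = find_unknown_types(sample_config)
```

A check like this would turn a bare `KeyError` from `type_map[f['type']]` into a message that names the offending table and column.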


#### Logging function

In [0]:
# Utility Logger
def log(msg):
    print(f"[{datetime.now().isoformat()}] {msg}")
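
The messages passed to `log` carry their severity as a text prefix (`INFO:`, `ERROR:`). As an alternative, Python's standard `logging` module can attach the level itself. This is only a sketch of that option, not the notebook's method; `get_logger` and the logger name `bronze_ingestion` are hypothetical.

```python
import logging
import sys

def get_logger(name="bronze_ingestion"):
    """Return a logger that prints timestamped, levelled messages to stdout."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid duplicate handlers when a cell is re-run
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(
            logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s")
        )
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False  # do not repeat messages via the root logger
    return logger

logger = get_logger()
logger.info("Loading crm_cust_info")
```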

#### Type Casting

In [0]:
def cast_columns(df, expected_schema):
    """Cast columns declared as date or timestamp; leave all other columns as read."""
    for field in expected_schema.fields:
        col_name = field.name
        # Skip expected columns that are absent; validate_schema reports them later.
        if col_name in df.columns:
            if isinstance(field.dataType, DateType):
                df = df.withColumn(col_name, to_date(col_name))
            elif isinstance(field.dataType, TimestampType):
                df = df.withColumn(col_name, to_timestamp(col_name))
    return df

#### Schema validation

In [0]:
def normalize_type(t):
    # Treat date and timestamp as the same type when comparing schemas.
    if t in ("date", "timestamp"):
        return "timestamp"
    return t

def validate_schema(df, expected_schema, table_name):
    actual_fields = set((f.name, normalize_type(f.dataType.simpleString())) for f in df.schema.fields)
    expected_fields = set((f.name, normalize_type(f.dataType.simpleString())) for f in expected_schema.fields)

    if actual_fields != expected_fields:
        raise Exception(
            f"Schema mismatch for {table_name}.\nExpected: {expected_fields}\nActual: {actual_fields}"
        )
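
When the two sets differ, printing both in full can make the actual discrepancy hard to spot. Below is a minimal sketch of a more targeted report, using plain `(name, type)` tuples in place of Spark fields so that it runs without a cluster. `diff_fields` is a hypothetical helper and the column names are made up for illustration.

```python
def normalize_type(t):
    # Same rule as the notebook: date and timestamp compare as equal.
    if t in ("date", "timestamp"):
        return "timestamp"
    return t

def diff_fields(actual, expected):
    """Compare two iterables of (name, type) pairs.

    Returns (missing, unexpected): pairs found only in expected,
    and pairs found only in actual, each sorted for stable output.
    """
    actual_set = {(n, normalize_type(t)) for n, t in actual}
    expected_set = {(n, normalize_type(t)) for n, t in expected}
    missing = sorted(expected_set - actual_set)
    unexpected = sorted(actual_set - expected_set)
    return missing, unexpected

# Illustrative inputs: "id" was inferred as string instead of int,
# while "created" differs only in date vs. timestamp and so still matches.
expected = [("id", "int"), ("created", "date"), ("name", "string")]
actual = [("id", "string"), ("created", "timestamp"), ("name", "string")]

missing, unexpected = diff_fields(actual, expected)
```

Reporting only the two difference sets keeps the error message short for wide tables.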

#### Bronze Loading

In [0]:
def load_to_bronze_table(source, bronze_base_path):
    try:
        log(f"START: Loading {source['name']} from {source['path']}")

        df = spark.read.format(source['format']).options(**source['options']).load(source['path'])

        df = df.toDF(*[c.lower() for c in df.columns])

        # Cast and validate only when an expected schema is defined;
        # indexing expected_schemas directly would raise KeyError otherwise.
        if source['name'] in expected_schemas:
            expected = expected_schemas[source['name']]
            df = cast_columns(df, expected)
            validate_schema(df, expected, source['name'])
            log(f"INFO: Schema validation passed for {source['name']}")

        # Basic check
        record_count = df.count()
        log(f"INFO: {source['name']} loaded with {record_count} records")

        # Write as Delta
        output_path = f"{bronze_base_path}/{source['name']}"
        df.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').save(output_path)

        log(f"SUCCESS: Written {source['name']} to {output_path}")

        return {
            "table_name": source['name'],
            "status": "SUCCESS",
            "records": record_count,
            "error": None,
            "loaded_at": datetime.now().isoformat(),
            "source_path": source['path']
        }

    except Exception as e:
        log(f"ERROR: Failed to load {source['name']} - {e}")
        return {
            "table_name": source['name'],
            "status": "FAILED",
            "records": 0,
            "error": str(e),
            "loaded_at": datetime.now().isoformat(),
            "source_path": source['path']
        }


In [0]:
# Define Audit Schema
audit_schema = StructType([
    StructField("table_name", StringType(), True),
    StructField("status", StringType(), True),
    StructField("records", IntegerType(), True),
    StructField("error", StringType(), True),
    StructField("loaded_at", StringType(), True),
    StructField("source_path", StringType(), True)
])

In [0]:
# Process all sources and collect audit records
audit_records = []
for source in bronze_sources:
    result = load_to_bronze_table(source, bronze_path)
    audit_records.append(result)

[2025-06-30T07:50:48.402201] START: Loading crm_cust_info from gs://my-bucket-deep/Data-Lake/source_crm/cust_info.csv
[2025-06-30T07:50:54.579091] INFO: Schema validation passed for crm_cust_info
[2025-06-30T07:50:57.431256] INFO: crm_cust_info loaded with 18494 records
[2025-06-30T07:51:17.409793] SUCCESS: Written crm_cust_info to gs://my-bucket-deep/Medallion/bronze/crm_cust_info
[2025-06-30T07:51:17.410014] START: Loading crm_prd_info from gs://my-bucket-deep/Data-Lake/source_crm/prd_info.csv
[2025-06-30T07:51:20.595515] INFO: Schema validation passed for crm_prd_info
[2025-06-30T07:51:21.107917] INFO: crm_prd_info loaded with 397 records
[2025-06-30T07:51:30.482537] SUCCESS: Written crm_prd_info to gs://my-bucket-deep/Medallion/bronze/crm_prd_info
[2025-06-30T07:51:30.482753] START: Loading crm_sales_details from gs://my-bucket-deep/Data-Lake/source_crm/sales_details.csv
[2025-06-30T07:51:33.251175] INFO: Schema validation passed for crm_sales_details
[2025-06-30T07:51:34.036008] I

In [0]:
# Prepare audit records
# Replace a missing error (None) with an empty string so the column is never null.
for rec in audit_records:
    if rec["error"] is None:
        rec["error"] = ""

# Create DataFrame with explicit schema
audit_df = spark.createDataFrame(audit_records, schema=audit_schema)

# Write audit logs
audit_log_path = f"{bronze_path}/_bronze_audit_logs"
audit_df.write.format('delta').mode('append').save(audit_log_path)

log(f"SUCCESS: Audit logs written to {audit_log_path}")

[2025-06-30T07:52:17.626211] SUCCESS: Audit logs written to gs://my-bucket-deep/Medallion/bronze/_bronze_audit_logs
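
Because `load_to_bronze_table` catches every exception and returns a `FAILED` record instead, the notebook itself finishes normally even when a table fails to load. One possible way to surface failures to a scheduler, once the audit log has been written, is sketched below. `summarize_audit` and `raise_if_failed` are hypothetical helpers, and the sample records (including the failure) are invented to show the shape, not taken from a real run.

```python
def summarize_audit(records):
    """Split audit records by status; return (succeeded, failed) table names."""
    succeeded = [r["table_name"] for r in records if r["status"] == "SUCCESS"]
    failed = [r["table_name"] for r in records if r["status"] != "SUCCESS"]
    return succeeded, failed

def raise_if_failed(records):
    """Raise RuntimeError listing every table whose load did not succeed."""
    _, failed = summarize_audit(records)
    if failed:
        raise RuntimeError(f"Bronze ingestion failed for: {', '.join(failed)}")

# Illustrative records in the same shape as the notebook's audit dictionaries.
sample_records = [
    {"table_name": "crm_cust_info", "status": "SUCCESS", "records": 18494, "error": ""},
    {"table_name": "erp_loc_a101", "status": "FAILED", "records": 0, "error": "Schema mismatch"},
]

succeeded, failed = summarize_audit(sample_records)
```

Calling `raise_if_failed(audit_records)` as the last step would mark the run as failed while still keeping the full audit trail.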


In [0]:
audit_df.display()

| table_name | status | records | error | loaded_at | source_path |
|---|---|---|---|---|---|
| crm_cust_info | SUCCESS | 18494 | | 2025-06-30T07:51:17.409965 | gs://my-bucket-deep/Data-Lake/source_crm/cust_info.csv |
| crm_prd_info | SUCCESS | 397 | | 2025-06-30T07:51:30.482691 | gs://my-bucket-deep/Data-Lake/source_crm/prd_info.csv |
| crm_sales_details | SUCCESS | 60398 | | 2025-06-30T07:51:41.693567 | gs://my-bucket-deep/Data-Lake/source_crm/sales_details.csv |
| erp_cust_az12 | SUCCESS | 18484 | | 2025-06-30T07:51:51.665599 | gs://my-bucket-deep/Data-Lake/source_erp/CUST_AZ12.csv |
| erp_loc_a101 | SUCCESS | 18484 | | 2025-06-30T07:52:01.426480 | gs://my-bucket-deep/Data-Lake/source_erp/LOC_A101.csv |
| erp_px_cat_g1v2 | SUCCESS | 37 | | 2025-06-30T07:52:09.886347 | gs://my-bucket-deep/Data-Lake/source_erp/PX_CAT_G1V2.csv |
