# Pipeline: Ingest to Bronze Layer

## Data Source

- **Path:** `s3://buckethospitaldata/`
- **Format:** CSV
- **Storage Provider:** AWS S3 (External Object Storage)

## Destination

- **Catalog Location:** `workspace.hospital_bronze`
- **Format:** Delta Lake table (managed, written by a Structured Streaming query)
- **Managed by:** Unity Catalog + Databricks Managed Storage (DBFS under the hood, for Free Edition)


### Configuration

#### Parameter: entity (table) to ingest from S3

In [0]:
dbutils.widgets.text("dimensional_table", "")
dimensional_table = dbutils.widgets.get("dimensional_table")

assert dimensional_table, "Missing required parameter: dimensional_table"
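
Because the widget value is interpolated directly into S3 paths and a table name, it can help to reject anything that is not a single plain identifier. The following is a minimal sketch, assuming table folders are named with letters, digits, and underscores only. `validate_table_name` and that naming rule are hypothetical and not part of the original notebook.

```python
import re

# Assumed rule (hypothetical): one folder segment made of letters, digits, underscores.
_SAFE_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def validate_table_name(name: str) -> str:
    """Return the trimmed name, or raise ValueError if it is not a safe identifier."""
    candidate = name.strip()
    if not _SAFE_NAME.fullmatch(candidate):
        raise ValueError(f"Invalid dimensional_table value: {name!r}")
    return candidate
```

In the notebook this could be applied right after reading the widget, for example `dimensional_table = validate_table_name(dimensional_table)`.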

#### Define paths of data storage locations

In [0]:
# Databricks Storage
catalog_name = "workspace"
schema_bronze = "hospital_bronze"

# Data source: folder for this table in the S3 bucket
entity_source_path = f"s3://buckethospitaldata/data_batching/{dimensional_table}/"

# Auto Loader state: inferred schema and streaming checkpoint
schema_location = f"s3://buckethospitaldata/pipeline_checkpoints/data_batching/_schemas/bronze/{dimensional_table}"
checkpoint_location = f"s3://buckethospitaldata/pipeline_checkpoints/data_batching/_checkpoints/bronze/{dimensional_table}"
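
The three locations follow one layout under the bucket, so they could be derived by a small helper if several notebooks need them. This is only a sketch: `BronzePaths`, `build_bronze_paths`, and the example table name `patients` are hypothetical, while the path layout itself is copied from the cell above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BronzePaths:
    """Storage locations used by one bronze ingestion stream."""
    source: str
    schema_location: str
    checkpoint_location: str


def build_bronze_paths(table: str, bucket: str = "buckethospitaldata") -> BronzePaths:
    """Derive the source, schema, and checkpoint paths for a table (hypothetical helper)."""
    base = f"s3://{bucket}"
    state = f"{base}/pipeline_checkpoints/data_batching"
    return BronzePaths(
        source=f"{base}/data_batching/{table}/",
        schema_location=f"{state}/_schemas/bronze/{table}",
        checkpoint_location=f"{state}/_checkpoints/bronze/{table}",
    )


# "patients" is an invented table name used only for illustration.
paths = build_bronze_paths("patients")
print(paths.source)
```

Keeping the schema and checkpoint locations distinct per table, as the notebook does, means each stream tracks its own progress independently.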

### Read data from S3

In [0]:
df_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", schema_location)
    .load(entity_source_path)
)


### Quick Transformation
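Sanitize column names before writing to Delta: each run of non-alphanumeric characters, including blank spaces, becomes a single underscore.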

In [0]:
import re

def clean_column_names(df):
    """
    Sanitize column names for Delta Lake:
    - Replace each run of non-alphanumeric characters with one underscore
    - Strip leading and trailing underscores
    - Lowercase everything
    """
    cleaned_cols = []
    for col in df.columns:
        # .lower() implements the lowercasing promised in the docstring
        new_col = re.sub(r'[^0-9a-zA-Z]+', '_', col).strip('_').lower()
        cleaned_cols.append((col, new_col))

    for original, cleaned in cleaned_cols:
        if original != cleaned:
            df = df.withColumnRenamed(original, cleaned)
    return df

# Sanitize the column names before writing the stream
df_stream_cleaned = clean_column_names(df_stream)
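
Two different source headers can sanitize to the same name (for example `Patient ID` and `patient_id`), and a header made only of symbols sanitizes to an empty string. Either case would cause trouble at write time. Below is a sketch in plain Python of a pre-check, assuming the rules listed in the docstring above. `plan_column_renames` is a hypothetical helper and the sample headers are invented.

```python
import re
from collections import Counter


def plan_column_renames(columns):
    """Map each original column name to its sanitized form; raise on unusable results."""
    mapping = {
        col: re.sub(r"[^0-9a-zA-Z]+", "_", col).strip("_").lower()
        for col in columns
    }
    # A header with no letters or digits collapses to an empty name.
    empty = sorted(col for col, new in mapping.items() if not new)
    if empty:
        raise ValueError(f"Columns with no usable characters: {empty}")
    # Distinct headers that collapse to the same sanitized name.
    counts = Counter(mapping.values())
    clashes = sorted(name for name, n in counts.items() if n > 1)
    if clashes:
        raise ValueError(f"Sanitized names collide: {clashes}")
    return mapping
```

It could be called as `plan_column_renames(df_stream.columns)` before renaming, so that a bad header fails early with a readable message.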


### Write data to Databricks Storage

In [0]:
(df_stream_cleaned.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_location)
    .outputMode("append")
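    # availableNow processes all pending files and then stops; it supersedes the deprecated once=True
    .trigger(availableNow=True)
    # Fully qualified target: catalog.schema.table
    .toTable(f"{catalog_name}.{schema_bronze}.{dimensional_table}"))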
