# Data Ingestion - Bronze Layer

## Overview
This notebook implements the **Bronze Layer** of the Medallion Architecture for the Capstone Data Pipeline. It ingests raw data from multiple sources (transactions, customers, and products) and creates initial Delta tables with schema validation, data rescue capabilities, and audit logging.

## Objective
- Ingest raw CSV and JSON data from landing zones
- Apply predefined schemas to validate incoming data
- Implement schema evolution and data rescue mechanisms
- Create Bronze-level Delta tables for downstream processing
- Maintain audit trails for data lineage tracking

## Architecture
The notebook follows the **Medallion Architecture** pattern:
- **Bronze**: Raw data ingestion layer (this notebook)
- **Silver**: Cleaned and validated data
- **Gold**: Business-ready aggregated data

## Key Features
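- **Schema Evolution**: Uses Databricks Auto Loader with `cloudFiles.schemaEvolutionMode` set to `rescue`, so new source columns do not fail the stream and the table schema is not evolved
- **Data Rescue**: Captures unexpected columns in the `_rescue` column (set via `rescuedDataColumn`) for debugging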
- **Audit Logging**: Tracks all ingestion events with timestamps
- **Explicit Schemas**: Applies a predefined schema to each source; every field is read as a string, so no type casting happens at this layer
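To make the rescue behavior concrete, here is a simplified pure-Python model of what rescue mode does to a single record: fields named in the schema are kept, and any other field is serialized as JSON into `_rescue`. It is an illustration only, not Auto Loader's implementation; the real rescued data column is also populated for values that fail to parse against the schema and records the source file path. `apply_rescue` and the sample record are hypothetical.

```python
import json
from typing import Any, Dict, List


def apply_rescue(record: Dict[str, Any], schema_fields: List[str],
                 rescue_col: str = "_rescue") -> Dict[str, Any]:
    """Project a raw record onto the schema and park unknown fields in rescue_col.

    Simplified model of rescue mode: schema fields missing from the record
    become None, and extra fields are JSON-encoded into rescue_col
    (None when there is nothing to rescue).
    """
    row: Dict[str, Any] = {name: record.get(name) for name in schema_fields}
    extras = {k: v for k, v in record.items() if k not in schema_fields}
    row[rescue_col] = json.dumps(extras, sort_keys=True) if extras else None
    return row


# Hypothetical product record with one column the schema does not declare
products_fields = ["item_id", "product_name", "category"]
raw = {"item_id": "P1", "product_name": "Lamp", "category": "Home", "color": "red"}
print(apply_rescue(raw, products_fields))
# {'item_id': 'P1', 'product_name': 'Lamp', 'category': 'Home', '_rescue': '{"color": "red"}'}
```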

## Data Sources

### 1. Products
- **Format**: CSV
- **Path**: `/Volumes/capstone/bronze/raw/files/landing/products_raw`
- **Fields**: item_id, product_name, category

### 2. Transactions
- **Format**: CSV
- **Path**: `/Volumes/capstone/bronze/raw/files/landing/transactions_raw`
- **Fields**: order_id, item_id, quantity, price, order_timestamp, corrupted_flag

### 3. Customers
- **Format**: JSON (multiline)
- **Path**: `/Volumes/capstone/bronze/raw/files/landing/customers_raw`
- **Fields**: customer_id, name, contact (nested: email), region
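The customers source is read with `multiline` enabled because each JSON object can span several lines. The pure-Python sketch below, using a made-up sample record, shows why: parsing one JSON value per line (the JSON Lines layout Spark's JSON reader expects by default) fails on a pretty-printed object, while parsing the whole document succeeds and exposes the nested `contact.email` field. The helper names are hypothetical.

```python
import json

# Hypothetical customers_raw file content: one record pretty-printed over several lines
sample = """{
  "customer_id": "C001",
  "name": "Ada",
  "contact": {"email": "ada@example.com"},
  "region": "EU"
}"""


def parse_json_lines(text: str) -> list:
    """Parse one JSON value per non-empty line (JSON Lines style)."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def parse_multiline(text: str) -> list:
    """Parse the whole text as one JSON document (an object or an array of objects)."""
    doc = json.loads(text)
    return doc if isinstance(doc, list) else [doc]


try:
    parse_json_lines(sample)
except json.JSONDecodeError as err:
    print("line-by-line parse failed:", err.msg)

records = parse_multiline(sample)
print(records[0]["contact"]["email"])  # ada@example.com
```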

## Output Tables
| Table Name | Location | Format |
|-----------|----------|--------|
| `capstone.bronze.products` | Managed Delta | Delta |
| `capstone.bronze.transactions` | Managed Delta | Delta |
| `capstone.bronze.customers` | Managed Delta | Delta |

## Audit Logging
- **Log Location**: `/Volumes/capstone/bronze/history`
- **Contents**: Table creation events with timestamps, row counts, and status
- Stored in Delta format, so it can be queried directly with `spark.read.format("delta").load(log_path)`
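`audit_log` lives in `capstone_pipeline.main`, and its implementation is not shown in this notebook. As a hypothetical sketch of the kind of row such a helper might append (table name, timestamp, row count, and status, per the contents listed above), the function below builds one record in plain Python; every field name here is an assumption, not the module's actual schema.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_audit_record(table_name: str, row_count: Optional[int],
                       error: Optional[str] = None) -> Dict[str, Any]:
    """Build one hypothetical audit-log row for a Bronze table load.

    Field names are illustrative; the real audit_log helper may differ.
    """
    return {
        "table_name": table_name,
        "event": "table_created",
        "row_count": row_count,
        "status": "FAILED" if error else "SUCCESS",
        "error": error,
        "logged_at": datetime.now(timezone.utc).isoformat(),
    }


rec = build_audit_record("capstone.bronze.products", 120)
print(rec["status"], rec["row_count"])  # SUCCESS 120
```

On Databricks, a list of such dicts could be appended to the log location with `spark.createDataFrame(records).write.format("delta").mode("append").save(log_path)`.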

## Configuration Parameters

### Path Variables
- `base_raw_path`: Root path for raw data volumes
- `checkpointlocation`: Checkpoint directory for streaming operations
- `log_path`: Audit log storage location
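All of these paths derive from the catalog name. A small pure-Python helper (hypothetical, not part of the notebook) can build them in one place, following the layout used here: landing folders under the `bronze/raw` volume, checkpoints under `meta/checkpoint`, and audit logs in the `bronze/history` volume listed under Audit Logging.

```python
from typing import Dict


def bronze_paths(catalog: str) -> Dict[str, str]:
    """Derive the notebook's Unity Catalog volume paths from a catalog name."""
    base_raw_path = f"/Volumes/{catalog}"
    landing_path = f"{base_raw_path}/bronze/raw/files/landing"
    return {
        "base_raw_path": base_raw_path,
        "transactions_path": f"{landing_path}/transactions_raw",
        "customers_path": f"{landing_path}/customers_raw",
        "products_path": f"{landing_path}/products_raw",
        "checkpointlocation": f"{base_raw_path}/meta/checkpoint",
        "log_path": f"{base_raw_path}/bronze/history",
    }


paths = bronze_paths("capstone")
print(paths["products_path"])
# /Volumes/capstone/bronze/raw/files/landing/products_raw
```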

### Processing Options
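- **CSV Options**: `cloudFiles.format` set to `csv`, header-row parsing, and rescue-mode schema evolution with `_rescue` as the rescued data column
- **JSON Options**: `cloudFiles.format` set to `json`, multiline document support, and rescue-mode schema evolution with `_rescue` as the rescued data column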

## Dependencies
- **Spark SQL**: Core data processing
- **Databricks Auto Loader**: Incremental data ingestion
- **Delta Lake**: Table format and ACID transactions
- **capstone_pipeline.main**: Custom functions for table creation and logging

## Usage Notes
- Ensure all source paths exist before running
- Review audit logs after ingestion for any data rescue events
- Monitor `_rescue` columns for data quality issues
- Checkpoint locations prevent duplicate processing in streaming scenarios
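For the first note, a pre-flight check can fail fast before any stream starts. This sketch assumes the volumes are reachable through their local `/Volumes/...` file paths, as they are on Unity Catalog-enabled Databricks compute; `missing_sources` and `check_sources` are hypothetical helpers, not part of `capstone_pipeline`.

```python
import os
from typing import Dict, List


def missing_sources(paths: Dict[str, str]) -> List[str]:
    """Return the names of source paths that do not exist."""
    return [name for name, path in paths.items() if not os.path.exists(path)]


def check_sources(paths: Dict[str, str]) -> None:
    """Raise FileNotFoundError listing every missing source path."""
    missing = missing_sources(paths)
    if missing:
        details = ", ".join(f"{name}={paths[name]}" for name in missing)
        raise FileNotFoundError(f"Missing source paths: {details}")
```

In the notebook, this could run as `check_sources({"products": products_path, "transactions": transactions_path, "customers": customers_path})` before the ingestion cells.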

In [None]:
dbutils.widgets.text("catalog", "capstone", "Enter the Catalog: ")

In [1]:
# Import required PySpark data types for schema definition
from pyspark.sql.types import StructType, StructField, StringType

In [None]:
# Create the target catalog if needed and make it the current catalog
sqlcmd = f'CREATE CATALOG IF NOT EXISTS {dbutils.widgets.get("catalog")}'
spark.sql(sqlcmd)

sqlcmd = f'USE CATALOG {dbutils.widgets.get("catalog")}'
spark.sql(sqlcmd)

In [None]:
sqlcmd= f'CREATE SCHEMA IF NOT EXISTS {dbutils.widgets.get("catalog")}.bronze'
spark.sql(sqlcmd)
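The catalog name comes from a widget and is pasted straight into SQL text. One hardening option is to wrap it in backticks, Spark SQL's identifier quoting, doubling any embedded backtick. The helpers below are a hypothetical pure-Python sketch that only builds the statements; they are not part of the notebook.

```python
from typing import List


def quote_ident(name: str) -> str:
    """Quote a Spark SQL identifier with backticks, escaping embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def setup_statements(catalog: str, schema: str = "bronze") -> List[str]:
    """Build the catalog and schema setup statements used at the top of the notebook."""
    cat = quote_ident(catalog)
    return [
        f"CREATE CATALOG IF NOT EXISTS {cat}",
        f"USE CATALOG {cat}",
        f"CREATE SCHEMA IF NOT EXISTS {cat}.{quote_ident(schema)}",
    ]


for stmt in setup_statements("capstone"):
    print(stmt)
```

On Databricks, each statement would then be passed to `spark.sql(stmt)`, with the catalog taken from `dbutils.widgets.get("catalog")`.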

In [None]:
# Define base paths for raw data ingestion and checkpoint locations
base_raw_path = f"/Volumes/{dbutils.widgets.get('catalog')}"
landing_path = f"{base_raw_path}/bronze/raw/files/landing"
transactions_path = f"{landing_path}/transactions_raw"
customers_path = f"{landing_path}/customers_raw"
products_path = f"{landing_path}/products_raw"
checkpointlocation = f"{base_raw_path}/meta/checkpoint"

In [None]:
# Define schema for transactions data
# Fields: order_id, item_id, quantity, price, order_timestamp, corrupted_flag
transactions_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("item_id", StringType(), True),
    StructField("quantity", StringType(), True),
    StructField("price", StringType(), True),
    StructField("order_timestamp", StringType(), True),
    StructField("corrupted_flag", StringType(), True)
])
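Because every transactions field is declared as `StringType`, the schema mostly pins down column names rather than types. A quick local check on a landing file's header can flag drift between the file and the declared field names, which is worth reviewing alongside the `_rescue` column. This is a hypothetical pure-Python helper using the standard `csv` module; `TRANSACTION_FIELDS` copies the schema's field names and the sample data is made up.

```python
import csv
import io
from typing import Dict, List

# Field names copied from transactions_schema
TRANSACTION_FIELDS = ["order_id", "item_id", "quantity", "price",
                      "order_timestamp", "corrupted_flag"]


def header_drift(csv_text: str, expected: List[str]) -> Dict[str, List[str]]:
    """Compare a CSV header row with the expected field names."""
    header = next(csv.reader(io.StringIO(csv_text)), [])
    return {
        "missing": [c for c in expected if c not in header],
        "extra": [c for c in header if c not in expected],
    }


# Hypothetical landing file: corrupted_flag is absent and discount is new
sample = ("order_id,item_id,quantity,price,order_timestamp,discount\n"
          "1,P1,2,9.99,2024-01-01T10:00:00,0.1\n")
print(header_drift(sample, TRANSACTION_FIELDS))
# {'missing': ['corrupted_flag'], 'extra': ['discount']}
```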

In [None]:
# Define schema for customers data
# Includes nested contact structure with email field
customers_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("contact", StructType([
        StructField("email", StringType(), True)
    ]), True),
    StructField("region", StringType(), True)
])

In [None]:
# Define schema for products data
# Fields: item_id, product_name, category
products_schema = StructType([
    StructField("item_id", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True)
])

In [None]:
# Configure Databricks Auto Loader options for CSV and JSON ingestion
# cloudFiles.schemaEvolutionMode = "rescue" captures unexpected columns in _rescue column
csv_options = {
    "cloudFiles.format": "csv",
    "header": "true",
    "cloudFiles.schemaEvolutionMode": "rescue",
    "rescuedDataColumn": "_rescue"
}

# JSON options support multiline JSON documents with schema rescue mode
json_options = {
    "cloudFiles.format": "json",
    "multiline": "true",
    "cloudFiles.schemaEvolutionMode": "rescue",
    "rescuedDataColumn": "_rescue"
}

In [None]:
# Import helper functions from the capstone_pipeline module
# Functions: create_table_bronze (creates Bronze layer Delta tables), audit_log (logs table creation events)
from capstone_pipeline.main import create_table_bronze, audit_log

# Audit logs go to the bronze.history volume, which is created here if missing
log_path = f"/Volumes/{dbutils.widgets.get('catalog')}/bronze/history"
spark.sql(f"CREATE VOLUME IF NOT EXISTS {dbutils.widgets.get('catalog')}.bronze.history")

In [None]:
# Create Bronze layer products table from CSV data with schema validation and rescue mode
table_name = f'{dbutils.widgets.get("catalog")}.bronze.products'
chkpnt_path = checkpointlocation + "/bronzeproduct"
create_table_bronze(spark, products_schema, products_path, chkpnt_path, csv_options, table_name)
audit_log(spark, table_name, log_path)

In [None]:
# Create Bronze layer transactions table from CSV data with schema validation and rescue mode
table_name = f'{dbutils.widgets.get("catalog")}.bronze.transactions'
chkpnt_path = checkpointlocation + "/bronzetransaction"
create_table_bronze(spark, transactions_schema, transactions_path, chkpnt_path, csv_options, table_name)
audit_log(spark, table_name, log_path)

In [None]:
# Create Bronze layer customers table from multiline JSON data with schema validation and rescue mode
table_name = f'{dbutils.widgets.get("catalog")}.bronze.customers'
chkpnt_path = checkpointlocation + "/bronzecustomer"
create_table_bronze(spark, customers_schema, customers_path, chkpnt_path, json_options, table_name)
audit_log(spark, table_name, log_path)
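The three ingestion cells differ only in schema, source path, checkpoint suffix, options, and table name, so a table-driven loop can remove the repetition. The sketch below injects the create and log functions so it runs without Spark; on Databricks you would pass `create_table_bronze` and `audit_log` bound to `spark`. `IngestJob` and `run_bronze_jobs` are hypothetical names, and the argument order is copied from the calls above.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class IngestJob:
    """One Bronze load: what to read, how to read it, and where to write it."""
    table: str              # unqualified table name, e.g. "products"
    schema: Any             # a StructType on Databricks
    source_path: str
    checkpoint_suffix: str
    options: Dict[str, str]


def run_bronze_jobs(jobs: List[IngestJob], catalog: str, checkpoint_root: str,
                    create_fn: Callable[..., Any],
                    log_fn: Callable[[str], Any]) -> List[str]:
    """Create each Bronze table, log it, and return the fully qualified names."""
    done = []
    for job in jobs:
        table_name = f"{catalog}.bronze.{job.table}"
        chkpnt_path = f"{checkpoint_root}/{job.checkpoint_suffix}"
        create_fn(job.schema, job.source_path, chkpnt_path, job.options, table_name)
        log_fn(table_name)
        done.append(table_name)
    return done


# Usage on Databricks (not executed here):
# jobs = [
#     IngestJob("products", products_schema, products_path, "bronzeproduct", csv_options),
#     IngestJob("transactions", transactions_schema, transactions_path, "bronzetransaction", csv_options),
#     IngestJob("customers", customers_schema, customers_path, "bronzecustomer", json_options),
# ]
# run_bronze_jobs(jobs, dbutils.widgets.get("catalog"), checkpointlocation,
#                 lambda *args: create_table_bronze(spark, *args),
#                 lambda name: audit_log(spark, name, log_path))
```

Keeping the job list as data also makes it straightforward to add a new source without copying another cell.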

In [None]:
# Display audit logs to verify all tables were successfully created and ingested
display(spark.read.format("delta").load(log_path))
