# Bronze Layer ETL Pipeline using Delta Live Tables

This notebook implements the Bronze layer data ingestion from CSV files using Databricks Delta Live Tables (DLT). It processes 5 source files:

1. customers.csv - Customer information
2. transaction_btc.csv - Bitcoin transactions 
3. transaction_commodities.csv - Commodities transactions
4. quotation_btc.csv - Bitcoin price quotations
5. quotation_yfinance.csv - Gold price quotations

The pipeline implements streaming ingestion with data quality checks.

In [None]:
# Import required libraries
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Source directory holding the raw CSV files. It is read from the pipeline
# configuration key "source_path" and falls back to the default DBFS location
# when that key is not set. The trailing slash is stripped because every table
# builds its path as f"{SOURCE_DIR}/<file>", which would otherwise contain "//".
SOURCE_DIR = spark.conf.get("source_path", "dbfs:/FileStore/data_raw/").rstrip("/")

# Explicit schemas for the five CSV sources; every column is declared nullable.
def _nullable_schema(*columns):
    """Build a StructType from (column_name, data_type) pairs, all nullable."""
    return StructType(
        [StructField(col_name, col_type, True) for col_name, col_type in columns]
    )


# customers.csv
customers_schema = _nullable_schema(
    ("customer_id", StringType()),
    ("customer_name", StringType()),
    ("documento", StringType()),
    ("segmento", StringType()),
    ("pais", StringType()),
    ("estado", StringType()),
    ("cidade", StringType()),
    ("created_at", TimestampType()),
)

# Bitcoin transactions
transaction_btc_schema = _nullable_schema(
    ("transaction_id", StringType()),
    ("data_hora", TimestampType()),
    ("ativo", StringType()),
    ("quantidade", DoubleType()),
    ("tipo_operacao", StringType()),
    ("moeda", StringType()),
    ("cliente_id", StringType()),
    ("canal", StringType()),
    ("mercado", StringType()),
    ("arquivo_origem", StringType()),
    ("importado_em", TimestampType()),
)

# Commodities transactions
transaction_commodities_schema = _nullable_schema(
    ("transaction_id", StringType()),
    ("data_hora", TimestampType()),
    ("commodity_code", StringType()),
    ("quantidade", DoubleType()),
    ("tipo_operacao", StringType()),
    ("unidade", StringType()),
    ("moeda", StringType()),
    ("cliente_id", StringType()),
    ("canal", StringType()),
    ("mercado", StringType()),
    ("arquivo_origem", StringType()),
    ("importado_em", TimestampType()),
)

# Bitcoin price quotations
quotation_btc_schema = _nullable_schema(
    ("ativo", StringType()),
    ("preco", DoubleType()),
    ("moeda", StringType()),
    ("horario_coleta", TimestampType()),
)

# YFinance price quotations (same column layout as the Bitcoin quotations)
quotation_yfinance_schema = _nullable_schema(
    ("ativo", StringType()),
    ("preco", DoubleType()),
    ("moeda", StringType()),
    ("horario_coleta", TimestampType()),
)

## Bronze Tables Creation

Here we create the bronze tables using DLT to ingest data from the source CSV files. We'll use the `@dlt.table` decorator with data quality expectations.

In [None]:
# Bronze table for customers
@dlt.table(
    name="bronze_customers",
    comment="Raw customer data ingested from CSV files",
)
@dlt.expect_all_or_fail(
    {
        "valid_customer_id": "customer_id IS NOT NULL",
        "valid_customer_documento": "documento IS NOT NULL",
    }
)
@dlt.expect("valid_timestamp", "created_at IS NOT NULL")
def bronze_customers():
    """Stream customers.csv into the bronze_customers table.

    Reads the file through the cloudFiles source in CSV format with a header
    row and customers_schema, then appends an `_ingested_at` column set to
    current_timestamp().
    """
    csv_reader = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(customers_schema)
    )
    raw_customers = csv_reader.load(f"{SOURCE_DIR}/customers.csv")
    return raw_customers.select("*", current_timestamp().alias("_ingested_at"))

# Bronze table for Bitcoin transactions
@dlt.table(
    name="bronze_transactions_btc",
    comment="Raw Bitcoin transaction data ingested from CSV files"
)
@dlt.expect_or_fail("valid_transaction_id", "transaction_id IS NOT NULL")
@dlt.expect_or_fail("valid_quantidade", "quantidade > 0")
@dlt.expect("valid_timestamp", "data_hora IS NOT NULL")
def bronze_transactions_btc():
    """Stream transaction_btc.csv into the bronze_transactions_btc table.

    Reads the file through the cloudFiles source in CSV format with a header
    row and transaction_btc_schema, then appends an `_ingested_at` column set
    to current_timestamp().
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(transaction_btc_schema)
        # File name corrected from the misspelled "transacation_btc.csv".
        .load(f"{SOURCE_DIR}/transaction_btc.csv")
        .select("*", current_timestamp().alias("_ingested_at"))
    )

# Bronze table for commodities transactions
@dlt.table(
    name="bronze_transactions_commodities",
    comment="Raw commodities transaction data ingested from CSV files",
)
@dlt.expect_all_or_fail(
    {
        "valid_transaction_id": "transaction_id IS NOT NULL",
        "valid_quantidade": "quantidade > 0",
    }
)
@dlt.expect("valid_timestamp", "data_hora IS NOT NULL")
def bronze_transactions_commodities():
    """Stream transaction_commodities.csv into bronze_transactions_commodities.

    Reads the file through the cloudFiles source in CSV format with a header
    row and transaction_commodities_schema, then appends an `_ingested_at`
    column set to current_timestamp().
    """
    csv_reader = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(transaction_commodities_schema)
    )
    raw_transactions = csv_reader.load(f"{SOURCE_DIR}/transaction_commodities.csv")
    return raw_transactions.select("*", current_timestamp().alias("_ingested_at"))

# Bronze table for Bitcoin quotations
@dlt.table(
    name="bronze_quotations_btc",
    comment="Raw Bitcoin price quotation data ingested from CSV files",
)
@dlt.expect_all_or_fail(
    {
        "valid_ativo": "ativo IS NOT NULL",
        "valid_preco": "preco > 0",
    }
)
@dlt.expect("valid_timestamp", "horario_coleta IS NOT NULL")
def bronze_quotations_btc():
    """Stream quotation_btc.csv into the bronze_quotations_btc table.

    Reads the file through the cloudFiles source in CSV format with a header
    row and quotation_btc_schema, then appends an `_ingested_at` column set to
    current_timestamp().
    """
    csv_reader = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(quotation_btc_schema)
    )
    raw_quotations = csv_reader.load(f"{SOURCE_DIR}/quotation_btc.csv")
    return raw_quotations.select("*", current_timestamp().alias("_ingested_at"))

# Bronze table for YFinance quotations (Gold)
@dlt.table(
    name="bronze_quotations_yfinance",
    comment="Raw YFinance price quotation data ingested from CSV files",
)
@dlt.expect_all_or_fail(
    {
        "valid_ativo": "ativo IS NOT NULL",
        "valid_preco": "preco > 0",
    }
)
@dlt.expect("valid_timestamp", "horario_coleta IS NOT NULL")
def bronze_quotations_yfinance():
    """Stream quotation_yfinance.csv into the bronze_quotations_yfinance table.

    Reads the file through the cloudFiles source in CSV format with a header
    row and quotation_yfinance_schema, then appends an `_ingested_at` column
    set to current_timestamp().
    """
    csv_reader = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(quotation_yfinance_schema)
    )
    raw_quotations = csv_reader.load(f"{SOURCE_DIR}/quotation_yfinance.csv")
    return raw_quotations.select("*", current_timestamp().alias("_ingested_at"))

## Pipeline Configuration and Deployment

To deploy this pipeline:

1. Create a Pipeline in the Databricks workspace
2. Set the Pipeline Mode to "Triggered"
3. Set the Source to this notebook
4. Configure the following parameters:
   - Target: The database where tables will be created (e.g., "raw")
   - Storage Location: The DBFS path for table storage
   - Source Directory: The path to source CSV files
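5. Set the Cluster Mode to "Fixed Size" with appropriate sizing, or to autoscaling if the workload varies
6. If autoscaling is enabled, set the minimum and maximum number of workers (the example below uses 1 to 2)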
7. Click Create and then Start the pipeline

Example configuration in JSON format:

In [None]:
# Example Pipeline Configuration
# Kept as a JSON string so this cell stays valid Python: as a bare dict
# expression, the JSON literal `false` is an undefined Python name and raises
# NameError when the cell is evaluated. The JSON text itself is unchanged.
EXAMPLE_PIPELINE_CONFIG = """
{
    "name": "raw_to_bronze_etl",
    "storage": "dbfs:/pipelines/raw_to_bronze",
    "configuration": {
        "pipelines.trigger.interval": "6 hour",
        "source_path": "dbfs:/FileStore/data_raw/",
        "target_database": "raw"
    },
    "clusters": [
        {
            "label": "default",
            "autoscale": {
                "min_workers": 1,
                "max_workers": 2
            }
        }
    ],
    "libraries": [
        {
            "notebook": {
                "path": "/Repos/pipeline/bronze/bronze_tables"
            }
        }
    ],
    "continuous": false
}
"""