# nb_01_ingestao_bronze
**Objetivo:** realizar a ingestão dos dados brutos extraídos do banco AdventureWorks2017 para a camada Bronze, preservando a estrutura original das tabelas e garantindo rastreabilidade dos dados.


In [0]:
%sql
-- Create the project catalog and make it the session default, so that the
-- two-part names (bronze.<table>) used in later cells resolve inside it.
CREATE CATALOG IF NOT EXISTS mvp_catalog;
USE CATALOG mvp_catalog;

-- One schema per layer (bronze / silver / gold), fully qualified.
CREATE SCHEMA IF NOT EXISTS mvp_catalog.bronze;
CREATE SCHEMA IF NOT EXISTS mvp_catalog.silver;
CREATE SCHEMA IF NOT EXISTS mvp_catalog.gold;

-- One volume per layer for file storage; the CSVs read later in this
-- notebook are expected under /Volumes/mvp_catalog/bronze/bronze_data/.
CREATE VOLUME IF NOT EXISTS mvp_catalog.bronze.bronze_data;
CREATE VOLUME IF NOT EXISTS mvp_catalog.silver.silver_data;
CREATE VOLUME IF NOT EXISTS mvp_catalog.gold.gold_data;

-- Sanity check: list catalogs, then the schemas of mvp_catalog.
SHOW CATALOGS;
SHOW SCHEMAS IN mvp_catalog;


In [0]:
%python
from pyspark.sql import functions as F


In [0]:
%python
def clean_column_name(col_name):
    """Normalise a raw CSV header into a column name that Delta Lake accepts.

    Steps, in order:
    - Drop any UTF-8 byte-order mark.
    - Trim surrounding whitespace.
    - Turn inner spaces into underscores.
    - Remove the other characters Delta rejects in column names: comma,
      semicolon, braces, parentheses, newline, tab and equals sign.
    - Lower-case the result and cap it at 255 characters.

    Args:
        col_name: header text as read from the CSV.

    Returns:
        The cleaned column name.
    """
    # A BOM prepended to the first header is not whitespace for str.strip(),
    # so it has to be removed explicitly before trimming.
    cleaned = col_name.replace("\ufeff", "").strip().replace(" ", "_")
    # Delta's forbidden set is " ,;{}()\n\t="; the comma was previously missing.
    for forbidden in (",", ";", "{", "}", "(", ")", "\n", "\t", "="):
        cleaned = cleaned.replace(forbidden, "")
    return cleaned.lower()[:255]


In [0]:
%python
def ingest_bronze(table_name, file_name, catalog="mvp_catalog", schema="bronze", volume="bronze_data"):
    """Load one raw CSV from the Bronze volume into a Delta table in the Bronze schema.

    Every column is read as a string (no schema inference), so values stay as
    they appear in the file. Literal "NULL" strings become real nulls, and
    column names are sanitised with clean_column_name so Delta accepts them.

    Args:
        table_name: target table name inside ``{catalog}.{schema}``.
        file_name: CSV file name inside ``/Volumes/{catalog}/{schema}/{volume}/``.
        catalog: Unity Catalog catalog (default ``mvp_catalog``).
        schema: schema that holds both the volume and the target table
            (default ``bronze``).
        volume: volume that holds the raw CSV files (default ``bronze_data``).
    """
    source_path = f"/Volumes/{catalog}/{schema}/{volume}/{file_name}"
    # Fully qualified so the write does not depend on the session's current catalog.
    target_table = f"{catalog}.{schema}.{table_name}"

    raw = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "false")
        .csv(source_path)
    )

    def _quoted(name):
        # Backtick-quote the header so names with dots or other special
        # characters are not parsed as nested field references.
        return F.col("`" + name.replace("`", "``") + "`")

    # Null-normalise and rename in a single projection.
    bronze_df = raw.select([
        F.when(_quoted(c) == "NULL", None).otherwise(_quoted(c)).alias(clean_column_name(c))
        for c in raw.columns
    ])

    # overwrite + overwriteSchema replaces data and schema in one atomic Delta
    # commit. The former DROP TABLE + write left no table behind if the write
    # failed, and it discarded the table's history.
    (
        bronze_df.write
        .mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true")
        .saveAsTable(target_table)
    )


# PRODUTOS

In [0]:
%python
# Product.csv -> bronze.product
ingest_bronze(file_name="Product.csv", table_name="product")


In [0]:
%python
# ProductSubcategory.csv -> bronze.product_subcategory
ingest_bronze(file_name="ProductSubcategory.csv", table_name="product_subcategory")


In [0]:
%python
# ProductCategory.csv -> bronze.product_category
ingest_bronze(file_name="ProductCategory.csv", table_name="product_category")


# VENDAS

In [0]:
%python
# SalesOrderHeader.csv -> bronze.sales_order_header
ingest_bronze(file_name="SalesOrderHeader.csv", table_name="sales_order_header")


In [0]:
%python
# SalesOrderDetail.csv -> bronze.sales_order_detail
ingest_bronze(file_name="SalesOrderDetail.csv", table_name="sales_order_detail")


# CLIENTES

In [0]:
%python
# Customer.csv -> bronze.customer
ingest_bronze(file_name="Customer.csv", table_name="customer")


In [0]:
%python
# Person.csv -> bronze.person
ingest_bronze(file_name="Person.csv", table_name="person")


In [0]:
%python
# EmailAddress.csv -> bronze.email_address
ingest_bronze(file_name="EmailAddress.csv", table_name="email_address")


# TERRITÓRIO

In [0]:
%python
# SalesTerritory.csv -> bronze.sales_territory
ingest_bronze(file_name="SalesTerritory.csv", table_name="sales_territory")


In [0]:
%python
# CountryRegion.csv -> bronze.country_region
ingest_bronze(file_name="CountryRegion.csv", table_name="country_region")


In [0]:
%python
# NOTE(review): bronze.product was already loaded by ingest_bronze earlier in
# this notebook; this cell and the three after it repeat those steps by hand.
# inferSchema stays off on purpose: every column must be read as a string.
product_raw = spark.read.options(header="true", inferSchema="false").csv(
    "/Volumes/mvp_catalog/bronze/bronze_data/Product.csv"
)


In [0]:
%python
# Replace literal "NULL" strings with real nulls, keeping column names as-is.
product_norm = product_raw.select(
    *(
        F.when(F.col(name) == "NULL", None).otherwise(F.col(name)).alias(name)
        for name in product_raw.columns
    )
)




In [0]:
%python
# Rename every column through clean_column_name; values are untouched.
product_bronze = product_norm.toDF(*map(clean_column_name, product_norm.columns))


In [0]:
%python
# Overwrite the Delta table bronze.product with the cleaned DataFrame.
product_bronze.write.saveAsTable("bronze.product", format="delta", mode="overwrite")


# PRODUCT SUBCATEGORY

In [0]:
%python
# NOTE(review): bronze.product_subcategory was already loaded by ingest_bronze
# earlier in this notebook; this cell and the three after it repeat that by hand.
# Every column is read as a string (no schema inference).
subcategory_raw = spark.read.options(header="true", inferSchema="false").csv(
    "/Volumes/mvp_catalog/bronze/bronze_data/ProductSubcategory.csv"
)


In [0]:
%python
# Replace literal "NULL" strings with real nulls, keeping column names as-is.
subcategory_norm = subcategory_raw.select(
    *(
        F.when(F.col(name) == "NULL", None).otherwise(F.col(name)).alias(name)
        for name in subcategory_raw.columns
    )
)


In [0]:
%python
# Rename every column through clean_column_name; values are untouched.
subcategory_bronze = subcategory_norm.toDF(*map(clean_column_name, subcategory_norm.columns))


In [0]:
%python
# Overwrite the Delta table bronze.product_subcategory with the cleaned DataFrame.
subcategory_bronze.write.saveAsTable("bronze.product_subcategory", format="delta", mode="overwrite")



# PRODUCT CATEGORY

In [0]:
%python
# NOTE(review): bronze.product_category was already loaded by ingest_bronze
# earlier in this notebook; this cell and the three after it repeat that by hand.
# Every column is read as a string (no schema inference).
category_raw = spark.read.options(header="true", inferSchema="false").csv(
    "/Volumes/mvp_catalog/bronze/bronze_data/ProductCategory.csv"
)


In [0]:
%python
# Replace literal "NULL" strings with real nulls, keeping column names as-is.
category_norm = category_raw.select(
    *(
        F.when(F.col(name) == "NULL", None).otherwise(F.col(name)).alias(name)
        for name in category_raw.columns
    )
)


In [0]:
%python
# Rename every column through clean_column_name; values are untouched.
category_bronze = category_norm.toDF(*map(clean_column_name, category_norm.columns))


In [0]:
%python
# Overwrite the Delta table bronze.product_category with the cleaned DataFrame.
category_bronze.write.saveAsTable("bronze.product_category", format="delta", mode="overwrite")



# VALIDAÇÃO FINAL

In [0]:
%sql
-- List every table in the bronze schema of the current catalog.
SHOW TABLES FROM bronze;


In [0]:
%sql
-- Row count of bronze.sales_order_header
SELECT COUNT(*)
FROM bronze.sales_order_header;



In [0]:
%sql
-- Row count of bronze.product
SELECT COUNT(*)
FROM bronze.product;


In [0]:
%sql
-- Row count of bronze.sales_order_detail
SELECT COUNT(*)
FROM bronze.sales_order_detail;

In [0]:
%sql
-- List the files in the bronze volume, to confirm where the CSVs were uploaded.
LIST '/Volumes/mvp_catalog/bronze/bronze_data/';