# Ingerindo arquivos .csv em tabelas Delta (camada bronze)

# 1.0 Criando o caminho do volume de staging

In [0]:
import os
from pyspark.sql.functions import current_timestamp, input_file_name, lit

In [0]:
# Location of the staging volume: catalog / schema / volume name.
catalog, schema, volume = "workspace_ecommerce", "bronze", "staging_zone_olist"

In [0]:
# Directory of the staging volume, with a trailing slash,
# e.g. /Volumes/workspace_ecommerce/bronze/staging_zone_olist/
volume_path = "/Volumes/" + "/".join((catalog, schema, volume)) + "/"

In [0]:
# List the entries of the staging volume; this listing drives the ingestion loops that follow.
files = dbutils.fs.ls(volume_path)

In [0]:
# Show the listing (path, name, size, modificationTime) as a quick check of what was staged.
display(files)

path,name,size,modificationTime
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_customers_dataset.csv,olist_customers_dataset.csv,9033957,1767094670000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_geolocation_dataset.csv,olist_geolocation_dataset.csv,61273883,1767100580000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_order_items_dataset.csv,olist_order_items_dataset.csv,15438671,1767094672000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_order_payments_dataset.csv,olist_order_payments_dataset.csv,5777138,1767094669000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_order_reviews_dataset.csv,olist_order_reviews_dataset.csv,14451670,1767094671000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_orders_dataset.csv,olist_orders_dataset.csv,17654914,1767094672000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_products_dataset.csv,olist_products_dataset.csv,2379446,1767094667000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_sellers_dataset.csv,olist_sellers_dataset.csv,174703,1767094666000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/product_category_name_translation.csv,product_category_name_translation.csv,2613,1767100575000


## Código completo em uma única célula para aplicar o tratamento em todos os arquivos .csv

In [0]:
# Ingest every CSV in the staging volume into its own bronze Delta table.
for file in files:
    # Skip anything that is not a CSV. The read/write below must stay inside this
    # guard: otherwise a non-CSV entry would be read as CSV and written to the table
    # name left over from an earlier iteration (or fail with a NameError on the first).
    if not file.name.endswith(".csv"):
        continue

    # 1. Derive the table name from the file name by removing every occurrence of
    #    "olist_", "_dataset.csv" and ".csv", e.g. olist_orders_dataset.csv -> orders.
    table_name = (
        file.name.replace("olist_", "")
        .replace("_dataset.csv", "")
        .replace(".csv", "")
    )
    full_table_name = f"{catalog}.{schema}.{table_name}"

    print(f"Processando arquivo: {file.name} -> Tabela: {full_table_name}")

    # 2. Read the CSV: the first line is the header and column types are inferred.
    df = (
        spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .load(file.path)
    )

    # 3. Audit columns: ingestion timestamp and the source file name
    #    (name only, not the full path).
    df_enriquecido = (
        df
        .withColumn("dt_ingestao", current_timestamp())
        .withColumn("arquivo_origem", lit(file.name))
    )

    # 4. Full reload: the bronze Delta table is overwritten on every run.
    (
        df_enriquecido.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(full_table_name)
    )

print("Carga completa de todos os arquivos!")

Processando arquivo: olist_customers_dataset.csv -> Tabela: workspace_ecommerce.bronze.customers
Processando arquivo: olist_geolocation_dataset.csv -> Tabela: workspace_ecommerce.bronze.geolocation
Processando arquivo: olist_order_items_dataset.csv -> Tabela: workspace_ecommerce.bronze.order_items
Processando arquivo: olist_order_payments_dataset.csv -> Tabela: workspace_ecommerce.bronze.order_payments
Processando arquivo: olist_order_reviews_dataset.csv -> Tabela: workspace_ecommerce.bronze.order_reviews
Processando arquivo: olist_orders_dataset.csv -> Tabela: workspace_ecommerce.bronze.orders
Processando arquivo: olist_products_dataset.csv -> Tabela: workspace_ecommerce.bronze.products
Processando arquivo: olist_sellers_dataset.csv -> Tabela: workspace_ecommerce.bronze.sellers
Processando arquivo: product_category_name_translation.csv -> Tabela: workspace_ecommerce.bronze.product_category_name_translation
Carga completa de todos os arquivos!


# 2.0 Limpando os nomes dos arquivos

## 2.1 Iteração: varrendo o volume (staging area) em busca de arquivos .csv

## 2.2 Limpeza de nomes: transforma o nome do arquivo `olist_customers_dataset.csv` em um nome de tabela limpo (`customers`)

In [0]:
# Preview of the naming rule only: map each CSV to its bronze table name and
# print the mapping. Nothing is read or written in this cell.
for file in files:
    if not file.name.endswith(".csv"):
        continue  # non-CSV entries of the volume are ignored

    # Remove every occurrence of "olist_", "_dataset.csv" and ".csv",
    # e.g. olist_orders_dataset.csv -> orders.
    table_name = file.name.replace("olist_", "").replace("_dataset.csv", "").replace(".csv", "")
    full_table_name = ".".join((catalog, schema, table_name))

    print(f"Processando arquivo: {file.name} -> Tabela: {full_table_name}")

Processando arquivo: olist_customers_dataset.csv -> Tabela: workspace_ecommerce.bronze.customers
Processando arquivo: olist_geolocation_dataset.csv -> Tabela: workspace_ecommerce.bronze.geolocation
Processando arquivo: olist_order_items_dataset.csv -> Tabela: workspace_ecommerce.bronze.order_items
Processando arquivo: olist_order_payments_dataset.csv -> Tabela: workspace_ecommerce.bronze.order_payments
Processando arquivo: olist_order_reviews_dataset.csv -> Tabela: workspace_ecommerce.bronze.order_reviews
Processando arquivo: olist_orders_dataset.csv -> Tabela: workspace_ecommerce.bronze.orders
Processando arquivo: olist_products_dataset.csv -> Tabela: workspace_ecommerce.bronze.products
Processando arquivo: olist_sellers_dataset.csv -> Tabela: workspace_ecommerce.bronze.sellers
Processando arquivo: product_category_name_translation.csv -> Tabela: workspace_ecommerce.bronze.product_category_name_translation


# 3.0 Lendo um arquivo .csv (passo a passo, aplicado a um único arquivo)

In [0]:
# Re-list the staging volume to confirm which files are available before reading one.
display(dbutils.fs.ls(volume_path))

path,name,size,modificationTime
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_customers_dataset.csv,olist_customers_dataset.csv,9033957,1767094670000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_geolocation_dataset.csv,olist_geolocation_dataset.csv,61273883,1767100580000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_order_items_dataset.csv,olist_order_items_dataset.csv,15438671,1767094672000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_order_payments_dataset.csv,olist_order_payments_dataset.csv,5777138,1767094669000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_order_reviews_dataset.csv,olist_order_reviews_dataset.csv,14451670,1767094671000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_orders_dataset.csv,olist_orders_dataset.csv,17654914,1767094672000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_products_dataset.csv,olist_products_dataset.csv,2379446,1767094667000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/olist_sellers_dataset.csv,olist_sellers_dataset.csv,174703,1767094666000
dbfs:/Volumes/workspace_ecommerce/bronze/staging_zone_olist/product_category_name_translation.csv,product_category_name_translation.csv,2613,1767100575000


In [0]:
# NOTE(review): `file` is not assigned in this section; it is the loop variable
# left over from the earlier loop cells (the last entry of `files`), so only
# that one file is read here. On a fresh kernel, run the loop cell first.
df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true", delimiter=",")
    .load(file.path)
)

## 3.1 Adição de colunas de auditoria

## 3.2 `.withColumn("arquivo_origem", lit(file.name))`: salva apenas o nome do arquivo, e não o caminho completo

In [0]:
# Append the audit columns: ingestion timestamp and source file name (name only).
df_enriquecido = (
    df.withColumn("dt_ingestao", current_timestamp())
    .withColumn("arquivo_origem", lit(file.name))
)

# 4.0 Escrita na camada bronze (tabela Delta)

## 4.1 Definindo o nome completo da tabela de destino (catálogo.schema.tabela)

In [0]:
# Fully-qualified table name (catalog.schema.table) used as the write target,
# despite the "path" in the variable name.
destination_table_path = full_table_name

In [0]:
# Overwrite the destination Delta table with the single enriched DataFrame
# built in the cells above (full reload of that one table).
(
    df_enriquecido.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(destination_table_path)
)

# This cell loads only one table, so report that table instead of claiming all files were loaded.
print(f"Carga completa da tabela {destination_table_path}!")

Carga completa de todos os arquivos!
