
# Bibliotecas


In [0]:
import uuid
import gc
import urllib.request
from pyspark.sql.functions import *
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, TimestampType, LongType, DateType, TimestampType, BooleanType, DoubleType
from datetime import datetime, date


## Configuração da estrutura do Unity catalog

* data_ex (catalog) 
    * metadados (schema)
        * meta_bronze (table)
        * meta_silver (table)
        * meta_gold (table)
    * lhdw (schema) 
        * versao001 (volumes)
        * versao002 (volumes)
        * versao003 (volumes)       
    * bronze (schema)
        * bronze_log_carga (table)
        * versao003 (volume)
            * bronze_dim_categoria_produto
            * bronze_dim_cliente 
            * bronze_dim_data 
            * bronze_dim_localidade
            * bronze_dim_produto 
            * bronze_fato_vendas  
    * silver (schema)
        * versao* (volume)
            * silver_dim_categoria_produto
            * silver_dim_cliente 
            * silver_dim_data 
            * silver_dim_localidade
            * silver_dim_produto 
            * silver_fato_vendas
    * gold  (schema)
        * versao* (volume)
            * dim_categoria_produto
            * dim_cliente 
            * dim_data 
            * dim_localidade
            * dim_produto 
            * fato_vendas

In [0]:
# Criação do catalog
spark.sql("create catalog if not exists data_ex")

# Criação do schema
spark.sql("create schema if not exists data_ex.metadados")
spark.sql("create schema if not exists data_ex.lhdw")
spark.sql("create schema if not exists data_ex.bronze")
spark.sql("create schema if not exists data_ex.silver")
spark.sql("create schema if not exists data_ex.gold")

# Criação das tabelas de metadados fixos, para controle de job 
spark.sql("""create table if not exists data_ex.metadados.meta_bronze (
    id_job string
) using delta """)

spark.sql("""create table if not exists data_ex.metadados.meta_silver (
    id_job string
) using delta """)

spark.sql("""create table if not exists data_ex.metadados.meta_gold  (
    id_job string
) using delta """)

# Abaixo são geradas uuids do tipo 4, a qual não requer argumentos e é gerada aleatoriamente
id_job_bronze = str(uuid.uuid4())
id_job_silver = str(uuid.uuid4())
id_job_gold = str(uuid.uuid4())

# Armazena a uuid gerada na respectiva camada
spark.sql(f"insert into data_ex.metadados.meta_bronze values ('{id_job_bronze}')")
spark.sql(f"insert into data_ex.metadados.meta_silver values ('{id_job_silver}')")
spark.sql(f"insert into data_ex.metadados.meta_gold values ('{id_job_gold}')")

# Configuração da sessão de trabalho bronze

In [0]:
# Otimização retirada do material disponibilizado do curso de Databricks
spark = SparkSession.builder \
    .appName("Load Data Bronze") \
    .config("spark.sql.shuffle.partitions", "200")  \
    .config("spark.sql.files.maxPartitionBytes", "128MB") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Define um número fixo de partições para shuffle, melhorando o paralelismo                 
# Define o tamanho máximo de partições para evitar muitos arquivos pequenos        
# Usa o codec Snappy para compressão rápida, otimizando tempo de leitura e escrita    
# Habilita otimizações adaptativas, ajustando o número de partições dinamicamente com base no tamanho dos dados

#Funções

## (volume || 0) download_dataset(csv_files, url, path)

In [0]:
def download_dataset(csv_files, url, path):
    # Tenta realizar o download dos arquivos
    try :
        # Variavel para armazenar o nome do novo volume
        volume = ""
        try:
            # Conta quantos volumes já existem dentro desse do schema
            numero = spark.sql("show volumes in data_ex.lhdw").count()
            # Cria o comando sql para criar um novo volume para a nova versao do download
            volume = f"download{numero+1:03d}"
            sql_command = f"create volume if not exists data_ex.lhdw.{volume}"
            # Executa o comando sql acima
            spark.sql(sql_command)
        # lanca um execao caso houver um erro
        except Exception as e:
            print(f"Erro ao criar o volume: {e}")
            return 0
        for file in csv_files:
            down_path = f"{url+file}"
            up_path = f"{path+volume}/{file}"
            urllib.request.urlretrieve(down_path, up_path)
            print(f"Csv {file} baixado com sucesso!")
    # Caso não consigua realizar o download, uma exceção é lançada
    except Exception as e:
        print(f"Erro ao baixar o arquivo: {file} - {e}")
        return 0
    return volume

## makeLogTable():

In [0]:
def make_log_tabel(camada):
    relacoes = {
            "bronze" : "bronze_log_carga",
            "silver" : "silver_log_carga",
            "gold"   : "gold_log_carga"
        }
    n = 0
    # Conta quantas tabelas existem
    try:
        n = spark.sql("show tables in data_ex.bronze").count()
    except:
        print("ERRO ao ler a tabela de logs")
    # Cria a tabela se não existir 
    if n == 0:
        try :
            spark.sql(f"""create table if not exists data_ex.bronze.{relacoes[camada]} (
                id_carga string,
                id_job string,
                nome_arquivo string,
                fonte string,
                camada string,
                path_origem string,
                path_destino string,
                data_inicio timestamp,
                data_fim timestamp,
                duracao_ms string,
                registros_lidos integer,
                registros_gravados integer,
                status string,
                mensagem_erro string,
                data_execusao date
                ) using delta """)
            print("gerougerou")
            return 1
        except Exception as e:
            print(f"Erro ao criar a tabela de logs: {e}")
            return 0
    # Erro, existe mais de uma tabela
    elif n > 1:
        print(f"Erro na árvore de diretórios, exite mais de {n} tabelas de losgs na camada bronze")
        return 0
    # A tabela ja existe
    else:
        print("Tabela de logs ja existes na camada bronze")
        return 1

## generate_log(volume, camada, file, new_file_name)

In [0]:
def generate_log(volume, camada, file, new_name, schema_log):
  camada_origem = "lhdw"
  global log_data
  
  log_data.update({
    "id_carga" : str(uuid.uuid4()),
    "id_job"   : spark.sql("select * from data_ex.metadados.meta_bronze").collect()[0][0],
    "nome_arquivo" : new_name,
    "fonte" : "filesystem_local",
    "camada" : "bronze",
    "path_origem" : f"/Volumes/data_ex/bronze/{volume}/{file}",
    "path_destino" : f"/Volumes/data_ex/silver/{volume}/{new_name}",
    "data_inicio" : spark.range(1).select(current_timestamp()).collect()[0][0],
    "status" : "Running",
    "data_execusao" : spark.range(1).select(current_date()).collect()[0][0]
  })

  try:
    df = spark.createDataFrame([(
      log_data["id_carga"],
      log["id_job"],
      log_data["nome_arquivo"],
      "filesystem_local",
      camada,
      str(f"/Volumes/data_ex/bronze/{volume}/{file}"), 
      str(f"/Volumes/data_ex/silver/{volume}/{new_name}"),
      log_data["data_inicio"],
      None,
      None,
      None,
      None,
      "Running",
      None,
      log_data["data_execusao"]
    )], schema = schema_log)
    df.write.format("delta").mode("append").saveAsTable("data_ex.bronze.bronze_log_carga")
    return 1
  except Exception as e:
    print(f"Erro ao gerar o log: {e}")
    return 0

## log_update_leitura()

In [0]:
from pyspark.sql import Row

def log_update_leitura(registros_lidos):
    try:
        global log_data

        row = Row(
            id_carga=log_data["id_carga"],
            registros_lidos=registros_lidos
        )

        df_update = spark.createDataFrame([row])
        df_update.createOrReplaceTempView("log_leitura")

        spark.sql("""
            MERGE INTO data_ex.bronze.bronze_log_carga as tgt
            USING log_leitura as src
            ON tgt.id_carga = src.id_carga
            WHEN MATCHED THEN UPDATE SET
                tgt.registros_lidos = src.registros_lidos
        """)
        return 1
    except Exception as e:
        print(f"Erro ao atualizar o log de leitura: {e}")
        return 0

## update_log()

In [0]:
def update_log():
    try:
        global log_data
        log_data["status"] = "True"   # Atualiza o status do log
        data_fim = spark.range(1).select(current_timestamp()).collect()[0][0]
        log_data["data_fim"] = data_fim  
        duracao_ms = (data_fim - log_data[7]).total_seconds() * 1000
        log_data["duracao_ms"] = duracao_ms
        acao = store_log()
        if not acao:
            return 0
        else:
            return 1
    except Exception as e:
        print(f"Erro na funcao gerarLog(): {e}")

## store_log()

In [0]:
from pyspark.sql import Row

def store_log():
    try:
        global log_data

        row = Row(
            id_carga=log_data["id_carga"],
            id_job=log_data["id_job"],
            nome_arquivo=log_data["nome_arquivo"],
            fonte=log_data["fonte"],
            camada=log_data["camada"],
            path_origem=log_data["path_origem"],
            path_destino=log_data["path_destino"],
            data_inicio=log_data["data_inicio"],
            data_fim=log_data["data_fim"],
            duracao_ms=log_data["duracao_ms"],
            registros_lidos=log_data["registros_lidos"],
            registros_gravados=log_data["registros_gravados"],
            status=log_data["status"],
            mensagem_erro=log_data["mensagem_erro"],
            data_carga=log_data["data_carga"]
        )

        df_log = spark.createDataFrame([row])
        df_log.createOrReplaceTempView("log_final")

        spark.sql("""
            MERGE INTO data_ex.bronze.bronze_log_carga AS tgt
            USING log_final AS src
            ON tgt.id_carga = src.id_carga
            WHEN MATCHED THEN UPDATE SET
                tgt.id_job = src.id_job,
                tgt.nome_arquivo = src.nome_arquivo,
                tgt.fonte = src.fonte,
                tgt.camada = src.camada,
                tgt.path_origem = src.path_origem,
                tgt.path_destino = src.path_destino,
                tgt.data_inicio = src.data_inicio,
                tgt.data_fim = src.data_fim,
                tgt.duracao_ms = src.duracao_ms,
                tgt.registros_lidos = src.registros_lidos,
                tgt.registros_gravados = src.registros_gravados,
                tgt.status = src.status,
                tgt.mensagem_erro = src.mensagem_erro,
                tgt.data_carga = src.data_carga

            WHEN NOT MATCHED THEN INSERT (
                id_carga,
                id_job,
                nome_arquivo,
                fonte,
                camada,
                path_origem,
                path_destino,
                data_inicio,
                data_fim,
                duracao_ms,
                registros_lidos,
                registros_gravados,
                status,
                mensagem_erro,
                data_carga
            )
            VALUES (
                src.id_carga,
                src.id_job,
                src.nome_arquivo,
                src.fonte,
                src.camada,
                src.path_origem,
                src.path_destino,
                src.data_inicio,
                src.data_fim,
                src.duracao_ms,
                src.registros_lidos,
                src.registros_gravados,
                src.status,
                src.mensagem_erro,
                src.data_carga
            )
        """)

        return 1

    except Exception as e:
        print(f"Erro ao gravar o log (MERGE): {e}")
        return 0


## read_dim

In [0]:
def read_dim(path, file, schema):
    try:
        df = spark.read\
            .option("header", "true")\
            .schema(schema)\
            .csv(f"{path}/{file}")
        print("ler dim")
        return df
    except:
        print("ler dim")
        return 0

## save_dim()

In [0]:
def save_dim(df_read, dime, camada):
    try:
        df_fato = df_read.write.format("delta").mode("overwrite").save(f"{camada}/{dime}")
        return 1
    except Exception as e:
        return 0

## read_fato()


In [0]:
def read_fato(path, files, schema, n):
    print("entrou na read_fato()")
    try:
        vendas_df = [0]
        # Aplica um loop para ler todos os arquivos vendas
        for i in range(0, n):
            try:
                df = spark.read.option("header", "true")\
                    .schema(schema_vendas)\
                    .csv(f"{path}/{files[i]}")
                vendas_df[0] = vendas_df[0] + df.count()
                vendas_df.append(df)
            except Exception as e:
                print(f"Erro ao ler o arquivo {path}/{files[i]}: {e}")
        return vendas_df
    except Exception as e:
        print(f"Erro ao ler o arquivo {files[i]}: {e}")
    return 0

## Save_fato

In [0]:
def save_fato(df, fato, n, camada):
    #Loop para criar a tabela de fato vendas
    for i in range(0, n):
        try:
            df[i].write.format("delta").mode("append").save(f"{camada}/{fato}")
        except Exception as e:
            print(f"Erro ao salvar o dataframe vendas: {e}")
            return 0
    return 1

## Funcao de Comando

In [0]:
def bronze_work(csv_files, url, lhdw_path, schemas, dim_name, fato_name, camada, n, schema_log):
  
  # Faz o download do Dataset, cria um novo volume para armazenar os arquivos contidos
  volume_salvo = download_dataset(csv_files, url, lhdw_path)
  if not volume_salvo:
    return

  # Garante a existencia da tabelas de logs de carga da camada bronze
  if not make_log_tabel(camada):
    return
      
  # Definição dos diretorios de leitura
  lhdw_path = f"{lhdw_path}/{volume_salvo}"

  # Criação do volume no schema bronze
  spark.sql(f"create volume if not exists data_ex.bronze.{volume_salvo}")

  # Definição do path bronze
  bronze_path = f"/Volumes/data_ex/bronze/{volume_salvo}"

  bronze_work_dim(csv_files, schemas, dim_name, camada, n, schema_log, lhdw_path, volume_salvo)

  bronze_work_fato(csv_files, schemas, camada, fato_name, volume_salvo, n, lhdw_path, volume_salvo)


In [0]:
log_data = {}
def bronze_work_dim(csv_files, schemas, dime, camada, n, schema_log, lhdw_path, volume):
    print("dim")
  # Lista com o nome dos arquivos csv dimensão a serem processados 
    files = csv_files[:-n]
    
  # indice para registrar o nome da dimenssao a ser salvada
    for i, (file, schema, new_name) in enumerate(zip(files, schemas, dime)):
        print(f"{i},{file}")
        # Tenta ler os arquivos csv em lhdw_path
            
        # Inicia o registo de logs
        if not  generate_log(volume, camada, file, new_name, schema_log):
            break
        print("loglog")
        # Se schema não for vendas, aplica o schema correspondente a file
        if schema != schema_vendas:
        
            # Tenta chamar a funcao para ler um arquivo de nao vendas
            df_read = read_dim(lhdw_path, file, schema)
            if not df_read:
                printf(f"Erro ao ler o arquivo {file}")
                break
            
            # Lê o arquivo csv
            global log_data
            registros_lidos = df_read.count()
            log_data["registros_lidos"] = registros_lidos
            if not log_update_leitura(registros_lidos):
                print(f"Erro ao gravar a quantidade de dados lidos")
                break
            
            # Salva
            save = save_dim(df_read, dime[i], camada)
            if not save:
                printf(f"Erro ao salvar o dataframe ")
                break
            
            # Conta quantos registros foram salvos
            registros_gravados = spark.read.format("delta").load(f"{camada}/{dime[i]}").count()
            log_data["registros_gravados"] = registros_gravados

            
            acao = update_log()
            if not acao:
                printf("ERRO ao gravar o log")
                break
            
            print(f"DataFrame salvo com sucesso em {camada}/{dime[i]}")   
            
       
            
                
           
        

In [0]:
def bronze_work_fato(csv_files, schemas, camada, fato_name, volume_salvo, n, lhdw_path, volume):
    global log_data
    print("fato")
    if not  generate_log(volume, camada, "vendas", "bronze_dim_vendas", schema_log):
        printf("ERRO ao gerar log")
        return
    
    # Chama a função de leitura
    df_read = read_fato(lhdw_path, csv_files[-4:] ,schema_vendas, n)

    if not df_read:
        print(f"Erro ao ler os arquivos de vendas")
        return
    
    registros_lidos = df_read[0]
    log_data["registros_lidos"] = registros_lidos
    df_read.pop(0)

    if not log_update_leitura(registros_lidos):
        print(f"Erro ao gravar a quantidade de dados lidos em fatos")
        return

    # Salvar
    if not save_fato(df_read, fato_name, n, camada):
        printf(f"Erro ao chamar a função save_fato() : {e}")
        return
    
    # Salvamento concluído, armazenando o log respectivo
    registros_gravados = spark.read.format("delta").load(f"{camada}/{fato_name}").count()
    log_data["registros_gravados"] = registros_gravados

    if not update_log():
        return
    
    print(f"DataFrame salvo com sucesso em {camada}/{fato_name}")
    
  

# Main

In [0]:
# Lista dos nomes dos schemas em ordem de execução
schemas = []

# Definição manual do schemas a serem aplicados nos arquivos csv (futuros parquets)
schema_categoria = StructType([
    StructField("categoria_id", LongType(), True),
    StructField("categoria_nome", StringType(), True),
])
schemas.append(schema_categoria)

schema_cliente = StructType([
    StructField("cliente_id", LongType(), True),
    StructField("nome_cliente", StringType(), True),
    StructField("estado", StringType(), True),
    StructField("cidade", StringType(), True)
])
schemas.append(schema_cliente)

schema_data = StructType([
    StructField("data_id", LongType(), True),
    StructField("data", DateType(), True),
    StructField("ano", IntegerType(), True),
    StructField("mes", IntegerType(), True),
    StructField("dia", IntegerType(), True),
    StructField("dia_semana", StringType(), True), 
    StructField("final_de_semana", BooleanType(), True)
])
schemas.append(schema_data)

schema_localidade = StructType([
    StructField("localidade_id", LongType(), True),
    StructField("estado", StringType(), True),
    StructField("cidade", StringType(), True),
    ])  
schemas.append(schema_localidade)

schema_produto = StructType([
    StructField("produto_id", LongType(), True),
    StructField("preco_lista", DoubleType(), True),
    StructField("categoria_nome", StringType(), True)
])
schemas.append(schema_produto)

schema_vendas = StructType([
    StructField("venda_id", LongType(), True),
    StructField("cliente_id", LongType(), True),
    StructField("produto_id", LongType(), True),
    StructField("data_id", LongType(), True),
    StructField("categoria_id", LongType(), True),
    StructField("localidade_id", LongType(), True),
    StructField("quantidade", LongType(), True),
    StructField("preco_lista", DoubleType(), True),
    StructField("valor_total", DoubleType(), True)
])
schemas.append(schema_vendas)

# schema dos logs

schema_log = StructType([
    StructField("id_carga", StringType(), False),         
    StructField("id_job", StringType(), False),           
    StructField("nome_arquivo", StringType(), True),
    StructField("fonte", StringType(), True),
    StructField("camada", StringType(), False),
    StructField("path_destino", StringType(), True),
    StructField("data_inicio", TimestampType(), False),    
    StructField("data_fim", TimestampType(), True),       
    StructField("duracao_ms", LongType(), True),
    StructField("registros_lidos", LongType(), True),
    StructField("registros_gravados", LongType(), True),
    StructField("status", StringType(), False),
    StructField("mensagem_erro", StringType(), True),
    StructField("data_execucao", DateType(), False)        
])


# Lista de .csv a serem lidos
csv_files = [
    "categoria_produto.csv",
    "cliente.csv",
    "data.csv",
    "localidade.csv",
    "produto.csv",
    "vendas_part1.csv",
    "vendas_part2.csv",
    "vendas_part3.csv",
    "vendas_part4.csv"
]

# Reposotório do dataset para o desafio
url = "https://raw.githubusercontent.com/andrerosa1977/dataexperts2026/main/"

# Path para o schema de armazenamento, sem o volume
lhdw_path = "/Volumes/data_ex/lhdw/"

# Lista de nomes das dimenssoes bronze
dim_name = [
  "bronze_dim_categoria_produto",
  "bronze_dim_cliente",
  "bronze_dim_data", 
  "bronze_dim_localidade",
  "bronze_dim_produto", 
]

# Nome da tabela fato
fato_name = "bronze_fato_vendas"

# Numero de csv que fazem parte da bronze fato
n_fatos = 4

camada = "bronze"





In [0]:
bronze_work(csv_files, url, lhdw_path, schemas, dim_name, fato_name, camada, n_fatos, schema_log)

In [0]:
%sql
SHOW VOLUMES IN data_ex.desafio

In [0]:
%sql
DROP schema data_ex.desafio;


In [0]:
bronze_path = f"/Volumes/data_ex/bronze/download001/bronze_dim_categoria_produto"
df = spark.read.format("parquet").load(bronze_path)
display(df)

In [0]:
import os

os.listdir("/Volumes/data_ex/lhdw/")