# Carregando os dados do DuckDB

O objetivo dessa etapa √© realizar a carga nos dados no banco duckdb.

## Etapas realizadas neste c√≥digo:
1. Importa√ß√£o das bibliotecas necess√°rias (duckdb, pandas, time, os, datetime).
2. Defini√ß√£o de uma fun√ß√£o de log para registrar mensagens com timestamp.
3. Mapeamento dos arquivos de uma pasta para um dicion√°rio com nomes e caminhos completos.
4. Conex√£o ao banco de dados DuckDB.
5. Cria√ß√£o/verifica√ß√£o do schema 'raw' no banco de dados.
6. Inser√ß√£o dos arquivos mapeados como tabelas no schema 'raw' do DuckDB, utilizando detec√ß√£o autom√°tica de formato.
7. Cria√ß√£o das demais zonas de dados


## 1- Import e cria√ß√£o das fun√ß√µes

In [1]:
!pip install duckdb



In [2]:
import duckdb
import pandas as pd
import time
import os
from datetime import datetime

# Fun√ß√£o utilit√°ria para log com hor√°rio
def log(msg):
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}")

In [3]:
def mapear_arquivos_pasta(pasta):
    """
    Mapeia todos os arquivos da pasta e retorna um dicion√°rio
    com o nome do arquivo como chave e o caminho completo como valor.
    """
    arquivos_dict = {}
    for root, dirs, files in os.walk(pasta):
        for file in files:
            caminho_completo = os.path.join(root, file)
            arquivos_dict[file] = caminho_completo
    return arquivos_dict

In [4]:
def inserir_arquivos_no_duckdb(arquivos_dict, con):
    """
    Para cada arquivo no dicion√°rio, checa se a tabela existe.
    Se n√£o existir, insere usando read_csv_auto ou read_parquet.
    Tenta diferentes encodings para arquivos CSV caso o padr√£o UTF-8 falhe.
    """
    
    # Garante que o schema 'raw' existe antes de inserir os dados
    try:
        con.execute("CREATE SCHEMA IF NOT EXISTS raw;")
        print("üìÅ Schema criado/verificado: raw")
    except Exception as e:
        log(f"Erro ao criar/verificar schema 'raw': {e}")
        raise
    
    # Lista de encodings para tentar (em ordem de prioridade) - usando nomes suportados pelo DuckDB
    # latin-1 √© equivalente a ISO-8859-1
    encodings = ['utf-8', 'latin-1']
    
    # Adiciona os dados no schema 'raw'
    for nome_arquivo, caminho_arquivo in arquivos_dict.items():
        nome_tabela = os.path.splitext(nome_arquivo)[0]
        tabela_com_schema = f"raw.{nome_tabela}"
        
        # Checa se a tabela j√° existe no schema 'raw'
        try:
            existe = con.execute(
                f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'raw' AND table_name = '{nome_tabela}'"
            ).fetchone()[0]
            if existe:
                log(f"Tabela '{tabela_com_schema}' j√° existe. Pulando inser√ß√£o.")
                continue
        except Exception as e:
            log(f"Erro ao verificar exist√™ncia da tabela '{tabela_com_schema}': {e}")
        
        # Detecta tipo de arquivo e monta comando
        if caminho_arquivo.endswith(".parquet"):
            comando = f"""
                CREATE TABLE {tabela_com_schema} AS SELECT * FROM read_parquet('{caminho_arquivo}')
            """
            log(f"Inserindo '{caminho_arquivo}' na tabela '{tabela_com_schema}'...")
            try:
                con.execute(comando)
                log(f"Tabela '{tabela_com_schema}' criada e dados inseridos com sucesso.")
            except Exception as e:
                log(f"Erro ao inserir '{caminho_arquivo}': {e}")
                continue
                
        elif caminho_arquivo.endswith(".csv") or caminho_arquivo.endswith(".CSV"):
            # Tenta diferentes encodings
            sucesso = False
            # Converte para caminho absoluto e normaliza
            caminho_absoluto = os.path.abspath(caminho_arquivo)
            caminho_normalizado = caminho_absoluto.replace('\\', '/')
            
            for encoding in encodings:
                try:
                    # Usa read_csv_auto com par√¢metros espec√≠ficos para CSV com ponto e v√≠rgula
                    comando = f"""
                        CREATE TABLE {tabela_com_schema} AS 
                        SELECT * FROM read_csv_auto('{caminho_normalizado}', 
                            encoding='{encoding}', 
                            delim=';',
                            header=true,
                            ignore_errors=false)
                    """
                    log(f"Inserindo '{caminho_arquivo}' na tabela '{tabela_com_schema}' (tentando encoding: {encoding})...")
                    con.execute(comando)
                    log(f"‚úÖ Tabela '{tabela_com_schema}' criada e dados inseridos com sucesso (encoding: {encoding}).")
                    sucesso = True
                    break
                except Exception as e:
                    if encoding == encodings[-1]:  # √öltimo encoding da lista
                        log(f"‚ùå Erro ao inserir '{caminho_arquivo}' com todos os encodings tentados. √öltimo erro: {e}")
                    else:
                        log(f"‚ö†Ô∏è Encoding {encoding} falhou, tentando pr√≥ximo...")
                    continue
            
            if not sucesso:
                log(f"‚ùå N√£o foi poss√≠vel inserir '{caminho_arquivo}' com nenhum dos encodings testados.")
                
        else:
            log(f"Formato de arquivo n√£o suportado para '{caminho_arquivo}'. Pulando.")
            continue

In [5]:
def listar_tabelas(conexao):
    """
    Retorna uma lista com o nome de todas as tabelas existentes no banco DuckDB conectado.
    """
    try:
        resultado = conexao.execute(
            "SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema = 'raw' ORDER BY table_name"
        ).fetchall()
        tabelas = [f"{schema}.{nome}" for schema, nome in resultado]
        if not tabelas:
            # Tenta listar todas as tabelas para diagn√≥stico
            todas = conexao.execute(
                "SELECT table_schema, table_name FROM information_schema.tables ORDER BY table_schema, table_name"
            ).fetchall()
            log(f"Nenhuma tabela encontrada no schema 'raw'. Tabelas dispon√≠veis: {todas}")
        return tabelas
    except Exception as e:
        log(f"Erro ao listar tabelas: {e}")
        return []

def verificar_tabela(conexao, schema, tabela):
    """
    Verifica se uma tabela existe e retorna informa√ß√µes sobre ela.
    """
    try:
        resultado = conexao.execute(
            f"SELECT COUNT(*) FROM {schema}.{tabela}"
        ).fetchone()[0]
        log(f"Tabela {schema}.{tabela} existe com {resultado} linhas.")
        return True
    except Exception as e:
        log(f"Tabela {schema}.{tabela} n√£o existe ou erro ao acessar: {e}")
        return False

## 2: Defini√ß√£o de entradas

In [6]:
# Caminho do arquivo (pode ser CSV, Parquet, etc)
PATH_DIRETORIO = "data_input"

# Caminho do banco DuckDB (arquivo .db)
CAMINHO_DUCKDB = "bd/dev.duckdb"

# Cria a conex√£o com o banco DuckDB
con = duckdb.connect(CAMINHO_DUCKDB)

In [7]:
inserir_arquivos_no_duckdb(mapear_arquivos_pasta(PATH_DIRETORIO), con)

üìÅ Schema criado/verificado: raw
[2025-11-20 00:03:16] Inserindo 'data_input\MICRODADOS_CADASTRO_CURSOS_2017.CSV' na tabela 'raw.MICRODADOS_CADASTRO_CURSOS_2017' (tentando encoding: utf-8)...
[2025-11-20 00:03:16] ‚ö†Ô∏è Encoding utf-8 falhou, tentando pr√≥ximo...
[2025-11-20 00:03:16] Inserindo 'data_input\MICRODADOS_CADASTRO_CURSOS_2017.CSV' na tabela 'raw.MICRODADOS_CADASTRO_CURSOS_2017' (tentando encoding: latin-1)...
[2025-11-20 00:03:18] ‚úÖ Tabela 'raw.MICRODADOS_CADASTRO_CURSOS_2017' criada e dados inseridos com sucesso (encoding: latin-1).
[2025-11-20 00:03:18] Inserindo 'data_input\MICRODADOS_CADASTRO_CURSOS_2018.CSV' na tabela 'raw.MICRODADOS_CADASTRO_CURSOS_2018' (tentando encoding: utf-8)...
[2025-11-20 00:03:18] ‚ö†Ô∏è Encoding utf-8 falhou, tentando pr√≥ximo...
[2025-11-20 00:03:18] Inserindo 'data_input\MICRODADOS_CADASTRO_CURSOS_2018.CSV' na tabela 'raw.MICRODADOS_CADASTRO_CURSOS_2018' (tentando encoding: latin-1)...
[2025-11-20 00:03:21] ‚úÖ Tabela 'raw.MICRODADOS_C

In [8]:
listar_tabelas(con)

['raw.MICRODADOS_CADASTRO_CURSOS_2017',
 'raw.MICRODADOS_CADASTRO_CURSOS_2018',
 'raw.MICRODADOS_CADASTRO_CURSOS_2023',
 'raw.MICRODADOS_CADASTRO_CURSOS_2024']

In [9]:
#Cria√ß√£o dos schemas
for schema in  ["staging", "intermediate", "mart"]:
    con.execute(f"CREATE SCHEMA IF NOT EXISTS {schema};")

In [10]:
con.close()