<a href="https://colab.research.google.com/github/DebbieMatt/PYTHON_DW/blob/main/codigo_auxiliar.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

"""
Data Warehouse IBGE - Pipeline ETL Melhorado
Autor: D√©bora Mateus
Descri√ß√£o: Sistema completo de ETL para dados do IBGE com valida√ß√£o,
           logging e tratamento de erros robusto
"""

In [14]:
import pandas as pd
import sqlite3 as sql
from datetime import datetime
from pathlib import Path
import logging
from typing import Optional, Dict, Tuple
import sys

In [15]:
# ==================== CONFIGURA√á√ÉO DE LOGGING ====================
def configurar_logging(nivel=logging.INFO):
    """
    Configura sistema de logging para rastreamento do pipeline
    """
    logging.basicConfig(
        level=nivel,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler('etl_pipeline.log', encoding='utf-8')
        ]
    )
    return logging.getLogger(__name__)

logger = configurar_logging()

In [16]:
# ==================== CONFIGURA√á√ïES GLOBAIS ====================
class ConfigETL:
    """Classe de configura√ß√£o centralizada"""

    # Caminhos
    BASE_DIR = Path('/content/drive/MyDrive/Colab Notebooks')
    BDODS_PATH = BASE_DIR / "ODS.db"
    BDDW_PATH = BASE_DIR / "DW.db"

    # URLs
    URL_IBGE_MUNICIPIOS = 'https://www.ibge.gov.br/explica/codigos-dos-municipios.php#MT'

    # Arquivos CSV
    CSV_PIB_SETOR = "PIB-SETOR-MT.csv"
    CSV_PIB_MT = "PIB-MT.csv"

    # Nomes de tabelas
    TABELA_LOG_MUNIC = "tabelaLogMunic"
    TABELA_DIM_MUNIC = "dMunicipio"
    TABELA_LOG_PIB_SETOR = "tbLogPIBsetor"
    TABELA_LOG_PIB_MT = "tbLogPIBmt"

    @staticmethod
    def obter_timestamp():
        """Retorna timestamp formatado para carga"""
        return datetime.today().strftime('%d/%m/%Y %H:%M:%S')

In [17]:
# ==================== EXTRA√á√ÉO DE DADOS ====================
class ExtractorIBGE:
    """Classe respons√°vel pela extra√ß√£o de dados do IBGE"""

    @staticmethod
    def extrair_municipios(url: str) -> Optional[pd.DataFrame]:
        """
        Extrai dados de munic√≠pios do IBGE via web scraping

        Args:
            url: URL da p√°gina do IBGE

        Returns:
            DataFrame com dados dos munic√≠pios ou None em caso de erro
        """
        try:
            logger.info(f"Iniciando extra√ß√£o de munic√≠pios: {url}")

            df_municipios = pd.read_html(url, match="Munic√≠pios de Mato Grosso")[0]

            # Renomear colunas
            df_municipios = df_municipios.rename(columns={
                'Munic√≠pios de Mato Grosso': 'Munic',
                'C√≥digos': 'Cod'
            })

            # Configurar √≠ndice
            df_municipios.index.name = 'ID'
            df_municipios.index = df_municipios.index + 1

            # Adicionar timestamp de carga
            df_municipios['dataCarga'] = ConfigETL.obter_timestamp()

            logger.info(f"‚úì Extra√≠dos {len(df_municipios)} munic√≠pios com sucesso")
            return df_municipios

        except Exception as e:
            logger.error(f"‚úó Erro ao extrair munic√≠pios: {str(e)}")
            return None

    @staticmethod
    def extrair_csv(caminho_arquivo: str) -> Optional[pd.DataFrame]:
        """
        Extrai dados de arquivo CSV

        Args:
            caminho_arquivo: Caminho completo do arquivo CSV

        Returns:
            DataFrame com dados do CSV ou None em caso de erro
        """
        try:
            logger.info(f"Iniciando leitura do CSV: {caminho_arquivo}")

            if not Path(caminho_arquivo).exists():
                raise FileNotFoundError(f"Arquivo n√£o encontrado: {caminho_arquivo}")

            df = pd.read_csv(caminho_arquivo, encoding='utf-8')
            df['dtCarga'] = ConfigETL.obter_timestamp()

            logger.info(f"‚úì CSV carregado: {len(df)} registros")
            return df

        except Exception as e:
            logger.error(f"‚úó Erro ao ler CSV: {str(e)}")
            return None

In [18]:
# ==================== TRANSFORMA√á√ÉO DE DADOS ====================
class TransformadorDados:
    """Classe respons√°vel pela transforma√ß√£o e valida√ß√£o de dados"""

    @staticmethod
    def validar_dataframe(df: pd.DataFrame, colunas_obrigatorias: list) -> bool:
        """
        Valida se DataFrame possui todas as colunas obrigat√≥rias

        Args:
            df: DataFrame a ser validado
            colunas_obrigatorias: Lista de colunas que devem existir

        Returns:
            True se v√°lido, False caso contr√°rio
        """
        colunas_faltantes = set(colunas_obrigatorias) - set(df.columns)

        if colunas_faltantes:
            logger.error(f"Colunas obrigat√≥rias faltantes: {colunas_faltantes}")
            return False

        return True

    @staticmethod
    def limpar_dados_municipios(df: pd.DataFrame) -> pd.DataFrame:
        """
        Limpa e valida dados de munic√≠pios

        Args:
            df: DataFrame com dados brutos

        Returns:
            DataFrame limpo
        """
        try:
            # Remover valores nulos
            df = df.dropna(subset=['Munic', 'Cod'])

            # Normalizar nomes de munic√≠pios
            df['Munic'] = df['Munic'].str.strip().str.title()

            # Validar c√≥digos IBGE (devem ter 7 d√≠gitos)
            df['Cod'] = df['Cod'].astype(str).str.zfill(7)

            # Remover duplicatas
            df = df.drop_duplicates(subset=['Cod'])

            logger.info(f"‚úì Dados de munic√≠pios limpos: {len(df)} registros v√°lidos")
            return df

        except Exception as e:
            logger.error(f"‚úó Erro ao limpar dados: {str(e)}")
            return df

    @staticmethod
    def preparar_dimensao(df: pd.DataFrame, colunas: list) -> pd.DataFrame:
        """
        Prepara dados para carregamento na dimens√£o

        Args:
            df: DataFrame original
            colunas: Lista de colunas a manter

        Returns:
            DataFrame preparado para dimens√£o
        """
        return df[colunas].copy()

In [19]:
# ==================== CARGA DE DADOS ====================
class LoaderDW:
    """Classe respons√°vel pela carga de dados nos bancos"""

    @staticmethod
    def criar_bancos() -> Tuple[bool, bool]:
        """
        Cria bancos de dados ODS e DW se n√£o existirem

        Returns:
            Tupla (ods_criado, dw_criado)
        """
        try:
            if not ConfigETL.BASE_DIR.exists():
                logger.error(f"‚úó Diret√≥rio base n√£o existe: {ConfigETL.BASE_DIR}")
                return False, False

            ods_existe = ConfigETL.BDODS_PATH.exists()
            dw_existe = ConfigETL.BDDW_PATH.exists()

            if not ods_existe:
                ConfigETL.BDODS_PATH.touch()
                logger.info("‚úì Banco ODS.db criado")

            if not dw_existe:
                ConfigETL.BDDW_PATH.touch()
                logger.info("‚úì Banco DW.db criado")

            if ods_existe and dw_existe:
                logger.info("‚úì Bancos de dados j√° existem")

            return True, True

        except Exception as e:
            logger.error(f"‚úó Erro ao criar bancos: {str(e)}")
            return False, False

    @staticmethod
    def carregar_ods(df: pd.DataFrame, tabela: str) -> bool:
        """
        Carrega dados no ODS (Operational Data Store)

        Args:
            df: DataFrame com dados a carregar
            tabela: Nome da tabela de destino

        Returns:
            True se bem-sucedido, False caso contr√°rio
        """
        conexao = None
        try:
            logger.info(f"Carregando {len(df)} registros na tabela ODS: {tabela}")

            conexao = sql.connect(ConfigETL.BDODS_PATH)
            df.to_sql(tabela, conexao, if_exists="append", index=True)
            conexao.commit()

            logger.info(f"‚úì Carga ODS conclu√≠da: {tabela}")
            return True

        except Exception as e:
            logger.error(f"‚úó Erro ao carregar ODS: {str(e)}")
            if conexao:
                conexao.rollback()
            return False

        finally:
            if conexao:
                conexao.close()

    @staticmethod
    def carregar_dw(df: pd.DataFrame, tabela: str, if_exists: str = "replace") -> bool:
        """
        Carrega dados no Data Warehouse

        Args:
            df: DataFrame com dados a carregar
            tabela: Nome da tabela de destino
            if_exists: Comportamento se tabela existir ('replace', 'append', 'fail')

        Returns:
            True se bem-sucedido, False caso contr√°rio
        """
        conexao = None
        try:
            logger.info(f"Carregando {len(df)} registros na tabela DW: {tabela}")

            conexao = sql.connect(ConfigETL.BDDW_PATH)
            df.to_sql(tabela, conexao, if_exists=if_exists, index=True)
            conexao.commit()

            logger.info(f"‚úì Carga DW conclu√≠da: {tabela}")
            return True

        except Exception as e:
            logger.error(f"‚úó Erro ao carregar DW: {str(e)}")
            if conexao:
                conexao.rollback()
            return False

        finally:
            if conexao:
                conexao.close()

In [20]:
# ==================== PIPELINE ETL PRINCIPAL ====================
class PipelineETL:
    """Orquestrador principal do pipeline ETL"""

    def __init__(self):
        self.extractor = ExtractorIBGE()
        self.transformer = TransformadorDados()
        self.loader = LoaderDW()
        self.metricas = {
            'inicio': datetime.now(),
            'registros_processados': 0,
            'erros': 0
        }

    def executar_pipeline_municipios(self) -> bool:
        """
        Executa pipeline completo para dados de munic√≠pios

        Returns:
            True se bem-sucedido, False caso contr√°rio
        """
        logger.info("=" * 60)
        logger.info("INICIANDO PIPELINE: Munic√≠pios IBGE")
        logger.info("=" * 60)

        try:
            # 1. EXTRACT
            df_municipios = self.extractor.extrair_municipios(
                ConfigETL.URL_IBGE_MUNICIPIOS
            )
            if df_municipios is None:
                return False

            # 2. TRANSFORM
            df_limpo = self.transformer.limpar_dados_municipios(df_municipios)
            df_dimensao = self.transformer.preparar_dimensao(
                df_limpo, ['Munic', 'Cod']
            )

            # 3. LOAD
            # Carregar no ODS (dados completos com log)
            sucesso_ods = self.loader.carregar_ods(
                df_limpo, ConfigETL.TABELA_LOG_MUNIC
            )

            # Carregar no DW (apenas dimens√£o)
            sucesso_dw = self.loader.carregar_dw(
                df_dimensao, ConfigETL.TABELA_DIM_MUNIC
            )

            self.metricas['registros_processados'] += len(df_municipios)

            return sucesso_ods and sucesso_dw

        except Exception as e:
            logger.error(f"‚úó Erro no pipeline de munic√≠pios: {str(e)}")
            self.metricas['erros'] += 1
            return False

    def executar_pipeline_pib(self) -> bool:
        """
        Executa pipeline completo para dados de PIB

        Returns:
            True se bem-sucedido, False caso contr√°rio
        """
        logger.info("=" * 60)
        logger.info("INICIANDO PIPELINE: PIB Mato Grosso")
        logger.info("=" * 60)

        try:
            # Pipeline PIB por Setor
            caminho_pib_setor = str(ConfigETL.BASE_DIR / ConfigETL.CSV_PIB_SETOR)
            df_pib_setor = self.extractor.extrair_csv(caminho_pib_setor)

            if df_pib_setor is not None:
                sucesso_setor = self.loader.carregar_ods(
                    df_pib_setor, ConfigETL.TABELA_LOG_PIB_SETOR
                )
                self.metricas['registros_processados'] += len(df_pib_setor)
            else:
                sucesso_setor = False

            # Pipeline PIB MT
            caminho_pib_mt = str(ConfigETL.BASE_DIR / ConfigETL.CSV_PIB_MT)
            df_pib_mt = self.extractor.extrair_csv(caminho_pib_mt)

            if df_pib_mt is not None:
                sucesso_mt = self.loader.carregar_ods(
                    df_pib_mt, ConfigETL.TABELA_LOG_PIB_MT
                )
                self.metricas['registros_processados'] += len(df_pib_mt)
            else:
                sucesso_mt = False

            return sucesso_setor and sucesso_mt

        except Exception as e:
            logger.error(f"‚úó Erro no pipeline de PIB: {str(e)}")
            self.metricas['erros'] += 1
            return False

    def executar_completo(self) -> Dict:
        """
        Executa pipeline ETL completo

        Returns:
            Dicion√°rio com m√©tricas de execu√ß√£o
        """
        logger.info("üöÄ INICIANDO PIPELINE ETL COMPLETO")
        logger.info(f"Timestamp: {ConfigETL.obter_timestamp()}")

        # Criar bancos se necess√°rio
        self.loader.criar_bancos()

        # Executar pipelines
        sucesso_municipios = self.executar_pipeline_municipios()
        sucesso_pib = self.executar_pipeline_pib()

        # Calcular m√©tricas finais
        self.metricas['fim'] = datetime.now()
        self.metricas['duracao'] = (
            self.metricas['fim'] - self.metricas['inicio']
        ).total_seconds()
        self.metricas['sucesso_geral'] = sucesso_municipios and sucesso_pib

        # Log final
        logger.info("=" * 60)
        logger.info("PIPELINE ETL CONCLU√çDO")
        logger.info("=" * 60)
        logger.info(f"Status: {'‚úì SUCESSO' if self.metricas['sucesso_geral'] else '‚úó FALHA'}")
        logger.info(f"Registros processados: {self.metricas['registros_processados']}")
        logger.info(f"Erros encontrados: {self.metricas['erros']}")
        logger.info(f"Dura√ß√£o: {self.metricas['duracao']:.2f} segundos")
        logger.info("=" * 60)

        return self.metricas

In [21]:
# ==================== EXECU√á√ÉO ====================
if __name__ == "__main__":
    """
    Ponto de entrada principal do pipeline
    """
    try:
        # Inicializar e executar pipeline
        pipeline = PipelineETL()
        metricas = pipeline.executar_completo()

        # Exibir resumo
        print("\n" + "=" * 60)
        print("RESUMO DA EXECU√á√ÉO")
        print("=" * 60)
        print(f"‚úì Registros processados: {metricas['registros_processados']}")
        print(f"‚úì Tempo total: {metricas['duracao']:.2f}s")
        print(f"‚úì Status: {'SUCESSO' if metricas['sucesso_geral'] else 'FALHA'}")
        print("=" * 60)

    except KeyboardInterrupt:
        logger.warning("‚ö† Pipeline interrompido pelo usu√°rio")
        sys.exit(1)
    except Exception as e:
        logger.critical(f"‚úó Erro cr√≠tico no pipeline: {str(e)}")
        sys.exit(1)


RESUMO DA EXECU√á√ÉO
‚úì Registros processados: 426
‚úì Tempo total: 1.97s
‚úì Status: SUCESSO
