##### 1. Iniciando Ambiente Python



In [0]:
%pip install pandas openpyxl tqdm utils ace_tools xlrd

Python interpreter will be restarted.
Collecting openpyxl
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Collecting tqdm
  Downloading tqdm-4.67.1-py3-none-any.whl (78 kB)
Collecting utils
  Downloading utils-1.0.2.tar.gz (13 kB)
Collecting ace_tools
  Downloading ace_tools-0.0-py3-none-any.whl (1.1 kB)
Collecting xlrd
  Downloading xlrd-2.0.1-py2.py3-none-any.whl (96 kB)
Collecting et-xmlfile
  Downloading et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Building wheels for collected packages: utils
  Building wheel for utils (setup.py): started
  Building wheel for utils (setup.py): finished with status 'done'
  Created wheel for utils: filename=utils-1.0.2-py2.py3-none-any.whl size=13928 sha256=6c2569c9a9887944350fe0bb3047232ed8f0479aab0dac1286b44d43e36378b3
  Stored in directory: /root/.cache/pip/wheels/4c/a5/a3/ab48e06c936b39960801612ee2767ff53764119f33d3d646e7
Successfully built utils
Installing collected packages: et-xmlfile, xlrd, utils, tqdm, openpyxl, ace-tools
Successfu

In [0]:
    from platform import python_version
    import openpyxl
    import pandas as pd
    import os
    import re
    import logging
    import time
    import tqdm
    import logging
    import functools
    import xlrd
    
    from tqdm import tqdm
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql import types as T
    from pyspark.sql.functions import lit
    from pyspark.sql.utils import AnalysisException
    from platform import python_version

    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")



##### 2. Classes em Python

In [0]:
def log_etapa(titulo_etapa):
    """
    Decorador para imprimir cabeçalho e tempo de execução no logger da classe.
    Exemplo de uso: @log_etapa("Importar Bibliotecas")
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            linha = "═" * 50
            mensagem = f"""

    ╔{linha}╗
    ║ {titulo_etapa:<48} ║
    ╚{linha}╝
            """
            self.logger.info(mensagem)

            inicio = time.time()
            resultado = func(self, *args, **kwargs)
            fim = time.time()

            duracao = round(fim - inicio, 1)
            self.logger.info(f"""
  ⏱️ Tempo de execução da etapa '{titulo_etapa}': {duracao} segundos\n""")
            return resultado
        return wrapper
    return decorator

def listar_anos_por_tabela(schema):
    tabelas = (
        spark.sql(f"SHOW TABLES IN {schema}")
        .filter("isTemporary = false")
        .select("tableName")
        .rdd.flatMap(lambda x: x)
        .collect()
    )

    resultados = []

    for tabela in tabelas:
        nome_completo = f"{schema}.{tabela}"
        try:
            df = spark.table(nome_completo)

            # Infos principais
            num_linhas = df.count()
            num_colunas = len(df.columns)
            colunas_exemplo = df.columns

            # Anos disponíveis
            if "NU_ANO_CENSO" in df.columns:
                anos = [row["NU_ANO_CENSO"] for row in df.select("NU_ANO_CENSO").distinct().orderBy("NU_ANO_CENSO").collect()]
            else:
                anos = ["(sem NU_ANO_CENSO)"]

            resultados.append((tabela, anos, num_linhas, num_colunas, colunas_exemplo))

        except AnalysisException as e:
            resultados.append((tabela, [f"(erro: {str(e)})"], None, None, []))

    return spark.createDataFrame(resultados, ["Tabela", "Anos", "Qt. Linhas", "Qt. Colunas", "Colunas"])


In [0]:
class TransformadorDBFS:
    @staticmethod
    def identificar_arquivos(deletar=False):
        """
        Lista recursivamente os arquivos em dbfs:/FileStore/tables.
        Se `deletar=True`, remove arquivos e diretórios encontrados.
        """
        caminho_inicial = "dbfs:/FileStore/tables/"
        logging.info(f"📦 Acessando: {caminho_inicial}")

        def listar_estrutura(caminho, prefixo=""):
            try:
                entradas = sorted(dbutils.fs.ls(caminho), key=lambda x: x.name.lower())
                for i, entrada in enumerate(entradas):
                    nome = entrada.name.rstrip("/")
                    is_ultimo = i == len(entradas) - 1
                    conector = "└──" if is_ultimo else "├──"
                    novo_prefixo = prefixo + ("    " if is_ultimo else "│   ")
                    caminho_completo = entrada.path

                    if entrada.path.endswith('/'):
                        logging.info(f"{prefixo}{conector} 📁 {nome}/")
                        listar_estrutura(caminho_completo, novo_prefixo)
                        if deletar:
                            try:
                                dbutils.fs.rm(caminho_completo, True)
                                logging.info(f"{prefixo}✅ Diretório removido: {nome}")
                            except Exception as e:
                                logging.error(f"{prefixo}❌ Erro ao remover diretório {nome}: {e}")
                    else:
                        logging.info(f"{prefixo}{conector} 📄 {nome}")
                        if deletar:
                            try:
                                dbutils.fs.rm(caminho_completo, True)
                                logging.info(f"{prefixo}✅ Arquivo removido: {nome}")
                            except Exception as e:
                                logging.error(f"{prefixo}❌ Erro ao remover arquivo {nome}: {e}")
            except Exception as e:
                logging.warning(f"{prefixo}⚠️ Erro ao acessar '{caminho}': {e}")

        if deletar:
            logging.info("🗑️ Modo exclusão ativado.")
        else:
            logging.info("🔍 Modo leitura ativado.")
        listar_estrutura(caminho_inicial)

    @staticmethod
    def listar_databases(deletar=False):
        """
        Lista os diretórios de usuários em dbfs:/user/.
        Se `deletar=True`, remove os diretórios encontrados.
        """
        caminho_base = "dbfs:/user/"
        logging.info(f"📦 Acessando: {caminho_base}")

        try:
            arquivos = sorted(dbutils.fs.ls(caminho_base), key=lambda x: x.name.lower())
            if not arquivos:
                logging.warning("⚠️ Nenhum diretório encontrado.")
                return

            nomes_existentes = [arquivo.name.rstrip('/') for arquivo in arquivos]
            for i, nome in enumerate(nomes_existentes):
                conector = "└──" if i == len(nomes_existentes) - 1 else "├──"
                logging.info(f"{conector} 📁 {nome}/")

            if deletar:
                logging.info("🗑️ Modo exclusão ativado. Removendo diretórios...")
                for base in nomes_existentes:
                    caminho_completo = f"{caminho_base}{base}"
                    try:
                        dbutils.fs.rm(caminho_completo, True)
                        logging.info(f"✅ Diretório removido: {base}")
                    except Exception as erro_remocao:
                        logging.error(f"❌ Erro ao remover '{base}': {erro_remocao}")
            else:
                logging.info("🔍 Modo leitura: nenhuma base será excluída.")

        except Exception as erro_geral:
            logging.error(f"❌ Erro ao acessar {caminho_base}: {erro_geral}")

    @staticmethod
    def listar_parquets(deletar=False):
        """
        Lista arquivos Parquet armazenados em dbfs:/FileStore/parquet_cache.
        Se `deletar=True`, remove os arquivos.
        """
        caminho_parquet = "dbfs:/FileStore/parquet_cache/"
        logging.info(f"📦 Acessando: {caminho_parquet}")
        logging.info("🗂️ Modo: " + ("🗑️ Exclusão" if deletar else "🔍 Leitura"))

        try:
            arquivos = sorted(dbutils.fs.ls(caminho_parquet), key=lambda x: x.name.lower())
            if not arquivos:
                logging.info("⚠️ Nenhum arquivo Parquet encontrado.")
                return

            for i, entrada in enumerate(arquivos):
                nome = entrada.name
                caminho_completo = entrada.path
                conector = "└──" if i == len(arquivos) - 1 else "├──"

                if nome.endswith(".parquet"):
                    logging.info(f"{conector} 📄 {nome}")
                    if deletar:
                        try:
                            dbutils.fs.rm(caminho_completo, True)
                            logging.info(f"    ✅ Arquivo removido: {nome}")
                        except Exception as e:
                            logging.error(f"    ❌ Erro ao remover '{nome}': {e}")

        except Exception as e:
            logging.warning(f"⚠️ Diretório não encontrado: {caminho_parquet}")

    @staticmethod
    def deletar_arquivos(lista_arquivos):
        """Remove arquivos individualmente e imprime um resumo formatado."""
        tabela_str = ""
        for arquivo in lista_arquivos:
            try:
                info = dbutils.fs.ls(arquivo)[0]
                nome = info.name
                tamanho_mb = round(info.size / (1024 * 1024), 2)
                modificado = datetime.fromtimestamp(info.modificationTime / 1000).strftime("%Y-%m-%d %H:%M:%S") \
                    if info.modificationTime > 0 else "N/A"

                dbutils.fs.rm(arquivo, recurse=False)
                status = "✅ REMOVIDO"
                logging.info(f"🧹 Arquivo removido: {arquivo}")

            except Exception as e:
                nome = arquivo.split("/")[-1]
                tamanho_mb = "-"
                modificado = "-"
                status = "❌ ERRO"
                logging.error(f"❌ Erro ao remover {arquivo}: {e}")

            tabela_str += f"║ {nome:<40} ║ {str(tamanho_mb):>8} MB ║ {modificado:<19} ║ {status:^10} ║\n"

        cabecalho = (
            f"\n╔{'':═^42}╦{'':═^12}╦{'':═^21}╦{'':═^12}╗\n"
            f"║ {'Arquivo':^40} ║ {'Tamanho':^10} ║ {'Modificado em':^19} ║ {'Status':^10} ║\n"
            f"╠{'':═^42}╬{'':═^12}╬{'':═^21}╬{'':═^12}╣\n"
        )
        rodape = f"╚{'':═^42}╩{'':═^12}╩{'':═^21}╩{'':═^12}╝\n"
        logging.info("\n" + cabecalho + tabela_str + rodape)


    @staticmethod
    def deletar_pastas(lista_pastas):
        """Remove pastas do DBFS e imprime um resumo com status."""
        tabela_str = ""
        for pasta in lista_pastas:
            try:
                info = dbutils.fs.ls(pasta)[0]
                nome = pasta.rstrip("/").split("/")[-1]
                modificado = datetime.fromtimestamp(info.modificationTime / 1000).strftime("%Y-%m-%d %H:%M:%S") \
                    if info.modificationTime > 0 else "N/A"

                dbutils.fs.rm(pasta, recurse=True)
                status = "✅ REMOVIDA"
                logging.info(f"🧹 Pasta removida: {pasta}")
                tamanho = "-"
            except Exception as e:
                nome = pasta.rstrip("/").split("/")[-1]
                modificado = "-"
                status = "❌ ERRO"
                tamanho = "-"
                logging.error(f"❌ Erro ao remover pasta {pasta}: {e}")

            tabela_str += f"║ {nome:<40} ║ {tamanho:>8} MB ║ {modificado:<19} ║ {status:^10} ║\n"

        cabecalho = (
            f"\n╔{'':═^42}╦{'':═^12}╦{'':═^21}╦{'':═^12}╗\n"
            f"║ {'Pasta':^40} ║ {'Tamanho':^10} ║ {'Modificado em':^19} ║ {'Status':^10} ║\n"
            f"╠{'':═^42}╬{'':═^12}╬{'':═^21}╬{'':═^12}╣\n"
        )
        rodape = f"╚{'':═^42}╩{'':═^12}╩{'':═^21}╩{'':═^12}╝\n"
        logging.info("\n" + cabecalho + tabela_str + rodape)




In [0]:
class TransformadorBase:
    def __init__(self, spark=None, config=None, salvar_em_arquivo=False, caminho_log="dbfs:/FileStore/logs/pipeline.log", log_detalhado=True):
        """
            Classe base para transformação de dados no pipeline em camadas (Bronze, Silver, Gold).

            Parâmetros:
            -----------
            spark : SparkSession, opcional
                Sessão Spark ativa. Caso não seja informada, será criada automaticamente.
            
            config : dict, opcional
                Dicionário de configuração do pipeline (ex: table_configs, table_dim_config).
            
            salvar_em_arquivo : bool, padrão=False
                Define se os logs também devem ser salvos em arquivo no DBFS.
            
            caminho_log : str, padrão="dbfs:/FileStore/logs/pipeline.log"
                Caminho no DBFS para salvar o arquivo de log, se `salvar_em_arquivo=True`.
            
            log_detalhado : bool, padrão=True
                Define se os logs da execução devem ser exibidos em nível detalhado
                (ex: validação campo por campo). Se False, mostra apenas os resultados principais.
        """
        self.spark = spark or SparkSession.builder.getOrCreate()
        self.config = config
        self.salvar_em_arquivo = salvar_em_arquivo
        self.caminho_log = caminho_log
        self.logger = logging.getLogger(self.__class__.__name__)
        self.log_detalhado = log_detalhado


    def configurar_logger(self):
        """
        Configura o logger com timestamp em décimos de segundo.
        Se `self.salvar_em_arquivo=True`, também salva os logs no caminho especificado.
        """
        import time
        import os

        class OneDecimalFormatter(logging.Formatter):
            converter = time.localtime
            def formatTime(self, record, datefmt=None):
                ct = self.converter(record.created)
                t = time.strftime("%H:%M:%S", ct)
                fraction = int((record.created - int(record.created)) * 10)
                return f"{t}.{fraction}"

        formatter = OneDecimalFormatter(fmt='%(levelname)s | %(asctime)s ___ %(name)s: %(message)s')

        # Console
        handler_console = logging.StreamHandler()
        handler_console.setFormatter(formatter)
        handlers = [handler_console]

        # Arquivo (se ativado)
        if self.salvar_em_arquivo:
            caminho_local = f"/dbfs{self.caminho_log[5:]}" if self.caminho_log.startswith("dbfs:/") else self.caminho_log
            os.makedirs(os.path.dirname(caminho_local), exist_ok=True)
            handler_arquivo = logging.FileHandler(caminho_local, mode="w", encoding="utf-8")
            handler_arquivo.setFormatter(formatter)
            handlers.append(handler_arquivo)

        # Registrar no logger principal
        root_logger = logging.getLogger()
        root_logger.handlers = []  # limpa handlers antigos
        for h in handlers:
            root_logger.addHandler(h)
        root_logger.setLevel(logging.INFO)

        # Atualiza o logger da instância com nome da classe
        self.logger = logging.getLogger(self.__class__.__name__)

    @log_etapa("1.  Importar Bibliotecas")
    def importar_bibliotecas(self):
        """
        Importa bibliotecas padrão usadas no pipeline e exibe a versão de cada uma em formato tabular no log.
        Também configura o logger.
        """
        self.configurar_logger()
        spark = SparkSession.builder.getOrCreate()

        bibliotecas = {
            "tqdm": tqdm,
            "time": time,
            "logging": logging,
            "re": re,
            "os": os,
            "spark": spark,
            "pandas": pd,
            "openpyxl": openpyxl,
        }

        bibliotecas_str = ""
        for nome, biblioteca in sorted(bibliotecas.items()):
            version = getattr(biblioteca, '__version__', None)
            if version is None:
                version = getattr(biblioteca, 'version', 'N/A')
            bibliotecas_str += f"║ {nome:<23}║ {version:>12} ║\n"

        versao_python = f"Versão do Python: {python_version()}"
        largura_total = 35

        mensagem = (
            "\n"
            "╔════════════════════════╦══════════════╗\n"
            "║       Biblioteca       ║    Versão    ║\n"
            "╠════════════════════════╬══════════════╣\n"
          f"{bibliotecas_str}"
            "╠════════════════════════╩══════════════╣\n"
            f"║  {versao_python:^{largura_total}}  ║\n"
            "╚═══════════════════════════════════════╝"
            "\n"
        )

        self.logger.info(mensagem)

    @log_etapa("2.  Validação das configurações de fonte de dados")
    def validar_config(self):
        def status(condicao, obrigatorio=True):
            return "✅" if condicao else ("❌" if obrigatorio else "⚠️")

        erros = []

        # Validação de table_dim_config
        self.logger.info("📚 Validação de Tabelas Dimensão")
        for nome, conf in self.config.get("table_dim_config", {}).items():
            colunas_ok = "colunas" in conf
            dados_ok = "dados" in conf
            tipo_ok = "tipo" in conf

            if not colunas_ok:
                erros.append(f"table_dim_config -> {nome}: campo 'colunas' ausente.")
            if not dados_ok:
                erros.append(f"table_dim_config -> {nome}: campo 'dados' ausente.")
            if not tipo_ok:
                erros.append(f"table_dim_config -> {nome}: campo 'tipo' ausente.")

            if self.log_detalhado:
                self.logger.info(
                    f"  {nome}: {{ colunas: {status(colunas_ok)}, dados: {status(dados_ok)}, tipo: {status(tipo_ok)} }}"
                )
            else:
                self.logger.info(
                    f"  {nome:<14} -  colunas: {status(colunas_ok)} | dados: {status(dados_ok)} | tipos: {status(tipo_ok)}"
                )

        self.logger.info("📚 Validação de Tabelas Externas")
        for nome_tabela, conf in self.config.get("table_configs", {}).items():
            campos_obrigatorios = ["anos", "pasta", "arquivo", "formato", "nome_tabela", "schemas"]
            campos_status = {campo: campo in conf for campo in campos_obrigatorios}

            for campo, ok in campos_status.items():
                if not ok:
                    erros.append(f"table_configs -> {nome_tabela} - campo obrigatório '{campo}' ausente.")

            if self.log_detalhado:
                self.logger.info(f"  {nome_tabela}: {{")
                for campo, ok in campos_status.items():
                    self.logger.info(f"    {campo}: {status(ok)}")
            else:
                linha = f"  {nome_tabela:<16}: " + " | ".join(
                    f"{campo}: {status(ok)}" for campo, ok in campos_status.items()
                )
                self.logger.info(linha)

            schemas = conf.get("schemas", {})
            if not isinstance(schemas, dict):
                erros.append(f"table_configs -> {nome_tabela}: 'schemas' não é um dicionário.")
                continue

            for nome_schema, schema in schemas.items():
                col_ok = "colunas" in schema or "colunas_prefixo" in schema or "prefixos_colunas" in schema
                tipo_ok = "schema_final_tipos" in schema

                if not col_ok:
                    erros.append(f"table_configs -> {nome_tabela} -> {nome_schema}: sem colunas.")
                if not tipo_ok:
                    erros.append(f"table_configs -> {nome_tabela} -> {nome_schema}: faltando 'schema_final_tipos'.")

                if not self.log_detalhado:
                    continue

                self.logger.info(f"      {nome_schema}: {{")
                self.logger.info(f"        colunas: {status('colunas' in schema, obrigatorio=False)}")
                self.logger.info(f"        colunas_prefixo: {status('colunas_prefixo' in schema or 'prefixos_colunas' in schema, obrigatorio=False)}")
                self.logger.info(f"        renomeacoes: {status('renomeacoes' in schema, obrigatorio=False)}")

                sub = schema.get("substituir_valores", {})
                self.logger.info(
                    f"        substituir_valores: {{ "
                    f"valor_original: {status('valor_original' in sub, obrigatorio=False)}, "
                    f"valor_substituto: {status('valor_substituto' in sub, obrigatorio=False)}, "
                    f"colunas: {status('colunas' in sub, obrigatorio=False)} }}"
                )

                piv = schema.get("pivotar_colunas_em_linhas", {})
                if piv:
                    for campo_pivot in ["id_vars", "nome_coluna_categoria", "nome_coluna_valor"]:
                        if campo_pivot not in piv:
                            erros.append(f"table_configs -> {nome_tabela} -> {nome_schema}: falta '{campo_pivot}' no pivô.")
                    self.logger.info(
                        f"        pivotar_colunas_em_linhas: {{ "
                        f"id_vars: {status('id_vars' in piv)}, "
                        f"nome_coluna_categoria: {status('nome_coluna_categoria' in piv)}, "
                        f"nome_coluna_valor: {status('nome_coluna_valor' in piv)} }}"
                    )
                else:
                    self.logger.info("        pivotar_colunas_em_linhas: ⚠️ (não configurado)")

                self.logger.info(f"        schema_final_nomes: {status('schema_final_nomes' in schema, obrigatorio=False)}")
                self.logger.info(f"        schema_final_tipos: {status('schema_final_tipos' in schema)}")
                self.logger.info(f"        relacionamento: {status('relacionamento' in schema, obrigatorio=False)}")
                self.logger.info(f"      }}")

            if self.log_detalhado:
                self.logger.info(f"  }}")

        if erros:
            self.logger.warning("\n\n🚨 Resumo de problemas encontrados na validação:")
            for erro in erros:
                self.logger.warning(f"  - {erro}")
        else:
            pass

    @staticmethod
    def converter_xlsx_para_parquet(xlsx_path, parquet_path, sheet, deletar=False):
        """
        Converte um arquivo XLSX do DBFS em Parquet localmente e salva no DBFS.

        Parâmetros:
        xlsx_path (str): Caminho no DBFS para o arquivo XLSX.
        parquet_path (str): Caminho de saída para o Parquet.
        sheet (str): Nome da aba do Excel a ser lida.
        deletar (bool): Se True, substitui o arquivo Parquet existente.
        """

        tmp_path = "/tmp/" + os.path.basename(xlsx_path)
        if deletar and os.path.exists(parquet_path):
            os.remove(parquet_path)

        dbutils.fs.cp(xlsx_path, f"file:{tmp_path}")
        df = pd.read_excel(tmp_path, engine="openpyxl", sheet_name=sheet)

        if "Unnamed: 0" in df.columns:
            df = df.drop(columns=["Unnamed: 0"])
        df = df.astype(str)

        df.to_parquet(parquet_path, index=False)
        logging.info(f"✔️ Convertido: {os.path.basename(xlsx_path)} → {parquet_path}")

    def inicializar_database(self, camada):
        """
        (Re)cria o database da camada especificada (bronze, silver ou gold).
        """
        camada = camada.lower().strip()
        if camada not in ["bronze", "silver", "gold"]:
            raise ValueError(f"❌ Camada inválida: '{camada}'. Use 'bronze', 'silver' ou 'gold'.")

        try:
            self.spark.sql(f"DROP DATABASE IF EXISTS {camada} CASCADE")
            self.spark.sql(f"CREATE DATABASE {camada}")
            self.logger.info(f"🆕 Database `{camada}` recriado com sucesso.")
        except Exception as e:
            self.logger.error(f"❌ Erro ao recriar o database `{camada}`: {e}")
            raise


In [0]:
class TransformadorBronze(TransformadorBase):
    def __init__(self, spark, table_configs, table_dim_config=None, schema_bronze="bronze"):
        config = {
            "table_configs": table_configs,
            "table_dim_config": table_dim_config or {}
        }
        super().__init__(spark, config=config)
        self.schema_bronze = schema_bronze
        self.table_configs = table_configs
        self.table_dim_config = table_dim_config or {}

    def carregar_csv(self, file_path, year, overwrite, nome_tabela, file_format="csv", schema="false", header="true", delimiter=";"):
        try:
            df = self.spark.read.format(file_format) \
                .option("inferSchema", schema) \
                .option("header", header) \
                .option("sep", delimiter) \
                .option("encoding", "UTF-8") \
                .load(file_path)

            table_name = f"{self.schema_bronze}.{nome_tabela}_{year}"
            if self._verificar_tabela_existente(table_name, overwrite):
                return

            df.write.format("parquet").mode("overwrite").saveAsTable(table_name)
            self.logger.info(f"✅ Tabela {table_name} carregada com sucesso.")
        except Exception as e:
            self.logger.error(f"❌ Erro ao carregar a tabela {table_name}: {e}")
            raise

    def carregar_excel(self, file_path, year, sheet_name, overwrite, nome_tabela, engine='openpyxl'):
        try:
            table_name = f"{self.schema_bronze}.{nome_tabela}_{year}"

            if self._verificar_tabela_existente(table_name, overwrite):
                return

            try:
                arquivos = [f.name for f in dbutils.fs.ls(os.path.dirname(file_path))]
                nome_arquivo = os.path.basename(file_path)
                if nome_arquivo not in arquivos:
                    self.logger.warning(f"""⚠️ Arquivo não encontrado no DBFS: {file_path}""")
                    return
            except Exception as e:
                self.logger.error(f"❌ Erro ao acessar caminho no DBFS: {file_path} — {e}")
                return

            base_name = os.path.basename(file_path)
            match = re.search(r"(.+?)_\d{4}\.xlsx$", base_name)
            prefix = match.group(1) if match else "temp"
            local_path = f"/tmp/{prefix}_{year}.xlsx"

            dbutils.fs.cp(file_path, f"file:{local_path}")

            # ⛔ NÃO usa header — carrega tudo como dados
            pdf = pd.read_excel(local_path, engine=engine, sheet_name=sheet_name, header=None)
            df = self.spark.createDataFrame(pdf.astype(str))

            df.write.format("parquet").mode("overwrite").saveAsTable(table_name)
            os.remove(local_path)

            # self.logger.info(f"""✅ Tabela {table_name} carregada com sucesso.""")
        except Exception as e:
            self.logger.error(
                f"❌ Erro ao carregar a tabela {table_name} a partir do arquivo {file_path} (aba {sheet_name}): {e}"
            )
            raise

    def _verificar_tabela_existente(self, nome_tabela, overwrite):
        tabela_existe = self.spark.catalog.tableExists(nome_tabela)
        path_fisico = f"/user/hive/warehouse/{nome_tabela.replace('.', '/')}"
        pasta_schema = f"dbfs:/user/hive/warehouse/{self.schema_bronze}.db"
        pasta_existe = False

        try:
            try:
                pastas_hive = dbutils.fs.ls("dbfs:/user/hive/warehouse/")
            except Exception as e:
                self.logger.warning(f"⚠️ Diretório hive/warehouse ainda não existe: {e}")
                pastas_hive = []

            if any(f.name.rstrip("/") == f"{self.schema_bronze}.db" for f in pastas_hive):
                try:
                    arquivos = dbutils.fs.ls(pasta_schema)
                except Exception:
                    arquivos = []
                nome_tabela_fisica = nome_tabela.split(".")[1]
                pasta_existe = any(f.name.rstrip("/") == nome_tabela_fisica for f in arquivos)

        except Exception as e:
            self.logger.warning(f"⚠️ Erro ao verificar a pasta física de {nome_tabela}: {e}")

        if tabela_existe and overwrite:
            self.logger.warning(f"A tabela {nome_tabela} já existe. Removendo para sobrescrever.")
            self.spark.sql(f"DROP TABLE {nome_tabela}")

        if (tabela_existe or pasta_existe) and overwrite:
            try:
                dbutils.fs.rm(f"dbfs:{path_fisico}", True)
                self.logger.info(f"📁 Pasta {path_fisico} removida com sucesso.")
            except Exception as e:
                self.logger.warning(f"⚠️ Não foi possível remover a pasta física da tabela {nome_tabela}: {e}")
            return False

        if tabela_existe and not overwrite:
            # self.logger.info(f"A tabela {nome_tabela} já existe. Ignorando carregamento conforme overwrite=False.")
            return True

        if pasta_existe and not overwrite:
            # self.logger.warning(f"⚠️ Pasta física existente para {nome_tabela}, mas overwrite=False.")
            return True

        return False

    def _construir_caminho_arquivo(self, config, ano):
        return f"dbfs:/FileStore/tables/{config['pasta']}/{config['arquivo']}{ano}.{config['formato']}"

    def listar_tabelas(self):
        schema = self.schema_bronze
        self.logger.info(f"📦 Esquema: {schema}")
        self.logger.info("🔎 Listando tabelas existentes no metastore...")

        try:
            tabelas = self.spark.catalog.listTables(schema)
            if not tabelas:
                self.logger.info(f"⚠️ Nenhuma tabela encontrada no schema '{schema}'.")
                return []

            for i, tabela in enumerate(sorted(tabelas, key=lambda x: x.name)):
                conector = "└──" if i == len(tabelas) - 1 else "├──"
                tipo = tabela.tableType.lower()
                icone = "📄" if tipo == "managed" else "📁"
                self.logger.info(f"{conector} {icone} {tabela.name} ({tipo})")

            return [t.name for t in tabelas]
        except Exception as e:
            self.logger.error(f"❌ Erro ao listar tabelas da camada bronze: {e}")
            return []

    def listar_tabelas_detalhado(self):
        from pyspark.sql.utils import AnalysisException

        schema = self.schema_bronze
        self.logger.info(f"📦 Catálogo detalhado da camada '{schema}'")

        try:
            tabelas = self.spark.catalog.listTables(schema)
            if not tabelas:
                self.logger.info("⚠️ Nenhuma tabela encontrada.")
                return

            resultados = []

            for t in sorted(tabelas, key=lambda x: x.name):
                nome_completo = f"{schema}.{t.name}"
                try:
                    df = self.spark.table(nome_completo)
                    num_colunas = len(df.columns)
                    num_registros = df.count()
                    metadata = self.spark.sql(f"DESCRIBE DETAIL {nome_completo}").collect()[0]
                    particionamento = len(metadata['partitionColumns']) > 0
                except AnalysisException as e:
                    self.logger.warning(f"⚠️ Não foi possível acessar '{nome_completo}': {e}")
                    num_colunas = "-"
                    num_registros = "-"
                    particionamento = "-"

                resultados.append({
                    "tabela": t.name,
                    "colunas": num_colunas,
                    "registros": num_registros,
                    "particionado": particionamento,
                    "tipo": t.tableType.lower(),
                    "temporária": t.isTemporary
                })

            df_resultado = self.spark.createDataFrame(resultados)
            df_resultado.show(truncate=False)

        except Exception as e:
            self.logger.error(f"❌ Erro ao gerar listagem detalhada: {e}")

    @log_etapa("3.  Pipeline da Camada Bronze")
    def executar_pipeline(self, deletar=False):
        import time
        from tqdm import tqdm

        inicio = time.time()
        status_resumo = {}

        # Tabelas dimensão
        if hasattr(self, "table_dim_config") and self.table_dim_config:
            self.criar_dimensoes(self.table_dim_config)
            for tabela in self.table_dim_config:
                status_resumo[tabela] = {None: True}  # sucesso único

        # Tabelas externas
        table_configs = self.config["table_configs"]
        for tabela, config in table_configs.items():
            if tabela in self.table_dim_config:
                continue

            anos = config.get("anos", [])
            if not anos:
                self.logger.info(f"🚫 [IGNORADO] {tabela}: nenhum ano definido para importação.")
                continue

            for ano in tqdm(anos, desc=f"DOWNLOAD | {tabela.ljust(20)}", unit="ano", leave=True):
                nome_tabela = config["nome_tabela"]
                tabela_bronze = f"{self.schema_bronze}.{nome_tabela}_{ano}"
                caminho_arquivo = self._construir_caminho_arquivo(config, ano)
                formato = config["formato"]
                nome_arquivo = f"{config['arquivo']}{ano}.{formato}"

                status_resumo.setdefault(tabela, {})[ano] = None  # default: erro

                try:
                    if formato == "csv":
                        self.carregar_csv(caminho_arquivo, ano, overwrite=deletar, nome_tabela=nome_tabela)
                    elif formato == "xlsx":
                        self.carregar_excel(
                            caminho_arquivo,
                            ano,
                            sheet_name=config.get("aba", "Plan1"),
                            overwrite=deletar,
                            nome_tabela=nome_tabela
                        )
                    else:
                        self.logger.warning(
                            f"⚠️ [{tabela_bronze}] Formato '{formato}' não suportado. Pulando arquivo: {nome_arquivo}"
                        )
                        status_resumo[tabela][ano] = False
                        continue

                    self.logger.info(f"✔️ [{tabela_bronze}] Sucesso ao carregar: {nome_arquivo}")
                    status_resumo[tabela][ano] = True

                except FileNotFoundError:
                    self.logger.warning(f"⚠️ Arquivo não encontrado no DBFS: {caminho_arquivo}")
                    status_resumo[tabela][ano] = False

                except Exception as e:
                    self.logger.error(f"❌ [{tabela_bronze}] Erro ao carregar {nome_arquivo}: {e}")
                    status_resumo[tabela][ano] = None

        # RESUMO FINAL
        status_icones = {True: "🟩", False: "⚠️", None: "❌"}
        anos_totais = sorted({ano for tabela in status_resumo.values() for ano in tabela if ano is not None})
        largura = max(len(t) for t in status_resumo) + 2

        def montar_tabela(tabelas, com_ano=True):
            if com_ano:
                header = " " * largura + "".join(f"{ano:>8}" for ano in anos_totais)
                linhas = [header, "-" * (largura + 8 * len(anos_totais))]
                for tabela in tabelas:
                    linha = f"{tabela:<{largura}}"
                    for ano in anos_totais:
                        status = status_resumo.get(tabela, {}).get(ano)
                        linha += f"{status_icones.get(status, '⚠️'):>8}"
                    linhas.append(linha)
            else:
                header = " " * largura + "Status"
                linhas = [header, "-" * (largura + 6)]
                for tabela in tabelas:
                    status = status_resumo.get(tabela, {}).get(None)
                    linha = f"{tabela:<{largura}} {status_icones.get(status, '❌')}"
                    linhas.append(linha)

            return "\n".join(linhas)

        tabelas_dim = [t for t in status_resumo if list(status_resumo[t].keys()) == [None]]
        tabelas_dados = [t for t in status_resumo if t not in tabelas_dim]

        self.logger.info("\n📋 Resumo de carregamento - Tabelas de Dimensão\n")
        self.logger.info(f"\n{montar_tabela(tabelas_dim, com_ano=False)}\n")

        self.logger.info("📋 Resumo de carregamento - Tabelas de Dados Externos\n")
        self.logger.info(f"\n{montar_tabela(tabelas_dados, com_ano=True)}\n")

    def criar_dimensoes(self, config_dimensoes, schema_destino="bronze"):
        for nome_dim, parametros in config_dimensoes.items():
            tabela_destino = f"{schema_destino}.{nome_dim}"
            if self.spark.catalog.tableExists(tabela_destino):
                self.logger.info(f"📥 Tabela dimensão `{tabela_destino}` já existe. Etapa ignorada.")
                continue

            self.logger.info(f"🛠️ Criando dimensão `{tabela_destino}`")

            if nome_dim == "d_Calendario":
                data_inicio = parametros.get("data_inicio")
                data_fim = parametros.get("data_fim")

                df = self.spark.sql(f"""
                    SELECT sequence(to_date('{data_inicio}'), to_date('{data_fim}'), interval 1 day) AS datas
                """).selectExpr("explode(datas) AS DATA")

                df = df \
                    .withColumn("ANO", F.year("DATA")) \
                    .withColumn("MES", F.month("DATA")) \
                    .withColumn("DIA", F.dayofmonth("DATA")) \
                    .withColumn("NOME_MES", F.date_format("DATA", "MMMM")) \
                    .withColumn("TRIMESTRE", F.quarter("DATA")) \
                    .withColumn("SEMESTRE", F.when(F.quarter("DATA") <= 2, 1).otherwise(2)) \
                    .withColumn("DIA_SEMANA", F.date_format("DATA", "E")) \
                    .withColumn("NOME_DIA", F.date_format("DATA", "EEEE")) \
                    .withColumn("FIM_SEMANA", F.when(F.col("DIA_SEMANA").isin("Sat", "Sun"), True).otherwise(False)) \
                    .withColumn("DATA_ORDENADA", F.date_format("DATA", "yyyyMMdd").cast("int"))

                df = df.drop("DATA")

            elif nome_dim == "d_Municipios":
                caminho = parametros.get("dados")
                aba = parametros.get("aba")
                colunas = parametros.get("colunas")
                try:
                    self.carregar_xls(
                        file_path=caminho,
                        year="2024",
                        sheet_name=aba,
                        overwrite=True,
                        nome_tabela="temp_d_municipios"
                    )
                    df = self.spark.table(f"{self.schema_bronze}.temp_d_municipios_2024")
                    df = df.toDF(*colunas)
                    df = df.filter(F.col("Codigo_Municipio_Completo").isNotNull())
                except Exception as e:
                    self.logger.error(f"❌ Erro ao carregar a dimensão `{nome_dim}`: {e}")
                    continue

            else:
                colunas = parametros.get("colunas")
                dados = parametros.get("dados")

                if not colunas or not dados:
                    self.logger.warning(f"⚠️ Dados ou colunas ausentes para a dimensão `{nome_dim}`. Pulando...")
                    continue

                df = self.spark.createDataFrame(dados, colunas)

            tipos = parametros.get("tipo")
            if tipos:
                for col, tipo in tipos.items():
                    df = df.withColumn(col.replace(" ", "_"), F.col(col.replace(" ", "_")).cast(tipo))

            df.write.mode("overwrite").format("parquet").saveAsTable(tabela_destino)
            self.logger.info(f"✅ Dimensão `{tabela_destino}` criada com sucesso.")

    def carregar_xls(self, file_path, year, sheet_name, overwrite, nome_tabela, engine='xlrd'):
        try:
            table_name = f"{self.schema_bronze}.{nome_tabela}_{year}"

            if self._verificar_tabela_existente(table_name, overwrite):
                return

            try:
                arquivos = [f.name for f in dbutils.fs.ls(os.path.dirname(file_path))]
                nome_arquivo = os.path.basename(file_path)
                if nome_arquivo not in arquivos:
                    self.logger.warning(f"⚠️ Arquivo não encontrado no DBFS: {file_path}")
                    return
            except Exception as e:
                self.logger.error(f"❌ Erro ao acessar caminho no DBFS: {file_path} — {e}")
                return

            base_name = os.path.basename(file_path)
            match = re.search(r"(.+?)_\d{4}\.xls$", base_name)
            prefix = match.group(1) if match else "temp"
            local_path = f"/tmp/{prefix}_{year}.xls"

            dbutils.fs.cp(file_path, f"file:{local_path}")

            pdf = pd.read_excel(local_path, engine=engine, sheet_name=sheet_name, header=None)
            df = self.spark.createDataFrame(pdf.astype(str))

            df.write.format("parquet").mode("overwrite").saveAsTable(table_name)
            os.remove(local_path)

        except Exception as e:
            self.logger.error(
                f"❌ Erro ao carregar a tabela {table_name} a partir do arquivo {file_path} (aba {sheet_name}): {e}"
            )
            raise

In [0]:
class TransformadorPrata(TransformadorBase):
    def __init__(self, spark, table_configs, table_dim_config=None, schema_destino="silver"):
        config = {
            "table_configs": table_configs,
            "table_dim_config": table_dim_config or {}
        }
        super().__init__(spark, config=config)
        self.schema_bronze = "bronze"
        self.schema_destino = schema_destino
        self.table_configs = table_configs
        self.table_dim_config = table_dim_config or {}

    def carregar_bronze(self, nome_tabela, ano):
        return self.spark.table(f"bronze.{nome_tabela}_{ano}")

    def aplicar_depara_colunas_bronze(self, df, depara):
        """Renomeia as colunas do DataFrame com base em pares (coluna_origem, coluna_destino)."""
        colunas_atuais = set(df.columns)
        colunas_origem = [orig for orig, _ in depara]
        colunas_destino = [novo for _, novo in depara]

        if set(colunas_destino).issubset(colunas_atuais):
            self.logger.info("✅ Colunas já renomeadas. Pulando etapa de renomeação.")
            return df

        if set(colunas_origem).issubset(colunas_atuais):
            df = df.select(*colunas_origem).toDF(*colunas_destino)
            self.logger.info("🔁 Renomeação aplicada com sucesso.")
            return df

        raise ValueError("❌ Estrutura de colunas inesperada no DataFrame. Não foi possível aplicar renomeação.")

    def aplicar_renomeacoes(self, df, renomeacoes):
        for original, novo in renomeacoes.items():
            df = df.withColumnRenamed(original, novo)
        return df

    def substituir_valores(self, df, colunas, valor_original, valor_substituto):
        for col in colunas:
            df = df.withColumn(
                col,
                F.when(F.col(col) == valor_original, valor_substituto).otherwise(F.col(col))
            )
        return df

    def transformar_colunas_em_linhas(self, df, id_vars, nome_categoria, nome_valor):
        from pyspark.sql.functions import expr

        colunas_variaveis = [col for col in df.columns if col not in id_vars]

        exprs = ", ".join(
            [f"stack({len(colunas_variaveis)}, " + 
            ", ".join(f"'{c}', `{c}`" for c in colunas_variaveis) + 
            f") as ({nome_categoria}, {nome_valor})"]
        )
        
        return df.selectExpr(*id_vars, exprs)

    def salvar_tabela_prata(self, df, nome_tabela):
        caminho = f"{self.schema_destino}.{nome_tabela}"
        try:
            df.write.partitionBy("NU_ANO_CENSO").format("parquet").mode("append").saveAsTable(caminho)
            self.logger.info(f"✅ Tabela salva como {caminho} (com particionamento por NU_ANO_CENSO)")
        except Exception as e:
            self.logger.error(f"❌ Erro ao salvar tabela {caminho}: {e}")
            raise
    
    @log_etapa("4.  Pipeline da Camada Silver")
    def executar_pipeline(self):
        tabelas_dim = ["d_Series", "d_Calendario", "d_Municipios"]
        for tabela in tabelas_dim:
            origem = f"{self.schema_bronze}.{tabela}"
            destino = f"{self.schema_destino}.{tabela}"

            # Verifica se a tabela já existe na camada silver
            tabelas_silver = [t.name.lower() for t in self.spark.catalog.listTables(self.schema_destino)]
            if tabela.lower() in tabelas_silver:
                self.logger.info(f"📭 Tabela dimensão `{destino}` já existe. Etapa ignorada.")
                continue

            try:
                df = self.spark.table(origem)
                df.write.mode("overwrite").format("delta").saveAsTable(destino)
                self.logger.info(f"📅 Tabela dimensão `{destino}` copiada da camada Bronze.")
            except Exception as e:
                self.logger.warning(f"⚠️ Falha ao copiar `{origem}` para Silver: {e}")


        table_configs = self.config["table_configs"]
        tabelas_alvo = list(table_configs.keys())

        for nome_tabela in tabelas_alvo:
            config = table_configs.get(nome_tabela)
            if not config:
                self.logger.warning(f"⚠️ Configuração não encontrada para {nome_tabela}")
                continue

            anos = config.get("anos", [])
            for ano in anos:
                if isinstance(ano, list) and len(ano) == 1:
                    ano = ano[0]
                elif isinstance(ano, list):
                    raise ValueError(f"⚠️ Valor inesperado para 'ano': {ano} (lista com mais de 1 item)")

                # ✅ VERIFICAÇÃO ANTECIPADA
                todas_existem = True
                tabelas_silver = [t.name.lower() for t in self.spark.catalog.listTables(self.schema_destino)]
                for schema in config.get("schemas", {}).keys():
                    nome_final = f"{nome_tabela}__{schema}".lower()
                    if nome_final not in tabelas_silver:
                        todas_existem = False
                        break

                if todas_existem:
                    self.logger.info(f"📭 Tabela `{nome_tabela}` (ano {ano}) já processada em todos os schemas. Etapa ignorada.")
                    continue

                try:
                    self.logger.info("\n---------------------------------------------\n")
                    self.logger.info(f"📆 Tabela: {nome_tabela.upper()} | Ano: {ano}")
                    self.transformar_tabela(nome_tabela, config, str(ano))

                except Exception as e:
                    self.logger.error(f"❌ Falha ao processar {nome_tabela}_{ano}: {e}")

    def transformar_tabela(self, nome_tabela_bronze, config, ano):
        try:
            df_bronze = self.carregar_bronze(nome_tabela_bronze, ano)

            depara = config.get("depara_colunas_bronze")
            if depara is not None:
                df_renomeado = self.aplicar_depara_colunas_bronze(df_bronze, depara)
                self.logger.info("  ├── 🔁 Renomeação padrão de colunas aplicada")
            else:
                df_renomeado = df_bronze
                self.logger.info("  ├── 🔁 Renomeação padrão ignorada")

            df_renomeado = df_renomeado.filter(F.col("NU_ANO_CENSO") == ano)
            self.logger.info(f"  ├── 📆 Filtragem por ano (NU_ANO_CENSO = {ano})")

            for chave_schema, schema_info in config.get("schemas", {}).items():
                nome_final = f"{nome_tabela_bronze}__{chave_schema}"
                path_check = f"{self.schema_destino}.{nome_final}".lower()
                tabelas_silver = [t.name.lower() for t in self.spark.catalog.listTables(self.schema_destino)]

                if path_check in tabelas_silver:
                    self.logger.info(f"  ├── ⏩ Tabela {path_check} já existe. Etapa ignorada.")
                    continue

                df_trabalho = df_renomeado

                colunas_fixas = schema_info.get("colunas", [])
                prefixos = schema_info.get("colunas_prefixo") or schema_info.get("prefixos_colunas") or []
                if isinstance(prefixos, str):
                    prefixos = [prefixos]
                colunas_prefixadas = [c for c in df_trabalho.columns if any(c.startswith(p) for p in prefixos)]

                colunas = colunas_fixas + colunas_prefixadas
                if colunas:
                    df_trabalho = df_trabalho.select(*colunas)
                    self.logger.info("  │   ├─ 📌 Colunas selecionadas")

                renomeacoes = schema_info.get("renomeacoes")
                if renomeacoes:
                    df_trabalho = self.aplicar_renomeacoes(df_trabalho, renomeacoes)
                    self.logger.info("  │   ├─ 🔤 Renomeações de colunas")

                substituicoes = schema_info.get("substituir_valores")
                if substituicoes:
                    df_trabalho = self.substituir_valores(
                        df_trabalho,
                        substituicoes["colunas"],
                        substituicoes["valor_original"],
                        substituicoes["valor_substituto"]
                    )
                    self.logger.info("  │   ├─ ♻️ Substituição de valores")

                pivot_config = schema_info.get("pivotar_colunas_em_linhas")
                if pivot_config:
                    df_trabalho = self.transformar_colunas_em_linhas(
                        df_trabalho,
                        pivot_config["id_vars"],
                        pivot_config["nome_coluna_categoria"],
                        pivot_config["nome_coluna_valor"]
                    )
                    self.logger.info(f"  │   ├─ 🔃 Unpivot: coluna `{pivot_config['nome_coluna_categoria']}`")

                for tabela_dim, colunas_chave in schema_info.get("relacionamento", {}).items():
                    try:
                        df_dim = self.spark.table(f"{self.schema_destino}.{tabela_dim}")
                        for col in colunas_chave:
                            df_trabalho = df_trabalho.withColumn(col, F.trim(F.upper(F.col(col))))
                            df_dim = df_dim.withColumn(col, F.trim(F.upper(F.col(col))))
                        df_trabalho = df_trabalho.join(df_dim, on=colunas_chave, how="left")

                        if "COD_SEGMENTO" in df_trabalho.columns:
                            df_trabalho = df_trabalho.filter(F.col("COD_SEGMENTO").isNotNull())

                        self.logger.info(f"  │   ├─ 🔗 Join com `{tabela_dim}`")
                    except Exception as e:
                        self.logger.warning(f"  │   ├─ ⚠️ Falha no join com {tabela_dim}: {e}")

                nomes_finais = schema_info.get("schema_final_nomes")
                colunas_atuais = df_trabalho.columns

                if nomes_finais:
                    colunas_faltantes = [col for col in nomes_finais if col not in colunas_atuais]
                    if colunas_faltantes:
                        self.logger.warning(f"  │   ├─ ⚠️ Colunas ausentes antes da seleção final: {colunas_faltantes}")
                        self.logger.warning(f"  │   ├─ 🔎 Colunas disponíveis: {colunas_atuais}")
                    else:
                        df_trabalho = df_trabalho.select(*nomes_finais)
                        self.logger.info("  │   ├─ 🏷️ Seleção e ordenação das colunas finais aplicada")
                else:
                    self.logger.warning("  │   ├─ ⚠️ Lista de nomes finais (`schema_final_nomes`) não definida.")

                tipos_finais = schema_info.get("schema_final_tipos")
                if tipos_finais:
                    for coluna, tipo in tipos_finais.items():
                        df_trabalho = df_trabalho.withColumn(coluna, F.col(coluna).cast(tipo))
                    self.logger.info("  │   ├─ 📆 Tipagem aplicada")

                self.salvar_tabela_prata(df_trabalho, nome_final)
                self.logger.info(f"  │   └─ ✅ Salvo como {self.schema_destino}.{nome_final}")

        except Exception as e:
            self.logger.error(f"❌ Erro ao transformar tabela {nome_tabela_bronze}_{ano}: {e}")
            raise


In [0]:
class TransformadorGold(TransformadorBase):
    def __init__(self, spark, table_configs, table_dim_config=None, schema_silver="silver", schema_gold="gold"):
        config = {
            "table_configs": table_configs,
            "table_dim_config": table_dim_config or {}
        }
        super().__init__(spark, config=config)
        self.schema_silver = schema_silver
        self.schema_gold = schema_gold
        self.table_configs = table_configs
        self.table_dim_config = table_dim_config or {}

    def copiar_dimensoes_para_gold(self):
        tabelas = ["d_Series",  "d_Calendario", "d_Municipios"]
        tabelas_gold = [t.name.lower() for t in self.spark.catalog.listTables("gold")]

        for tabela in tabelas:
            nome_tabela_gold = tabela.lower()
            if nome_tabela_gold in tabelas_gold:
                self.logger.info(f"📭 Tabela `{self.schema_gold}.{tabela}` já existe. Etapa ignorada.")
                continue

            origem = f"{self.schema_silver}.{tabela}"
            destino = f"{self.schema_gold}.{tabela}"

            try:
                df = self.spark.table(origem)
                df.write.mode("overwrite").format("delta").saveAsTable(destino)
                self.logger.info(f"✅ Tabela `{tabela}` copiada com sucesso para `{self.schema_gold}`")
            except Exception as e:
                self.logger.warning(f"⚠️ Falha ao copiar `{tabela}` para Gold: {e}")

    def tentar_carregar_tabela(self, nome_tabela, schema=None):
        try:
            schema = schema or self.schema_silver
            return self.spark.table(f"{schema}.{nome_tabela}")
        except Exception:
            self.logger.info(f"ℹ️ Tabela `{schema}.{nome_tabela}` ausente. Criando colunas com valores nulos.")
            return None

    def salvar_tabela_gold(self, df, nome_tabela, particionar_por=None):
        try:
            from pyspark.sql.functions import col
            for column in df.columns:
                if df.schema[column].dataType.simpleString() == 'void':
                    df = df.withColumn(column, lit(None).cast("string"))

            caminho = f"{self.schema_gold}.{nome_tabela}"
            writer = df.write.format("parquet").mode("overwrite")
            if particionar_por:
                writer = writer.partitionBy(particionar_por)
            writer.saveAsTable(caminho)
            self.logger.info(f"✅ Tabela salva com sucesso como `{caminho}`;")
        except Exception as e:
            self.logger.error(f"❌ Erro ao salvar tabela Gold `{nome_tabela}`: {e}")
            raise

    @log_etapa("5.  Pipeline da Camada Gold")
    def executar_pipeline(self):
        try:
            self.copiar_dimensoes_para_gold()

            tabelas_existentes = [t.name.lower() for t in self.spark.catalog.listTables("gold")]

            if "indicadores_por_segmento" not in tabelas_existentes:
                self.transformar_indicadores("indicadores_por_segmento")
            else:
                self.logger.info("📭 Tabela `gold.indicadores_por_segmento` já existe. Etapa ignorada.")

        except Exception as e:
            self.logger.error(f"❌ Erro no pipeline da camada Gold: {e}")
            raise


    def transformar_indicadores(self, nome_tabela_final):
        try:
            self.logger.info("🚀 Iniciando construção da base com d_Calendario e d_Series")

            df_anos = self.tentar_carregar_tabela("d_Calendario")
            df_series = self.tentar_carregar_tabela("d_Series")

            if df_anos is None or df_series is None:
                raise ValueError("❌ Tabelas de dimensão obrigatórias ausentes: d_Calendario ou d_Series")

            df_base = self.spark.table("silver.censo_escolar__escolas").select("CO_ENTIDADE").dropDuplicates()
            df_anos = df_anos.select("ANO").withColumnRenamed("ANO", "NU_ANO_CENSO").dropDuplicates()
            df_series = df_series.select("COD_SEGMENTO", "COD_SERIE").dropDuplicates()

            df_base = df_base.crossJoin(df_anos).crossJoin(df_series)

            tabelas_para_join = [
                ("alunos_por_turma__ATU",         ["CO_ENTIDADE", "NU_ANO_CENSO", "COD_SEGMENTO", "COD_SERIE"], ["ATU"]),
                ("hora_aula_diaria__HAD",         ["CO_ENTIDADE", "NU_ANO_CENSO", "COD_SEGMENTO", "COD_SERIE"], ["HAD"]),
                ("taxa_escolar__TAP",             ["CO_ENTIDADE", "NU_ANO_CENSO", "COD_SEGMENTO", "COD_SERIE"], ["TAP"]),
                ("taxa_escolar__TRE",             ["CO_ENTIDADE", "NU_ANO_CENSO", "COD_SEGMENTO", "COD_SERIE"], ["TRE"]),
                ("taxa_escolar__TAB",             ["CO_ENTIDADE", "NU_ANO_CENSO", "COD_SEGMENTO", "COD_SERIE"], ["TAB"]),
                ("censo_escolar__QT_MATRICULA",   ["CO_ENTIDADE", "NU_ANO_CENSO", "COD_SEGMENTO", "COD_SERIE"], ["QT_MAT"]),
                ("censo_escolar__QT_TURMAS",      ["CO_ENTIDADE", "NU_ANO_CENSO", "COD_SEGMENTO", "COD_SERIE"], ["QT_TUR"]),
                ("censo_escolar__QT_DOCENTES",    ["CO_ENTIDADE", "NU_ANO_CENSO", "COD_SEGMENTO", "COD_SERIE"], ["QT_DOC"])
            ]

            from pyspark.sql.functions import lit

            for nome_tabela, chaves, colunas in tabelas_para_join:
                df_join = self.tentar_carregar_tabela(nome_tabela)
                if df_join is None:
                    for col in colunas:
                        df_base = df_base.withColumn(col, lit(None))
                else:
                    df_join = df_join.select(*(chaves + colunas))
                    df_base = df_base.join(df_join, on=chaves, how="left")
                self.logger.info(f"🔗 Join realizado: + {nome_tabela};")

            df_base = df_base.select(*list(dict.fromkeys(df_base.columns)))
            self.salvar_tabela_gold(df_base, nome_tabela_final, particionar_por="NU_ANO_CENSO")

        except Exception as e:
            self.logger.error(f"❌ Erro na criação da tabela Gold `{nome_tabela_final}`: {e}")
            raise


INFO | 01:29:50.8 ___ py4j.clientserver: Received command c on object id p0


##### 3 - Executar Pipeline

In [0]:
anos_config = [2022, 2023, 2024]

table_dim_config = {
    "d_Series": {
        "colunas": ["SEGMENTO", "COD_SEGMENTO", "SERIE", "COD_SERIE", "NOME_SERIE"],
        "dados": [
            ("EI",   1, "CRECHE",         1, "Creche"), 
            ("EI",   1, "PRE_ESCOLA",     2, "Pré-Escola"),
            ("EFI",  2, "1_ANO",         11, "1º Ano"), 
            ("EFI",  2, "2_ANO",         12, "2º Ano"), 
            ("EFI",  2, "3_ANO",         13, "3º Ano"),
            ("EFI",  2, "4_ANO",         14, "4º Ano"), 
            ("EFI",  2, "5_ANO",         15, "5º Ano"), 
            ("EFI",  2, "ANOS_INICIAIS", 16, "Anos Iniciais"),
            ("EFII", 3, "6_ANO",         21, "6º Ano"), 
            ("EFII", 3, "7_ANO",         22, "7º Ano"), 
            ("EFII", 3, "8_ANO",         23, "8º Ano"),
            ("EFII", 3, "9_ANO",         24, "9º Ano"), 
            ("EFIi", 2, "ANOS_FINAIS",   25, "Anos Finais"),
            ("EM",   4, "1_SERIE",       31, "1ª Série"), 
            ("EM",   4, "2_SERIE",       32, "2ª Série"), 
            ("EM",   4, "3_SERIE",       33, "3ª Série"), 
            ("EM",   4, "MEDIO",         34, "Médio")],
        "tipo": {"SEGMENTO": "string", "COD_SEGMENTO": "int", "SERIE": "string", "COD_SERIE": "int", "NOME_SERIE": "string"}},
    "d_Calendario": {
        "colunas": ["ANO", "MES", "DIA", "NOME_MES", "TRIMESTRE", "SEMESTRE", "DIA_SEMANA", "NOME_DIA", "FIM_SEMANA", "DATA_ORDENADA"],
        "dados": None,
        "tipo": {"ANO": "int", "MES": "int", "DIA": "int", 
                 "NOME_MES": "string", "TRIMESTRE": "int", 
                 "SEMESTRE": "int", "DIA_SEMANA": "string", 
                 "NOME_DIA": "string", "FIM_SEMANA": "string", 
                 "DATA_ORDENADA": "int"},
        "data_inicio": f"{min(anos_config)}-01-01",
        "data_fim": f"{max(anos_config)}-12-31",
        "expressao_colunas": {
        "ANO": "year(DATA)",
        "MES": "month(DATA)",
        "DIA": "dayofmonth(DATA)",
        "NOME_MES": "date_format(DATA, 'MMMM')",
        "TRIMESTRE": "quarter(DATA)",
        "SEMESTRE": "IF(quarter(DATA) <= 2, 1, 2)",
        "DIA_SEMANA": "date_format(DATA, 'E')",
        "NOME_DIA": "date_format(DATA, 'EEEE')",
        "FIM_SEMANA": "IF(date_format(DATA, 'E') IN ('Sat', 'Sun'), True, False)",
        "DATA_ORDENADA": "CAST(date_format(DATA, 'yyyyMMdd') AS int)"}},

    "d_Municipios": {
        "aba": "DTB_Municipio",
        "colunas": [
            "UF", "Nome_UF", "Regiao_Intermediaria", "Nome_Regiao_Intermediaria",
            "Regiao_Imediata", "Nome_Regiao_Imediata", "Municipio",
            "Codigo_Municipio_Completo", "Nome_Municipio"
        ],
        "dados": "dbfs:/FileStore/tables/MUNICIPIOS/RELATORIO_DTB_BRASIL_MUNICIPIO.xls",
        "tipo": {
            "UF": "string",
            "Nome_UF": "string",
            "Regiao_Intermediaria": "int",
            "Nome_Regiao_Intermediaria": "string",
            "Regiao_Imediata": "int",
            "Nome_Regiao_Imediata": "string",
            "Municipio": "int",
            "Codigo_Municipio_Completo": "int",
            "Nome_Municipio": "string"
        },
        "renomeacoes": {}
    }
    }



table_configs = {
    "taxa_escolar": {
        "anos": anos_config,
        "pasta": "TAXAS",
        "arquivo": "tx_rend_escolas_",
        "aba": "ESCOLAS",
        "formato": "xlsx",
        "nome_tabela": "taxa_escolar",
        "sobrescrever": False,
        "depara_colunas_bronze": [
            ("0", "NU_ANO_CENSO"), ("1", "NO_REGIAO"), ("2", "SG_UF"), ("3", "CO_MUNICIPIO"),
            ("4", "NO_MUNICIPIO"), ("5", "CO_ENTIDADE"), ("6", "NO_ENTIDADE"),
            ("7", "NO_CATEGORIA"), ("8", "NO_DEPENDENCIA"),
            ("9", "1_CAT_FUN"), ("10", "1_CAT_FUN_AI"), ("11", "1_CAT_FUN_AF"),
            ("12", "1_CAT_FUN_01"), ("13", "1_CAT_FUN_02"), ("14", "1_CAT_FUN_03"),
            ("15", "1_CAT_FUN_04"), ("16", "1_CAT_FUN_05"), ("17", "1_CAT_FUN_06"),
            ("18", "1_CAT_FUN_07"), ("19", "1_CAT_FUN_08"), ("20", "1_CAT_FUN_09"),
            ("21", "1_CAT_MED"), ("22", "1_CAT_MED_01"), ("23", "1_CAT_MED_02"),
            ("24", "1_CAT_MED_03"), ("25", "1_CAT_MED_04"), ("26", "1_CAT_MED_NS"),
            ("27", "2_CAT_FUN"), ("28", "2_CAT_FUN_AI"), ("29", "2_CAT_FUN_AF"),
            ("30", "2_CAT_FUN_01"), ("31", "2_CAT_FUN_02"), ("32", "2_CAT_FUN_03"),
            ("33", "2_CAT_FUN_04"), ("34", "2_CAT_FUN_05"), ("35", "2_CAT_FUN_06"),
            ("36", "2_CAT_FUN_07"), ("37", "2_CAT_FUN_08"), ("38", "2_CAT_FUN_09"),
            ("39", "2_CAT_MED"), ("40", "2_CAT_MED_01"), ("41", "2_CAT_MED_02"),
            ("42", "2_CAT_MED_03"), ("43", "2_CAT_MED_04"), ("44", "2_CAT_MED_NS"),
            ("45", "3_CAT_FUN"), ("46", "3_CAT_FUN_AI"), ("47", "3_CAT_FUN_AF"),
            ("48", "3_CAT_FUN_01"), ("49", "3_CAT_FUN_02"), ("50", "3_CAT_FUN_03"),
            ("51", "3_CAT_FUN_04"), ("52", "3_CAT_FUN_05"), ("53", "3_CAT_FUN_06"),
            ("54", "3_CAT_FUN_07"), ("55", "3_CAT_FUN_08"), ("56", "3_CAT_FUN_09"),
            ("57", "3_CAT_MED"), ("58", "3_CAT_MED_01"), ("59", "3_CAT_MED_02"),
            ("60", "3_CAT_MED_03"), ("61", "3_CAT_MED_04"), ("62", "3_CAT_MED_NS")],
        "schemas": {
            "TAP": {
                "colunas": ["NU_ANO_CENSO", "CO_ENTIDADE"],
                "colunas_prefixo": "1_CAT",
                "renomeacoes": {
                    "1_CAT_FUN_01": "1_ANO", "1_CAT_FUN_02": "2_ANO", "1_CAT_FUN_03": "3_ANO",
                    "1_CAT_FUN_04": "4_ANO", "1_CAT_FUN_05": "5_ANO", "1_CAT_FUN_06": "6_ANO",
                    "1_CAT_FUN_07": "7_ANO", "1_CAT_FUN_08": "8_ANO", "1_CAT_FUN_09": "9_ANO",
                    "1_CAT_MED_01": "1_SERIE", "1_CAT_MED_02": "2_SERIE", "1_CAT_MED_03": "3_SERIE",
                    "1_CAT_FUN_AI": "AI", "1_CAT_FUN_AF": "AF", "1_CAT_FUN": "FUND", "1_CAT_MED": "MED",
                    "1_CAT_MED_04": "NS_SERIE", "1_CAT_MED_NS": "NS_MED"},
                "substituir_valores": {
                    "valor_original": "--",
                    "valor_substituto": None,
                    "colunas": ["1_ANO", "2_ANO", "3_ANO", "4_ANO", "5_ANO", "6_ANO", "7_ANO", "8_ANO", "9_ANO", "1_SERIE", "2_SERIE", "3_SERIE"]},
                "pivotar_colunas_em_linhas": {
                    "id_vars": ["NU_ANO_CENSO", "CO_ENTIDADE"],
                    "nome_coluna_categoria": "SERIE",
                    "nome_coluna_valor": "TAP"},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE", "COD_SEGMENTO", "COD_SERIE", "TAP"],
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string",
                    "COD_SEGMENTO": "int",
                    "COD_SERIE": "int",
                    "TAP": "float"},
                "relacionamento": {
                    "d_Series": ["SERIE"]}},
            "TRE": {
                "colunas": ["NU_ANO_CENSO", "CO_ENTIDADE"],
                "colunas_prefixo": "2_CAT",
                "renomeacoes": {
                    "2_CAT_FUN_01": "1_ANO", "2_CAT_FUN_02": "2_ANO", "2_CAT_FUN_03": "3_ANO",
                    "2_CAT_FUN_04": "4_ANO", "2_CAT_FUN_05": "5_ANO", "2_CAT_FUN_06": "6_ANO",
                    "2_CAT_FUN_07": "7_ANO", "2_CAT_FUN_08": "8_ANO", "2_CAT_FUN_09": "9_ANO",
                    "2_CAT_MED_01": "1_SERIE", "2_CAT_MED_02": "2_SERIE", "2_CAT_MED_03": "3_SERIE",
                    "2_CAT_FUN_AI": "AI", "2_CAT_FUN_AF": "AF", "2_CAT_FUN": "FUND", "2_CAT_MED": "MED",
                    "2_CAT_MED_04": "NS_SERIE", "2_CAT_MED_NS": "NS_MED"},
                "substituir_valores": {
                    "valor_original": "--",
                    "valor_substituto": None,
                    "colunas": ["1_ANO", "2_ANO", "3_ANO", "4_ANO", "5_ANO", "6_ANO", "7_ANO", "8_ANO", "9_ANO", "1_SERIE", "2_SERIE", "3_SERIE"]},
                "pivotar_colunas_em_linhas": {
                    "id_vars": ["NU_ANO_CENSO", "CO_ENTIDADE"],
                    "nome_coluna_categoria": "SERIE",
                    "nome_coluna_valor": "TRE"},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE", "COD_SEGMENTO", "COD_SERIE", "TRE"],
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string",
                    "COD_SEGMENTO": "int",
                    "COD_SERIE": "int",
                    "TRE": "float"},
                "relacionamento": {
                    "d_Series": ["SERIE"]}},
            "TAB": {
                "colunas": ["NU_ANO_CENSO", "CO_ENTIDADE"],
                "colunas_prefixo": "3_CAT",
                "renomeacoes": {
                    "3_CAT_FUN_01": "1_ANO", "3_CAT_FUN_02": "2_ANO", "3_CAT_FUN_03": "3_ANO",
                    "3_CAT_FUN_04": "4_ANO", "3_CAT_FUN_05": "5_ANO", "3_CAT_FUN_06": "6_ANO",
                    "3_CAT_FUN_07": "7_ANO", "3_CAT_FUN_08": "8_ANO", "3_CAT_FUN_09": "9_ANO",
                    "3_CAT_MED_01": "1_SERIE", "3_CAT_MED_02": "2_SERIE", "3_CAT_MED_03": "3_SERIE",
                    "3_CAT_FUN_AI": "AI", "3_CAT_FUN_AF": "AF", "3_CAT_FUN": "FUND", "3_CAT_MED": "MED",
                    "3_CAT_MED_04": "NS_SERIE", "3_CAT_MED_NS": "NS_MED"},
                "substituir_valores": {
                    "valor_original": "--",
                    "valor_substituto": None,
                    "colunas": ["1_ANO", "2_ANO", "3_ANO", "4_ANO", "5_ANO", "6_ANO", "7_ANO", "8_ANO", "9_ANO", "1_SERIE", "2_SERIE", "3_SERIE"]},
                "pivotar_colunas_em_linhas": {
                    "id_vars": ["NU_ANO_CENSO", "CO_ENTIDADE"],
                    "nome_coluna_categoria": "SERIE",
                    "nome_coluna_valor": "TAB"},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE", "COD_SEGMENTO", "COD_SERIE", "TAB"],
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string",
                    "COD_SEGMENTO": "int",
                    "COD_SERIE": "int",
                    "TAB": "float"},
                "relacionamento": {
                    "d_Series": ["SERIE"]}}
        }},
    "alunos_por_turma": {
        "anos": anos_config,
        "pasta": "ATU",
        "arquivo": "ATU_ESCOLAS_",
        "aba": "ESCOLA",
        "formato": "xlsx",
        "nome_tabela": "alunos_por_turma",
        "sobrescrever": False,
        "depara_colunas_bronze": [
            ("0", "NU_ANO_CENSO"),
            ("1", "NO_REGIAO"),
            ("2", "SG_UF"),
            ("3", "CO_MUNICIPIO"),
            ("4", "NO_MUNICIPIO"),
            ("5", "CO_ENTIDADE"),
            ("6", "NO_ENTIDADE"),
            ("7", "NO_CATEGORIA"),
            ("8", "NO_DEPENDENCIA"),
            ("9", "ED_INF_CAT_0"),
            ("10", "CRE_CAT_0"),
            ("11", "PRE_CAT_0"),
            ("12", "FUN_CAT_0"),
            ("13", "FUN_AI_CAT_0"),
            ("14", "FUN_AF_CAT_0"),
            ("15", "FUN_01_CAT_0"),
            ("16", "FUN_02_CAT_0"),
            ("17", "FUN_03_CAT_0"),
            ("18", "FUN_04_CAT_0"),
            ("19", "FUN_05_CAT_0"),
            ("20", "FUN_06_CAT_0"),
            ("21", "FUN_07_CAT_0"),
            ("22", "FUN_08_CAT_0"),
            ("23", "FUN_09_CAT_0"),
            ("24", "MED_CAT_0"),
            ("25", "MED_01_CAT_0"),
            ("26", "MED_02_CAT_0"),
            ("27", "MED_03_CAT_0"),
            ("28", "MED_04_CAT_0"),
            ("29", "MED_NS_CAT_0")],
        "schemas": {
            'ATU': {
                "colunas": [
                    "NU_ANO_CENSO", "CO_ENTIDADE",
                    "FUN_01_CAT_0", "FUN_02_CAT_0", "FUN_03_CAT_0", "FUN_04_CAT_0", "FUN_05_CAT_0",
                    "FUN_06_CAT_0", "FUN_07_CAT_0", "FUN_08_CAT_0", "FUN_09_CAT_0",
                    "MED_01_CAT_0", "MED_02_CAT_0", "MED_03_CAT_0",
                    "CRE_CAT_0", "PRE_CAT_0"],
                "renomeacoes": {
                    "FUN_01_CAT_0": "1_ANO", "FUN_02_CAT_0": "2_ANO", "FUN_03_CAT_0": "3_ANO",
                    "FUN_04_CAT_0": "4_ANO", "FUN_05_CAT_0": "5_ANO", "FUN_06_CAT_0": "6_ANO",
                    "FUN_07_CAT_0": "7_ANO", "FUN_08_CAT_0": "8_ANO", "FUN_09_CAT_0": "9_ANO",
                    "MED_01_CAT_0": "1_SERIE", "MED_02_CAT_0": "2_SERIE", "MED_03_CAT_0": "3_SERIE", "CRE_CAT_0": "CRECHE", "PRE_CAT_0": "PRE_ESCOLA"},
                "substituir_valores": {
                    "valor_original": "--",
                    "valor_substituto": None,
                    "colunas": [
                        "1_ANO", "2_ANO", "3_ANO", "4_ANO", "5_ANO", "6_ANO", "7_ANO", "8_ANO", "9_ANO", "1_SERIE", "2_SERIE", "3_SERIE", "CRECHE", "PRE_ESCOLA"]},
                "pivotar_colunas_em_linhas": {
                    "id_vars": ["NU_ANO_CENSO", "CO_ENTIDADE"],
                    "nome_coluna_categoria": "SERIE",
                    "nome_coluna_valor": "ATU"},
                "schema_final_nomes": ["NU_ANO_CENSO", "CO_ENTIDADE", "COD_SEGMENTO", "COD_SERIE", "ATU"],
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string",
                    "COD_SEGMENTO": "int",
                    "COD_SERIE": "int",
                    "ATU": "float"},
                "relacionamento": {
                    "d_Series": ["SERIE"]
            }}}}, 
    "hora_aula_diaria": {
        "anos": anos_config,
        "pasta": "HAD",
        "arquivo": "HAD_ESCOLAS_",
        "aba": "ESCOLA",
        "formato": "xlsx",
        "nome_tabela": "hora_aula_diaria",
        "sobrescrever": False,
        "depara_colunas_bronze": [
            ("0", "NU_ANO_CENSO"),
            ("1", "NO_REGIAO"),
            ("2", "SG_UF"),
            ("3", "CO_MUNICIPIO"),
            ("4", "NO_MUNICIPIO"),
            ("5", "CO_ENTIDADE"),
            ("6", "NO_ENTIDADE"),
            ("7", "NO_CATEGORIA"),
            ("8", "NO_DEPENDENCIA"),
            ("9", "ED_INF_CAT_0"),
            ("10", "CRE_CAT_0"),
            ("11", "PRE_CAT_0"),
            ("12", "FUN_CAT_0"),
            ("13", "FUN_AI_CAT_0"),
            ("14", "FUN_AF_CAT_0"),
            ("15", "FUN_01_CAT_0"),
            ("16", "FUN_02_CAT_0"),
            ("17", "FUN_03_CAT_0"),
            ("18", "FUN_04_CAT_0"),
            ("19", "FUN_05_CAT_0"),
            ("20", "FUN_06_CAT_0"),
            ("21", "FUN_07_CAT_0"),
            ("22", "FUN_08_CAT_0"),
            ("23", "FUN_09_CAT_0"),
            ("24", "MED_CAT_0"),
            ("25", "MED_01_CAT_0"),
            ("26", "MED_02_CAT_0"),
            ("27", "MED_03_CAT_0"),
            ("28", "MED_04_CAT_0"),
            ("29", "MED_NS_CAT_0")],
        "schemas": {
            "HAD": {
                "colunas": [
                    "NU_ANO_CENSO", "CO_ENTIDADE",
                    "FUN_01_CAT_0", "FUN_02_CAT_0", "FUN_03_CAT_0", "FUN_04_CAT_0", "FUN_05_CAT_0",
                    "FUN_06_CAT_0", "FUN_07_CAT_0", "FUN_08_CAT_0", "FUN_09_CAT_0",
                    "MED_01_CAT_0", "MED_02_CAT_0", "MED_03_CAT_0",
                    "CRE_CAT_0", "PRE_CAT_0"],
                "renomeacoes": {
                    "FUN_01_CAT_0": "1_ANO",
                    "FUN_02_CAT_0": "2_ANO",
                    "FUN_03_CAT_0": "3_ANO",
                    "FUN_04_CAT_0": "4_ANO",
                    "FUN_05_CAT_0": "5_ANO",
                    "FUN_06_CAT_0": "6_ANO",
                    "FUN_07_CAT_0": "7_ANO",
                    "FUN_08_CAT_0": "8_ANO",
                    "FUN_09_CAT_0": "9_ANO",
                    "MED_01_CAT_0": "1_SERIE",
                    "MED_02_CAT_0": "2_SERIE",
                    "MED_03_CAT_0": "3_SERIE",
                    "CRE_CAT_0": "CRECHE",
                    "PRE_CAT_0": "PRE_ESCOLA"},
                "substituir_valores": {
                    "valor_original": "--",
                    "valor_substituto": None,
                    "colunas": [
                        "1_ANO", "2_ANO", "3_ANO", "4_ANO", "5_ANO", "6_ANO", "7_ANO", "8_ANO", "9_ANO",
                        "1_SERIE", "2_SERIE", "3_SERIE", "CRECHE", "PRE_ESCOLA"]},
                "pivotar_colunas_em_linhas": {
                    "id_vars": ["NU_ANO_CENSO", "CO_ENTIDADE"],
                    "nome_coluna_categoria": "SERIE",
                    "nome_coluna_valor": "HAD"},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE", "COD_SEGMENTO", "COD_SERIE", "HAD"],
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string",
                    "COD_SEGMENTO": "int",
                    "COD_SERIE": "int",
                    "HAD": "float"},
                "relacionamento": {
                    "d_Series": ["SERIE"]}
            }
        }},
    "censo_escolar": {
        "anos": anos_config,
        "pasta": "CENSO",
        "arquivo": "microdados_ed_basica_",
        "formato": "csv",
        "nome_tabela": "censo_escolar",
        "sobrescrever": False,
        "depara_colunas_bronze": [],
"schemas": {
    "escolas": {
        "colunas": [
            "NU_ANO_CENSO", "CO_ENTIDADE", "NO_ENTIDADE",
            "CO_UF", "CO_REGIAO", "CO_MUNICIPIO",
            "SG_UF", "NO_UF", "NO_BAIRRO",
            "TP_DEPENDENCIA", "TP_LOCALIZACAO", "TP_SITUACAO_FUNCIONAMENTO"
        ],
        "schema_final_nomes": [
            "NU_ANO_CENSO", "CO_ENTIDADE", "NO_ENTIDADE",
            "CO_UF", "CO_REGIAO", "CO_MUNICIPIO",
            "SG_UF", "NO_UF", "NO_BAIRRO",
            "TP_DEPENDENCIA", "TP_LOCALIZACAO", "TP_SITUACAO_FUNCIONAMENTO"
        ],
        "schema_final_tipos": {
            "NU_ANO_CENSO": "int",
            "CO_ENTIDADE": "string",
            "NO_ENTIDADE": "string",
            "NO_BAIRRO": "string",
            "TP_DEPENDENCIA": "int",
            "TP_LOCALIZACAO": "int",
            "TP_SITUACAO_FUNCIONAMENTO": "int",
            "NO_UF": "string",
            "SG_UF": "string",
            "CO_UF": "string",
            "CO_REGIAO": "string",
            "CO_MUNICIPIO": "string"
        }},
            "QT_MATRICULA": {
                "colunas": ["NU_ANO_CENSO", "CO_ENTIDADE", "NO_ENTIDADE"],
                "colunas_prefixo": "QT_MAT",
                "renomeacoes": {
                    "QT_MAT_INF_CRE": "CRECHE",
                    "QT_MAT_INF_PRE": "PRE_ESCOLA",
                    "QT_MAT_FUND_AI": "ANOS_INICIAIS",
                    "QT_MAT_FUND_AF": "ANOS_FINAIS",
                    "QT_MAT_MEDIO": "MEDIO"},             
                "substituir_valores": {
                    "valor_original": None,
                    "valor_substituto": None,
                    "colunas": ["CRECHE", "PRE_ESCOLA", "ANOS_INICIAIS", "ANOS_FINAIS"]},
                "pivotar_colunas_em_linhas": {
                    "id_vars": ["NU_ANO_CENSO", "CO_ENTIDADE"],
                    "nome_coluna_categoria": "SERIE",
                    "nome_coluna_valor": "QT_MAT"},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE", "COD_SEGMENTO", "COD_SERIE", "QT_MAT"],
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string",
                    "COD_SEGMENTO": "int",
                    "COD_SERIE": "int",
                    "QT_MAT": "float"},
                "relacionamento": {
                    "d_Series": ["SERIE"]}},
            "QT_TURMAS": {
                "colunas": ["NU_ANO_CENSO", "CO_ENTIDADE", "NO_ENTIDADE"],
                "colunas_prefixo": "QT_TUR",
                "renomeacoes": {
                    "QT_TUR_INF_CRE": "CRECHE",
                    "QT_TUR_INF_PRE": "PRE_ESCOLA",
                    "QT_TUR_FUND_AI": "ANOS_INICIAIS",
                    "QT_TUR_FUND_AF": "ANOS_FINAIS",},             
                "substituir_valores": {
                    "valor_original": None,
                    "valor_substituto": None,
                    "colunas": ["CRECHE", "PRE_ESCOLA", "ANOS_INICIAIS", "ANOS_FINAIS"]},
                "pivotar_colunas_em_linhas": {
                    "id_vars": ["NU_ANO_CENSO", "CO_ENTIDADE"],
                    "nome_coluna_categoria": "SERIE",
                    "nome_coluna_valor": "QT_TUR"},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE", "COD_SEGMENTO", "COD_SERIE", "QT_TUR"],
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string",
                    "COD_SEGMENTO": "int",
                    "COD_SERIE": "int",
                    "QT_TUR": "float"},
                "relacionamento": {
                    "d_Series": ["SERIE"]}},           
            "QT_DOCENTES": {
                "colunas": ["NU_ANO_CENSO", "CO_ENTIDADE", "NO_ENTIDADE"],
                "colunas_prefixo": "QT_DOC",
                "renomeacoes": {
                    "QT_DOC_INF_CRE": "CRECHE",
                    "QT_DOC_INF_PRE": "PRE_ESCOLA",
                    "QT_DOC_FUND_AI": "ANOS_INICIAIS",
                    "QT_DOC_FUND_AF": "ANOS_FINAIS"},             
                "substituir_valores": {
                    "valor_original": None,
                    "valor_substituto": None,
                    "colunas": ["CRECHE", "PRE_ESCOLA", "ANOS_INICIAIS", "ANOS_FINAIS"]},
                "pivotar_colunas_em_linhas": {
                    "id_vars": ["NU_ANO_CENSO", "CO_ENTIDADE"],
                    "nome_coluna_categoria": "SERIE",
                    "nome_coluna_valor": "QT_DOC"},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE", "COD_SEGMENTO", "COD_SERIE", "QT_DOC"],
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string",
                    "COD_SEGMENTO": "int",
                    "COD_SERIE": "int",
                    "QT_DOC": "float"},
                "relacionamento": {
                    "d_Series": ["SERIE"]}},
            "INFRA_PROFESSORES": {
                "colunas": ["NU_ANO_CENSO", "CO_ENTIDADE", "NO_ENTIDADE"],
                "colunas_prefixo": "QT_PROF",
                "renomeacoes": {},             
                "substituir_valores": {},
                "pivotar_colunas_em_linhas": {},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE", "QT_PROF_ADMINISTRATIVOS", "QT_PROF_SERVICOS_GERAIS", "QT_PROF_BIBLIOTECARIO",
                     "QT_PROF_SAUDE", "QT_PROF_COORDENADOR", "QT_PROF_FONAUDIOLOGO", "QT_PROF_NUTRICIONISTA",
                     "QT_PROF_PSICOLOGO", "QT_PROF_ALIMENTACAO", "QT_PROF_PEDAGOGIA", "QT_PROF_SECRETARIO",
                     "QT_PROF_SEGURANCA", "QT_PROF_MONITORES", "QT_PROF_GESTAO", "QT_PROF_ASSIST_SOCIAL",
                     "QT_PROF_TRAD_LIBRAS"],
                "pivotar_colunas_em_linhas": {},
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string"},
                "relacionamento": {}},
            "INFRA_AGUA": {
                "colunas": ["NU_ANO_CENSO", "CO_ENTIDADE", "NO_ENTIDADE", 'IN_AGUA_POTAVEL', 'IN_AGUA_REDE_PUBLICA', 'IN_AGUA_POCO_ARTESIANO', 'IN_AGUA_CACIMBA', 'IN_AGUA_FONTE_RIO', 'IN_AGUA_INEXISTENTE'],
                "colunas_prefixo": "IN_AGUA",
                "renomeacoes": {},             
                "substituir_valores": {},
                "pivotar_colunas_em_linhas": {},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE",
                     'IN_AGUA_POTAVEL', 'IN_AGUA_REDE_PUBLICA', 'IN_AGUA_POCO_ARTESIANO', 'IN_AGUA_CACIMBA', 'IN_AGUA_FONTE_RIO', 'IN_AGUA_INEXISTENTE'
                     ], 
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string",
                    "IN_AGUA_POTAVEL": "boolean",
                    "IN_AGUA_REDE_PUBLICA": "boolean",
                    "IN_AGUA_POCO_ARTESIANO": "boolean",
                    "IN_AGUA_CACIMBA": "boolean",
                    "IN_AGUA_FONTE_RIO": "boolean",
                    "IN_AGUA_INEXISTENTE": "boolean"},
                "relacionamento": {
                    "d_Escolas": ["CO_ENTIDADE"]}},
            "INFRA_ESGOTO": {
                "colunas": ["NU_ANO_CENSO", "CO_ENTIDADE", "NO_ENTIDADE", 'IN_ESGOTO_REDE_PUBLICA', 'IN_ESGOTO_FOSSA_SEPTICA', 'IN_ESGOTO_FOSSA_COMUM', 'IN_ESGOTO_FOSSA', 'IN_ESGOTO_INEXISTENTE'],
                "colunas_prefixo": "IN_ESGOTO",
                "renomeacoes": {},             
                "substituir_valores": {},
                "pivotar_colunas_em_linhas": {},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE",
                     'IN_ESGOTO_REDE_PUBLICA', 'IN_ESGOTO_FOSSA_SEPTICA', 'IN_ESGOTO_FOSSA_COMUM', 'IN_ESGOTO_FOSSA', 'IN_ESGOTO_INEXISTENTE'
                     ],
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string",
                    "IN_ESGOTO_REDE_PUBLICA": "boolean",
                    "IN_ESGOTO_FOSSA_SEPTICA": "boolean",
                    "IN_ESGOTO_FOSSA_COMUM": "boolean",
                    "IN_ESGOTO_FOSSA": "boolean",
                    "IN_ESGOTO_INEXISTENTE": "boolean"},
                "relacionamento": {}},
            "INFRA_RECICLAGEM": {
                "colunas": ["NU_ANO_CENSO", "CO_ENTIDADE", "NO_ENTIDADE", 'IN_TRATAMENTO_LIXO_SEPARACAO', 'IN_TRATAMENTO_LIXO_REUTILIZA', 'IN_TRATAMENTO_LIXO_RECICLAGEM', 'IN_TRATAMENTO_LIXO_INEXISTENTE'],
                "colunas_prefixo": "IN_TRATAMENTO_LIXO",
                "renomeacoes": {},             
                "substituir_valores": {},
                "pivotar_colunas_em_linhas": {},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE",
                     'IN_TRATAMENTO_LIXO_SEPARACAO', 'IN_TRATAMENTO_LIXO_REUTILIZA', 'IN_TRATAMENTO_LIXO_RECICLAGEM', 'IN_TRATAMENTO_LIXO_INEXISTENTE'
                     ],
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string",
                    "IN_TRATAMENTO_LIXO_SEPARACAO": "boolean",
                    "IN_TRATAMENTO_LIXO_REUTILIZA": "boolean",
                    "IN_TRATAMENTO_LIXO_RECICLAGEM": "boolean",
                    "IN_TRATAMENTO_LIXO_INEXISTENTE": "boolean"},
                "relacionamento": {
                    "d_Escolas": ["CO_ENTIDADE"]}},
            "INFRA_LIXO": {
                "colunas": ["NU_ANO_CENSO", "CO_ENTIDADE", "NO_ENTIDADE",  'IN_LIXO_SERVICO_COLETA', 'IN_LIXO_QUEIMA', 'IN_LIXO_ENTERRA', 'IN_LIXO_DESTINO_FINAL_PUBLICO', 'IN_LIXO_DESCARTA_OUTRA_AREA'],
                "colunas_prefixo": "IN_LIXO",
                "renomeacoes": {},             
                "substituir_valores": {},
                "pivotar_colunas_em_linhas": {},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE",
                     'IN_LIXO_SERVICO_COLETA', 'IN_LIXO_QUEIMA', 'IN_LIXO_ENTERRA', 'IN_LIXO_DESTINO_FINAL_PUBLICO', 'IN_LIXO_DESCARTA_OUTRA_AREA'
                     ],
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string",
                    "IN_LIXO_SERVICO_COLETA": "boolean",
                    "IN_LIXO_QUEIMA": "boolean",
                    "IN_LIXO_ENTERRA": "boolean",
                    "IN_LIXO_DESTINO_FINAL_PUBLICO": "boolean",
                    "IN_LIXO_DESCARTA_OUTRA_AREA": "boolean"},
                "relacionamento": {
                    "d_Escolas": ["CO_ENTIDADE"]}},
            "INFRA_ACESSIBILIDADE": {
                "colunas": ["NU_ANO_CENSO", "CO_ENTIDADE", "NO_ENTIDADE", 'IN_ACESSIBILIDADE_CORRIMAO', 'IN_ACESSIBILIDADE_ELEVADOR', 'IN_ACESSIBILIDADE_PISOS_TATEIS', 'IN_ACESSIBILIDADE_VAO_LIVRE', 'IN_ACESSIBILIDADE_RAMPAS', 'IN_ACESSIBILIDADE_SINAL_SONORO', 'IN_ACESSIBILIDADE_SINAL_TATIL', 'IN_ACESSIBILIDADE_SINAL_VISUAL', 'IN_ACESSIBILIDADE_INEXISTENTE'],
                "colunas_prefixo": "IN_ACESSIBILIDADE",
                "renomeacoes": {},             
                "substituir_valores": {},
                "pivotar_colunas_em_linhas": {},
                "schema_final_nomes": 
                    ["NU_ANO_CENSO", "CO_ENTIDADE",
                     'IN_ACESSIBILIDADE_CORRIMAO', 'IN_ACESSIBILIDADE_ELEVADOR', 'IN_ACESSIBILIDADE_PISOS_TATEIS', 'IN_ACESSIBILIDADE_VAO_LIVRE', 'IN_ACESSIBILIDADE_RAMPAS', 'IN_ACESSIBILIDADE_SINAL_SONORO', 'IN_ACESSIBILIDADE_SINAL_TATIL', 'IN_ACESSIBILIDADE_SINAL_VISUAL', 'IN_ACESSIBILIDADE_INEXISTENTE'
                     ],
                "schema_final_tipos": {
                    "NU_ANO_CENSO": "int",
                    "CO_ENTIDADE": "string"},
                "relacionamento": {
                    "d_Escolas": ["CO_ENTIDADE"]}}
        
        }},
}

INFO | 00:46:33.9 ___ py4j.clientserver: Received command c on object id p0


In [0]:
def pipeline(spark, table_configs, ano):
    """
        Função principal que orquestra a execução do pipeline ETL em três camadas:
        Bronze → Silver → Gold, utilizando o Spark no Databricks.

        Parâmetros:
        ----------
        spark : SparkSession
            Sessão Spark ativa para execução dos jobs.
        table_configs : dict
            Dicionário de configuração com todas as tabelas do projeto.
        ano : list[int]
            Lista de anos de referência para filtrar os dados.
    """

    # =========================================================================
    # 🧱 ETAPA 0 - Utilitários DBFS (opcional)
    # =========================================================================
    # TransformadorDBFS.identificar_arquivos()             # Lista arquivos disponíveis no DBFS
    # TransformadorDBFS.listar_parquets(deletar=False)     # Lista tabelas Parquet por camada
    # TransformadorDBFS.listar_databases(deletar=True)    # Lista databases Spark disponíveis

    # =========================================================================
    # 🔧 ETAPA 1 - Configuração da Base
    # =========================================================================
    base = TransformadorBase(
        config={
            "table_configs": table_configs,         # Configurações de tabelas externas
            "table_dim_config": table_dim_config    # Configurações de tabelas dimensão
        },
        salvar_em_arquivo=False,                    # Salvar logs em arquivo DBFS? (True/False)
        log_detalhado=False                         # Nível de detalhe nos logs
    )

    # =========================================================================
    # 🔍 ETAPA 2 - Validação Inicial
    # =========================================================================
    base.configurar_logger()        # Configura o logger com timestamp e emojis
    base.importar_bibliotecas()     # Importa bibliotecas requeridas (pandas, spark, tqdm...)
    base.validar_config()           # Valida a estrutura do table_configs

    # =========================================================================
    # 🏗️ ETAPA 3 - Criação dos Databases das Camadas
    # =========================================================================
    # for camada in ["bronze", "silver", "gold"]:
    #     base.inicializar_database(camada)  # Cria os bancos no metastore se não existirem

    # =========================================================================
    # 🪵 ETAPA 4 - Execução dos Pipelines por Camada
    # =========================================================================

    # Bronze: carrega arquivos .xlsx/.csv e salva em Parquet
    TransformadorBronze(
        spark,
        table_configs,
        table_dim_config
    ).executar_pipeline()

    # # Silver: aplica transformações semânticas e joins
    TransformadorPrata(
        spark,
        table_configs,
        table_dim_config
    ).executar_pipeline()

    # # # Gold: cria tabelas analíticas e consolidadas para uso final
    TransformadorGold(
        spark,
        table_configs,
        table_dim_config
    ).executar_pipeline()


# Exemplo de execução:
# -------------------------------------------------------------------------
# Lista de anos a serem processados
ano_censo = [2022, 2023, 2024]

# Execução do pipeline completo
pipeline(spark, table_configs, ano_censo)








INFO | 01:29:54.1 ___ TransformadorBase: 

    ╔══════════════════════════════════════════════════╗
    ║ 1.  Importar Bibliotecas                         ║
    ╚══════════════════════════════════════════════════╝
            
INFO | 01:29:54.1 ___ TransformadorBase: 
╔════════════════════════╦══════════════╗
║       Biblioteca       ║    Versão    ║
╠════════════════════════╬══════════════╣
║ logging                ║      0.5.1.2 ║
║ openpyxl               ║        3.1.5 ║
║ os                     ║          N/A ║
║ pandas                 ║        1.4.2 ║
║ re                     ║        2.2.1 ║
║ spark                  ║        3.3.2 ║
║ time                   ║          N/A ║
║ tqdm                   ║          N/A ║
╠════════════════════════╩══════════════╣
║       Versão do Python: 3.9.21        ║
╚═══════════════════════════════════════╝

INFO | 01:29:54.1 ___ TransformadorBase: 
  ⏱️ Tempo de execução da etapa '1.  Importar Bibliotecas': 0.0 segundos

INFO | 01:29:54.1 ___ Tran

##### 4 - Consultas ao Banco de Dados


###### 4.1 - Camada Bronze - Visão Geral dos Dados Brutos

📌 **Descrição**  
A camada Bronze armazena os **dados brutos**, extraídos diretamente das fontes oficiais do INEP, em seu formato original (CSV, Excel, ODS), antes de qualquer transformação. É a base do pipeline de dados, garantindo **rastreabilidade e integridade histórica**.

📂 **Características principais**
- Dados em formato Parquet.
- Tabelas nomeadas com o padrão: `bronze.nome_tabela_ano`.
- Colunas preservadas conforme estrutura das fontes originais.
- Usada como origem para a construção das camadas Silver e Gold.

---

##### 🔗 Fontes oficiais dos dados

- **Taxas de rendimento escolar**  
  [`https://www.gov.br/inep/pt-br/acesso-a-informacao/dados-abertos/indicadores-educacionais/taxas-de-rendimento-escolar`](https://www.gov.br/inep/pt-br/acesso-a-informacao/dados-abertos/indicadores-educacionais/taxas-de-rendimento-escolar)

- **Microdados do Censo Escolar**  
  [`https://www.gov.br/inep/pt-br/acesso-a-informacao/dados-abertos/microdados/censo-escolar`](https://www.gov.br/inep/pt-br/acesso-a-informacao/dados-abertos/microdados/censo-escolar)

- **Média de alunos por turma**  
  [`https://www.gov.br/inep/pt-br/acesso-a-informacao/dados-abertos/indicadores-educacionais/media-de-alunos-por-turma`](https://www.gov.br/inep/pt-br/acesso-a-informacao/dados-abertos/indicadores-educacionais/media-de-alunos-por-turma)

- **Média de horas-aula diária**  
  [`https://www.gov.br/inep/pt-br/acesso-a-informacao/dados-abertos/indicadores-educacionais/media-de-horas-aula-diaria`](https://www.gov.br/inep/pt-br/acesso-a-informacao/dados-abertos/indicadores-educacionais/media-de-horas-aula-diaria)

---

##### 📁 Exemplos de tabelas na Bronze

- `bronze.taxa_escolar_tab_2023`
- `bronze.taxa_escolar_tap_2023`
- `bronze.taxa_escolar_tre_2023`
- `bronze.alunos_por_turma_2023`
- `bronze.hora_aula_diaria_2023`
- `bronze.censo_escolar_qt_matricula_2023`
- `bronze.censo_escolar_infra_lixo_2023`
- `bronze.d_municipios`

---

🎯 **Objetivo da camada Bronze**  
Ser um repositório confiável e auditável dos dados originais, permitindo reproduzir ou reprocessar qualquer etapa posterior com total rastreabilidade.



In [0]:
%sql
/* 
-- =============================================================
-- 🔎 Verificação das Tabelas nas Camadas Bronze, Silver e Gold
-- =============================================================
-- Etapas:
--   1. Use o comando `SHOW TABLES IN <schema>` para listar as tabelas.
--
-- Observações:
--   ⚠️ Execute apenas um comando por vez.
--   ⚠️ Substitua o schema conforme necessário: bronze | silver | gold.
-- ==========================================================
*/

SHOW TABLES IN bronze;


INFO | 01:34:07.0 ___ py4j.clientserver: Received command c on object id p0


database,tableName,isTemporary
bronze,alunos_por_turma_2022,False
bronze,alunos_por_turma_2023,False
bronze,alunos_por_turma_2024,False
bronze,censo_escolar_2022,False
bronze,censo_escolar_2023,False
bronze,censo_escolar_2024,False
bronze,d_calendario,False
bronze,d_municipios,False
bronze,d_series,False
bronze,hora_aula_diaria_2022,False


###### 4.2 - Camada Silver - Visão Geral das Tabelas

📌 **Descrição**  
A camada Silver representa a modelagem intermediária do pipeline de dados, responsável por padronizar, limpar e estruturar os dados brutos (camada Bronze) em tabelas temáticas, organizadas por tipo de informação (indicadores, infraestrutura, docentes, turmas, etc.). As tabelas já estão preparadas para agregações e joins na camada Gold.

---
##### 📅 `d_calendario`  
**Descrição**: Dimensão de tempo com datas e ciclos escolares.  
**Colunas principais**: `DATA`, `ANO`, `MES`, `DIA_DA_SEMANA`, `FORMATO_YYYYMMDD`.

---

##### 🌍 `d_municipios`  
**Descrição**: Dimensão geográfica com dados oficiais do IBGE.  
**Colunas principais**: `CO_MUNICIPIO`, `SG_UF`, `NO_MUNICIPIO`, `REGIAO_IMEDIATA`.

---

##### 🎓 `d_series`  
**Descrição**: Dimensão com hierarquia educacional por segmento e série.  
**Colunas principais**: `COD_SEGMENTO`, `SEGMENTO`, `COD_SERIE`, `NOME_SERIE`.

---

##### ⏰ `hora_aula_diaria__had`  
**Descrição**: Indicador da carga horária diária por turma.  
**Colunas principais**: `NU_ANO_CENSO`, `CO_ENTIDADE`, `COD_SEGMENTO`, `COD_SERIE`, `HAD`.

---

##### 👨‍🏫 `alunos_por_turma__atu`  
**Descrição**: Indicador de média de alunos por turma por escola, ano e segmento.  
**Exemplo de uso**: Analisar distribuição de alunos e identificar superlotação.  
**Colunas principais**: `NU_ANO_CENSO`, `CO_ENTIDADE`, `COD_SEGMENTO`, `COD_SERIE`, `ATU`.

---

##### 🏫 `censo_escolar__escolas`  
**Descrição**: Cadastro geral das escolas com localização e características administrativas.  
**Colunas principais**: `CO_ENTIDADE`, `SG_UF`, `NO_UF`, `TP_DEPENDENCIA`, `TP_LOCALIZACAO`, `TP_SITUACAO_FUNCIONAMENTO`.

---

##### 📈 `censo_escolar__qt_docentes`  
**Descrição**: Quantidade total de docentes por escola.  
**Colunas principais**: `NU_ANO_CENSO`, `CO_ENTIDADE`, `QT_DOCENTES`.

---

##### 🧒 `censo_escolar__qt_matricula`  
**Descrição**: Total de matrículas por escola, ano e segmento.  
**Colunas principais**: `NU_ANO_CENSO`, `CO_ENTIDADE`, `COD_SEGMENTO`, `COD_SERIE`, `QT_MAT`.

---

##### 🏫 `censo_escolar__qt_turmas`  
**Descrição**: Total de turmas por escola, ano e etapa de ensino.  
**Colunas principais**: `NU_ANO_CENSO`, `CO_ENTIDADE`, `COD_SEGMENTO`, `COD_SERIE`, `QT_TUR`.

---

##### 📘 `taxa_escolar__tab`  
**Descrição**: Taxa de abandono por escola, série e ano.  
**Colunas principais**: `TAB`.

---

##### ✅ `taxa_escolar__tap`  
**Descrição**: Taxa de aprovação por escola, série e ano.  
**Colunas principais**: `TAP`.

---

##### ❌ `taxa_escolar__tre`  
**Descrição**: Taxa de reprovação por escola, série e ano.  
**Colunas principais**: `TRE`.

---

🎯 **Objetivo da camada Silver**  
Consolidar e normalizar os dados do Censo Escolar e indicadores educacionais em tabelas temáticas, garantindo granularidade adequada para transformações analíticas posteriores na camada Gold.


In [0]:
%sql
/* 
-- =============================================================
-- 🔎 Verificação das Tabelas nas Camadas Bronze, Silver e Gold
-- =============================================================
-- Etapas:
--   1. Use o comando `SHOW TABLES IN <schema>` para listar as tabelas.
--
-- Observações:
--   ⚠️ Execute apenas um comando por vez.
--   ⚠️ Substitua o schema conforme necessário: bronze | silver | gold.
-- ==========================================================
*/

SHOW TABLES IN silver;


INFO | 01:34:16.4 ___ py4j.clientserver: Received command c on object id p0


database,tableName,isTemporary
silver,alunos_por_turma__atu,False
silver,censo_escolar__escolas,False
silver,censo_escolar__infra_acessibilidade,False
silver,censo_escolar__infra_agua,False
silver,censo_escolar__infra_esgoto,False
silver,censo_escolar__infra_lixo,False
silver,censo_escolar__infra_professores,False
silver,censo_escolar__infra_reciclagem,False
silver,censo_escolar__qt_docentes,False
silver,censo_escolar__qt_matricula,False


###### 4.3 - Camada Gold - Visão Geral das Tabelas e Views

**Descrição**  
A camada Gold representa a etapa final de modelagem analítica do projeto, onde os dados já foram tratados, integrados e organizados para suportar consultas de alto nível, análises educacionais e visualizações no Power BI. Cada tabela ou view foi desenhada para responder diretamente a perguntas estratégicas relacionadas às metas do Plano Nacional de Educação (PNE).

**Tabelas e Views disponíveis**

##### 📊 `indicadores_por_segmento`  
**Descrição**: Fato principal com indicadores educacionais por escola, série e ano.  
**Usos**: Base para todas as análises quantitativas de desempenho e estrutura.  
**Exemplos de colunas**: `NU_ANO_CENSO`, `CO_ENTIDADE`, `COD_SEGMENTO`, `TAP`, `TRE`, `TAB`, `ATU`, `HAD`, `QT_MAT`, `QT_TUR`, `QT_DOC`.

---

##### 🧮 `vw_indicadores`  
**Descrição**: View integradora com todas as colunas de indicadores, séries, escolas e dados do Censo.  
**Usos**: Base principal para construção de dashboards e análises exploratórias.  
**Observação**: Contém já os nomes padronizados e os dados integrados com segmentação.

---


In [0]:
%sql
/* 
-- =============================================================
-- 🔎 Verificação das Tabelas nas Camadas Bronze, Silver e Gold
-- =============================================================
-- Etapas:
--   1. Use o comando `SHOW TABLES IN <schema>` para listar as tabelas.
--
-- Observações:
--   ⚠️ Execute apenas um comando por vez.
--   ⚠️ Substitua o schema conforme necessário: bronze | silver | gold.
-- ==========================================================
*/

SHOW TABLES IN gold;


INFO | 01:34:24.9 ___ py4j.clientserver: Received command c on object id p0


database,tableName,isTemporary
gold,d_calendario,False
gold,d_municipios,False
gold,d_series,False
gold,indicadores_por_segmento,False


##### 5 - Estudo de Caso

###### 5.1 - View Indicadores

📌 **Descrição**  
Esta view consolida os principais indicadores educacionais por escola, segmento, série e ano, com informações integradas de múltiplas fontes da camada Silver. É a base principal de análise da qualidade, cobertura e estrutura das escolas em atividade no Brasil.

🔗 **Fontes utilizadas**  
- `gold.indicadores_por_segmento`: indicadores por escola, série e ano  
- `silver.d_Series`: segmentação educacional (códigos e nomes)  
- `silver.censo_escolar__escolas`: dados complementares da escola por ano do censo

🧩 **Chaves de junção**  
- `CO_ENTIDADE` + `NU_ANO_CENSO`

📎 **Colunas principais**
| Coluna                     | Descrição                                                   |
|----------------------------|-------------------------------------------------------------|
| `ANO`                      | Ano de referência do Censo Educacional                      |
| `COD_SEGMENTO`             | Código do segmento (ex: EI, EFI, EFII, EM)                  |
| `NOME_SEGMENTO`            | Nome do segmento de ensino                                  |
| `NOME_SERIE`               | Série ou etapa do segmento                                  |
| `ALUNOS_POR_TURMA`         | Quantidade média de alunos por turma                        |
| `HORA_AULA_DIARIA`         | Quantidade média de horas-aula diárias                      |
| `TAXA_APROVACAO`           | Taxa de aprovação no segmento/escola                        |
| `TAXA_REPROVACAO`          | Taxa de reprovação no segmento/escola                       |
| `TAXA_ABANDONO`            | Taxa de abandono escolar                                    |
| `QTD_MATRICULAS`           | Número total de matrículas registradas                      |
| `QTD_TURMAS`               | Número total de turmas registradas                          |
| `QTD_DOCENTES`             | Número total de docentes atuantes                           |
| `UF`, `NOME_UF`            | Unidade da Federação e nome completo do estado              |
| `BAIRRO`                   | Bairro onde a escola está localizada                        |
| `TIPO_DEPENDENCIA`         | Dependência administrativa (Federal, Estadual etc.)         |
| `TIPO_LOCALIZACAO`         | Localização da escola (Urbana, Rural ou Não informado)      |
| `SITUACAO_FUNCIONAMENTO`   | Situação da escola no ano do censo (Ativa, Paralisada etc.) |


🎯 **Utilização recomendada**  
A view é ideal para análises comparativas de indicadores educacionais por tempo, território, dependência administrativa e estrutura escolar. É usada como base para construção das views analíticas da camada Gold, especialmente nas metas do PNE.


In [0]:
%sql
CREATE OR REPLACE VIEW gold.vw_indicadores_completos AS
SELECT DISTINCT 
    ig.NU_ANO_CENSO                 AS ANO,
    se.COD_SEGMENTO                 AS COD_SEGMENTO,
    se.SEGMENTO                     AS NOME_SEGMENTO,
    se.SERIE                        AS NOME_SERIE,
    ig.ATU                          AS ALUNOS_POR_TURMA,
    ig.HAD                          AS HORA_AULA_DIARIA,
    ig.TAB                          AS TAXA_ABANDONO,
    ig.TAP                          AS TAXA_APROVACAO,
    ig.TRE                          AS TAXA_REPROVACAO,
    ig.QT_MAT                       AS QTD_MATRICULAS,
    ig.QT_TUR                       AS QTD_TURMAS,
    ig.QT_DOC                       AS QTD_DOCENTES,
    ce.SG_UF                        AS UF,
    ce.NO_UF                        AS NOME_UF,
    ce.NO_BAIRRO                   AS BAIRRO,

    CASE ce.TP_DEPENDENCIA
        WHEN 1 THEN 'Federal'
        WHEN 2 THEN 'Estadual'
        WHEN 3 THEN 'Municipal'
        WHEN 4 THEN 'Privada'
        ELSE 'Não informado'
    END                            AS TIPO_DEPENDENCIA,

    CASE ce.TP_LOCALIZACAO
        WHEN 1 THEN 'Urbana'
        WHEN 2 THEN 'Rural'
        ELSE 'Não informado'
    END                            AS TIPO_LOCALIZACAO,

    CASE ce.TP_SITUACAO_FUNCIONAMENTO
        WHEN 1 THEN 'Em Atividade'
        WHEN 2 THEN 'Paralisada'
        WHEN 3 THEN 'Extinta (Ano do Censo)'
        WHEN 4 THEN 'Extinta em Anos Anteriores'
        ELSE 'Não informado'
    END                            AS SITUACAO_FUNCIONAMENTO

FROM gold.indicadores_por_segmento ig

LEFT JOIN silver.d_Series se 
       ON ig.COD_SEGMENTO = se.COD_SEGMENTO 
      AND ig.COD_SERIE = se.COD_SERIE

LEFT JOIN (
    SELECT DISTINCT 
        CO_ENTIDADE, NU_ANO_CENSO,
        SG_UF, NO_UF,
        NO_BAIRRO,
        TP_DEPENDENCIA,
        TP_LOCALIZACAO,
        TP_SITUACAO_FUNCIONAMENTO
    FROM silver.censo_escolar__escolas
) ce
    ON ig.CO_ENTIDADE = ce.CO_ENTIDADE
   AND ig.NU_ANO_CENSO = ce.NU_ANO_CENSO;


SELECT * 
FROM gold.vw_indicadores_completos LIMIT 10


INFO | 01:36:12.4 ___ py4j.clientserver: Received command c on object id p0


ANO,COD_SEGMENTO,NOME_SEGMENTO,NOME_SERIE,ALUNOS_POR_TURMA,HORA_AULA_DIARIA,TAXA_ABANDONO,TAXA_APROVACAO,TAXA_REPROVACAO,QTD_MATRICULAS,QTD_TURMAS,QTD_DOCENTES,UF,NOME_UF,BAIRRO,TIPO_DEPENDENCIA,TIPO_LOCALIZACAO,SITUACAO_FUNCIONAMENTO
2022,1,EI,CRECHE,12.0,4.0,,,,12.0,1.0,1.0,RJ,Rio de Janeiro,BELMONTE,Privada,Urbana,Em Atividade
2022,1,EI,CRECHE,9.0,10.5,,,,54.0,6.0,6.0,SP,S�o Paulo,VILA INDUSTRIAL,Municipal,Urbana,Em Atividade
2022,3,EFII,6_ANO,32.0,5.3,0.0,100.0,0.0,,,,SP,S�o Paulo,HEITOR VILLA LOBOS,Privada,Urbana,Em Atividade
2022,3,EFII,6_ANO,,,,,,,,,SP,S�o Paulo,CONJUNTO HABITACIONAL PAULO FREIRE,Municipal,Urbana,Em Atividade
2022,1,EI,CRECHE,,,,,,0.0,0.0,0.0,SP,S�o Paulo,GONZAGA,Privada,Urbana,Em Atividade
2022,3,EFII,6_ANO,,,,,,,,,SP,S�o Paulo,VILA ALBERTINA,Estadual,Urbana,Em Atividade
2022,4,EM,MEDIO,,,,,,,,,RJ,Rio de Janeiro,,Municipal,Rural,Em Atividade
2022,1,EI,CRECHE,9.0,4.5,,,,9.0,1.0,1.0,RJ,Rio de Janeiro,,Municipal,Rural,Em Atividade
2022,4,EM,3_SERIE,,,,,,,,,SP,S�o Paulo,PARQUE VIANA,Municipal,Urbana,Em Atividade
2022,1,EI,PRE_ESCOLA,15.5,4.0,,,,62.0,4.0,4.0,SP,S�o Paulo,JARDIM ROSA BRANCA,Municipal,Urbana,Em Atividade


###### 5.2 -  Desempenho Escolar
**Quais são os 10 estados com melhor desempenho nas taxas escolares em cada ano?**

📌 **Objetivo**:  
Classificar as Unidades da Federação (UF) com base em um *score de qualidade educacional*, calculado como:
> `Taxa de Aprovação - (Taxa de Reprovação + Taxa de Abandono)`

📂 **Fonte**:  
View: `gold.vw_indicadores_completos`

📎 **Filtros aplicados**:  
- `SITUACAO_FUNCIONAMENTO = 'Em Atividade'`  
- Agrupamento por `ANO` e `UF`

🎯 **Indicadores calculados**:  
- Média das taxas de aprovação, reprovação e abandono por ano e UF  
- Score de qualidade educacional por ano e UF  
- Ranking dos 10 melhores estados em cada ano com base no score



In [0]:
%sql
-- View com ranking dos 5 melhores estados por ano com base nas taxas escolares
WITH ranking_uf AS (
    SELECT
        ig.ANO,
        ig.UF AS UF,

        ROUND(AVG(ig.TAXA_APROVACAO), 2)   AS MEDIA_APROVACAO,
        ROUND(AVG(ig.TAXA_REPROVACAO), 2)  AS MEDIA_REPROVACAO,
        ROUND(AVG(ig.TAXA_ABANDONO), 2)    AS MEDIA_ABANDONO,

        -- Score de qualidade: aprovação - (reprovação + abandono)
        ROUND(
            AVG(ig.TAXA_APROVACAO) - AVG(ig.TAXA_REPROVACAO) - AVG(ig.TAXA_ABANDONO),
        2) AS SCORE_QUALIDADE,

        -- Ranking por ano com base no score
        DENSE_RANK() OVER (
            PARTITION BY ig.ANO
            ORDER BY AVG(ig.TAXA_APROVACAO) - AVG(ig.TAXA_REPROVACAO) - AVG(ig.TAXA_ABANDONO) DESC
        ) AS RANK_UF

    FROM gold.vw_indicadores_completos ig
    WHERE ig.SITUACAO_FUNCIONAMENTO = 'Em Atividade'
    GROUP BY ig.ANO, ig.UF
)

SELECT *
FROM ranking_uf
WHERE RANK_UF <= 10
ORDER BY ANO, RANK_UF;


INFO | 01:37:04.2 ___ py4j.clientserver: Received command c on object id p0


ANO,UF,MEDIA_APROVACAO,MEDIA_REPROVACAO,MEDIA_ABANDONO,SCORE_QUALIDADE,RANK_UF
2022,CE,98.85,0.62,0.53,97.69,1
2022,SP,97.58,1.77,0.65,95.16,2
2022,GO,97.11,2.38,0.5,94.23,3
2022,DF,95.53,3.9,0.57,91.05,4
2022,MT,95.23,3.2,1.57,90.47,5
2022,MG,94.95,3.5,1.55,89.91,6
2022,PR,94.95,4.27,0.78,89.9,7
2022,ES,94.87,4.72,0.41,89.75,8
2022,PE,94.77,4.39,0.84,89.54,9
2022,AL,94.58,3.94,1.48,89.16,10


###### 5.3 - Qualidade do Atendimento Escolar  
**Pergunta 3**: Quais são os 10 estados com melhor estrutura de atendimento escolar, por segmento e ano?

📌 **Objetivo**:  
Avaliar a qualidade da estrutura educacional com base na **intensidade do atendimento pedagógico**, considerando:
- Segmento de ensino (`NOME_SEGMENTO`)
- Unidade da Federação (`UF`)

📂 **Fonte**:  
View: `gold.vw_indicadores_completos`

📎 **Filtros aplicados**:  
- `SITUACAO_FUNCIONAMENTO = 'Em Atividade'`  
- `TIPO_DEPENDENCIA = 'Municipal'`  
- `TIPO_LOCALIZACAO = 'Urbana'`  
- Exclusão de nulos em `ALUNOS_POR_TURMA` e `HORA_AULA_DIARIA`

🎯 **Indicadores calculados**:  
- Média de alunos por turma (`ATU`)  
- Média de horas-aula diárias (`HAD`)  
- Score de atendimento (`HAD / ATU`)  
- Ranking dos 10 estados com melhor estrutura, por segmento e ano

📊 **Avaliação dos Resultados — Educação Infantil (2023)**  
Em 2023, para o segmento da **Educação Infantil (EI)** em redes **municipais e urbanas**, os estados com **melhores condições estruturais** foram:

🔝 **Destaques**:
- **Goiás (GO)** liderou com score de **0.9474**, resultado de turmas muito reduzidas (4,75 alunos) e 4,5 horas-aula diárias.
- **Santa Catarina (SC)** e **Rio Grande do Sul (RS)** mantiveram boa combinação entre carga horária e tamanho de turma.
- **São Paulo (SP)** e **Rio de Janeiro (RJ)**, apesar da boa carga horária, apresentaram scores mais baixos devido ao maior número de alunos por turma.

📌 O score `HAD / ATU` reflete o **potencial de atenção pedagógica por aluno**. Quanto maior o valor, **melhor a capacidade de atendimento individualizado**.


In [0]:
%sql
-- View com ranking dos 10 estados por atendimento escolar (HORA_AULA_DIARIA / ALUNOS_POR_TURMA) por segmento
WITH atendimento_uf AS (
    SELECT
        ig.ANO,
        ig.NOME_SEGMENTO,
        ig.UF,

        ROUND(AVG(ig.ALUNOS_POR_TURMA), 2) AS MEDIA_ALUNOS_POR_TURMA,
        ROUND(AVG(ig.HORA_AULA_DIARIA), 2) AS MEDIA_HORA_AULA_DIARIA,

        -- Score de atendimento: mais HORA_AULA_DIARIA e menos alunos por turma
        ROUND(AVG(ig.HORA_AULA_DIARIA) / NULLIF(AVG(ig.ALUNOS_POR_TURMA), 0), 4) AS SCORE_ATENDIMENTO,

        DENSE_RANK() OVER (
            PARTITION BY ig.ANO, ig.NOME_SEGMENTO
            ORDER BY AVG(ig.HORA_AULA_DIARIA) / NULLIF(AVG(ig.ALUNOS_POR_TURMA), 0) DESC
        ) AS RANK_UF

    FROM gold.vw_indicadores_completos ig
    WHERE ig.SITUACAO_FUNCIONAMENTO = 'Em Atividade'
      AND ig.ALUNOS_POR_TURMA IS NOT NULL
      AND ig.HORA_AULA_DIARIA IS NOT NULL
      AND ig.TIPO_DEPENDENCIA = 'Municipal'
      AND ig.TIPO_LOCALIZACAO = 'Urbana'
    GROUP BY ig.ANO, ig.UF, ig.NOME_SEGMENTO
)

SELECT *
FROM atendimento_uf
WHERE RANK_UF <= 10
  AND NOME_SEGMENTO IN ("EI")
ORDER BY ANO, NOME_SEGMENTO, RANK_UF;


INFO | 01:59:26.8 ___ py4j.clientserver: Received command c on object id p0


ANO,NOME_SEGMENTO,UF,MEDIA_ALUNOS_POR_TURMA,MEDIA_HORA_AULA_DIARIA,SCORE_ATENDIMENTO,RANK_UF
2022,EI,RS,15.53,6.97,0.4488,1
2022,EI,SC,16.56,7.23,0.4364,2
2022,EI,PR,17.4,6.95,0.3993,3
2022,EI,SP,18.27,7.18,0.3931,4
2022,EI,MG,16.77,6.11,0.3642,5
2022,EI,GO,19.62,6.75,0.344,6
2022,EI,RJ,18.58,5.93,0.3192,7
2022,EI,PB,19.94,6.31,0.3166,8
2022,EI,ES,16.57,5.23,0.3156,9
2022,EI,BA,18.77,5.79,0.3085,10


###### 5.4 - Distribuição de Matrículas por Segmento (2022)  
**Pergunta 4**: Quais são os estados com maior número médio de matrículas por segmento no ano de 2022?

📌 **Objetivo**:  
Analisar a concentração de matrículas nas redes estaduais, municipais, federais e privadas em **2022**, destacando os estados com maior volume médio de alunos por tipo de escola e localização.

📂 **Fonte**:  
Tabela: `gold.indicadores_por_segmento`  
Dimensões: `silver.censo_escolar__escolas`, `silver.d_Series`

📎 **Filtros aplicados**:  
- `NU_ANO_CENSO = 2022`  
- Apenas registros com valores válidos em `QT_MAT`  
- Segmento filtrado como `'EI'` (Educação Infantil)

🎯 **Indicadores calculados**:  
- Média da quantidade de matrículas por UF (`SG_UF`), tipo de dependência administrativa e localização  
- Ranking dos estados com maior média de matrículas na Educação Infantil em 2022


In [0]:
%sql
-- Consulta da média de matrículas por UF, tipo de dependência e localização no segmento EI
SELECT
    ce.SG_UF AS UF, 

    CASE ce.TP_DEPENDENCIA
        WHEN 1 THEN 'Federal'
        WHEN 2 THEN 'Estadual'
        WHEN 3 THEN 'Municipal'
        WHEN 4 THEN 'Privada'
        ELSE 'Não informado'
    END AS TIPO_DEPENDENCIA,

    CASE ce.TP_LOCALIZACAO
        WHEN 1 THEN 'Urbana'
        WHEN 2 THEN 'Rural'
        ELSE 'Não informado'
    END AS TIPO_LOCALIZACAO,

    ROUND(AVG(ig.QT_MAT), 2) AS MEDIA_MATRICULAS

FROM gold.indicadores_por_segmento ig

LEFT JOIN silver.censo_escolar__escolas ce
  ON ig.CO_ENTIDADE = ce.CO_ENTIDADE AND ig.NU_ANO_CENSO = ce.NU_ANO_CENSO

LEFT JOIN silver.d_Series se
  ON ig.COD_SEGMENTO = se.COD_SEGMENTO AND ig.COD_SERIE = se.COD_SERIE

WHERE ig.NU_ANO_CENSO = 2022
  AND ig.QT_MAT IS NOT NULL
  AND se.SEGMENTO = 'EI'
  -- AND ce.TP_LOCALIZACAOipo = 'Municipal'
  -- AND ce.TP_DEPENDENCIA = 'Urbana'

GROUP BY
    ce.SG_UF,
    TIPO_DEPENDENCIA,
    TIPO_LOCALIZACAO

ORDER BY MEDIA_MATRICULAS DESC;


INFO | 01:57:54.0 ___ py4j.clientserver: Received command c on object id p0


UF,TIPO_DEPENDENCIA,TIPO_LOCALIZACAO,MEDIA_MATRICULAS
MT,Municipal,Urbana,73.5
MS,Municipal,Urbana,66.48
AC,Municipal,Urbana,62.81
AM,Municipal,Urbana,60.74
SP,Municipal,Urbana,58.28
RR,Municipal,Urbana,57.84
RO,Municipal,Urbana,56.76
RR,Privada,Urbana,56.35
SP,Federal,Rural,54.5
AP,Municipal,Urbana,53.94


###### 5.4 - Quantidade de Turmas por Estado e Segmento  
**Pergunta 4**: Quais estados apresentam maior número médio de turmas por segmento?

📌 **Objetivo**:  
Analisar quais redes concentram maior oferta de turmas, segmentadas por estado e etapa de ensino.

📂 **Fonte**:  
View: `gold.vw_indicadores_completos`

📎 **Filtros aplicados**:  
- Apenas escolas com `SITUACAO_FUNCIONAMENTO = 'Em Atividade'`  
- Valores não nulos em `QT_TURMAS`  
- Segmentação por `UF` e `NOME_SEGMENTO`

🎯 **Indicadores calculados**:  
- Média da quantidade de turmas por estado e segmento  
- Ranking dos estados com maior oferta média de turmas


In [0]:
%sql
-- Consulta da média de turmas por UF e segmento para escolas em atividade
SELECT
    ig.ANO,
    ig.UF,
    ig.NOME_SEGMENTO,
    ROUND(AVG(ig.QTD_TURMAS), 2) AS MEDIA_TURMAS

FROM gold.vw_indicadores_completos ig

WHERE ig.SITUACAO_FUNCIONAMENTO = 'Em Atividade'
  AND ig.QTD_TURMAS IS NOT NULL

GROUP BY ig.ANO, ig.UF, ig.NOME_SEGMENTO
ORDER BY ig.ANO, MEDIA_TURMAS DESC;


INFO | 02:02:09.3 ___ py4j.clientserver: Received command c on object id p0


ANO,UF,NOME_SEGMENTO,MEDIA_TURMAS
2022,DF,EFI,11.56
2022,DF,EFIi,9.07
2022,MS,EFI,6.4
2022,SP,EFI,6.24
2022,RJ,EFI,6.07
2022,GO,EFI,5.96
2022,MG,EFI,5.96
2022,RO,EFI,5.84
2022,AL,EFI,5.79
2022,PR,EFI,5.69
