In [1]:
from pyspark.sql import SparkSession
import logging

# Configuração de logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def create_ibge_spark_session():
    """
    Cria uma sessão Spark otimizada para processar XML da API do IBGE
    """
    try:
        # Verifica se já existe uma sessão Spark
        existing_spark = SparkSession.getActiveSession()
        if existing_spark is not None:
            logger.info("Utilizando sessão Spark existente")
            return existing_spark
        
        logger.info("Criando nova sessão Spark para processamento IBGE")
        
        spark = (
            SparkSession.builder
            .appName("ibge-api-processor")
            .master("spark://spark-master:7077")
            .config("spark.driver.bindAddress", "0.0.0.0")
            .config("spark.driver.host", "datalab-jupyter-notebook")
            # Versão compatível do spark-xml
            .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.17.0")
            # Otimizações para XML com muitos elementos nulos
            .config("spark.sql.xml.jackson.options.nullValue", "")
            .config("spark.sql.xml.jackson.options.emptyStringValue", "")
            # Otimizações de performance
            .config("spark.sql.adaptive.enabled", "true")
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
            .config("spark.sql.inMemoryColumnarStorage.compressed", "true")
            .config("spark.sql.parquet.compression.codec", "snappy")
            .getOrCreate()
        )
        
        logger.info(f"Sessão Spark criada com sucesso - Versão: {spark.version}")
        return spark
        
    except Exception as e:
        logger.error(f"Falha ao criar sessão Spark: {str(e)}")
        raise

# Inicializa a sessão Spark
try:
    spark
except NameError:
    spark = create_ibge_spark_session()

INFO:__main__:Criando nova sessão Spark para processamento IBGE


:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.databricks#spark-xml_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-006d8f28-3965-440c-beca-4fc26b20fbbf;1.0
	confs: [default]
	found com.databricks#spark-xml_2.12;0.17.0 in central
	found commons-io#commons-io;2.11.0 in central
	found org.glassfish.jaxb#txw2;3.0.2 in central
	found org.apache.ws.xmlschema#xmlschema-core;2.3.0 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.9.0 in central
:: resolution report :: resolve 220ms :: artifacts dl 9ms
	:: modules in use:
	com.databricks#spark-xml_2.12;0.17.0 from central in [default]
	commons-io#commons-io;2.11.0 from central in [default]
	org.apache.ws.xmlschema#xmlschema-core;2.3.0 from central in [default]
	org.glassfish.jaxb#txw2;3.0.2 from central in [default]
	org.scala-lang.modules#scala-collection-compat_2.12;2.9.0 from central in [default]
	-----------------------

In [2]:
def read_ibge_json_data(
    spark,
    api_url: str,
    rename: bool = True,
    cast_types: bool = True,
    timeout: int = 30,
):
    """
    Lê dados da API IBGE (formato JSON do SIDRA) e devolve um DataFrame Spark.
    Assume que o 1º item do array são os 'cabeçalhos' e os demais são linhas.
    """
    import requests
    from pyspark.sql import functions as F

    # 1) Baixa JSON
    resp = requests.get(api_url, timeout=timeout)
    resp.raise_for_status()
    data = resp.json()

    if not (isinstance(data, list) and len(data) > 1 and isinstance(data[0], dict)):
        raise ValueError("Formato inesperado: esperado [headers, row1, row2, ...].")

    headers = data[0]           # ex.: {"D1C": "Unidade da Federação (Código)", ...}
    records = data[1:]          # linhas de dados (dicts com D1C, D1N, ..., V)

    # 2) Cria DF (usa as chaves presentes nas linhas)
    #    Às vezes algumas colunas não vêm em todas as linhas -> usa união de chaves
    all_cols = sorted({k for r in records for k in r.keys()})
    df = spark.createDataFrame(records).select(*[c for c in all_cols])

    # 3) Opcional: renomeia para nomes mais amigáveis
    if rename:
        rename_map = {
            "D1C": "uf_cod",          "D1N": "uf",
            "D2C": "var_cod",         "D2N": "variavel",
            "D3C": "ano",             "D3N": "ano_label",
            "D4C": "sexo_cod",        "D4N": "sexo",
            "D5C": "idade_cod",       "D5N": "idade",
            "D6C": "forma_idade_cod", "D6N": "forma_idade",
            "MC":  "medida_cod",      "MN":  "unidade_medida",
            "NC":  "nivel_terr_cod",  "NN":  "nivel_terr",
            "V":   "valor",
            # D7*/D8*/D9* geralmente vêm nulos; ignore se não existirem
        }
        for old, new in rename_map.items():
            if old in df.columns:
                df = df.withColumnRenamed(old, new)

    # 4) Opcional: cast de tipos (códigos -> int; valor -> numérico)
    if cast_types:
        def cast_if(colname, t):
            from pyspark.sql import functions as F
            if colname in df.columns:
                return df.withColumn(colname, F.col(colname).cast(t))
            return df

        int_cols = ["D1C","D2C","D3C","D4C","D5C","D6C","MC","NC"]
        # se renomeou, ajuste a lista:
        int_cols = [c if c in df.columns else {
            "D1C":"uf_cod","D2C":"var_cod","D3C":"ano","D4C":"sexo_cod",
            "D5C":"idade_cod","D6C":"forma_idade_cod","MC":"medida_cod","NC":"nivel_terr_cod"
        }.get(c, c) for c in int_cols]

        for c in int_cols:
            if c in df.columns:
                df = df.withColumn(c, F.col(c).cast("int"))

        # Valor pode vir como string; tenta long e, se falhar, double
        val_col = "valor" if "valor" in df.columns else "V"
        if val_col in df.columns:
            # troca vírgula decimal por ponto, remove separadores se aparecerem
            df = df.withColumn(val_col, F.regexp_replace(F.col(val_col), r"[.]", ""))
            df = df.withColumn(val_col, F.regexp_replace(F.col(val_col), r",", "."))
            df = df.withColumn(val_col, F.col(val_col).cast("double"))

    return df


In [10]:
# URL exemplo da API IBGE (substitua pela URL real)
ibge_api_url = "https://apisidra.ibge.gov.br/values/t/9514/n3/all/v/allxp/p/all/c2/allxt/c287/100362/c286/113635"

df = read_ibge_json_data(spark, ibge_api_url)
df.show(5, truncate=False)

                                                                                

+------+--------+-------+-------------------+----+---------+--------+--------+---------+-----+---------------+-----------+----------+--------------+--------------+--------------------+---------+
|uf_cod|uf      |var_cod|variavel           |ano |ano_label|sexo_cod|sexo    |idade_cod|idade|forma_idade_cod|forma_idade|medida_cod|unidade_medida|nivel_terr_cod|nivel_terr          |valor    |
+------+--------+-------+-------------------+----+---------+--------+--------+---------+-----+---------------+-----------+----------+--------------+--------------+--------------------+---------+
|11    |Rondônia|93     |População residente|2022|2022     |4       |Homens  |100362   |Total|113635         |Total      |45        |Pessoas       |3             |Unidade da Federação|787987.0 |
|11    |Rondônia|93     |População residente|2022|2022     |5       |Mulheres|100362   |Total|113635         |Total      |45        |Pessoas       |3             |Unidade da Federação|793209.0 |
|12    |Acre    |93     |

In [7]:
import os
hc = spark._jsc.hadoopConfiguration()


endpoint   = os.getenv("MINIO_ENDPOINT", "http://minio:9000")
access_key = os.getenv("MINIO_ROOT_USER")
secret_key = os.getenv("MINIO_ROOT_PASSWORD", "minioadmin")

hc.set("fs.s3a.endpoint", os.getenv("MINIO_ENDPOINT", "http://minio:9000"))
hc.set("fs.s3a.path.style.access", "true")
hc.set("fs.s3a.connection.ssl.enabled", "false")
hc.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hc.set("fs.s3a.access.key", os.getenv("MINIO_ROOT_USER", "minioadmin"))
hc.set("fs.s3a.secret.key", os.getenv("MINIO_ROOT_PASSWORD", "minioadmin"))


In [11]:
out_path = "s3a://lake/bronze/ibge/populacao/ano=2022/"

(df
 .coalesce(1)  # opcional: menos arquivos p/ teste; remova em produção
 .write
 .mode("overwrite")
 .parquet(out_path)
)

print("Escrito em:", out_path)

25/08/28 15:43:58 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

Escrito em: s3a://lake/bronze/ibge/populacao/ano=2022/


In [12]:
spark.stop()