In [0]:

# Required each time the cluster is restarted which should be only on the first notebook as they run in order 
tiers = ["bronze", "silver", "gold"]
adls_paths = {tier: f"abfss://{tier}@ibgecase.dfs.core.windows.net/" for tier in tiers}

# Accessing paths 
bronze_adls = adls_paths ["bronze"] 
silver_adls = adls_paths ["silver"] 
gold_adls = adls_paths["gold"]

dbutils.fs.ls (bronze_adls)
#dbutils.fs.ls(silver_adls) 
#dbutils.fs.ls(gold_adls)

[]

In [0]:
import requests
from pyspark.sql import SparkSession

# Criar sessão Spark
spark = SparkSession.builder.appName("IBGE_PIB").getOrCreate()

# URL oficial IBGE (PIB Municipal)
url_pib = "https://servicodados.ibge.gov.br/api/v3/agregados/5938/periodos/all/variaveis/all?localidades=N1[all]"
url_pop = "https://servicodados.ibge.gov.br/api/v3/agregados/6579/periodos/all/variaveis/all?localidades=N1[all]"


# Requisição
res = requests.get(url_pib)

if res.status_code == 200:
    data = res.json()
    rows = []
    for item in data:
        var_id = item["id"]
        variavel = item["variavel"]
        unidade = item["unidade"]

        for resultado in item["resultados"]:
            for serie in resultado["series"]:
                localidade = serie["localidade"]["nome"]
                for ano, valor in serie["serie"].items():
                    rows.append({
                        "id": var_id,
                        "variavel": variavel,
                        "unidade": unidade,
                        "localidade": localidade,
                        "ano": int(ano),
                        "valor": None if valor == "-" else float(valor)
                    })

    # Criar DataFrame Spark
    df = spark.createDataFrame(rows)

    # Visualizar
    df.show(10, truncate=False)
    df.printSchema()

    #salvando no blob storage
    df = df.coalesce(1)  #salva um unico arquivo parquet
    df.write.format("parquet").mode("overwrite").save(bronze_adls + "pib_municipal.parquet")
else:
    print(f"Erro {res.status_code}: {res.text}")


+----+---+----------+---------+-------------+----------------------------------------+
|ano |id |localidade|unidade  |valor        |variavel                                |
+----+---+----------+---------+-------------+----------------------------------------+
|2002|37 |Brasil    |Mil Reais|1.488787276E9|Produto Interno Bruto a preços correntes|
|2003|37 |Brasil    |Mil Reais|1.717950386E9|Produto Interno Bruto a preços correntes|
|2004|37 |Brasil    |Mil Reais|1.957751224E9|Produto Interno Bruto a preços correntes|
|2005|37 |Brasil    |Mil Reais|2.170584503E9|Produto Interno Bruto a preços correntes|
|2006|37 |Brasil    |Mil Reais|2.409449916E9|Produto Interno Bruto a preços correntes|
|2007|37 |Brasil    |Mil Reais|2.720262951E9|Produto Interno Bruto a preços correntes|
|2008|37 |Brasil    |Mil Reais|3.109803097E9|Produto Interno Bruto a preços correntes|
|2009|37 |Brasil    |Mil Reais|3.333039339E9|Produto Interno Bruto a preços correntes|
|2010|37 |Brasil    |Mil Reais|3.885847E9  

In [0]:
url_pop = "https://servicodados.ibge.gov.br/api/v3/agregados/6579/periodos/all/variaveis/all?localidades=N1[all]"


# Requisição
res = requests.get(url_pop)

if res.status_code == 200:
    data = res.json()
    rows = []
    for item in data:
        var_id = item["id"]
        variavel = item["variavel"]
        unidade = item["unidade"]

        for resultado in item["resultados"]:
            for serie in resultado["series"]:
                localidade = serie["localidade"]["nome"]
                for ano, valor in serie["serie"].items():
                    rows.append({
                        "id": var_id,
                        "variavel": variavel,
                        "unidade": unidade,
                        "localidade": localidade,
                        "ano": int(ano),
                        "valor": None if valor == "-" else float(valor)
                    })

    # Criar DataFrame Spark
    df = spark.createDataFrame(rows)

    # Visualizar
    df.show(10, truncate=False)
    df.printSchema()

    #salvando no blob storage
    df = df.coalesce(1)  #salva um unico arquivo parquet
    df.write.format("parquet").mode("overwrite").save(bronze_adls + "pop_municipal.parquet")
else:
    print(f"Erro {res.status_code}: {res.text}")


+----+----+----------+-------+------------+----------------------------+
|ano |id  |localidade|unidade|valor       |variavel                    |
+----+----+----------+-------+------------+----------------------------+
|2001|9324|Brasil    |Pessoas|1.72385826E8|População residente estimada|
|2002|9324|Brasil    |Pessoas|1.7463296E8 |População residente estimada|
|2003|9324|Brasil    |Pessoas|1.76871437E8|População residente estimada|
|2004|9324|Brasil    |Pessoas|1.81569056E8|População residente estimada|
|2005|9324|Brasil    |Pessoas|1.84184264E8|População residente estimada|
|2006|9324|Brasil    |Pessoas|1.86770562E8|População residente estimada|
|2008|9324|Brasil    |Pessoas|1.89605006E8|População residente estimada|
|2009|9324|Brasil    |Pessoas|1.9148063E8 |População residente estimada|
|2011|9324|Brasil    |Pessoas|1.92379287E8|População residente estimada|
|2012|9324|Brasil    |Pessoas|1.93904015E8|População residente estimada|
+----+----+----------+-------+------------+--------