In [None]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [None]:
url = "https://servicodados.ibge.gov.br/api/v3/agregados/1839/periodos/2022/variaveis/630?localidades=N1[all]"

In [None]:
def get_data_from_url(url):
    response = requests.get(url=url)
    response.raise_for_status()
    return response.json()

def create_spark_session():
    return SparkSession.builder.getOrCreate()

def create_dataframe(spark, data):
    schema = T.StructType([
        T.StructField("id", T.StringType(), True),
        T.StructField("variavel", T.StringType(), True),
        T.StructField("unidade", T.StringType(), True),
        T.StructField("resultados", T.ArrayType(T.StructType([
            T.StructField("classificacoes", T.ArrayType(T.StructType([
                T.StructField("id", T.StringType(), True),
                T.StructField("nome", T.StringType(), True),
                T.StructField("categoria", T.MapType(T.StringType(), T.StringType()), True)
            ])), True),
            T.StructField("series", T.ArrayType(T.StructType([
                T.StructField("localidade", T.StructType([
                    T.StructField("id", T.StringType(), True),
                    T.StructField("nivel", T.StructType([
                        T.StructField("id", T.StringType(), True),
                        T.StructField("nome", T.StringType(), True)
                    ]), True),
                    T.StructField("nome", T.StringType(), True)
                ]), True),
                T.StructField("serie", T.MapType(T.StringType(), T.StringType()), True)
            ])), True)
        ])), True)
    ])
    return spark.createDataFrame(data, schema)

def process_dataframe(df):
    df_exploded_resultados = df.withColumn("resultado", F.explode("resultados"))
    df_exploded_classificacoes = df_exploded_resultados.withColumn("classificacao", F.explode("resultado.classificacoes"))
    df_exploded_series = df_exploded_classificacoes.withColumn("serie", F.explode("resultado.series"))
    df_out = df_exploded_series.select(
        F.col("id").alias("id"),
        F.col("variavel").alias("variavel"),
        F.col("unidade").alias("unidade"),
        F.col("classificacao.id").alias("classificacao_id"),
        F.col("classificacao.nome").alias("classificacao_nome"),
        F.col("classificacao.categoria").alias("classificacao_categoria"),
        F.col("serie.localidade.id").alias("localidade_id"),
        F.col("serie.localidade.nivel.id").alias("localidade_nivel_id"),
        F.col("serie.localidade.nivel.nome").alias("localidade_nivel_nome"),
        F.col("serie.localidade.nome").alias("localidade_nome"),
        F.col("serie.serie").alias("serie")
    )
    return df_out

In [None]:
#main

data = get_data_from_url(url)
spark = create_spark_session()
df = create_dataframe(spark, data)
df_out = process_dataframe(df)
df_out.show(truncate=False)