In [200]:
# Para iniciar a seção spark
from pyspark.sql import SparkSession 

# Para estruturar e "tipar" os dados
from pyspark.sql.types import StructType, StructField, StringType, MapType, ArrayType
from pyspark.sql.functions import col, explode

# Para coleta dos dados
import requests

In [201]:
# url para o cojunto de dados 
url = "https://servicodados.ibge.gov.br/api/v3/agregados/1839/periodos/2022/variaveis/630?localidades=N1[all]"

### Criando a seção Spark

In [202]:
# Inicia a seção do Spark
spark = SparkSession.builder \
    .appName("DAG__pesquisa_industrial_anual_empresas") \
    .getOrCreate()

### Coletando os dados em formato json

In [203]:
# Coleta os dados RAW com tratamento de erro
try:
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f"Falha na requisição. Status code: {response.status_code}")
    data = response.json()
except Exception as e:
    print(f"Ocorreu um erro: {e}")

### Schema utilizado para estruturar os dados

In [204]:
schema = StructType([
    StructField("id", StringType(), True),
    StructField("variavel", StringType(), True),
    StructField("unidade", StringType(), True),
    StructField("resultados", ArrayType(StructType([
        StructField("classificacoes", ArrayType(StructType([
            StructField("id", StringType(), True),
            StructField("nome", StringType(), True),
            StructField("categoria", MapType(StringType(), StringType()), True)
        ])), True),
        StructField("series", ArrayType(StructType([
            StructField("localidade", StructType([
                StructField("id", StringType(), True),
                StructField("nivel", StructType([
                    StructField("id", StringType(), True),
                    StructField("nome", StringType(), True)
                ]), True),
                StructField("nome", StringType(), True)
            ]), True),
            StructField("serie", MapType(StringType(), StringType()), True)
        ])), True)
    ])), True)
])

### Processo de estruturação dos dados

In [205]:
rdd = spark.sparkContext.parallelize(data)

In [206]:
df = spark.read.json(rdd, schema)
df = spark.createDataFrame(data, schema = schema)

In [207]:
df = df.withColumn("resultados", explode(col("resultados"))) \
                .withColumn("classificacoes", explode(col("resultados.classificacoes"))) \
                .withColumn("series", explode(col("resultados.series")))

In [208]:
df_mapped = df.select(
    col("id").alias("ID"),
    col("variavel").alias("VARIÁVEL"),
    col("unidade").alias("UNIDADE"),
    col("series.localidade.id").alias("LOC_ID"),
    col("series.localidade.nome").alias("LOC_NOME"),
    col("series.localidade.nivel.id").alias("LOC_NIVEL_ID"),
    col("series.localidade.nivel.nome").alias("LOC_NIVEL_NOME"),
    col("series.serie").alias("SERIE"),
    col("classificacoes.id").alias("CLSFC_ID"),
    col("classificacoes.nome").alias("CLSFC_NOME"),
    col("classificacoes.categoria").alias("CATEGORIA"),
)

In [209]:
df_mapped = df_mapped.select(
    "*", 
    explode("SERIE").alias("SERIE_KEY", "SERIE_VALUE")
)

df_mapped = df_mapped.select(
    "*",
    explode("CATEGORIA").alias("CATEGORIA_KEY", "CATEGORIA_VALUE")
)

In [210]:
df_finalistico = df_mapped.drop("SERIE", "CATEGORIA")

In [211]:
df_finalistico.show()

+---+------------------+--------+------+--------+------------+--------------+--------+--------------------+---------+-----------+-------------+---------------+
| ID|          VARIÁVEL| UNIDADE|LOC_ID|LOC_NOME|LOC_NIVEL_ID|LOC_NIVEL_NOME|CLSFC_ID|          CLSFC_NOME|SERIE_KEY|SERIE_VALUE|CATEGORIA_KEY|CATEGORIA_VALUE|
+---+------------------+--------+------+--------+------------+--------------+--------+--------------------+---------+-----------+-------------+---------------+
|630|Número de empresas|Unidades|     1|  Brasil|          N1|        Brasil|     319|Faixas de pessoal...|     2022|     346105|       104029|          Total|
|630|Número de empresas|Unidades|     1|  Brasil|          N1|        Brasil|    9117|   Tipo de indústria|     2022|     346105|        99735|          Total|
+---+------------------+--------+------+--------+------------+--------------+--------+--------------------+---------+-----------+-------------+---------------+

