In [1]:
# importe os pacotes aqui
from cryptography.fernet import Fernet
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col
from pyspark.sql.types import StructType, StructField, StringType, MapType, ArrayType
import requests

In [2]:
# url para o cojunto de dados 
url = "https://servicodados.ibge.gov.br/api/v3/agregados/1839/periodos/2022/variaveis/630?localidades=N1[all]"

fernet_key = Fernet.generate_key().decode()
secret_key = os.urandom(24).hex()
print(f"FERNET_KEY: {fernet_key}")
print(f"SECRET_KEY: {secret_key}")

# Iniciando uma sessão Spark
spark = SparkSession.builder \
    .appName("Pesquisa Industrial Anual - Empresas") \
    .getOrCreate()

FERNET_KEY: UAjVjHLFXNzt0dFLCtMS6tFob5pwyqyw1C5nmSVmcus=
SECRET_KEY: c0b1cff081e5882c9f12130c8f2972769f09001baa99160a


/home/airflow/.local/lib/python3.11/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# escreva o código de extração e transformação a partir daqui
response = requests.get(url)
data = response.json()

# Definindo o mapeamento do schema
schema = StructType([
    StructField("id", StringType(), True),
    StructField("variavel", StringType(), True),
    StructField("unidade", StringType(), True),
    StructField("resultados", ArrayType(
        StructType([
            StructField("classificacoes", ArrayType(
                StructType([
                    StructField("id", StringType(), True),
                    StructField("nome", StringType(), True),
                    StructField("categoria", MapType(StringType(), StringType()), True)
                ])
            ), True),
            StructField("series", ArrayType(
                StructType([
                    StructField("localidade", StructType([
                        StructField("id", StringType(), True),
                        StructField("nivel", StructType([
                            StructField("id", StringType(), True),
                            StructField("nome", StringType(), True)
                        ]), True),
                        StructField("nome", StringType(), True)
                    ]), True),
                    StructField("serie", MapType(StringType(), StringType()), True)
                ])
            ), True)
        ])
    ), True)
])

# Convertendo os dados para um RDD e criando o DataFrame Spark
rdd = spark.sparkContext.parallelize(data)
df = spark.read.json(rdd, schema=schema)

# Alterando as estruturas aninhadas para facilitar a análise
df_exploded = df.withColumn("resultados", explode(col("resultados"))) \
                .withColumn("classificacoes", explode(col("resultados.classificacoes"))) \
                .withColumn("series", explode(col("resultados.series"))) \
                .select(
                    "id", "variavel", "unidade",
                    col("classificacoes.id").alias("classificacao_id"),
                    col("classificacoes.nome").alias("classificacao_nome"),
                    col("classificacoes.categoria").alias("classificacao_categoria"),
                    col("series.localidade.id").alias("localidade_id"),
                    col("series.localidade.nivel.id").alias("nivel_id"),
                    col("series.localidade.nivel.nome").alias("nivel_nome"),
                    col("series.localidade.nome").alias("localidade_nome"),
                    col("series.serie").alias("serie")
                )


df_exploded.show(truncate=False)

# Finalizando a sessão do Spark
spark.stop()

[Stage 0:>                                                          (0 + 1) / 1]

                                                                                

+---+------------------+--------+----------------+-------------------------+-----------------------+-------------+--------+----------+---------------+----------------+
|id |variavel          |unidade |classificacao_id|classificacao_nome       |classificacao_categoria|localidade_id|nivel_id|nivel_nome|localidade_nome|serie           |
+---+------------------+--------+----------------+-------------------------+-----------------------+-------------+--------+----------+---------------+----------------+
|630|Número de empresas|Unidades|319             |Faixas de pessoal ocupado|{104029 -> Total}      |1            |N1      |Brasil    |Brasil         |{2022 -> 346105}|
|630|Número de empresas|Unidades|9117            |Tipo de indústria        |{99735 -> Total}       |1            |N1      |Brasil    |Brasil         |{2022 -> 346105}|
+---+------------------+--------+----------------+-------------------------+-----------------------+-------------+--------+----------+---------------+----------