In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import requests
import os
from datetime import datetime

In [None]:
url = "https://servicodados.ibge.gov.br/api/v3/agregados/1839/periodos/2022/variaveis/630?localidades=N1[all]"

In [None]:
# Extração
response = requests.get(url)
response.raise_for_status()
data = response.json()


# Extração
response = requests.get(url)
response.raise_for_status()  # Levanta um erro se a requisição falhar
data = response.json()

# Cria uma sessão Spark
spark = SparkSession.builder.appName("PesquisaIndustrialAnual").getOrCreate()

# Define o schema
schema = StructType([
    StructField("variavel_id", IntegerType(), False),
    StructField("variavel", StringType(), False),
    StructField("categoria_id", IntegerType(), False),
    StructField("categoria_nome", StringType(), False),
    StructField("unidade", StringType(), False),
    StructField("valor", IntegerType(), False),
    StructField("classificacao_id", IntegerType(), False),
    StructField("classificacao_nome", StringType(), False),
    StructField("localidade_id", IntegerType(), False),
    StructField("localidade", StringType(), False),
    StructField("nivel_geografico", StringType(), False),
    StructField("data", StringType(), False),
])

# Função para extrair os dados
def extrair_dados(data):
    linhas = []
    
    for item in data:
        variavel_id = int(item["id"])
        variavel = item["variavel"]
        unidade = item["unidade"]
        
        resultado = item["resultados"][0]
        valor = int(resultado["series"][0]["serie"]["2022"])
        
        localidade_id = int(resultado["series"][0]["localidade"]["id"])
        localidade = resultado["series"][0]["localidade"]["nome"]
        nivel_geografico = resultado["series"][0]["localidade"]["nivel"]["id"]
        data_ano = "2022"
        
        classificacoes = resultado["classificacoes"]
        
        for classificacao in classificacoes:
            classificacao_id = int(classificacao["id"])
            classificacao_nome = classificacao["nome"]
            
            for categoria_id, categoria_nome in classificacao["categoria"].items():
                linhas.append((
                    variavel_id,
                    variavel,
                    int(categoria_id),
                    categoria_nome,
                    unidade,
                    valor,
                    classificacao_id,
                    classificacao_nome,
                    localidade_id,
                    localidade,
                    nivel_geografico,
                    data_ano
                ))
    
    # Cria DataFrame do Spark
    df = spark.createDataFrame(linhas, schema=schema)
    return df

# Instancia o DataFrame
df_spark = extrair_dados(data)

# Mostrando os resultados
df_spark.show()

# Cria o diretório de saída
output_dir = "/opt/airflow/dags/resultado"
os.makedirs(output_dir, exist_ok=True)

# Obtendo a data atual
current_date = datetime.now().strftime("%Y-%m-%d")

# Salvando o arquivo CSV com a data atual
output_path = os.path.join(output_dir, f"pesquisa_industrial_anual_empresas_{current_date}.csv")

# Configurando o Spark para salvar o DataFrame como CSV
df_spark.coalesce(1).write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(output_dir)