In [None]:
import requests
import json
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

"""
Função principal para orquestrar o pipeline de dados.
"""
# --- 1. Inicialização do Spark ---
# Cria uma sessão Spark, que é o ponto de entrada para qualquer funcionalidade do Spark.
# O appName ajuda a identificar sua aplicação na UI do Spark.
spark = SparkSession.builder \
    .appName("ETL_SuicidioXDepressao") \
    .getOrCreate()

print("Sessão Spark iniciada com sucesso.")

# --- 2. Extração (Extract) ---
# Coletamos os dados da API usando a biblioteca 'requests' do Python.
# Esta etapa é executada no Driver do Spark.
both = "SEX_BTSX"
female = "SEX_FMLE"
male = "SEX_MLE"

URL_SUICIDIO_B = f"https://ghoapi.azureedge.net/api/SDGSUICIDE?$filter=TimeDim eq 2021 and SpatialDimType eq 'COUNTRY' and Dim1 eq '{both}' and Dim2 eq 'AGEGROUP_YEARSALL'"
URL_SUICIDIO_F = f"https://ghoapi.azureedge.net/api/SDGSUICIDE?$filter=TimeDim eq 2021 and SpatialDimType eq 'COUNTRY' and Dim1 eq '{female}' and Dim2 eq 'AGEGROUP_YEARSALL'"
URL_SUICIDIO_M = f"https://ghoapi.azureedge.net/api/SDGSUICIDE?$filter=TimeDim eq 2021 and SpatialDimType eq 'COUNTRY' and Dim1 eq '{male}' and Dim2 eq 'AGEGROUP_YEARSALL'"

URL_DEPRESSAO_B = f"https://ghoapi.azureedge.net/api/MH_12?$filter=TimeDim eq 2021 and SpatialDimType eq 'COUNTRY' and Dim1 eq '{both}'"
URL_DEPRESSAO_F = f"https://ghoapi.azureedge.net/api/MH_12?$filter=TimeDim eq 2021 and SpatialDimType eq 'COUNTRY' and Dim1 eq '{female}'"
URL_DEPRESSAO_M = f"https://ghoapi.azureedge.net/api/MH_12?$filter=TimeDim eq 2021 and SpatialDimType eq 'COUNTRY' and Dim1 eq '{male}'"

# É uma boa prática definir um cabeçalho (header) com uma chave de API.
# Para este exemplo, a API funciona sem chave, mas em produção, você deve usar uma.
headers = {
    "Content-Type": "application/json"
}

def extrair_api(url):
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()  # Lança um erro para respostas com status 4xx/5xx
        dados = response.json().get("value",[])
        print(f"Dados coletados com sucesso! Total de registros retornados: {len(dados)}")
        return dados
        
    except Exception as e:
        print(f"Erro ao chamar a API: {e}")
        return []

print("Extraindo dados de cada api:")
dados_suicidio_b = extrair_api(URL_SUICIDIO_B)
dados_suicidio_f = extrair_api(URL_SUICIDIO_F)
dados_suicidio_m = extrair_api(URL_SUICIDIO_M)

dados_depressao_b = extrair_api(URL_DEPRESSAO_B)
dados_depressao_f = extrair_api(URL_DEPRESSAO_F)
dados_depressao_m = extrair_api(URL_DEPRESSAO_M)

# Se a resposta estiver vazia, não há o que processar.
if not dados_suicidio_b and dados_suicidio_f and dados_suicidio_m and dados_depressao_b and dados_depressao_f and dados_depressao_m:
    print("Nenhum dado retornado pela API. Encerrando o processo...")
    spark.stop()

# --- 3. Transformação (Transform) ---
# Agora, vamos converter os dados JSON em um DataFrame Spark e tratá-los.

# O Spark pode inferir o schema, mas definir explicitamente é mais seguro e performático.
schema = StructType([
    StructField("Id", IntegerType(), True),
    StructField("IndicatorCode", StringType(), True),

    StructField("SpatialDimType", StringType(), True),
    StructField("SpatialDim", StringType(), True),

    StructField("ParentLocationCode", StringType(), True),
    StructField("TimeDimType", StringType(), True),
    StructField("ParentLocation", StringType(), True),

    StructField("Dim1Type", StringType(), True),
    StructField("TimeDim", IntegerType(), True),
    StructField("Dim1", StringType(), True),

    StructField("Dim2Type", StringType(), True),
    StructField("Dim2", StringType(), True),

    StructField("Dim3Type", StringType(), True),
    StructField("Dim3", StringType(), True),

    StructField("DataSourceDimType", StringType(), True),
    StructField("DataSourceDim", StringType(), True),

    StructField("Value", StringType(), True),
    StructField("NumericValue", DoubleType(), True),

    StructField("Low", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Comments", StringType(), True),

    StructField("Date", StringType(), True),
    StructField("TimeDimensionValue", StringType(), True),
    StructField("TimeDimensionBegin", StringType(), True),
    StructField("TimeDimensionEnd", StringType(), True)
])

# Criamos um RDD (Resilient Distributed Dataset) a partir do JSON e depois um DataFrame.
# Isso permite que o Spark distribua o processamento dos dados.
def criar_df_tratado(dados):
    json_strings = [json.dumps(item) for item in dados]
    rdd = spark.sparkContext.parallelize(json_strings)
    df = spark.read.json(rdd, schema=schema)
    
    # Aplicando transformações para limpar e organizar os dados
    df_tratado = df.select(
        col("SpatialDim").alias("Pais"),
        col("TimeDim").alias("Ano"),
        col("Dim1").alias("Sexo"),
        col("NumericValue").alias("Valor")
    )
    
    return df_tratado

df_suicidio_b = criar_df_tratado(dados_suicidio_b)
df_suicidio_f = criar_df_tratado(dados_suicidio_f)
df_suicidio_m = criar_df_tratado(dados_suicidio_m)

df_depressao_b = criar_df_tratado(dados_depressao_b)
df_depressao_f = criar_df_tratado(dados_depressao_f)
df_depressao_m = criar_df_tratado(dados_depressao_m)

print("Mostrando Tabelas de Suicidio de cada Sexo:")
#Ambos
df_suicidio_b.printSchema()
df_suicidio_b.show(3, False)

#Mulher
df_suicidio_f.printSchema()
df_suicidio_f.show(3, False)

#Homem
df_suicidio_m.printSchema()
df_suicidio_m.show(3, False)

print("Mostrando Tabelas de Depressao de cada Sexo:")
#Ambos
df_depressao_b.printSchema()
df_depressao_b.show(3, False)

#Mulher
df_depressao_f.printSchema()
df_depressao_f.show(3, False)

#Homem
df_depressao_m.printSchema()
df_depressao_m.show(3, False)

# --- 4. Carga (Load) ---
# Salvamos o DataFrame tratado no HDFS em formato Parquet.
# Parquet é um formato colunar otimizado para performance em análises com Spark.

# O caminho no HDFS onde os dados serão salvos.
HDFS_SILVER_SB = "hdfs://namenode:9000/datalake/silver/suicidio_b"
HDFS_SILVER_SF = "hdfs://namenode:9000/datalake/silver/suicidio_f"
HDFS_SILVER_SM = "hdfs://namenode:9000/datalake/silver/suicidio_m"

HDFS_SILVER_DB = "hdfs://namenode:9000/datalake/silver/depressao_b"
HDFS_SILVER_DF = "hdfs://namenode:9000/datalake/silver/depressao_f"
HDFS_SILVER_DM = "hdfs://namenode:9000/datalake/silver/depressao_m"

print(f"Salvando dados tratados no HDFS...")

# 'overwrite' substitui os dados se o diretório já existir.
# 'partitionBy' é útil para organizar os dados, mas como já filtramos por ano_mes,
# incluímos ele no caminho para criar uma partição manual.
df_suicidio_b.write.mode("overwrite").parquet(HDFS_SILVER_SB)
df_suicidio_f.write.mode("overwrite").parquet(HDFS_SILVER_SF)
df_suicidio_m.write.mode("overwrite").parquet(HDFS_SILVER_SM)

df_depressao_b.write.mode("overwrite").parquet(HDFS_SILVER_DB)
df_depressao_f.write.mode("overwrite").parquet(HDFS_SILVER_DF)
df_depressao_m.write.mode("overwrite").parquet(HDFS_SILVER_DM)

print("Dados salvos com sucesso no HDFS!")

def renomear_coluna(df, coluna, nome_novo):
    return df.withColumnRenamed(coluna, nome_novo)

print("Renomeando coluna Valor de acordo com a tabela:")

df_suicidio_b = renomear_coluna(df_suicidio_b, "Valor", "Suicidio_Ambos")
df_suicidio_f = renomear_coluna(df_suicidio_f, "Valor", "Suicidio_Mulher")
df_suicidio_m = renomear_coluna(df_suicidio_m, "Valor", "Suicidio_Homem")

df_depressao_b = renomear_coluna(df_depressao_b, "Valor", "Depressao_Ambos")
df_depressao_f = renomear_coluna(df_depressao_f, "Valor", "Depressao_Mulher")
df_depressao_m = renomear_coluna(df_depressao_m, "Valor", "Depressao_Homem")

print("Colunas renomeadas!")

print("DataFrames Silver carregados:")

print("Mostrando Tabelas de Suicidio de cada Sexo:")
df_suicidio_b.show(3)
df_suicidio_f.show(3)
df_suicidio_m.show(3)

print("Mostrando Tabelas de Depressao de cada Sexo:")
df_depressao_b.show(3)
df_depressao_f.show(3)
df_depressao_m.show(3)

# ======================
# 5. JOIN (Camada GOLD)
# ======================
df_gold = (df_suicidio_b
           .drop("Sexo")
           .join(df_depressao_b.drop("Sexo"), ["Pais", "Ano"], "outer")
           .join(df_suicidio_f.drop("Sexo"), ["Pais", "Ano"], "outer")
           .join(df_depressao_f.drop("Sexo"), ["Pais", "Ano"], "outer")
           .join(df_suicidio_m.drop("Sexo"), ["Pais", "Ano"], "outer")
           .join(df_depressao_m.drop("Sexo"), ["Pais", "Ano"], "outer")
    )

print("Resultado do join GOLD:")
df_gold.show(10, truncate=False)

# ======================
# 6. Caminho GOLD
# ======================

CAMINHO_GOLD = "hdfs://namenode:9000/datalake/gold/suicidio_depressao"

# ======================
# 7. Salvando em PARQUET
# ======================

df_gold.write.mode("overwrite").parquet(CAMINHO_GOLD)

print("Arquivo GOLD salvo em Parquet com sucesso!")

# ======================
# 8. (Opcional) Salvar também em CSV – ideal para Power BI
# ======================
url = "jdbc:postgresql://postgres:5432/superset"
tabela = "fato_suicidio_depressao"
props = {
    "user": "superset",
    "password": "superset",
    "driver": "org.postgresql.Driver"
}

df_gold.write \
    .mode("overwrite") \
    .jdbc(url=url, table=tabela, properties=props)

print("Tabela salva com sucesso no PostgreSQL!")

# --- 5. Finalização ---
spark.stop()
print("Sessão Spark finalizada.")

Sessão Spark iniciada com sucesso.
Dados coletados com sucesso! Total de registros retornados: 185
Dados coletados com sucesso! Total de registros retornados: 185
root
 |-- Pais: string (nullable = true)
 |-- Ano: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Valor: double (nullable = true)
 |-- dataset_origem: string (nullable = false)

+----+----+--------+-----------+--------------+
|Pais|Ano |Sexo    |Valor      |dataset_origem|
+----+----+--------+-----------+--------------+
|GHA |2021|SEX_BTSX|5.316740355|suicidio      |
|JPN |2021|SEX_BTSX|17.43068584|suicidio      |
|TGO |2021|SEX_BTSX|9.340400499|suicidio      |
|LSO |2021|SEX_BTSX|28.66132363|suicidio      |
|AZE |2021|SEX_BTSX|1.558857505|suicidio      |
+----+----+--------+-----------+--------------+
only showing top 5 rows

root
 |-- Pais: string (nullable = true)
 |-- Ano: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Valor: double (nullable = true)
 |-- dataset_origem: string (nul