<a href="https://colab.research.google.com/github/Avyllaa/Trabalho_Big_Data/blob/main/Analise_Dados_Obtidos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Instalar PySpark e dependências

!pip install pyspark -q

In [None]:
# SparkSession é a entrada para usar Spark, aqui ele cria e configura a SparkSession
# local[*] usa todos os núcleos disponíveis do notebook.


from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Ranking_Municipios_RJ") \
    .master("local[*]") \
    .getOrCreate()

spark

In [None]:
# Ler o csv como Spark DataFrame
# 'inferSchema=True' tenta determinar tipos numéricos automaticamente e,
# df.printSchema() mostra os tipos de cada coluna.

csv_path = '/content/dados_limpos_rj.csv'

df = spark.read.csv(csv_path, header=True, inferSchema=True)
df.printSchema()
df.show(5, truncate=False)

root
 |-- Município: string (nullable = true)
 |-- PIB per capita 2021: double (nullable = true)
 |-- Cobertura Vacinal (Poliomielite): double (nullable = true)
 |-- Expectativa de Vida: double (nullable = true)
 |-- Mortalidade Infantil até 5 Anos: double (nullable = true)
 |-- Hospitalizações por Condições Sensíveis à Atenção Primária: double (nullable = true)
 |-- Acesso ao Conhecimento Básico: double (nullable = true)
 |-- Acesso à Educação Superior: double (nullable = true)
 |-- Nota Mediana do Enem: double (nullable = true)
 |-- Assassinatos de Jovens: double (nullable = true)
 |-- Assassinatos de Mulheres: double (nullable = true)
 |-- Homicídios: double (nullable = true)
 |-- Abastecimento de Água via Rede de Distribuição: double (nullable = true)
 |-- Esgotamento Sanitário Adequado: double (nullable = true)
 |-- Domicílios com Coleta de Resíduos Adequada: double (nullable = true)
 |-- Domicílios com Iluminação Elétrica Adequada: double (nullable = true)
 |-- Áreas Verdes Urban

In [None]:
# número de municípios
print("Total de municípios:", df.count())

# colunas disponíveis
print("Colunas:", df.columns)


Total de municípios: 92
Colunas: ['Município', 'PIB per capita 2021', 'Cobertura Vacinal (Poliomielite)', 'Expectativa de Vida', 'Mortalidade Infantil até 5 Anos', 'Hospitalizações por Condições Sensíveis à Atenção Primária', 'Acesso ao Conhecimento Básico', 'Acesso à Educação Superior', 'Nota Mediana do Enem', 'Assassinatos de Jovens', 'Assassinatos de Mulheres', 'Homicídios', 'Abastecimento de Água via Rede de Distribuição', 'Esgotamento Sanitário Adequado', 'Domicílios com Coleta de Resíduos Adequada', 'Domicílios com Iluminação Elétrica Adequada', 'Áreas Verdes Urbanas', 'Emissões de CO₂e por Habitante', 'Índice de Vulnerabilidade Climática dos Municípios (IVCM)', 'Acesso a Programas de Direitos Humanos', 'Paridade de Gênero na Câmara Municipal', 'Paridade de Negros na Câmara Municipal', 'Inclusão Social']


In [None]:
# Converter coluna de interesse para RDD de floats e somar (map -> reduce)
rdd_pib = df.select('Assassinatos de Mulheres').rdd.map(lambda row: row[0] if row[0] is not None else 0.0)
soma_pib = rdd_pib.reduce(lambda a, b: a + b)
print("Total de feminicídios em 2025):", soma_pib)


Total de feminicídios em 2025): 19.699999999999996


In [None]:
# Aqui começa o processo de calculo do score e do ranking, definindo pesos para cada
# indicador que fizer parte dele utilizando a Spark DataFrame API.
# No final, a somatória de todos os pesos precisa ser = 1

weights = {
    'PIB per capita 2021': 0.10,
    'Cobertura Vacinal (Poliomielite)': 0.08,
    'Expectativa de Vida': 0.12,
    'Mortalidade Infantil até 5 Anos': 0.12,   # indicador negativo
    'Hospitalizações por Condições Sensíveis à Atenção Primária': 0.06,
    'Acesso ao Conhecimento Básico': 0.08,
    'Acesso à Educação Superior': 0.06,
    'Nota Mediana do Enem': 0.06,
    'Homicídios': 0.10,  # indicador negativo
    'Abastecimento de Água via Rede de Distribuição': 0.06,
    'Esgotamento Sanitário Adequado': 0.06,
    'Domicílios com Coleta de Resíduos Adequada': 0.04,
    'Áreas Verdes Urbanas': 0.06
}

print("Soma dos pesos:", sum(weights.values()))


# Observação: alguns indicadores são positivos (quanto maior, melhor)
# e outros negativos (quanto maior, pior — ex: homicídios, mortalidade).
# Para negativos, vamos usar (1 - valor) porque os dados já estão normalizados entre 0 e 1.


Soma dos pesos: 1.0


In [None]:
# Criar expressão do score no Spark

from pyspark.sql import functions as F

# construir expressão do score
exprs = []
for col, w in weights.items():
    # determinar quais colunas são "negativas"
    if col in ['Mortalidade Infantil até 5 Anos', 'Homicídios', 'Hospitalizações por Condições Sensíveis à Atenção Primária']:

        exprs.append((1 - F.col(col)) * w)
    else:
        exprs.append(F.col(col) * w)

score_expr = sum(exprs)

df_score = df.withColumn('score', score_expr)


In [None]:
# Ordenar e ver top 10

top10 = df_score.select('Município', 'score').orderBy(F.desc('score')).limit(10)

top10.show(truncate=False)


+-------------------+------------------+
|Município          |score             |
+-------------------+------------------+
|Niterói (RJ)       |0.6844            |
|Rio de Janeiro (RJ)|0.6802000000000001|
|Nova Friburgo (RJ) |0.6544000000000001|
|Valença (RJ)       |0.6512            |
|Porto Real (RJ)    |0.6474000000000001|
|Carmo (RJ)         |0.6458            |
|Itaperuna (RJ)     |0.6347999999999999|
|Teresópolis (RJ)   |0.6346            |
|Resende (RJ)       |0.6254            |
|Volta Redonda (RJ) |0.6147999999999999|
+-------------------+------------------+



In [None]:
# Reduzir score para 3 casas decimais
# converter para pandas para visualização


df_score = df_score.withColumn('score', F.round(F.col('score'), 3))

df_pandas = df_score.toPandas()
df_pandas[['Município','score']].sort_values(by='score', ascending=False).head(10)

Unnamed: 0,Município,score
47,Niterói (RJ),0.684
67,Rio de Janeiro (RJ),0.68
48,Nova Friburgo (RJ),0.654
88,Valença (RJ),0.651
58,Porto Real (RJ),0.647
19,Carmo (RJ),0.646
85,Teresópolis (RJ),0.635
32,Itaperuna (RJ),0.635
62,Resende (RJ),0.625
91,Volta Redonda (RJ),0.615


In [None]:
# Salvar resultados como CSV local para uso no dashboard
# Localizar o CSV gerado e renomear para um arquivo único

out_path = '/content/ranking_municipios_rj'
df_score.select('Município','score').orderBy(F.desc('score')).coalesce(1).write.mode('overwrite').option('header',True).csv(out_path)

print("Resultados salvos em:", out_path)


import os, glob, shutil

csv_file = glob.glob('/content/ranking_municipios_rj/part-*.csv')[0]
shutil.copy(csv_file, '/content/ranking_municipios_rj.csv')
print("Arquivo final em /content/ranking_municipios_rj.csv")



Resultados salvos em: /content/ranking_municipios_rj
Arquivo final em /content/ranking_municipios_rj.csv
