<a href="https://colab.research.google.com/github/AntonioDeFA/etl_pyspark/blob/develop/Dados_IBGE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark findspark

In [None]:
import findspark
import unicodedata
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql.functions import *
from pyspark.sql import SparkSession, Row
from pyspark.sql.window import Window

In [28]:
def remove_accents(input_str):
    nfkd_form = unicodedata.normalize('NFKD', input_str)
    only_ascii = nfkd_form.encode('ASCII', 'ignore')
    return only_ascii.decode('ASCII')

In [56]:
findspark.init()


udf_remove_accents = udf(remove_accents,
                         StringType())

spark = SparkSession.builder \
    .appName("Dados IBGE") \
    .getOrCreate()

spark.udf.register("sql_remove_accents",
                   remove_accents,
                   StringType())

input_path_ibge = "/content/cities-brazil-ibge.csv"
data_ibge = spark.read.csv(input_path_ibge,
                           header=True,
                           inferSchema=True,
                           sep=",",
                           encoding="UTF-8")

input_path_covid = "/content/cases-brazil-cities-time_2022.csv"
data_covid = spark.read.csv(input_path_covid,
                            header=True,
                            inferSchema=True,
                            sep=",",
                            encoding="UTF-8")

In [57]:
data_ibge.createOrReplaceTempView("tb_ibge")
data_covid.createOrReplaceTempView("tb_covid")

spark.sql("SELECT epi_week AS epi_mes, \
                  date AS data,  \
                  state AS estado, \
                  upper(sql_remove_accents(city)) AS cidade, \
                  ibgeID,  \
                  newDeaths AS novas_mortes,  \
                  deaths AS mortes,  \
                  newCases AS novos_casos, \
                  totalCases AS total_casos, \
                  deaths_per_100k_inhabitants AS morte_por_100k_habitantes, \
                  totalCases_per_100k_inhabitants AS total_morte_por_100k_habitantes, \
                  deaths_by_totalCases AS mortes_por_total_de_casos \
                  FROM tb_covid")\
                  .createOrReplaceTempView("tb_covid_traduzido")

spark.sql("SELECT epi_mes, \
                  data, \
                  estado, \
                  upper(sql_remove_accents(cidade)) AS cidade, \
                  ibgeID, \
                  novas_mortes, \
                  mortes, \
                  novos_casos, \
                  total_casos, \
                  morte_por_100k_habitantes, \
                  total_morte_por_100k_habitantes, \
                  mortes_por_total_de_casos \
                  FROM tb_covid_traduzido")\
                  .createOrReplaceTempView("tb_covid_sem_acento")

spark.sql("SELECT epi_mes, \
                  data, \
                  estado, \
                  SUBSTR(cidade, 1, INSTR(cidade, '/') - 1) AS cidade, \
                  ibgeID, \
                  novas_mortes, \
                  mortes, \
                  novos_casos, \
                  total_casos, \
                  morte_por_100k_habitantes, \
                  total_morte_por_100k_habitantes, \
                  mortes_por_total_de_casos \
                  FROM tb_covid_sem_acento")\
                  .createOrReplaceTempView("tb_covid_sem_estado")



tb_ibge_covid_merged = spark.sql("SELECT * FROM tb_covid_sem_estado \
                                JOIN tb_ibge ON LocalCidade = cidade")

tb_ibge_covid_merged.createOrReplaceTempView("tb_data_atualizada")

In [58]:
window_spec = Window.partitionBy("cidade").orderBy(desc("data"))

df_ordenado_data_recente = tb_ibge_covid_merged.withColumn("row",
                                                           row_number().
                                                           over(window_spec))
df_filtrado_data_recentes = df_ordenado_data_recente.filter(col("row") == 1)\
                                                            .drop("row")

df_filtrado_PB = df_filtrado_data_recentes.filter(col("estado") == "PB")
pd_filtrado_PB = df_filtrado_PB.orderBy(desc('mortes')).toPandas()

In [59]:
df_filtrado_PB.show()

+-------+----------+------+--------------------+-------+------------+------+-----------+-----------+-------------------------+-------------------------------+-------------------------+--------+--------------------+-------+-----------------+------------+--------+---------+--------------------+----------------+-------------+-----+-----------------------+-----------------------+--------+
|epi_mes|      data|estado|              cidade| ibgeID|novas_mortes|mortes|novos_casos|total_casos|morte_por_100k_habitantes|total_morte_por_100k_habitantes|mortes_por_total_de_casos|IBGECode|         LocalCidade|LocalUF|      LocalEstado|RegiaoBrasil|Latitude|Longitude|           Gentilico|PopEstimada_2018|PopCenso 2010| IDHM|ReceitasRealizadas_2014|DespesasEmpenhadas_2014|Pib_2014|
+-------+----------+------+--------------------+-------+------------+------+-----------+-----------+-------------------------+-------------------------------+-------------------------+--------+--------------------+-------+--

In [None]:
plt.figure(figsize=(10, 50))
plot = sns.barplot(x="mortes", y="cidade", data=pd_filtrado_PB)
plt.title('Quantidade de morte por cidade')

for p in plot.patches:
    width = p.get_width()
    plt.text(width + 3,
             p.get_y() + p.get_height()/2,
             int(width),
             ha="left", va="center")

In [61]:
cities = [row['cidade'] for row in df_filtrado_PB]
percentages = [row['total_casos'] for row in df_filtrado_PB]

# Total de pessoas por cidade
total_by_city = df_filtrado_PB['cidade'].value_counts()

# Total de pessoas infectadas por cidade
infected_by_city = df_filtrado_PB[df_filtrado_PB['estado'] == 'total_casos']['cidade'].value_counts()

# Calcular a porcentagem de pessoas infectadas por cidade
infected_percentage = (infected_by_city / total_by_city) * 100

# Plotando o gráfico
plt.figure(figsize=(10, 6))
sns.barplot(x=infected_percentage.index, y=infected_percentage.values)
plt.xticks(rotation=90)  # rotaciona os nomes das cidades para melhor visualização
plt.ylabel('Percentage of Infected People (%)')
plt.title('Percentage of Infected People by City')
plt.tight_layout()
plt.show()

TypeError: ignored

In [49]:
# .createOrReplaceTempView("tb_data_atualizada")

# spark.sql("SELECT * FROM tb_data_atualizada WHERE city = 'JOAO PESSOA'").show()

# df = tb_ibge_covid_merged.orderBy(col('date').desc())
# registro_recente = df.limit(1)



# output_path = "caminho/para/saida.csv"
# data_transformed.write.csv(output_path, header=True)

# # Encerrar a sessão Spark
spark.stop()