In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
spark = SparkSession.builder.appName("projeto_final") \
.enableHiveSupport() \
.getOrCreate()

In [3]:
# criando diretório pai HDFS para o projeto 
!hdfs dfs -mkdir -p /projeto-final

In [4]:
# criando diretório para receber os arquivos de dados
!hdfs dfs -mkdir /projeto-final/dados

mkdir: `/projeto-final/dados': File exists


In [5]:
# Listando diretórios pai no HDFS
!hdfs dfs -ls /

Found 5 items
drwxr-xr-x   - root supergroup          0 2023-07-31 14:17 /hbase
drwxr-xr-x   - root supergroup          0 2023-07-26 13:37 /home
drwxr-xr-x   - root supergroup          0 2023-07-19 13:05 /projeto-final
drwxrwxr-x   - root supergroup          0 2023-07-24 02:44 /tmp
drwxr-xr-x   - root supergroup          0 2023-07-16 15:20 /user


In [6]:
# Enviando arquivos para hdfs na pasta criada
!hdfs dfs -put HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv /projeto-final/dados
!hdfs dfs -put HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv /projeto-final/dados
!hdfs dfs -put HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv /projeto-final/dados
!hdfs dfs -put HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv /projeto-final/dados

put: `/projeto-final/dados/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv': File exists
put: `/projeto-final/dados/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv': File exists
put: `/projeto-final/dados/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv': File exists
put: `/projeto-final/dados/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv': File exists


In [7]:
# checando arquivos enviados na pasta alvo
!hdfs dfs -ls /projeto-final/dados

Found 4 items
-rw-r--r--   2 root supergroup   62492959 2023-07-15 00:14 /projeto-final/dados/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   2 root supergroup   76520681 2023-07-15 00:18 /projeto-final/dados/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   2 root supergroup   91120916 2023-07-15 00:18 /projeto-final/dados/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   2 root supergroup    3046774 2023-07-15 00:18 /projeto-final/dados/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


In [8]:
#Carregar cada arquivo CSV em um DataFrame
df1 = spark.read.csv("/projeto-final/dados/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv", header=True, sep=';')
df2 = spark.read.csv("/projeto-final/dados/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv", header=True, sep=';')
df3 = spark.read.csv("/projeto-final/dados/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv", header=True, sep=';')
df4 = spark.read.csv("/projeto-final/dados/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv", header=True, sep=';')

In [9]:
# checand quantidade de registros a serem carregados
print('arquivo 1 : ', df1.count())
print('arquivo 2 : ', df2.count())
print('arquivo 3 : ', df3.count())
print('arquivo 3 : ', df4.count())
print('Total:', df1.count() + df2.count() + df3.count() + df4.count())

arquivo 1 :  714481
arquivo 2 :  859708
arquivo 3 :  1017040
arquivo 3 :  33714
Total: 2624943


In [10]:
# Combinando os DataFrames em um único DataFrame spark
df_total = df1.union(df2).union(df3).union(df4)

In [11]:
# checando schema e primeiros 5 registros
df_total.printSchema()
df_total.show(5)

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: string (nullable = true)
 |-- codmun: string (nullable = true)
 |-- codRegiaoSaude: string (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: string (nullable = true)
 |-- semanaEpi: string (nullable = true)
 |-- populacaoTCU2019: string (nullable = true)
 |-- casosAcumulado: string (nullable = true)
 |-- casosNovos: string (nullable = true)
 |-- obitosAcumulado: string (nullable = true)
 |-- obitosNovos: string (nullable = true)
 |-- Recuperadosnovos: string (nullable = true)
 |-- emAcompanhamentoNovos: string (nullable = true)
 |-- interior/metropolitana: string (nullable = true)

+------+------+---------+-----+------+--------------+---------------+----------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|co

In [12]:
# checando se todos os registros foram carregados após union
df_total.count()

2624943

In [13]:
# Popular a tabelsla Hive criada  com os dados do dataframe spark #demorou 25 min p/popular
df_total.write.partitionBy("municipio").saveAsTable("covid_por_local", mode="overwrite")

In [13]:
!hdfs dfs -ls /user/hive/warehouse/

Found 2 items
drwxr-xr-x   - root supergroup          0 2023-07-31 11:36 /user/hive/warehouse/covid19_recuperados_acompanhamento
drwxr-xr-x   - root supergroup          0 2023-07-26 00:48 /user/hive/warehouse/covid_por_local


In [14]:
spark.read.table("covid_por_local").show(2)

+------+------+-----+------+--------------+---------------+----------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+---------+
|regiao|estado|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|      data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|municipio|
+------+------+-----+------+--------------+---------------+----------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+---------+
|Brasil|  null|   76|  null|          null|           null|2021-01-01|       53|       210147125|       7700578|     24605|         195411|        462|         6756284|               748883|                  null|     null|
|Brasil|  null|   76|  null|          null|           null|2021-01-02|       53|       210147125|       

In [15]:
#Visualizações

In [16]:
from pyspark.sql.functions import *
import json

In [17]:
#Visualizacao 1 - KPI de "casos recuperados" e KPI "em acompanhamento"
#Calcular os KPIs
kpi_recuperados = df_total.agg(sum("recuperadosNovos")).collect()[0][0]
kpi_em_acompanhamento = df_total.agg(sum("emAcompanhamentoNovos")).collect()[0][0]

In [18]:
#Exibir os KPIs
print("Visualização 1 - KPIs de Casos Recuperados e Em Acompanhamento:")
print("Casos Recuperados:", kpi_recuperados)
print("Em Acompanhamento:", kpi_em_acompanhamento)

Visualização 1 - KPIs de Casos Recuperados e Em Acompanhamento:
Casos Recuperados: 2920055795.0
Em Acompanhamento: 320793426.0


In [19]:
#Visualização 2 -"CASOS CONFIRMADOS" com KPIs "acumulados", "casos novos" e "incidência"
#Calcular os KPIs
kpi_acumulados = df_total.agg(sum("casosAcumulado")).collect()[0][0]
kpi_casos_novos = df_total.agg(avg("casosNovos")).collect()[0][0]
kpi_incidencia = kpi_casos_novos / kpi_acumulados

In [20]:
# Exibir os KPIs
print("Visualização 2 - CASOS CONFIRMADOS:")
print("Acumulados:", kpi_acumulados)
print("Casos Novos:", kpi_casos_novos)
print("Incidência:", kpi_incidencia)

Visualização 2 - CASOS CONFIRMADOS:
Acumulados: 9998472092.0
Casos Novos: 21.54905649379815
Incidência: 2.1552349494519297e-09


In [21]:
#Visualização 3-"ÓBITOS CONFIRMADOS" com KPIs "óbitos acumulados", "casos novos", "letalidade" e "mortalidade":
#Calcular os KPIs
kpi_obitos_acumulados = df_total.agg(sum("obitosAcumulado")).collect()[0][0]
kpi_obitos_novos = df_total.agg(avg("obitosNovos")).collect()[0][0]
kpi_letalidade = (kpi_obitos_acumulados / kpi_acumulados) * 100
kpi_mortalidade = kpi_obitos_acumulados / kpi_acumulados

In [22]:
# Exibir os KPIs
print("Visualização 3 - ÓBITOS CONFIRMADOS:")
print("Óbitos Acumulados:", kpi_obitos_acumulados)
print("Óbitos Novos:", kpi_obitos_novos)
print("Letalidade (%):", kpi_letalidade)
print("Mortalidade:", kpi_mortalidade)

Visualização 3 - ÓBITOS CONFIRMADOS:
Óbitos Acumulados: 274784085.0
Óbitos Novos: 0.6021753615221359
Letalidade (%): 2.74826075895997
Mortalidade: 0.0274826075895997


In [23]:
#Salvar a primeira visualização como tabela Hive
df_total.select(sum("recuperadosNovos").alias("totalRecuperadosNovos"), sum("emAcompanhamentoNovos").alias("totalEmAcompanhamentoNovos")).write.saveAsTable("covid19_recuperados_acompanhamento", mode="overwrite")


In [24]:
#Checando
spark.read.table("covid19_recuperados_acompanhamento").show()

+---------------------+--------------------------+
|totalRecuperadosNovos|totalEmAcompanhamentoNovos|
+---------------------+--------------------------+
|        2.920055795E9|              3.20793426E8|
+---------------------+--------------------------+



In [25]:
#Salvar a segunda visualização como formato Parquet com compressão Snappy
# Criar DataFrame com os KPIs da segunda visualização
kpi_visualizacao2 = spark.createDataFrame([(kpi_acumulados, kpi_casos_novos, kpi_incidencia)], ["casos_acumulados", "casos_novos", "incidencia"])


In [26]:
# Salvar como formato Parquet com compressão Snappy
kpi_visualizacao2.write.parquet("user/kpi_visualizacao2.parquet", compression="snappy", mode="overwrite")

In [27]:
df_viz2 = spark.read.parquet("user/kpi_visualizacao2.parquet")
df_viz2.show()

+----------------+-----------------+--------------------+
|casos_acumulados|      casos_novos|          incidencia|
+----------------+-----------------+--------------------+
|   9.998472092E9|21.54905649379815|2.155234949451929...|
+----------------+-----------------+--------------------+



In [28]:
#Salvar visualicao 3 no topipo no kafka
#Criação do tópico kafka
# Feito no terminal : kafka-topics --bootstrap-server localhost:9092 --topic obitoscvd19 --create --partitions 3 --replication-factor 1

In [33]:
# 7 - Criar a visualização pelo Spark com os dados enviados para o HDFS:
#Síntese de casos, óbitos, incidência de mortalidade

In [36]:
# Casos | Óbitos | Indicencia 100k/hab | Mortalidade | Atualização
# Incidência por 100K habitantes Método de cálculo: (casos confirmados * 100.000) / população
# Mortalidade -  Método de cálculo: (óbitos * 100.000) / população  -> por 100K habitantes

viz_spark = df_total.groupBy('regiao', 'municipio') \
    .agg(
        sum('casosNovos').alias('casos'),
        sum('obitosNovos').alias('obitos'),
        last('data').alias('atualizacao'),
        last('populacaoTCU2019').alias('agg_pop'),
    ) \
    .withColumn('incidencia', round((col('casos') * 100_000) / col('agg_pop'), 1)) \
    .withColumn('mortalidade', round((col('obitos') * 100_000) / col('agg_pop'), 1)) \
    .select('regiao', 'municipio', 'casos', 'obitos', 'incidencia', 'mortalidade', 'atualizacao') \
    .orderBy(col('municipio').asc_nulls_first(), 'regiao')

viz_spark.show()
     

+------------+-------------------+-----------+--------+----------+-----------+-----------+
|      regiao|          municipio|      casos|  obitos|incidencia|mortalidade|atualizacao|
+------------+-------------------+-----------+--------+----------+-----------+-----------+
|      Brasil|               null|1.8855015E7|526892.0|    8972.3|      250.7| 2021-07-06|
|Centro-Oeste|               null|  1916619.0| 49207.0|      null|       null| 2021-07-06|
|    Nordeste|               null|  4477181.0|108088.0|      null|       null| 2021-07-06|
|       Norte|               null|  1735433.0| 43860.0|      null|       null| 2021-07-06|
|     Sudeste|               null|  7148156.0|245851.0|      null|       null| 2021-07-06|
|         Sul|               null|  3617963.0| 80879.0|      null|       null| 2021-07-06|
|Centro-Oeste|    Abadia de Goiás|     1508.0|    34.0|   17189.1|      387.6| 2021-07-06|
|     Sudeste|Abadia dos Dourados|      444.0|    14.0|    6352.8|      200.3| 2021-07-06|

In [None]:
# Salvar a visualização que foi salva em tópico kafka em um tópico no Elastic 