## SPARK_COVID - EDUARDO CRUZ

### Projeto para integrar dados públicos da COVID e gerar análises.

In [3]:
# Importar dependencias:
from pyspark.sql.functions import *
from pyspark.sql.types import *


# Setando configurações para permitir particionamento dinamico.
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.hadoop.hive.exec.max.dynamic.partitions", "30000")



In [4]:
# Criando schema
columnsList = [ StructField("regiao", StringType()), 
               StructField("estado", StringType()), 
               StructField("municipio", StringType()), 
               StructField("coduf", StringType()), 
               StructField("codmun", StringType()), 
               StructField("codRegiaoSaude", StringType()), 
               StructField("nomeRegiaoSaude", StringType()), 
               StructField("data", DateType()), 
               StructField("semanaEpi", StringType()), 
               StructField("populacaoTCU2019", LongType()), 
               StructField("casosAcumulado", LongType()), 
               StructField("casosNovos", LongType()), 
               StructField("obitosAcumulado", LongType()), 
               StructField("obitosNovos", LongType()), 
               StructField("Recuperadosnovos", LongType()), 
               StructField("emAcompanhamentoNovos", LongType()), 
               StructField("interiormetropolitana", StringType())]

covidSchema = StructType(columnsList)

# Fazendo a leitura do CSV e vinculando o schema criado.
covidDF = spark.read.option("header","true").option("delimiter", ";").schema(covidSchema).csv("/user/eduardo/spark_covid/tz/covid/*.csv")

In [None]:
# Salvar dados em uma tabela hive particionada por "municipio"
# Rotina executada com sucesso no spark-shell, porem aqui no pyspark não foi possível alterar o limite de partições, mesmo
# após várias tentativas.
covidDF.write.mode("overwrite").partitionBy("municipio").format("hive").saveAsTable("covid")


In [5]:
# Gerando a visualização do Total de:
# - Casos Recuperados 
# - Em Acompanhamento
casosRecAcom = covidDF.sort(desc("data")).filter(col("regiao") == "Brasil").limit(1).agg(format_number(max("Recuperadosnovos"), 1).alias("Casos_Recuperados"),format_number(max("emAcompanhamentoNovos"),1).alias("Em_Acompanhamento"))
casosRecAcom.show()

+-----------------+-----------------+
|Casos_Recuperados|Em_Acompanhamento|
+-----------------+-----------------+
|     17,262,646.0|      1,065,477.0|
+-----------------+-----------------+



In [6]:
# Gerando a visualização do Total de:
# - Casos Confirmados
#  - Acumulado
#  - Casos novos 
#  - Incidência 

casosConfirmados = covidDF.sort(desc("data")).filter(col("regiao") == "Brasil").limit(1).agg(format_number(max("casosAcumulado"),1).alias("Casos_Acumulados"),format_number(max("casosNovos"),1).alias("Casos_Novos"), format_number(sum((col("casosAcumulado") / col("populacaoTCU2019")) * 100000),1).alias("Incidencia"))
casosConfirmados.show()


+----------------+-----------+----------+
|Casos_Acumulados|Casos_Novos|Incidencia|
+----------------+-----------+----------+
|    18,855,015.0|   62,504.0|   8,972.3|
+----------------+-----------+----------+



In [7]:
# Gerando a visualização do Total de:
# - Óbitos Confirmados
#   - Acumulado
#   - Casos Novos
#   - Letalidade
#   - Mortalidade 

obitosConfirmados = covidDF.sort(desc("data")).filter(col("regiao") == "Brasil").limit(1).agg(format_number(max("obitosAcumulado"),1).alias("Obitos_Acumulados"),format_number(max("obitosNovos"),1).alias("Obitos_Novos"), format_number(sum((col("obitosAcumulado") / col("casosAcumulado")) * 100),1).alias("Letalidade"), format_number(sum((col("obitosAcumulado") / col("populacaoTCU2019")) * 100000),1).alias("Mortalidade"))
obitosConfirmados.show()


+-----------------+------------+----------+-----------+
|Obitos_Acumulados|Obitos_Novos|Letalidade|Mortalidade|
+-----------------+------------+----------+-----------+
|        526,892.0|     1,780.0|       2.8|      250.7|
+-----------------+------------+----------+-----------+



In [8]:
# Salvando a visualização de Casos Recuperados e Casos em acompanhamento em uma tabela Hive
casosRecAcom.write.mode("overwrite").format("hive").saveAsTable("casosRecAcom1")

In [9]:
!hdfs dfs -ls /user/hive/warehouse/casosrecacom1/

Found 1 items
-rwxrwxr-x   2 root supergroup         25 2021-11-09 00:38 /user/hive/warehouse/casosrecacom1/part-00000-bd624d4c-6a3d-4e59-b7f7-3002122a64d2-c000


In [10]:
# Salvando a visualização de Casos Confirmados em formato parquet e compressão snappy
casosConfirmados.write.mode("overwrite").option("compression", "snappy").parquet("/user/eduardo/casosConfirmados")

In [11]:
!hdfs dfs -ls /user/eduardo/casosConfirmados/

Found 2 items
-rw-r--r--   2 root supergroup          0 2021-11-09 00:39 /user/eduardo/casosConfirmados/_SUCCESS
-rw-r--r--   2 root supergroup       1036 2021-11-09 00:39 /user/eduardo/casosConfirmados/part-00000-cb80427e-9775-4216-8bbe-cb469e3949f0-c000.snappy.parquet


In [12]:
# Salvando a visualização de Obitos Confirmados em um topico no KAFKA

obitosConfirmados.selectExpr("to_json(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", "kafka:9092").option("topic", "topic-obitosConfirmados").save()

In [13]:
# Lendo os dados gravados no Kakfa e exibindo os valores

kafka_df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "topic-obitosConfirmados") \
.option("startingOffsets", "earliest") \
.load()

topic_string = kafka_df.select(col("value").cast("string"))
topic_string.show()


+--------------------+
|               value|
+--------------------+
|{"Obitos_Acumulad...|
|{"Obitos_Acumulad...|
|{"Obitos_Acumulad...|
|{"Obitos_Acumulad...|
|{"Obitos_Acumulad...|
|{"Obitos_Acumulad...|
|{"Obitos_Acumulad...|
|{"Obitos_Acumulad...|
|{"Obitos_Acumulad...|
|{"Obitos_Acumulad...|
|{"Obitos_Acumulad...|
+--------------------+



In [14]:
# Criar visualização da Sintese dos dados de casos, obitos, incidencia e mortalidade.
# - Nessa primeira etapa foi feito o agrupamento por estado e região. 
sinteseEstados = covidDF.filter(col("regiao").isNotNull()).groupBy("estado", "regiao").agg(max("casosAcumulado").alias("casosAcumulado"), max("obitosAcumulado").alias("obitosAcumulado"), max("populacaoTCU2019").alias("populacaoTCU2019"), last("data").alias("data")).sort(asc("regiao"),asc("estado"),desc("data"))
sinteseEstados.show()

+------+------------+--------------+---------------+----------------+----------+
|estado|      regiao|casosAcumulado|obitosAcumulado|populacaoTCU2019|      data|
+------+------------+--------------+---------------+----------------+----------+
|  null|      Brasil|      18855015|         526892|       210147125|2021-07-06|
|    DF|Centro-Oeste|        434708|           9322|         3015268|2021-07-06|
|    GO|Centro-Oeste|        686433|          19485|         7018354|2021-07-06|
|    MS|Centro-Oeste|        339323|           8400|         2778986|2021-07-06|
|    MT|Centro-Oeste|        456155|          12000|         3484466|2021-07-06|
|    AL|    Nordeste|        220793|           5450|         3337357|2021-07-06|
|    BA|    Nordeste|       1141612|          24428|        14873064|2021-07-06|
|    CE|    Nordeste|        894678|          22791|         9132078|2021-07-06|
|    MA|    Nordeste|        322052|           9190|         7075181|2021-07-06|
|    PB|    Nordeste|       

In [15]:
# Nessa etapa foi feito o resumo por Região
sinteseRegiao = sinteseEstados.groupBy("regiao").agg(sum("casosAcumulado").alias("Casos"), sum("obitosAcumulado").alias("Obitos"), sum("populacaoTCU2019").alias("populacaoRegiao"), last("data").alias("Atualizacao")).sort(asc("regiao"))
sinteseRegiao.show()

+------------+--------+------+---------------+-----------+
|      regiao|   Casos|Obitos|populacaoRegiao|Atualizacao|
+------------+--------+------+---------------+-----------+
|      Brasil|18855015|526892|      210147125| 2021-07-06|
|Centro-Oeste| 1916619| 49207|       16297074| 2021-07-06|
|    Nordeste| 4455737|107824|       57071654| 2021-07-06|
|       Norte| 1732862| 43845|       18430980| 2021-07-06|
|     Sudeste| 7138803|245311|       88371433| 2021-07-06|
|         Sul| 3611041| 80705|       29975984| 2021-07-06|
+------------+--------+------+---------------+-----------+



In [16]:
# Nessa é feito o resultado geral, incluindo os cálculos de incidencia e mortalidade.
sinteseGeral = sinteseRegiao.withColumn("Incidencia", (col("Casos") / col("populacaoRegiao")) * 100000).withColumn("Mortalidade", (col("Obitos") / col("populacaoRegiao")) * 100000)
sinteseGeral.show()

+------------+--------+------+---------------+-----------+------------------+------------------+
|      regiao|   Casos|Obitos|populacaoRegiao|Atualizacao|        Incidencia|       Mortalidade|
+------------+--------+------+---------------+-----------+------------------+------------------+
|      Brasil|18855015|526892|      210147125| 2021-07-06| 8972.292625940041|250.72529543290204|
|Centro-Oeste| 1916619| 49207|       16297074| 2021-07-06|11760.509892757436| 301.9376361670813|
|    Nordeste| 4455737|107824|       57071654| 2021-07-06| 7807.268035371816|188.92741394878797|
|       Norte| 1732862| 43845|       18430980| 2021-07-06| 9401.898325536678|237.88751330640042|
|     Sudeste| 7138803|245311|       88371433| 2021-07-06| 8078.179517582339| 277.5908363961915|
|         Sul| 3611041| 80705|       29975984| 2021-07-06|12046.446915637533| 269.2321960139824|
+------------+--------+------+---------------+-----------+------------------+------------------+



In [17]:
# Para finalizar, nessa etapa foi feita a formatação dos valores para uma melhor leitura. 
sinteseFinal = sinteseGeral.groupBy("regiao").agg(format_number(last("Casos"),1).alias("Casos"),format_number(last("Obitos"),1).alias("Obitos"),format_number(last("Incidencia"),1).alias("Incidencia"),format_number(last("Mortalidade"),1).alias("Mortalidade"),last("Atualizacao").alias("Atualizacao"))
sinteseFinal.show()

+------------+------------+---------+----------+-----------+-----------+
|      regiao|       Casos|   Obitos|Incidencia|Mortalidade|Atualizacao|
+------------+------------+---------+----------+-----------+-----------+
|      Brasil|18,855,015.0|526,892.0|   8,972.3|      250.7| 2021-07-06|
|Centro-Oeste| 1,916,619.0| 49,207.0|  11,760.5|      301.9| 2021-07-06|
|    Nordeste| 4,455,737.0|107,824.0|   7,807.3|      188.9| 2021-07-06|
|       Norte| 1,732,862.0| 43,845.0|   9,401.9|      237.9| 2021-07-06|
|     Sudeste| 7,138,803.0|245,311.0|   8,078.2|      277.6| 2021-07-06|
|         Sul| 3,611,041.0| 80,705.0|  12,046.4|      269.2| 2021-07-06|
+------------+------------+---------+----------+-----------+-----------+

