# Projeto Semantix Covid-19

Curso de Big Data Engineer<br/>
Aluno: Saulo Deboni

### 1. Enviar os dados para o HDFS
O arquivo *docker-compose.yml* foi modificado para ler a pasta do projeto.<br/>
Foi criada no HDFS a pasta */user/saulo/projeto-covid* e adicionados os arquivos pelo terminal.

In [6]:
!hdfs dfs -ls /user/saulo/projeto-covid/data/

Found 4 items
-rw-r--r--   3 root supergroup   62492959 2022-04-23 13:29 /user/saulo/projeto-covid/data/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2022-04-23 13:29 /user/saulo/projeto-covid/data/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2022-04-23 13:29 /user/saulo/projeto-covid/data/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2022-04-23 13:29 /user/saulo/projeto-covid/data/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


### 2. Otimizar os dados para Tabela Hive
Foi criado o banco de dados *covid_hive* no HDSF para salvar a tabela otimizada.<br/>
Antes foi necessário analisar e limpar o conteúdo da tabela para evitar erros.

In [1]:
hist_painel_covid = spark.read.option("sep",";").option("header","true").csv("/user/saulo/projeto-covid/data/")

In [2]:
# Mostrando apenas as primeiras colunas para facilitar a visualização
hist_painel_covid.select("regiao", "estado", "municipio", "data", "casosAcumulado").show(10)

+------+------+---------+----------+--------------+
|regiao|estado|municipio|      data|casosAcumulado|
+------+------+---------+----------+--------------+
|Brasil|  null|     null|2021-01-01|       7700578|
|Brasil|  null|     null|2021-01-02|       7716405|
|Brasil|  null|     null|2021-01-03|       7733746|
|Brasil|  null|     null|2021-01-04|       7753752|
|Brasil|  null|     null|2021-01-05|       7810400|
|Brasil|  null|     null|2021-01-06|       7873830|
|Brasil|  null|     null|2021-01-07|       7961673|
|Brasil|  null|     null|2021-01-08|       8013708|
|Brasil|  null|     null|2021-01-09|       8075998|
|Brasil|  null|     null|2021-01-10|       8105790|
+------+------+---------+----------+--------------+
only showing top 10 rows



In [34]:
# Renomendo a coluna interior/metropolitana para remover a barra
hist_painel_covid.withColumnRenamed("interior/metropolitana","interior_metropolitana").printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: string (nullable = true)
 |-- codmun: string (nullable = true)
 |-- codRegiaoSaude: string (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: string (nullable = true)
 |-- semanaEpi: string (nullable = true)
 |-- populacaoTCU2019: string (nullable = true)
 |-- casosAcumulado: string (nullable = true)
 |-- casosNovos: string (nullable = true)
 |-- obitosAcumulado: string (nullable = true)
 |-- obitosNovos: string (nullable = true)
 |-- Recuperadosnovos: string (nullable = true)
 |-- emAcompanhamentoNovos: string (nullable = true)
 |-- interior_metropolitana: string (nullable = true)



**OBSERVAÇÃO:** Não foi possível fazer o particionamento por *municipio* por falta de memória do computador.<br/>
Portanto, para poder seguir com o projeto, o particionamento foi feito com *estado*.

Foi criada uma nova tabela *clean_painel_estado* excluindo os dados nulos de *estado*.<br/>
A nova tabela foi salva no HDFS para ser otimizada no hive pelo terminal.

In [3]:
clean_painel_estado = hist_painel_covid.filter((hist_painel_covid.municipio.isNotNull()) & (hist_painel_covid.estado.isNotNull()))

In [4]:
clean_painel_estado.select("regiao", "estado", "municipio", "data", "casosAcumulado").show(10)

+------+------+--------------------+----------+--------------+
|regiao|estado|           municipio|      data|casosAcumulado|
+------+------+--------------------+----------+--------------+
| Norte|    RO|Alta Floresta D'O...|2021-01-01|          1291|
| Norte|    RO|Alta Floresta D'O...|2021-01-02|          1307|
| Norte|    RO|Alta Floresta D'O...|2021-01-03|          1326|
| Norte|    RO|Alta Floresta D'O...|2021-01-04|          1333|
| Norte|    RO|Alta Floresta D'O...|2021-01-05|          1342|
| Norte|    RO|Alta Floresta D'O...|2021-01-06|          1375|
| Norte|    RO|Alta Floresta D'O...|2021-01-07|          1392|
| Norte|    RO|Alta Floresta D'O...|2021-01-08|          1420|
| Norte|    RO|Alta Floresta D'O...|2021-01-09|          1468|
| Norte|    RO|Alta Floresta D'O...|2021-01-10|          1485|
+------+------+--------------------+----------+--------------+
only showing top 10 rows



In [10]:
clean_painel_estado.write.csv("/user/saulo/projeto-covid/data_estado")

A tabela *painel_estado_covid* foi criada no hive, particionada por *estado*.

In [12]:
spark.catalog.setCurrentDatabase("covid_hive")
spark.sql("select regiao, estado, municipio, data, casosAcumulado from painel_covid_estado").show(10)

+------------+------+---------+----------+--------------+
|      regiao|estado|municipio|      data|casosAcumulado|
+------------+------+---------+----------+--------------+
|Centro-Oeste|    MT| Acorizal|2021-01-01|           206|
|Centro-Oeste|    MT| Acorizal|2021-01-02|           206|
|Centro-Oeste|    MT| Acorizal|2021-01-03|           206|
|Centro-Oeste|    MT| Acorizal|2021-01-04|           206|
|Centro-Oeste|    MT| Acorizal|2021-01-05|           206|
|Centro-Oeste|    MT| Acorizal|2021-01-06|           206|
|Centro-Oeste|    MT| Acorizal|2021-01-07|           206|
|Centro-Oeste|    MT| Acorizal|2021-01-08|           208|
|Centro-Oeste|    MT| Acorizal|2021-01-09|           209|
|Centro-Oeste|    MT| Acorizal|2021-01-10|           209|
+------------+------+---------+----------+--------------+
only showing top 10 rows



### 3. Criar três visualizações pelo Spark com os dados do HDFS
Para criar as visualizações, foi criado um novo schema para dataframe original.

In [1]:
from pyspark.sql.types import *
from pyspark.sql import functions as f

header_lista = [
StructField("regiao", StringType()),
StructField("estado", StringType()),
StructField("municipio", StringType()),
StructField("coduf", StringType()),
StructField("codmun", StringType()),
StructField("codRegiaoSaude", StringType()),
StructField("nomeRegiaoSaude", StringType()),
StructField("data", DateType()),
StructField("semanaEpi", StringType()),
StructField("populacaoTCU2019", IntegerType()),
StructField("casosAcumulado", IntegerType()),
StructField("casosNovos", IntegerType()),
StructField("obitosAcumulado", IntegerType()),
StructField("obitosNovos", IntegerType()),
StructField("Recuperadosnovos", IntegerType()),
StructField("emAcompanhamentoNovos", IntegerType()),
StructField("interior_metropolitana", BooleanType())
]

header_schema = StructType(header_lista)
painel_analise = spark.read.option("sep",";").option("header","true").schema(header_schema).csv("/user/saulo/projeto-covid/data/")

In [2]:
# Removendo os dados nulos para estado e município
painel_analise_clean = painel_analise.filter((painel_analise.municipio.isNotNull()) & (painel_analise.estado.isNotNull()))

**3.1 Visualização 01:** Os estados com maior número de casos X Total de óbitos

In [41]:
casos_estado = painel_analise_clean.groupBy("estado").agg(f.sum("casosNovos").alias("Total de Casos"),\
                                                          f.sum("obitosNovos").alias("Total de Óbitos"))\
.sort(f.desc("Total de Casos"))
casos_estado.show(10)

+------+--------------+---------------+
|estado|Total de Casos|Total de Óbitos|
+------+--------------+---------------+
|    SP|       3808286|         130389|
|    MG|       1831932|          46820|
|    PR|       1301721|          31518|
|    RS|       1235914|          31867|
|    BA|       1130748|          24164|
|    SC|       1066484|          17146|
|    RJ|        970268|          56192|
|    CE|        884098|          22791|
|    GO|        686433|          19485|
|    PE|        561505|          17953|
+------+--------------+---------------+
only showing top 10 rows



**3.2 Visualização 02:** Cidades pequenas (até 50 mil habitantes) com maior número de óbitos

In [6]:
cidades_50 = painel_analise_clean.filter(painel_analise_clean.populacaoTCU2019 < 50000)

casos_cidades_50 = cidades_50.groupBy("municipio").agg(f.first("estado").alias("Estado"),\
                                                       f.sum("obitosNovos").alias("Total de Óbitos"),\
                                                       f.max("populacaoTCU2019").alias("População"))\
.sort(f.desc("Total de Óbitos"))
casos_cidades_50.show(10)

+--------------------+------+---------------+---------+
|           municipio|Estado|Total de Óbitos|População|
+--------------------+------+---------------+---------+
|              Guaíra|    SP|            269|    40790|
|             Dracena|    SP|            229|    46793|
|       Guajará-Mirim|    RO|            223|    46174|
|    Pontes e Lacerda|    MT|            220|    45436|
|               Jales|    SP|            210|    49107|
|São Gonçalo do Am...|    CE|            204|    48422|
|             Iturama|    MG|            203|    39263|
|Santa Helena de G...|    GO|            200|    38648|
|      Novo Horizonte|    BA|            193|    41052|
|Barra de São Fran...|    ES|            192|    44650|
+--------------------+------+---------------+---------+
only showing top 10 rows



**3.3 Visualização 03:** Épocas do ano (mês-ano) com maior número de novos casos

In [18]:
from pyspark.sql.functions import unix_timestamp, from_unixtime

mes_ano = painel_analise_clean.withColumn("mes_ano", from_unixtime(unix_timestamp\
                                                                   (f.col("data"), "yyy-MM-dd"), "MM-yyyy"))

casos_epocas = mes_ano.groupBy("mes_ano").agg(f.sum("casosNovos").alias("Total de Casos"),\
                                              f.max("casosNovos").alias("Máximo de Casos em um Dia"))\
.sort(f.desc("Total de Casos"))
casos_epocas.show(10)

+-------+--------------+-------------------------+
|mes_ano|Total de Casos|Máximo de Casos em um Dia|
+-------+--------------+-------------------------+
|03-2021|       2208681|                    40170|
|06-2021|       2008062|                    47815|
|04-2021|       1908520|                     8646|
|05-2021|       1884548|                    47101|
|01-2021|       1521819|                     3632|
|02-2021|       1342473|                     7592|
|12-2020|       1334194|                    14338|
|07-2020|       1254979|                     6800|
|08-2020|       1239204|                     7063|
|09-2020|        898596|                     3680|
+-------+--------------+-------------------------+
only showing top 10 rows



### 7. Criar a visualização pelo Spark dos dados do HDFS
Síntese de casos, óbitos, incidência e mortalidade

In [39]:
taxa_casos = (f.sum("casosNovos")/f.sumDistinct("populacaoTCU2019"))*100000
taxa_mortes = (f.sum("obitosNovos")/f.sumDistinct("populacaoTCU2019"))*100000

sintese = painel_analise_clean.groupBy("regiao").agg(f.sum("casosNovos").alias("Casos"),\
                f.sum("obitosNovos").alias("Óbitos"),\
                f.regexp_replace(f.format_number(taxa_casos,0), ",", ".").alias("Incidência/100k"),\
                f.format_number(taxa_mortes,0).alias("Mortalidade/100k"),\
                f.last("data").alias("Data"))

sintese.show()

+------------+-------+------+---------------+----------------+----------+
|      regiao|  Casos|Óbitos|Incidência/100k|Mortalidade/100k|      Data|
+------------+-------+------+---------------+----------------+----------+
|    Nordeste|4434293|107560|           7832|             190|2021-07-06|
|         Sul|3604119| 80531|          12081|             270|2021-07-06|
|     Sudeste|7129450|244771|           8103|             278|2021-07-06|
|Centro-Oeste|1916619| 49207|          11784|             303|2021-07-06|
|       Norte|1730197| 43830|           9404|             238|2021-07-06|
+------------+-------+------+---------------+----------------+----------+

