# Projeto Final de Spark

*Diego Silva Cunha*

**Nível Básico:**

Dados: https://mobileapps.saude.gov.br/esus-vepi/files/unAFkcaNDeXajurGB7LChj8SgQYS2ptm/04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar

Referência das Visualizações:
* Site: https://covid.saude.gov.br/
* Guia do Site: Painel Geral

**1. Enviar os dados para o hdfs**

Salvei os arquivos dentro da pasta /input/dados e faço o envio dos arquivos para o HDFS onde será mapeada a tabela Hive 'painel_covid_temp'.
```
hdfs dfs -put /input/dados /user/diego/trabalho-final/data/painel_covid_temp
```

**2. Otimizar todos os dados do hdfs para uma tabela Hive particionada por município.**

Conectar ao Hive com o comando abaixo:
```
beeline -u jdbc:hive2://localhost:10000
```
Criando o banco de dados Hive
```
create database trabalhofinal;
use trabalhofinal;
```
Criando tabela Hive 'painel_covid_temp'
```
create external table painel_covid_temp (
    regiao string,
    estado string,
    municipio string,
    coduf int,
    codmun int,
    codRegiaoSaude int,
    nomeRegiaoSaude string,
    data date,
    semanaEpi int,
    populacaoTCU2019 int,
    casosAcumulado int,
    casosNovos int,
    obitosAcumulado int,
    obitosNovos int,
    Recuperadosnovos int,
    emAcompanhamentoNovos int,
    interior_metropolitana int
)
row format delimited
fields terminated by ';'
location'/user/diego/trabalho-final/data/painel_covid_temp'
tblproperties("skip.header.line.count"="1");
```
Realizando a configuração do Hive para suportar particionamento dinâmico.
```
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
```
Criando a tabela 'painel_covid' particionada por regiões do Brasil.
```
create table painel_covid (
    estado string,
    municipio string,
    coduf int,
    codmun int,
    codRegiaoSaude int,
    nomeRegiaoSaude string,
    data date,
    semanaEpi int,
    populacaoTCU2019 int,
    casosAcumulado int,
    casosNovos int,
    obitosAcumulado int,
    obitosNovos int,
    Recuperadosnovos int,
    emAcompanhamentoNovos int,
    interior_metropolitana int
)
partitioned by (regiao string);
```
Carregando os dados da tabela 'painel_covid_temp' para a tabela particionada por região 'painel_covid'.
```
insert overwrite table painel_covid partition(regiao) select estado,municipio,coduf,codmun,codRegiaoSaude,nomeRegiaoSaude,data,semanaEpi,populacaoTCU2019,casosAcumulado,casosNovos,obitosAcumulado,obitosNovos,Recuperadosnovos,emAcompanhamentoNovos,interior_metropolitana,regiao from painel_covid_temp;
```

In [15]:
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType

def calcular_incidencia(casos, populacao, por_habitantes):
    return (casos * por_habitantes) / populacao

def calcular_mortalidade(obitos, populacao, por_habitantes):
    return (obitos * por_habitantes) / populacao

def calcular_letalidade(obitos, casos):
    return (obitos * 100) / casos

spark.sparkContext.setLogLevel("INFO")
spark.catalog.setCurrentDatabase(dbName="trabalhofinal")

**3. Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS:**

In [2]:
painel_covid = spark.read.table("painel_covid")

Vizualização 1 - Casos recuperados e em acompahamento

In [3]:
vizualizacao_1 = painel_covid.select("recuperadosnovos","emacompanhamentonovos")\
            .agg({"recuperadosnovos": "max", "emacompanhamentonovos": "max"})\
            .withColumnRenamed("max(recuperadosnovos)","recuperadosnovos")\
            .withColumnRenamed("max(emacompanhamentonovos)","emacompanhamentonovos")
vizualizacao_1.show()

+----------------+---------------------+
|recuperadosnovos|emacompanhamentonovos|
+----------------+---------------------+
|        17262646|              1317658|
+----------------+---------------------+



Vizualização 2 - Casos confirmados

In [18]:
vizualizacao_2 = painel_covid.select("casosacumulado","casosnovos","populacaoTCU2019")\
            .agg({"casosacumulado":"max","casosnovos":"max","populacaoTCU2019":"max"})\
            .withColumnRenamed("max(casosacumulado)","casosacumulado")\
            .withColumnRenamed("max(casosnovos)","casosnovos")\
            .withColumnRenamed("max(populacaoTCU2019)","populacaoTCU2019")\
            .withColumn("incidencia*", calcular_incidencia(col("casosacumulado").cast(FloatType()), col("populacaoTCU2019").cast(FloatType()),1000000))
vizualizacao_2.show()

+----------------+--------------+----------+-----------------+
|populacaoTCU2019|casosacumulado|casosnovos|      incidencia*|
+----------------+--------------+----------+-----------------+
|       210147125|      18855015|    115228|89722.93068467462|
+----------------+--------------+----------+-----------------+



Vizualização 3 - Óbitos confirmados

In [17]:
vizualizacao_3 = painel_covid.select("casosacumulado","obitosacumulado","obitosnovos","populacaoTCU2019")\
            .agg({"casosacumulado":"max","obitosacumulado":"max","obitosnovos":"max","populacaoTCU2019":"max"})\
            .withColumnRenamed("max(casosacumulado)","casosacumulado")\
            .withColumnRenamed("max(obitosacumulado)","obitosacumulado")\
            .withColumnRenamed("max(obitosnovos)","obitosnovos")\
            .withColumnRenamed("max(populacaoTCU2019)","populacaoTCU2019")\
            .withColumn("letalidade", calcular_letalidade(col("obitosacumulado").cast(FloatType()), col("casosacumulado").cast(FloatType())))\
            .withColumn("mortalidade", calcular_mortalidade(col("obitosacumulado").cast(FloatType()), col("populacaoTCU2019").cast(FloatType()),1000000))
vizualizacao_3.show()

+----------------+-----------+---------------+--------------+-----------------+------------------+
|populacaoTCU2019|obitosnovos|obitosacumulado|casosacumulado|       letalidade|       mortalidade|
+----------------+-----------+---------------+--------------+-----------------+------------------+
|       210147125|       4249|         526892|      18855015|2.794439421318974|2507.2530493113586|
+----------------+-----------+---------------+--------------+-----------------+------------------+



**4. Salvar a primeira visualização como tabela Hive**

In [210]:
vizualizacao_1.write.saveAsTable("trabalhofinal.v1_casos_recuperados")

**5. Salvar a segunda visualização com formato parquet e compressão snappy**

In [215]:
vizualizacao_2.write.option("compression", "snappy")\
                    .parquet("/user/diego/trabalho-final/data/v2_casos_confirmados")

**6. Salvar a terceira visualização em um tópico no Kafka**

In [46]:
vizualizacao_3.selectExpr("CONCAT(1) AS value", "CAST(obitosacumulado AS STRING)","CAST(obitosnovos AS STRING)","CAST(letalidade AS STRING)","CAST(mortalidade AS STRING)")\
            .write\
            .format("kafka")\
            .option("kafka.bootstrap.servers","kafka:9092")\
            .option("topic","v3-obitos-confirmados")\
            .option("value", "1")\
            .option("checkpointLocation","/user/diego/trabalho-final/data/kafka/checkpoint")\
            .save()

**7. Criar a visualização pelo Spark com os dados enviados para o HDFS:**

In [20]:
vizualizacao_regioes = painel_covid.where((col("municipio")!="") | (col("regiao")=="Brasil"))\
            .groupBy("regiao","municipio").agg({"casosacumulado":"max","obitosacumulado":"max","populacaoTCU2019":"max"})\
            .withColumnRenamed("max(casosacumulado)","casosacumulado")\
            .withColumnRenamed("max(obitosacumulado)","obitosacumulado")\
            .withColumnRenamed("max(populacaoTCU2019)","populacaoTCU2019")\
            .groupBy("regiao").agg({"casosacumulado":"sum","obitosacumulado":"sum","populacaoTCU2019":"max"})\
            .withColumnRenamed("sum(casosacumulado)","casosacumulado")\
            .withColumnRenamed("sum(obitosacumulado)","obitosacumulado")\
            .withColumnRenamed("max(populacaoTCU2019)","populacaoTCU2019")\
            .withColumn("incidencia100mil", calcular_incidencia100mil(col("casosacumulado").cast(FloatType()), col("populacaoTCU2019").cast(FloatType()),100000))\
            .withColumn("mortalidade100mil", calcular_mortalidade100mil(col("obitosacumulado").cast(FloatType()), col("populacaoTCU2019").cast(FloatType()),100000))
vizualizacao_regioes.show()

+------------+----------------+---------------+--------------+------------------+------------------+
|      regiao|populacaoTCU2019|obitosacumulado|casosacumulado|  incidencia100mil| mortalidade100mil|
+------------+----------------+---------------+--------------+------------------+------------------+
|    Nordeste|         2872347|         115502|       4426773|154116.92847417112|4021.1714698815986|
|         Sul|         1933105|          85038|       3780833|195583.43267230698|4399.0366834703755|
|     Sudeste|        12252023|         244856|       7129027|58186.530106252656|1998.4943900284875|
|Centro-Oeste|         3015268|          49490|       1916634|63564.302444757814|1641.3135389623742|
|      Brasil|       210147125|         526892|      18855015| 8972.293567439801|250.72529713469308|
|       Norte|         2182763|          43929|       1730152| 79264.31034427467|2012.5410298781865|
+------------+----------------+---------------+--------------+------------------+----------

**8. Salvar a visualização do exercício 6 em um tópico no Elastic**

**9. Criar um dashboard no Elastic para visualização dos novos dados enviados**