## /SEMANTIX_ACADEMY 

### /PROJETO_FINAL_SPARK // NÍVEL BÁSICO // CAMPANHA NACIONAL DE VACINAÇÃO CONTRA CONVID-19

    /ALUNO: FELIPE CHAGAS DE SOUZA
    /E-MAIL: souza.chagas.felipe@gmail.com
    /TREINAMENTO: BIG DATA ENGINNER // 2022
    

#### IMPORTS

In [41]:
import os
import requests
import shutil
import logging
import rarfile
from pyspark.sql.functions import *
from pyspark.sql.types import *

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('LOGGER')
logger.info('Imports e setup do logger realizados!')

INFO:LOGGER:Imports e setup do logger realizados!


#### DEFININDO FUNÇÕES PARA DOWNLOAD E EXTRAÇÃO DOS ARQUIVOS

In [45]:
def download_file(url):
    logger.info('Realizando download do arquivo da origem saude.gov.br')
    url = url

    request = requests.get(url)
    
    return request

def get_file_name(url):
    file_name = url.split('/')[-1]
    
    return file_name


def extract_rar_files(diretorio_destino, diretorio_origem, url_arquivo):
    url = download_file(url_arquivo)
    logger.info('Download dos arquivos da origem saude.gov.br realizado')
    file_name = get_file_name(url_arquivo)


    diretorio_destino = diretorio_destino
    arquivo_diretorio_origem = diretorio_origem + file_name
    arquivo_extracao = diretorio_destino + file_name

    if (os.path.exists(arquivo_extracao)) != True:

        with open(file_name,'wb') as output_file:
            output_file.write(request.content)

        shutil.move(arquivo_diretorio_origem, diretorio_destino)

    rf = rarfile.RarFile(arquivo_extracao, "r")
    rf.extractall(diretorio_destino)
    rf.close()

    logger.info('Extração completa, arquivos movidos e extraídos para a pasta ' + diretorio_destino)
    
logger.info("Funções para download e extração dos arquivos csv criadas com sucesso")
logger.info("Para utilizar o extrator de RAR files: extract_rar_files(diretorio_destino, diretorio_origem, url_arquivo)")

INFO:LOGGER:Funções para download e extração dos arquivos csv criadas com sucesso
INFO:LOGGER:Para utilizar o extrator de RAR files: extract_rar_files(diretorio_destino, diretorio_origem, url_arquivo)


#### DOWNLOAD E EXTRAÇÃO DOS ARQUIVOS

In [46]:
extract_rar_files('/mnt/notebooks/data/','/mnt/notebooks/project/', 'https://mobileapps.saude.gov.br/esus-vepi/files/unAFkcaNDeXajurGB7LChj8SgQYS2ptm/04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar')

INFO:LOGGER:Realizando download do arquivo da origem saude.gov.br
INFO:LOGGER:Download dos arquivos da origem saude.gov.br realizado
INFO:LOGGER:Extração completa, arquivos movidos e extraídos para a pasta /mnt/notebooks/data/


#### 1. Enviar os dados para o hdfs

In [5]:
!hdfs dfs -mkdir -p /user/felipe/desafio_spark_data_raw
!hdfs dfs -put /mnt/notebooks/data/*.csv /user/felipe/desafio_spark_data_raw
!hdfs dfs -ls /user/felipe/desafio_spark_data_raw/*.csv

logger.info('Arquivos Painel Covidbr da origem saude.gov.br copiados para o HDFS')

-rw-r--r--   2 root supergroup   62492959 2022-04-21 01:33 /user/felipe/desafio_spark_data_raw/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   2 root supergroup   76520681 2022-04-21 01:33 /user/felipe/desafio_spark_data_raw/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   2 root supergroup   91120916 2022-04-21 01:33 /user/felipe/desafio_spark_data_raw/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   2 root supergroup    3046774 2022-04-21 01:33 /user/felipe/desafio_spark_data_raw/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


INFO:LOGGER:Arquivos Painel Covidbr da origem saude.gov.br copiados para o HDFS


#### 2. Otimizar todos os dados do hdfs para uma tabela Hive particionada por município.

    EXPLORAÇÃO E AJUSTE DO SCHEMA

In [48]:
#Criação do df base
df_covid_brasil_base = spark.read.csv("/user/felipe/desafio_spark_data_raw/*.csv", sep=';'\
                                      , header='true', mode='DROPMALFORMED')

df_covid_brasil_base.printSchema()
logger.info("Dataframe df_covid_brasil_base criado com sucesso")

INFO:LOGGER:Dataframe df_covid_brasil_base criado com sucesso


root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: string (nullable = true)
 |-- codmun: string (nullable = true)
 |-- codRegiaoSaude: string (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: string (nullable = true)
 |-- semanaEpi: string (nullable = true)
 |-- populacaoTCU2019: string (nullable = true)
 |-- casosAcumulado: string (nullable = true)
 |-- casosNovos: string (nullable = true)
 |-- obitosAcumulado: string (nullable = true)
 |-- obitosNovos: string (nullable = true)
 |-- Recuperadosnovos: string (nullable = true)
 |-- emAcompanhamentoNovos: string (nullable = true)
 |-- interior/metropolitana: string (nullable = true)



In [49]:
#criando o df com o schema transformado
df_covid_brasil = df_covid_brasil_base.withColumn("coduf", col("coduf").cast(IntegerType()))\
                                        .withColumn("codmun", col("codmun").cast(IntegerType()))\
                                        .withColumn("codRegiaoSaude", col("codRegiaoSaude").cast(IntegerType()))\
                                        .withColumn("data", col("data").cast(TimestampType()))\
                                        .withColumn("semanaEpi", col("semanaEpi").cast(IntegerType()))\
                                        .withColumn("populacaoTCU2019", col("populacaoTCU2019").cast(IntegerType()))\
                                        .withColumn("casosAcumulado", col("casosAcumulado").cast(IntegerType()))\
                                        .withColumn("casosNovos", col("casosNovos").cast(IntegerType()))\
                                        .withColumn("obitosAcumulado", col("obitosAcumulado").cast(IntegerType()))\
                                        .withColumn("obitosNovos", col("obitosNovos").cast(IntegerType()))\
                                        .withColumn("Recuperadosnovos", col("Recuperadosnovos").cast(IntegerType()))\
                                        .withColumn("emAcompanhamentoNovos", col("emAcompanhamentoNovos").cast(IntegerType()))\
                                        .withColumn("interior/metropolitana", col("interior/metropolitana").cast(IntegerType()))

df_covid_brasil.printSchema()
logger.info("Dataframe df_covid_brasil criado com sucesso com schema alterado, baseado no df_covid_brasil_base")

INFO:LOGGER:Dataframe df_covid_brasil criado com sucesso com schema alterado, baseado no df_covid_brasil_base


root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: integer (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



In [6]:
#Check Data
#quantidade de dados no df
print("Total Registros: {}".format(df_covid_brasil.count()))

#checando quantidade de dados com municipio nulo
print("Total registros com coluna municipio nula: {}".format(df_covid_brasil.filter(df_covid_brasil.municipio.isNull()).count()))


Total Registros: 2624943
Total registros com coluna municipio nula: 23753


    LIMPEZA DOS DADOS

In [50]:
#Aplicando 0 para as colunas numéricas que estão nulas
df_covid_brasil_fillna = df_covid_brasil.fillna(value=0, subset=["coduf","codmun",\
                                                                   "codRegiaoSaude", "semanaEpi",\
                                                                   "populacaoTCU2019", "casosAcumulado",\
                                                                   "casosNovos", "obitosAcumulado",\
                                                                   "obitosNovos", "Recuperadosnovos",\
                                                                   "emAcompanhamentoNovos", "interior/metropolitana"])

logger.info("Dataframe df_covid_brasil_fillna criado com sucesso, colunas do tipo int com valor nulo, recebe valor 0")

#Eliminando espaços no começo e fim das colunas do tipo string
df_covid_brasil_cleaned = df_covid_brasil_fillna.withColumn("regiao", rtrim(ltrim(lower(col("regiao")))))\
                                        .withColumn("estado", rtrim(ltrim(lower(col("estado")))))\
                                        .withColumn("municipio", rtrim(ltrim(lower(col("municipio")))))\
                                        .withColumn("nomeRegiaoSaude", rtrim(ltrim(lower(col("nomeRegiaoSaude")))))

logger.info("Dataframe df_covid_brasil_cleaned criado com sucesso, espaços no início e fim em colunas String foram removidos")
df_covid_brasil_cleaned.printSchema()

INFO:LOGGER:Dataframe df_covid_brasil_fillna criado com sucesso, colunas do tipo int com valor nulo, recebe valor 0
INFO:LOGGER:Dataframe df_covid_brasil_cleaned criado com sucesso, espaços no início e fim em colunas String foram removidos


root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: integer (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



    INGERINDO OS DADOS NO APACHE HIVE

In [9]:
df_covid_brasil_cleaned.cache()
df_covid_brasil_cleaned.write.saveAsTable("tb_casos_covid", mode="overwrite", partitionBy="municipio")

logger.info("A tabela tb_casos_covid foi criada com sucesso no Hive")

INFO:LOGGER:A tabela tb_casos_covid foi criada com sucesso no Hive


In [52]:
logger.info("Validando criação Tabela tb_casos_covid no Hive")

tb_casos_covid = spark.read.table("tb_casos_covid")
tb_casos_covid.show(4)
tb_casos_covid.printSchema()

!hdfs dfs -ls /user/hive/warehouse/tb_casos_covid | head -4

INFO:LOGGER:Validando criação Tabela tb_casos_covid no Hive


+--------+------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+---------+
|  regiao|estado|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|municipio|
+--------+------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+---------+
|  brasil|  null|   76|     0|             0|           null|2021-01-01 00:00:00|       53|       210147125|       7700578|     24605|         195411|        462|         6756284|               748883|                     0|     null|
|nordeste|    pi|   22|220000|             0|           null

##### 3. Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS:

![](https://raw.githubusercontent.com/chagasfelipe/semantix_spark_covid_challenge/main/img/painel_covid_escopo.PNG)

In [53]:
painel_casos_recuperados = tb_casos_covid.agg(sum("Recuperadosnovos").alias("casos_ecuperados")\
                                              , sum("emAcompanhamentoNovos").alias("em_acompanhamento"))

print("/CASOS RECUPERADOS")
painel_casos_recuperados.show(5)

/CASOS RECUPERADOS
+----------------+-----------------+
|casos_ecuperados|em_acompanhamento|
+----------------+-----------------+
|      2920055795|        320793426|
+----------------+-----------------+



In [54]:
confirmados_base = tb_casos_covid.agg(sum("casosAcumulado").alias("acumulado")\
                                      , sum("casosNovos").alias("casos_novos")
                                      , sum("populacaoTCU2019").alias("populacao"))

painel_casos_confirmados = confirmados_base.withColumn("incidencia", regexp_replace(format_number(\
                                        ((col('acumulado') * 100000) / col('populacao')), 1), ",", "")).drop('populacao')

print("/CASOS CONFIRMADOS")
painel_casos_confirmados.show()

/CASOS CONFIRMADOS
+----------+-----------+----------+
| acumulado|casos_novos|incidencia|
+----------+-----------+----------+
|9998172092|   56565045|    3247.6|
+----------+-----------+----------+



In [55]:
obitos_base = tb_casos_covid.agg(sum("casosAcumulado").alias("casos_acumulados")\
                                 , sum("obitosAcumulado").alias("acumulado")\
                                 , sum("obitosNovos").alias("casos_novos")\
                                 , sum("populacaoTCU2019").alias("populacao"))

painel_obitos_confirmados = obitos_base.withColumn("letalidade"\
                            , format_number(((col('acumulado') * 100) / col('casos_acumulados')), 2))\
                            .withColumn("mortalidade", format_number(((col('Acumulado') * 100000) / col('populacao')), 2))\
                            .drop("casos_acumulados","populacao")


print("/ÓBITOS CONFIRMADOS")
painel_obitos_confirmados.show(5)

/ÓBITOS CONFIRMADOS
+---------+-----------+----------+-----------+
|acumulado|casos_novos|letalidade|mortalidade|
+---------+-----------+----------+-----------+
|274784085|    1580676|      2.75|      89.25|
+---------+-----------+----------+-----------+



#### 4. Salvar a primeira visualização como tabela Hive

In [58]:
painel_casos_recuperados.write.saveAsTable("tb_casos_recuperados_covid", mode="overwrite", header="true")

!hdfs dfs -ls /user/hive/warehouse/tb_casos_recuperados_covid | head -5

logger.info("A visualização painel_casos_recuperados foi criada como tabela com sucesso no Hive")

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-04-21 16:34 /user/hive/warehouse/tb_casos_recuperados_covid/_SUCCESS
-rw-r--r--   2 root supergroup        748 2022-04-21 16:34 /user/hive/warehouse/tb_casos_recuperados_covid/part-00000-5acd4f20-f15c-40f7-9cad-bda449908ac3-c000.snappy.parquet


INFO:LOGGER:A visualização painel_casos_recuperados foi criada como tabela com sucesso no Hive


#### 5. Salvar a segunda visualização com formato parquet e compressão snappy

In [59]:
painel_casos_confirmados.write.parquet("/user/felipe/desafio_spark_data_raw/casos_confirmados", compression="snappy")

!hdfs dfs -ls /user/felipe/desafio_spark_data_raw/casos_confirmados | head -5

logger.info("A visualização painel_casos_confirmados foi criada como tabela com sucesso no Hive")

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-04-21 16:35 /user/felipe/desafio_spark_data_raw/casos_confirmados/_SUCCESS
-rw-r--r--   2 root supergroup        954 2022-04-21 16:35 /user/felipe/desafio_spark_data_raw/casos_confirmados/part-00000-c47a583a-d155-4cf9-b014-1a279ca04021-c000.snappy.parquet


INFO:LOGGER:A visualização painel_casos_confirmados foi criada como tabela com sucesso no Hive


#### 6. Salvar a terceira visualização em um tópico no Kafka

In [61]:
view_obitos_confirmados = painel_obitos_confirmados.select("acumulado","casos_novos","mortalidade")\
                                       .withColumnRenamed("mortalidade","value")\
                                       .withColumn("value",col("value").cast(StringType()))
logger.info("A visualização view_obitos_confirmados foi criada com sucesso")

INFO:LOGGER:A visualização view_obitos_confirmados foi criada com sucesso


In [62]:
view_obitos_confirmados.write.format('kafka')\
              .option("kafka.bootstrap.servers","kafka:9092")\
              .option("topic","topic_obitos_confirmados")\
              .save()

logger.info("A visualização painel_obitos_confirmados foi criada em um tópico kafka com sucesso!")

INFO:LOGGER:A visualização painel_obitos_confirmados foi criada em um tópico kafka com sucesso!


#### No terminal do container do kafka
##### docker exec -it kafka bash
##### kafka-topics.sh --bootstrap-server kafka:9092 --list

![](https://raw.githubusercontent.com/chagasfelipe/semantix_spark_covid_challenge/main/img/kafka_evidencia_criacao_topico.PNG)

#### 7. Criar a visualização pelo Spark com os dados enviados para o HDFS:

In [63]:
base_cases_view = tb_casos_covid.filter(col("data") == "2021-07-06")

casos_visualizacao = base_cases_view.groupBy("regiao", "data")\
    .agg(format_number(sum("casosAcumulado"), 0).alias("Casos")\
    , format_number(sum("obitosAcumulado"), 0).alias("Óbitos")\
    , format_number(sum("populacaoTCU2019"), 0).alias("População")\
    , format_number((sum("casosAcumulado")/(sum("populacaoTCU2019") / 100000)), 1).alias("Incidência")\
    , format_number((sum("obitosAcumulado")/(sum("populacaoTCU2019") / 100000)), 1).alias("Mortalidade"))\
    .sort(asc("regiao"))

print("/CASOS_COVID_BR")
casos_visualizacao.select(col("regiao")\
                          , col("Casos")\
                          , col("Óbitos")\
                          , col("Incidência")\
                          , col("Mortalidade")\
                          , date_format(col("data"), "yyyy-MM-dd HH:mm").alias("Atualização")).show()

/CASOS_COVID_BR
+------------+----------+-------+----------+-----------+----------------+
|      regiao|     Casos| Óbitos|Incidência|Mortalidade|     Atualização|
+------------+----------+-------+----------+-----------+----------------+
|      brasil|18,855,015|526,892|   8,972.3|      250.7|2021-07-06 00:00|
|centro-oeste| 3,833,238| 98,414|  11,760.5|      301.9|2021-07-06 00:00|
|    nordeste| 8,911,474|215,648|   7,807.3|      188.9|2021-07-06 00:00|
|       norte| 3,465,630| 87,690|   9,401.6|      237.9|2021-07-06 00:00|
|     sudeste|14,277,606|490,622|   8,078.2|      277.6|2021-07-06 00:00|
|         sul| 7,222,082|161,410|  12,046.4|      269.2|2021-07-06 00:00|
+------------+----------+-------+----------+-----------+----------------+



#### 8. Salvar a visualização do exercício 6 em um tópico no Elastic

##### NÃO CONSEGUI ENCONTRAR A BIBLIOTECA CORRETA PARA REALIZAR A CONEXÃO COM O ELASTIC COM O CÓDIGO ABAIXO:
``` python
view_obitos_confirmados.write.format("org.elasticsearch.spark.sql")\
                                .option("es.index.auto.create", "true")\
                                .option("es.nodes", "localhost")\
                                .option("es.port", "9243")\
                                .option("es.resource", "covid/elastic_obitos_confirmados")\
                                .option("es.nodes.wan.only", "true")\
                                .save()
```

##### Tentei com o seguintes jars:
    https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/7.13.3/elasticsearch-hadoop-7.13.3.jar
    https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/2.4.1/elasticsearch-hadoop-2.4.1.jar
    https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-20_2.11/7.9.2/elasticsearch-spark-20_2.11-7.9.2.jar
    https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/7.9.2/elasticsearch-hadoop-7.9.2.jar
    

In [37]:
#COMO NÃO CONSEGUI ENCONTRAR A BIBLIOTECA CORRETA E REALIZAR A GRAVAÇÃO DA VIEW EM UM TÓPICO DO ELASTIC, 
#FIZ A GERAÇÃO DA VIEW EM CSV E REALIZEI A IMPORTAÇÃO MANUAL DOS DADOS ATRAVÉS DO KIBANA: MACHINE LEARNING > DATA ANALYZER

view_obitos_confirmados.write.csv("/user/felipe/desafio_spark_data_raw/elastic/obitos_confirmados", mode="overwrite"\
                                  , sep=",", header="true")
!hdfs dfs -ls /user/felipe/desafio_spark_data_raw/elastic/obitos_confirmados/

logger.info("A visualização view_obitos_confirmados foi criada em CSV com sucesso!")

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-04-21 15:25 /user/felipe/desafio_spark_data_raw/elastic/obitos_confirmados/_SUCCESS
-rw-r--r--   2 root supergroup         52 2022-04-21 15:25 /user/felipe/desafio_spark_data_raw/elastic/obitos_confirmados/part-00000-f887e8c9-8717-44a9-aef0-61d47cc0113b-c000.csv


INFO:LOGGER:A visualização view_obitos_confirmados foi criada em CSV com sucesso!


In [38]:
!hdfs dfs -get /user/felipe/desafio_spark_data_raw/elastic/obitos_confirmados/part-00000-f887e8c9-8717-44a9-aef0-61d47cc0113b-c000.csv /mnt/notebooks/data/

logger.info("CSV disponível em /mnt/notebooks/data/")

INFO:LOGGER:CSV disponível em /mnt/notebooks/data/


#### 9. Criar um dashboard no Elastic para visualização dos novos dados enviados

#### /EVIDÊNCIA_1_IMPORT_CSV
![](https://raw.githubusercontent.com/chagasfelipe/semantix_spark_covid_challenge/main/img/elastic_evidencia_import.PNG)

#### /EVIDÊNCIA_2_CRIACAO_INDICE
![](https://raw.githubusercontent.com/chagasfelipe/semantix_spark_covid_challenge/main/img/elastic_evidencia_criacao_indice.PNG)

#### /EVIDÊNCIA_3_DASHBOARD
![](https://raw.githubusercontent.com/chagasfelipe/semantix_spark_covid_challenge/img/files/elastic_evidencia_dash.PNG)