## BIG DATA ENGINEER SEMANTIX
### Projeto Final - Corona Vírus no Brasil
Fonte dos Dados: https://mobileapps.saude.gov.br/esusvepi/files/unAFkcaNDeXajurGB7LChj8SgQYS2ptm/04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar

- Utilização do SPARK, HDFS, Hive, Python

### EXERCÍCIOS
### 01 - Dados enviados para o HDFS

In [2]:
# listando os arquivos no HDFS

!hdfs dfs -ls /user/feliciani/covid

Found 4 items
-rw-r--r--   3 root supergroup   62492959 2021-07-15 23:46 /user/feliciani/covid/COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2021-07-15 23:48 /user/feliciani/covid/COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2021-07-15 23:51 /user/feliciani/covid/COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2021-07-15 23:51 /user/feliciani/covid/COVIDBR_2021_Parte2_06jul2021.csv


In [3]:
spark

In [4]:
sc

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import *

In [3]:
# setando os logs para INFO
spark.sparkContext.setLogLevel("INFO")

In [29]:
# Lendo os 4 arquivos CSV, com separador e cabeçalho
covid_df = spark.read.csv("/user/feliciani/covid/", sep=";", header="true")

In [30]:
# Trabalhando com DataFrame
type(covid_df)

pyspark.sql.dataframe.DataFrame

In [31]:
covid_df.show(10)

+------+------+---------+-----+------+--------------+---------------+----------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|      data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+----------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-25|        9|       210147125|             0|         0|              0|          0|            null|                 null|                  null|
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-26|        9|       2101471

In [32]:
# Schema do DataFrame
covid_df.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: string (nullable = true)
 |-- codmun: string (nullable = true)
 |-- codRegiaoSaude: string (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: string (nullable = true)
 |-- semanaEpi: string (nullable = true)
 |-- populacaoTCU2019: string (nullable = true)
 |-- casosAcumulado: string (nullable = true)
 |-- casosNovos: string (nullable = true)
 |-- obitosAcumulado: string (nullable = true)
 |-- obitosNovos: string (nullable = true)
 |-- Recuperadosnovos: string (nullable = true)
 |-- emAcompanhamentoNovos: string (nullable = true)
 |-- interior/metropolitana: string (nullable = true)



In [33]:
# Total de linhas do DataFrame
covid_df.count()

2624943

In [34]:
# listando as bases de dados
spark.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='hdfs://namenode:8020/user/hive/warehouse')]

In [35]:
# local para armazenar os dados
!hdfs dfs -ls /user/hive/warehouse/

Found 1 items
drwxr-xr-x   - root supergroup          0 2021-07-09 17:50 /user/hive/warehouse/juros


### 02 - Salvando os dados do DataFrame numa Tabela Hive com compressão e organizados em partições por municípios

In [36]:
covid_df.write.parquet("/user/hive/warehouse/covid_df/", mode="overwrite")

In [37]:
# Foi salvo no HDFS todo conteúdo dos arquivos CSV
!hdfs dfs -ls /user/hive/warehouse/covid_df/

Found 5 items
-rw-r--r--   2 root supergroup          0 2021-07-16 14:08 /user/hive/warehouse/covid_df/_SUCCESS
-rw-r--r--   2 root supergroup    1397966 2021-07-16 14:08 /user/hive/warehouse/covid_df/part-00000-e23dd73b-6ebe-48d0-89ec-199a669a4fda-c000.snappy.parquet
-rw-r--r--   2 root supergroup    2212384 2021-07-16 14:08 /user/hive/warehouse/covid_df/part-00001-e23dd73b-6ebe-48d0-89ec-199a669a4fda-c000.snappy.parquet
-rw-r--r--   2 root supergroup    2762691 2021-07-16 14:08 /user/hive/warehouse/covid_df/part-00002-e23dd73b-6ebe-48d0-89ec-199a669a4fda-c000.snappy.parquet
-rw-r--r--   2 root supergroup    2176695 2021-07-16 14:08 /user/hive/warehouse/covid_df/part-00003-e23dd73b-6ebe-48d0-89ec-199a669a4fda-c000.snappy.parquet


In [39]:
# criando um dataFrame com as colunas necessárias
covid_df.createOrReplaceTempView("covid_table")

In [41]:
covid_brasil_df = spark.sql("select regiao, data, populacaoTCU2019, casosAcumulado, casosNovos, obitosAcumulado, obitosNovos, Recuperadosnovos, emAcompanhamentoNovos from covid_table")

In [42]:
type(covid_brasil_df)

pyspark.sql.dataframe.DataFrame

In [43]:
covid_brasil_df.show(10)

+------+----------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+
|regiao|      data|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|
+------+----------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+
|Brasil|2020-02-25|       210147125|             0|         0|              0|          0|            null|                 null|
|Brasil|2020-02-26|       210147125|             1|         1|              0|          0|            null|                 null|
|Brasil|2020-02-27|       210147125|             1|         0|              0|          0|            null|                 null|
|Brasil|2020-02-28|       210147125|             1|         0|              0|          0|            null|                 null|
|Brasil|2020-02-29|       210147125|             2|         1|              0|          0|

In [44]:
# os dados estão como string
covid_brasil_df.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- data: string (nullable = true)
 |-- populacaoTCU2019: string (nullable = true)
 |-- casosAcumulado: string (nullable = true)
 |-- casosNovos: string (nullable = true)
 |-- obitosAcumulado: string (nullable = true)
 |-- obitosNovos: string (nullable = true)
 |-- Recuperadosnovos: string (nullable = true)
 |-- emAcompanhamentoNovos: string (nullable = true)



In [46]:
# ajustando a tipagem dos dados
# realizado cálculo para descobrir a incidência, mortalidade e letalidade

total_covid_brasil_df = covid_brasil_df.withColumn("populacaoTCU2019",col("populacaoTCU2019").cast(LongType())) \
    .withColumn("data",col("data").cast(DateType())) \
    .withColumn("casosAcumulado",col("casosAcumulado").cast(LongType())) \
    .withColumn("casosNovos",col("casosNovos").cast(LongType())) \
    .withColumn("obitosAcumulado",col("obitosAcumulado").cast(LongType())) \
    .withColumn("Recuperadosnovos",col("Recuperadosnovos").cast(LongType())) \
    .withColumn("obitosNovos",col("obitosNovos").cast(LongType())) \
    .withColumn("emAcompanhamentoNovos",col("emAcompanhamentoNovos").cast(LongType())) \
    .withColumn("incidencia", format_number((col("casosAcumulado") / col("populacaoTCU2019") * 100000),1)) \
    .withColumn("mortalidade", format_number((col("obitosAcumulado") / col("populacaoTCU2019") * 100000),1).cast(FloatType())) \
    .withColumn("letalidade", format_number((col("obitosAcumulado") / col("casosAcumulado") * 100),1).cast(FloatType())) \
    .withColumn('incidencia', regexp_replace(col("incidencia"),",",""))


In [48]:
# Ajustando o tipo de dado da coluna Incidência
total_covid_df = total_covid_brasil_df.withColumn("incidencia", total_covid_brasil_df["incidencia"].cast(FloatType()))

In [49]:
total_covid_df.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- data: date (nullable = true)
 |-- populacaoTCU2019: long (nullable = true)
 |-- casosAcumulado: long (nullable = true)
 |-- casosNovos: long (nullable = true)
 |-- obitosAcumulado: long (nullable = true)
 |-- obitosNovos: long (nullable = true)
 |-- Recuperadosnovos: long (nullable = true)
 |-- emAcompanhamentoNovos: long (nullable = true)
 |-- incidencia: float (nullable = true)
 |-- mortalidade: float (nullable = true)
 |-- letalidade: float (nullable = true)



In [50]:
total_covid_df.show(10)

+------+----------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------+-----------+----------+
|regiao|      data|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|incidencia|mortalidade|letalidade|
+------+----------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------+-----------+----------+
|Brasil|2020-02-25|       210147125|             0|         0|              0|          0|            null|                 null|       0.0|        0.0|      null|
|Brasil|2020-02-26|       210147125|             1|         1|              0|          0|            null|                 null|       0.0|        0.0|       0.0|
|Brasil|2020-02-27|       210147125|             1|         0|              0|          0|            null|                 null|       0.0|        0.0|       0.0|
|Brasil|2020-02-

In [51]:
# salvando no HDFS
total_covid_df.write.saveAsTable("covid_total", mode="overwrite")

In [52]:
!hdfs dfs -ls /user/hive/warehouse/

Found 3 items
drwxr-xr-x   - root supergroup          0 2021-07-16 14:08 /user/hive/warehouse/covid_df
drwxr-xr-x   - root supergroup          0 2021-07-16 14:48 /user/hive/warehouse/covid_total
drwxr-xr-x   - root supergroup          0 2021-07-09 17:50 /user/hive/warehouse/juros


In [53]:
!hdfs dfs -ls /user/hive/warehouse/covid_total

Found 5 items
-rw-r--r--   2 root supergroup          0 2021-07-16 14:48 /user/hive/warehouse/covid_total/_SUCCESS
-rw-r--r--   2 root supergroup    2361578 2021-07-16 14:48 /user/hive/warehouse/covid_total/part-00000-7a858c73-b143-4b9b-8757-1395a0aee288-c000.snappy.parquet
-rw-r--r--   2 root supergroup    3842760 2021-07-16 14:48 /user/hive/warehouse/covid_total/part-00001-7a858c73-b143-4b9b-8757-1395a0aee288-c000.snappy.parquet
-rw-r--r--   2 root supergroup    4998638 2021-07-16 14:48 /user/hive/warehouse/covid_total/part-00002-7a858c73-b143-4b9b-8757-1395a0aee288-c000.snappy.parquet
-rw-r--r--   2 root supergroup    3857992 2021-07-16 14:48 /user/hive/warehouse/covid_total/part-00003-7a858c73-b143-4b9b-8757-1395a0aee288-c000.snappy.parquet


### 03 - Visualização dos dados principais

In [54]:
# selecionando o dia 06/07/2021 que contém a última atualização com os totais dos dados

total_max_covid = total_covid_df.where(col("data") == "2021-07-06").limit(1)

total_max_covid.show()

+------+----------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------+-----------+----------+
|regiao|      data|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|incidencia|mortalidade|letalidade|
+------+----------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------+-----------+----------+
|Brasil|2021-07-06|       210147125|      18855015|     62504|         526892|       1780|        17262646|              1065477|    8972.3|      250.7|       2.8|
+------+----------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------+-----------+----------+



### 04 - Salvando o Total de Casos Recuperados e Em Acompanhamento numa Tabela Hive

In [55]:
recuperados_acompanhamento = total_max_covid.select(col("Recuperadosnovos").alias("Recuperados"), col("emAcompanhamentoNovos").alias("Acompanhamento"))

In [56]:
recuperados_acompanhamento.printSchema()

recuperados_acompanhamento.show()

root
 |-- Recuperados: long (nullable = true)
 |-- Acompanhamento: long (nullable = true)

+-----------+--------------+
|Recuperados|Acompanhamento|
+-----------+--------------+
|   17262646|       1065477|
+-----------+--------------+



In [57]:
# salvando da forma padrão do Hive
recuperados_acompanhamento.write.saveAsTable("covid_recup_acomp", mode="overwrite")

In [58]:
# diretórios do HDFS
!hdfs dfs -ls /user/hive/warehouse/

Found 4 items
drwxr-xr-x   - root supergroup          0 2021-07-16 14:08 /user/hive/warehouse/covid_df
drwxr-xr-x   - root supergroup          0 2021-07-16 15:40 /user/hive/warehouse/covid_recup_acomp
drwxr-xr-x   - root supergroup          0 2021-07-16 14:48 /user/hive/warehouse/covid_total
drwxr-xr-x   - root supergroup          0 2021-07-09 17:50 /user/hive/warehouse/juros


In [59]:
# diretório criado com arquivo parquet e compressão snappy
!hdfs dfs -ls /user/hive/warehouse/covid_recup_acomp/

Found 2 items
-rw-r--r--   2 root supergroup          0 2021-07-16 15:40 /user/hive/warehouse/covid_recup_acomp/_SUCCESS
-rw-r--r--   2 root supergroup        724 2021-07-16 15:40 /user/hive/warehouse/covid_recup_acomp/part-00000-907b05b0-cbb1-413b-aa3e-77aa1aca0857-c000.snappy.parquet


### 05 - Salvando o Total Geral de Casos Confirmados, Novos Casos e Incidência de Casos por 100 mil habitantes no formato parquet e compressão snappy

In [60]:
casos_confirmados = total_max_covid.select(col("casosAcumulado").alias("Total"), col("casosNovos").alias("Novos"), col("incidencia").alias("Incidência"))

In [61]:
casos_confirmados.show()

+--------+-----+----------+
|   Total|Novos|Incidência|
+--------+-----+----------+
|18855015|62504|    8972.3|
+--------+-----+----------+



In [62]:
casos_confirmados.printSchema()

root
 |-- Total: long (nullable = true)
 |-- Novos: long (nullable = true)
 |-- Incidência: float (nullable = true)



In [63]:
# salvando no formato parquet com compressão snappy
casos_confirmados.write.parquet("/user/hive/warehouse/casos_confirmados", mode="overwrite")

In [64]:
# diretórios do HDFS
!hdfs dfs -ls /user/hive/warehouse/

Found 5 items
drwxr-xr-x   - root supergroup          0 2021-07-16 16:05 /user/hive/warehouse/casos_confirmados
drwxr-xr-x   - root supergroup          0 2021-07-16 14:08 /user/hive/warehouse/covid_df
drwxr-xr-x   - root supergroup          0 2021-07-16 15:40 /user/hive/warehouse/covid_recup_acomp
drwxr-xr-x   - root supergroup          0 2021-07-16 14:48 /user/hive/warehouse/covid_total
drwxr-xr-x   - root supergroup          0 2021-07-09 17:50 /user/hive/warehouse/juros


In [9]:
obitos = spark.read.parquet("/user/hive/warehouse/obitos")
obitos.show(10)

+------------+------------+----------+-----------+
|Total_Óbitos|Novos_Óbitos|Letalidade|Mortalidade|
+------------+------------+----------+-----------+
|      526892|        1780|       2.8|      250.7|
+------------+------------+----------+-----------+

