### Arquivos no hdfs

In [5]:
# Importando as bibliotecas.
from pyspark.sql.functions import *

In [6]:
from pyspark.sql.types import *

In [3]:
# Verificando as pastas dentro do hdfs.
!hdfs dfs -ls /user

Found 2 items
drwxr-xr-x   - root supergroup          0 2022-12-03 17:45 /user/hive
drwxr-xr-x   - root supergroup          0 2022-12-03 18:05 /user/jessica


In [None]:
# Criação da pasta para envio dos arquivos
!hdfs dfs -mkdir /user/jessica

In [None]:
!hdfs dfs -mkdir /user/jessica/dados_covid

**Observação:** O envio dos arquivos foram feitos pelo shell do Spark no terminal. O comando utilizado foi:
```bash
hdfs dfs -put /input/*.csv /user/jessica/dados_covid
```

In [4]:
# Verificando o envio dos dados paro o hdfs.
!hdfs dfs -ls /user/jessica/dados_covid

Found 5 items
-rw-r--r--   3 root supergroup   62492959 2022-12-03 18:07 /user/jessica/dados_covid/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2022-12-03 18:07 /user/jessica/dados_covid/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2022-12-03 18:08 /user/jessica/dados_covid/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2022-12-03 18:08 /user/jessica/dados_covid/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv
drwxr-xr-x   - root supergroup          0 2022-12-03 18:36 /user/jessica/dados_covid/casos_confirmados


### Trabalhando com a base de dados

In [7]:
# Leitura dos quatro arquivos de dados como um dataframe.
df_covid = spark.read.csv("/user/jessica/dados_covid/*.csv", sep = ";", header = True, inferSchema = True)

In [6]:
# Visualização dos dados.
df_covid.show(10)

+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-25 00:00:00|        9|       210147125|             0|         0|              0|          0|            null|                 null|                  null|
|Brasil|  null|     null|   76|  null|          null|           null

In [7]:
# Visualização do Schema.
df_covid.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



###  Otimizar todos os dados do hdfs para uma tabela Hive particionada por município


In [None]:
df_covid.write.partitionBy('municipio').saveAsTable('jessica.dados_covid')

## 3. Criar as 3 visualizações pelo Spark com os dados enviados para o HDFS

### Visualização 1 - Casos Recuperados

In [8]:
# Como temos dentro do dataframe registros que apresentam o resultado geral do Brasil, vamos pegar este registro para encontrar
# todos os casos recuperados.
# Pegando o registro referente a última data.
df_covid.agg({'data': 'max'}).show()

+-------------------+
|          max(data)|
+-------------------+
|2021-07-06 00:00:00|
+-------------------+



In [9]:
# Atribuindo a data a uma variável.
last_date = df_covid.agg({'data': 'max'}).head()[0]
print(last_date)

2021-07-06 00:00:00


In [10]:
type(last_date)

datetime.datetime

In [10]:
# Filtrando o dataframe com a última data para todo o Brasil.
df_covid_filter_date = df_covid.where((df_covid['regiao'] == 'Brasil') & (df_covid['data'] == last_date))

In [12]:
df_covid_filter_date.show()

+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2021-07-06 00:00:00|       27|       210147125|      18855015|     62504|         526892|       1780|        17262646|              1065477|                  null|
+------+------+---------+-----+------+--------------+---------------

In [13]:
# Selecionando apenas as colunas importantes para essa visualização.
df_covid_recovered = df_covid_filter_date['regiao', 'Recuperadosnovos', 'emAcompanhamentoNovos']
df_covid_recovered.show()

+------+----------------+---------------------+
|regiao|Recuperadosnovos|emAcompanhamentoNovos|
+------+----------------+---------------------+
|Brasil|        17262646|              1065477|
+------+----------------+---------------------+



In [14]:
# Renomeando as colunas conforme a visualização.
df_covid_recovered = df_covid_recovered.withColumnRenamed("regiao","Regiao").\
withColumnRenamed("Recuperadosnovos","Recuperados_Covid").withColumnRenamed("emAcompanhamentoNovos","Em_Acompanhamento_Covid")

In [15]:
# Apresentação do primeira visualização de casos recuperados e em acompanhamento.
df_covid_recovered.show()

+------+-----------------+-----------------------+
|Regiao|Recuperados_Covid|Em_Acompanhamento_Covid|
+------+-----------------+-----------------------+
|Brasil|         17262646|                1065477|
+------+-----------------+-----------------------+



### 4. Salvar a primeira visualização como tabela Hive

In [23]:
# Salvando a visualização como tabela hive.
df_covid_recovered.write.saveAsTable('visualizacao_recuperados')

In [16]:
# Confirmando que o arquivo está salvo no hdfs.
!hdfs dfs -ls /user/hive/warehouse/visualizacao_recuperados

Found 3 items
-rw-r--r--   2 root supergroup          0 2022-12-03 18:10 /user/hive/warehouse/visualizacao_recuperados/_SUCCESS
-rw-r--r--   2 root supergroup        493 2022-12-03 18:10 /user/hive/warehouse/visualizacao_recuperados/part-00000-eaa09b15-678d-4804-80e5-95ead80aa4f5-c000.snappy.parquet
-rw-r--r--   2 root supergroup        932 2022-12-03 18:10 /user/hive/warehouse/visualizacao_recuperados/part-00007-eaa09b15-678d-4804-80e5-95ead80aa4f5-c000.snappy.parquet


### Visualização 2 - Casos Confirmados

Essa visualização apresentará os casos confirmados acumulado, casos novos e a incidência até a última data dos dados. 
Na tabela não possui uma coluna com a taxa de incidência. Dessa forma, o calculo dessa taxa é realizado considerando os novos casos de covid em um determinado período dividido pela população em risco, ou seja, a população brasileira menos o número de casos confirmados.


In [17]:
# Dataframe filtrado apenas com o registro da última data para a região do Brasil.
df_covid_filter_date.show()

+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2021-07-06 00:00:00|       27|       210147125|      18855015|     62504|         526892|       1780|        17262646|              1065477|                  null|
+------+------+---------+-----+------+--------------+---------------

In [18]:
# Criando a coluna Incidência no dataframe.
df_covid_confirmed_incidence = df_covid_filter_date.withColumn('Incidencia', (df_covid_filter_date\
['casosAcumulado'] / (df_covid_filter_date['populacaoTCU2019'] - df_covid_filter_date['casosAcumulado'])*100000))

In [19]:
# Estabelecendo o número de casas decimais na coluna nova Incidência.
df_covid_confirmed_incidence = df_covid_confirmed_incidence.withColumn('Incidencia', \
format_number(col("Incidencia").cast(FloatType()),1))

In [20]:
df_covid_confirmed_incidence.show()

+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+----------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|Incidencia|
+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+----------+
|Brasil|  null|     null|   76|  null|          null|           null|2021-07-06 00:00:00|       27|       210147125|      18855015|     62504|         526892|       1780|        17262646|              1065477|                  null|   9,856.7|
+------+------+---------

In [21]:
# Criando um dataframe apenas com as informações das colunas importantes para essa visualização.
df_covid_confirmed_incidence = df_covid_confirmed_incidence["regiao","casosAcumulado", "casosNovos", "Incidencia"]
df_covid_confirmed_incidence.show()

+------+--------------+----------+----------+
|regiao|casosAcumulado|casosNovos|Incidencia|
+------+--------------+----------+----------+
|Brasil|      18855015|     62504|   9,856.7|
+------+--------------+----------+----------+



In [22]:
# Renomeando as colunas.
df_covid_confirmed_incidence = df_covid_confirmed_incidence.withColumnRenamed("regiao","Regiao").\
withColumnRenamed("casosAcumulado","Casos_Acumulado").withColumnRenamed("casosNovos","Casos_Novos")

In [23]:
# Apresentação da segunda visualização de casos confirmados.
df_covid_confirmed_incidence.show()

+------+---------------+-----------+----------+
|Regiao|Casos_Acumulado|Casos_Novos|Incidencia|
+------+---------------+-----------+----------+
|Brasil|       18855015|      62504|   9,856.7|
+------+---------------+-----------+----------+



### 5. Salvar a segunda visualização com formato parquet e compressão snappy

In [None]:
# Por padrão já está com o formato parquet e com compressão snappy. Dessa forma, foi aplicado diretamente o método save().
df_covid_confirmed_incidence.write.save('/user/jessica/dados_covid/casos_confirmados')

In [24]:
# Confirmando que a visualização foi salva.
!hdfs dfs -ls /user/jessica/dados_covid/casos_confirmados

Found 3 items
-rw-r--r--   2 root supergroup          0 2022-12-03 18:36 /user/jessica/dados_covid/casos_confirmados/_SUCCESS
-rw-r--r--   2 root supergroup        567 2022-12-03 18:36 /user/jessica/dados_covid/casos_confirmados/part-00000-1878708b-0b74-4ddc-885a-fb5e40be998d-c000.snappy.parquet
-rw-r--r--   2 root supergroup       1194 2022-12-03 18:35 /user/jessica/dados_covid/casos_confirmados/part-00007-1878708b-0b74-4ddc-885a-fb5e40be998d-c000.snappy.parquet


### Visualização 3 - Óbitos Confirmados

Essa visualização apresentará os óbitos confirmados com as seguintes colunas, óbitos acumulados, casos novos, letalidade e mortalidade.
- **Letalidade** -> total de óbito acumulado durante determinado período dividido pelo total de casos acumulado no período
- **Mortalidade** -> total de óbito acumulado durante determinado período dividido pelo população e multiplicado por 1.000.000

In [25]:
# Dataframe filtrado apenas com o registro da última data para a região do Brasil
df_covid_filter_date.show()

+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2021-07-06 00:00:00|       27|       210147125|      18855015|     62504|         526892|       1780|        17262646|              1065477|                  null|
+------+------+---------+-----+------+--------------+---------------

In [26]:
# Criação da nova coluna de letalidade.
df_covid_lethality = df_covid_filter_date.withColumn('Letalidade',\
(df_covid_filter_date.obitosAcumulado / df_covid_filter_date.casosAcumulado)*100)

In [27]:
# Criação da nova coluna de mortalidade.
df_covid_lethality = df_covid_lethality.withColumn('Mortalidade',\
(df_covid_filter_date.obitosAcumulado / df_covid_filter_date.populacaoTCU2019)*100000)

In [28]:
# Criando um dataframe apenas com as informações das colunas importantes para essa visualização.
df_covid_lethality = df_covid_lethality["regiao","obitosAcumulado", "obitosNovos", "Letalidade", "Mortalidade" ]

In [29]:
# Renomeando as colunas.
df_covid_lethality = df_covid_lethality.withColumnRenamed("regiao","Regiao").\
withColumnRenamed("obitosAcumulado","Obitos_Acumulado").withColumnRenamed("obitosNovos","Obitos_Novos")

In [30]:
# Estabelecendo o número de casas decimais na coluna nova letalidade.
df_covid_lethality = df_covid_lethality.withColumn('Letalidade',\
format_number(col("Letalidade").cast(FloatType()),1))

In [31]:
# Estabelecendo o número de casas decimais na coluna nova mortalidade.
df_covid_lethality = df_covid_lethality.withColumn('Mortalidade',\
format_number(col("Mortalidade").cast(FloatType()),1))

In [32]:
# Apresentação da terceira visualização de casos confirmados.
df_covid_lethality.show()

+------+----------------+------------+----------+-----------+
|Regiao|Obitos_Acumulado|Obitos_Novos|Letalidade|Mortalidade|
+------+----------------+------------+----------+-----------+
|Brasil|          526892|        1780|       2.8|      250.7|
+------+----------------+------------+----------+-----------+



### 6. Salvar a terceira visualização em um tópico do Kafka

Criação de um topico pelo terminal no container do Kafka:
```bash
kafka-topics.sh --bootstrap-server kafka:9092 --topic obitos-covid --create --partitions 2 --replication-factor 1
```

In [33]:
df_covid_lethality.selectExpr("to_json(struct(*)) AS value").write.format('kafka')\
.option('kafka.bootstrap.servers', 'kafka:9092').option('topic', 'obitos-covid').save()

### 7. Criar a visualização pelo Spark com os dados enviados para o HDFS

In [34]:
# Dataframe filtrado apenas com o registro da última data para a região do Brasil.
df_covid.show(5)

+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-25 00:00:00|        9|       210147125|             0|         0|              0|          0|            null|                 null|                  null|
|Brasil|  null|     null|   76|  null|          null|           null

In [11]:
# Os dados apresentam um registo com as informações gerais de determinada região, dessa forma, vamos utilizar esse registro.
# Agrupando o dataframe pela região e pelos valores máximos das colunas.
df_covid_region = df_covid.groupBy('regiao').agg({\
'casosAcumulado':'max', 'obitosAcumulado': 'max', 'populacaoTCU2019': 'max','data': 'max'})

In [12]:
df_covid_region.show(5)

+------------+---------------------+-------------------+-------------------+--------------------+
|      regiao|max(populacaoTCU2019)|max(casosAcumulado)|          max(data)|max(obitosAcumulado)|
+------------+---------------------+-------------------+-------------------+--------------------+
|    Nordeste|             14873064|            1141612|2021-07-06 00:00:00|               24428|
|         Sul|             11433957|            1308643|2021-07-06 00:00:00|               31867|
|     Sudeste|             45919049|            3809222|2021-07-06 00:00:00|              130389|
|Centro-Oeste|              7018354|             686433|2021-07-06 00:00:00|               19485|
|      Brasil|            210147125|           18855015|2021-07-06 00:00:00|              526892|
+------------+---------------------+-------------------+-------------------+--------------------+
only showing top 5 rows



In [13]:
# Criação da nova coluna de mortalidade.
df_covid_region_mortality = df_covid_region.withColumn('Mortalidade',\
(df_covid_region["max(obitosAcumulado)"] / df_covid_region["max(populacaoTCU2019)"])*100000)

In [14]:
df_covid_region_mortality.show(5)

+------------+---------------------+-------------------+-------------------+--------------------+------------------+
|      regiao|max(populacaoTCU2019)|max(casosAcumulado)|          max(data)|max(obitosAcumulado)|       Mortalidade|
+------------+---------------------+-------------------+-------------------+--------------------+------------------+
|    Nordeste|             14873064|            1141612|2021-07-06 00:00:00|               24428|164.24322520228515|
|         Sul|             11433957|            1308643|2021-07-06 00:00:00|               31867|278.70491379318634|
|     Sudeste|             45919049|            3809222|2021-07-06 00:00:00|              130389| 283.9540514003241|
|Centro-Oeste|              7018354|             686433|2021-07-06 00:00:00|               19485|277.62919909711025|
|      Brasil|            210147125|           18855015|2021-07-06 00:00:00|              526892|250.72529543290204|
+------------+---------------------+-------------------+--------

In [15]:
# Criação da nova coluna de incidência.
df_covid_region_mortality_incidence = df_covid_region_mortality.withColumn('Incidencia', (df_covid_region_mortality\
['max(casosAcumulado)'] / (df_covid_region_mortality['max(populacaoTCU2019)'] -\
                           df_covid_region_mortality['max(casosAcumulado)'])*100000))

In [16]:
df_covid_region_mortality_incidence.show(5)

+------------+---------------------+-------------------+-------------------+--------------------+------------------+-----------------+
|      regiao|max(populacaoTCU2019)|max(casosAcumulado)|          max(data)|max(obitosAcumulado)|       Mortalidade|       Incidencia|
+------------+---------------------+-------------------+-------------------+--------------------+------------------+-----------------+
|    Nordeste|             14873064|            1141612|2021-07-06 00:00:00|               24428|164.24322520228515| 8313.84765400000|
|         Sul|             11433957|            1308643|2021-07-06 00:00:00|               31867|278.70491379318634|12924.46831800000|
|     Sudeste|             45919049|            3809222|2021-07-06 00:00:00|              130389| 283.9540514003241| 9045.92175100000|
|Centro-Oeste|              7018354|             686433|2021-07-06 00:00:00|               19485|277.62919909711025|10840.83329500000|
|      Brasil|            210147125|           18855015

In [17]:
# Estabelecendo o número de casas decimais na coluna nova mortalidade.
df_covid_region_mortality_incidence = df_covid_region_mortality_incidence.withColumn('Mortalidade',\
format_number(col("Mortalidade").cast(FloatType()),1))

In [18]:
# Estabelecendo o número de casas decimais na coluna nova incidência.
df_covid_region_mortality_incidence = df_covid_region_mortality_incidence.withColumn('Incidencia',\
format_number(col("Incidencia").cast(FloatType()),1))

In [19]:
df_covid_region_mortality_incidence.show(5)

+------------+---------------------+-------------------+-------------------+--------------------+-----------+----------+
|      regiao|max(populacaoTCU2019)|max(casosAcumulado)|          max(data)|max(obitosAcumulado)|Mortalidade|Incidencia|
+------------+---------------------+-------------------+-------------------+--------------------+-----------+----------+
|    Nordeste|             14873064|            1141612|2021-07-06 00:00:00|               24428|      164.2|   8,313.8|
|         Sul|             11433957|            1308643|2021-07-06 00:00:00|               31867|      278.7|  12,924.5|
|     Sudeste|             45919049|            3809222|2021-07-06 00:00:00|              130389|      284.0|   9,045.9|
|Centro-Oeste|              7018354|             686433|2021-07-06 00:00:00|               19485|      277.6|  10,840.8|
|      Brasil|            210147125|           18855015|2021-07-06 00:00:00|              526892|      250.7|   9,856.7|
+------------+------------------

In [20]:
# Renomeando as colunas.
df_covid_region_mortality_incidence = df_covid_region_mortality_incidence.withColumnRenamed("regiao","Regiao").\
withColumnRenamed("max(populacaoTCU2019)","Populacao").withColumnRenamed("max(casosAcumulado)","Casos").\
withColumnRenamed("max(data)","Data").withColumnRenamed("max(obitosAcumulado)","Obitos")

In [21]:
# Apresentação da visualização.
df_covid_region_mortality_incidence.show(5)

+------------+---------+--------+-------------------+------+-----------+----------+
|      Regiao|Populacao|   Casos|               Data|Obitos|Mortalidade|Incidencia|
+------------+---------+--------+-------------------+------+-----------+----------+
|    Nordeste| 14873064| 1141612|2021-07-06 00:00:00| 24428|      164.2|   8,313.8|
|         Sul| 11433957| 1308643|2021-07-06 00:00:00| 31867|      278.7|  12,924.5|
|     Sudeste| 45919049| 3809222|2021-07-06 00:00:00|130389|      284.0|   9,045.9|
|Centro-Oeste|  7018354|  686433|2021-07-06 00:00:00| 19485|      277.6|  10,840.8|
|      Brasil|210147125|18855015|2021-07-06 00:00:00|526892|      250.7|   9,856.7|
+------------+---------+--------+-------------------+------+-----------+----------+
only showing top 5 rows

