# Projeto Final Spark - Semantix Academy

Autor: [Eric Cunha](https://linkedin.com/in/ericscunha)

### Nível Básico

In [28]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

### 1. Enviar os dados para o hdfs

##### 1.1 Criando estrutura de diretórios do projeto

In [1]:
!hdfs dfs -mkdir -p /user/eric/projeto_spark/data

##### 1.2 Copiando os arquivos do namenode para o hdfs

In [7]:
# hdfs dfs -put /input/covid19/* /user/eric/projeto_spark/data 

In [9]:
# Verificando se os dados foram carregados corretamente
!hdfs dfs -ls /user/eric/projeto_spark/data

Found 4 items
-rw-r--r--   3 root supergroup   62492959 2022-08-04 19:23 /user/eric/projeto_spark/data/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2022-08-04 19:23 /user/eric/projeto_spark/data/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2022-08-04 19:23 /user/eric/projeto_spark/data/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2022-08-04 19:23 /user/eric/projeto_spark/data/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


In [7]:
# Carregandos os dados
df_covid = spark.read.csv("/user/eric/projeto_spark/data/*.csv", sep=";", header=True, inferSchema=True)
df_covid.show(5)

+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-25 00:00:00|        9|       210147125|             0|         0|              0|          0|            null|                 null|                  null|
|Brasil|  null|     null|   76|  null|          null|           null

In [8]:
# Verificando a estrutura dos dados
df_covid.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



In [85]:
# Visualizando apenas campos chaves
df_covid.select('data',
                'regiao', 
                'estado',
                'municipio',
                'populacaoTCU2019',
                'casosAcumulado',
                'casosNovos',
                'obitosNovos', 
                'obitosAcumulado', 
                'Recuperadosnovos', 
                'emAcompanhamentoNovos')\
        .sort(col('data').asc()).show(5)

+-------------------+------+------+---------+----------------+--------------+----------+-----------+---------------+----------------+---------------------+
|               data|regiao|estado|municipio|populacaoTCU2019|casosAcumulado|casosNovos|obitosNovos|obitosAcumulado|Recuperadosnovos|emAcompanhamentoNovos|
+-------------------+------+------+---------+----------------+--------------+----------+-----------+---------------+----------------+---------------------+
|2020-02-25 00:00:00| Norte|    AM|     null|         4144597|             0|         0|          0|              0|            null|                 null|
|2020-02-25 00:00:00| Norte|    PA|     null|         8602865|             0|         0|          0|              0|            null|                 null|
|2020-02-25 00:00:00|Brasil|  null|     null|       210147125|             0|         0|          0|              0|            null|                 null|
|2020-02-25 00:00:00| Norte|    AC|     null|          881935|  

##### [X] Atribui zero aos campos Nulo

In [86]:
df_notnull = df_covid.na.fill({'estado': 0, 'municipio': 0, 'Recuperadosnovos': 0, 'emAcompanhamentoNovos': 0})

##### [X] Tratar campo data e campo casosAcumulado para integer 

In [87]:
df_covid_final = df_notnull.withColumn("data", to_date("data", "dd/MM/yyyy"))\
                            .withColumn("casosAcumulado", col("casosAcumulado").cast(IntegerType()))

### 2. Otimizar todos os dados do hdfs para uma tabela Hive particionada por município.

In [4]:
# verifica os databases existentes
spark.sql("show databases").show()

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [5]:
# cria o database projetofinal
spark.sql("create database projetofinal")

DataFrame[]

In [88]:
df_covid_final.write.mode("overwrite").partitionBy("municipio").saveAsTable("projetofinal.covid")

### 3. Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS:

In [89]:
# Carregando os dados da tabela Hive "covid"
table_covid = spark.read.table("projetofinal.covid")

##### [X] Visualização de Casos Recuperados e em Acompanhamento

In [141]:
painel_recuperados_acompanhamento = table_covid.filter("regiao='Brasil'")\
                                               .select("Recuperadosnovos", "emAcompanhamentoNovos")\
                                               .agg(max("Recuperadosnovos").alias("Recuperadosnovos"),\
                                                    max("emAcompanhamentoNovos").alias("emAcompanhamentoNovos"))

painel_recuperados_acompanhamento.show()

+----------------+---------------------+
|Recuperadosnovos|emAcompanhamentoNovos|
+----------------+---------------------+
|        17262646|              1317658|
+----------------+---------------------+



##### [X] Visualização de Casos Confirmados

In [91]:
# função calcula a incidencia por habitantes
def calcula_incidencia(numero_casos, populacao, taxa_habitantes):
    return format_number(((numero_casos * taxa_habitantes)/populacao), 2)

In [134]:
# Identifica o número de casos novos pela data mais recente
df_casos_novos = table_covid.select("casosNovos").filter("regiao='Brasil'").sort(col("data").desc())
casos_novos = df_casos_novos.first()

In [135]:
casos_novos("casosNovos")

Row(62504='casosNovos')

In [154]:
# painel de casos confirmados com calculo de incidência por 100 mil/hab
painel_casos_confirmados = table_covid.filter("regiao='Brasil'")\
                                .select("casosAcumulado","populacaoTCU2019")\
                                .agg(max("casosAcumulado").alias("casosAcumulado"),\
                                     max("populacaoTCU2019").alias("populacaoTCU2019"))\
                                .withColumn("casosNovos", lit(casos_novos[0]))\
                                .withColumn("incidencia",
                                           calcula_incidencia(col("casosAcumulado").cast(FloatType()),
                                                             col("populacaoTCU2019").cast(FloatType()),
                                                             100000))

In [155]:
painel_casos_confirmados = painel_casos_confirmados.select("casosAcumulado", "casosNovos", "Incidencia")
painel_casos_confirmados.show()

+--------------+----------+----------+
|casosAcumulado|casosNovos|Incidencia|
+--------------+----------+----------+
|      18855015|     62504|  8,972.29|
+--------------+----------+----------+



##### [X] Visualização de Óbitos Confirmados

In [142]:
# função que calcula a letalidade
def calcula_letalidade(numero_casos, numero_obitos):
    return format_number(((numero_obitos * 100)/numero_casos), 2)

In [143]:
# função que calcula a mortalidade por habitantes
def calcula_mortalidade(numero_obitos, populacao, taxa_habitantes):
    return format_number(((numero_obitos * taxa_habitantes)/populacao), 2)

In [144]:
# Identifica o número de casos novos pela data mais recente
df_obitos_novos = table_covid.select("obitosNovos").filter("regiao='Brasil'").sort(col("data").desc())
obitos_novos = df_obitos_novos.first()
obitos_novos("obitosNovos")

Row(1780='obitosNovos')

In [156]:
# painel de casos confirmados com calculo de mortalidade por 100 mil/hab
painel_obitos = table_covid.filter("regiao='Brasil'")\
                                .select("obitosAcumulado","populacaoTCU2019", "casosAcumulado")\
                                .agg(max("obitosAcumulado").alias("obitosAcumulado"),\
                                     max("casosAcumulado").alias("casosAcumulado"),\
                                     max("populacaoTCU2019").alias("populacaoTCU2019"))\
                                .withColumn("obitosNovos", lit(obitos_novos[0]))\
                                .withColumn("mortalidade",
                                           calcula_mortalidade(col("obitosAcumulado").cast(FloatType()),
                                                               col("populacaoTCU2019").cast(FloatType()),
                                                               100000))\
                                .withColumn("letalidade",
                                           calcula_letalidade(col("casosAcumulado").cast(FloatType()),
                                                               col("obitosAcumulado").cast(FloatType())))

In [157]:
painel_obitos = painel_obitos.select("obitosAcumulado", "obitosNovos", "mortalidade", "letalidade")
painel_obitos.show()

+---------------+-----------+-----------+----------+
|obitosAcumulado|obitosNovos|mortalidade|letalidade|
+---------------+-----------+-----------+----------+
|         526892|       1780|     250.73|      2.79|
+---------------+-----------+-----------+----------+



### 4. Salvar a primeira visualização como tabela Hive

In [151]:
painel_recuperados_acompanhamento.write.mode("overwrite").saveAsTable("projetofinal.casos_recuperados")

In [153]:
spark.read.table("projetofinal.casos_recuperados").show()

+----------------+---------------------+
|Recuperadosnovos|emAcompanhamentoNovos|
+----------------+---------------------+
|        17262646|              1317658|
+----------------+---------------------+



In [161]:
spark.sql("desc formatted projetofinal.casos_recuperados").show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|    Recuperadosnovos|                 int|   null|
|emAcompanhamentoN...|                 int|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|        projetofinal|       |
|               Table|   casos_recuperados|       |
|               Owner|                root|       |
|        Created Time|Mon Aug 08 19:01:...|       |
|         Last Access|Thu Jan 01 00:00:...|       |
|          Created By|         Spark 2.4.1|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|    Table Properties|[transient_lastDd...|       |
|            Location|file:/mnt/noteboo...|       |
|       Serde Library|org.apache.hadoop...|       |
|         InputFormat|org.apache.hadoop...|       |
|        Out

### 5. Salvar a segunda visualização com formato parquet e compressão snappy

In [163]:
# Salvando como parquet e compressão snappy com saveAsTable
painel_casos_confirmados.write.mode("overwrite")\
                              .option("compression", "snappy")\
                              .format("parquet")\
                              .saveAsTable("projetofinal.casos_confirmados")

In [164]:
spark.read.table("projetofinal.casos_confirmados").show()

+--------------+----------+----------+
|casosAcumulado|casosNovos|Incidencia|
+--------------+----------+----------+
|      18855015|     62504|  8,972.29|
+--------------+----------+----------+



In [166]:
spark.sql("desc extended projetofinal.casos_confirmados").show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|      casosAcumulado|                 int|   null|
|          casosNovos|                 int|   null|
|          Incidencia|              string|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|        projetofinal|       |
|               Table|   casos_confirmados|       |
|               Owner|                root|       |
|        Created Time|Mon Aug 08 19:18:...|       |
|         Last Access|Thu Jan 01 00:00:...|       |
|          Created By|         Spark 2.4.1|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|    Table Properties|[transient_lastDd...|       |
|            Location|file:/mnt/noteboo...|       |
|       Serde Library|org.apache.hadoop...|       |
|         In

In [167]:
# Visualizando os arquivos
!hdfs dfs -ls file:/mnt/notebooks/spark-warehouse/projetofinal.db/casos_confirmados

Found 2 items
-rw-r--r--   1 root root          0 2022-08-08 19:18 file:///mnt/notebooks/spark-warehouse/projetofinal.db/casos_confirmados/_SUCCESS
-rw-r--r--   1 root root        906 2022-08-08 19:18 file:///mnt/notebooks/spark-warehouse/projetofinal.db/casos_confirmados/part-00000-faa4b1d4-01cf-49d3-ba63-1d846039b090-c000.snappy.parquet


In [169]:
# salvando no namenode como parquet e compressão snappy
painel_casos_confirmados.write.mode("overwrite")\
                              .option("compression", "snappy")\
                              .parquet("/user/eric/projeto_spark/data/casos_confirmados")

In [170]:
# Visualizando os arquivos
!hdfs dfs -ls /user/eric/projeto_spark/data/casos_confirmados

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-08-08 19:22 /user/eric/projeto_spark/data/casos_confirmados/_SUCCESS
-rw-r--r--   2 root supergroup        906 2022-08-08 19:22 /user/eric/projeto_spark/data/casos_confirmados/part-00000-32432bb0-95b7-489b-8d5d-b548e9c1baf7-c000.snappy.parquet


### 6.  Salvar a terceira visualização em um tópico no Kafka

In [178]:
# Salva o tópico no kafka (key, value)
painel_obitos.selectExpr("CONCAT(1) AS key", "to_json(struct(*)) AS value")\
            .write\
            .format("kafka")\
            .option("kafka.bootstrap.servers","kafka:9092")\
            .option("topic","obitos-confirmados-br")\
            .option("value", "1")\
            .option("checkpointLocation","/user/eric/projeto_spark/data/kafka/checkpoint")\
            .save()

In [180]:
# Lê o tópico para conferência
topic_obitos = spark.read\
                    .format("kafka")\
                    .option("kafka.bootstrap.servers","kafka:9092")\
                    .option("subscribe","obitos-confirmados-br")\
                    .load()

In [182]:
# Formata o tópico para visualização
view_topic = topic_obitos.select(col("key").cast("string"), col("value").cast("string"))
view_topic.show(truncate=False)

+---+----------------------------------------------------------------------------------------+
|key|value                                                                                   |
+---+----------------------------------------------------------------------------------------+
|1  |{"obitosAcumulado":526892,"obitosNovos":1780,"mortalidade":"250.73","letalidade":"2.79"}|
+---+----------------------------------------------------------------------------------------+



### 7. Criar a visualização pelo Spark com os dados enviados para o HDFS

In [198]:
# Seleciona os dados de acordo com a apresentação solicitada
painel_sintese = table_covid.groupBy(["regiao", "estado"])\
                            .agg(max("casosAcumulado").alias("Casos"),
                                 max("obitosAcumulado").alias("Obitos"),
                                 max("populacaoTCU2019").alias("populacaoTCU2019"),
                                 max("data").alias("Atualizacao"))\
                            .withColumn("incidenciaPor100Mil",
                                           calcula_incidencia(col("Casos").cast(FloatType()),
                                                             col("populacaoTCU2019").cast(FloatType()),
                                                             100000))\
                            .withColumn("mortalidadePor100Mil",
                                           calcula_mortalidade(col("Obitos").cast(FloatType()),
                                                               col("populacaoTCU2019").cast(FloatType()),
                                                               100000))\
                            .drop("populacaoTCU2019")

In [200]:
painel_sintese.select("regiao", 
                      "estado", 
                      "Casos", 
                      "Obitos", 
                      "incidenciaPor100Mil", 
                      "mortalidadePor100Mil", 
                      "Atualizacao")\
              .sort(col("regiao").asc()).show(28)

+------------+------+--------+------+-------------------+--------------------+-----------+
|      regiao|estado|   Casos|Obitos|incidenciaPor100Mil|mortalidadePor100Mil|Atualizacao|
+------------+------+--------+------+-------------------+--------------------+-----------+
|      Brasil|     0|18855015|526892|           8,972.29|              250.73| 2021-07-06|
|Centro-Oeste|    MS|  339323|  8400|          12,210.32|              302.27| 2021-07-06|
|Centro-Oeste|    GO|  686433| 19485|           9,780.54|              277.63| 2021-07-06|
|Centro-Oeste|    DF|  434708|  9322|          14,416.89|              309.16| 2021-07-06|
|Centro-Oeste|    MT|  456155| 12000|          13,091.10|              344.39| 2021-07-06|
|    Nordeste|    AL|  220793|  5450|           6,615.80|              163.30| 2021-07-06|
|    Nordeste|    MA|  322052|  9190|           4,551.86|              129.89| 2021-07-06|
|    Nordeste|    PB|  402175|  8724|          10,009.02|              217.12| 2021-07-06|

### 8. Salvar a visualização do exercício 6 em um tópico no Elastic

### 9. Criar um dashboard no Elastic para visualização dos novos dados enviados