## Projeto final do curso Spark - Big Data Processing Semantix Academy

### Campannha Nacional de Vacinação contra Convid-19

In [50]:
# Importação das bibliotecas
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from  pyspark.sql.types import IntegerType
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql import *

## 1. Enviar os dados para o hdfs

In [2]:
# Carregando os dados no hdfs
#!hdfs dfs -put /input/covid_br/ /user/allan/projeto_covid_br

In [3]:
# Consultando os dados no hdfs
!hdfs dfs -ls /user/allan/projeto_covid_br

Found 4 items
-rw-r--r--   3 root supergroup   62492959 2022-04-21 20:00 /user/allan/projeto_covid_br/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2022-04-21 20:00 /user/allan/projeto_covid_br/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2022-04-21 20:00 /user/allan/projeto_covid_br/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2022-04-21 20:00 /user/allan/projeto_covid_br/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


In [4]:
# Verificando o formato dos dados
!hdfs dfs -cat /user/allan/projeto_covid_br/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv | head -n 5

regiao;estado;municipio;coduf;codmun;codRegiaoSaude;nomeRegiaoSaude;data;semanaEpi;populacaoTCU2019;casosAcumulado;casosNovos;obitosAcumulado;obitosNovos;Recuperadosnovos;emAcompanhamentoNovos;interior/metropolitana
Brasil;;;76;;;;2020-02-25;9;210147125;0;0;0;0;;;
Brasil;;;76;;;;2020-02-26;9;210147125;1;1;0;0;;;
Brasil;;;76;;;;2020-02-27;9;210147125;1;0;0;0;;;
Brasil;;;76;;;;2020-02-28;9;210147125;1;0;0;0;;;
cat: Unable to write to output stream.


In [5]:
!hdfs dfs -tail /user/allan/projeto_covid_br/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv

30010;53001;DISTRITO FEDERAL;2020-07-22;30;3015268;87801;1725;1176;18;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-23;30;3015268;90023;2222;1218;42;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-24;30;3015268;92414;2391;1244;26;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-25;30;3015268;94187;1773;1275;31;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-26;31;3015268;96332;2145;1308;33;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-27;31;3015268;98480;2148;1339;31;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-28;31;3015268;100726;2246;1391;52;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-29;31;3015268;102342;1616;1419;28;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-30;31;3015268;104442;2100;1444;25;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-31;31;3015268;1

## 2. Otimizar todos os dados do hdfs para uma tabela Hive particionada por município

Conectando com o hive-server
- docker exec -it hive-server bash

Conectando com o beeline
- beeline -u jdbc:hive2://localhost:10000

Criando o banco
- create database allan
- use allan

Criando uma tabela
- create table covid_br(regiao String, estado String, municipio String, coduf int, codmun int, codRegiaoSaude int, 
nomeRegiaoSaude String, data date, semanaEpi int, populacaoTCU2019 int, casosAcumulado int, casosNovos int,
obitosAcumulado int, obitosNovos int, Recuperadosnovos int, emAcompanhamentoNovos int, interiorMetropolitana int)
row format delimited
fields terminated by ';'
lines terminated by '\n'
stored as textfile
tblproperties("skip.header.line.count"="1");

Descrição da tabela
- desc formatted covid_br;

Carregando os dados do hdfs para a tabela criada
- load data inpath '/user/allan/projeto_covid_br' overwrite into table covid_br;

Visualizando os dados 
- select * from covid_br limit 10;

Contar o numero de registros
- select count(*) from covid_br;
- 2624943



Devido a grande quantidade de municipios e por limitação da maquina utilizada, o particionamento por municipios nao foi possivel. Dessa forma iremos utilizar a partição por estado

particionamento dinamico

create table covid_br_particao_UF(regiao String,municipio String,coduf int,codmun int,codRegiaoSaude int,nomeRegiaoSaude String,data date,semanaEpi int,populacaoTCU2019 int,casosAcumulado int,casosNovos int,obitosAcumulado int,obitosNovos int,
Recuperadosnovos int,emAcompanhamentoNovos int,interiorMetropolitana int)

partitioned by (estado String)

row format delimited

fields terminated by ';'

lines terminated by '\n'

stored as textfile

tblproperties("skip.header.line.count"="1");

Inserindo os dados da tabela particionada

INSERT OVERWRITE TABLE covid_br_particao_UF PARTITION(estado)

SELECT regiao,municipio,coduf,codmun,codRegiaoSaude,nomeRegiaoSaude,data,semanaEpi,populacaoTCU2019,casosAcumulado,
casosNovos,obitosAcumulado,obitosNovos,Recuperadosnovos,emAcompanhamentoNovos,interiorMetropolitana,estado
FROM covid_br;



In [6]:
# Acessando os dados salvos no hdfs

!hdfs dfs -ls /user/hive/warehouse/allan.db/covid_br_particao_uf

Found 28 items
drwxrwxr-x   - root supergroup          0 2022-04-21 19:23 /user/hive/warehouse/allan.db/covid_br_particao_uf/estado=AC
drwxrwxr-x   - root supergroup          0 2022-04-21 19:23 /user/hive/warehouse/allan.db/covid_br_particao_uf/estado=AL
drwxrwxr-x   - root supergroup          0 2022-04-21 19:23 /user/hive/warehouse/allan.db/covid_br_particao_uf/estado=AM
drwxrwxr-x   - root supergroup          0 2022-04-21 19:23 /user/hive/warehouse/allan.db/covid_br_particao_uf/estado=AP
drwxrwxr-x   - root supergroup          0 2022-04-21 19:23 /user/hive/warehouse/allan.db/covid_br_particao_uf/estado=BA
drwxrwxr-x   - root supergroup          0 2022-04-21 19:23 /user/hive/warehouse/allan.db/covid_br_particao_uf/estado=CE
drwxrwxr-x   - root supergroup          0 2022-04-21 19:23 /user/hive/warehouse/allan.db/covid_br_particao_uf/estado=DF
drwxrwxr-x   - root supergroup          0 2022-04-21 19:23 /user/hive/warehouse/allan.db/covid_br_particao_uf/estado=ES
drwxrwxr-x   - root super

In [7]:
!hdfs dfs -ls /user/hive/warehouse/allan.db/covid_br

Found 4 items
-rwxrwxr-x   3 root supergroup   62492959 2022-04-21 11:57 /user/hive/warehouse/allan.db/covid_br/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rwxrwxr-x   3 root supergroup   76520681 2022-04-21 11:57 /user/hive/warehouse/allan.db/covid_br/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rwxrwxr-x   3 root supergroup   91120916 2022-04-21 11:57 /user/hive/warehouse/allan.db/covid_br/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rwxrwxr-x   3 root supergroup    3046774 2022-04-21 11:57 /user/hive/warehouse/allan.db/covid_br/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


In [8]:
covid = spark.read.csv("/user/hive/warehouse/allan.db/covid_br", sep=";", header="True", inferSchema="True")

In [9]:
covid.take(2)

[Row(regiao='Brasil', estado=None, municipio=None, coduf=76, codmun=None, codRegiaoSaude=None, nomeRegiaoSaude=None, data=datetime.datetime(2020, 2, 25, 0, 0), semanaEpi=9, populacaoTCU2019=210147125, casosAcumulado=Decimal('0'), casosNovos=0, obitosAcumulado=0, obitosNovos=0, Recuperadosnovos=None, emAcompanhamentoNovos=None, interior/metropolitana=None),
 Row(regiao='Brasil', estado=None, municipio=None, coduf=76, codmun=None, codRegiaoSaude=None, nomeRegiaoSaude=None, data=datetime.datetime(2020, 2, 26, 0, 0), semanaEpi=9, populacaoTCU2019=210147125, casosAcumulado=Decimal('1'), casosNovos=1, obitosAcumulado=0, obitosNovos=0, Recuperadosnovos=None, emAcompanhamentoNovos=None, interior/metropolitana=None)]

In [10]:
covid.show(5)

+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-25 00:00:00|        9|       210147125|             0|         0|              0|          0|            null|                 null|                  null|
|Brasil|  null|     null|   76|  null|          null|           null

In [11]:
print(covid.printSchema())

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)

None


In [12]:
# lendo oarquivo inferindo schema
estrutura_lista = [
    StructField("regiao", StringType()),
    StructField("estado", StringType()),
    StructField("municipio", StringType()),
    StructField("coduf", IntegerType()),
    StructField("codmun", StringType()),
    StructField("codRegiaoSaude", StringType()),
    StructField("nomeRegiaoSaude", StringType()),
    StructField("data", DateType()),
    StructField("semanaEpi", IntegerType()),
    StructField("populacaoTCU2019", IntegerType()),
    StructField("casosNovos", IntegerType()),
    StructField("obitosAcumulado", IntegerType()),
    StructField("obitosNovos", IntegerType()),
    StructField("Recuperadosnovos", IntegerType()),
    StructField("emAcompanhamentoNovos", IntegerType()),
    StructField("interiorMetropolitana", IntegerType())
]

schema_names = StructType(estrutura_lista)

In [13]:
covid = spark.read.csv("/user/hive/warehouse/allan.db/covid_br_particao_uf", sep=";", schema=schema_names)

In [14]:
covid.show(5)

+------+---------+-----+------+--------------+---------------+----+---------+----------------+----------+---------------+-----------+----------------+---------------------+---------------------+------+
|regiao|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|data|semanaEpi|populacaoTCU2019|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interiorMetropolitana|estado|
+------+---------+-----+------+--------------+---------------+----+---------+----------------+----------+---------------+-----------+----------------+---------------------+---------------------+------+
|  null|     null| null|  null|          null|           null|null|     null|            null|      null|           null|       null|            null|                 null|                 null|    MG|
|  null|     null| null|  null|          null|           null|null|     null|            null|      null|           null|       null|            null|                 null|                 nul

In [15]:
print(covid.printSchema())

root
 |-- regiao: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: string (nullable = true)
 |-- codRegiaoSaude: string (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: date (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interiorMetropolitana: integer (nullable = true)
 |-- estado: string (nullable = true)

None


In [16]:
covid.columns

['regiao',
 'municipio',
 'coduf',
 'codmun',
 'codRegiaoSaude',
 'nomeRegiaoSaude',
 'data',
 'semanaEpi',
 'populacaoTCU2019',
 'casosNovos',
 'obitosAcumulado',
 'obitosNovos',
 'Recuperadosnovos',
 'emAcompanhamentoNovos',
 'interiorMetropolitana',
 'estado']

## 3 - Criar 3 visualizações pelo Spark com os dados enviados para o HDFS

In [17]:
visual3 = covid.agg({'Recuperadosnovos': 'sum', 'casosNovos': 'sum','obitosNovos': 'sum'})
visual3.show()

+---------------------+----------------+---------------+
|sum(Recuperadosnovos)|sum(obitosNovos)|sum(casosNovos)|
+---------------------+----------------+---------------+
|              1580655|       274777592|     9998172092|
+---------------------+----------------+---------------+



In [18]:
visual3.columns

['sum(Recuperadosnovos)', 'sum(obitosNovos)', 'sum(casosNovos)']

In [19]:
visual3_ = visual3.withColumnRenamed('sum(Recuperadosnovos)', 'Recuperados')\
    .withColumnRenamed('sum(obitosNovos)', "Obitos")\
    .withColumnRenamed('sum(casosNovos)', "Novos")

visual3_.columns

['Recuperados', 'Obitos', 'Novos']

## 4 - Salvar a primeira visualização como tabela hive

In [20]:
visual4 = covid.agg({'Recuperadosnovos': 'sum', 'emAcompanhamentoNovos': 'sum'})

visual4.show()

+---------------------+--------------------------+
|sum(Recuperadosnovos)|sum(emAcompanhamentoNovos)|
+---------------------+--------------------------+
|               523208|                2920055795|
+---------------------+--------------------------+



In [21]:
visual4_ = visual4.withColumnRenamed('sum(Recuperadosnovos)', 'Recuperados')\
    .withColumnRenamed('sum(emAcompanhamentoNovos)', "Acompanhamento")

visual4_.show()

+-----------+--------------+
|Recuperados|Acompanhamento|
+-----------+--------------+
|     523208|    2920055795|
+-----------+--------------+



In [22]:
# Listando os bancos no hive
spark.catalog.listDatabases()

[Database(name='allan', description='', locationUri='hdfs://namenode:8020/user/hive/warehouse/allan.db'),
 Database(name='default', description='Default Hive database', locationUri='hdfs://namenode:8020/user/hive/warehouse')]

In [23]:
# Listando as tabelas existentes
!hdfs dfs -ls /user/hive/warehouse/allan.db

Found 4 items
drwxrwxr-x   - root supergroup          0 2022-04-21 11:57 /user/hive/warehouse/allan.db/covid_br
drwxrwxr-x   - root supergroup          0 2022-04-21 19:11 /user/hive/warehouse/allan.db/covid_br_particao
drwxrwxr-x   - root supergroup          0 2022-04-21 19:28 /user/hive/warehouse/allan.db/covid_br_particao_municipio
drwxrwxr-x   - root supergroup          0 2022-04-21 19:24 /user/hive/warehouse/allan.db/covid_br_particao_uf


In [24]:
visual4_.write.saveAsTable("exe4", mode="overwrite")

AnalysisException: "Can not create the managed table('`exe4`'). The associated location('hdfs://namenode:8020/user/hive/warehouse/exe4') already exists.;"

In [25]:
# Verificando se os dados foram salvos
!hdfs dfs -ls /user/hive/warehouse

Found 4 items
drwxrwxr-x   - root supergroup          0 2022-04-21 19:27 /user/hive/warehouse/allan.db
drwxr-xr-x   - root supergroup          0 2022-04-24 21:02 /user/hive/warehouse/exe4
drwxr-xr-x   - root supergroup          0 2022-04-24 22:50 /user/hive/warehouse/exe5
drwxr-xr-x   - root supergroup          0 2022-04-10 13:00 /user/hive/warehouse/juros2


In [26]:
!hdfs dfs -ls /user/hive/warehouse/exe4

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-04-24 21:02 /user/hive/warehouse/exe4/_SUCCESS
-rw-r--r--   2 root supergroup        724 2022-04-24 21:02 /user/hive/warehouse/exe4/part-00000-7b3228e7-f9c0-4841-bd09-cf8177656fad-c000.snappy.parquet


In [27]:
# Visualizando os dados salvos no hive
exe4 = spark.read.parquet("/user/hive/warehouse/exe4/part-00000-7b3228e7-f9c0-4841-bd09-cf8177656fad-c000.snappy.parquet")
exe4.show()

+-----------+--------------+
|Recuperados|Acompanhamento|
+-----------+--------------+
|     523208|    2920055795|
+-----------+--------------+



## 5 - Salvar a segunda visualização com formato parquet e compressão snappy

In [28]:
visual5 = covid.agg({'casosNovos': 'sum','populacaoTCU2019': 'sum'})
visual5 = visual5.withColumnRenamed('sum(casosNovos)', 'casosNovos').withColumnRenamed('sum(populacaoTCU2019)', 'populacao')
visual5.show()

+------------+----------+
|   populacao|casosNovos|
+------------+----------+
|307861595007|9987481628|
+------------+----------+



Calculando a letalidade: (número de óbitos x 100) / número de casos confirmados.

Calculando a mortalidade: (óbitos * 1.000.000) / população.

Calculando a incidencia: (casos confirmados * 1.000.000) / população

In [29]:
# Calculando a incidencia - (casos confirmados * 1.000.000) / população
def incidencia(populacao, casosNovos):
    resp = (casosNovos*1000000) / populacao
    return resp

In [30]:
# float datatype is defined
new_f = F.udf(incidencia, FloatType())

In [31]:
visual5 = visual5.withColumn("incidencia",
                          new_f("populacao", "casosNovos"))

In [32]:
# Showing and printing the schema of the Dataframe
visual5.printSchema()
visual5.show()

root
 |-- populacao: long (nullable = true)
 |-- casosNovos: long (nullable = true)
 |-- incidencia: float (nullable = true)

+------------+----------+----------+
|   populacao|casosNovos|incidencia|
+------------+----------+----------+
|307861595007|9987481628| 32441.467|
+------------+----------+----------+



In [33]:
visual5.write.saveAsTable("exe5", mode="overwrite")

AnalysisException: "Can not create the managed table('`exe5`'). The associated location('hdfs://namenode:8020/user/hive/warehouse/exe5') already exists.;"

In [34]:
# verificando se foi salvo
!hdfs dfs -ls /user/hive/warehouse/exe5

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-04-24 22:50 /user/hive/warehouse/exe5/_SUCCESS
-rw-r--r--   2 root supergroup        924 2022-04-24 22:50 /user/hive/warehouse/exe5/part-00000-06fc8048-588d-4c6e-a7c0-6efa8cef95b2-c000.snappy.parquet


In [35]:
# Visualizando os dados salvos no hive
exe5 = spark.read.parquet("/user/hive/warehouse/exe5/part-00000-06fc8048-588d-4c6e-a7c0-6efa8cef95b2-c000.snappy.parquet")
exe5.show()

+------------+----------+----------+
|   populacao|casosNovos|incidencia|
+------------+----------+----------+
|307861595007|9987481628| 3244.1467|
+------------+----------+----------+



## 6 - Salvar a terceira visualização em um topico no Kafka

In [36]:
visual6 = covid.agg({'casosNovos': 'sum', 'obitosAcumulado': 'sum', 'populacaoTCU2019': 'sum'})
visual6 = visual6.withColumnRenamed('sum(casosNovos)', 'casosNovos')\
            .withColumnRenamed('sum(obitosAcumulado)', 'obitos')\
            .withColumnRenamed('sum(populacaoTCU2019)', 'populacao')
            
visual6.show()

+------------+--------+----------+
|   populacao|  obitos|casosNovos|
+------------+--------+----------+
|307861595007|56523691|9987481628|
+------------+--------+----------+



In [37]:
# Calculando a letalidade - (número de óbitos x 100) / número de casos confirmados.
def letalidade(obitos, casosNovos):
    letal = (obitos * 100) / casosNovos
    return letal

In [38]:
# define mortalidade - (óbitos * 1.000.000) / população
def mortalidade(obitos, populacao):
    mortal = (obitos * 100000) / populacao
    return mortal

In [39]:
# float datatype is defined
new_f6 = F.udf(letalidade, FloatType())
new_f7 = F.udf(mortalidade, FloatType())

In [40]:
visual7 = visual6.withColumn("Letalidade",
                             new_f6("obitos", "casosNovos"))
visual8 = visual7.withColumn("Mortalidade",
                            new_f7("obitos", "populacao"))

In [41]:
# Showing and printing the schema of the Dataframe
visual8.printSchema()

root
 |-- populacao: long (nullable = true)
 |-- obitos: long (nullable = true)
 |-- casosNovos: long (nullable = true)
 |-- Letalidade: float (nullable = true)
 |-- Mortalidade: float (nullable = true)



In [42]:
visual8.show()

+------------+--------+----------+----------+-----------+
|   populacao|  obitos|casosNovos|Letalidade|Mortalidade|
+------------+--------+----------+----------+-----------+
|307861595007|56523691|9987481628| 0.5659454|  18.360098|
+------------+--------+----------+----------+-----------+



In [43]:
# salvando a visualização no hdfs
visual8.write.json("/user/allan/visual-spark-kafka", mode="overwrite")

In [44]:
# Verificando se o arquivo foi salvo
!hdfs dfs -ls /user/allan

Found 14 items
drwxr-xr-x   - root supergroup          0 2022-04-09 18:00 /user/allan/data
drwxr-xr-x   - root supergroup          0 2022-04-16 11:51 /user/allan/juros_selic
drwxr-xr-x   - root supergroup          0 2022-04-15 13:47 /user/allan/logs_count_word
drwxr-xr-x   - root supergroup          0 2022-04-15 13:46 /user/allan/logs_count_word3
drwxr-xr-x   - root supergroup          0 2022-04-15 14:03 /user/allan/logs_count_word_5
drwxr-xr-x   - root supergroup          0 2022-04-15 14:36 /user/allan/names_us_orc
drwxr-xr-x   - root supergroup          0 2022-04-21 11:13 /user/allan/projeto
drwxr-xr-x   - root supergroup          0 2022-04-21 20:00 /user/allan/projeto_covid_br
drwxr-xr-x   - root supergroup          0 2022-04-17 14:51 /user/allan/projeto_python
drwxr-xr-x   - root supergroup          0 2022-04-16 12:25 /user/allan/relatorio_anual
drwxr-xr-x   - root supergroup          0 2022-04-17 16:21 /user/allan/stream
drwxr-xr-x   - root supergroup          0 2022-0

In [45]:
!hdfs dfs -ls /user/allan/visual-spark-kafka

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-04-25 19:06 /user/allan/visual-spark-kafka/_SUCCESS
-rw-r--r--   2 root supergroup        116 2022-04-25 19:06 /user/allan/visual-spark-kafka/part-00000-47c99506-8b5f-43fa-98c2-4c274443479c-c000.json


In [46]:
# Enviando o arquivo para o HDFS
# hdfs dfs -get /user/allan/visual-spark-kafka /input

# enviando o arquivo do HDFS para o container do kafka
# docker cp /home/allan/docker/spark/docker-bigdata/input/visual-spark-kafka kafka:/home

# Criar um topico
# kafka-topics.sh --bootstrap-server kafka:9092 --topic KafkaSpark --create --partitions 1 --replication-factor 1

# Listar os topicos
# kafka-topics.sh --bootstrap-server kafka:9092 --list

# criar o consumidor
# kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic KafkaSpark -- group topicArquivo

# criar um produtor com o arquivo
# kafka-console-producer.sh --broker-list kafka:9092 --topic KafkaSpark < /home/visual-spark-kafka/part-00000-bcea425b-bfc5-4f2a-9cde-0ed48ffa900b-c000.json

# Resultado exibido na tela do consumidor
# {"populacao":307861595007,"obitos":56523691,"casosNovos":9987481628,"Letalidade":565.9454,"Mortalidade":18.360098}


## 7 - Criar a visualização pelo Spark com os dados enviados para o HDFS

In [47]:
# Sintese de casos, obitos, incidenciam e mortalidade por região
visual9 = covid.groupBy("regiao").agg({'casosNovos': 'sum', 'obitosAcumulado': 'sum', 'populacaoTCU2019': 'sum'})

visual9 = visual9.withColumnRenamed('sum(casosNovos)', 'casosNovos')\
            .withColumnRenamed('sum(obitosAcumulado)', 'obitos')\
            .withColumnRenamed('sum(populacaoTCU2019)', 'populacao')
            
visual9.show()

+------------+------------+--------+----------+
|      regiao|   populacao|  obitos|casosNovos|
+------------+------------+--------+----------+
|    Nordeste| 55070808753| 8889245|1630021092|
|        null|        null|    null|      null|
|         Sul| 28926824560| 7215160|1148363624|
|     Sudeste| 85278432845|14268253|2414880019|
|Centro-Oeste| 15726676410| 3833238| 715156998|
|      Brasil|105073562500|18855015|3343282900|
|       Norte| 17785289939| 3462780| 735776995|
+------------+------------+--------+----------+



## 8 - Salvar a visualização do exercicio 6 em um topico no Elastic

In [48]:
visual8.show()

+------------+--------+----------+----------+-----------+
|   populacao|  obitos|casosNovos|Letalidade|Mortalidade|
+------------+--------+----------+----------+-----------+
|307861595007|56523691|9987481628| 0.5659454|  18.360098|
+------------+--------+----------+----------+-----------+

