In [None]:
from pyspark.sql.functions import *

In [2]:
spark = SparkSession\
.builder\
.appName('Projeto Final(Engenharia de Dados) - Painel Covid-19')\
.config('spark.some.config.option', 'some-value')\
.enableHiveSupport()\
.getOrCreate()

In [3]:
painelcovid = spark.read.csv('hdfs://namenode/user/Carlos/Projeto/Projeto', 
                                   sep=";", #sep(padrão ,): define o único caractere como separador para cada campo e valor.
                                   header=True, #header(padrão false): usa a primeira linha como nomes de colunas.
                                   inferSchema=True, #inferSchema(padrão false): infere o esquema de entrada automaticamente a partir dos dados. 
                                   ignoreLeadingWhiteSpace=True, #ignoreLeadingWhiteSpace(padrão false): define se os espaços em branco iniciais dos valores que estão sendo lidos devem ser ignorados.
                                   ignoreTrailingWhiteSpace=True) #ignoreTrailingWhiteSpace(padrão false): define se os espaços em branco à direita dos valores que estão sendo lidos devem ser ignorados.

In [4]:
#df_painel_covidbr.dtypes
painelcovid.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



In [9]:
#Otimizar dados por tabelas particionadas por município
spark.sql("CREATE DATABASE IF NOT EXISTS basecovid")
painelcovid.write\
.mode('overwrite')\
.partitionBy('municipio')\
.format('csv')\
.saveAsTable('basecovid.municipio', path='hdfs://namenode:8020/user/hive/warehouse/basecovid/')

In [10]:
#Confirmar se os dados foram salvos no diretório
!hdfs dfs -ls 'hdfs://namenode:8020/user/hive/warehouse/basecovid/'

Found 5299 items
-rw-r--r--   2 root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/_SUCCESS
drwxr-xr-x   - root supergroup          0 2022-08-08 05:13 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Abadia de Goiás
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Abadia dos Dourados
drwxr-xr-x   - root supergroup          0 2022-08-08 05:13 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Abadiânia
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Abaetetuba
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Abaeté
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Abaiara
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs:

drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Ananindeua
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Ananás
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Anapu
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Anapurus
drwxr-xr-x   - root supergroup          0 2022-08-08 05:13 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Anastácio
drwxr-xr-x   - root supergroup          0 2022-08-08 05:13 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Anaurilândia
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Anchieta
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hiv

drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Araguaçu
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Araguaína
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Araioses
drwxr-xr-x   - root supergroup          0 2022-08-08 05:13 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Aral Moreira
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Aramari
drwxr-xr-x   - root supergroup          0 2022-08-08 05:13 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Arambaré
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Arame
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive

drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Batalha
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Batatais
drwxr-xr-x   - root supergroup          0 2022-08-08 05:13 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Batayporã
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Baturité
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Bauru
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Bayeux
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Baía Formosa
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/w

drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Canavieira
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Canavieiras
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Canaã
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Canaã dos Carajás
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Candeal
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Candeias
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Candeias do Jamari
drwxr-xr-x   - root supergroup          0 2022-08-08 05:14 hdfs://name

drwxr-xr-x   - root supergroup          0 2022-08-08 05:14 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Derrubadas
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Descalvado
drwxr-xr-x   - root supergroup          0 2022-08-08 05:14 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Descanso
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Descoberto
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Desterro
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Desterro de Entre Rios
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Desterro do Melo
drwxr-xr-x   - root supergroup          0 2022-08-08 05:14 hdf

drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Duas Estradas
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Dueré
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Dumont
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Duque Bacelar
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Duque de Caxias
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Durandé
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Dário Meira
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8

drwxr-xr-x   - root supergroup          0 2022-08-08 05:14 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Guapó
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Guarabira
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Guaraci
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Guaraciaba
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Guaraciaba do Norte
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Guaraciama
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Guaramiranga
drwxr-xr-x   - root supergroup          0 2022-08-08 05:14 hdfs://namenode

drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Nova Timboteua
drwxr-xr-x   - root supergroup          0 2022-08-08 05:15 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Nova Trento
drwxr-xr-x   - root supergroup          0 2022-08-08 05:15 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Nova Ubiratã
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Nova União
drwxr-xr-x   - root supergroup          0 2022-08-08 05:15 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Nova Veneza
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Nova Venécia
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=Nova Viçosa
drwxr-xr-x   - root supergroup          0 2022-08-08 05:15 hdfs:/

drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=São Paulo
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=São Paulo das Missões
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=São Paulo de Olivença
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=São Paulo do Potengi
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=São Pedro
drwxr-xr-x   - root supergroup          0 2022-08-08 05:17 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=São Pedro da Aldeia
drwxr-xr-x   - root supergroup          0 2022-08-08 05:16 hdfs://namenode:8020/user/hive/warehouse/basecovid/municipio=São Pedro da Cipa
drwxr-xr-x   - root supergroup

In [11]:
# Acessar os dados da tabela municipio do Banco de dados basecovid pelo SparkSQL
spark.sql("SHOW DATABASES").show() #Mostrar os anco de dados

+------------+
|databaseName|
+------------+
|   basecovid|
|     default|
+------------+



In [12]:
# Selecionar o banco de dados basecovid 
spark.sql("USE basecovid")

# Acessar os dados da tabela municipio do Banco de dados basecovid pelo SparkSQL
spark.sql("SHOW TABLES").show() #Mostrar os anco de dados

+---------+---------+-----------+
| database|tableName|isTemporary|
+---------+---------+-----------+
|basecovid|municipio|      false|
+---------+---------+-----------+



In [14]:
## 1.1  Recuperados
recuperados = spark.sql("SELECT Recuperadosnovos as Recuperados FROM municipio order by 1 desc limit 1")
recuperados.show()

+-----------+
|Recuperados|
+-----------+
|   32511634|
+-----------+



In [15]:
## 1.2 Em Acompanhamento
em_acompanhamento = spark.sql("SELECT emAcompanhamentoNovos as Em_Acompanhamento FROM municipio WHERE order by 1 desc limit 1")
em_acompanhamento.show()

+-----------------+
|Em_Acompanhamento|
+-----------------+
|          3182910|
+-----------------+



In [16]:
#2.1 Acumulado
casos_acumulados = spark.sql("SELECT casosAcumulado as Casos_Acumulados FROM municipio order by 1 desc limit 1")
casos_acumulados.show()

#2.2 Casos novos
novos_casos = spark.sql("SELECT casosNovos as Casos_Novos FROM municipio order by 1 desc limit 1")
novos_casos.show()

#2.3 Incidência (Casos acumulados)
incidencia = spark.sql("SELECT cast(((casosAcumulado*100000)/populacaoTCU2019) as decimal(5,1)) as Incidencia FROM municipio order by 1 desc limit 1")
incidencia.show()

+----------------+
|Casos_Acumulados|
+----------------+
|        33890428|
+----------------+

+-----------+
|Casos_Novos|
+-----------+
|     298408|
+-----------+

+----------+
|Incidencia|
+----------+
|    9999.6|
+----------+



In [17]:
#3.1 Óbitos acumulados
obitos_acumulados = spark.sql("SELECT obitosAcumulado as Obitos_Acumulados FROM municipio order by 1 desc limit 1")
obitos_acumulados.show()

#3.2 Obitos novos
obitos_novos = spark.sql("SELECT obitosNovos as Obitos_Novos FROM municipio order by 1 desc limit 1")
obitos_novos.show()

#3.3 Letalidade
letalidade = spark.sql("SELECT cast(((obitosAcumulado * 100) / casosAcumulado) as decimal(5,1)) as Letalidade FROM municipio order by 1 desc limit 1")
letalidade.show()

#3.4 Mortalidade
mortalidade = spark.sql("SELECT cast(((obitosAcumulado * 100000) / populacaoTCU2019) as decimal(4,1)) as Mortalidade FROM municipio order by 1 desc limit 1")
mortalidade.show()

+-----------------+
|Obitos_Acumulados|
+-----------------+
|           679010|
+-----------------+

+------------+
|Obitos_Novos|
+------------+
|        1308|
+------------+

+----------+
|Letalidade|
+----------+
|     114.3|
+----------+

+-----------+
|Mortalidade|
+-----------+
|      992.9|
+-----------+



In [18]:
recuperados.write.format('csv').saveAsTable('Recuperados')

In [19]:
em_acompanhamento.write.format('csv').saveAsTable('Acompanhamento')

In [20]:
#Visualizar as tabelas salvas 
spark.sql('SHOW TABLES').show()

+---------+--------------+-----------+
| database|     tableName|isTemporary|
+---------+--------------+-----------+
|basecovid|acompanhamento|      false|
|basecovid|     municipio|      false|
|basecovid|   recuperados|      false|
+---------+--------------+-----------+



Salvar a segunda visualização com formato parquet e compressão snappy

In [23]:
casos_acumulados.write.option('compression', 'snappy').parquet('hdfs://namenode/user/Carlos/Projeto/visualizacao/Acumulados')

In [24]:
novos_casos.write.option('compression', 'snappy').parquet('hdfs://namenode/user/Carlos/Projeto/visualizacao/novos_casos')

In [25]:
incidencia.write.option('compression', 'snappy').parquet('hdfs://namenode/user/Carlos/Projeto/visualizacao/incidencia')

In [26]:
#Visualizar os dodos na pasta do HDFS
!hdfs dfs -ls 'hdfs://namenode/user/Carlos/Projeto/visualizacao'

Found 3 items
drwxr-xr-x   - root supergroup          0 2022-08-08 05:34 hdfs://namenode/user/Carlos/Projeto/visualizacao/Acumulados
drwxr-xr-x   - root supergroup          0 2022-08-08 05:36 hdfs://namenode/user/Carlos/Projeto/visualizacao/incidencia
drwxr-xr-x   - root supergroup          0 2022-08-08 05:35 hdfs://namenode/user/Carlos/Projeto/visualizacao/novos_casos


In [27]:
#3.1 Óbitos acumulados
obitos_acumulados_json = obitos_acumulados.select(to_json(struct('obitos_Acumulados')).alias('value'))
obitos_acumulados_json.show(n=1, truncate=False, vertical=False)

+----------------------------+
|value                       |
+----------------------------+
|{"obitos_Acumulados":679010}|
+----------------------------+



In [29]:
#3.2 Obitos novos
obitos_novos_json = obitos_novos.select(to_json(struct('obitos_Novos')).alias('value'))
obitos_novos_json.show(n=1, truncate=False, vertical=False)

+---------------------+
|value                |
+---------------------+
|{"obitos_Novos":1308}|
+---------------------+



In [31]:
#3.3 Letalidade
letalidade_json = letalidade.select(to_json(struct('Letalidade')).alias('value'))
letalidade_json.show(n=1, truncate=False, vertical=False)

+--------------------+
|value               |
+--------------------+
|{"Letalidade":114.3}|
+--------------------+



In [32]:
#3.4 Mortalidade
mortalidade_json = mortalidade.select(to_json(struct('Mortalidade')).alias('value'))
mortalidade_json.show(n=1, truncate=False, vertical=False)

+---------------------+
|value                |
+---------------------+
|{"Mortalidade":992.9}|
+---------------------+



In [33]:
obitos_acumulados.selectExpr('to_json(struct(*)) As value')\
.write.format('kafka')\
.option('kafka.bootstrap.servers', 'kafka:9092')\
.option('topic', 'topic-obtidos_acumulados')\
.save()

In [34]:
obitos_novos.selectExpr('to_json(struct(*)) As value')\
.write\
.format("kafka")\
.option("kafka.bootstrap.servers","kafka:9092")\
.option("topic","topic-obitos_novos")\
.save()

In [35]:
letalidade.selectExpr('to_json(struct(*)) As value')\
.write\
.format("kafka")\
.option("kafka.bootstrap.servers","kafka:9092")\
.option("topic","topic-letalidade")\
.save()

In [36]:
mortalidade.selectExpr('to_json(struct(*)) As value')\
.write\
.format("kafka")\
.option("kafka.bootstrap.servers","kafka:9092")\
.option("topic","topic-mortalidade")\
.save()

In [37]:
spark = spark.sql('''SELECT regiao as Regiao, 
                     MAX(casosAcumulado) as Casos, 
                     MAX(obitosAcumulado) as Obitos, 
                     MAX(data) as Atualizacao 
                     FROM municipio 
                     GROUP BY regiao 
                     ORDER BY regiao''')

spark.show()

+------------+--------+------+-------------------+
|      Regiao|   Casos|Obitos|        Atualizacao|
+------------+--------+------+-------------------+
|      Brasil|33890428|679010|2022-08-02 00:00:00|
|Centro-Oeste| 1631836| 27200|2022-08-02 00:00:00|
|    Nordeste| 1662253| 30377|2022-08-02 00:00:00|
|       Norte|  812659| 18548|2022-08-02 00:00:00|
|     Sudeste| 5927119|173036|2022-08-02 00:00:00|
|         Sul| 2694860| 44525|2022-08-02 00:00:00|
+------------+--------+------+-------------------+



In [38]:
ES_NODES='<ES_ENDPOINT>'
ES_PORT='<ES_PORT>'
ES_NET_HTTP_AUTH_USER='<USER>'
ES_NET_HTTP_AUTH_PASS='<PASSWORD>'
ES_NET_SLL='true'
ES_NODE_WAN_ONLY='true'
ES_WRITE_OPERATION = 'upsert'

def get_elastic_config_options() -> dict:
    return {
        'es.nodes': ES_NODES,
        'es.port': ES_PORT,
        'es.net.http.auth.user': ES_NET_HTTP_AUTH_USER,
        'es.net.http.auth.pass': ES_NET_HTTP_AUTH_PASS,
        'es.net.sll': ES_NET_SLL,
        'es.nodes.wan.only': ES_NODE_WAN_ONLY,
        'es.write.operation': ES_WRITE_OPERATION
    }

In [None]:
ELASTIC_OPTIONS = get_elastic_config_options()

recuperados.write \
    .format('org.elasticsearch.spark.sql') \
    .options(**ELASTIC_OPTIONS) \
    .mode('append') \
    .save('Recuperados')

casos_acumulados.write \
    .format('org.elasticsearch.spark.sql') \
    .options(**ELASTIC_OPTIONS) \
    .mode('append') \
    .save('Casos_Confirmados')

obitos_acumulados.write \
    .format('org.elasticsearch.spark.sql') \
    .options(**ELASTIC_OPTIONS) \
    .mode('append') \
    .save('Obitos_Confirmados')

IndentationError: unexpected indent (<ipython-input-40-364139d3e15a>, line 4)