# Exercício 1 - Enviar os dados para o hdfs

> OBS: Arquivos baixados no SO em "data/"

> OBS: Realizao no bash
    
```bash
sudo docker exec namenode bash -c "mkdir /projeto_semantix"
sudo docker cp data/. namenode:/projeto_semantix

sudo docker exec namenode bash -c "hdfs dfs -mkdir -p /projeto_semantix/data"
sudo docker exec namenode bash -c "hdfs dfs -moveFromLocal /projeto_semantix/*.csv /projeto_semantix/data/"
```

# Exercício 2 - Otimizar todos os dados do hdfs para uma tabela Hive particionada por município

> OBS: Realizado no bash

```bash
sudo docker exec -it hive-server bash
beeline -u jdbc:hive2://localhost:10000
```

> OBS: Realizado no beeline CLI

```sql
create database projeto_semantix;
use projeto_semantix;

create external table landing_painel_covidbr (
    regiao string,
    estado string,
    municipio string,
    coduf tinyint,
    codmun int,
    codRegiaoSaude int,
    nomeRegiaoSaude string,
    data date,
    semanaEpi tinyint,
    populacaoTCU2019 int,
    casosAcumulado int,
    casosNovos int,
    obitosAcumulado int,
    obitosNovos int,
    Recuperadosnovos int,
    emAcompanhamentoNovos int,
    interior_metropolitada tinyint
)
row format delimited
fields terminated by ';'
stored as textfile
location '/projeto_semantix/data'
tblproperties (
    "skip.header.line.count"="1");

create table painel_covidbr_brasil (
    regiao string,
    estado string,
    municipio string,
    coduf tinyint,
    codRegiaoSaude int,
    nomeRegiaoSaude string,
    data date,
    semanaEpi tinyint,
    populacaoTCU2019 int,
    casosAcumulado int,
    casosNovos int,
    obitosAcumulado int,
    obitosNovos int,
    Recuperadosnovos int,
    emAcompanhamentoNovos int,
    interior_metropolitada tinyint
)
stored as orc;

set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode=nonstrict;

from landing_painel_covidbr
    insert into table painel_covidbr_brasil
        select 
            regiao,
            estado,
            municipio,
            coduf,
            codRegiaoSaude,
            nomeRegiaoSaude,
            data,
            semanaEpi,
            populacaoTCU2019,
            casosAcumulado,
            casosNovos,
            obitosAcumulado,
            obitosNovos,
            Recuperadosnovos,
            emAcompanhamentoNovos,
            interior_metropolitada
            where regiao = 'Brasil' and year(data) = 2021;
```

In [1]:
spark.catalog.setCurrentDatabase('projeto_semantix')
spark.catalog.listTables()

[Table(name='landing_painel_covidbr', database='projeto_semantix', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='painel_covidbr', database='projeto_semantix', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='painel_covidbr_brasil', database='projeto_semantix', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='visualizacao_item_4', database='projeto_semantix', description=None, tableType='MANAGED', isTemporary=False)]

## Exercício 3 (1/3) - Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS:

In [11]:
sql_spark='''
select 
    max(data)
from painel_covidbr_brasil
'''

spark.sql(sql_spark).show()

+----------+
| max(data)|
+----------+
|2021-07-06|
+----------+



> OBS: Ao contrário do que o nome sugere, os campos Recuperados Novos e Em Acompanhamento Novos apresentam os valores acumulados.

In [7]:
sql_spark='''
select 
    recuperadosnovos, emacompanhamentonovos
from painel_covidbr_brasil
where data >= date('2021-07-06')
order by data
'''

df_item_4 = spark.sql(sql_spark)

## Exercício 4 - Salvar a primeira visualização como tabela Hive

In [9]:
df_item_4.write.format("orc").mode("overwrite")\
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec")\
    .saveAsTable("visualizacao_item_4")

## Exercício 3 (2/3) - Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS:

In [10]:
sql_spark='''
select 
    casosacumulado, casosnovos, round(casosacumulado/populacaoTCU2019*100000,2) incidencia
from painel_covidbr_brasil
where data >= date('2021-07-06')
order by data
'''

df_item_5 = spark.sql(sql_spark)

In [13]:
df_item_5.show()

+--------------+----------+----------+
|casosacumulado|casosnovos|incidencia|
+--------------+----------+----------+
|      18855015|     62504|   8972.29|
+--------------+----------+----------+



## Exercício 5 - Salvar a segunda visualização com formato parquet e compressão snappy

In [None]:
df_item_5.write.format("parquet").mode("overwrite")\
    .option("compression","snappy")\
    .saveAsTable("visualizacao_item_5")

> OBS: A lib do parquet está dando problema na instalação do "BigData" que usamos no curso, infelizmente não pude dedicar tempo suficiente para fazer o troubleshooting de infraestrutura. Cheguei a tentar substituir a versão tanto em hive/lib quanto em spark/jars, mas sem sucesso.

## Exercício 3 (3/3) - Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS:

In [15]:
sql_spark='''
select 
    regiao as key, cast(obitosacumulado as string) as value
from painel_covidbr_brasil
where data >= date('2021-07-06')
order by data
'''

df_item_6 = spark.sql(sql_spark)

In [16]:
df_item_6.show()

+------+------+
|   key| value|
+------+------+
|Brasil|526892|
+------+------+



## Exercício 6 - Salvar a terceira visualização em um tópico no Kafka

In [29]:
df_item_6.write.format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('topic', 'topic-item-6').save()

## Exercício 7 - Criar a visualização pelo Spark com os dados enviados para o HDFS: 

In [2]:
sql_spark='''
select 
    regiao, sum(casosacumulado) CasosAcumulados, sum(obitosacumulado) ObitosAcumulados, 
    round(sum(casosacumulado)/sum(populacaoTCU2019)*100000,2) Incidencia,
    round(sum(obitosacumulado)/sum(populacaoTCU2019)*100000,2) Mortalidade,
    date_format(data, 'dd/MM/yyyy') Atualizacao
from landing_painel_covidbr
where data >= date('2021-07-06') and codmun is null
group by regiao, data
'''

df_item_7 = spark.sql(sql_spark)

In [3]:
df_item_7.show()

+------------+---------------+----------------+----------+-----------+-----------+
|      regiao|CasosAcumulados|ObitosAcumulados|Incidencia|Mortalidade|Atualizacao|
+------------+---------------+----------------+----------+-----------+-----------+
|         Sul|        3611041|           80705|  12046.45|     269.23| 06/07/2021|
|Centro-Oeste|        1916619|           49207|  11760.51|     301.94| 06/07/2021|
|       Norte|        1732815|           43845|   9401.64|     237.89| 06/07/2021|
|      Brasil|       18855015|          526892|   8972.29|     250.73| 06/07/2021|
|     Sudeste|        7138803|          245311|   8078.18|     277.59| 06/07/2021|
|    Nordeste|        4455737|          107824|   7807.27|     188.93| 06/07/2021|
+------------+---------------+----------------+----------+-----------+-----------+



## Exercício 8 - Salvar a visualização do exercício 6 em um tópico no Elastic

### Transforma e prepara RDD

In [4]:
import json
import hashlib

rdd = df_item_7.toJSON()

def addId(data):
    j=json.dumps(data).encode('ascii', 'ignore')
    doc_id = hashlib.sha224(j).hexdigest()
    return (doc_id, json.dumps(data))

rdd2 = rdd.map(addId)

In [None]:
es_write_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : 'semantix/_doc',
    "es.input.json": "yes",
    "es.mapping.id": "doc_id"
}

rdd2.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",       
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)

> OBS: Não consegui estabelecer a conexão entre os containers Spark/ElasticSearch mesmo tentando connectá-los usando o "docker network connect". Infelizmente não pude dedicar tempo suficiente para fazer o troubleshooting de infraestrutura.

## Exercício 9 - Criar um dashboard no Elastic para visualização dos novos dados enviados

> OBS: Ficou prejudicado pelo problema de conexão acima.