# Desafio Big Data Engineer - Semantix Academy

## 1. Enviar os dados para o hdfs

### no prompt

cd spark/input

sudo mkdir desafio

cd desafio

sudo curl -O https://mobileapps.saude.gov.br/esus-vepi/files/unAFkcaNDeXajurGB7LChj8SgQYS2ptm/04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar

sudo unrar x 04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar

sudo rm 04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar

docker exec -it namenode bash

hdfs dfs -mkdir -p /user/eric/desafio

hdfs dfs -put /input/desafio/* /user/eric/desafio

In [None]:
# List the contents of the HDFS directory that received the challenge files.
!hdfs dfs -ls /user/eric/desafio

## 2. Otimizar todos os dados do hdfs para uma tabela Hive particionada por município.

Criando o database pelo beeline

docker exec -it hive-server bash

beeline -u jdbc:hive2://localhost:10000

-- Create the project database (idempotent, so the script can be re-run).
create database if not exists COVID19 comment 'Desafio Semantix';

-- Make it the current database; without this the tables created afterwards
-- would be placed in the `default` database instead of COVID19.
use COVID19;

-- Enable dynamic partitioning so partition values can come from the data.
SET hive.exec.dynamic.partition = true;

-- nonstrict: allow every partition column to be dynamic (no static partition required).
SET hive.exec.dynamic.partition.mode = nonstrict ;

-- Staging table that mirrors the raw ';'-separated CSV layout, column for column.
-- The CSV files carry a header row (the Spark cell reads the same files with
-- header='true'), so the first line of each file is skipped; otherwise the
-- header would be loaded as a data row.
create table covid19Brutos(
    regiao string,
    estado string,
    municipio string,
    coduf int,
    codmun int,
    codRegiaoSaude int,
    nomeRegiaoSaude string,
    data string,
    semanaEpi int,
    populacaoTCU2019 int,
    casosAcumulado int,
    casosNovos int,
    obitosAcumulado string,
    obitosNovos int,
    Recuperadosnovos int,
    emAcompanhamentoNovos int,
    interior_metropolitana string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ';'
    LINES TERMINATED BY '\n'
    STORED AS TEXTFILE
    TBLPROPERTIES ('skip.header.line.count'='1');

-- NOTE(review): LOAD DATA INPATH (without LOCAL) moves the files out of
-- /user/eric/desafio into the table directory; anything that later reads the
-- CSVs from the original path must run before this statement or read a copy.
LOAD DATA INPATH '/user/eric/desafio/*.csv' OVERWRITE INTO TABLE covid19Brutos;

-- Final table: same columns as the staging table, except that `municipio`
-- is the partition column and therefore is not listed among the regular columns.
create table covid19(
    regiao string,
    estado string,
    coduf int,
    codmun int,
    codRegiaoSaude int,
    nomeRegiaoSaude string,
    data string,
    semanaEpi int,
    populacaoTCU2019 int,
    casosAcumulado int,
    casosNovos int,
    obitosAcumulado string,
    obitosNovos int,
    Recuperadosnovos int,
    emAcompanhamentoNovos int,
    interior_metropolitana string)
    partitioned by (municipio string);

-- One partition is created per distinct municipality, which is expected to
-- exceed Hive's default dynamic-partition limits, so raise them for this session.
-- NOTE(review): adjust the limits to the actual number of municipalities.
SET hive.exec.max.dynamic.partitions = 10000;

SET hive.exec.max.dynamic.partitions.pernode = 10000;

-- Dynamic partitioning takes the partition value from the LAST selected column.
-- `select *` would deliver `municipio` in third position, shifting every later
-- column and partitioning by `interior_metropolitana`; list the columns
-- explicitly with `municipio` at the end.
insert overwrite table covid19 partition (municipio)
select
    regiao,
    estado,
    coduf,
    codmun,
    codRegiaoSaude,
    nomeRegiaoSaude,
    data,
    semanaEpi,
    populacaoTCU2019,
    casosAcumulado,
    casosNovos,
    obitosAcumulado,
    obitosNovos,
    Recuperadosnovos,
    emAcompanhamentoNovos,
    interior_metropolitana,
    municipio
from covid19Brutos;

# Criar as 3 visualizações pelo Spark com os dados enviados para o HDFS

In [1]:
# Display the SparkSession; `spark` is not created in this notebook, so it is
# presumably injected by the PySpark kernel — TODO confirm.
spark

In [2]:
# Display the SparkContext; like `spark`, `sc` is presumably provided by the kernel.
sc

In [3]:
# Set the Spark log level to INFO for this session.
spark.sparkContext.setLogLevel("INFO")

In [4]:
# List the databases visible through the Spark catalog.
spark.catalog.listDatabases()

[Database(name='covid19', description='', locationUri='hdfs://namenode:8020/user/hive/warehouse/covid19.db'),
 Database(name='default', description='Default Hive database', locationUri='hdfs://namenode:8020/user/hive/warehouse')]

In [5]:
# Select the working database; unqualified table names below resolve against it.
spark.catalog.setCurrentDatabase("covid19")

In [6]:
# Check that the session is connected to the expected database.
spark.catalog.currentDatabase()

'covid19'

In [7]:
# List the tables of the current database.
spark.catalog.listTables()

[Table(name='dadospainel_1', database='covid19', description=None, tableType='MANAGED', isTemporary=False)]

In [8]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, timedelta

# Explicit schema for the panel CSV: one StructField per column, in file order.
# `obitosAcumulado` is declared as a string; the arithmetic below relies on
# Spark casting it implicitly when it is divided.
estrutura_lista = [
    StructField(nome, tipo)
    for nome, tipo in [
        ("regiao", StringType()),
        ("estado", StringType()),
        ("municipio", StringType()),
        ("coduf", StringType()),
        ("codmun", IntegerType()),
        ("codRegiaoSaude", IntegerType()),
        ("nomeRegiaoSaude", StringType()),
        ("data", StringType()),
        ("semanaEpi", IntegerType()),
        ("populacaoTCU2019", IntegerType()),
        ("casosAcumulado", IntegerType()),
        ("casosNovos", IntegerType()),
        ("obitosAcumulado", StringType()),
        ("obitosNovos", IntegerType()),
        ("Recuperadosnovos", IntegerType()),
        ("emAcompanhamentoNovos", IntegerType()),
        ("interior/metropolitana", StringType()),
    ]
]

schema_names = StructType(estrutura_lista)

# Population expressed in units of 100,000 inhabitants (denominator of both rates).
pop_por_100k = col("populacaoTCU2019") / 100000

# Read every ';'-separated CSV of the challenge folder with the schema above and
# append the three derived indicators. format_number() yields strings with two
# decimals and thousands separators (e.g. "8,972.29").
dados = (
    spark.read
    .schema(schema_names)
    .option("header", "true")
    .option("sep", ";")
    .csv("/user/eric/desafio/*.csv")
    .withColumn("incidenciaCasos",
                format_number(col("casosAcumulado") / pop_por_100k, 2))
    .withColumn("incidenciaObitos",
                format_number(col("obitosAcumulado") / pop_por_100k, 2))
    .withColumn("letalidade",
                format_number((col("obitosAcumulado") / col("casosAcumulado")) * 100, 2))
)

# Quick inspection: sample rows, resulting schema and total row count.
dados.show(5)
dados.printSchema()
dados.count()

+------+------+---------+-----+------+--------------+---------------+----------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+---------------+----------------+----------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|      data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|incidenciaCasos|incidenciaObitos|letalidade|
+------+------+---------+-----+------+--------------+---------------+----------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+---------------+----------------+----------+
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-25|        9|       210147125|             0|         0|              0|          0|            null|                 n

2624943

import time

# Persist the enriched dataset as a table partitioned by municipality in the
# covid19 database, replacing any previous version, and report how long the
# write took (in minutes).
ini = time.time()
dados.write.mode("overwrite").partitionBy("municipio").saveAsTable("covid19.DadosCovid")
fim = time.time()
print ("\nTempo de Processamento: {0:4.2f} min".format((fim-ini)/60))

# List the covid19 database directory in the Hive warehouse.
!hdfs dfs -ls /user/hive/warehouse/covid19.db

In [9]:
# Most recent value of `data` among the nationwide ("Brasil") rows.
# `max` here is the Spark aggregate brought in by the star import of
# pyspark.sql.functions, not the Python builtin.
linha_max = dados.filter(col("regiao") == "Brasil")\
                 .select("regiao", "data")\
                 .agg(max("data"))\
                 .collect()
maxData = linha_max[0][0]


In [10]:
# Nationwide rows for the latest date: the common base of the three panels.
brasil_atual = dados.filter((col("regiao") == "Brasil") & (col("data") == maxData))

# Panel 1: recovered and under-follow-up figures.
DadosPainel_1 = brasil_atual.select("Recuperadosnovos", "emAcompanhamentoNovos")
# Panel 2: accumulated cases, new cases and incidence.
DadosPainel_2 = brasil_atual.select("casosAcumulado", "casosNovos", "incidenciaCasos")
# Panel 3: accumulated deaths, new deaths, death incidence and lethality.
DadosPainel_3 = brasil_atual.select(
    "obitosAcumulado", "obitosNovos", "incidenciaObitos", "letalidade"
)

for painel in (DadosPainel_1, DadosPainel_2, DadosPainel_3):
    painel.show()



+----------------+---------------------+
|Recuperadosnovos|emAcompanhamentoNovos|
+----------------+---------------------+
|        17262646|              1065477|
+----------------+---------------------+

+--------------+----------+---------------+
|casosAcumulado|casosNovos|incidenciaCasos|
+--------------+----------+---------------+
|      18855015|     62504|       8,972.29|
+--------------+----------+---------------+

+---------------+-----------+----------------+----------+
|obitosAcumulado|obitosNovos|incidenciaObitos|letalidade|
+---------------+-----------+----------------+----------+
|         526892|       1780|          250.73|      2.79|
+---------------+-----------+----------------+----------+




## 3 Salvar a primeira visualização como tabela Hive

In [None]:
# Save the first panel as a table, replacing any previous version. The name is
# unqualified, so it resolves against the session's current database.
painel_1_writer = DadosPainel_1.write.mode("overwrite")
painel_1_writer.saveAsTable('DadosPainel_1')

In [None]:
# List the covid19 database directory in the Hive warehouse to check the saved table.
!hdfs dfs -ls /user/hive/warehouse/covid19.db

## Salvar a segunda visualização com formato parquet e compressão snappy

In [None]:
# Save the second panel as Snappy-compressed Parquet. mode("overwrite") keeps the
# cell re-runnable: with the default save mode the write raises as soon as the
# target path already exists (i.e. on every run after the first).
DadosPainel_2.write.mode("overwrite").parquet("/user/covid19/DadosPainel_2", compression='snappy')

In [None]:
# List the Parquet output directory of the second panel.
!hdfs dfs -ls /user/covid19/DadosPainel_2

## Salvar a terceira visualização em um tópico no Kafka

In [13]:
def _coluna_como_value(df, coluna):
    """Return `df` reduced to `coluna`, renamed to `value` and cast to string.

    The Kafka sink takes the message payload from a column named `value`.
    """
    return df.select(coluna)\
             .withColumnRenamed(coluna, "value")\
             .withColumn("value", col("value").cast(StringType()))


def _publicar_no_kafka(df, topico):
    """Batch-write the rows of `df` to the Kafka topic `topico` on kafka:9092."""
    df.write.format('kafka')\
      .option("kafka.bootstrap.servers", "kafka:9092")\
      .option("topic", topico)\
      .save()


# One topic per indicator of the third panel.
kafka_obitos = _coluna_como_value(DadosPainel_3, "obitosAcumulado")
_publicar_no_kafka(kafka_obitos, "topic-kafka-obitos")

kafka_novos_obitos = _coluna_como_value(DadosPainel_3, "obitosNovos")
_publicar_no_kafka(kafka_novos_obitos, "topic-kafka-NovosObitos")

kafka_incidenciaObitos = _coluna_como_value(DadosPainel_3, "incidenciaObitos")
_publicar_no_kafka(kafka_incidenciaObitos, "topic-kafka-incidenciaObitos")

# The lethality topic must receive the lethality frame; the previous version
# re-sent `kafka_incidenciaObitos` here, so the topic carried the wrong metric.
kafka_letalidade = _coluna_como_value(DadosPainel_3, "letalidade")
_publicar_no_kafka(kafka_letalidade, "topic-kafka-letalidade")

## Salvar a visualização do exercício 6 em um tópico no Elastic

In [11]:
from pandas.io.json import json_normalize
import json
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SQLContext
from elasticsearch import Elasticsearch


In [19]:
import os
from getpass import getpass

# NOTE(review): this host contains ".kb.", which looks like the Kibana endpoint
# of the deployment rather than the Elasticsearch one — confirm the address
# (and whether the scheme/trailing slash are accepted) before relying on it.
URL = "https://desafiosemantix.kb.us-west1.gcp.cloud.es.io/"

# Never keep the password in the notebook: read it from the environment and
# fall back to an interactive prompt. The value that used to be hardcoded here
# has been exposed and should be rotated.
es_password = os.environ.get("ELASTIC_PASSWORD") or getpass("Elasticsearch password: ")

# The former `es = Elasticsearch.put_script()` line was dropped: it invoked an
# instance method on the class without arguments and its result was never used.

# Write the accumulated-deaths frame to the `covid19` index.
# NOTE(review): the "org.elasticsearch.spark.sql" source is only available when
# the elasticsearch-hadoop/spark connector jar is on the Spark classpath (e.g.
# supplied via --packages/--jars when the session is created); without it,
# save() fails with ClassNotFoundException.
kafka_obitos.write \
           .format("org.elasticsearch.spark.sql") \
           .option("es.nodes.wan.only","true") \
           .option("es.port","9243") \
           .option("es.net.ssl","true") \
           .option("es.nodes", URL) \
           .option("es.net.http.auth.user", "elastic") \
           .option("es.net.http.auth.pass", es_password) \
           .option("es.resource", "covid19/_doc") \
           .save()


Py4JJavaError: An error occurred while calling o330.save.
: java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:244)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.sql.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
	at scala.util.Try.orElse(Try.scala:84)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
	... 12 more
