# Projeto Final de Spark - Nível Básico

## Otimizar todos os dados do hdfs para uma tabela Hive particionada por município

In [35]:
#Importe as bibliotecas necessárias
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql import functions as functions
import org.apache.spark.sql.SparkSession

ModuleNotFoundError: No module named 'org'

In [2]:
#Acesse os arquivos pelo HDFS
!hdfs dfs -ls "hdfs:///user/cicero/projeto-final-spark/"

Found 4 items
-rw-r--r--   3 root supergroup   62492959 2021-11-08 22:43 hdfs:///user/cicero/projeto-final-spark/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2021-11-08 22:43 hdfs:///user/cicero/projeto-final-spark/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2021-11-08 22:43 hdfs:///user/cicero/projeto-final-spark/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2021-11-08 22:43 hdfs:///user/cicero/projeto-final-spark/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


In [4]:
#Acessar os arquivos e mostrar a primeira linha 
rdd = sc.textFile("hdfs:///user/cicero/projeto-final-spark/")
rdd.first()

'regiao;estado;municipio;coduf;codmun;codRegiaoSaude;nomeRegiaoSaude;data;semanaEpi;populacaoTCU2019;casosAcumulado;casosNovos;obitosAcumulado;obitosNovos;Recuperadosnovos;emAcompanhamentoNovos;interior/metropolitana'

In [5]:
#Mostrar as primieras linhas para entender os dados
rdd.take(100)

['regiao;estado;municipio;coduf;codmun;codRegiaoSaude;nomeRegiaoSaude;data;semanaEpi;populacaoTCU2019;casosAcumulado;casosNovos;obitosAcumulado;obitosNovos;Recuperadosnovos;emAcompanhamentoNovos;interior/metropolitana',
 'Brasil;;;76;;;;2020-02-25;9;210147125;0;0;0;0;;;',
 'Brasil;;;76;;;;2020-02-26;9;210147125;1;1;0;0;;;',
 'Brasil;;;76;;;;2020-02-27;9;210147125;1;0;0;0;;;',
 'Brasil;;;76;;;;2020-02-28;9;210147125;1;0;0;0;;;',
 'Brasil;;;76;;;;2020-02-29;9;210147125;2;1;0;0;;;',
 'Brasil;;;76;;;;2020-03-01;10;210147125;2;0;0;0;;;',
 'Brasil;;;76;;;;2020-03-02;10;210147125;2;0;0;0;;;',
 'Brasil;;;76;;;;2020-03-03;10;210147125;2;0;0;0;;;',
 'Brasil;;;76;;;;2020-03-04;10;210147125;3;1;0;0;;;',
 'Brasil;;;76;;;;2020-03-05;10;210147125;7;4;0;0;;;',
 'Brasil;;;76;;;;2020-03-06;10;210147125;13;6;0;0;;;',
 'Brasil;;;76;;;;2020-03-07;10;210147125;19;6;0;0;;;',
 'Brasil;;;76;;;;2020-03-08;11;210147125;25;6;0;0;;;',
 'Brasil;;;76;;;;2020-03-09;11;210147125;25;0;0;0;;;',
 'Brasil;;;76;;;;2020-03-

In [6]:
'''
#Criar um Data Frame com Schema organizado.
colunas_painel_covidbr = [ StructField("regiao", StringType()),
                          StructField("estado", StringType()),
                          StructField("municipio", StringType()),
                          StructField("coduf", StringType()),
                          StructField("codmun", StringType()),
                          StructField("codRegiaoSaude", StringType()),
                          StructField("nomeRegiaoSaude", StringType()),
                          StructField("data", DateType()),
                          StructField("semanaEpi", StringType()),
                          StructField("populacaoTCU2019", IntegerType()),
                          StructField("casosAcumulado", IntegerType()),
                          StructField("casosNovos", IntegerType()),
                          StructField("obitosAcumulado", IntegerType()),
                          StructField("obitosNovos", IntegerType()),
                          StructField("Recuperadosnovos", IntegerType()),
                          StructField("emAcompanhamentoNovos", IntegerType()),
                          StructField("interior/metropolitana", IntegerType())]

schema = StructType(colunas_painel_covidbr)
df_painel_covidbr = spark.read.option("header","true").option("delimiter",";").csv("hdfs:///user/cicero/projeto-final-spark/")
'''

#Utilizando o option("inferSchema","true") para que automaticamente o spark identifique os tipos de dados das colunas
df_painel_covidbr = spark.read.option("sep",";").option("header","true").option("inferSchema","true").csv("hdfs:///user/cicero/projeto-final-spark/")

In [10]:
#Visualize o schema
df_painel_covidbr.printSchema()

#Visualizar a primeira linha
#df_painel_covidbr.head()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



In [11]:
#Visualizar dados na vertical para melhor entendimento
df_painel_covidbr.show(3, vertical=True)

-RECORD 0-------------------------------------
 regiao                 | Brasil              
 estado                 | null                
 municipio              | null                
 coduf                  | 76                  
 codmun                 | null                
 codRegiaoSaude         | null                
 nomeRegiaoSaude        | null                
 data                   | 2020-02-25 00:00:00 
 semanaEpi              | 9                   
 populacaoTCU2019       | 210147125           
 casosAcumulado         | 0                   
 casosNovos             | 0                   
 obitosAcumulado        | 0                   
 obitosNovos            | 0                   
 Recuperadosnovos       | null                
 emAcompanhamentoNovos  | null                
 interior/metropolitana | null                
-RECORD 1-------------------------------------
 regiao                 | Brasil              
 estado                 | null                
 municipio   

In [15]:
# Ajustando os dados e removendo informações desnecessárias que são diárias. Ex: Hora
df_painel_covidbr_limpo = df_painel_covidbr.withColumn('data', functions.from_unixtime(functions.unix_timestamp(df_painel_covidbr.data), 'yyyy-MM-dd'))

In [16]:
#Mostrando o novo dataframe com o campo data ajustado
df_painel_covidbr_limpo.show(3, vertical=True)

-RECORD 0----------------------------
 regiao                 | Brasil     
 estado                 | null       
 municipio              | null       
 coduf                  | 76         
 codmun                 | null       
 codRegiaoSaude         | null       
 nomeRegiaoSaude        | null       
 data                   | 2020-02-25 
 semanaEpi              | 9          
 populacaoTCU2019       | 210147125  
 casosAcumulado         | 0          
 casosNovos             | 0          
 obitosAcumulado        | 0          
 obitosNovos            | 0          
 Recuperadosnovos       | null       
 emAcompanhamentoNovos  | null       
 interior/metropolitana | null       
-RECORD 1----------------------------
 regiao                 | Brasil     
 estado                 | null       
 municipio              | null       
 coduf                  | 76         
 codmun                 | null       
 codRegiaoSaude         | null       
 nomeRegiaoSaude        | null       
 data       

In [None]:
#Inserindo os dados no Apache Hive e separando os dados por municípios
# write.mode - Sobrescreve o diretório de saída. 
# saveAsTable - Por padrão, se não especificado, ele salva como arquivo Hive no caminho /user/hive/warehouse/
# partitionBy - É usado para particionar o grande conjunto de dados (DataFrame)...
# (...) em arquivos menores com base em uma ou várias colunas durante a gravação no disco.

df_painel_covidbr_limpo.write.mode('overwrite').partitionBy('municipio').saveAsTable('covidbr.municipio')

In [None]:
#Verificando se a tabela Hive está no diretório
!hdfs dfs -ls /user/hive/warehouse/covidbr

In [None]:
#Visualizando a tabela Hive

## Criar as 3 visualizações pelo Spark com os dados enviados para o HDFS 

In [None]:
#Visualização 1 - Casos Recuperados

#Visualização 2 - Casos confirmados

#Visualização 3 - Óbitos Confirmados


## Salvar a primeira visualização como tabela Hive

## Salvar a segunda visualização com formato parquet e compressão snappy

## Salvar a terceira visualização em um tópico no Kafka

## Criar a visualização pelo Spark com os dados enviados para o HDFS