# Projeto Final Big Data Engineer Semantix Academy

Autor: **Renato Holzlsauer Mattos Macedo**<br>
Contato: renato.holzlsauer@gmail.com

In [1]:
import os
import requests
import zipfile
from pyspark.sql.functions import *
from pyspark.sql.types import *

# 1. Download dos arquivos e envio para o HDFS

In [2]:
def download_unzip_file(directory, filename, url):
    path_to_file = os.path.join(directory, filename)

    if not os.path.exists(directory):
        os.mkdir(directory)

    if not os.path.exists(path_to_file):
        request = requests.get(url, allow_redirects=True)
        with open(path_to_file, 'wb') as f:
            f.write(request.content)
        with zipfile.ZipFile(path_to_file, 'r') as file:
            file.extractall(directory)
            
        print(f'Arquivo salvo com sucesso em: {directory}')

In [3]:
download_unzip_file('./data', 'covid-data.zip', 'https://github.com/Holzlsauer/semantix-covid/blob/main/HIST_PAINEL_COVIDBR.zip?raw=true')

In [9]:
# Criação do diretório local
!mkdir -p data

In [10]:
# Verificação dos arquivos extraidos
!ls -l ./data/

total 242808
-rw-r--r-- 1 root root 62492959 Apr 22 00:09 HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r-- 1 root root 76520681 Apr 22 00:09 HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r-- 1 root root 91120916 Apr 22 00:09 HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r-- 1 root root  3046774 Apr 22 00:09 HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv
-rw-r--r-- 1 root root 15431753 Apr 22 00:09 covid-data.zip


In [6]:
# Criação do diretório no hdfs
!hdfs dfs -mkdir -p /user/data/covid

In [5]:
# Envio para o hdfs
!hdfs dfs -put ./data/*.csv /user/data/covid

put: `/user/data/covid/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv': File exists
put: `/user/data/covid/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv': File exists
put: `/user/data/covid/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv': File exists
put: `/user/data/covid/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv': File exists


In [7]:
# Verificação do envio para o hdfs
!hdfs dfs -ls -R /user/data

drwxr-xr-x   - root supergroup          0 2022-04-22 00:12 /user/data/covid
-rw-r--r--   2 root supergroup   62492959 2022-04-22 00:12 /user/data/covid/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   2 root supergroup   76520681 2022-04-22 00:12 /user/data/covid/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   2 root supergroup   91120916 2022-04-22 00:12 /user/data/covid/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   2 root supergroup    3046774 2022-04-22 00:12 /user/data/covid/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


# 2. Otimização para tabela HIVE com particionamento por município

In [7]:
data = spark.read.csv('/user/data/covid', sep=';', header='true')
# data.printSchema()

In [8]:
# Ajustes da tipagem dos dados após leitura
data = data.withColumnRenamed('interior/metropolitana', 'interior_metropolitana')
data = data\
    .withColumn('regiao', col('regiao').cast(StringType()))\
    .withColumn('estado', col('estado').cast(StringType()))\
    .withColumn('municipio', col('municipio').cast(StringType()))\
    .withColumn('coduf', col('coduf').cast(IntegerType()))\
    .withColumn('codmun', col('codmun').cast(IntegerType()))\
    .withColumn('codRegiaoSaude', col('codRegiaoSaude').cast(IntegerType()))\
    .withColumn('nomeRegiaoSaude', col('nomeRegiaoSaude').cast(StringType()))\
    .withColumn('semanaEpi', col('semanaEpi').cast(IntegerType()))\
    .withColumn('populacaoTCU2019', col('populacaoTCU2019').cast(IntegerType()))\
    .withColumn('casosAcumulado', col('casosAcumulado').cast(IntegerType()))\
    .withColumn('casosNovos', col('casosNovos').cast(IntegerType()))\
    .withColumn('obitosAcumulado', col('obitosAcumulado').cast(IntegerType()))\
    .withColumn('obitosNovos', col('obitosNovos').cast(IntegerType()))\
    .withColumn('Recuperadosnovos', col('Recuperadosnovos').cast(IntegerType()))\
    .withColumn('emAcompanhamentoNovos', col('emAcompanhamentoNovos').cast(IntegerType()))\
    .withColumn('interior_metropolitana', col('interior_metropolitana').cast(StringType()))\
    .withColumn('data', col('data').cast(TimestampType()))

data.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: integer (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior_metropolitana: string (nullable = true)



In [9]:
# Preenchendo valores nulos de municipio para particionamento por municipio
data = data.withColumn('municipio', rtrim(ltrim(lower(col('municipio'))))).fillna({'municipio': 'nulo'})

In [10]:
# Salvando tabela havi em formato orc
data.cache()
data.write.format('orc').mode('overwrite').partitionBy('municipio').saveAsTable('covid_data')

In [10]:
# Verificação da criação da tabela hive
!hdfs dfs -ls /user/hive/warehouse/covid_data

Found 5299 items
-rw-r--r--   2 root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/_SUCCESS
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=abadia de goiás
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=abadia dos dourados
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=abadiânia
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=abaetetuba
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=abaeté
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=abaiara
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=abaré
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municip

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=altair
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=altamira
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=altamira do maranhão
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=altamira do paraná
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=altaneira
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=alterosa
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=altinho
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=altinópolis
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/munici

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=antas
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=antonina
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=antonina do norte
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=antônio almeida
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=antônio cardoso
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=antônio carlos
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=antônio dias
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=antônio gonçalves
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/war

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=arraias
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=arroio do meio
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=arroio do padre
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=arroio do sal
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=arroio do tigre
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=arroio dos ratos
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=arroio grande
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=arroio trinta
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=barra do chapéu
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=barra do choça
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=barra do corda
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=barra do garças
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=barra do guarita
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=barra do jacaré
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=barra do mendes
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=barra do ouro
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hi

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=bodó
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=bofete
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=boituva
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=bom conselho
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=bom despacho
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=bom jardim
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=bom jardim da serra
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=bom jardim de goiás
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_d

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=buíque
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=bálsamo
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caapiranga
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caaporã
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caarapó
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caatiba
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=cabaceiras
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=cabaceiras do paraguaçu
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipi

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=careiro da várzea
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=cariacica
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caridade
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caridade do piauí
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=carinhanha
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=carira
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=cariri do tocantins
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caririaçu
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/c

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caxambu do sul
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caxias
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caxias do sul
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caxingó
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caçador
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caçapava
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caçapava do sul
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=caçu
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=c

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=deputado irapuan pinheiro
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=derrubadas
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=descalvado
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=descanso
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=descoberto
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=desterro
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=desterro de entre rios
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=desterro do melo
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hi

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=governador mangabeira
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=governador newton bello
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=governador nunes freire
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=governador valadares
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=gracho cardoso
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=grajaú
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=gramado
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=gramado dos loureiros
drwxr-xr-x   - root supergroup          0

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=jequitinhonha
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=jequiá da praia
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=jequié
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=jeremoabo
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=jericó
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=jeriquara
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=jerumenha
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=jerônimo monteiro
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/m

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=mendes pimentel
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=mendonça
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=mercedes
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=mercês
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=meridiano
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=meruoca
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=mesquita
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=messias
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=messias

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=nova ramada
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=nova redenção
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=nova resende
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=nova roma
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=nova roma do sul
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=nova rosalândia
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=nova russas
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=nova santa bárbara
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/w

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=paranatinga
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=paranavaí
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=paranaíba
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=paranaíta
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=paranhos
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=paraná
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=paranã
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=paraopeba
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=parapuã

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=pouso alto
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=pouso novo
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=pouso redondo
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=poxoréu
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=poá
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=poço branco
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=poço dantas
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=poço das antas
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/munici

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=santa rita de cássia
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=santa rita de ibitipoca
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=santa rita de jacutinga
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=santa rita de minas
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=santa rita do araguaia
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=santa rita do itueto
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=santa rita do novo destino
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=santa rita do pardo
drwx

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=são francisco do oeste
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=são francisco do pará
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=são francisco do piauí
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=são francisco do sul
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=são félix
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=são félix de balsas
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=são félix de minas
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=são félix do araguaia
drwxr-xr-x   - root supe

drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=tufilândia
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=tuiuti
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=tumiritinga
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=tunas
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=tunas do paraná
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=tuneiras do oeste
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=tuntum
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/municipio=tunápolis
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data/munici

In [11]:
# Eliminando leitura armazenada
del data

# 3. Criação das visualizações

In [2]:
covid_data = spark.read.table('covid_data')

In [13]:
covid_data.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: integer (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior_metropolitana: string (nullable = true)
 |-- municipio: string (nullable = true)



In [3]:
# Preenchendo nulls com 0
covid_data = covid_data\
    .fillna({'populacaoTCU2019': 0})\
    .fillna({'casosAcumulado': 0})\
    .fillna({'casosNovos': 0})\
    .fillna({'obitosAcumulado': 0})\
    .fillna({'obitosNovos': 0})\
    .fillna({'Recuperadosnovos': 0})\
    .fillna({'emAcompanhamentoNovos': 0})

In [15]:
# Identificando último data de dados coletados
covid_data.select(max('data')).show()

+-------------------+
|          max(data)|
+-------------------+
|2021-07-06 00:00:00|
+-------------------+



### Primeira visualização

In [16]:
# Visualizando dados do último dia de dados coletados
first_view = covid_data.where(col('data') == '2021-07-06')\
    .select('Recuperadosnovos', 'emAcompanhamentoNovos')\
    .agg(sum('Recuperadosnovos').alias('casos_recuperados'), \
        (sum('emAcompanhamentoNovos')).alias('em_acompanhamento'))

first_view.show()

+-----------------+-----------------+
|casos_recuperados|em_acompanhamento|
+-----------------+-----------------+
|         17262646|          1065477|
+-----------------+-----------------+



### Segunda visualização

In [17]:
second_view = covid_data.where(col('data') == '2021-07-06')\
.agg(max('casosAcumulado').alias('acumulado'), \
     max('casosNovos').alias('casos_novos'), \
     regexp_replace((format_number(((sum('casosAcumulado') * 100000) / sum('populacaoTCU2019')), 1))\
        .cast(StringType()), '\,', '').alias('incidencia'))

second_view.show()

+---------+-----------+----------+
|acumulado|casos_novos|incidencia|
+---------+-----------+----------+
| 18855015|      62504|    8972.3|
+---------+-----------+----------+



### Terceira visualização

In [3]:
third_view = covid_data.where(col('data') == '2021-07-06')\
.agg(max('obitosAcumulado').alias('obitos_acumulados'), \
     max('obitosNovos').alias('casos_novos'), \
     regexp_replace((format_number(((sum('obitosAcumulado') * 100000) / sum('populacaoTCU2019')), 1))\
        .cast(StringType()), '\,', '').alias('mortalidade'), \
     regexp_replace((format_number(((max('obitosNovos') * 100) / max('casosNovos')), 1))\
        .cast(StringType()), '\,', '').alias('letalidade'))

third_view.show()

+-----------------+-----------+-----------+----------+
|obitos_acumulados|casos_novos|mortalidade|letalidade|
+-----------------+-----------+-----------+----------+
|           526892|       1780|      250.7|       2.8|
+-----------------+-----------+-----------+----------+



Os coef. de **incidência**, **mortalidade** e **letalidade** foram calculados seguindo as fórmula expostas na seção **Sobre** no site [https://covid.saude.gov.br/](https://covid.saude.gov.br/)

# 4. Salvar a primeira visualização como tabela Hive

In [19]:
first_view.createOrReplaceTempView('first_tmp_view')

spark.sql('create table if not exists first_view as select * from first_tmp_view');

In [19]:
!hdfs dfs -ls /user/hive/warehouse

Found 3 items
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data
drwxrwxr-x   - root supergroup          0 2022-04-28 01:36 /user/hive/warehouse/first_view
drwxr-xr-x   - root supergroup          0 2022-04-28 23:19 /user/hive/warehouse/second_view


In [20]:
spark.read.table('first_view').show()

+-----------------+-----------------+
|casos_recuperados|em_acompanhamento|
+-----------------+-----------------+
|         17262646|          1065477|
+-----------------+-----------------+



# 5. Salvar a segunda visualização no formato parquet com compressão snappy

In [22]:
second_view.write.parquet('/user/hive/warehouse/second_view', mode='overwrite', compression='snappy')

In [21]:
!hdfs dfs -ls /user/hive/warehouse

Found 3 items
drwxr-xr-x   - root supergroup          0 2022-04-26 23:29 /user/hive/warehouse/covid_data
drwxrwxr-x   - root supergroup          0 2022-04-28 01:36 /user/hive/warehouse/first_view
drwxr-xr-x   - root supergroup          0 2022-04-28 23:19 /user/hive/warehouse/second_view


In [22]:
spark.read.parquet('/user/hive/warehouse/second_view').show()

+---------+-----------+----------+
|acumulado|casos_novos|incidencia|
+---------+-----------+----------+
| 18855015|      62504|    8972.3|
+---------+-----------+----------+



# 6. Salvar a terceira visualização em um tópico kafka

Tópico criado no kafka:

    kafka-topics.sh --bootstrap-server kafka:9092 --topic third_view --create --partitions 1 --replication-factor 1

In [23]:
third_view_save = third_view.withColumn('value', struct(
    col('obitos_acumulados'),\
    col('casos_novos'),\
    col('mortalidade'),\
    col('letalidade')
).cast(StringType()))

third_view_save.printSchema()

root
 |-- obitos_acumulados: integer (nullable = true)
 |-- casos_novos: integer (nullable = true)
 |-- mortalidade: string (nullable = true)
 |-- letalidade: string (nullable = true)
 |-- value: string (nullable = false)



In [26]:
third_view_save.write.format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('topic','third_view').save()

# 7. Criar a visualização pelo Spark com os dados enviados para o HDFS

In [12]:
third_view.withColumn('value', struct(
    col('obitos_acumulados'),\
    col('casos_novos'),\
    col('mortalidade'),\
    col('letalidade')
)).show()

+-----------------+-----------+-----------+----------+--------------------+
|obitos_acumulados|casos_novos|mortalidade|letalidade|               value|
+-----------------+-----------+-----------+----------+--------------------+
|           526892|       1780|      250.7|       2.8|[526892, 1780, 25...|
+-----------------+-----------+-----------+----------+--------------------+



In [25]:
covid_data = spark.read.table('covid_data')
covid_data.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: integer (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior_metropolitana: string (nullable = true)
 |-- municipio: string (nullable = true)



In [26]:
brasil = covid_data.where((col('regiao') == 'Brasil') & (col('data') == '2021-07-06'))\
.agg(
    max('regiao').alias('regiao'),
    max('casosAcumulado').alias('casos'),
    max('obitosAcumulado').alias('obitos'),
    regexp_replace((format_number(((sum('casosAcumulado') * 100000) / sum('populacaoTCU2019')), 1))\
        .cast(StringType()), '\,', '').alias('incidencia'),
    regexp_replace((format_number(((sum('obitosAcumulado') * 100000) / sum('populacaoTCU2019')), 1))\
        .cast(StringType()), '\,', '').alias('mortalidade')
)

brasil.show()

+------+--------+------+----------+-----------+
|regiao|   casos|obitos|incidencia|mortalidade|
+------+--------+------+----------+-----------+
|Brasil|18855015|526892|    8972.3|      250.7|
+------+--------+------+----------+-----------+



In [27]:
regioes = covid_data.where((col('regiao') != 'Brasil') & (col('data') == '2021-07-06'))\
.groupBy('regiao').agg(
    max('casosAcumulado').alias('casos'),
    max('obitosAcumulado').alias('obitos'),
    regexp_replace((format_number(((sum('casosAcumulado') * 100000) / sum('populacaoTCU2019')), 1))\
        .cast(StringType()), '\,', '').alias('incidencia'),
    regexp_replace((format_number(((sum('obitosAcumulado') * 100000) / sum('populacaoTCU2019')), 1))\
        .cast(StringType()), '\,', '').alias('mortalidade')
)

regioes.show()

+------------+-------+------+----------+-----------+
|      regiao|  casos|obitos|incidencia|mortalidade|
+------------+-------+------+----------+-----------+
|    Nordeste|1141612| 24428|    7807.3|      188.9|
|         Sul|1308643| 31867|   12046.4|      269.2|
|     Sudeste|3809222|130389|    8078.2|      277.6|
|Centro-Oeste| 686433| 19485|   11760.5|      301.9|
|       Norte| 557708| 15624|    9401.6|      237.9|
+------------+-------+------+----------+-----------+



In [28]:
# Realizando o append dos dataframes e mantendo os dados do Brasil no topo
view = brasil.union(regioes)
view.show()

+------------+--------+------+----------+-----------+
|      regiao|   casos|obitos|incidencia|mortalidade|
+------------+--------+------+----------+-----------+
|      Brasil|18855015|526892|    8972.3|      250.7|
|    Nordeste| 1141612| 24428|    7807.3|      188.9|
|         Sul| 1308643| 31867|   12046.4|      269.2|
|     Sudeste| 3809222|130389|    8078.2|      277.6|
|Centro-Oeste|  686433| 19485|   11760.5|      301.9|
|       Norte|  557708| 15624|    9401.6|      237.9|
+------------+--------+------+----------+-----------+

