In [85]:
import json
import os
import sys
from collections import namedtuple

import pyspark.sql.functions as f
from pyspark import SparkContext, SparkConf, SQLContext, HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col,round
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType, TimestampType, DecimalType

In [2]:
# List the contents of the HDFS directory that the CSV load below reads from.
!hdfs dfs -ls /user/jairomonassa/covid/excel

Found 4 items
-rw-r--r--   3 root supergroup   62492959 2022-08-08 00:15 /user/jairomonassa/covid/excel/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2022-08-08 00:15 /user/jairomonassa/covid/excel/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2022-08-08 00:15 /user/jairomonassa/covid/excel/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2022-08-08 00:15 /user/jairomonassa/covid/excel/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


In [3]:
# Spark tuning options for the YARN job.
# Memory-overhead values carry an explicit unit: Spark reads a bare number as
# MiB, so the previous "2" requested only 2 MiB of executor overhead.
conf = (
    SparkConf()
    .set("spark.driver.memory", "8G")
    .set("spark.executor.cores", "5")
    .set("spark.executor.instances", "5")
    .set("spark.executor.memory", "10G")
    .set("spark.sql.files.maxPartitionBytes", "128MB")
    .set("spark.sql.adaptive.enabled", "true")
    .set("spark.sql.shuffle.partitions", "50")
    .set("spark.yarn.executor.memoryOverhead", "2G")
    .set("spark.yarn.driver.memoryOverhead", "1G")
)

In [4]:
# Create (or reuse) the Hive-enabled Spark session on YARN.
# `conf` is passed to the builder explicitly; without it none of the options
# set on the SparkConf object reach the session.
# NOTE: if a session already exists in this kernel, getOrCreate() returns it
# and launch-time settings (executor memory, cores, ...) are not re-applied.
spark = (
    SparkSession.builder
    .master("yarn")
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)

In [5]:
# Take the SparkContext from the session so this cell does not rely on a
# pre-defined `sc` global (only present when the kernel is started by pyspark).
sc = spark.sparkContext

# Legacy entry points; `hc` is used further down to read the Hive table.
sql = SQLContext(sc)
hc = HiveContext(sc)

In [6]:
# Column layout for the CSV files, in file order: (column name, Spark type).
# Every field is declared nullable.
schema_columns = [
    ("regiao", StringType()),
    ("estado", StringType()),
    ("municipio", StringType()),
    ("coduf", IntegerType()),
    ("codmun", IntegerType()),
    ("codRegiaoSaude", IntegerType()),
    ("nomeRegiaoSaude", StringType()),
    ("data", TimestampType()),
    ("semanaEpi", IntegerType()),
    ("populacaoTCU2019", IntegerType()),
    ("casosAcumulado", DecimalType()),
    ("casosNovos", IntegerType()),
    ("obitosAcumulado", IntegerType()),
    ("obitosNovos", IntegerType()),
    ("Recuperadosnovos", IntegerType()),
    ("emAcompanhamentoNovos", IntegerType()),
    ("interior/metropolitana", IntegerType()),
]

schema = StructType(
    [StructField(name, dtype, True) for name, dtype in schema_columns]
)

In [7]:
# Load every CSV in the directory into a single DataFrame.
# The explicit schema is applied with .schema(); passing it through
# .option("schema", ...) is not a CSV reader option and is ignored, which left
# the column types to inferSchema (an extra full pass over the files).
# DROPMALFORMED discards rows that cannot be parsed against the schema.
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("delimiter", ";")
    .option("mode", "DROPMALFORMED")
    .csv("/user/jairomonassa/covid/excel/*.csv")
)

In [8]:
# Number of rows loaded into the DataFrame.
df.count()

2624943

In [9]:
# Show the column names and types of the loaded DataFrame.
df.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



In [10]:
# Count the lines of each source file with `wc -l`; IPython captures the
# output of each shell command into the variable on the left.
arq1 = !hdfs dfs -cat /user/jairomonassa/covid/excel/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv | wc -l
arq2 = !hdfs dfs -cat /user/jairomonassa/covid/excel/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv | wc -l
arq3 = !hdfs dfs -cat /user/jairomonassa/covid/excel/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv | wc -l
arq4 = !hdfs dfs -cat /user/jairomonassa/covid/excel/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv | wc -l

In [11]:
# Ignore the header: convert each captured `wc -l` result to an int and
# subtract one line per file, then total the data rows across the four files.
arq1, arq2, arq3, arq4 = [
    int(wc_output[0]) - 1 for wc_output in (arq1, arq2, arq3, arq4)
]
arq_count = arq1 + arq2 + arq3 + arq4

In [12]:
# Total data lines across the four files (headers excluded).
arq_count

2624943

In [13]:
# Preview the first two rows of the loaded DataFrame.
df.show(2)

+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2021-01-01 00:00:00|       53|       210147125|       7700578|     24605|         195411|        462|         6756284|               748883|                  null|
|Brasil|  null|     null|   76|  null|          null|           null

In [14]:
# Rename the mixed-case columns to lower case and drop the "/" from
# "interior/metropolitana". Columns not listed here keep their names.
column_renames = {
    "codRegiaoSaude": "codregiaosaude",
    "nomeRegiaoSaude": "nomeregiaosaude",
    "semanaEpi": "semanaepi",
    "populacaoTCU2019": "populacaotcu2019",
    "casosAcumulado": "casosacumulado",
    "casosNovos": "casosnovos",
    "obitosAcumulado": "obitosacumulado",
    "obitosNovos": "obitosnovos",
    "emAcompanhamentoNovos": "emacompanhamentonovos",
    "interior/metropolitana": "interiormetropolitana",
    "Recuperadosnovos": "recuperadosnovos",
}
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)

In [15]:
# Persist the DataFrame as the table "covid", replacing any existing table,
# with one partition directory per distinct `municipio` value.
(
    df.write
    .mode("overwrite")
    .partitionBy("municipio")
    .saveAsTable("covid")
)

In [16]:
# List the warehouse directory of the "covid" table to inspect its partitions.
!hdfs dfs -ls /user/hive/warehouse/covid

Found 5299 items
-rw-r--r--   2 root supergroup          0 2022-08-09 11:54 /user/hive/warehouse/covid/_SUCCESS
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Abadia de Goiás
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Abadia dos Dourados
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Abadiânia
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Abaetetuba
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Abaeté
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Abaiara
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Abaré
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Abatiá
drwxr-xr-x   - root supergroup     

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Alvorada
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Alvorada D%27Oeste
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Alvorada de Minas
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Alvorada do Gurguéia
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Alvorada do Norte
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Alvorada do Sul
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Além Paraíba
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Amajari
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Amambai
drwxr-

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Apicum-Açu
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Apiúna
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Apodi
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Aporá
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Aporé
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Apuarema
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Apucarana
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Apuiarés
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Apuí
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Areial
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Areias
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Areiópolis
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Arenápolis
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Arenópolis
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Argirita
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Aricanduva
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Arinos
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Aripuanã
drwxr-xr-x   - root supergroup          0 20

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Baixa Grande
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Baixa Grande do Ribeiro
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Baixio
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Baixo Guandu
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Baião
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Balbinos
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Baldim
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Baliza
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Balneário Arroio do Silva
drwxr-xr-x   - 

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Bodó
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Bofete
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Boituva
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Bom Conselho
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Bom Despacho
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Bom Jardim
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Bom Jardim da Serra
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Bom Jardim de Goiás
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Bom Jardim de Minas
drwxr-xr-x

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Buritis
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Buritizal
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Buritizeiro
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Butiá
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Buíque
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Bálsamo
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Caapiranga
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Caaporã
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Caarapó
drwxr-xr-x   - root supergroup          0 2022-08

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Carnaubais
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Carnaubal
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Carnaubeira da Penha
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Carnaíba
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Carnaúba dos Dantas
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Carneirinho
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Carneiros
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Caroebe
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Carolina
drwxr-xr-x   - ro

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Charqueadas
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Charrua
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Chaval
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Chavantes
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Chaves
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Chiador
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Chiapetta
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Chopinzinho
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Chorozinho
drwxr-xr-x   - root supergroup          0 

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Cruz Alta
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Cruz Machado
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Cruz das Almas
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Cruz do Espírito Santo
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Cruzaltense
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Cruzeiro
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Cruzeiro da Fortaleza
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Cruzeiro do Iguaçu
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Cruze

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Escada
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Esmeralda
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Esmeraldas
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Espera Feliz
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Esperantina
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Esperantinópolis
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Esperança
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Esperança Nova
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Esperança do Sul
drwxr-xr-x   - 

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Gararu
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Garibaldi
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Garopaba
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Garrafão do Norte
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Garruchos
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Garuva
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Garça
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Gaspar
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Gastão Vidigal
drwxr-xr-x   - root supergroup        

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Inhacorá
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Inhambupe
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Inhangapi
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Inhapi
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Inhapim
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Inhaúma
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Inhuma
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Inhumas
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Inimutaba
drwxr-xr-x   - root supergroup          0 2022-08-

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=João Monlevade
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=João Neiva
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=João Pessoa
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=João Pinheiro
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=João Ramalho
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Juara
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Juarez Távora
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Juarina
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Juatuba
drwxr-xr-x   - root supergroup     

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Maravilhas
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Maraã
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Maraú
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Marcação
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Marcelino Ramos
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Marcelino Vieira
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Marcelândia
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Marcionílio Souza
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Marco
drwxr-xr-x   - root supergroup     

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Nova Olinda
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Nova Olinda do Maranhão
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Nova Olinda do Norte
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Nova Olímpia
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Nova Palma
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Nova Palmeira
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Nova Petrópolis
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Nova Ponte
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Nova P

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Pavão
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Paço do Lumiar
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Peabiru
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Pederneiras
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Pedra
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Pedra Azul
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Pedra Bela
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Pedra Bonita
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Pedra Branca
drwxr-xr-x   - root supergroup  

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Restinga
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Restinga Sêca
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Retirolândia
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Riachinho
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Riacho Frio
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Riacho da Cruz
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Riacho das Almas
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Riacho de Santana
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Riacho de Santo Antô

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Santa Rita do Tocantins
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Santa Rita do Trivelato
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Santa Rosa
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Santa Rosa da Serra
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Santa Rosa de Goiás
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Santa Rosa de Lima
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Santa Rosa de Viterbo
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Santa Rosa do Piauí
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Sarzedo
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Satuba
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Satubinha
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Saubara
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Saudade do Iguaçu
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Saudades
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Saúde
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Schroeder
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Seabra
drwxr-xr-x   - root supergroup          0 20

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Felipe D%27Oeste
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Fernando
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Fidélis
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Francisco
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Francisco de Assis
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Francisco de Assis do Piauí
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Francisco de Goiás
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Francisco de Itabapoana
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/h

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Raimundo do Doca Bezerra
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Roberto
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Romão
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Roque
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Roque de Minas
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Roque do Canaã
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Salvador do Tocantins
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=São Sebastião
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid

drwxr-xr-x   - root supergroup          0 2022-08-09 11:54 /user/hive/warehouse/covid/municipio=Ubirajara
drwxr-xr-x   - root supergroup          0 2022-08-09 11:54 /user/hive/warehouse/covid/municipio=Ubiratã
drwxr-xr-x   - root supergroup          0 2022-08-09 11:54 /user/hive/warehouse/covid/municipio=Ubiretama
drwxr-xr-x   - root supergroup          0 2022-08-09 11:54 /user/hive/warehouse/covid/municipio=Ubá
drwxr-xr-x   - root supergroup          0 2022-08-09 11:54 /user/hive/warehouse/covid/municipio=Uchoa
drwxr-xr-x   - root supergroup          0 2022-08-09 11:54 /user/hive/warehouse/covid/municipio=Uibaí
drwxr-xr-x   - root supergroup          0 2022-08-09 11:54 /user/hive/warehouse/covid/municipio=Uiramutã
drwxr-xr-x   - root supergroup          0 2022-08-09 11:54 /user/hive/warehouse/covid/municipio=Uirapuru
drwxr-xr-x   - root supergroup          0 2022-08-09 11:54 /user/hive/warehouse/covid/municipio=Uiraúna
drwxr-xr-x   - root supergroup          0 2022-08-09 11:5

drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Água Nova
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Água Preta
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Água Santa
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Águas Belas
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Águas Formosas
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Águas Frias
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Águas Lindas de Goiás
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Águas Mornas
drwxr-xr-x   - root supergroup          0 2022-08-09 11:53 /user/hive/warehouse/covid/municipio=Águas Vermelhas
drwxr

In [17]:
# Build a HiveContext on the existing SparkContext.
# NOTE(review): `hc` is already created the same way in an earlier cell;
# this cell looks redundant — confirm before removing.
hc = HiveContext(sc)

In [18]:
# Read the saved "covid" table back as a DataFrame.
df_covid = hc.read.table("covid")

In [59]:
# Sum `recuperadosnovos` over all rows, counting nulls as 0.
# The aggregate is called as f.sum on purpose: the bare name `sum` is the
# Python builtin unless pyspark's `sum` has been imported first, and the
# builtin cannot aggregate a Column.
(
    df_covid
    .withColumn(
        "recuperadosnovos",
        f.coalesce(col("recuperadosnovos"), f.lit(0)),
    )
    .agg(f.sum(col("recuperadosnovos")))
    .show(20)
)

+---------------------+
|sum(recuperadosnovos)|
+---------------------+
|           2920055795|
+---------------------+



In [37]:
# Show three rows of `municipio`, skipping rows where it is null.
(
    df_covid
    .select("municipio")
    .where(col("municipio").isNotNull())
    .show(3)
)

+------------+
|   municipio|
+------------+
|Campo Grande|
|Campo Grande|
|Campo Grande|
+------------+
only showing top 3 rows



In [46]:
from pyspark.sql.functions import sum, max,coalesce
from pyspark.sql.functions import rank
from pyspark.sql.window import Window

In [68]:
# Preview the recovered / in-follow-up columns next to the date and location columns.
(
    df_covid
    .select(
        "recuperadosnovos",
        "municipio",
        "emacompanhamentonovos",
        "data",
        "regiao",
        "estado",
        "nomeregiaosaude",
    )
    .show(10)
)

+----------------+---------+---------------------+-------------------+------+------+---------------+
|recuperadosnovos|municipio|emacompanhamentonovos|               data|regiao|estado|nomeregiaosaude|
+----------------+---------+---------------------+-------------------+------+------+---------------+
|         6756284|     null|               748883|2021-01-01 00:00:00|Brasil|  null|           null|
|            null|     null|                 null|2021-01-01 00:00:00|   Sul|    PR|           null|
|         6769420|     null|               751260|2021-01-02 00:00:00|Brasil|  null|           null|
|            null|     null|                 null|2021-01-02 00:00:00|   Sul|    PR|           null|
|         6813008|     null|               724720|2021-01-03 00:00:00|Brasil|  null|           null|
|            null|     null|                 null|2021-01-03 00:00:00|   Sul|    PR|           null|
|         6875230|     null|               681961|2021-01-04 00:00:00|Brasil|  null|       

In [71]:
# Most recent row with regiao == 'Brasil': recovered and in-follow-up figures.
#
# The sort on `data` is applied BEFORE the projection. Sorting after the select
# would order by a column that is no longer part of the DataFrame and would only
# work through Spark's implicit resolution of missing sort columns.
(
    df_covid
    .where(col("regiao") == "Brasil")
    .orderBy(col("data").desc())
    .select(col("recuperadosnovos"), col("emacompanhamentonovos"))
    .show(1)
)

+----------------+---------------------+
|recuperadosnovos|emacompanhamentonovos|
+----------------+---------------------+
|        17262646|              1065477|
+----------------+---------------------+
only showing top 1 row



In [76]:
# Persist the full `df_covid` DataFrame as Parquet under the relative path "visao1".
# An explicit overwrite mode is set, matching the other Parquet writes in this
# notebook; without it, re-running the cell fails because the path already exists.
df_covid.write.mode("overwrite").parquet("visao1")

In [74]:
# Preview the new-case / accumulated-case columns next to the date and location columns.
(
    df_covid
    .select(
        "casosnovos",
        "municipio",
        "casosacumulado",
        "data",
        "regiao",
        "estado",
        "nomeregiaosaude",
    )
    .show(10)
)

+----------+---------+--------------+-------------------+------+------+---------------+
|casosnovos|municipio|casosacumulado|               data|regiao|estado|nomeregiaosaude|
+----------+---------+--------------+-------------------+------+------+---------------+
|     24605|     null|       7700578|2021-01-01 00:00:00|Brasil|  null|           null|
|        17|     null|          3171|2021-01-01 00:00:00|   Sul|    PR|           null|
|     15827|     null|       7716405|2021-01-02 00:00:00|Brasil|  null|           null|
|        10|     null|          3181|2021-01-02 00:00:00|   Sul|    PR|           null|
|     17341|     null|       7733746|2021-01-03 00:00:00|Brasil|  null|           null|
|        12|     null|          3193|2021-01-03 00:00:00|   Sul|    PR|           null|
|     20006|     null|       7753752|2021-01-04 00:00:00|Brasil|  null|           null|
|         9|     null|          3202|2021-01-04 00:00:00|   Sul|    PR|           null|
|     56648|     null|       781

In [75]:
# Most recent row with regiao == 'Brasil': new and accumulated case counts.
#
# The sort on `data` is applied BEFORE the projection. Sorting after the select
# would order by a column that is no longer part of the DataFrame and would only
# work through Spark's implicit resolution of missing sort columns.
(
    df_covid
    .where(col("regiao") == "Brasil")
    .orderBy(col("data").desc())
    .select(col("casosnovos"), col("casosacumulado"))
    .show(1)
)

+----------+--------------+
|casosnovos|casosacumulado|
+----------+--------------+
|     62504|      18855015|
+----------+--------------+
only showing top 1 row



In [89]:
# Rows with regiao == 'Brasil', newest `data` first, cut down to a single row.
# The view DataFrames built below (df1, df2, df3) all start from this one.
df_brazil = (
    df_covid
    .filter(col("regiao") == "Brasil")
    .sort(col("data").desc())
    .limit(1)
)
               

In [101]:
# View 1: recovered and in-follow-up figures taken from `df_brazil`.
df1 = (
    df_brazil
    .select("recuperadosnovos", "emacompanhamentonovos")
    .limit(1)
)
df1.show()

# Persist the view as Parquet, replacing any previous output at this path.
df1.write.parquet("/user/jairomonassa/v1", mode="overwrite")

+----------------+---------------------+
|recuperadosnovos|emacompanhamentonovos|
+----------------+---------------------+
|        17262646|              1065477|
+----------------+---------------------+



In [102]:
# List the contents of the v1 output directory on HDFS.
!hdfs dfs -ls /user/jairomonassa/v1

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-08-09 20:02 /user/jairomonassa/v1/_SUCCESS
-rw-r--r--   2 root supergroup        688 2022-08-09 20:02 /user/jairomonassa/v1/part-00000-b03930c8-b754-47b6-bf1a-a7e7e68c3b30-c000.snappy.parquet


In [104]:
# Read the v1 Parquet output back into a DataFrame so it can be inspected.
df1_t = spark.read.parquet("/user/jairomonassa/v1")

In [105]:
# Display the rows read back from the v1 Parquet directory.
df1_t.show()

+----------------+---------------------+
|recuperadosnovos|emacompanhamentonovos|
+----------------+---------------------+
|        17262646|              1065477|
+----------------+---------------------+



In [106]:
# View 2: new and accumulated case counts taken from `df_brazil`.
df2 = (
    df_brazil
    .select("casosnovos", "casosacumulado")
    .limit(1)
)
df2.show()

# Persist the view as Parquet, replacing any previous output at this path.
df2.write.parquet("/user/jairomonassa/v2", mode="overwrite")

+----------+--------------+
|casosnovos|casosacumulado|
+----------+--------------+
|     62504|      18855015|
+----------+--------------+



In [108]:
# Read the v2 Parquet output back into a DataFrame and display it.
df2_t = spark.read.parquet("/user/jairomonassa/v2")
df2_t.show()

+----------+--------------+
|casosnovos|casosacumulado|
+----------+--------------+
|     62504|      18855015|
+----------+--------------+



In [115]:
# View 3: death figures taken from `df_brazil`, plus two derived rates.
#   letalidade  = obitosacumulado / casosacumulado   * 100     (rounded to 2 decimals)
#   mortalidade = obitosacumulado / populacaotcu2019 * 100000  (rounded to 2 decimals)
# `round` is pyspark.sql.functions.round, imported in the notebook's first cell,
# not Python's builtin.
df3 = (
    df_brazil
    .select(
        "obitosacumulado",
        "obitosnovos",
        round(col("obitosacumulado") / col("casosacumulado") * 100, 2).alias("letalidade"),
        round(col("obitosacumulado") / col("populacaotcu2019") * 100000, 2).alias("mortalidade"),
    )
    .limit(1)
)
df3.show()

+---------------+-----------+----------+-----------+
|obitosacumulado|obitosnovos|letalidade|mortalidade|
+---------------+-----------+----------+-----------+
|         526892|       1780|      2.79|     250.73|
+---------------+-----------+----------+-----------+



In [116]:
# Persist `df3` as Parquet, replacing any previous output at this path.
df3.write.mode("overwrite").parquet("/user/jairomonassa/v3")

In [117]:
# List the contents of the v3 output directory on HDFS.
!hdfs dfs -ls /user/jairomonassa/v3

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-08-09 21:27 /user/jairomonassa/v3/_SUCCESS
-rw-r--r--   2 root supergroup       1214 2022-08-09 21:27 /user/jairomonassa/v3/part-00000-4cafa893-ce95-468b-bdf3-00c7df9da511-c000.snappy.parquet


In [118]:
# Read the v3 Parquet output back into a DataFrame so it can be inspected.
df3_t = spark.read.parquet("/user/jairomonassa/v3")

In [119]:
# Display the rows read back from the v3 Parquet directory.
df3_t.show()

+---------------+-----------+----------+-----------+
|obitosacumulado|obitosnovos|letalidade|mortalidade|
+---------------+-----------+----------+-----------+
|         526892|       1780|      2.79|     250.73|
+---------------+-----------+----------+-----------+



In [41]:
# Window spec: one partition per `municipio` value, rows ordered newest `data` first,
# so a rank of 1 over this window marks the most recent date within each partition.
# NOTE(review): partitioning uses the municipality name only — confirm names are
# unique across states, otherwise same-named municipalities share a partition.
windowMunicipio  = Window.partitionBy("municipio").orderBy(col("data").desc())

In [65]:
# Row(s) at the most recent `data` for each non-null `municipio`, keeping the
# recovered / in-follow-up columns.
#
# The DataFrame is assigned first and displayed in a separate statement:
# `DataFrame.show()` returns None, so chaining `.show(20)` onto the assignment
# would bind `max_df` to None instead of the DataFrame.
max_df = (
    df_covid
    .withColumn("rank", rank().over(windowMunicipio))
    .where(col("municipio").isNotNull())
    # rank() gives tied rows the same rank, so more than one row per partition
    # can survive when several rows share the latest date.
    .where(col("rank") == 1)
    .select(col("recuperadosnovos"), col("municipio"), col("emacompanhamentonovos"))
)
max_df.show(20)

+----------------+--------------------+---------------------+
|recuperadosnovos|           municipio|emacompanhamentonovos|
+----------------+--------------------+---------------------+
|            null|               Apodi|                 null|
|            null|            Araruama|                 null|
|            null|        Assis Brasil|                 null|
|            null|      Benedito Leite|                 null|
|            null|Boa Esperança do Sul|                 null|
|            null|        Borrazópolis|                 null|
|            null|           Buritizal|                 null|
|            null|          Carlópolis|                 null|
|            null|           Carrancas|                 null|
|            null|           Fronteira|                 null|
|            null|            Guidoval|                 null|
|            null|             Ibiporã|                 null|
|            null|          Itaguatins|                 null|
|       

In [62]:
# Most recent row(s) per non-null `municipio`, restricted to rows where
# `recuperadosnovos` is populated, followed by the total of that column.
#
# The earlier null-to-0 replacement was removed: rows with a null
# `recuperadosnovos` are already filtered out in the first step, so that branch
# could never fire.
max_df = (
    df_covid
    .where(col("recuperadosnovos").isNotNull())
    .withColumn("rank", rank().over(windowMunicipio))
    .where(col("municipio").isNotNull())
    .where(col("rank") == 1)
    .select(col("recuperadosnovos"), col("municipio"))
)

# `f.sum` is used explicitly so the aggregate never depends on whether the bare
# name `sum` currently refers to Spark's function or Python's builtin.
# NOTE(review): the aggregate comes back null when `max_df` has no rows —
# presumably no municipality-level row carries a non-null value; verify.
max_df.agg(f.sum("recuperadosnovos")).show(1)

+---------------------+
|sum(recuperadosnovos)|
+---------------------+
|                 null|
+---------------------+



In [55]:
# Total of `emacompanhamentonovos` over `max_df`.
# NOTE(review): this needs `max_df` to still carry an `emacompanhamentonovos`
# column — confirm the `max_df` in scope selects it before running this cell.
max_df.agg(sum("emacompanhamentonovos")).show(1)

+--------------------------+
|sum(emacompanhamentonovos)|
+--------------------------+
|                         0|
+--------------------------+



In [33]:
# Rows of `df_covid` with a populated `recuperadosnovos`, reduced to that column.
#
# The DataFrame is assigned first and displayed in a separate statement:
# `DataFrame.show()` returns None, so chaining `.show(2)` onto the assignment
# would bind `df_recuperados` to None instead of the DataFrame.
df_recuperados = (
    df_covid
    .select(col("recuperadosnovos"))
    .where(col("recuperadosnovos").isNotNull())
)
df_recuperados.show(2)


+----------------+
|recuperadosnovos|
+----------------+
|         6756284|
|         6769420|
+----------------+
only showing top 2 rows



In [None]:
#.select(col("recuperadosnovos"),col("emacompanhamentonovos"))\