                   # Desafio Semantix - Campanha nacional de vacinação contra a Covid-19

In [1]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

#Setar o banco de dados Hive para a database Covid
spark.catalog.setCurrentDatabase("covid")


In [2]:
#Criar estrutura dos arquivos
columns_list = [
StructField("regiao", StringType()),
StructField("estado", StringType()),
StructField("municipio", StringType()),
StructField("coduf", StringType()),
StructField("codmun", StringType()),
StructField("codRegiaoSaude", StringType()),
StructField("nomeRegiaoSaude", StringType()),
StructField("data", StringType()),
StructField("semanaEpi", StringType()),
StructField("populacaoTCU2019", IntegerType()),
StructField("casosAcumulado", IntegerType()),
StructField("casosNovos", IntegerType()),
StructField("obitosAcumulado", IntegerType()),
StructField("obitosNovos", IntegerType()),
StructField("Recuperadosnovos", IntegerType()),
StructField("emAcompanhamentoNovos", IntegerType()),
StructField("interior/metropolitana", IntegerType())]

name_schema = StructType(columns_list)

#Importar os arquivos historico de Covid Brasil
covidDF = spark.read.csv("/user/lucas/data/covid",header='true',sep=';',schema=name_schema)


In [5]:
# Salvar como tabela hive particionada por municipio
covidDF.write.saveAsTable("covid",partitionBy="municipio")


In [7]:
#Conferir tabela criada 
!hdfs dfs -ls /user/hive/warehouse/covid.db/

Found 1 items
drwxr-xr-x   - root supergroup          0 2022-04-24 15:28 /user/hive/warehouse/covid.db/covid


In [9]:
# Primeira vizualização,foi separada em dois data frames para agrupar o ultimo registro de casos novos de status Em acompanhamento.
covidRecuperados = covidDF.agg(max("Recuperadosnovos").alias("Casos_Recuperados"),last("data").alias("data"))

covidAcompanhamentoNovos = covidDF.groupBy("data").agg(max("emAcompanhamentoNovos").alias("Casos novos")).sort(desc("data"))
covidAcompanhamento = covidAcompanhamentoNovos.agg(first("Casos novos").alias("Em_acompanhamento"),first("data").alias("data"))

covidRecuperadosAcompanhamentoJoin = covidRecuperados.join(covidAcompanhamento, covidRecuperados.data == covidAcompanhamento.data,"inner")
covidRecuperadosAcompanhamento = covidRecuperadosAcompanhamentoJoin.select("Casos_Recuperados","Em_acompanhamento")
covidRecuperadosAcompanhamento.show()

+-----------------+-----------------+
|Casos_Recuperados|Em_acompanhamento|
+-----------------+-----------------+
|         17262646|          1065477|
+-----------------+-----------------+



In [10]:
# Segunda vizualização,foi separada em dois data frames para agrupar o ultimo registro de casos novos de covid.
covidAcumuladoIncidencia = covidDF.agg(max("casosAcumulado").alias("Acumulado"),format_number(max("casosAcumulado")/max("populacaoTCU2019")*100000,1).alias("Incidencia"),last("data").alias("data"))

covidCasosNovos = covidDF.groupBy("data").agg(max("casosNovos").alias("Casos novos")).sort(desc("data"))
covidCasos = covidCasosNovos.agg(first("Casos novos").alias("Casos_novos"),first("data").alias("data"))

covidAcumuladoIncidenciaCasosJoin = covidAcumuladoIncidencia.join(covidCasos, covidAcumuladoIncidencia.data == covidCasos.data,"inner")
covidAcumuladoIncidenciaCasos = covidAcumuladoIncidenciaCasosJoin.select("Acumulado","Casos_novos","Incidencia")
covidAcumuladoIncidenciaCasos.show()

+---------+-----------+----------+
|Acumulado|Casos_novos|Incidencia|
+---------+-----------+----------+
| 18855015|      62504|   8,972.3|
+---------+-----------+----------+



In [11]:
# Terceira vizualização,foi separada em dois data frames para agrupar o ultimo registro de casos novos de obitos.

covidObitos = covidDF.agg(max("obitosAcumulado").alias("Obitos acumulados"),format_number(max("obitosAcumulado")/max("casosAcumulado")*100,1).alias("Letalidade"),format_number(max("obitosAcumulado")/max("populacaoTCU2019")*100000,1).alias("Mortalidade"),last("data").alias("data"))

covidObitosNovos = covidDF.groupBy("data").agg(max("obitosNovos").alias("Obitos novos")).sort(desc("data"))
covidCasosObitos = covidObitosNovos.agg(first("Obitos novos").alias("Casos novos"),first("data").alias("data"))

covidObitosCasosObitosML = covidObitos.join(covidCasosObitos, covidObitos.data == covidCasosObitos.data,"inner")
covidObitosCasosObitosMortalidade = covidObitosCasosObitosML.select("Obitos acumulados","Casos novos","Letalidade","Mortalidade")
covidObitosCasosObitosMortalidade.show()

+-----------------+-----------+----------+-----------+
|Obitos acumulados|Casos novos|Letalidade|Mortalidade|
+-----------------+-----------+----------+-----------+
|           526892|       1780|       2.8|      250.7|
+-----------------+-----------+----------+-----------+



In [12]:
# Salvar primeira vizualização como uma tabela Hive
covidRecuperadosAcompanhamento.write.saveAsTable("covid_recuperados")

In [14]:
# Salvar segunda vizualização no HDFS com formato parquet compressão snappy
covidAcumuladoIncidenciaCasos.write.parquet("/user/lucas/data/covid/casos_confirmados",compression="snappy",mode="overwrite")


In [15]:
# Salvar terceira vizualização em um tópico no Kafka, não funcionou 
covidObitosCasosObitosMortalidade.writeStream.format("kafka")\
.option("kafka.bootstrap.servers","kafka:9092").option("topic","topic-kvspark-covid")\
.option("checkpointLocation","hdfs://namenode:8020/user/lucas/kafka_stream_covid/check").start()



AnalysisException: "'writeStream' can be called only on streaming Dataset/DataFrame;"

In [16]:
# Carregar a primeira vizualização salva como tabela Hive
recuperados = spark.read.table("covid_recuperados")
recuperados.show()

+-----------------+-----------------+
|Casos_Recuperados|Em_acompanhamento|
+-----------------+-----------------+
|         17262646|          1065477|
+-----------------+-----------------+



In [17]:
# Carregar a segunda vizualização salva no HDFS com o formato Parquet
casosConfirmados = spark.read.parquet("/user/lucas/data/covid/casos_confirmados")
casosConfirmados.show()

+---------+-----------+----------+
|Acumulado|Casos_novos|Incidencia|
+---------+-----------+----------+
| 18855015|      62504|   8,972.3|
+---------+-----------+----------+

