In [None]:
# Instalando os pacotes necessários para descompactar arquivo .rar
!pip install unrar
!pip install rarfile

In [None]:
import pandas as pd
import numpy as np
import requests
from io import BytesIO
import os
import rarfile
from pyspark.sql.types import StructType
from pyspark.sql import SQLContext, DataFrameWriter
from pyspark.sql.functions import *
from pyspark.sql import Row
from datetime import timedelta

In [None]:
# Criar um diretório para armazenar o conteúdo do arquivo
os.makedirs("./covid_bash", exist_ok=True)

In [4]:
# Define a url
url = "https://mobileapps.saude.gov.br/esus-vepi/files/unAFkcaNDeXajurGB7LChj8SgQYS2ptm/04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar"

# Faz o download do conteúdo
filebytes = (
    BytesIO(
        requests
        .get(url)
        .content
    )
)

# Extrair o conteúdo do rarfile
myrar = rarfile.RarFile(filebytes)
myrar.extractall("./covid_bash")
myrar.close()

In [None]:
!hdfs dfs -mkdir /user/fabio/data/covid_bash
!hdfs dfs -mkdir /user/fabio/data/covid_bash/raw
!hdfs dfs -mkdir /user/fabio/data/covid_bash/stage
!hdfs dfs -mkdir /user/fabio/data/covid_bash/refined
!hdfs dfs -put ./covid_bash/* /user/fabio/data/covid_bash/raw

In [5]:
schema = StructType()
(
    schema
    .add("regiao", "string", True)
    .add("estado", "string", True)
    .add("municipio", "string", True)
    .add("coduf", "integer", True)
    .add("codmun", "integer", True)
    .add("codRegiaoSaude", "integer", True)
    .add("nomeRegiaoSaude", "string", True)
    .add("data", "date", True)
    .add("semanaEpi", "integer", True)
    .add("populacaoTCU2019", "integer", True)
    .add("casosAcumulado", "integer", True)
    .add("casosNovos", "integer", True)
    .add("obitosAcumulado", "integer", True)
    .add("obitosNovos", "integer", True)
    .add("Recuperadosnovos", "integer", True)
    .add("emAcompanhamentoNovos", "integer", True)
    .add("interior/metropolitana", "integer", True)
)

StructType(List(StructField(regiao,StringType,true),StructField(estado,StringType,true),StructField(municipio,StringType,true),StructField(coduf,IntegerType,true),StructField(codmun,IntegerType,true),StructField(codRegiaoSaude,IntegerType,true),StructField(nomeRegiaoSaude,StringType,true),StructField(data,DateType,true),StructField(semanaEpi,IntegerType,true),StructField(populacaoTCU2019,IntegerType,true),StructField(casosAcumulado,IntegerType,true),StructField(casosNovos,IntegerType,true),StructField(obitosAcumulado,IntegerType,true),StructField(obitosNovos,IntegerType,true),StructField(Recuperadosnovos,IntegerType,true),StructField(emAcompanhamentoNovos,IntegerType,true),StructField(interior/metropolitana,IntegerType,true)))

In [8]:
# Lendo dados
df = (
    spark
    .read
    .csv("/user/fabio/data/covid_bash/raw", schema=schema, sep=";", header=True)
)

In [9]:
# Limpeza dos dados
df = (
    df
    .filter(col("estado").isNotNull() & col("municipio").isNotNull())
    .na.fill(0, ["casosAcumulado"])
    .na.fill(0, ["casosNovos"])
    .na.fill(0, ["obitosAcumulado"])
    .na.fill(0, ["obitosNovos"])
    .na.fill(0, ["Recuperadosnovos"])
    .na.fill(0, ["emAcompanhamentoNovos"])
)

In [10]:
# Enviando os dados para o HDFS
(
    df
    .write
    .mode("overwrite")
    .parquet("/user/fabio/data/covid_bash/stage")
)

In [11]:
# Lendo os dados do HDFS
df = (
    spark
    .read
    .parquet("/user/fabio/data/covid_bash/stage")
)

In [None]:
# Otimizando os dados do HDFS para uma tabela Hive particionada por Município
(
    df
    .write
    .mode("overwrite")
    .saveAsTable("covid_bash_stage", partitionBy="municipio")
)

In [12]:
# Obtendo a última data de registro de dados
last_date = df.select("data").agg(max(df.data)).collect()[0][0]

In [13]:
# Contabilizando dados
recuperados_acumulado_df = (
    df
    .select("casosAcumulado")
    .where(f"data='{last_date}'")
    .agg(sum(df.casosAcumulado).alias("casosAcumulado"))
).collect()[0][0]
obitos_acumulados_df = (
    df
    .select("obitosAcumulado")
    .where(f"data='{last_date}'")
    .agg(sum(df.obitosAcumulado).alias("obitosAcumulado"))
).collect()[0][0]
acompanhamento_df = (
    df
    .select("casosNovos")
    .where(f"data>'{last_date - timedelta(14)}'")
    .agg(sum(df.casosNovos).alias("acompanhamento"))
).collect()[0][0]

In [14]:
# Criando a primeira visualização
data = [Row(recuperados=recuperados_acumulado_df-obitos_acumulados_df, acompanhamento=acompanhamento_df)]
recuperados_df = spark.createDataFrame(data)
recuperados_df.show()

+--------------+-----------+
|acompanhamento|recuperados|
+--------------+-----------+
|        799667|   18288779|
+--------------+-----------+



In [15]:
# Realizando filtro e agregação dos dados
casos_df = (
    df
    .select("casosAcumulado", "casosNovos", "populacaoTCU2019")
    .where(f"data='{last_date}'")
    .agg(sum(df.casosAcumulado).alias("casosAcumulado"), sum(df.casosNovos).alias("casosNovos"), sum(df.populacaoTCU2019).alias("populacaoTCU2019"))
)

In [16]:
# Criando a segunda vizualização
casos_df = (
    casos_df
    .withColumn("incidencia", round(casos_df.casosAcumulado*100000/casos_df.populacaoTCU2019, 1))
    .select("casosAcumulado", "casosNovos", "incidencia")
)
casos_df.show()

+--------------+----------+----------+
|casosAcumulado|casosNovos|incidencia|
+--------------+----------+----------+
|      18814678|     62623|    8953.1|
+--------------+----------+----------+



In [17]:
# Realizando filtro e agregação dos dados
obitos_df = (
    df
    .select("casosAcumulado", "obitosAcumulado", "obitosNovos", "populacaoTCU2019")
    .where(f"data='{last_date}'")
    .agg(sum(df.casosAcumulado).alias("casosAcumulado"), sum(df.obitosAcumulado).alias("obitosAcumulado"), sum(df.obitosNovos).alias("obitosNovos"), sum(df.populacaoTCU2019).alias("populacaoTCU2019"))
)

In [18]:
# Criando a terceira vizualização
obitos_df = (
    obitos_df
    .withColumn("letalidade", round(obitos_df.obitosAcumulado*100/obitos_df.casosAcumulado, 1))
    .withColumn("mortalidade", round(obitos_df.obitosAcumulado*100000/obitos_df.populacaoTCU2019, 1))
    .select("obitosAcumulado", "obitosNovos", "letalidade", "mortalidade")
)
obitos_df.show()

+---------------+-----------+----------+-----------+
|obitosAcumulado|obitosNovos|letalidade|mortalidade|
+---------------+-----------+----------+-----------+
|         525899|       1776|       2.8|      250.3|
+---------------+-----------+----------+-----------+



In [19]:
# Salvando a primeira vizualização como tabela Hive
(
    recuperados_df
    .write
    .mode("overwrite")
    .saveAsTable("covid_bash_recuperados")
)

In [20]:
# Salvando a segunda vizualização no formato parquet e compressão snappy
(
    casos_df
    .write
    .mode("overwrite")
    .parquet("/user/fabio/data/covid_bash/refined", compression="snappy")
)

In [21]:
# Salvando a terceira vizualização em um tópico kafka
(
    obitos_df
    .selectExpr("'obitosAcumulado' as key", "CAST(obitosAcumulado as string) as value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "topic-covid-obitos")
    .save()
)

In [22]:
# Salvando a terceira vizualização em um tópico kafka
(
    obitos_df
    .selectExpr("'obitosNovos' as key", "CAST(obitosNovos as string) as value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "topic-covid-obitos")
    .save()
)

In [23]:
# Salvando a terceira vizualização em um tópico kafka
(
    obitos_df
    .selectExpr("'letalidade' as key", "CAST(letalidade as string) as value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "topic-covid-obitos")
    .save()
)

In [24]:
# Salvando a terceira vizualização em um tópico kafka
(
    obitos_df
    .selectExpr("'mortalidade' as key", "CAST(mortalidade as string) as value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "topic-covid-obitos")
    .save()
)

In [25]:
# Realizando filtro e agregação dos dados por região
regioes_df = (
    df
    .select("regiao", "casosAcumulado", "obitosAcumulado", "populacaoTCU2019", "data")
    .where(f"data='{last_date}'")
    .groupBy("regiao", "data")
    .agg(sum(df.casosAcumulado).alias("casos"), sum(df.obitosAcumulado).alias("obitos"), sum(df.populacaoTCU2019).alias("populacaoTCU2019"))
)

In [26]:
# Criando visualização dos dados do HDFS por regiões
regioes_df = (
    regioes_df
    .withColumn("incidencia", round(regioes_df.casos*100000/regioes_df.populacaoTCU2019, 1))
    .withColumn("mortalidade", round(regioes_df.obitos*100000/regioes_df.populacaoTCU2019, 1))
    .select("regiao", "casos", "obitos", "incidencia", "mortalidade", "data")
    .orderBy("regiao")
)
regioes_df.show()

+------------+-------+------+----------+-----------+----------+
|      regiao|  casos|obitos|incidencia|mortalidade|      data|
+------------+-------+------+----------+-----------+----------+
|Centro-Oeste|1916619| 49207|   11760.5|      301.9|2021-07-06|
|    Nordeste|4434293|107560|    7769.7|      188.5|2021-07-06|
|       Norte|1730197| 43830|    9387.4|      237.8|2021-07-06|
|     Sudeste|7129450|244771|    8067.6|      277.0|2021-07-06|
|         Sul|3604119| 80531|   12023.4|      268.7|2021-07-06|
+------------+-------+------+----------+-----------+----------+

