# Imports para Spark

In [1]:
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *
from time import time

sc = spark.sparkContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
sqlContext.setConf("hive.exec.max.dynamic.partitions","60000")
sqlContext.setConf("hive.exec.max.dynamic.partitions.pernode","256");

# variaveis para controle do processamento
data_processamento = "2021-07-06"
data_referencia    = "2021-07-05"

# Enviar os dados para o HDFS

os arquivos .csv devem estar dentro da pasta input do projeto.

In [2]:
!hdfs dfs -mkdir /datasus

mkdir: `/datasus': File exists


In [None]:
!hdfs dfs -copyFromLocal /input/*.csv /datasus/

In [None]:
!hdfs dfs -ls /datasus

# Otimizar todos os dados do HDFS para uma tabela hive.

In [3]:
sqlContext.sql("drop table covid19Temp")

AnalysisException: 'Table or view not found: covid19Temp;'

In [2]:
dataframe_hdfs = spark.read.csv("/datasus/", header="true", sep=";", quote="\'", inferSchema=True)

In [3]:
dataframe_hdfs.createOrReplaceTempView("covid19Temp")

In [7]:
sqlContext.sql("drop table covid19")

DataFrame[]

In [6]:
sqlContext.sql("CREATE TABLE covid19 (regiao string,estado string,municipio string,coduf int,codRegiaoSaude int,nomeRegiaoSaude string,data string,semanaEpi int,populacaoTCU2019 int,casosAcumulado int,casosNovos int,obitosAcumulado int,obitosNovos int,Recuperadosnovos int,emAcompanhamentoNovos int,interiorMetropolitana string) PARTITIONED BY (codmun int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' STORED AS TEXTFILE")

DataFrame[]

### sqlContext.sql("insert into table covid19 partition(codmun) select regiao,estado,municipio,coduf,codregiaosaude,nomeregiaosaude,data,semanaepi,populacaotcu2019,casosacumulado,casosnovos,obitosacumulado,obitosnovos,recuperadosnovos,emacompanhamentonovos,`interior/metropolitana`,COALESCE(codmun, 0) codmun from covid19Temp WHERE COALESCE(codmun, 0) > 0 and data = '" + data_processamento + "' order by codmun ")

In [8]:
sqlContext.sql("create table covid19 as select * from covid19Temp where municipio is not null");

In [6]:
spark.sql("show tables").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|             covid19|      false|
| default|primeiravisualizacao|      false|
|        |         covid19temp|       true|
+--------+--------------------+-----------+



# Primeira Visualização

In [None]:
# Casos Recuperados, Em acompanhamento

x = datetime.now()

Casos_Recuperados_DF = \
    sqlContext.sql( \
        "SELECT \
            FORMAT_NUMBER(SUM(Recuperadosnovos),0) AS Casos_Recuperados, \
            FORMAT_NUMBER(SUM(emAcompanhamentoNovos),0) AS Em_Acompanhamento \
        FROM \
            covid19 \
        WHERE \
            data = '" + data_processamento + "'")

Casos_Recuperados_DF.show()

y = datetime.now()

print("TEMPO DE EXECUÇÃO -->", y-x)

# Segunda Visualização

In [None]:
# Casos confirmados

x = datetime.now()

Casos_Confirmados_DF = \
    spark.sql( \
        "SELECT \
            FORMAT_NUMBER((SUM(Recuperadosnovos) + SUM(emAcompanhamentoNovos) + SUM(obitosAcumulado)),0) AS Acumulado, \
            (\
                SELECT \
                    FORMAT_NUMBER(SUM(casosNovos),0) \
                FROM \
                    covid19 \
                WHERE \
                    data = '" + data_referencia + "' \
             ) AS Casos_Novos, \
            FORMAT_NUMBER(((SUM(Recuperadosnovos) + SUM(emAcompanhamentoNovos) + SUM(obitosAcumulado)) * 100000) / SUM(populacaoTCU2019), 0) as Incidencia \
        FROM \
            covid19 \
        WHERE \
            data = '" + data_processamento + "'")

Casos_Confirmados_DF.show()

y = datetime.now()
print("TEMPO DE EXECUÇÃO -->", y-x)

# Terceira Visualização

In [None]:
# Óbitos Confirmados

x = datetime.now()

Casos_Obitos_DF = \
    spark.sql( \
        "SELECT \
            FORMAT_NUMBER(((SUM(obitosAcumulado))),0) AS Obitos_acumulados, \
            (\
                SELECT \
                    FORMAT_NUMBER(SUM(obitosNovos),0) \
                FROM \
                    covid19 \
                WHERE \
                    data = '" + data_referencia + "' \
            ) AS Obitos_novos, \
            ROUND(((SUM(obitosAcumulado)) * 100) / (SUM(Recuperadosnovos) + SUM(emAcompanhamentoNovos) + (SUM(obitosAcumulado))),2) AS Letalidade, \
            ROUND(((((SUM(obitosAcumulado))) * 100000)) / SUM(populacaoTCU2019), 2) AS Mortalidade \
        FROM \
            covid19 \
        WHERE data = '" + data_processamento + "'")

Casos_Obitos_DF.show()

y = datetime.now()

print("TEMPO DE EXECUÇÃO -->", y-x)

# Salvar primeira visualização em uma tabela Hive

In [None]:
sqlContext.sql("CREATE TABLE primeiraVisualizacao as \
        SELECT \
            FORMAT_NUMBER(SUM(Recuperadosnovos),0) AS Casos_Recuperados, \
            FORMAT_NUMBER(SUM(emAcompanhamentoNovos),0) AS Em_Acompanhamento \
        FROM \
            covid19 \
        WHERE \
            data = '" + data_processamento + "'");

# Salvar segunda visualização em formato parquet e compressão snappy

In [None]:
Casos_Confirmados_DF.write.mode("overwrite").option("compression", "snappy").parquet("/user/datacovid/")

In [None]:
!hdfs dfs -ls /user/datacovid

# Salvar terceira visualização como um tópico no Kafka

In [None]:
Casos_Obitos_DF\
    .selectExpr("'terceira' AS key", "to_json(struct(*)) AS value") \
    .write\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafka:9092")\
    .option("topic", "terceira-visualizacao")\
    .save()

In [None]:
prova = spark.read.\
    format("kafka").\
    option("kafka.bootstrap.servers", "kafka:9092").\
    option("subscribe", "terceira-visualizacao").load()

In [None]:
prova.show()