In [0]:
spark

## Extração dos dados e transformações

In [0]:
casos_df = spark.read.table('casos_covid_opt_1_csv')
casos_df.createOrReplaceTempView('casos_view')
casos_df.show(5)

In [0]:
from pyspark.sql.functions import *

In [0]:
recuperados_acompanhamento_view = casos_df.groupBy('regiao', 'uf', 'municipio') \
    .agg(
        sum('recuperados_novos').alias('recuperados'),
        last('acompanhamento_novos').alias('em_acompanhamento'),
        last('data').alias('atualizacao')
    ) \
    .select('regiao', 'recuperados', 'em_acompanhamento') \
    .orderBy(col('municipio').asc_nulls_first(), 'regiao')

recuperados_acompanhamento_view.where('recuperados != 0 OR em_acompanhamento != 0').show()

In [0]:
# Acumulado | Incidência | Casos Novos
# Incidência por 100K habitantes Método de cálculo: (casos confirmados * 100.000) / população
# Fontes: https://www.coronavirus.sc.gov.br/sobre-os-dados/

casos_confirmados_view = casos_df.groupBy('regiao', 'uf', 'municipio') \
    .agg(
        sum('casos_novos').alias('casos_acumulados'),
        last('casos_novos').alias('casos_novos'),
        last('data').alias('atualizacao'),
        last('populacao').alias('agg_pop')
    ) \
    .withColumn('incidencia', round((col('casos_acumulados') * 100_000 ) / col('agg_pop'), 1)) \
    .select('regiao', 'uf', 'municipio', 'casos_acumulados', 'incidencia', 'casos_novos', 'atualizacao') \
    .orderBy(col('municipio').asc_nulls_first(), 'regiao')

casos_confirmados_view.show()

In [0]:
# Obitos Acumulados | Casos Novos | Letalidade | mortalidade
# Letalidade - Método de cálculo: (número de óbitos x 100) / número de casos confirmados
# Mortalidade -  Método de cálculo: (óbitos * 100.000) / população  -> por 100K habitantes

obitos_confirmados_view = casos_df.groupBy('regiao', 'uf', 'municipio') \
    .agg(
        sum('obitos_novos').alias('obitos_acumulados'),
        last('obitos_novos').alias('casos_novos'),
        sum('casos_novos').alias('casos_confirmados'),
        last('populacao').alias('agg_pop'),
        last('data').alias('atualizacao')
    ) \
    .withColumn('letalidade', round(((col('obitos_acumulados') * 100) / col('casos_confirmados')), 1)) \
    .withColumn('mortalidade', round((col('obitos_acumulados') * 100_000) / col('agg_pop'), 1)) \
    .select('regiao', 'uf', 'municipio', 'obitos_acumulados', 'casos_novos', 'letalidade', 'mortalidade', 'atualizacao') \
    .orderBy(col('municipio').asc_nulls_first(), 'regiao')

obitos_confirmados_view.show()

## Carregamento dos dados

In [0]:
# Salvando a Primeira view como tabela hive:
recuperados_acompanhamento_view.write.saveAsTable('recuperados_acompanhamento')

In [0]:
# Salvando a Segunda view em formato parquet e compressao snappy
casos_confirmados_view.write.option('compression', 'snappy').parquet('/user/root/data')

In [0]:
# # Salvando a terceira View como um tópico Kafka
BROKERS_SERVERS = 'localhost:9092'
TOPICS = 'obitos_view'

obitos_confirmados_view.selectExpr('to_json(struct(*)) AS value') \
    .write \
    .format('kafka') \
    .option('kafka.bootstrap.servers', BROKERS_SERVERS)
    .option('topic', TOPICS) \
    .save()
    

## Main View

In [0]:
# Casos | Óbitos | Indicencia 100k/hab | Mortalidade | Atualização
# Incidência por 100K habitantes Método de cálculo: (casos confirmados * 100.000) / população
# Mortalidade -  Método de cálculo: (óbitos * 100.000) / população  -> por 100K habitantes

casos = casos_df.groupBy('regiao', 'municipio') \
    .agg(
        sum('casos_novos').alias('casos'),
        sum('obitos_novos').alias('obitos'),
        last('data').alias('atualizacao'),
        last('populacao').alias('agg_pop'),
    ) \
    .withColumn('incidencia', round((col('casos') * 100_000) / col('agg_pop'), 1)) \
    .withColumn('mortalidade', round((col('obitos') * 100_000) / col('agg_pop'), 1)) \
    .select('regiao', 'municipio', 'casos', 'obitos', 'incidencia', 'mortalidade', 'atualizacao') \
    .orderBy(col('municipio').asc_nulls_first(), 'regiao')

casos.show()

## Sending Data to ElasticSearch

In [0]:
casos.write \
    .format('org.elasticsearch.spark.sql') \
    .option('es.port', ELASTIC_PORT) \
    .option('es.nodes', ELASTIC_ENDPOINT) \
    .mode('append') \
    .save('casos/_doc')