In [117]:
from pyspark.sql.functions import *
from pyspark.sql.types import*

#  Otimizar todos os dados do hdfs para uma tabela Hive particionada por município.

In [118]:
!hdfs dfs -ls /user/carlos/projeto/

Found 4 items
-rw-r--r--   3 root supergroup   62492959 2021-07-10 01:28 /user/carlos/projeto/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2021-07-10 01:28 /user/carlos/projeto/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2021-07-10 01:28 /user/carlos/projeto/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2021-07-10 01:28 /user/carlos/projeto/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


In [119]:
_schema =[
    StructField("regiao", StringType()), 
    StructField("estado", StringType()),
    StructField("municipio", StringType()), 
    StructField("coduf", IntegerType()),
    StructField("codmun", IntegerType()), 
    StructField("codRegiaoSaude", IntegerType()), 
    StructField("nomeRegiaoSaude", StringType()), 
    StructField("data", DateType()), 
    StructField("semanaEpi", IntegerType()),
    StructField("populacaoTCU2019", IntegerType()), 
    StructField("casosAcumulado", IntegerType()), 
    StructField("casosNovos", IntegerType()), 
    StructField("obitosAcumulado", IntegerType()), 
    StructField("obitosNovos", IntegerType()), 
    StructField("Recuperadosnovos", IntegerType()), 
    StructField("emAcompanhamentoNovos", IntegerType()), 
    StructField("nterior_metropolitana", IntegerType())
    
]

str_type = StructType(_schema)

In [121]:
arquivos  = spark.read.csv('/user/carlos/projeto/*.csv', schema=str_type, sep=";")

In [31]:
#arquivos.write.parquet().partitionBy("codmun").mode("overwrite").saveAsTable("/user/carlos/hist_covid")
arquivos.write.parquet("/user/carlos/hist_covid",partitionBy='codmun', compression='snappy',mode='overwrite')

# 3. Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS:

# Visualização 1

In [34]:
painel = spark.read.parquet("/user/carlos/hist_covid")
painel.printSchema()

In [43]:
max_data = painel.agg(max('data').alias('max_data'))
painel1 = painel.join(max_data, painel.data == max_data.max_data, 'inner').filter(painel.regiao == "Brasil").select("Recuperadosnovos","emAcompanhamentoNovos")
painel1.show()

+----------------+---------------------+
|Recuperadosnovos|emAcompanhamentoNovos|
+----------------+---------------------+
|        17262646|              1065477|
+----------------+---------------------+



# Visualização 2

In [88]:
painel2 = painel.join(max_data, painel.data == max_data.max_data, 'inner')\
.filter(painel.regiao == "Brasil")\
.withColumn('Incidentes', round((col("casosAcumulado")  / col("populacaoTCU2019"))*100000.00,2))

painel2_vf = painel2.select("casosAcumulado", "casosNovos", "Incidentes")

In [87]:

painel2_vf.show()

+--------------+----------+----------+
|casosAcumulado|casosNovos|Incidentes|
+--------------+----------+----------+
|      18855015|     62504|   8972.29|
+--------------+----------+----------+



# Visualizão 3

In [101]:
painel3 = painel.join(max_data, painel.data == max_data.max_data, 'inner')\
.filter(painel.regiao == "Brasil")\
.withColumn('mortalidade', round((col("obitosAcumulado")  / col("populacaoTCU2019"))*100000.00,2))\
.withColumn('letalidade', round((col("obitosAcumulado")  / col("casosAcumulado"))*100.00,1))

painel3_vf = painel3.select("obitosAcumulado", "obitosNovos", "mortalidade", "letalidade")

In [102]:
painel3_vf.show()

+---------------+-----------+-----------+----------+
|obitosAcumulado|obitosNovos|mortalidade|letalidade|
+---------------+-----------+-----------+----------+
|         526892|       1780|     250.73|       2.8|
+---------------+-----------+-----------+----------+



#  Salvar a primeira visualização como tabela Hive

In [105]:
painel1.write.parquet("/user/carlos/covid_painel1")

# 5.Salvar a segunda visualização com formato parquet e compressão snappy

In [116]:
painel2_vf.write.parquet("/user/carlos/covid_painel2", compression='snappy')

+--------------+----------+----------+
|casosAcumulado|casosNovos|Incidentes|
+--------------+----------+----------+
|      18855015|     62504|   8972.29|
+--------------+----------+----------+



# 6. Salvar a terceira visualização em um tópico no Kafka

In [203]:
# kafka-topics.sh --bootstrap-server localhost:9092 --topic 'painel3-covid-output' --create --partitions 2 --replication-factor 1
# kafka-console-producer.sh --broker-list kafka:9092 --topic 'painel3-covid-output'
# kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic 'painel3-covid-output'


painel3_vf.write.mode("overwrite").json("/user/carlos/painel3_covid_c")

socketDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092")\
    .option("subscribe", "painel3-covid-kafka")\
    .load()


userSchema = StructType().add("obitosNovos", "long")\
                         .add("obitosAcumulado", "long")\
                         .add("mortalidade", "double")\
                         .add("letalidade", "double")
painel3_rds = spark \
    .readStream \
    .schema(userSchema) \
    .json("/user/carlos/painel3_covid_c")  




In [206]:
painel3_kafka  = painel3_rds.writeStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("topic", "painel3-covid-kafka")\
.option("checkpointLocation", "/user/carlos/painel3_covid_kafka").start()

In [207]:
!hdfs dfs -ls /user/carlos/painel3_covid_kafka

Found 4 items
drwxr-xr-x   - root supergroup          0 2021-07-11 20:45 /user/carlos/painel3_covid_kafka/commits
-rw-r--r--   3 root supergroup         45 2021-07-11 20:45 /user/carlos/painel3_covid_kafka/metadata
drwxr-xr-x   - root supergroup          0 2021-07-11 20:45 /user/carlos/painel3_covid_kafka/offsets
drwxr-xr-x   - root supergroup          0 2021-07-11 20:45 /user/carlos/painel3_covid_kafka/sources


# 7. Criar a visualização pelo Spark com os dados enviados para o HDFS:

In [216]:
painel_regioes = arquivos.join(max_data, arquivos.data == max_data.max_data, 'inner')\
.groupBy("regiao").agg(sum("casosAcumulado").alias("Casos"),\
                       sum("obitosAcumulado").alias("Obitos"),\
                       round((sum("casosAcumulado")  / sum("populacaoTCU2019"))*100000.00,2).alias("Incidencia"),\
                       round((sum("obitosAcumulado")  / sum("populacaoTCU2019"))*100000.00,2).alias("mortalidade"))\
.withColumn("Dta_atualizacao",current_timestamp())



painel_regioes.orderBy("regiao").show()

                     

+------------+--------+------+----------+-----------+--------------------+
|      regiao|   Casos|Obitos|Incidencia|mortalidade|     Dta_atualizacao|
+------------+--------+------+----------+-----------+--------------------+
|      Brasil|18855015|526892|   8972.29|     250.73|2021-07-12 00:40:...|
|Centro-Oeste| 3833238| 98414|  11760.51|     301.94|2021-07-12 00:40:...|
|    Nordeste| 8911474|215648|   7807.27|     188.93|2021-07-12 00:40:...|
|       Norte| 3465630| 87690|   9401.64|     237.89|2021-07-12 00:40:...|
|     Sudeste|14277606|490622|   8078.18|     277.59|2021-07-12 00:40:...|
|         Sul| 7222082|161410|  12046.45|     269.23|2021-07-12 00:40:...|
+------------+--------+------+----------+-----------+--------------------+



# 8. Salvar a visualização do exercício 6 em um tópico no Elastic