<h2>1. Enviar os dados para o hdfs</h2>

<h2>Criando diretório para armazenar os dados</h2>

In [None]:
!hdfs dfs -mkdir -p /user/paulo/projeto-final/data

<h2>Enviando dados para o HDFS (deve ser executado diretamente no bash)</h>

In [None]:
!hdfs dfs -put /home/csv-files /user/paulo/projeto-final/data

<h2>2. Otimizar todos os dados do hdfs para uma tabela Hive particionada por
município.</h2>

<h2>Carregando dados</h2>

In [1]:
from pyspark.sql.types import *

column_list = [
    StructField("regiao", StringType()),
    StructField("estado",StringType()),
    StructField("municipio",StringType()),
    StructField("coduf",IntegerType()),
       StructField("codmun",IntegerType()),
       StructField("codRegiaoSaude",IntegerType()),
       StructField("nomeRegiaoSaude",StringType()),
       StructField("data",StringType()),
       StructField("semanaEpi",IntegerType()),
       StructField("populacaoTCU2019",IntegerType()),
       StructField("casosAcumulado",IntegerType()),
       StructField("casosNovos",IntegerType()),
       StructField("obitosAcumulado",IntegerType()),
       StructField("obitosNovos",IntegerType()),
       StructField("Recuperadosnovos",IntegerType()),
       StructField("emAcompanhamentoNovos",IntegerType()),
       StructField("interior/metropolitana",IntegerType())
]

schema = StructType(column_list)

csv_files = spark.read.option("delimiter", ";").option("header","true").schema(schema).csv("/user/paulo/projeto-final/data/csv-files");
csv_files.show(5);

+------+------+---------+-----+------+--------------+---------------+----------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|      data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+----------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-25|        9|       210147125|             0|         0|              0|          0|            null|                 null|                  null|
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-26|        9|       2101471

<h2>Salvando dados em uma tabela hive particionada por município</h2>

In [None]:
csv_files.write.partitionBy('municipio').saveAsTable('projeto_final.painel_covid')

<h2>3. Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS:</h2>

<h2>Primeira Visualização</h2>

In [4]:
from pyspark.sql.functions import sum, avg, min, max, count, desc, unix_timestamp,from_unixtime, col, substring, split,lit, concat

In [5]:
painel_covid = spark.read.table('projeto_final.painel_covid')
painel_covid = painel_covid\
.withColumn("data_timestamp", unix_timestamp(col("data"), "yyyy-MM-dd"))

In [None]:
casos_recuperados_e_acompanhamento_view = painel_covid.filter("regiao='Brasil'")\
.groupBy('regiao').agg(max('recuperadosnovos').alias("RecuperadosNovos"),\
                       max('emAcompanhamentoNovos').alias('EmAcompanhamento'))

casos_recuperados_e_acompanhamento_view.show()

<h2>Segunda Visualização</h2>

In [None]:
casos_novos = painel_covid.select('casosNovos').filter("regiao='Brasil'").sort(col('data_timestamp').desc()).collect()

In [None]:
casos_confirmados_view = painel_covid.filter("regiao='Brasil'")\
.groupBy('regiao').agg(max('casosacumulado').alias("CasosConfirmados"))\
.withColumn('CasosNovos', lit(casos_novos[0].casosNovos))



casos_confirmados_view.show()

<h2>Terceira Visualização</h2>

In [None]:
casos_novos = painel_covid.select('obitosNovos').filter("regiao='Brasil'").sort(col('data_timestamp').desc()).collect()


In [None]:
obitos_acumulados_view = painel_covid.filter("regiao='Brasil'")\
.groupBy('regiao').agg(max('obitosAcumulado').alias("Óbitos Acumulados")).withColumn('Casos Novos', lit(casos_novos[0].obitosNovos))


obitos_acumulados_view.show()

In [7]:
obitos_acumulados_view = painel_covid.filter("regiao!='Brasil'")\
.groupBy('data_timestamp').agg(sum('obitosNovos').alias("obitos"))\
.withColumn("data_notificacao", from_unixtime(col("data_timestamp"), "dd-MM-yyyy"))\
.sort(desc('data_timestamp'))

#obitos_acumulados_view.select('data_notificacao','obitos').show(10)

<h2>4.Salvar a primeira visualização como tabela Hive</h2>

In [None]:
casos_recuperados_e_acompanhamento_view.write.saveAsTable('projeto_final.casos_recuperados')


<h2>5. Salvar a segunda visualização com formato parquet e compressão snappy</h2>

In [None]:
casos_confirmados_view.write.format('parquet')\
.option('compression','snappy')\
.saveAsTable('projeto_final.casos_confirmados')

<h2>6. Salvar a terceira visualização em um tópico no Kafka</h2>

topico kafka

<p>kafka-topics.sh --bootstrap-server kafka:9092 --topic obitosConfirmados --create --partitions 1 --replication-factor 1</p>
<p>kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic obitosConfirmados</p>
<p>kafka-topics.sh --bootstrap-server kafka:9092 --topic obitosConfirmados --delete</p>

In [None]:
obitos_acumulados_view\
.withColumn("value", concat(col('data_notificacao').cast(StringType()),lit(','),col('obitos').cast(StringType())))\
.write\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("topic", "obitosConfirmados")\
.option("checkpointLocation","/user/paulo/kafka_checkpoint")\
.save()

<h2>7. Criar a visualização pelo Spark com os dados enviados para o HDFS:</h2>

In [None]:
sintexe_view = painel_covid.filter("regiao != 'null'").groupBy('regiao')\
.agg(max('casosAcumulado').alias("obitosAcumulado"),max('obitosAcumulado').alias("obitosAcumulado"))

sintexe_view.show()

In [12]:
sc

In [11]:
obitos_acumulados_view.saveToEs("spark/docs")

AttributeError: 'DataFrame' object has no attribute 'saveToEs'