<h2>Projeto Final de Spark (Básico)</h2>

<h3>Questão 1</h3>

In [None]:
# Enviar os dados para o hdfs:
# docker cp /home/cayo/Documents/Cayo/Semantix/projeto-final/Semantix-Eng-Dados/ namenode:/home/

In [None]:
# Criando a pasta no hdfs
hdfs dfs -mkdir /user/cayo/projeto-final

In [None]:
# Copiando arquivos para o hdfs
hdfs dfs -copyFromLocal /home/*.csv /user/cayo/projeto-final

<h3>Questão 2</h3>

In [6]:
# Criando o banco de dados
sqlContext.sql("create database projeto_final").show()

AnalysisException: 'org.apache.hadoop.hive.metastore.api.AlreadyExistsException: Database projeto_final already exists;'

In [8]:
# Criando a tabela simples sem partições
sqlContext.sql("create table dados_covid(regiao string, estado string, municipio string, coduf int,\
    codmun int, codRegiaoSaude int, nomeRegiaoSaude string, data date, semanaEpi int, populacaoTCU2019 int,\
    casosAcumulado int, casosNovos int, obitosAcumulado int, obitosNovos int, recuperadosnovos int,\
    emAcompanhamentoNovos int, interiorMetropolitana int) row format delimited fields terminated by ';'\
    lines terminated by '\n' stored as textfile tblproperties('skip.header.line.count'='1')").show()

AnalysisException: "Table or view 'dados_covid' already exists in database 'default';"

In [None]:
# Carregando arquivos de dados para o banco de dados
sqlContext.sql("load data inpath '/user/cayo/projeto-final' overwrite into table dados_covid")

In [None]:
# Criando tabela participada pelo municipio apartir da tabela simples

In [None]:
sqlContext.sql("INSERT OVERWRITE TABLE dados_covid_particionada PARTITION(municipio) SELECT regiao, estado,\
    municipio, coduf, codmun, codregiaosaude, nomeregiaosaude, data, semanaepi, populacaotcu2019, casosacumulado,\
    casosnovos, obitosacumulado, obitosnovos, recuperadosnovos, emacompanhamentonovos, interiormetropolitana\
    FROM dados_covid;")

<h3>Questão 3</h3>

In [1]:
# Lendo arquivos do hdfs com os dados
dados_covid = spark.read.option("sep", ";").option("header", "true").csv("hdfs:///user/cayo/projeto-final")

In [9]:
dados_covid.show(5)

+------+------+---------+-----+------+--------------+---------------+----------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|      data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+----------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-25|        9|       210147125|             0|         0|              0|          0|            null|                 null|                  null|
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-26|        9|       2101471

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, LongType

In [23]:
# Primeira visualização

dados_covid_view1 = dados_covid.agg(\
    sum("Recuperadosnovos").cast(LongType()).alias("Casos_recuperados"),\
    sum("emAcompanhamentoNovos").cast(LongType()).alias("Em_acompanhamento"))

dados_covid_view1.show()

+-----------------+-----------------+
|Casos_recuperados|Em_acompanhamento|
+-----------------+-----------------+
|       2920055795|        320793426|
+-----------------+-----------------+



In [4]:
# Segunda visualização

dados_covid_view2 = dados_covid.agg(\
    sum("casosAcumulado").cast(LongType()).alias("Acumulado"),\
    sum("casosNovos").cast(LongType()).alias("Casos_novos"),\
    (format_number(((sum("casosAcumulado").cast(LongType()) * 100000).cast(FloatType()) / sum("populacaoTCU2019").cast(LongType())), 2)).alias("Incidencia"))

dados_covid_view2.show()

+----------+-----------+----------+
| Acumulado|Casos_novos|Incidencia|
+----------+-----------+----------+
|9998472092|   56565045|  3,247.67|
+----------+-----------+----------+



In [5]:
# Terceira visualização

dados_covid_view3 = dados_covid.agg(\
    sum("obitosAcumulado").cast(LongType()).alias("Obitos_acumulado"),\
    sum("obitosNovos").cast(LongType()).alias("Obitos_novos"),\
    format_number(((sum("obitosAcumulado").cast(LongType()) * 100) / sum("casosAcumulado").cast(LongType())).cast(FloatType()), 2).alias("Letalidade"),\
    format_number(((sum("obitosAcumulado").cast(LongType()) * 100000) / sum("populacaoTCU2019").cast(LongType())).cast(FloatType()), 2).alias("Mortalidade"))

dados_covid_view3.show()

+----------------+------------+----------+-----------+
|Obitos_acumulado|Obitos_novos|Letalidade|Mortalidade|
+----------------+------------+----------+-----------+
|       274784085|     1580676|      2.75|      89.25|
+----------------+------------+----------+-----------+



<h3>Questão 4</h3>

In [29]:
# Salvando a primeira visualização em uma tabela hive
dados_covid_view1.write.option('header','true').csv("/user/cayo/projeto-final/data/dados_covid_view1_hive")

<h3>Questão 5</h3>

In [5]:
# Salvando a segunda visualização em uma tabela parquet
dados_covid_view2.write.option('header','true').option("compression", "snappy").parquet("/user/cayo/projeto-final/data/dados_covid_view2_parquet")

<h3>Questão 6</h3>

In [25]:
# Salvando a terceira visualização em um tṕpico kafka
dados_covid_view3.select(to_json(struct("*")).alias("value"))\
    .selectExpr("CAST(value AS STRING)").write\
    .format("kafka").option("kafka.bootstrap.servers", "kafka:9092")\
    .option("topic", "dados_covid_view3_kafka")\
    .save()

NameError: name 'dados_covid_view3' is not defined

<h3>Questão 7</h3>

In [11]:
# Qaarta visualização

dados_covid_tabela = dados_covid.groupBy('regiao').agg(\
    sum("casosAcumulado").cast(LongType()).alias("Casos_acumulado"),\
    sum("obitosAcumulado").cast(LongType()).alias("Obitos_acumulado"),\
    format_number(((sum("casosAcumulado").cast(LongType()) * 100000).cast(FloatType()) / sum("populacaoTCU2019").cast(LongType())), 2).alias("Incidencia"),\
    format_number(((sum("obitosAcumulado").cast(LongType()) * 100000) / sum("populacaoTCU2019").cast(LongType())).cast(FloatType()), 2).alias("Mortalidade"),\
    date_format(last("data"),"dd/MM/yyyy hh:mm").alias("Atualizacao")).sort('regiao')

# Salvando a visualização no hdfs
dados_covid_tabela.write.save("/user/cayo/projeto-final/data/dados_covid_tabela")
# dados_covid_tabela.write.option('header','true').csv("/user/cayo/projeto-final/data/dados_covid_view1_hive")

dados_covid_tabela.show()

+------------+---------------+----------------+----------+-----------+----------------+
|      regiao|Casos_acumulado|Obitos_acumulado|Incidencia|Mortalidade|     Atualizacao|
+------------+---------------+----------------+----------+-----------+----------------+
|      Brasil|     3343282900|        91858674|  3,181.85|      87.42|06/07/2021 12:00|
|Centro-Oeste|      715162485|        16114579|  4,547.45|     102.47|31/07/2020 12:00|
|    Nordeste|     1635015853|        42387899|  2,968.75|      76.97|06/07/2021 12:00|
|       Norte|      736523753|        18128122|  4,141.06|     101.92|06/07/2021 12:00|
|     Sudeste|     2416910626|        83614760|  2,834.14|      98.05|06/07/2021 12:00|
|         Sul|     1151576475|        22680051|  3,981.00|      78.40|06/07/2021 12:00|
+------------+---------------+----------------+----------+-----------+----------------+



<h3>Questão 8 e 9 (Pendente)</h3>

In [None]:
# Não consegui configurar meu spark com elastic