In [None]:
#Importando bibliotecas.

from pyspark.sql.types import *
from pyspark.sql.functions import *
from os.path import abspath
from pyspark.sql import SparkSession



In [2]:
warehouse_location = abspath('spark-warehouse')

In [3]:
spark = SparkSession \
    .builder \
    .appName("SparkCovid.com") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()


In [15]:
df=spark.sql("show databases")
df.show()

+------------+
|databaseName|
+------------+
|    covid_19|
|     default|
+------------+



In [4]:
#Criando Schema do dataframe.

covid19_schema = (
StructType(
[StructField("regiao",StringType()),StructField("estado",StringType()),StructField("municipio",StringType()),StructField("coduf",IntegerType()),StructField("codmun",IntegerType()),StructField("codRegiaoSaude",IntegerType()),StructField("nomeRegiaoSaude",StringType()),StructField("data",DateType()),StructField("semanaEpi",IntegerType()),StructField("populacaoTCU2019",IntegerType()),StructField("casosAcumulado",IntegerType()),StructField("casosNovos",IntegerType()),StructField("obitosAcumulado",IntegerType()),StructField("obitosNovos",IntegerType()),StructField("Recuperadosnovos",IntegerType()),StructField("emAcompanhamentoNovos",IntegerType()),StructField("interior/metropolitana",StringType())]
))

In [5]:
#Criando dataframe e lendo os arquivos no HDFS "/user/joao-paulo/data".

df_covid19 = (
 spark
.read
.option("header", "true")
.option("sep", ";")
.schema(covid19_schema)
.csv("/user/joao-paulo/data/")
)

In [6]:
# Criando dataframe com algumas transformações de dados.

df_covid19_resultado = df_covid19.agg(max("populacaoTCU2019").alias("População"), max("casosAcumulado").alias("casosAcumulados"),max("casosNovos").alias("casosNovos"), max("obitosAcumulado").alias("obitosAcumulado"), max("obitosNovos").alias("obitosNovos"), max("Recuperadosnovos").alias("recuperadosNovos"), max("emAcompanhamentoNovos").alias("emAcompanhamentoNovos"))

In [7]:
#### inserindo atributo no dataframe com transformações de dados.
###Calculo feito conforme documentação oficial do Ministério da saude, link: https://covid.saude.gov.br/  menu: Sobre
##Método de cálculo Incidência
#Número de casos confirmados de COVID-19 em residentes X 100.000 / População* total residente no período determinado.

df_covid19_resultado = df_covid19_resultado.withColumn('Incidencia', col('casosAcumulados') / col('População') * 100000)

In [8]:
#### inserindo atributo no dataframe com transformações de dados.
###Calculo feito conforme documentação oficial do Ministério da saude, link: https://covid.saude.gov.br/  menu: Sobre
##Método de cálculo Mortalidade
#Número de óbitos confirmados de COVID-19 em residentes X 100.000 / População* total residente no período determinado.

df_covid19_resultado = df_covid19_resultado.withColumn('Mortalidade', col('obitosAcumulado') / col('População') * 100000)

In [9]:
#### inserindo atributo no dataframe com transformações de dados.
###Calculo feito conforme documentação oficial do Ministério da saude, link: https://covid.saude.gov.br/  menu: Sobre
##Método de cálculo Letalidade
#Número de óbitos confirmados de COVID-19 em determinada área e período X 100 / Número de casos confirmados de COVID-19 em determinada área e período.

df_covid19_resultado = df_covid19_resultado.withColumn('Letalidade', col('obitosNovos') / col('casosNovos') * 100)

In [10]:
#Vizualizando dados dos "casos recuperados" e "em acompanhamento"
df_covid19_visualização_hive = df_covid19_resultado.select('recuperadosNovos','emAcompanhamentoNovos')

In [17]:
#Salvando vizualização como tabela Hive
##df_covid19_visualização_hive.write.option("path","hdfs://namenode:8020/user/joao-paulo/data").saveAsTable("data_us_hive")

df_covid19_visualização_hive.write.format("parquet").saveAsTable("covid_19")

AnalysisException: "Can not create the managed table('`covid_19`'). The associated location('hdfs://namenode:8020/user/hive/warehouse/covid_19') already exists.;"

In [156]:
#Vizualizando dados dos "Casos confirmados: Acumulados e novos"  e "incidência"
df_covid19_resultado.select('Casos Acumulados','Casos Novos', 'Incidência').show()

+----------------+-----------+-----------------+
|Casos Acumulados|Casos Novos|       Incidência|
+----------------+-----------+-----------------+
|        18855015|     115228|8972.292625940041|
+----------------+-----------+-----------------+



In [155]:
#Vizualizando dados dos "Obitos confirmados: Acumulados e novos",  "Mortalidade" e "letalidade".
df_covid19_resultado.select('Obitos Acumulado','Obitos Novos', 'Mortalidade', 'Letalidade').show()

+----------------+------------+------------------+-----------------+
|Obitos Acumulado|Obitos Novos|       Mortalidade|       Letalidade|
+----------------+------------+------------------+-----------------+
|          526892|        4249|250.72529543290204|3.687471795049814|
+----------------+------------+------------------+-----------------+

