Spark Final Project

In [25]:
# import library
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [26]:
# Init SparkSession with HiveSupport
spark = SparkSession \
    .builder \
    .appName("Spark-Hive Connection") \
    .config("spark.sql.uris", "thrift://hive-metastore:9083") \
    .enableHiveSupport() \
    .getOrCreate()

In [24]:
# Move csv files to HDFS
!hdfs dfs -mkdir -p /tmp/final_spark_project/covid_br_data/
!hdfs dfs -put /mnt/notebooks/covid_br_data/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv /tmp/final_spark_project/covid_br_data/
!hdfs dfs -put /mnt/notebooks/covid_br_data/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv /tmp/final_spark_project/covid_br_data/
!hdfs dfs -put /mnt/notebooks/covid_br_data/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv /tmp/final_spark_project/covid_br_data/
!hdfs dfs -put /mnt/notebooks/covid_br_data/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv /tmp/final_spark_project/covid_br_data/

In [27]:
# Create dataframe from csv in hdfs
covid_df = spark.read.csv('/tmp/final_spark_project/covid_br_data/*.csv', sep=";", header=True)

In [28]:
# Check dataframe Schema
covid_df.dtypes

[('regiao', 'string'),
 ('estado', 'string'),
 ('municipio', 'string'),
 ('coduf', 'string'),
 ('codmun', 'string'),
 ('codRegiaoSaude', 'string'),
 ('nomeRegiaoSaude', 'string'),
 ('data', 'string'),
 ('semanaEpi', 'string'),
 ('populacaoTCU2019', 'string'),
 ('casosAcumulado', 'string'),
 ('casosNovos', 'string'),
 ('obitosAcumulado', 'string'),
 ('obitosNovos', 'string'),
 ('Recuperadosnovos', 'string'),
 ('emAcompanhamentoNovos', 'string'),
 ('interior/metropolitana', 'string')]

In [29]:
# Check dataframe data
covid_df.show(1,False,True)

-RECORD 0----------------------------
 regiao                 | Brasil     
 estado                 | null       
 municipio              | null       
 coduf                  | 76         
 codmun                 | null       
 codRegiaoSaude         | null       
 nomeRegiaoSaude        | null       
 data                   | 2020-02-25 
 semanaEpi              | 9          
 populacaoTCU2019       | 210147125  
 casosAcumulado         | 0          
 casosNovos             | 0          
 obitosAcumulado        | 0          
 obitosNovos            | 0          
 Recuperadosnovos       | null       
 emAcompanhamentoNovos  | null       
 interior/metropolitana | null       
only showing top 1 row



In [30]:
# Change dataframe Schema acording with the data
covid_df_1 = covid_df.select("regiao",
                             "estado",
                             "municipio",
                             col("coduf").cast("integer"),
                             col("codmun").cast("integer"),
                             col("codRegiaoSaude").cast("integer"),
                             "nomeRegiaoSaude",
                             col("data").cast("date"),
                             col("semanaEpi").cast("integer"),
                             col("populacaoTCU2019").cast("date"),
                             col("casosAcumulado").cast("integer"),
                             col("casosNovos").cast("integer"),
                             col("obitosAcumulado").cast("integer"),
                             col("obitosNovos").cast("integer"),
                             col("Recuperadosnovos").cast("integer"),
                             col("emAcompanhamentoNovos").cast("integer"),
                             col("interior/metropolitana").cast("integer"))


In [31]:
# Show the Hive Database
spark.sql("show databases").show()

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [34]:
# Send the data from HDFS To Hive
covid_df_1.write.format("csv").partitionBy("municipio").saveAsTable("covid_br_data")

In [36]:
# Show the table created
spark.sql("show tables").show()

+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
| default|covid_br_data|      false|
+--------+-------------+-----------+



Creating dataframes for Visualizations

First View - Hive

In [13]:
Recuperados = spark.sql("select Recuperadosnovos as Casos_Recuperados from covid_br_data order by 1 desc limit 1")
Recuperados.show()

+-----------------+
|Casos_Recuperados|
+-----------------+
|         17262646|
+-----------------+



In [None]:
Acompanhamento = spark.sql("select emAcompanhamentoNovos as Em_Acompanhamento from covid_br_data order by 1 desc limit 1")
Acompanhamento.show()

Second View - HDFS

In [15]:
casosAcumulado = spark.sql("select casosAcumulado as Acumulado from covid_br_data order by 1 desc limit 1")
casosAcumulado.show()

+---------+
|Acumulado|
+---------+
| 18855015|
+---------+



In [18]:
casosNovos = spark.sql("select casosNovos as Casos_Novos from covid_br_data order by 1 desc limit 1")
casosNovos.show()

+-----------+
|Casos_Novos|
+-----------+
|     115228|
+-----------+



In [22]:
Incidencia = spark.sql("select ((casosAcumulado/210147125)*100000) as Incidencia from covid_br_data order by 1 desc limit 1")
Incidencia.show()

+-----------------+
|       Incidencia|
+-----------------+
|8972.292625940041|
+-----------------+



Third View - Kakfa

In [53]:
# Kafka only accept data from a string type value column 
# Change the current result for a string type named value column 
Obitos_Acumulados = spark.sql("select obitosAcumulado from covid_br_data order by 1 desc limit 1")
Obitos_Acumulados_string = Obitos_Acumulados.withColumn("value", col("obitosAcumulado").cast(StringType())).drop("obitosAcumulado")
Obitos_Acumulados_string.show()

DataFrame[obitosAcumulado: string]

In [29]:
obitosNovos = spark.sql("select obitosNovos from covid_br_data order by 1 desc limit 1")
obitosNovos_string = obitosNovos.withColumn("value", col("obitosNovos").cast(StringType())).drop("obitosNovos")
obitosNovos_string.show()

+-----+
|value|
+-----+
| 4249|
+-----+



In [30]:
Mortalidade = spark.sql("select ((obitosAcumulado/210147125)*100000) as Mortalidade from covid_br_data order by 1 desc limit 1")
Mortalidade_string = Mortalidade.withColumn("value", col("Mortalidade").cast(StringType())).drop("Mortalidade")
Mortalidade_string.show()

+------------------+
|             value|
+------------------+
|250.72529543290204|
+------------------+



In [31]:
Letalidade = spark.sql("select obitosNovos, casosNovos, (obitosNovos/casosNovos)*100 as Letalidade from covid_br_data order by 1 desc limit 1")
Letalidade_string = Letalidade.withColumn("value", col("Letalidade").cast(StringType())).drop("obitosNovos", "casosNovos", "Letalidade")
Letalidade_string.show()

+-----------------+
|            value|
+-----------------+
|4.903522134515072|
+-----------------+



Saving the dataframes

Save to Hive Table

In [None]:
Recuperados.write.format("csv").saveAsTable("Recuperados")

In [None]:
Acompanhamento.write.format("csv").saveAsTable("Acompanhamento")

In [14]:
# Check the created tables
spark.sql("Show tables").show()

+--------+--------------+-----------+
|database|     tableName|isTemporary|
+--------+--------------+-----------+
| default|acompanhamento|      false|
| default| covid_br_data|      false|
| default|   recuperados|      false|
+--------+--------------+-----------+



Save to HDFS as parquet with snappy compression

In [17]:
casosAcumulado.write.option("compression","snappy").parquet("/user/final_spark_project/casosAcumulado")

In [20]:
casosNovos.write.option("compression","snappy").parquet("/user/final_spark_project/casosNovos")

In [24]:
Incidencia.write.option("compression","snappy").parquet("/user/final_spark_project/Incidencia")

In [25]:
# Check the files in HDFS
!hdfs dfs -ls /user/final_spark_project/

Found 4 items
drwxr-xr-x   - root supergroup          0 2021-11-09 19:52 /user/final_spark_project/Incidencia
drwxr-xr-x   - root supergroup          0 2021-11-09 19:42 /user/final_spark_project/casosAcumulado
drwxr-xr-x   - root supergroup          0 2021-11-09 19:47 /user/final_spark_project/casosNovos
drwxr-xr-x   - root supergroup          0 2021-11-09 18:08 /user/final_spark_project/covid_br_data


Save to Kafka topic

In [47]:
(Obitos_Acumulados_string.write
                .format("kafka") 
                .option("kafka.bootstrap.servers","kafka:9092") 
                .option("topic","topic-Obitos_Acumulados")                
                .save())

In [50]:
df = (spark 
        .read 
        .format("kafka") 
        .option("kafka.bootstrap.servers", "kafka:9092") 
        .option("subscribe", "topic-Obitos_Acumulados")  
        .load())

In [51]:
df.show()

+----+----------------+--------------------+---------+------+--------------------+-------------+
| key|           value|               topic|partition|offset|           timestamp|timestampType|
+----+----------------+--------------------+---------+------+--------------------+-------------+
|null|[39 39 39 38 39]|topic-Obitos_Acum...|        0|     0|2021-11-10 00:06:...|            0|
|null|[39 39 39 38 39]|topic-Obitos_Acum...|        0|     1|2021-11-10 00:12:...|            0|
|null|[39 39 39 38 39]|topic-Obitos_Acum...|        0|     2|2021-11-10 00:22:...|            0|
+----+----------------+--------------------+---------+------+--------------------+-------------+



In [None]:
obitosNovos_string.write\
                .format("kafka") \
                .option("kafka.bootstrap.servers","kafka:9092") \
                .option("topic", "Obitos_Novos") \
                .option("checkpointLocation","user/kafka_checkpoint_Obitos_Novos")\
                .option("path","hdfs:///user/kafka/topic-Obitos_Novos") \
                .save()

In [None]:
Mortalidade_string.write\
                .format("kafka") \
                .option("kafka.bootstrap.servers","kafka:9092") \
                .option("topic", "Mortalidade") \
                .option("checkpointLocation","user/kafka_checkpoint_Mortalidade")\
                .option("path","hdfs:///user/kafka/topic-Mortalidade") \
                .save()

In [None]:
Letalidade_string.write\
                .format("kafka") \
                .option("kafka.bootstrap.servers","kafka:9092") \
                .option("topic", "Letalidade") \
                .option("checkpointLocation","user/kafka_checkpoint_Letalidade")\
                .option("path","hdfs:///user/kafka/topic-Letalidade") \
                .save()

Spark View

In [28]:
spark_df = spark.sql("""select regiao, 
         max(casosAcumulado) as Casos, 
         max(obitosAcumulado) as Obitos, 
         max(cast(((casosAcumulado/210147125)*100000) as decimal(18,2))) as Incidencia, 
         max(cast(((obitosAcumulado/210147125)*100000) as decimal(18,2))) as Mortalidade, 
         max(data) as Atualizacao
         from covid_br_data 
         group by regiao 
         order by regiao""")
spark_df.show()

+------------+--------+------+----------+-----------+-----------+
|      regiao|   Casos|Obitos|Incidencia|Mortalidade|Atualizacao|
+------------+--------+------+----------+-----------+-----------+
|      Brasil|18855015| 99572|   8972.29|     250.73| 2021-07-06|
|Centro-Oeste|  686433|  9980|    326.64|       9.27| 2021-07-06|
|    Nordeste| 1141612|  9993|    543.24|      11.62| 2021-07-06|
|       Norte|  557708|  9992|    265.39|       7.43| 2021-07-06|
|     Sudeste| 3809222| 99989|   1812.65|      62.05| 2021-07-06|
|         Sul| 1308643|   999|    622.73|      15.16| 2021-07-06|
+------------+--------+------+----------+-----------+-----------+



Elastic View

In [None]:
# Create a topic in elastic
# Dictionary
esconf={}
esconf["es.mapping.id"] = "_id"
esconf["es.nodes"] = "localhost"
esconf["es.port"] = "9200"
esconf["es.update.script.inline"] = "ctx._source.location = params.location"
esconf["es.update.script.params"] = "location:"
esconf["es.write.operation"] = "upsert"

# Dataframe to Elastic - topic
Obitos_Acumulados.write \
                 .format("org.elasticsearch.spark.sql") \
                 .options(**esconf) \
                 .save("/user/elastic")