In [1]:
import pyspark as spark
from pyspark.sql.functions import *

spark = SparkSession\
.builder\
.appName('Projeto final Básico - Campanha Nacional de Vacinação contra Covid-19')\
.config('spark.some.config.option', 'some-value')\
.enableHiveSupport()\
.getOrCreate()

#### Investiguei um dos arquivos CSV para entender como ele está formatado. Defini para esse projeto dois arquivos csv.
> O arquivo parte 1 encobre dados do primeiro semestre de 2021 e o arquivo parte 2 encobre dados dos primeiros 6 dias do mês de julho de 2021.

In [None]:
!hdfs dfs -cat /user/leo-silva/Documentos/Semantix/spark/projeto_final/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv

In [None]:
!hdfs dfs -cat /user/leo-silva/Documentos/Semantix/spark/projeto_final/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv

#### Identificado o formato do arquivo, criei o dataframe através do diretório do hdfs. Visualizei como está Schema. (Início dos tratamento dos dados).

In [2]:
csv_df = spark.read.csv('hdfs://namenode/user/spark/projeto_final_basico', sep=";",header=True, inferSchema=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True)
csv_df.show(10, vertical=True)

-RECORD 0-------------------------------------
 regiao                 | Brasil              
 estado                 | null                
 municipio              | null                
 coduf                  | 76                  
 codmun                 | null                
 codRegiaoSaude         | null                
 nomeRegiaoSaude        | null                
 data                   | 2021-01-01 00:00:00 
 semanaEpi              | 53                  
 populacaoTCU2019       | 210147125           
 casosAcumulado         | 7700578             
 casosNovos             | 24605               
 obitosAcumulado        | 195411              
 obitosNovos            | 462                 
 Recuperadosnovos       | 6756284             
 emAcompanhamentoNovos  | 748883              
 interior/metropolitana | null                
-RECORD 1-------------------------------------
 regiao                 | Brasil              
 estado                 | null                
 municipio   

In [3]:
csv_df.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



In [4]:
csv_df_to_unix = csv_df.withColumn('data', from_unixtime(unix_timestamp(csv_df.data), 'yyyy-MM-dd'))

In [5]:
csv_df_to_unix.show(30, vertical=True)

-RECORD 0----------------------------
 regiao                 | Brasil     
 estado                 | null       
 municipio              | null       
 coduf                  | 76         
 codmun                 | null       
 codRegiaoSaude         | null       
 nomeRegiaoSaude        | null       
 data                   | 2021-01-01 
 semanaEpi              | 53         
 populacaoTCU2019       | 210147125  
 casosAcumulado         | 7700578    
 casosNovos             | 24605      
 obitosAcumulado        | 195411     
 obitosNovos            | 462        
 Recuperadosnovos       | 6756284    
 emAcompanhamentoNovos  | 748883     
 interior/metropolitana | null       
-RECORD 1----------------------------
 regiao                 | Brasil     
 estado                 | null       
 municipio              | null       
 coduf                  | 76         
 codmun                 | null       
 codRegiaoSaude         | null       
 nomeRegiaoSaude        | null       
 data       

#### Criei o Banco de dados "covid" e particionei por Municipio ( como pede o exercício )

In [None]:
spark.sql("create database covid")

In [6]:
csv_df_to_unix.write.mode('overwrite').partitionBy('municipio').format('csv').saveAsTable('covid.municipio', path='hdfs://namenode:8020/user/hive/warehouse/covid_municipio/')

In [None]:
!hdfs dfs -ls /user/hive/warehouse/covid_municipio

In [7]:
spark.sql("SHOW DATABASES").show()

+------------+
|databaseName|
+------------+
|       covid|
|     default|
+------------+



In [8]:
spark.sql("USE covid")

DataFrame[]

In [9]:
spark.sql("SHOW TABLES").show()

+--------+-----------------+-----------+
|database|        tableName|isTemporary|
+--------+-----------------+-----------+
|   covid|        municipio|      false|
|   covid|obitos_por_estado|      false|
|   covid|obitos_por_regiao|      false|
+--------+-----------------+-----------+



In [10]:
spark.sql("SELECT * FROM municipio").show(200,vertical=True)

-RECORD 0----------------------------
 regiao                 | Brasil     
 estado                 | null       
 coduf                  | 76         
 codmun                 | null       
 codRegiaoSaude         | null       
 nomeRegiaoSaude        | null       
 data                   | 2021-01-01 
 semanaEpi              | 53         
 populacaoTCU2019       | 210147125  
 casosAcumulado         | 7700578    
 casosNovos             | 24605      
 obitosAcumulado        | 195411     
 obitosNovos            | 462        
 Recuperadosnovos       | 6756284    
 emAcompanhamentoNovos  | 748883     
 interior/metropolitana | null       
 municipio              | null       
-RECORD 1----------------------------
 regiao                 | Brasil     
 estado                 | null       
 coduf                  | 76         
 codmun                 | null       
 codRegiaoSaude         | null       
 nomeRegiaoSaude        | null       
 data                   | 2021-01-02 
 semanaEpi  

In [11]:
# VISUALIZAÇÃO 1 | Obitos estados do maior para o menor -> 1º Semestre 2021 (01/01/2021 - 30/06/2021)
estado_obitos = spark.sql("SELECT estado, MAX(obitosAcumulado) AS obitos FROM municipio WHERE estado IS NOT NULL GROUP BY estado ORDER BY obitos DESC")
estado_obitos.show()

+------+------+
|estado|obitos|
+------+------+
|    SP|130389|
|    RJ| 56192|
|    MG| 47148|
|    RS| 31867|
|    PR| 31692|
|    BA| 24428|
|    CE| 22791|
|    GO| 19485|
|    PE| 17953|
|    SC| 17146|
|    PA| 15624|
|    AM| 13349|
|    MT| 12000|
|    ES| 11582|
|    DF|  9322|
|    MA|  9190|
|    PB|  8724|
|    MS|  8400|
|    RN|  6853|
|    PI|  6662|
+------+------+
only showing top 20 rows



In [12]:
# VISUALIZAÇÃO 1.1 | Óbitos pelas regiões do Brasil e Brasil como um todo. ( Brasil inicia o ano de 2021 com 195.411 óbitos )
regiao_br_obitos = spark.sql("select regiao, max(obitosAcumulado) as obitos from municipio group by regiao order by obitos desc")
regiao_br_obitos.show()

+------------+------+
|      regiao|obitos|
+------------+------+
|      Brasil|526892|
|     Sudeste|130389|
|         Sul| 31867|
|    Nordeste| 24428|
|Centro-Oeste| 19485|
|       Norte| 15624|
+------------+------+



In [13]:
# VISUALIZAÇÃO 2 | Número total de casos novos no fim do primeiro semestre de 2021.
casos_novos = spark.sql("SELECT estado, sum(casosNovos) AS casos_novos FROM municipio where estado IS NOT NULL group by estado order by casos_novos desc")
casos_novos.show()

+------+-----------+
|estado|casos_novos|
+------+-----------+
|    SP|    4693850|
|    MG|    2586578|
|    PR|    1784154|
|    RS|    1572480|
|    BA|    1296424|
|    SC|    1147802|
|    CE|    1119174|
|    RJ|    1071240|
|    GO|     755130|
|    PE|     678678|
|    MT|     553456|
|    ES|     549766|
|    PA|     528336|
|    PB|     471382|
|    RN|     458330|
|    MS|     411124|
|    AM|     408106|
|    DF|     366014|
|    PI|     312824|
|    RO|     312496|
+------+-----------+
only showing top 20 rows



In [14]:
# Visualizaçaõ 3 | Valor médio de casos novos e óbitos diários no primeiro semestre por estado.
casos_obitos_media = spark.sql("SELECT estado, ROUND(SUM(casosNovos) / COUNT(data),2) AS media_casos_novos , ROUND(AVG(obitosAcumulado),2) AS media_obitos_diarios FROM municipio WHERE estado IS NOT NULL GROUP BY estado ORDER BY media_casos_novos DESC")
casos_obitos_media.show()

+------+-----------------+--------------------+
|estado|media_casos_novos|media_obitos_diarios|
+------+-----------------+--------------------+
|    DF|           978.65|             6551.88|
|    RJ|            60.94|              847.68|
|    SP|             38.8|              253.57|
|    ES|            36.75|              204.69|
|    AM|            34.64|              352.01|
|    CE|            32.18|              164.61|
|    AP|            31.37|              161.49|
|    RO|            30.95|              152.26|
|    RR|            28.35|              153.21|
|    MS|            27.48|              121.27|
|    PR|            23.79|                91.4|
|    SE|             21.4|              100.22|
|    MT|             20.7|              111.37|
|    SC|            20.67|               74.01|
|    AC|            20.64|               112.0|
|    PA|            19.49|               152.9|
|    PE|            19.41|              139.33|
|    RS|            16.85|              

#### Após as 3 visualizações criadas, salvei a 1º como tabela HIVE

In [15]:
estado_obitos.write.format('csv').saveAsTable('Obitos_por_estado')

AnalysisException: 'Table `Obitos_por_estado` already exists.;'

In [14]:
regiao_br_obitos.write.format('csv').saveAsTable('Obitos_por_regiao')

AnalysisException: 'Table `Obitos_por_regiao` already exists.;'

In [15]:
#Visualizar as tabelas salvas 
spark.sql('SHOW TABLES').show()

+--------+-----------------+-----------+
|database|        tableName|isTemporary|
+--------+-----------------+-----------+
|   covid|        municipio|      false|
|   covid|obitos_por_estado|      false|
|   covid|obitos_por_regiao|      false|
+--------+-----------------+-----------+



#### A 2ª com formato parquet e compressão snappy

In [18]:
casos_novos.write.option('compression', 'snappy').parquet('/user/spark/projeto_final_basico/segunda_visualizacao')

AnalysisException: 'path hdfs://namenode:8020/user/spark/projeto_final_basico/segunda_visualizacao already exists.;'

In [17]:
#Conferindo se foi salvo corretamente
!hdfs dfs -ls '/user/spark/projeto_final_basico/segunda_visualizacao'

Found 28 items
-rw-r--r--   2 root supergroup          0 2022-12-10 19:46 /user/spark/projeto_final_basico/segunda_visualizacao/_SUCCESS
-rw-r--r--   2 root supergroup        650 2022-12-10 19:46 /user/spark/projeto_final_basico/segunda_visualizacao/part-00000-4973fb5e-6e20-4589-bf5c-e2b01f12a3aa-c000.snappy.parquet
-rw-r--r--   2 root supergroup        650 2022-12-10 19:46 /user/spark/projeto_final_basico/segunda_visualizacao/part-00001-4973fb5e-6e20-4589-bf5c-e2b01f12a3aa-c000.snappy.parquet
-rw-r--r--   2 root supergroup        650 2022-12-10 19:46 /user/spark/projeto_final_basico/segunda_visualizacao/part-00002-4973fb5e-6e20-4589-bf5c-e2b01f12a3aa-c000.snappy.parquet
-rw-r--r--   2 root supergroup        650 2022-12-10 19:46 /user/spark/projeto_final_basico/segunda_visualizacao/part-00003-4973fb5e-6e20-4589-bf5c-e2b01f12a3aa-c000.snappy.parquet
-rw-r--r--   2 root supergroup        650 2022-12-10 19:46 /user/spark/projeto_final_basico/segunda_visualizacao/part-00004-4973fb5e-

#### A 3ª em um tópico no Kafka

In [16]:
casos_obitos_media.selectExpr("to_json(struct(*)) AS value").write.format('kafka').option('kafka.bootstrap.servers', 'kafka:9092').option('topic', 'casos_obitos_media').save()

In [17]:
topic = spark.read.format('kafka').option('kafka.bootstrap.servers', 'kafka:9092').option('subscribe','casos_obitos_media').load()

topic_media_casos_obitos = topic.select(col('value').cast('string'))
topic_media_casos_obitos.show(truncate = False)

+-------------------------------------------------------------------------+
|value                                                                    |
+-------------------------------------------------------------------------+
|{"estado":"ES","media_casos_novos":36.75,"media_obitos_diarios":204.69}  |
|{"estado":"SP","media_casos_novos":38.8,"media_obitos_diarios":253.57}   |
|{"estado":"DF","media_casos_novos":978.65,"media_obitos_diarios":6551.88}|
|{"estado":"RJ","media_casos_novos":60.94,"media_obitos_diarios":847.68}  |
|{"estado":"AP","media_casos_novos":31.37,"media_obitos_diarios":161.49}  |
|{"estado":"AM","media_casos_novos":34.64,"media_obitos_diarios":352.01}  |
|{"estado":"RO","media_casos_novos":30.95,"media_obitos_diarios":152.26}  |
|{"estado":"CE","media_casos_novos":32.18,"media_obitos_diarios":164.61}  |
|{"estado":"MS","media_casos_novos":27.48,"media_obitos_diarios":121.27}  |
|{"estado":"PR","media_casos_novos":23.79,"media_obitos_diarios":91.4}    |
|{"estado":"

#### Agora criei uma visualização geral no Spark com todos os dados enviados para o HDFS : Síntese de casos, óbitos, incidência e mortalidade

##### Não consegui criar com SQL Querys

In [19]:
df_geral = csv_df.groupBy(['regiao', 'estado']).agg({'casosAcumulado':'max', 'obitosAcumulado':'max', 'populacaoTCU2019':'max'})

In [20]:
df_geral.show(10)

+--------+------+---------------------+-------------------+--------------------+
|  regiao|estado|max(populacaoTCU2019)|max(casosAcumulado)|max(obitosAcumulado)|
+--------+------+---------------------+-------------------+--------------------+
|   Norte|    TO|              1572866|             200243|                3266|
|   Norte|    AC|               881935|              85997|                1760|
|   Norte|    PA|              8602865|             557708|               15624|
|Nordeste|    MA|              7075181|             322052|                9190|
|     Sul|    RS|             11377239|            1235914|               31867|
| Sudeste|    SP|             45919049|            3809222|              130389|
|Nordeste|    PI|              3273227|             299084|                6662|
|   Norte|    AP|               845731|             118066|                1857|
| Sudeste|    MG|             21168791|            1836198|               47148|
|     Sul|    PR|           

In [21]:
df_renomear_campos = df_geral.withColumnRenamed('max(populacaoTCU2019)','populacao').withColumnRenamed('max(casosAcumulado)', 'casos_acumulados').withColumnRenamed('max(obitosAcumulado)','obitos_acumulados')

In [22]:
df_geral_completo = (df_renomear_campos.withColumn('incidencia', round(df_renomear_campos['casos_acumulados']/df_renomear_campos['populacao']*100000,1)).withColumn('mortalidade', round(df_renomear_campos['obitos_acumulados']/df_renomear_campos['populacao']*100000,1)))

In [23]:
df_geral_completo.show(10)

+--------+------+---------+----------------+-----------------+----------+-----------+
|  regiao|estado|populacao|casos_acumulados|obitos_acumulados|incidencia|mortalidade|
+--------+------+---------+----------------+-----------------+----------+-----------+
|   Norte|    TO|  1572866|          200243|             3266|   12731.1|      207.6|
|   Norte|    AC|   881935|           85997|             1760|    9750.9|      199.6|
|   Norte|    PA|  8602865|          557708|            15624|    6482.8|      181.6|
|Nordeste|    MA|  7075181|          322052|             9190|    4551.9|      129.9|
|     Sul|    RS| 11377239|         1235914|            31867|   10863.0|      280.1|
| Sudeste|    SP| 45919049|         3809222|           130389|    8295.5|      284.0|
|Nordeste|    PI|  3273227|          299084|             6662|    9137.3|      203.5|
|   Norte|    AP|   845731|          118066|             1857|   13960.2|      219.6|
| Sudeste|    MG| 21168791|         1836198|          

#### Salvar a visualização do exercício 6 em um tópico no Elastic

In [24]:
df_final = topic_media_casos_obitos
df_final.write.format("csv").save('hdfs://namenode/user/spark/projeto_final_basico/visualizacao3/covid_elastic.csv')

In [26]:
!hdfs dfs -ls /user/spark/projeto_final_basico/visualizacao3/covid_elastic.csv

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-12-11 21:06 /user/spark/projeto_final_basico/visualizacao3/covid_elastic.csv/_SUCCESS
-rw-r--r--   2 root supergroup      13152 2022-12-11 21:06 /user/spark/projeto_final_basico/visualizacao3/covid_elastic.csv/part-00000-0e0c2620-4d94-4828-b4b0-a6cf95193708-c000.csv


In [25]:
!hdfs dfs -get /user/spark/projeto_final_basico/visualizacao3/covid_elastic.csv /input