## Sejam Bem-Vindos ao curso <br> "Introdução ao Processamento Paralelo e Distribuído Utilizando o Apache Spark"

Instrutor: M.Sc. Fernando Pereira Gonçalves de Sá (fpgdesa@gmail.com)

#### Trabalharemos com o Spark versão 3.2.0 <br>
#### Todas as bibliotecas que necessitaremos já estão instaladas e configuradas em nosso container Docker

In [1]:
! pyspark --version


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 11.0.11
Branch HEAD
Compiled by user ubuntu on 2021-10-06T12:46:30Z
Revision 5d45a415f3a29898d92380380cfd82bfc7f579ea
Url https://github.com/apache/spark
Type --help for more information.


In [4]:
# Instalando gdown para realizar download a partir do Google Drive

# ! pip install gdown   # retire o comentário
import gdown

url_covid = 'https://drive.google.com/uc?id=1uefwk3fhAhCWywoiozyJ7h0iLo0yMIpK'
output_covid = './files/dados_covid.csv' 

# Baixando dados sobre a Covid na cidade do Rio de Janeiro
path_dados_covid = gdown.download(url_covid, output_covid, quiet=False)

Downloading...
From: https://drive.google.com/uc?id=1uefwk3fhAhCWywoiozyJ7h0iLo0yMIpK
To: /application/application/files/dados_covid.csv
100% 31.5M/31.5M [00:02<00:00, 11.3MB/s]


In [5]:
! cd files && ls 

dados_covid.csv


### Vamos conhecer duas formas de manipular o conjunto de dados

<ul>
    <li>Utilizando o DataFrame;</ui>
    <li>A partir da construção de RDDs</ui>
 </ul>

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Primeiro criamos uma sessão do Spark
spark = (SparkSession
    .builder
    .appName("CovidApp")
    .getOrCreate())



In [7]:
# Pela abordagem direta
schema = """`CD_GEOCODB` STRING, 
            `NM_BAIRRO` STRING, 
            `dt_notific` STRING, 
            `sexo` STRING,
            `faixa_etaria` STRING, 
            `evolucao` STRING, 
            `raca_cor` STRING, 
            `data_atualizacao` STRING"""


df_dados_covid = spark.read.format("csv").\
              option("header", "true").\
              option("inferSchema", "false").\
              option("delimiter",';').\
              schema(schema).\
              load(path_dados_covid)


In [8]:
df_dados_covid.printSchema()

root
 |-- CD_GEOCODB: string (nullable = true)
 |-- NM_BAIRRO: string (nullable = true)
 |-- dt_notific: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- faixa_etaria: string (nullable = true)
 |-- evolucao: string (nullable = true)
 |-- raca_cor: string (nullable = true)
 |-- data_atualizacao: string (nullable = true)



In [9]:
df_dados_covid.show()

[Stage 0:>                                                          (0 + 1) / 1]

+------------+--------------------+----------+----+------------+----------+--------+----------------+
|  CD_GEOCODB|           NM_BAIRRO|dt_notific|sexo|faixa_etaria|  evolucao|raca_cor|data_atualizacao|
+------------+--------------------+----------+----+------------+----------+--------+----------------+
|330455705105|              COSMOS|06-11-2020|   F|  De 50 a 59|     Óbito|   Parda|      08-09-2021|
|330455705027|           MANGUEIRA|10-14-2020|   F|  De 40 a 49|Recuperado|Ignorado|      08-09-2021|
|330455705058|           ENCANTADO|12-04-2020|   F|  De 70 a 79|     Óbito|Ignorado|      08-09-2021|
|330455705076|       ROCHA MIRANDA|11-30-2020|   F|  De 90 a 99|Recuperado|   Preta|      08-09-2021|
|330455705135|RECREIO DOS BANDE...|11-30-2020|   M|  De 70 a 79|     Óbito|Ignorado|      08-09-2021|
|330455705079|       BENTO RIBEIRO|12-10-2020|   M|  De 60 a 69|Recuperado|Ignorado|      08-09-2021|
|330455705159|       VASCO DA GAMA|12-07-2020|   M|  De 70 a 79|Recuperado|   Pret

                                                                                

In [10]:
# trabalhando com RDD puro

rdd_covid = spark.sparkContext.textFile(path_dados_covid)

header = rdd_covid.first()
rdd_dados_covid = rdd_covid.filter(lambda t: t != header).map(lambda t: t.split(";"))#.map(lambda r: Row(int(r[0]),r[1], r[2], r[3], r[4], r[5], r[6], r[7]))

 

                                                                                

In [12]:
rdd_dados_covid.collect()   # collect é uma operação custosa!

                                                                                

[['330455705105',
  'COSMOS',
  '06-11-2020',
  'F',
  'De 50 a 59',
  'Óbito',
  'Parda',
  '08-09-2021'],
 ['330455705027',
  'MANGUEIRA',
  '10-14-2020',
  'F',
  'De 40 a 49',
  'Recuperado',
  'Ignorado',
  '08-09-2021'],
 ['330455705058',
  'ENCANTADO',
  '12-04-2020',
  'F',
  'De 70 a 79',
  'Óbito',
  'Ignorado',
  '08-09-2021'],
 ['330455705076',
  'ROCHA MIRANDA',
  '11-30-2020',
  'F',
  'De 90 a 99',
  'Recuperado',
  'Preta',
  '08-09-2021'],
 ['330455705135',
  'RECREIO DOS BANDEIRANTES',
  '11-30-2020',
  'M',
  'De 70 a 79',
  'Óbito',
  'Ignorado',
  '08-09-2021'],
 ['330455705079',
  'BENTO RIBEIRO',
  '12-10-2020',
  'M',
  'De 60 a 69',
  'Recuperado',
  'Ignorado',
  '08-09-2021'],
 ['330455705159',
  'VASCO DA GAMA',
  '12-07-2020',
  'M',
  'De 70 a 79',
  'Recuperado',
  'Preta',
  '08-09-2021'],
 ['330455705073',
  'MADUREIRA',
  '12-15-2020',
  'F',
  'De 70 a 79',
  'Óbito',
  'Branca',
  '08-09-2021'],
 ['330455705005',
  'CATUMBI',
  '12-27-2020',
  'M',
 

## Realizando algumas análises

### Total de categorias de evolução por bairro

O Painel Rio divide os casos de COVID-19 registrados em diferentes categorias de evolução, são eles:

<ul>
    <li>Óbito</li>
    <li>Recuperado</li>
    <li>Ativo </li>
 </ul>

In [37]:
from pyspark.sql.functions import *

In [33]:
# Vamos obter uma lista distinta de bairros
                        #----------------- transformação ------------------------------- # ação
bairros = df_dados_covid.select("NM_BAIRRO").distinct().rdd.map(lambda t: t['NM_BAIRRO']).collect()



In [35]:
len(bairros)

161

In [36]:
bairros

['MARÉ',
 'LEME',
 'COPACABANA',
 'HONÓRIO GURGEL',
 'CASCADURA',
 'SENADOR VASCONCELOS',
 'PARQUE COLUMBIA',
 'RIBEIRA',
 'COMPLEXO DO ALEMÃO',
 'HIGIENÓPOLIS',
 'TODOS OS SANTOS',
 'FREGUESIA (ILHA DO GOVERNADOR)',
 'RIACHUELO',
 'JARDIM CARIOCA',
 'OSWALDO CRUZ',
 'DEL CASTILHO',
 'PARADA DE LUCAS',
 'PIEDADE',
 'ZUMBI',
 'LAGOA',
 'CACHAMBI',
 'MANGUEIRA',
 'BENTO RIBEIRO',
 'PAVUNA',
 'PITANGUEIRAS',
 'GERICINÓ',
 'CORDOVIL',
 'VISTA ALEGRE',
 'PARQUE ANCHIETA',
 'ENGENHEIRO LEAL',
 'SANTO CRISTO',
 'VILA ISABEL',
 'GARDÊNIA AZUL',
 'GRAJAÚ',
 'ÁGUA SANTA',
 'VARGEM GRANDE',
 'RAMOS',
 'TAUÁ',
 'PRAÇA SECA',
 'ALTO DA BOA VISTA',
 'CACUIA',
 'LINS DE VASCONCELOS',
 'HUMAITÁ',
 'PRAIA DA BANDEIRA',
 'GUARATIBA',
 'CIDADE DE DEUS',
 'RICARDO DE ALBUQUERQUE',
 'MARACANÃ',
 'VIDIGAL',
 'VILA MILITAR',
 'SENADOR CAMARÁ',
 'SANTA CRUZ',
 'COELHO NETO',
 'VICENTE DE CARVALHO',
 'SAMPAIO',
 'INHOAIBA',
 'VASCO DA GAMA',
 'JARDIM BOTÂNICO',
 'ENCANTADO',
 'TIJUCA',
 'ANCHIETA',
 'VAZ LOBO'

In [62]:
# Utilizando Built-in Functions do DataFrame

df_dados_covid.select("NM_BAIRRO", "evolucao").\
               where(col("evolucao") == "Óbito").\
               groupBy("NM_BAIRRO", "evolucao").\
               agg(count("*").alias("Total de Óbitos")).\
               show()

+------------------+--------+---------------+
|         NM_BAIRRO|evolucao|Total de Óbitos|
+------------------+--------+---------------+
|             PENHA|   Óbito|            301|
|            MONERO|   Óbito|             39|
|     VASCO DA GAMA|   Óbito|             58|
|         GUADALUPE|   Óbito|            258|
|            COSMOS|   Óbito|            299|
| PRAIA DA BANDEIRA|   Óbito|             24|
|            GLÓRIA|   Óbito|             60|
|            OLARIA|   Óbito|            311|
|BARRA DE GUARATIBA|   Óbito|             25|
|            TIJUCA|   Óbito|            981|
|       VILA KOSMOS|   Óbito|             82|
|     VARGEM GRANDE|   Óbito|             55|
|            GALEÃO|   Óbito|             66|
|    VARGEM PEQUENA|   Óbito|             79|
|              TAUÁ|   Óbito|            119|
|      DEL CASTILHO|   Óbito|            105|
|     BENTO RIBEIRO|   Óbito|            250|
|            COCOTÁ|   Óbito|             24|
|           PIEDADE|   Óbito|     



In [101]:
# Usando processamento SQL-like

# criamos a view da tabela
df_dados_covid.createOrReplaceTempView("COVID_tbl")

In [113]:
# vamos obter o mesmo resultado anterior, porém utilizando SQL

spark.sql("""SELECT NM_BAIRRO, evolucao, COUNT(*) as `Total de Óbitos`
             FROM COVID_tbl
             WHERE evolucao == 'Óbito'
             GROUP BY NM_BAIRRO, evolucao
          """).show()

+------------------+--------+---------------+
|         NM_BAIRRO|evolucao|Total de Óbitos|
+------------------+--------+---------------+
|             PENHA|   Óbito|            301|
|            MONERO|   Óbito|             39|
|     VASCO DA GAMA|   Óbito|             58|
|         GUADALUPE|   Óbito|            258|
|            COSMOS|   Óbito|            299|
| PRAIA DA BANDEIRA|   Óbito|             24|
|            GLÓRIA|   Óbito|             60|
|            OLARIA|   Óbito|            311|
|BARRA DE GUARATIBA|   Óbito|             25|
|            TIJUCA|   Óbito|            981|
|       VILA KOSMOS|   Óbito|             82|
|     VARGEM GRANDE|   Óbito|             55|
|            GALEÃO|   Óbito|             66|
|    VARGEM PEQUENA|   Óbito|             79|
|              TAUÁ|   Óbito|            119|
|      DEL CASTILHO|   Óbito|            105|
|     BENTO RIBEIRO|   Óbito|            250|
|            COCOTÁ|   Óbito|             24|
|           PIEDADE|   Óbito|     



In [117]:
# obtenção do total de óbitos utilizando RDD puro

rdd_dados_covid.filter(lambda l: l[5] == 'Óbito').map(lambda l: (l[1], 1)).reduceByKey(lambda a,b : a+b).collect()

                                                                                

[('COSMOS', 299),
 ('ENCANTADO', 72),
 ('CATUMBI', 70),
 ('GLÓRIA', 60),
 ('GAMBOA', 43),
 ('PACIÊNCIA', 419),
 ('CAMPO GRANDE', 1764),
 ('SENADOR CAMARÁ', 417),
 ('SANTA CRUZ', 939),
 ('JACAREPAGUÁ', 553),
 ('REALENGO', 932),
 ('BANGU', 1228),
 ('BRÁS DE PINA', 284),
 ('MÉIER', 313),
 ('LINS DE VASCONCELOS', 161),
 ('RAMOS', 201),
 ('VILA ISABEL', 420),
 ('CENTRO', 309),
 ('PRAÇA SECA', 356),
 ('BARRA DA TIJUCA', 662),
 ('FREGUESIA (JACAREPAGUÁ)', 403),
 ('DEL CASTILHO', 105),
 ('CASCADURA', 151),
 ('ENGENHO NOVO', 222),
 ('ABOLIÇÃO', 85),
 ('SÃO CONRADO', 48),
 ('VIGÁRIO GERAL', 120),
 ('ENGENHO DA RAINHA', 129),
 ('PORTUGUESA', 117),
 ('PITANGUEIRAS', 43),
 ('INHOAIBA', 273),
 ('VILA DA PENHA', 161),
 ('RIO COMPRIDO', 234),
 ('LAGOA', 89),
 ('COSME VELHO', 31),
 ('OLARIA', 311),
 ('CIDADE DE DEUS', 170),
 ('TANQUE', 198),
 ('COLÉGIO', 141),
 ('JARDIM SULACAP', 80),
 ('VILA VALQUEIRE', 198),
 ('GARDÊNIA AZUL', 103),
 ('VARGEM GRANDE', 55),
 ('JARDIM CARIOCA', 121),
 ('ROCHA MIRANDA',

## Exercícío:

(1) Aplique o mesmo raciocínio de agregação para as demais categorias de evolução de pacientes COVID-19 <br>
(2) Experimente as três abordagens

### Cálculo de Mortalidade Segundo o Gênero

In [170]:
# Utilizando Built-in Functions do DataFrame

df_dados_covid.select("NM_BAIRRO","sexo", "evolucao").\
               where(col("evolucao") == "Óbito").where(col("NM_BAIRRO") == "COSMOS").\
               groupBy("NM_BAIRRO","sexo", "evolucao").\
               agg(count("*").alias("Total de Óbitos")).\
               withColumn("Mortalidade %", 100*(col("`Total de Óbitos`")/ df_dados_covid.filter(col("NM_BAIRRO")=="COSMOS").filter(col("evolucao") == "Óbito").count()) ).\
               show()

+---------+----+--------+---------------+-----------------+
|NM_BAIRRO|sexo|evolucao|Total de Óbitos|    Mortalidade %|
+---------+----+--------+---------------+-----------------+
|   COSMOS|   M|   Óbito|            158|52.84280936454849|
|   COSMOS|   F|   Óbito|            141|47.15719063545151|
+---------+----+--------+---------------+-----------------+



## Exercícío:

(1) Experimente encontrar outras relações, como taxa de curados por bairro, sexo e evolução

## Exercício: <br>

(1) Realize operações sobre o arquivo <i>sales.csv</i>

In [171]:
url_sales = 'https://drive.google.com/uc?id=1g1tZmCmwxI8NtkHFJEse0IohwFbEWf52'
output_sales = './files/sales.csv' 

# Baixando dados sobre vendas de uma empresa de informática
path_dados_sales= gdown.download(url_sales, output_sales, quiet=False)

Downloading...
From: https://drive.google.com/uc?id=1g1tZmCmwxI8NtkHFJEse0IohwFbEWf52
To: /application/application/files/sales.csv
100% 204k/204k [00:00<00:00, 889kB/s]


In [172]:
! cd files && ls 

dados_covid.csv  sales.csv
