# Analisando o COVID19 com Spark

Como exercício e treinamento de meus estudos, segue pequena análise referente ao assunto mais comentado de 2020, o novo corona vírus. O notebook ainda está em testes e poderá ser aprimorado/atualizado.

OBS: Este notebook foi escrito pelo Colab e por isso é necessário alguns códigos abaixo.

**Fernanda de Siqueira Teixeira**

#### Utilizando a base do covid disponivel em: https://drive.google.com/file/d/1ZOcIJslOAdgMgltQrm9hEzCzO13pOmgw/view?usp=sharing

Colunas:

**city:** nome do município (pode estar em branco quando o registro é referente ao estado, pode ser preenchido com Importados/Indefinidos também).

**city_ibge_code:** código IBGE do local.

**date:** data de coleta dos dados no formato YYYY-MM-DD.

**epidemiological_week:** número da semana epidemiológica.

**estimated_population_2019: **população estimada para esse município/estado em 2019, segundo o IBGE (acesse o script que faz o download e conversão dos dados de população).

**is_last:** campo pré-computado que diz se esse registro é o mais novo para esse local, pode ser True ou False (caso filtre por esse campo, use is_last=True ou is_last=False, não use o valor em minúsculas).

**is_repeated:** campo pré-computado que diz se as informações nesse registro foram publicadas pela Secretaria Estadual de Saúde no dia date ou se o dado é repetido do último dia em que o dado está disponível (igual ou anterior a date). Isso ocorre pois nem todas as secretarias publicam boletins todos os dias. Veja também o campo last_available_date.

**last_available_confirmed:** número de casos confirmados do último dia disponível igual ou anterior à data date.

**last_available_confirmed_per_100k_inhabitants:** número de casos confirmados por 100.000 habitantes do último dia disponível igual ou anterior à data date.

**last_available_date:** data da qual o dado se refere.

**last_available_death_rate:** taxa de mortalidade (mortes / confirmados) do último dia disponível igual ou anterior à data date.

**last_available_deaths:** número de mortes do último dia disponível igual ou anterior à data date.

**order_for_place:** número que identifica a ordem do registro para este local. O registro referente ao primeiro boletim em que esse local aparecer será contabilizado como 1 e os demais boletins incrementarão esse valor.

**place_type:** tipo de local que esse registro descreve, pode ser city ou state.

**state:** sigla da unidade federativa, exemplo: SP.

**new_confirmed:** número de novos casos confirmados desde o último dia (note que caso is_repeated seja True, esse valor sempre será 0 e que esse valor pode ser negativo caso a SES remaneje os casos desse município para outro).

**new_deaths:** número de novos óbitos desde o último dia (note que caso is_repeated seja True, esse valor sempre será 0 e que esse valor pode ser negativo caso a SES remaneje os casos desse município para outro).

**Desenvolvimento**

In [1]:
# Instalando o Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"
import findspark
findspark.init()

In [2]:
# Inicializando a sessão do spark
# O master local * é pra usar todas as threads
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

In [3]:
# Fazendo upload de um arquivo pro colab
from google.colab import files
files.upload()

Saving covid19.csv to covid19.csv


{'covid19.csv': b"epidemiological_week,date,order_for_place,state,city,city_ibge_code,place_type,last_available_confirmed,last_available_confirmed_per_100k_inhabitants,new_confirmed,last_available_deaths,new_deaths,last_available_death_rate,estimated_population_2019,is_last,is_repeated\r\n28,2020-07-09,63,MG,Abadia dos Dourados,3100104,city,5,71.54099,0,0,0,0.0,6989,True,False\r\n28,2020-07-09,89,MG,Abaet\xc3\xa9,3100203,city,20,86.06963,0,0,0,0.0,23237,True,False\r\n28,2020-07-09,32,MG,Abre Campo,3100302,city,9,66.8946,0,0,0,0.0,13454,True,False\r\n28,2020-07-09,22,MG,Acaiaca,3100401,city,11,275.41312,0,1,0,0.0909,3994,True,False\r\n28,2020-07-09,52,MG,A\xc3\xa7ucena,3100500,city,18,190.07392,2,1,0,0.0556,9470,True,False\r\n28,2020-07-09,41,MG,\xc3\x81gua Comprida,3100708,city,2,100.05003,0,0,0,0.0,1999,True,False\r\n28,2020-07-09,22,MG,Aguanil,3100807,city,5,111.45787,0,0,0,0.0,4486,True,False\r\n28,2020-07-09,36,MG,\xc3\x81guas Formosas,3100906,city,18,93.71583,5,0,0,0.0,19207,True,

In [4]:
#importando
from pyspark.sql import functions as f

Após salvar os arquivos baixados utilizaremos a função .read_csv para carregar a base de dados. Nomearemos como df.

In [5]:
# lendo o arquivo csv
df = spark.read.csv('covid19.csv', inferSchema=True, header=True)
df.show()

+--------------------+----------+---------------+-----+-------------------+--------------+----------+------------------------+---------------------------------------------+-------------+---------------------+----------+-------------------------+-------------------------+-------+-----------+
|epidemiological_week|      date|order_for_place|state|               city|city_ibge_code|place_type|last_available_confirmed|last_available_confirmed_per_100k_inhabitants|new_confirmed|last_available_deaths|new_deaths|last_available_death_rate|estimated_population_2019|is_last|is_repeated|
+--------------------+----------+---------------+-----+-------------------+--------------+----------+------------------------+---------------------------------------------+-------------+---------------------+----------+-------------------------+-------------------------+-------+-----------+
|                  28|2020-07-09|             63|   MG|Abadia dos Dourados|       3100104|      city|                       

Para visualizar as colunas iremos usar a função .columns que permite verificar os nomes de todas as colunas da base de dados.

In [6]:
df.columns

['epidemiological_week',
 'date',
 'order_for_place',
 'state',
 'city',
 'city_ibge_code',
 'place_type',
 'last_available_confirmed',
 'last_available_confirmed_per_100k_inhabitants',
 'new_confirmed',
 'last_available_deaths',
 'new_deaths',
 'last_available_death_rate',
 'estimated_population_2019',
 'is_last',
 'is_repeated']

Para verificar os números de itens distintos iremos usar a função .distinct(),count() 

In [7]:
df.distinct().count()

38285

Verificando se existem dados nulos. Caso sim, iremos remove-los.

In [8]:
#verificando se existem dados nulos 
df.describe().show()

+-------+--------------------+----------+-----------------+-----+-------------------+------------------+----------+------------------------+---------------------------------------------+------------------+---------------------+-------------------+-------------------------+-------------------------+
|summary|epidemiological_week|      date|  order_for_place|state|               city|    city_ibge_code|place_type|last_available_confirmed|last_available_confirmed_per_100k_inhabitants|     new_confirmed|last_available_deaths|         new_deaths|last_available_death_rate|estimated_population_2019|
+-------+--------------------+----------+-----------------+-----+-------------------+------------------+----------+------------------------+---------------------------------------------+------------------+---------------------+-------------------+-------------------------+-------------------------+
|  count|               38285|     38285|            38285|38285|              38161|             38

Observa-se que as colunas que possuem dados faltantes são: 
- **city**:  38161 dados do tipo object 
- **city_ibge_code**: 38174 dados do tipo  float64  
- **last_available_confirmed_per_100k_inhabitants**: 37090 dados do tipo  float64
- **estimated_population_2019**: 38174 dados do tipo float64

In [9]:
# Excluindo os dados faltantes utilizando subset
data = df.dropna(subset=['city', 'city_ibge_code', 'estimated_population_2019', 'last_available_confirmed_per_100k_inhabitants'])
data.count()

36966

In [10]:
# Como optamos por excluir todos os dados faltantes, podemos utilizar essa maneira que é mais fácil 
df = df.dropna(how='any')
df.count()

36966

Mostrando o Schema do DataFrame. Isso é importante pois conseguimos verificar o tipo da coluna.

In [11]:
df.printSchema()

root
 |-- epidemiological_week: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- order_for_place: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- city_ibge_code: integer (nullable = true)
 |-- place_type: string (nullable = true)
 |-- last_available_confirmed: integer (nullable = true)
 |-- last_available_confirmed_per_100k_inhabitants: double (nullable = true)
 |-- new_confirmed: integer (nullable = true)
 |-- last_available_deaths: integer (nullable = true)
 |-- new_deaths: integer (nullable = true)
 |-- last_available_death_rate: double (nullable = true)
 |-- estimated_population_2019: integer (nullable = true)
 |-- is_last: boolean (nullable = true)
 |-- is_repeated: boolean (nullable = true)



Para visualizar os dados vamos usar a função .head() que permitirá verificar as 5 primeiras linhas como um vetor da base de dados.

In [12]:
df.head(5)

[Row(epidemiological_week=28, date='2020-07-09', order_for_place=63, state='MG', city='Abadia dos Dourados', city_ibge_code=3100104, place_type='city', last_available_confirmed=5, last_available_confirmed_per_100k_inhabitants=71.54099, new_confirmed=0, last_available_deaths=0, new_deaths=0, last_available_death_rate=0.0, estimated_population_2019=6989, is_last=True, is_repeated=False),
 Row(epidemiological_week=28, date='2020-07-09', order_for_place=89, state='MG', city='Abaeté', city_ibge_code=3100203, place_type='city', last_available_confirmed=20, last_available_confirmed_per_100k_inhabitants=86.06963, new_confirmed=0, last_available_deaths=0, new_deaths=0, last_available_death_rate=0.0, estimated_population_2019=23237, is_last=True, is_repeated=False),
 Row(epidemiological_week=28, date='2020-07-09', order_for_place=32, state='MG', city='Abre Campo', city_ibge_code=3100302, place_type='city', last_available_confirmed=9, last_available_confirmed_per_100k_inhabitants=66.8946, new_con

Para visualizar os dados em formato de Pandas DataFrame vamos usar a função .toPandas(), limitando a 10 linhas

In [13]:
df.limit(10).toPandas()

Unnamed: 0,epidemiological_week,date,order_for_place,state,city,city_ibge_code,place_type,last_available_confirmed,last_available_confirmed_per_100k_inhabitants,new_confirmed,last_available_deaths,new_deaths,last_available_death_rate,estimated_population_2019,is_last,is_repeated
0,28,2020-07-09,63,MG,Abadia dos Dourados,3100104,city,5,71.54099,0,0,0,0.0,6989,True,False
1,28,2020-07-09,89,MG,Abaeté,3100203,city,20,86.06963,0,0,0,0.0,23237,True,False
2,28,2020-07-09,32,MG,Abre Campo,3100302,city,9,66.8946,0,0,0,0.0,13454,True,False
3,28,2020-07-09,22,MG,Acaiaca,3100401,city,11,275.41312,0,1,0,0.0909,3994,True,False
4,28,2020-07-09,52,MG,Açucena,3100500,city,18,190.07392,2,1,0,0.0556,9470,True,False
5,28,2020-07-09,41,MG,Água Comprida,3100708,city,2,100.05003,0,0,0,0.0,1999,True,False
6,28,2020-07-09,22,MG,Aguanil,3100807,city,5,111.45787,0,0,0,0.0,4486,True,False
7,28,2020-07-09,36,MG,Águas Formosas,3100906,city,18,93.71583,5,0,0,0.0,19207,True,False
8,28,2020-07-09,51,MG,Águas Vermelhas,3101003,city,25,184.65175,0,0,0,0.0,13539,True,False
9,28,2020-07-09,67,MG,Aimorés,3101102,city,108,429.13339,5,0,0,0.0,25167,True,False


Mostrando o sumário do DataFrame

In [14]:
# Demonstra dados estátisticos sobre o DataFrame
df.describe().show()

+-------+--------------------+----------+------------------+-----+-------------------+------------------+----------+------------------------+---------------------------------------------+------------------+---------------------+--------------------+-------------------------+-------------------------+
|summary|epidemiological_week|      date|   order_for_place|state|               city|    city_ibge_code|place_type|last_available_confirmed|last_available_confirmed_per_100k_inhabitants|     new_confirmed|last_available_deaths|          new_deaths|last_available_death_rate|estimated_population_2019|
+-------+--------------------+----------+------------------+-----+-------------------+------------------+----------+------------------------+---------------------------------------------+------------------+---------------------+--------------------+-------------------------+-------------------------+
|  count|               36966|     36966|             36966|36966|              36966|        

Para mostrar o sumário de apenas as colunas de formato inteiro, double ou float, há duas maneiras:

In [15]:
#Maneira 1: Excluindo da visualização as colunas que não são desse formato
df.select(list(set(df.columns) - set(['date', 'city', 'state']))).describe().toPandas()

Unnamed: 0,summary,epidemiological_week,last_available_death_rate,new_deaths,last_available_confirmed_per_100k_inhabitants,city_ibge_code,last_available_confirmed,order_for_place,estimated_population_2019,new_confirmed,place_type,last_available_deaths
0,count,36966.0,36966.0,36966.0,36966.0,36966.0,36966.0,36966.0,36966.0,36966.0,36966,36966.0
1,mean,23.512741438078237,0.0499384596656393,0.0385489368608992,90.80157201617978,3136545.224801169,38.20264567440351,33.914705404966725,46745.24866093167,1.7965968728020345,,0.8949845804252556
2,stddev,3.5063101270362544,0.1646449289042188,0.3706413685439136,156.29838910234386,20665.125182452684,242.80243803143276,24.437566273487075,160185.84836741397,19.19369937811108,,5.254168358399579
3,min,11.0,0.0,-2.0,0.03981,3100104.0,1.0,1.0,1502.0,-127.0,city,0.0
4,max,28.0,1.0,23.0,2713.07909,3172202.0,9361.0,124.0,2512070.0,2756.0,city,223.0


In [16]:
#Maneira 2: Selecionando apenas as colunas numéricas
numericalColumns = [campo for (campo,tipo) in df.dtypes if tipo == 'int' or tipo == 'double']
df.select(numericalColumns).describe().toPandas()

Unnamed: 0,summary,epidemiological_week,order_for_place,city_ibge_code,last_available_confirmed,last_available_confirmed_per_100k_inhabitants,new_confirmed,last_available_deaths,new_deaths,last_available_death_rate,estimated_population_2019
0,count,36966.0,36966.0,36966.0,36966.0,36966.0,36966.0,36966.0,36966.0,36966.0,36966.0
1,mean,23.512741438078237,33.914705404966725,3136545.224801169,38.20264567440351,90.80157201617978,1.7965968728020345,0.8949845804252556,0.0385489368608992,0.0499384596656393,46745.24866093167
2,stddev,3.5063101270362544,24.437566273487075,20665.125182452684,242.80243803143276,156.29838910234386,19.19369937811108,5.254168358399579,0.3706413685439136,0.1646449289042188,160185.84836741397
3,min,11.0,1.0,3100104.0,1.0,0.03981,-127.0,0.0,-2.0,0.0,1502.0
4,max,28.0,124.0,3172202.0,9361.0,2713.07909,2756.0,223.0,23.0,1.0,2512070.0


**Agora vamos para alguns desafios:**

1. Mostrando a média de confirmados por 100k habitantes e media da taxa de mortalidade

In [17]:
df.select(f.mean('last_available_death_rate'), f.mean('last_available_confirmed_per_100k_inhabitants')).toPandas()

Unnamed: 0,avg(last_available_death_rate),avg(last_available_confirmed_per_100k_inhabitants)
0,0.049938,90.801572


2. Mostrando a razão entre o ultimo número de confirmados e a população da cidade -> last_available_confirmed/estimated_population_2019

In [18]:
df.withColumn('razao_mortos_confirmados', f.col('last_available_confirmed')/f.col('estimated_population_2019')).select('razao_mortos_confirmados').toPandas()

Unnamed: 0,razao_mortos_confirmados
0,0.000715
1,0.000861
2,0.000669
3,0.002754
4,0.001901
...,...
36961,0.000004
36962,0.000004
36963,0.000004
36964,0.000004


3. Qual o cidade teve o maior número de novos casos(new_confimed)?

In [19]:
df.filter(f.col('new_confirmed') == df.select(f.max('new_confirmed')).collect()[0][0]).select('city').toPandas()

Unnamed: 0,city
0,Uberlândia


4. Em qual semana epidemiologica isso aconteceu?

In [20]:
df.filter(f.col('new_confirmed') == df.select(f.max('new_confirmed')).collect()[0][0]).select('epidemiological_week').toPandas()

Unnamed: 0,epidemiological_week
0,26


5. Qual a média de novos casos por cidade?

In [21]:
df.groupBy('city').agg(f.mean('new_confirmed').alias('media_novos_casos')).toPandas()

Unnamed: 0,city,media_novos_casos
0,Fronteira,0.283582
1,Piranguinho,0.333333
2,Presidente Olegário,0.758621
3,Rio Novo,0.174603
4,Carrancas,0.038462
...,...,...
749,Reduto,0.785714
750,Glaucilândia,0.222222
751,Pedra Dourada,0.075758
752,Santo Antônio do Amparo,1.160714


6. Ordenando de forma descendente a média dos novos casos por cidade

In [22]:
df.groupBy('city').agg(f.mean('new_confirmed').alias('media_novos_casos')).orderBy(f.desc('media_novos_casos')).toPandas()

Unnamed: 0,city,media_novos_casos
0,Belo Horizonte,80.698276
1,Uberlândia,72.869565
2,Ipatinga,26.000000
3,Juiz de Fora,19.720339
4,Governador Valadares,16.528302
...,...,...
749,Guarará,0.018182
750,Ilicínea,0.015625
751,Divisa Nova,0.014085
752,Icaraí de Minas,0.014085


7. Gerando a relação ordenada de forma descendente entre a media de novos casos e a população da cidade

In [23]:
df.groupBy('city').agg(f.mean('new_confirmed').alias('media_novos_casos'), 
                                  f.mean('estimated_population_2019').alias('populacao'))\
                                  .withColumn('razao_casos_por', f.col('media_novos_casos')/f.col('populacao'))\
                                  .orderBy(f.desc('razao_casos_por')).toPandas()

Unnamed: 0,city,media_novos_casos,populacao,razao_casos_por
0,Bandeira,4.100000,4795.0,0.000855
1,Morada Nova de Minas,7.500000,8863.0,0.000846
2,Albertina,2.000000,3007.0,0.000665
3,Comendador Gomes,1.923077,3111.0,0.000618
4,Alvarenga,2.000000,3907.0,0.000512
...,...,...,...,...
749,Francisco Badaró,0.020833,10332.0,0.000002
750,Alterosa,0.027778,14466.0,0.000002
751,Igaratinga,0.018182,10860.0,0.000002
752,Ilicínea,0.015625,12375.0,0.000001


8. Em quantos registos o número de novas mortes é maior que 0?

In [24]:
df.filter('new_deaths > 0').count()

933

9. Quantos % da base tem o número de novas mortes maior que zero?

In [25]:
df.filter('new_deaths > 0').count()/df.count()

0.02523940918682032

10. Mostrando a média de casos novos por dia na semana

In [26]:
df.groupBy(f.dayofweek('date')).avg('new_confirmed').orderBy('dayofweek(date)').toPandas()

Unnamed: 0,dayofweek(date),avg(new_confirmed)
0,1,1.453917
1,2,1.016521
2,3,1.008977
3,4,2.229814
4,5,2.211666
5,6,2.904724
6,7,1.776973


11. Mostrando a média de novos casos por mês?

In [27]:
df.groupBy(f.month('date')).avg('new_confirmed').orderBy('month(date)').toPandas()

Unnamed: 0,month(date),avg(new_confirmed)
0,3,0.89701
1,4,0.479244
2,5,0.891474
3,6,1.975673
4,7,3.324782
