**Aluno: Danilo Henrique Achcar**<br/>
**RM: 351516**

# 1) Exploração inicial da base da PNAD COVID19 com Apache Spark

## 1.1) Instalando dependências

In [None]:
!pip install pyspark



## 1.2) Apache Spark

### 1.2.1) Iniciando a sessão do Spark

In [None]:
from pyspark.sql import SparkSession

# Local Spark session using every available core.
spark = SparkSession.builder.master('local[*]').getOrCreate()
# Eager evaluation renders a DataFrame as an HTML table when it is the last
# expression of a notebook cell. The correct key ends in '.enabled'; the
# previous spelling ('...eagerEval.Enable') is not a Spark setting and was
# silently stored without effect.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
spark

### 1.2.2) Criando um validador de schema para as bases de dados
Esta etapa tem como intuito carregar as bases .csv utilizando um schema pré-definido, de forma a validar automaticamente e realizar os devidos "casts" das colunas da base.

In [None]:
from pyspark.sql.types import *

# Column names of the PNAD COVID19 CSV, in schema order, grouped by
# questionnaire block prefix (identification, A, B, C, D, E, F).
_column_groups = (
    'Ano UF CAPITAL RM_RIDE V1008 V1012 V1013 V1016 Estrato UPA V1022 V1023 V1030 V1031 V1032 posest',
    'A001A A001 A001B1 A001B2 A001B3 A002 A003 A004 A005 A006 A006A A006B A007 A007A A008 A009',
    'B0011 B0012 B0013 B0014 B0015 B0016 B0017 B0018 B0019 B00110 B00111 B00112 B00113',
    'B002 B0031 B0032 B0033 B0034 B0035 B0036 B0037 B0041 B0042 B0043 B0044 B0045 B0046',
    'B005 B006 B007 B008 B009A B009B B009C B009D B009E B009F B0101 B0102 B0103 B0104 B0105 B0106 B011',
    'C001 C002 C003 C004 C005 C0051 C0052 C0053 C006 C007 C007A C007B C007C C007D C007E C007E1 C007E2 C007F',
    'C008 C009 C009A C010 C0101 C01011 C01012 C0102 C01021 C01022 C0103 C0104',
    'C011A C011A1 C011A11 C011A12 C011A2 C011A21 C011A22 C012 C013 C014 C015 C016 C017A',
    'D0011 D0013 D0021 D0023 D0031 D0033 D0041 D0043 D0051 D0053 D0061 D0063 D0071 D0073',
    'E001 E0021 E0022 E0023 E0024',
    'F001 F0021 F0022 F002A1 F002A2 F002A3 F002A4 F002A5 F0061 F006',
)

# The only columns cast to DoubleType; every other column is cast to IntegerType.
_double_columns = {
    'V1031', 'V1032', 'C01012', 'C011A12', 'C011A22',
    'D0013', 'D0023', 'D0033', 'D0043', 'D0053', 'D0063', 'D0073',
    'F0021',
}

# (column name, Spark type) pairs used to build the CSV schema, in column order.
labels = [
    (name, DoubleType() if name in _double_columns else IntegerType())
    for group in _column_groups
    for name in group.split()
]

In [None]:
# Assemble the Spark schema from the (name, type) pairs; every field is
# declared nullable.
schema = StructType(
    [StructField(name, data_type, nullable=True) for name, data_type in labels]
)
schema

StructType([StructField('Ano', IntegerType(), True), StructField('UF', IntegerType(), True), StructField('CAPITAL', IntegerType(), True), StructField('RM_RIDE', IntegerType(), True), StructField('V1008', IntegerType(), True), StructField('V1012', IntegerType(), True), StructField('V1013', IntegerType(), True), StructField('V1016', IntegerType(), True), StructField('Estrato', IntegerType(), True), StructField('UPA', IntegerType(), True), StructField('V1022', IntegerType(), True), StructField('V1023', IntegerType(), True), StructField('V1030', IntegerType(), True), StructField('V1031', DoubleType(), True), StructField('V1032', DoubleType(), True), StructField('posest', IntegerType(), True), StructField('A001A', IntegerType(), True), StructField('A001', IntegerType(), True), StructField('A001B1', IntegerType(), True), StructField('A001B2', IntegerType(), True), StructField('A001B3', IntegerType(), True), StructField('A002', IntegerType(), True), StructField('A003', IntegerType(), True), S

### 1.2.3) Lendo a base de dados de Setembro 2020

Neste momento, a validação das bases é feita de forma manual; portanto, precisamos fazer o upload dos arquivos .csv para o diretório `/content` do ambiente do Google Colab.

In [None]:
%%time
df_set = spark.read.csv('/content/PNAD_COVID_092020.csv', header=True, sep=',', schema=schema)
df_set.createOrReplaceTempView('df_set')
df_set.show(5)
print(f'Total registros set/2020: {df_set.count()}')

+----+---+-------+-------+-----+-----+-----+-----+-------+---------+-----+-----+------+------------+------------+------+-----+----+------+------+------+----+----+----+----+----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+----+----+-----+-----+-----+----+----+-----+-----+-----+-----+-----+------+------+-----+----+----+-----+----+-----+------+------+-----+------+------+-----+-----+-----+------+-------+-------+------+-------+-------+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+----+-----+-----+------+------+------+------+------+-----+----+
| Ano| UF|CAPITAL|RM_RIDE|V1008|V1012|V1013|V1016|Estrato|      UPA|V1022|V1023| V1030|     

### 1.2.4) Lendo a base de dados de Outubro 2020

In [None]:
%%time
df_out = spark.read.csv('/content/PNAD_COVID_102020.csv', header=True, sep=',', schema=schema)
df_out.createOrReplaceTempView('df_out')
df_out.show(5)
print(f'Total registros out/2020: {df_out.count()}')

+----+---+-------+-------+-----+-----+-----+-----+-------+---------+-----+-----+------+------------+------------+------+-----+----+------+------+------+----+----+----+----+----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+----+----+-----+-----+-----+----+----+-----+-----+-----+-----+-----+------+------+-----+----+----+-----+----+-----+------+------+-----+------+------+-----+-----+-----+------+-------+-------+------+-------+-------+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+----+-----+-----+------+------+------+------+------+-----+----+
| Ano| UF|CAPITAL|RM_RIDE|V1008|V1012|V1013|V1016|Estrato|      UPA|V1022|V1023| V1030|     

### 1.2.5) Lendo a base de dados de Novembro 2020

In [None]:
%%time
df_nov = spark.read.csv('/content/PNAD_COVID_112020.csv', header=True, sep=',', schema=schema)
df_nov.createOrReplaceTempView('df_nov')
df_nov.show(5)
print(f'Total registros nov/2020: {df_nov.count()}')

+----+---+-------+-------+-----+-----+-----+-----+-------+---------+-----+-----+------+------------+------------+------+-----+----+------+------+------+----+----+----+----+----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+----+----+-----+-----+-----+----+----+-----+-----+-----+-----+-----+------+------+-----+----+----+-----+----+-----+------+------+-----+------+------+-----+-----+-----+------+-------+-------+------+-------+-------+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+----+-----+-----+------+------+------+------+------+-----+----+
| Ano| UF|CAPITAL|RM_RIDE|V1008|V1012|V1013|V1016|Estrato|      UPA|V1022|V1023| V1030|    

In [None]:
# Verify that every loaded frame carries exactly the schema defined above,
# instead of relying only on eyeballing the printed tree.
for frame in (df_set, df_out, df_nov):
    assert frame.schema == schema, 'schema carregado difere do schema definido'
df_nov.printSchema()

root
 |-- Ano: integer (nullable = true)
 |-- UF: integer (nullable = true)
 |-- CAPITAL: integer (nullable = true)
 |-- RM_RIDE: integer (nullable = true)
 |-- V1008: integer (nullable = true)
 |-- V1012: integer (nullable = true)
 |-- V1013: integer (nullable = true)
 |-- V1016: integer (nullable = true)
 |-- Estrato: integer (nullable = true)
 |-- UPA: integer (nullable = true)
 |-- V1022: integer (nullable = true)
 |-- V1023: integer (nullable = true)
 |-- V1030: integer (nullable = true)
 |-- V1031: double (nullable = true)
 |-- V1032: double (nullable = true)
 |-- posest: integer (nullable = true)
 |-- A001A: integer (nullable = true)
 |-- A001: integer (nullable = true)
 |-- A001B1: integer (nullable = true)
 |-- A001B2: integer (nullable = true)
 |-- A001B3: integer (nullable = true)
 |-- A002: integer (nullable = true)
 |-- A003: integer (nullable = true)
 |-- A004: integer (nullable = true)
 |-- A005: integer (nullable = true)
 |-- A006: integer (nullable = true)
 |-- A006A: 

### 1.2.6) Checando os totais de cada base de dados

In [None]:
# Row count of each monthly frame, plus the grand total across the three.
total_set, total_out, total_nov = (frame.count() for frame in (df_set, df_out, df_nov))
total = sum((total_set, total_out, total_nov))

for month, count in (('set', total_set), ('out', total_out), ('nov', total_nov)):
    print(f'Total de registros {month}/2020', count)
print('Total de registros', total)

Total de registros set/2020 387298
Total de registros out/2020 380461
Total de registros nov/2020 381438
Total de registros 1149197


### 1.2.7) Verificando se a quantidade de registros bate após a união das 3 bases em uma base consolidada

In [None]:
%%time
df = df_set.union(df_out).union(df_nov)
df.count()

CPU times: user 15.6 ms, sys: 1.97 ms, total: 17.6 ms
Wall time: 2.29 s


1149197

### 1.2.8) Verificando se existem registros duplicados

In [None]:
%%time
df_deduplicated = df.dropDuplicates()
df_deduplicated.count()

CPU times: user 573 ms, sys: 56.8 ms, total: 630 ms
Wall time: 1min 36s


1149197

In [None]:
# Export the consolidated base to CSV for later use.
# header=True is required because the read-back cell uses header=True, which
# treats the first line of each part file as a header. Without it, one data
# row per part file is silently dropped (the saved run read back 1149191 rows
# instead of 1149197). mode='overwrite' keeps the cell re-runnable, since the
# default save mode fails when the output directory already exists.
df.write.csv('dados_tratados', header=True, mode='overwrite')

In [None]:
# Round-trip check: read the exported directory back with the same schema,
# treating the first line of each part file as a header, and count the rows.
df = spark.read.schema(schema).option('header', True).csv('dados_tratados')
df.count()

1149191

In [None]:
# Preview the first rows of the re-read base.
df.show(n=5)

+----+---+-------+-------+-----+-----+-----+-----+-------+---------+-----+-----+------+------------+------------+------+-----+----+------+------+------+----+----+----+----+----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+----+----+-----+-----+-----+----+----+-----+-----+-----+-----+-----+------+------+-----+----+----+-----+----+-----+------+------+-----+------+------+-----+-----+-----+------+-------+-------+------+-------+-------+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+----+-----+-----+------+------+------+------+------+-----+----+
| Ano| UF|CAPITAL|RM_RIDE|V1008|V1012|V1013|V1016|Estrato|      UPA|V1022|V1023| V1030|     