
## Overview

This notebook shows you how to create and query a table or DataFrame from files that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) (the Databricks File System) lets you store data for querying inside Databricks. This notebook assumes that the files you want to read are already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
import pyspark.sql.functions as F

In [0]:
# CSV reader settings shared by every input file
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


def _read_pnad_csv(path):
    """Load one CSV file with the shared reader settings.

    Args:
        path: Location of the CSV file to load.

    Returns:
        The Spark DataFrame produced by the CSV reader.
    """
    reader = spark.read.format("csv")
    reader = reader.option("inferSchema", infer_schema)
    reader = reader.option("header", first_row_is_header)
    reader = reader.option("sep", delimiter)
    return reader.load(path)


# One DataFrame per input file
df_09 = _read_pnad_csv("/FileStore/tables/PNAD_COVID_092020.csv")
df_10 = _read_pnad_csv("/FileStore/tables/PNAD_COVID_102020.csv")
df_11 = _read_pnad_csv("/FileStore/tables/PNAD_COVID_112020.csv")

In [0]:
# Remove exact duplicate rows within each DataFrame
df_09 = df_09.dropDuplicates()
df_10 = df_10.dropDuplicates()
df_11 = df_11.dropDuplicates()

# Drop the columns that exist only in df_11 so the three schemas line up
df_11 = df_11.drop("A006A", "A006B", "A007A")

# Stack the three DataFrames into one.
# unionByName matches columns by name; plain union() matches by position and
# would silently misalign values if the files list their columns in a
# different order. A genuine schema mismatch now raises instead.
df_pnad = df_09.unionByName(df_10).unionByName(df_11)

In [0]:
# (source column, output alias) pairs, listed in output column order.
# NOTE(review): the trailing numbers are kept from the original notebook;
# presumably they index the questions being analysed — confirm.
_pnad_columns = [
    ("Ano", "ano"),
    ("UF", "uf"),
    ("CAPITAL", "capital"),
    ("RM_RIDE", "rm_ride"),
    ("V1008", "n_domicilio"),
    ("V1012", "n_semana"),
    ("V1013", "mes_pesquisa"),
    ("V1016", "n_entrevista_domicilio"),
    ("Estrato", "estrato"),
    ("UPA", "unidade_primaria_amostragem"),
    ("V1022", "situacao_domicilio"),
    ("V1023", "tipo_area"),
    ("V1030", "projecao_populacao"),
    ("V1031", "peso_domicilio_pessoas_com_pos"),
    ("V1032", "peso_domicilio_pessoas_sem_pos"),
    ("posest", "dominios_projecao"),

    # Part A - General characteristics of residents
    ("A002", "idade_morador"),  # 1
    ("A003", "sexo"),           # 2

    # Part B - COVID19 - All residents
    ("B0031", "ficou_em_casa"),                     # 3
    ("B0041", "buscou_atendimento_posto_ubs_esf"),  # 4
    ("B0042", "buscou_atendimento_ps_sus_upa"),     # 5
    ("B0043", "buscou_atendimento_hospital_sus"),   # 6
    ("B005", "ficou_internado"),                    # 7
    ("B006", "internado_risco"),                    # 8
    ("B007", "possui_plano_saude"),                 # 9

    # Part C - Work characteristics of people aged 14 or older
    ("C011A", "respondeu_valor_recebido"),          # 10
    ("C011A1", "valores_recebidos_unidade"),        # 10
    ("C011A11", "valores_recebidos_faixa"),         # 10
    ("C011A12", "valores_recebidos_reais"),         # 10
]

# Keep only the listed columns, each renamed to its alias
db = df_pnad.select(
    *[F.col(source).alias(alias) for source, alias in _pnad_columns]
)


In [0]:
# Persist the selected columns as a Parquet-backed table named "pnad_covid".
# mode("overwrite") replaces the table if it already exists; with the default
# save mode the write raises when the table exists, so re-running the
# notebook would fail on this cell.
db.write.format("parquet").mode("overwrite").saveAsTable("pnad_covid")