# Inicializar PySpark

In [2]:
# pyspark imports
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession\
            .builder\
                .appName('local[*]')\
                    .getOrCreate()

spark

In [3]:
# checando se o pyspark está funcionando
df = spark.sql('''select 'Deu certo!' as hello''')
df.show()

+----------+
|     hello|
+----------+
|Deu certo!|
+----------+



In [4]:
# módulos para manipulação de dados
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, FloatType
from pyspark.sql.functions import col, expr, lit, substring, concat, concat_ws, when, coalesce
from pyspark.sql import functions as F
from functools import reduce
from typing import List
import pyspark.sql.types as T 

In [6]:
spark = SparkSession\
              .builder\
                     .appName("IBGE PNAD Covid")\
                            .getOrCreate()

spark

# Dados de entrada

In [7]:
# para aumentar o número de colunas que o pyspark mostra
spark.conf.set("spark.sql.debug.maxToStringFields", 100)

In [8]:
# ler o dataframe com todas as questões pré-selecionadas
df = spark.read.csv('output/df.csv',
                    sep=',', header=True, inferSchema=True)

df.show()

+--------+------+---+-------------+-------+---------+---------------+----------------+--------------+---------------------+-------------+---------+--------+--------------------+----------------+-------------------------------------+-----------------------------------+---------------------------------+-------------------------+-----------------------+------------------------------+---------------------------+----------------------+--------------------------------+-----------------------------------------+-----+-----+------------+--------------------+----------+---------+------+---------------------+------+---------+--------------------+------------+--------+------------------------------+-----------------------------------+--------------------+------------------------+--------------------+--------------------+-----------------------------+---------------------------------------+--------------------+------------------------+--------------------+--------------------+----------------------

In [9]:
df.printSchema() # mostra o esquema do dataframe

root
 |-- uf: string (nullable = true)
 |-- semana: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- no_entrevista: integer (nullable = true)
 |-- estrato: integer (nullable = true)
 |-- upa: integer (nullable = true)
 |-- respondente: string (nullable = true)
 |-- status_domicilio: string (nullable = true)
 |-- proj_populacao: integer (nullable = true)
 |-- condicao_no_domicilio: string (nullable = true)
 |-- idade_morador: integer (nullable = true)
 |-- sexo: string (nullable = true)
 |-- cor_raca: string (nullable = true)
 |-- escolaridade: string (nullable = true)
 |-- frequenta_escola: string (nullable = true)
 |-- atividades_escolares_realizar_em_casa: string (nullable = true)
 |-- dias_dedicados_atividades_escolares: string (nullable = true)
 |-- tempo_diario_atividades_escolares: string (nullable = true)
 |-- ensino_publico_ou_privado: string (nullable = true)
 |-- tendo_aulas_presenciais: string (nullable = true)
 |-- domicilio_tem_sabao_detergente: string (n

In [10]:
print('Total de linhas:', df.count()) # conta o número de linhas
print('Total de colunas:', df.columns) # mostra o nome de cada coluna
print('Colunas:', len(df.columns)) # mostra o nome de cada coluna
print('Tipos de dados:', df.dtypes) # mostra o tipo de dado de cada coluna
print('Schema:', df.schema) # mostra o esquema do dataframe

Total de linhas: 1149197
Total de colunas: ['uf', 'semana', 'mes', 'no_entrevista', 'estrato', 'upa', 'respondente', 'status_domicilio', 'proj_populacao', 'condicao_no_domicilio', 'idade_morador', 'sexo', 'cor_raca', 'escolaridade', 'frequenta_escola', 'atividades_escolares_realizar_em_casa', 'dias_dedicados_atividades_escolares', 'tempo_diario_atividades_escolares', 'ensino_publico_ou_privado', 'tendo_aulas_presenciais', 'domicilio_tem_sabao_detergente', 'domicilio_tem_alcool_70mais', 'domicilio_tem_mascaras', 'domicilio_tem_luvas_descartaveis', 'domicilio_tem_agua_sanitaria_desinfetante', 'febre', 'tosse', 'dor_garganta', 'dificuldade_respirar', 'dor_cabeca', 'dor_peito', 'nausea', 'nariz_entupido_coriza', 'fadiga', 'dor_olhos', 'perda_olfato_paladar', 'dor_muscular', 'diarreia', 'como_restringiu_contato_fisico', 'compareceu_estabelecimento_de_saude', 'ficou_em_casa', 'ligou_profissional_saude', 'automedicou', 'prescricao_medica', 'visita_profissional_saude_sus', 'visita_de_profissio

# Preparar os dados

In [11]:
# classificar estados por região in df withColumn
# colocar para cima

# conferir regiões
# create a new column with the region of each state
df = df.withColumn('regiao', 
                    F.when(col('uf').isin('Acre', 'Amazonas', 'Roraima', 'Amapá', 'Pará', 'Tocantins', 'Rondônia'), 'Norte')
                        .when(col('uf').isin('Maranhão', 'Piauí', 'Ceará', 'Rio Grande do Norte', 'Paraíba', 'Pernambuco', 'Alagoas', 'Sergipe', 'Bahia'), 'Nordeste')
                        .when(col('uf').isin('Minas Gerais', 'Espírito Santo', 'Rio de Janeiro', 'São Paulo'), 'Sudeste')
                        .when(col('uf').isin('Paraná', 'Santa Catarina', 'Rio Grande do Sul'), 'Sul')
                        .when(col('uf').isin('Mato Grosso', 'Mato Grosso do Sul', 'Goiás', 'Distrito Federal'), 'Centro-Oeste')
                        .otherwise(''))

# check
df.select('regiao').distinct().show()

+------------+
|      regiao|
+------------+
|       Norte|
|    Nordeste|
|     Sudeste|
|         Sul|
|Centro-Oeste|
+------------+



In [12]:
# make regiao 2nd col
df = df.select('uf', 'regiao', *[column for column in df.columns if column != 'uf' and column != 'regiao'])

# check
df.show()

+--------+------+------+---+-------------+-------+---------+---------------+----------------+--------------+---------------------+-------------+---------+--------+--------------------+----------------+-------------------------------------+-----------------------------------+---------------------------------+-------------------------+-----------------------+------------------------------+---------------------------+----------------------+--------------------------------+-----------------------------------------+-----+-----+------------+--------------------+----------+---------+------+---------------------+------+---------+--------------------+------------+--------+------------------------------+-----------------------------------+--------------------+------------------------+--------------------+--------------------+-----------------------------+---------------------------------------+--------------------+------------------------+--------------------+--------------------+---------------

In [13]:
df.describe().show()

+-------+---------+------------+------------------+------------------+------------------+-----------------+--------------------+-------------------+----------------+-----------------+---------------------+-----------------+---------+--------+--------------------+----------------+-------------------------------------+-----------------------------------+---------------------------------+-------------------------+-----------------------+------------------------------+---------------------------+----------------------+--------------------------------+-----------------------------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+---------------------+-------------------+-------------------+--------------------+-------------------+-------------------+------------------------------+-----------------------------------+--------------------+------------------------+--------------------+-------

In [14]:
# população urbana saiu mais enquanto rural ficou mais em casa?

# group by status_domicilio, mes => count each differente value in como_restringiu_contato_fisico
df.createOrReplaceTempView('como_restringiu_contato_fisico')

contato_fisico_status_domicilio = spark.sql('''
    SELECT status_domicilio, mes, como_restringiu_contato_fisico, count(*) as count
    FROM como_restringiu_contato_fisico
    GROUP BY status_domicilio, mes, como_restringiu_contato_fisico
    ORDER BY status_domicilio, mes, como_restringiu_contato_fisico
''')

# show the result
contato_fisico_status_domicilio.show()
# aviso: tarefa não pôde ser realizada em memória, foi levada ao disco
# https://stackoverflow.com/questions/46907447/meaning-of-apache-spark-warning-calling-spill-on-rowbasedkeyvaluebatch

+----------------+---+------------------------------+------+
|status_domicilio|mes|como_restringiu_contato_fisico| count|
+----------------+---+------------------------------+------+
|           Rural|  9|          Ficou em casa e s...| 41723|
|           Rural|  9|          Ficou rigorosamen...| 17481|
|           Rural|  9|                      Ignorado|   308|
|           Rural|  9|          Não fez restrição...|  2658|
|           Rural|  9|          Reduziu o contato...| 30741|
|           Rural| 10|          Ficou em casa e s...| 38955|
|           Rural| 10|          Ficou rigorosamen...| 13286|
|           Rural| 10|                      Ignorado|   336|
|           Rural| 10|          Não fez restrição...|  4003|
|           Rural| 10|          Reduziu o contato...| 34215|
|           Rural| 11|          Ficou em casa e s...| 38183|
|           Rural| 11|          Ficou rigorosamen...| 11782|
|           Rural| 11|                      Ignorado|   254|
|           Rural| 11|  

In [15]:
# rural x urbana => compareceu_estabelecimento_de_saude
df.createOrReplaceTempView('compareceu_estabelecimento_de_saude')

compareceu_estabelecimento_saude = spark.sql('''
    SELECT status_domicilio, mes, compareceu_estabelecimento_de_saude, count(*) as count
    FROM compareceu_estabelecimento_de_saude
    WHERE compareceu_estabelecimento_de_saude != 'Ignorado / Não aplicável'
    GROUP BY status_domicilio, mes, compareceu_estabelecimento_de_saude
    ORDER BY status_domicilio, mes, compareceu_estabelecimento_de_saude
''')

compareceu_estabelecimento_saude.show()

+----------------+---+-----------------------------------+-----+
|status_domicilio|mes|compareceu_estabelecimento_de_saude|count|
+----------------+---+-----------------------------------+-----+
|           Rural|  9|                                Não| 2977|
|           Rural|  9|                                Sim|  766|
|           Rural| 10|                                Não| 2323|
|           Rural| 10|                                Sim|  688|
|           Rural| 11|                                Não| 2233|
|           Rural| 11|                                Sim|  624|
|          Urbana|  9|                                Não| 9775|
|          Urbana|  9|                                Sim| 3341|
|          Urbana| 10|                                Não| 7970|
|          Urbana| 10|                                Sim| 2972|
|          Urbana| 11|                                Não| 7863|
|          Urbana| 11|                                Sim| 3405|
+----------------+---+---

In [16]:
# uf => compareceu_estabelecimento_de_saude
df.createOrReplaceTempView('compareceu_estabelecimento_de_saude_uf')

compareceu_estabelecimento_saude_uf = spark.sql('''
    SELECT uf, mes, compareceu_estabelecimento_de_saude, count(*) as count
    FROM compareceu_estabelecimento_de_saude
    WHERE compareceu_estabelecimento_de_saude != 'Ignorado / Não aplicável'
    GROUP BY uf, mes, compareceu_estabelecimento_de_saude
    ORDER BY uf, mes, compareceu_estabelecimento_de_saude
''')

compareceu_estabelecimento_saude_uf.show()

+--------+---+-----------------------------------+-----+
|      uf|mes|compareceu_estabelecimento_de_saude|count|
+--------+---+-----------------------------------+-----+
|    Acre|  9|                                Não|  178|
|    Acre|  9|                                Sim|   46|
|    Acre| 10|                                Não|  161|
|    Acre| 10|                                Sim|   45|
|    Acre| 11|                                Não|  114|
|    Acre| 11|                                Sim|   41|
| Alagoas|  9|                                Não|  273|
| Alagoas|  9|                                Sim|  106|
| Alagoas| 10|                                Não|  290|
| Alagoas| 10|                                Sim|   71|
| Alagoas| 11|                                Não|  214|
| Alagoas| 11|                                Sim|   69|
|   Amapá|  9|                                Não|  128|
|   Amapá|  9|                                Sim|   40|
|   Amapá| 10|                 

In [17]:
# quantos respondentes ficaram em casa devido à sintomas na última semana?
df.createOrReplaceTempView('ficaram_em_casa_devido_a_sintomas')

casa_devido_sintomas = spark.sql('''
    SELECT uf, status_domicilio, mes, ficou_em_casa, count(*) as count
    FROM ficaram_em_casa_devido_a_sintomas
    WHERE ficou_em_casa != 'Ignorado / Não aplicável'
    GROUP BY uf, status_domicilio, mes, ficou_em_casa
    ORDER BY uf, status_domicilio, mes, ficou_em_casa
''')

casa_devido_sintomas.show()

+-------+----------------+---+-------------+-----+
|     uf|status_domicilio|mes|ficou_em_casa|count|
+-------+----------------+---+-------------+-----+
|   Acre|           Rural|  9|          Não|   11|
|   Acre|           Rural|  9|          Sim|   32|
|   Acre|           Rural| 10|          Não|    3|
|   Acre|           Rural| 10|          Sim|   20|
|   Acre|           Rural| 11|          Sim|   19|
|   Acre|          Urbana|  9|          Não|   11|
|   Acre|          Urbana|  9|          Sim|  124|
|   Acre|          Urbana| 10|          Não|    6|
|   Acre|          Urbana| 10|          Sim|  132|
|   Acre|          Urbana| 11|          Não|    6|
|   Acre|          Urbana| 11|          Sim|   87|
|Alagoas|           Rural|  9|          Não|   14|
|Alagoas|           Rural|  9|          Sim|   79|
|Alagoas|           Rural| 10|          Não|    7|
|Alagoas|           Rural| 10|          Sim|   72|
|Alagoas|           Rural| 11|          Não|    5|
|Alagoas|           Rural| 11| 

In [18]:
# percent of people who stayed at home due to symptoms for uf
df.createOrReplaceTempView('ficaram_em_casa_devido_a_sintomas_uf')

casa_devido_sintomas_uf = spark.sql('''
    SELECT uf, mes, ficou_em_casa, count(*) as count,
        ROUND((count(*) / SUM(count(*)) OVER (PARTITION BY uf, mes)) * 100, 2) as percent
    FROM ficaram_em_casa_devido_a_sintomas_uf
    WHERE ficou_em_casa != 'Ignorado / Não aplicável'
    GROUP BY uf, mes, ficou_em_casa
    ORDER BY uf, mes, ficou_em_casa
''')

casa_devido_sintomas_uf.show()

+--------+---+-------------+-----+-------+
|      uf|mes|ficou_em_casa|count|percent|
+--------+---+-------------+-----+-------+
|    Acre|  9|          Não|   22|  12.36|
|    Acre|  9|          Sim|  156|  87.64|
|    Acre| 10|          Não|    9|   5.59|
|    Acre| 10|          Sim|  152|  94.41|
|    Acre| 11|          Não|    6|   5.36|
|    Acre| 11|          Sim|  106|  94.64|
| Alagoas|  9|          Não|   40|  14.76|
| Alagoas|  9|          Sim|  231|  85.24|
| Alagoas| 10|          Não|   30|  10.34|
| Alagoas| 10|          Sim|  260|  89.66|
| Alagoas| 11|          Não|   21|   9.86|
| Alagoas| 11|          Sim|  192|  90.14|
|   Amapá|  9|          Não|   12|   9.45|
|   Amapá|  9|          Sim|  115|  90.55|
|   Amapá| 10|          Não|   15|  18.29|
|   Amapá| 10|          Sim|   67|  81.71|
|   Amapá| 11|          Não|    7|   6.42|
|   Amapá| 11|          Sim|  102|  93.58|
|Amazonas|  9|          Não|   36|  11.11|
|Amazonas|  9|          Sim|  288|  88.89|
+--------+-

In [19]:
# resultados de testes positivos por região
# create a temporary view
df.createOrReplaceTempView('testagem_regiao')

# group by regiao, mes => count each differente value in testagem
testagem_regiao = spark.sql('''
    SELECT regiao, mes, resultado_cotonete, resultado_sangue_dedo, resultado_sangue_braco,
        count(*) as total_respostas
    FROM testagem_regiao
    WHERE resultado_cotonete != 'Ignorado / Não aplicável' OR
        resultado_sangue_dedo != 'Ignorado / Não aplicável' OR
        resultado_sangue_braco != 'Ignorado / Não aplicável'
    GROUP BY regiao, mes, resultado_cotonete, resultado_sangue_dedo, resultado_sangue_braco
    ORDER BY regiao, mes, resultado_cotonete, resultado_sangue_dedo, resultado_sangue_braco
''')

testagem_regiao.show()

+------------+---+--------------------+---------------------+----------------------+---------------+
|      regiao|mes|  resultado_cotonete|resultado_sangue_dedo|resultado_sangue_braco|total_respostas|
+------------+---+--------------------+---------------------+----------------------+---------------+
|Centro-Oeste|  9|Ainda não recebeu...| Ainda não recebeu...|  Ainda não recebeu...|              2|
|Centro-Oeste|  9|Ainda não recebeu...| Ainda não recebeu...|  Ignorado / Não ap...|              1|
|Centro-Oeste|  9|Ainda não recebeu...| Ignorado / Não ap...|  Ainda não recebeu...|              1|
|Centro-Oeste|  9|Ainda não recebeu...| Ignorado / Não ap...|  Ignorado / Não ap...|             30|
|Centro-Oeste|  9|Ainda não recebeu...| Ignorado / Não ap...|              Negativo|              1|
|Centro-Oeste|  9|Ainda não recebeu...|             Negativo|  Ignorado / Não ap...|              1|
|Centro-Oeste|  9|Ainda não recebeu...|             Negativo|              Negativo|       

In [20]:
# new column for only cont of positivos
testagem_regiao = testagem_regiao.withColumn('positivos', 
                                     F.when((col('resultado_cotonete') == 'Positivo') |
                                            (col('resultado_sangue_dedo') == 'Positivo') |
                                            (col('resultado_sangue_braco') == 'Positivo'), 1).otherwise(0))

testagem_regiao.show()

+------------+---+--------------------+---------------------+----------------------+---------------+---------+
|      regiao|mes|  resultado_cotonete|resultado_sangue_dedo|resultado_sangue_braco|total_respostas|positivos|
+------------+---+--------------------+---------------------+----------------------+---------------+---------+
|Centro-Oeste|  9|Ainda não recebeu...| Ainda não recebeu...|  Ainda não recebeu...|              2|        0|
|Centro-Oeste|  9|Ainda não recebeu...| Ainda não recebeu...|  Ignorado / Não ap...|              1|        0|
|Centro-Oeste|  9|Ainda não recebeu...| Ignorado / Não ap...|  Ainda não recebeu...|              1|        0|
|Centro-Oeste|  9|Ainda não recebeu...| Ignorado / Não ap...|  Ignorado / Não ap...|             30|        0|
|Centro-Oeste|  9|Ainda não recebeu...| Ignorado / Não ap...|              Negativo|              1|        0|
|Centro-Oeste|  9|Ainda não recebeu...|             Negativo|  Ignorado / Não ap...|              1|        0|
|

In [21]:
# positivos for regiao and month, calculate the percent
testagem_regiao.createOrReplaceTempView('positivos_regiao')

positivos_regiao = spark.sql('''
    SELECT regiao, mes, sum(positivos) as total_positivos,
        ROUND((sum(positivos) / SUM(total_respostas)) * 100, 2) as percentual_positivos
    FROM positivos_regiao
    GROUP BY regiao, mes
    ORDER BY regiao, mes
''')

positivos_regiao.show()

+------------+---+---------------+--------------------+
|      regiao|mes|total_positivos|percentual_positivos|
+------------+---+---------------+--------------------+
|Centro-Oeste|  9|             24|                0.42|
|Centro-Oeste| 10|             22|                0.33|
|Centro-Oeste| 11|             24|                0.32|
|    Nordeste|  9|             26|                0.23|
|    Nordeste| 10|             26|                0.21|
|    Nordeste| 11|             29|                0.21|
|       Norte|  9|             25|                0.48|
|       Norte| 10|             29|                0.52|
|       Norte| 11|             29|                0.48|
|     Sudeste|  9|             25|                0.24|
|     Sudeste| 10|             28|                0.23|
|     Sudeste| 11|             28|                 0.2|
|         Sul|  9|             20|                0.34|
|         Sul| 10|             21|                0.31|
|         Sul| 11|             21|              

In [22]:
# select 5 worst results for percent
positivos_regiao.createOrReplaceTempView('piores_positivos_regiao')

positivos_regiao_percent = spark.sql('''
    SELECT regiao, mes, total_positivos, percentual_positivos
    FROM piores_positivos_regiao
    ORDER BY percentual_positivos DESC
    LIMIT 5
''')

positivos_regiao_percent.show()

+------------+---+---------------+--------------------+
|      regiao|mes|total_positivos|percentual_positivos|
+------------+---+---------------+--------------------+
|       Norte| 10|             29|                0.52|
|       Norte| 11|             29|                0.48|
|       Norte|  9|             25|                0.48|
|Centro-Oeste|  9|             24|                0.42|
|         Sul|  9|             20|                0.34|
+------------+---+---------------+--------------------+



In [23]:
# testagem Positivo, Inconclusivo, Negativo
# create a temporary view
df.createOrReplaceTempView('testagem_uf')

# group by status_domicilio, mes => count each differente value in testagem
testagem_uf = spark.sql('''
    SELECT uf, mes, resultado_cotonete, resultado_sangue_dedo, resultado_sangue_braco,
        count(*) as total_respostas
    FROM testagem_uf
    WHERE resultado_cotonete != 'Ignorado / Não aplicável' OR
        resultado_sangue_dedo != 'Ignorado / Não aplicável' OR
        resultado_sangue_braco != 'Ignorado / Não aplicável'
    GROUP BY uf, mes, resultado_cotonete, resultado_sangue_dedo, resultado_sangue_braco
    ORDER BY uf, mes, resultado_cotonete, resultado_sangue_dedo, resultado_sangue_braco
''')

testagem_uf.show()

+----+---+--------------------+---------------------+----------------------+---------------+
|  uf|mes|  resultado_cotonete|resultado_sangue_dedo|resultado_sangue_braco|total_respostas|
+----+---+--------------------+---------------------+----------------------+---------------+
|Acre|  9|Ainda não recebeu...| Ignorado / Não ap...|  Ainda não recebeu...|              1|
|Acre|  9|Ainda não recebeu...| Ignorado / Não ap...|  Ignorado / Não ap...|              1|
|Acre|  9|Ignorado / Não ap...| Ainda não recebeu...|  Ainda não recebeu...|              1|
|Acre|  9|Ignorado / Não ap...| Ignorado / Não ap...|          Inconclusivo|              2|
|Acre|  9|Ignorado / Não ap...| Ignorado / Não ap...|              Negativo|             58|
|Acre|  9|Ignorado / Não ap...| Ignorado / Não ap...|              Positivo|             47|
|Acre|  9|Ignorado / Não ap...|         Inconclusivo|  Ignorado / Não ap...|              2|
|Acre|  9|Ignorado / Não ap...|         Inconclusivo|              Neg

In [24]:
# new column for only cont of positivos
testagem_uf = testagem_uf.withColumn('positivos', 
                                     F.when((col('resultado_cotonete') == 'Positivo') |
                                            (col('resultado_sangue_dedo') == 'Positivo') |
                                            (col('resultado_sangue_braco') == 'Positivo'), 1).otherwise(0))

testagem_uf.show()

+----+---+--------------------+---------------------+----------------------+---------------+---------+
|  uf|mes|  resultado_cotonete|resultado_sangue_dedo|resultado_sangue_braco|total_respostas|positivos|
+----+---+--------------------+---------------------+----------------------+---------------+---------+
|Acre|  9|Ainda não recebeu...| Ignorado / Não ap...|  Ainda não recebeu...|              1|        0|
|Acre|  9|Ainda não recebeu...| Ignorado / Não ap...|  Ignorado / Não ap...|              1|        0|
|Acre|  9|Ignorado / Não ap...| Ainda não recebeu...|  Ainda não recebeu...|              1|        0|
|Acre|  9|Ignorado / Não ap...| Ignorado / Não ap...|          Inconclusivo|              2|        0|
|Acre|  9|Ignorado / Não ap...| Ignorado / Não ap...|              Negativo|             58|        0|
|Acre|  9|Ignorado / Não ap...| Ignorado / Não ap...|              Positivo|             47|        1|
|Acre|  9|Ignorado / Não ap...|         Inconclusivo|  Ignorado / Não ap.

In [25]:
# new temporary view
testagem_uf.createOrReplaceTempView('positivos_uf')

# get only positivos for uf and month, calculate the percent
positivos_uf = spark.sql('''
    SELECT uf, mes, sum(positivos) as total_positivos,
        ROUND((sum(positivos) / SUM(total_respostas)) * 100, 2) as percentual_positivos
    FROM positivos_uf
    GROUP BY uf, mes
    ORDER BY uf, mes
''')

positivos_uf.show()

+----------------+---+---------------+--------------------+
|              uf|mes|total_positivos|percentual_positivos|
+----------------+---+---------------+--------------------+
|            Acre|  9|             13|                2.63|
|            Acre| 10|             13|                2.31|
|            Acre| 11|             14|                2.34|
|         Alagoas|  9|             14|                1.39|
|         Alagoas| 10|             14|                1.37|
|         Alagoas| 11|             17|                1.56|
|           Amapá|  9|              8|                1.83|
|           Amapá| 10|             10|                2.65|
|           Amapá| 11|             10|                2.27|
|        Amazonas|  9|             12|                1.01|
|        Amazonas| 10|             13|                1.05|
|        Amazonas| 11|             14|                1.09|
|           Bahia|  9|             17|                1.07|
|           Bahia| 10|             16|  

In [26]:
# select 5 worst results for percent
positivos_uf.createOrReplaceTempView('piores_positivos_uf')

positivos_uf_percent = spark.sql('''
    SELECT uf, mes, total_positivos, percentual_positivos
    FROM piores_positivos_uf
    ORDER BY percentual_positivos DESC
    LIMIT 5
''')

positivos_uf_percent.show()

+--------+---+---------------+--------------------+
|      uf|mes|total_positivos|percentual_positivos|
+--------+---+---------------+--------------------+
|Rondônia|  9|             17|                3.61|
|Rondônia| 10|             20|                3.26|
|   Amapá| 10|             10|                2.65|
|    Acre|  9|             13|                2.63|
|Rondônia| 11|             20|                2.58|
+--------+---+---------------+--------------------+



In [27]:
# organizar ordem do notebook - região antes de estado
# escolher o quê manter
# criar queries dentro de queries para automatizar
# o que são subqueries?
# função para automatizar queries - no fim

In [28]:
# print schema
df.printSchema()

root
 |-- uf: string (nullable = true)
 |-- regiao: string (nullable = false)
 |-- semana: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- no_entrevista: integer (nullable = true)
 |-- estrato: integer (nullable = true)
 |-- upa: integer (nullable = true)
 |-- respondente: string (nullable = true)
 |-- status_domicilio: string (nullable = true)
 |-- proj_populacao: integer (nullable = true)
 |-- condicao_no_domicilio: string (nullable = true)
 |-- idade_morador: integer (nullable = true)
 |-- sexo: string (nullable = true)
 |-- cor_raca: string (nullable = true)
 |-- escolaridade: string (nullable = true)
 |-- frequenta_escola: string (nullable = true)
 |-- atividades_escolares_realizar_em_casa: string (nullable = true)
 |-- dias_dedicados_atividades_escolares: string (nullable = true)
 |-- tempo_diario_atividades_escolares: string (nullable = true)
 |-- ensino_publico_ou_privado: string (nullable = true)
 |-- tendo_aulas_presenciais: string (nullable = true)
 |-- do

In [29]:
# comorbidades - criar coluna com o total de comorbidades
# create a temporary view
df.createOrReplaceTempView('comorbidades_regiao_mes')

# group by regiao, mes => count each differente value in comorbidades
comorbidades_regiao_mes = spark.sql('''
    SELECT regiao, mes, 
        diabetes, hipertensao, doenca_respiratoria_cronica_ou_pulmao, doenca_cardiaca, depressao, cancer,
        (CASE WHEN idade_morador < 60 THEN 0 ELSE 1 END) as idoso,
        (CASE WHEN idoso = 'Sim' OR
            diabetes = 'Sim' OR 
            hipertensao = 'Sim' OR 
            doenca_respiratoria_cronica_ou_pulmao = 'Sim' OR 
            doenca_cardiaca = 'Sim' OR 
            depressao = 'Sim' OR 
            cancer = 'Sim' THEN 1 ELSE 0 END) as possui_comorbidade,
        count(*) as total_respostas
    FROM comorbidades_regiao_mes
    GROUP BY regiao, mes, diabetes, hipertensao, doenca_respiratoria_cronica_ou_pulmao, doenca_cardiaca, depressao, cancer, idoso
    ORDER BY regiao, mes, diabetes, hipertensao, doenca_respiratoria_cronica_ou_pulmao, doenca_cardiaca, depressao, cancer, idoso
''')

comorbidades_regiao_mes.show()

+------------+---+--------------------+--------------------+-------------------------------------+--------------------+--------------------+--------------------+-----+------------------+---------------+
|      regiao|mes|            diabetes|         hipertensao|doenca_respiratoria_cronica_ou_pulmao|     doenca_cardiaca|           depressao|              cancer|idoso|possui_comorbidade|total_respostas|
+------------+---+--------------------+--------------------+-------------------------------------+--------------------+--------------------+--------------------+-----+------------------+---------------+
|Centro-Oeste|  9|Ignorado / Não ap...|Ignorado / Não ap...|                 Ignorado / Não ap...|Ignorado / Não ap...|Ignorado / Não ap...|Ignorado / Não ap...|    0|                 0|            134|
|Centro-Oeste|  9|Ignorado / Não ap...|Ignorado / Não ap...|                 Ignorado / Não ap...|Ignorado / Não ap...|Ignorado / Não ap...|Ignorado / Não ap...|    1|                 0|  

In [30]:
# calculate the percent of people with comorbidades
comorbidades_regiao_mes.createOrReplaceTempView('regiao_comorbidades_percent')

comorbidades_percent = spark.sql('''
    SELECT mes, regiao, 
        sum(possui_comorbidade) as total_comorbidades,
        ROUND((sum(possui_comorbidade) / SUM(total_respostas)) * 100, 2) as percentual_comorbidade
    FROM regiao_comorbidades_percent
    GROUP BY regiao, mes
    ORDER BY regiao, mes
''')

comorbidades_percent.show()

+---+------------+------------------+----------------------+
|mes|      regiao|total_comorbidades|percentual_comorbidade|
+---+------------+------------------+----------------------+
|  9|Centro-Oeste|               115|                  0.28|
| 10|Centro-Oeste|               112|                  0.28|
| 11|Centro-Oeste|               112|                  0.27|
|  9|    Nordeste|               123|                   0.1|
| 10|    Nordeste|               116|                   0.1|
| 11|    Nordeste|               120|                   0.1|
|  9|       Norte|               101|                  0.21|
| 10|       Norte|               102|                  0.22|
| 11|       Norte|               100|                  0.21|
|  9|     Sudeste|               136|                  0.12|
| 10|     Sudeste|               131|                  0.12|
| 11|     Sudeste|               127|                  0.11|
|  9|         Sul|               130|                   0.2|
| 10|         Sul|      

In [31]:
# calculate the percent of people with comorbidades
comorbidades_regiao_mes.createOrReplaceTempView('comorbidade_regiao')

comorbidade_regiao = spark.sql('''
    SELECT regiao, 
        ROUND((SUM(CASE WHEN possui_comorbidade = 'Sim' THEN total_respostas ELSE 0 END) / SUM(total_respostas)) * 100, 2) as percentual_comorbidades
    FROM comorbidade_regiao
    GROUP BY regiao
    ORDER BY percentual_comorbidades DESC
''')

comorbidade_regiao.show()

+------------+-----------------------+
|      regiao|percentual_comorbidades|
+------------+-----------------------+
|    Nordeste|                    0.0|
|         Sul|                    0.0|
|     Sudeste|                    0.0|
|Centro-Oeste|                    0.0|
|       Norte|                    0.0|
+------------+-----------------------+



In [32]:
# add column in comorbidade_regiao_mes with sum of number of comorbidades
comorbidades_regiao_mes = comorbidades_regiao_mes.withColumn('total_comorbidades', 
                                                                F.when(col('diabetes') == 'Sim', 1).otherwise(0) +
                                                                F.when(col('hipertensao') == 'Sim', 1).otherwise(0) +
                                                                F.when(col('doenca_respiratoria_cronica_ou_pulmao') == 'Sim', 1).otherwise(0) +
                                                                F.when(col('doenca_cardiaca') == 'Sim', 1).otherwise(0) +
                                                                F.when(col('depressao') == 'Sim', 1).otherwise(0) +\
                                                                F.when(col('cancer') == 'Sim', 1).otherwise(0) +\
                                                                F.when(col('idoso') == 1, 1).otherwise(0))

comorbidades_regiao_mes.show()

+------------+---+--------------------+--------------------+-------------------------------------+--------------------+--------------------+--------------------+-----+------------------+---------------+------------------+
|      regiao|mes|            diabetes|         hipertensao|doenca_respiratoria_cronica_ou_pulmao|     doenca_cardiaca|           depressao|              cancer|idoso|possui_comorbidade|total_respostas|total_comorbidades|
+------------+---+--------------------+--------------------+-------------------------------------+--------------------+--------------------+--------------------+-----+------------------+---------------+------------------+
|Centro-Oeste|  9|Ignorado / Não ap...|Ignorado / Não ap...|                 Ignorado / Não ap...|Ignorado / Não ap...|Ignorado / Não ap...|Ignorado / Não ap...|    0|                 0|            134|                 0|
|Centro-Oeste|  9|Ignorado / Não ap...|Ignorado / Não ap...|                 Ignorado / Não ap...|Ignorado / Não

In [33]:
df.printSchema()

root
 |-- uf: string (nullable = true)
 |-- regiao: string (nullable = false)
 |-- semana: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- no_entrevista: integer (nullable = true)
 |-- estrato: integer (nullable = true)
 |-- upa: integer (nullable = true)
 |-- respondente: string (nullable = true)
 |-- status_domicilio: string (nullable = true)
 |-- proj_populacao: integer (nullable = true)
 |-- condicao_no_domicilio: string (nullable = true)
 |-- idade_morador: integer (nullable = true)
 |-- sexo: string (nullable = true)
 |-- cor_raca: string (nullable = true)
 |-- escolaridade: string (nullable = true)
 |-- frequenta_escola: string (nullable = true)
 |-- atividades_escolares_realizar_em_casa: string (nullable = true)
 |-- dias_dedicados_atividades_escolares: string (nullable = true)
 |-- tempo_diario_atividades_escolares: string (nullable = true)
 |-- ensino_publico_ou_privado: string (nullable = true)
 |-- tendo_aulas_presenciais: string (nullable = true)
 |-- do

In [35]:
# contar % valor_dinheiro > 0 e % valor_dinheiro > 0
# create a temporary view
df.createOrReplaceTempView('recebeu_ou_nao_dinheiro_mes')

recebeu_ou_nao_dinheiro_mes = spark.sql('''
    SELECT mes, uf, 
        ROUND((SUM(CASE WHEN valor_dinheiro > 0 THEN 1 ELSE 0 END) / COUNT(*)) * 100, 2) as percentual_valor_dinheiro,
        ROUND((SUM(CASE WHEN valor_dinheiro = 0 THEN 1 ELSE 0 END) / COUNT(*)) * 100, 2) as percentual_sem_valor_dinheiro
    FROM recebeu_ou_nao_dinheiro_mes
    GROUP BY mes, uf
    ORDER BY mes, uf
''')

recebeu_ou_nao_dinheiro_mes.show()

+---+-------------------+-------------------------+-----------------------------+
|mes|                 uf|percentual_valor_dinheiro|percentual_sem_valor_dinheiro|
+---+-------------------+-------------------------+-----------------------------+
|  9|               Acre|                    31.31|                        68.69|
|  9|            Alagoas|                    24.76|                        75.24|
|  9|              Amapá|                    33.33|                        66.67|
|  9|           Amazonas|                    30.97|                        69.03|
|  9|              Bahia|                    30.38|                        69.62|
|  9|              Ceará|                     28.1|                         71.9|
|  9|   Distrito Federal|                    40.76|                        59.24|
|  9|     Espírito Santo|                    40.78|                        59.22|
|  9|              Goiás|                    42.32|                        57.68|
|  9|           

In [38]:
# escolaridade por região
# create a temporary view
df.createOrReplaceTempView('escolaridade_regiao')

# group by regiao, mes => count each differente value in escolaridade
escolaridade_regiao = spark.sql('''
    SELECT regiao, mes, escolaridade, count(*) as total_respostas
    FROM escolaridade_regiao
    GROUP BY regiao, mes, escolaridade
    ORDER BY regiao, mes, escolaridade
''')

escolaridade_regiao.show()

+------------+---+--------------------+---------------+
|      regiao|mes|        escolaridade|total_respostas|
+------------+---+--------------------+---------------+
|Centro-Oeste|  9|Fundamental completa|           2520|
|Centro-Oeste|  9|Fundamental incom...|          13135|
|Centro-Oeste|  9|      Médio completo|           8600|
|Centro-Oeste|  9|    Médio incompleto|           4070|
|Centro-Oeste|  9|Pós-graduação, me...|           1325|
|Centro-Oeste|  9|       Sem instrução|           4157|
|Centro-Oeste|  9|   Superior completo|           4644|
|Centro-Oeste|  9| Superior incompleto|           2562|
|Centro-Oeste| 10|Fundamental completa|           2491|
|Centro-Oeste| 10|Fundamental incom...|          12947|
|Centro-Oeste| 10|      Médio completo|           8437|
|Centro-Oeste| 10|    Médio incompleto|           4006|
|Centro-Oeste| 10|Pós-graduação, me...|           1325|
|Centro-Oeste| 10|       Sem instrução|           4105|
|Centro-Oeste| 10|   Superior completo|         