UNIVERSIDADE DE BRASÍLIA - DEPARTAMENTO DE ESTATÍSTICA\
Elaine de Oliveira
## Projeto de Computação Eficiente, parte 2

Utilizando os arquivos baixados no trabalho anterior, realizo as seguintes manipulações de dados:

        1) Carregamento das colunas 'estabelecimento_uf', 'vacina_descricao_dose', 'estabelecimento_municipio_codigo' de todos os arquivos
        2) Leitura do csv com os codigos das regiões de saúde
        3) Junção das duas tabelas anteriores
        4) Contagem da quantidade de vacinados por região 
        5) Divisão em faixas alta e baixa
        6) Visualização das cinco regiões com menos vacinados em cada faixa
        7) Retorno da tabela para o python

As manipulações serão feitas utilizando novos quatro métodos: **SQLite, MongoDB, Spark e Polars**. Como o SQLite e o MongoDB não possuem funções para leitura de csv nativas, usarei o pyarrow para a leitura dos arquivos neste dois casos.

In [2]:
%load_ext memory_profiler
%load_ext line_profiler

In [3]:
#Leitura dos arquivos com o pyarrow
import pyarrow.dataset as ds
from pyarrow import csv
import pyarrow as pa
import pandas as pd

dir = 'dados/'
dataset = ds.dataset(dir, 
        format=ds.CsvFileFormat(parse_options=csv.ParseOptions(delimiter=';'), convert_options=csv.ConvertOptions(null_values=['None',''])))
vacinas = dataset.scanner(columns=['estabelecimento_uf', 'vacina_descricao_dose', 'estabelecimento_municipio_codigo']).to_batches()

#### Primeiro Método: SQLite3

In [3]:
import sqlite3
def sqlite_func():
    #Conecta ao sqlite
    con = sqlite3.connect("ce_trab2.db")

    #1 Cria tabela vacinas e carrega os dados
    con.execute('CREATE TABLE vacinas(estabelecimento_uf, vacina_descricao_dose, estabelecimento_municipio_codigo)')
    for i in vacinas:
        d = i.to_pylist()
        con.executemany('INSERT INTO vacinas VALUES(:estabelecimento_uf,:vacina_descricao_dose,:estabelecimento_municipio_codigo)',d)

    #2 Cria tabela IBGE e carrega os dados
    #colunas do csv ['UF', 'Município', 'Cód IBGE', 'Cód Região de Saúde', 'Nome da Região de Saúde']
    pd.read_csv('Tabela_codigos.csv', index_col=0).to_sql('ibge', con, index=False)
    
    #3/4 Junta as tabelas e conta vacinados por região de saúde
    con.execute('''
    CREATE TABLE joined AS
    SELECT ibge.'Cód Região de Saúde', ibge.'Nome da Região de Saúde', ibge.UF, count(*) as count
    FROM vacinas
    LEFT JOIN ibge ON vacinas.'estabelecimento_municipio_codigo' = ibge.'Cód IBGE'
    GROUP BY ibge.'Cód Região de Saúde'
    ORDER BY count DESC
    ''')

    #5 Cria as faixas de vacinação
    con.execute('ALTER TABLE joined ADD faixa')
    con.execute(''' 
    UPDATE joined
    SET faixa='Baixa' where count < (SELECT MIN(count) from (SELECT * FROM joined LIMIT (SELECT COUNT(*)/2 FROM joined)))
    ''')
    con.execute(''' 
    UPDATE joined
    SET faixa='Alta' where count >= (SELECT MIN(count) from (SELECT * FROM joined LIMIT (SELECT COUNT(*)/2 FROM joined)))
    ''')

    #6 Menos vacinados Alta/Baixa
    alta = pd.read_sql_query("SELECT * FROM joined WHERE faixa='Alta' ORDER BY count ASC LIMIT 5", con)
    baixa = pd.read_sql_query("SELECT * FROM joined WHERE faixa='Baixa' ORDER BY count ASC LIMIT 5", con)

    #7 Retorna tabela para o pandas
    df = pd.read_sql_query('SELECT * FROM joined', con)

    #Fecha a conexão
    con.commit()
    con.close()

    return df, alta, baixa


%lprun -f sqlite_func sqlite_func()


Timer unit: 1e-07 s

Total time: 92.0157 s
File: <ipython-input-3-122a11be7eaf>
Function: sqlite_func at line 2

Line #      Hits         Time  Per Hit   % Time  Line Contents
     2                                           def sqlite_func():
     3                                               #Conecta ao sqlite
     4         1    1266000.0 1266000.0      0.1      con = sqlite3.connect("ce_trab2.db")
     5                                           
     6                                               #1 Cria tabela vacinas e carrega os dados
     7         1    4715749.0 4715749.0      0.5      con.execute('CREATE TABLE vacinas(estabelecimento_uf, vacina_descricao_dose, estabelecimento_municipio_codigo)')
     8      8261    4170200.0    504.8      0.5      for i in vacinas:
     9      8261  458247196.0  55471.2     49.8          d = i.to_pylist()
    10      8261  273624830.0  33122.5     29.7          con.executemany('INSERT INTO vacinas VALUES(:estabelecimento_uf,:vacina_descri

#### Segundo Método: MongoDB com pymongo

In [5]:
import pymongo
from pymongo import MongoClient
import math
def pymongo_func():
    #Conecta com o Mongo
    client = MongoClient('localhost', 27017)
    trab2 = client.trab2

    #1 Cria tabela vacinas e carrega os dados
    vacs = trab2.vacinas
    for i in vacinas:
        vacs.insert_many(i.to_pylist())

    #2 Cria tabela códigos e carrega os dados
    cods = trab2.codigos
    cods.insert_many(csv.read_csv('Tabela_codigos.csv').to_pylist())

    #3/4 Juntas as tabelas e conta vacinados
    pipe = vacs.aggregate([
        { "$group":{
                "_id": '$estabelecimento_municipio_codigo',
                "count": {"$sum": 1}
        } },
        { "$lookup":{
            "from": "codigos",
            "localField": "_id",
            'foreignField': 'Cód IBGE',
            'as': 'muni'
        } },
        { "$group":{
                "_id": {'$arrayElemAt': ['$muni.Cód Região de Saúde',0]},
                "Nome": {'$first': '$muni.Nome da Região de Saúde'},
                "UF": {'$first': "$muni.UF"},
                "count": {'$sum': '$count'}
        }},
        {'$sort': {'count': -1}},
        {'$out': 'joined'}
        ])

    #5 Crias as faixas de vacinação
    median = [i for i in trab2.joined.find()][math.floor(trab2.joined.estimated_document_count()/2)]['count']
    pipe = trab2.joined.aggregate([
                {"$project":{
                        '-id': 1, 'Nome': 1, 'UF': 1, 'count': 1,
                        'faixa' :{ '$cond': { 'if': { '$lte': [ "$count", median ] }, 'then': 'Baixa', 'else': 'Alta' }}
                }},
                {'$out': 'joined'}
        ])

    #6 Menos Vacinados Alta/Baixa 
    alta = pd.DataFrame([i for i in trab2.joined.find({'faixa': 'Alta'})][-5:])
    baixa = pd.DataFrame([i for i in trab2.joined.find({'faixa': 'Baixa'})][-5:])
    
    #7 Retorna 
    df = pd.DataFrame([i for i in trab2.joined.find()])

    return df,alta,baixa

%lprun -f pymongo_func pymongo_func()

Timer unit: 1e-07 s

Total time: 101.076 s
File: <ipython-input-5-3f557e96a83f>
Function: pymongo_func at line 4

Line #      Hits         Time  Per Hit   % Time  Line Contents
     4                                           def pymongo_func():
     5                                               #Conecta com o Mongo
     6         1      42430.0  42430.0      0.0      client = MongoClient('localhost', 27017)
     7         1        311.0    311.0      0.0      trab2 = client.trab2
     8                                           
     9                                               #1 Cria tabela vacinas e carrega os dados
    10         1        396.0    396.0      0.0      vacs = trab2.vacinas
    11      1606    1383817.0    861.7      0.1      for i in vacinas:
    12      1606  798313666.0 497082.0     79.0          vacs.insert_many(i.to_pylist())
    13                                           
    14                                               #2 Cria tabela códigos e carre

#### Terceiro Método: Spark com pyspark

In [6]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import pyspark.sql.functions as F

def spark_func():
    spark = SparkSession.builder.appName("trab2").getOrCreate()

    vacs = spark.read.option("delimiter", ";")\
        .option("header", "true")\
        .option("inferSchema", "true").csv(['dados/'+i for i in os.listdir('dados')])\
        .select('estabelecimento_uf', 'vacina_descricao_dose', 'estabelecimento_municipio_codigo')

    cods = spark.read\
        .option("header", "true")\
        .option("inferSchema", "true").csv('Tabela_codigos.csv')

    joined = vacs.join(cods, vacs.estabelecimento_municipio_codigo == cods['Cód IBGE'], 'left')\
        .groupBy(['Cód Região de Saúde','Nome da Região de Saúde','UF']).count().sort(col('count').desc())
    s = F.lit(joined.approxQuantile('count',[0.5],0.1)[0])
    joined = joined.withColumn('Faixa', when(joined['count'] > s, 'Alta').otherwise('Baixa'))

    alta = joined[joined['Faixa'] == 'Alta'].tail(5)
    baixa = joined[joined['Faixa'] == 'Baixa'].tail(5)
    df = joined.toPandas()

    return df, alta, baixa

%lprun -f spark_func spark_func()

Timer unit: 1e-07 s

Total time: 40.3304 s
File: <ipython-input-6-ffb1830a8d63>
Function: spark_func at line 6

Line #      Hits         Time  Per Hit   % Time  Line Contents
     6                                           def spark_func():
     7         1   39575641.0 39575641.0      9.8      spark = SparkSession.builder.appName("trab2").getOrCreate()
     8                                           
     9         2       7258.0   3629.0      0.0      vacs = spark.read.option("delimiter", ";")\
    10         1          4.0      4.0      0.0          .option("header", "true")\
    11         2       2111.0   1055.5      0.0          .option("inferSchema", "true").csv(['dados/'+i for i in os.listdir('dados')])\
    12         1          6.0      6.0      0.0          .select('estabelecimento_uf', 'vacina_descricao_dose', 'estabelecimento_municipio_codigo')
    13                                           
    14         2       6622.0   3311.0      0.0      cods = spark.read\
    15

#### Quarto Método: Polars

In [2]:
import polars as pl

def polars_func():
    df = pl.scan_csv('dados/*.csv', sep=';', has_header=True).select(['estabelecimento_uf', 'vacina_descricao_dose', 'estabelecimento_municipio_codigo'])\
        .join(pl.scan_csv('Tabela_codigos.csv'), how = 'left', left_on='estabelecimento_municipio_codigo', right_on='Cód IBGE')\
        .groupby(['Cód Região de Saúde', 'Nome da Região de Saúde', 'UF']).agg(pl.count())\
        .with_columns((pl.col('count') > pl.col('count').median()).apply(lambda x: 'Alta' if x else 'Baixa').alias('Faixa'))\
        .sort(by='count', reverse=True)\
        .collect()
    
    alta = df.filter(pl.col('Faixa') == 'Alta').tail(5)
    baixa = df.filter(pl.col('Faixa') == 'Baixa').tail(5)
    
    return df, alta, baixa

%lprun -f polars_func polars_func()

Timer unit: 1e-07 s

Total time: 0.0165127 s
File: <ipython-input-2-12173ba1b5c5>
Function: polars_func at line 3

Line #      Hits         Time  Per Hit   % Time  Line Contents
     3                                           def polars_func():
     4         2        836.0    418.0      0.5      df = pl.scan_csv('dados/*.csv', sep=';', has_header=True).select(['estabelecimento_uf', 'vacina_descricao_dose', 'estabelecimento_municipio_codigo'])\
     5         1       2404.0   2404.0      1.5          .join(pl.scan_csv('Tabela_codigos.csv'), how = 'left', left_on='estabelecimento_municipio_codigo', right_on='Cód IBGE')\
     6         2         54.0     27.0      0.0          .groupby(['Cód Região de Saúde', 'Nome da Região de Saúde', 'UF']).agg(pl.count())\
     7         1        457.0    457.0      0.3          .with_columns((pl.col('count') > pl.col('count').median()).apply(lambda x: 'Alta' if x else 'Baixa').alias('Faixa'))\
     8         1          3.0      3.0      0.0         

#### Resultados

Os métodos MongoDB e Spark tiveram tempos bem inconsistentes, às vezes alguns minutos, às vezes durando 6-7 minutos. Acredito que o motivo seja o uso de outros recursos computacionais, como a leitura dos arquivos no HD e a forma que eles reprocessam informações.

(parte 1)
    Datatable - 12s
    Pandas - 16s
(parte 2)

    SQLite - 92s
    MongoDB - 101s
    Spark - 40s
    Polars - **5.9s**

- Todos os métodos tem um gargalo na leitura dos arquivos, pois o HD é lento na leitura.
- O Polars foi o mais rápido de todos os métodos testados até agora. Por ele ser lazy, ele consegue otimizar a leitura dos arquivos e o processamento.
- Todos os métodos tiveram picos de memória parecidos, na faixa de 2GB.
- Diferente os outros métodos, o Spark também utiliza 100% do processador para realizar as operações