In [1]:
import pandas as pd
import numpy as np

import asyncio
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px

In [35]:
# Função de carga de dados

def load_data(data_url:str, tipo:str= 'csv', date_f:list= [], **kwargs) -> 'DataFrame':
    '''
    Função para carregar um conjunto de dados
    :rtype: pd.Dataframe
    :param data_url: caminho completo do arquivo a ser carregado
    :param tipo: tipo de arquivo: csv|xls|xlsx|json etc
    :param date_f: lista com os nomes dos campos a ser convertidos para datetime 
    :param kwargs: argumentos especificos para a carga do arquivo, conforme o parametro 'tipo'
    :return: Pandas DataFrame
    '''

    if tipo == 'csv':
        
        if 'chunksize' in kwargs:
            
            kwargs['iterator'] = True
            data = pd.DataFrame()
            _df_chunked = pd.read_csv(data_url, **kwargs)            
            for _df in _df_chunked:
                
                data = data.append(_df)
        else:            
            data = pd.read_csv(data_url, **kwargs)
            
    elif tipo == 'xls' or tipo == 'xlsx':
        
        data = pd.read_excel(data_url, **kwargs)
        
    elif tipo == 'json':
        
        import json
        
        with open(data_url) as f:
            data = json.load(f)
            
        return data
    
    else:
        if 'chunksize' in kwargs:
            
            kwargs['iterator'] = True
            data = pd.DataFrame()
            _df_chunked = pd.read_table(data_url, **kwargs)            
            for _df in _df_chunked:
                
                data = data.append(_df)
        else:            
            data = pd.read_table(data_url, **kwargs)
    
    
    for dt_field in date_f:
        data[dt_field] = pd.to_datetime(data[dt_field])
        
    return data

In [None]:
# Funções de Transformação dos dados

async def transform_dfbr(df, cols:list= []) -> 'DataFrame':
    '''
    Função para transformar o DataFrame df_br, filtra os dados para o Brasil, seleciona colunas especificadas
    e adiciona novas colunas
    :param df: dataframe com dados sobre a covid do Brasil
    :param cols: lista com os nomes das colunas que deverão conter no DataFrame de retorno. Não sendo informado,
    retorna todas as colunas do DataFrame.    
    :return: Pandas DataFrame
    '''
    
    colunas = cols if len(cols) > 0 else df.columns.to_list()
    
    # Filtra os dados para o Brasil e seleciona colunas específicas
    _df_BR = df.query("state == 'TOTAL'")[colunas]

    # cria novas colunas
    _df_BR['activeCases'] = _df_BR['totalCases'] - _df_BR['deaths'] - _df_BR['recovered']
    _df_BR['activeCasesMS'] = _df_BR['totalCasesMS'] - _df_BR['deathsMS'] - _df_BR['recovered']
    _df_BR['activeCasesDiff'] = _df_BR['activeCases'] - _df_BR['activeCasesMS']
    _df_BR['deathsDiff'] = _df_BR['deaths'] - _df_BR['deathsMS']
    _df_BR['newVaccinated'] = _df_BR['vaccinated'].diff()
    _df_BR['newVaccinated_second'] = _df_BR['vaccinated_second'].diff()
    
    return _df_BR


async def transform_popuf(df_munic) -> 'DataFrame':
    '''
    Função para transformar o DataFrame df_popmunic, trata o dado de POPULAÇÃO ESTIMADA e retonar outro dataframe 
    agregado por UF
    :param df_munic: dataframe com dados populacionais dos municípios
    :return: Pandas DataFrame
    '''
    
    df_munic['POPULAÇÃO ESTIMADA'] = df_munic['POPULAÇÃO ESTIMADA'].apply(lambda x: str(x).split('(')[0])
    df_munic['POPULAÇÃO ESTIMADA'] = df_munic['POPULAÇÃO ESTIMADA'].astype(int)
    _popuf = df_munic[['UF', 'POPULAÇÃO ESTIMADA']].groupby('UF').sum().reset_index()
    
    return _popuf


async def transform_dfcities(df_cities, df_gps_cities) -> 'DataFrame':
    '''
    Função para transformar o DataFrame df_cities, acrescentando informações de latitude e longitude a partir do df_gps_cities
    :param df_cities: dataframe com dados de covid por município
    :param df_gps_cities: dataframe com dados de coordenadas geográficas dos municípios
    :return: Pandas DataFrame
    '''
    
    # removendo as linhas cujo campo ibgeID está faltando
    df_gps_cities = df_gps_cities.dropna(subset=['ibgeID'])
    
    # convertendo o tipo da coluna ibeID do df_gps_cities para o mesmo tipo da coluna ibgeID do df_cities
    df_gps_cities['ibgeID'] = df_gps_cities['ibgeID'].astype(int)
    
    # definindo as colunas 'lat' e 'lon' no df_cities com base no 'ibgeID' do df_gps_cities
    df_cities['lat'] = df_cities['ibgeID'].map(df_gps_cities.set_index('ibgeID')['lat'])
    df_cities['lon'] = df_cities['ibgeID'].map(df_gps_cities.set_index('ibgeID')['lon'])
    
    # filtra pela data mais recente
    _df = df_cities.query('date == @df_cities.date.max()')
    
    return _df
    

In [36]:
# Setando variáveis
url_br = "https://raw.githubusercontent.com/wcota/covid19br/master/cases-brazil-states.csv"
url_cities = "https://github.com/wcota/covid19br/blob/master/cases-brazil-cities-time.csv.gz?raw=true"
url_popmunic = 'datasets/originais/populacao_2020.xls'
url_gpscities = "https://raw.githubusercontent.com/wcota/covid19br/master/gps_cities.csv"
url_geojson_br = 'geojson/brasil-uf-compressed.json'
chunk_size = 50000

In [4]:
import time

In [37]:
# Testantdo a função load_data

D_ARGS = {
    'df_br': dict(data_url=url_br, date_f=['date']),
    'df_cities': dict(data_url=url_cities, date_f=['date'], compression='gzip', chunksize=chunk_size),
    'df_popmunic': dict(data_url=url_popmunic, tipo='xls', sheet_name='Municípios', skiprows=1, skipfooter=16),
    'df_gpscities': dict(data_url=url_gpscities),
    'gj_br': dict(data_url=url_geojson_br, tipo='json'),
}

datasets = {}

start = time.perf_counter()

with ThreadPoolExecutor() as executor:
    
    # Inicia as tarefas de carga e atribui a cada tarefa o nome da chave correspondente a cada conjunto de dados
    future_data_loader = {executor.submit(load_data, **valor): chave for chave, valor in D_ARGS.items()}
    
    for task in as_completed(future_data_loader):
        
        try:
            datasets[future_data_loader[task]] = task.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (future_data_loader[task], exc))
        else:
            print('dataset %r loaded - %d rows' % (future_data_loader[task], len(datasets[future_data_loader[task]])))


end = time.perf_counter()
print("tempo de execução {}".format(end-start))

dataset 'gj_br' loaded - 3 rows
dataset 'df_popmunic' loaded - 5570 rows
dataset 'df_br' loaded - 12138 rows
dataset 'df_cities' loaded - 2092289 rows
tempo de execução 41.14326110000002


In [None]:
# Testantdo as funções de transformação
start = time.perf_counter()

# df_br = await load_data(data_url=url_br, date_f=['date']) # tempo de execução 0.38707780838012695
# df_cities = await load_data(data_url=url_cities, date_f=['date'], compression='gzip', iterator=True, chunksize=chunk_size) # tempo de execução 34.51947283744812
# df_popmunic = await load_data(data_url=url_popmunic, tipo='xls', sheet_name='Municípios', skiprows=1, skipfooter=16) # tempo de execução 0.37621641159057617

df_br, df_cities, df_popmunic = await asyncio.gather(

)

end = time.perf_counter()
print("tempo de execução {}".format(end-start))

In [None]:
df_cities.shape