# Sumarização eficiente de dados para dashboards

Este notebook tem como objetivo demonstrar técnicas para sumarizar dados de forma eficiente, facilitando cálculos de média, desvio padrão e contagem de valores únicos em um dashboard. Ao utilizar essas técnicas, podemos reduzir o consumo de memória do computador ou cluster e aumentar a velocidade de processamento, tornando as análises mais dinâmicas e eficazes.

As fórmulas abaixo estão divididas da seguinte forma: à esquerda do ponto e vírgula temos as estatísticas da População, e à direita temos as estatísticas da Amostra.

## Média

Para calcular a média de dados sumarizados precisamos ter uma variável com a soma dos valores e outra variável com a quantidade deles. A média, tanto da População quanto da Amostra, é calculada pela divisão da soma dos valores pela soma das quantidades:

$$
\mu = \frac{1}{N} \sum_{i=1}^{N} x_{i} \text{ ; } \bar{x} = \frac{1}{n} \sum_{i=1}^{n} x_{i} \\~\\
S = \sum_{i=1}^{N} x_{i} \\~\\
\mu = \frac{S}{N} \text{ ; } \bar{x} = \frac{S}{n}
$$

Onde:

* $ \mu $ é a média da População.
* $ \bar{x} $ é a média da Amostra.
* $ N $ é a quantidade de valores da População.
* $ n $ é a quantidade de valores da Amostra.
* $ x_i $ é cada valor da População ou da Amostra.

## Desvio padrão

Para calcular o desvio padrão, tanto da População quanto da Amostra, são necessárias as mesmas variáveis utilizadas para calcular a média, além de uma variável com a soma dos valores ao quadrado (não confundir com a soma ao quadrado dos valores). Dado que a fórmula do desvio padrão é a seguinte:

$$
\sigma = \sqrt{\frac{1}{N} \sum_{i=1}^{N} \left(x_{i} - \mu\right)^2} \text{ ; } s = \sqrt{\frac{1}{n-1} \sum_{i=1}^{n} \left(x_{i} - \bar{x}\right)^2}
$$

Podemos inferir que:

$$
\sigma = \sqrt{\frac{1}{N} \sum_{i=1}^{N} \left(x_{i} - \frac{S}{N}\right)^2} \text{ ; } s = \sqrt{\frac{1}{n-1} \sum_{i=1}^{n} \left(x_{i} - \frac{S}{n}\right)^2} \\~\\
\sigma = \sqrt{\frac{1}{N} \sum_{i=1}^{N} \left(x_{i}^2 - 2x_{i}\frac{S}{N} + \left(\frac{S}{N}\right)^2\right)} \text{ ; } s = \sqrt{\frac{1}{n-1} \sum_{i=1}^{n} \left(x_{i}^2 - 2x_{i}\frac{S}{n} + \left(\frac{S}{n}\right)^2\right)} \\~\\
\sigma = \sqrt{\frac{1}{N} \left( \sum_{i=1}^{N} x_{i}^2 - \sum_{i=1}^{N} 2x_{i}\frac{S}{N} + \sum_{i=1}^{N} \left(\frac{S}{N}\right)^2 \right)} \text{ ; } s = \sqrt{\frac{1}{n-1} \left( \sum_{i=1}^{n} x_{i}^2 - \sum_{i=1}^{n} 2x_{i}\frac{S}{n} + \sum_{i=1}^{n} \left(\frac{S}{n}\right)^2 \right)} \\~\\
\sigma = \sqrt{\frac{1}{N} \left( \sum_{i=1}^{N} x_{i}^2 - 2 \frac{S}{N} \sum_{i=1}^{N} x_{i} + N \frac{S^2}{N^2} \right)} \text{ ; } s = \sqrt{\frac{1}{n-1} \left( \sum_{i=1}^{n} x_{i}^2 - 2 \frac{S}{n} \sum_{i=1}^{n} x_{i} + n \frac{S^2}{n^2} \right)} \\~\\
\sigma = \sqrt{\frac{1}{N} \left( \sum_{i=1}^{N} x_{i}^2 - 2 \frac{S}{N} S + \frac{S^2}{N}\right)} \text{ ; } s = \sqrt{\frac{1}{n-1} \left( \sum_{i=1}^{n} x_{i}^2 - 2 \frac{S}{n} S + \frac{S^2}{n}\right)} \\~\\
\sigma = \sqrt{\frac{1}{N} \left( \sum_{i=1}^{N} x_{i}^2 - 2 \frac{S^2}{N} + \frac{S^2}{N}\right)} \text{ ; } s = \sqrt{\frac{1}{n-1} \left( \sum_{i=1}^{n} x_{i}^2 - 2 \frac{S^2}{n} + \frac{S^2}{n}\right)} \\~\\
\sigma = \sqrt{\frac{1}{N} \left( \sum_{i=1}^{N} x_{i}^2 - \frac{S^2}{N}\right)} \text{ ; } s = \sqrt{\frac{1}{n-1} \left( \sum_{i=1}^{n} x_{i}^2 - \frac{S^2}{n}\right)}
$$

Onde:

* $ \sigma $ é o desvio padrão da População.
* $ s $ é o desvio padrão da Amostra.

## Contagem de valores únicos

Para calcular uma estimativa da quantidade de valores únicos para grandes conjuntos de dados podemos utilizar o HyperLogLog. O HyperLogLog é um algoritmo que funciona dividindo os elementos em "buckets" com base nos primeiros bits de seus hashes e, em seguida, estimando a cardinalidade com base no maior valor de bits consecutivos observados em cada "bucket".

1. Divisão em buckets:
    * Os elementos do conjunto de dados são hashizados, geralmente usando funções de hash como o MurmurHash.
    * Os bits iniciais dos hashes são usados para determinar em qual "bucket" cada elemento será colocado.

1. Estimativa da cardinalidade:
    * Para cada "bucket", o HyperLogLog registra o comprimento do maior sufixo de zeros no valor binário do hash.
    * A estimativa da cardinalidade total é feita combinando as informações de todos os "buckets" usando a fórmula de estimação do HyperLogLog.

A fórmula de estimação do HyperLogLog é:

$$
E = \alpha \cdot m^2 \cdot \left(\sum_{j=1}^{m}{2^{-M[j]}}\right)^{-1}
$$

Onde:

* $ E $ é a estimativa da cardinalidade total.
* $ m $ é o número de "buckets".
* $ M[j] $ é o valor registrado no "bucket".
* $ \alpha $ é uma constante de correção que depende do valor de m.

A constante de correção $ \alpha $ é usada para compensar o viés introduzido pelo algoritmo e é definida como:

$$
\alpha = \begin{cases}
    0,7213 / (1 + 1,079/m) \quad \quad \text{ se } m \leq 64 \\
    0,7213 / (1 + 1,079/m)^{1,3} \quad \text{ se } m > 64
\end{cases}
$$

O HyperLogLog é eficaz para estimar a cardinalidade de conjuntos de dados muito grandes com uma quantidade relativamente pequena de memória. Ele oferece uma boa precisão com um consumo de memória constante, independentemente do tamanho do conjunto de dados original. Vamos utilizar o HyperLogLog++, que propõe diversas melhorias no algoritmo HyperLogLog para reduzir os requisitos de memória e aumentar a precisão em algumas faixas de cardinalidades:

* Uma função de hash de 64 bits é usada em vez de uma de 32 bits. Isso reduz as colisões de hash para cardinalidades grandes, permitindo remover a correção de grande alcance;
* Alguns vieses são encontrados para cardinalidades pequenas ao mudar da contagem linear para a contagem HyperLogLog. Uma correção de viés empírica é proposta para mitigar o problema;
* Uma representação esparsa dos registros é proposta para reduzir os requisitos de memória para cardinalidades pequenas, que podem ser posteriormente transformadas em uma representação densa se a cardinalidade aumentar.

Por sorte, existem bibliotecas prontas em Python para utilizar o HyperLogLog++, de forma que não precisamos criar funções para efetuar os cálculos mencionados acima. Vamos utilizar o seguinte: https://ekzhu.com/datasketch/hyperloglog.html

In [51]:
# Importa as bibliotecas necessárias
import re
import time
import copy
import hashlib
import numpy as np
import pandas as pd
from typing import Union
from string import ascii_uppercase
from datasketch import HyperLogLogPlusPlus

In [52]:
# Define todas as funções que serão utilizadas
def gerar_chave(id: int, segredo: str = '_&@~*$*') -> str:
    id_str = str(id) + segredo
    chave = hashlib.sha256(id_str.encode()).hexdigest()[:32]
    chave = re.sub(r'(.{8})(.{4})(.{4})(.{4})(.{12})', r'\1-\2-\3-\4-\5', chave.upper())
    return chave

def media_sumarizados(soma: Union[int, float], quantidade: Union[int, float]) -> float:
    soma = float(soma)
    quantidade = float(quantidade)
    media = np.divide(soma, quantidade)

    return media

def desvio_padrao_sumarizados(soma: Union[int, float], quantidade: Union[int, float], soma_dados_ao_quadrado: Union[int, float], amostra: bool = False) -> float:
    soma = float(soma)
    soma_dados_ao_quadrado = float(soma_dados_ao_quadrado)
    quantidade = float(quantidade)

    soma_ao_quadrado_dados = np.power(soma, 2)
    media_dados_ao_quadrado = np.divide(soma_ao_quadrado_dados, quantidade)
    quantidade = quantidade - 1 if amostra else quantidade
    desvio_padrao = np.sqrt(np.divide(soma_dados_ao_quadrado - media_dados_ao_quadrado, quantidade))

    return desvio_padrao

def hyper_log_log_update_pandas(df: Union[pd.core.groupby.generic.SeriesGroupBy, pd.core.groupby.generic.DataFrameGroupBy], coluna_calculo: str = '', precisao: int = 16) -> Union[pd.DataFrame, pd.Series]:
    valores = df[coluna_calculo] if coluna_calculo else df
    hll = HyperLogLogPlusPlus(p=precisao)
    
    for valor in valores:
        hll.update(str(valor).encode('utf-8'))

    return hll

def hyper_log_log_merge_pandas(df: Union[pd.DataFrame, pd.Series], coluna_hll: str = '') -> HyperLogLogPlusPlus:
    hlls = df[coluna_hll] if coluna_hll else df
    hll_total = None

    for hll in hlls:
        if hll_total is None:
            hll_total = copy.copy(hll)
        else:
            hll_total.merge(hll)
    
    return hll_total

def espaco_dataframe_memoria(df: pd.DataFrame) -> str:
    n = 0
    potencia = 2**10
    total_bytes = df.memory_usage(deep=True).sum()
    categorias_potencias = {0: '', 1: 'K', 2: 'M', 3: 'G', 4: 'T'}

    while total_bytes > potencia and n < 4:
        total_bytes /= potencia
        n += 1
        
    return f'{total_bytes:_.2f} {categorias_potencias[n]}B'.replace('.', ',').replace('_', '.')

def diferenca_bytes_dataframes(df_bruto: pd.DataFrame, df_sumarizado: pd.DataFrame) -> str:
    total_bytes_df_bruto = df_bruto.memory_usage(deep=True).sum()
    total_bytes_df_sumarizado = df_sumarizado.memory_usage(deep=True).sum()
    diferenca_percentual_bytes = total_bytes_df_sumarizado / total_bytes_df_bruto

    return f'O DataFrame bruto possui {espaco_dataframe_memoria(df_bruto)}.\n' + \
           f'O DataFrame sumarizado possui {espaco_dataframe_memoria(df_sumarizado)}.\n' + \
           f'O DataFrame sumarizado possui {diferenca_percentual_bytes:_.6%} do peso do DataFrame bruto, uma redução de {(1 - diferenca_percentual_bytes):_.6%}'.replace('.', ',').replace('_', '.') + '.'

In [53]:
# Cria uma massa de dados aleatorios
quantidade = 1_000_000
dados = np.random.randint(0, 10_000, quantidade)
dados = dados.astype(float)
dados

array([4829., 4467., 3512., ..., 2543., 9469., 4595.])

In [None]:
# Calcula a média e o desvio padrão da População
media_populacao = dados.mean()
desvio_padrao_populacao = dados.std(ddof=0)

print(f'média: {media_populacao:_.2f}'.replace('.', ',').replace('_', '.'))
print(f'desvio padrão: {desvio_padrao_populacao:_.2f}'.replace('.', ',').replace('_', '.'))

In [None]:
# Calcula o desvio padrão da População com os dados sumarizados
soma_dados = dados.sum()
soma_dados_ao_quadrado = np.power(dados, 2).sum()
estatisticas_dados_sumarizados = obtemMediaDesvioPadraoDadosSumarizados(
    soma=soma_dados,
    soma_dados_ao_quadrado=soma_dados_ao_quadrado,
    quantidade=quantidade,
    amostra=False)

print(f'média com os dados sumarizados: {estatisticas_dados_sumarizados[0]:_.2f}'.replace('.', ',').replace('_', '.'))
print(f'desvio padrão com os dados sumarizados: {estatisticas_dados_sumarizados[1]:_.2f}'.replace('.', ',').replace('_', '.'))

In [54]:
# Divide os dados em partições e cria um DataFrame
qtde_categorias = len(ascii_uppercase) 
probabilidades = np.arange((qtde_categorias * 2) + 2, 2, -2)
probabilidades = (1 / probabilidades.sum()) * probabilidades

dados_categorizados = pd.DataFrame(dict(categoria=np.random.choice(list(ascii_uppercase), quantidade, p=probabilidades), quantidade=dados))
dados_categorizados

Unnamed: 0,categoria,quantidade
0,L,4829.0
1,C,4467.0
2,L,3512.0
3,Y,3199.0
4,S,3946.0
...,...,...
999995,U,1921.0
999996,T,3914.0
999997,H,2543.0
999998,Q,9469.0


In [98]:
# Insere datas no DataFrame
data_inicial = np.datetime64('2019-12-29')
data_final = np.datetime64('2023-12-30')
qtde_dias = (data_final - data_inicial).item().days
todas_datas = np.arange(0, qtde_dias + 1, 1) + data_inicial

probabilidades = np.array([0.95 ** i for i in range(qtde_dias + 1, 0, -1)])
probabilidades = (1 / probabilidades.sum()) * probabilidades

if 'data' in dados_categorizados.columns:
    dados_categorizados.drop(columns=['data'], inplace=True)

dados_categorizados.insert(0, 'data', np.random.choice(todas_datas, quantidade, p=probabilidades))
dados_categorizados

Unnamed: 0,data,categoria,consumidor,quantidade
0,2023-10-16,L,C825C923-FE99-EDF8-76FF-BD60186D8C0D,4829.0
1,2023-12-30,C,7CC94E71-F117-6241-1F1C-348C58E6208B,4467.0
2,2023-12-25,L,AC3C786E-A2D7-0F21-6EB3-2FDBB02DB3EA,3512.0
3,2023-11-17,Y,3DFAE9D4-7D37-D5F5-5ED4-ECCDD70F153F,3199.0
4,2023-10-30,S,4A20D973-26D0-AFDA-647E-05DBEAE9062E,3946.0
...,...,...,...,...
999995,2023-12-21,U,236CFF68-6360-B518-BCDE-6E8849E51F60,1921.0
999996,2023-12-29,T,B810DC52-374C-C9EE-FEB8-F68DEF49913F,3914.0
999997,2023-12-30,H,483CD49F-B7C2-C2FB-C931-869CDF087DA7,2543.0
999998,2023-12-06,Q,50109C03-BE1A-AE7A-A7F8-4C40A1BD6763,9469.0


In [56]:
# Insere consumidores aleatórios no DataFrame
chaves_consumidores = np.arange(0, (quantidade * 4) // 5, 1)
gerar_chave_numpy = np.vectorize(gerar_chave)
chaves_consumidores = gerar_chave_numpy(chaves_consumidores)

if 'consumidor' in dados_categorizados.columns:
    dados_categorizados.drop(columns=['consumidor'], inplace=True)

dados_categorizados.insert(2, 'consumidor', np.random.choice(chaves_consumidores, quantidade))
dados_categorizados

Unnamed: 0,data,categoria,consumidor,quantidade
0,2022-05-06,L,C825C923-FE99-EDF8-76FF-BD60186D8C0D,4829.0
1,2022-07-20,C,7CC94E71-F117-6241-1F1C-348C58E6208B,4467.0
2,2021-11-18,L,AC3C786E-A2D7-0F21-6EB3-2FDBB02DB3EA,3512.0
3,2020-12-03,Y,3DFAE9D4-7D37-D5F5-5ED4-ECCDD70F153F,3199.0
4,2021-09-03,S,4A20D973-26D0-AFDA-647E-05DBEAE9062E,3946.0
...,...,...,...,...
999995,2021-04-19,U,236CFF68-6360-B518-BCDE-6E8849E51F60,1921.0
999996,2022-05-16,T,B810DC52-374C-C9EE-FEB8-F68DEF49913F,3914.0
999997,2021-10-27,H,483CD49F-B7C2-C2FB-C931-869CDF087DA7,2543.0
999998,2022-02-24,Q,50109C03-BE1A-AE7A-A7F8-4C40A1BD6763,9469.0


In [57]:
dados_categorizados.groupby('categoria', as_index=False).consumidor.nunique()

Unnamed: 0,categoria,consumidor
0,A,68145
1,B,66358
2,C,63829
3,D,61038
4,E,59490
5,F,56209
6,G,53767
7,H,51489
8,I,48542
9,J,46277


In [58]:
dados_hll = dados_categorizados.groupby('categoria', as_index=False).consumidor.apply(hyper_log_log_update_pandas)

In [61]:
dados_hll.consumidor.apply(lambda x: x.count().astype(int))

0     68097
1     66326
2     63608
3     61261
4     59554
5     56206
6     53568
7     51734
8     48608
9     46482
10    43900
11    40969
12    39072
13    36374
14    33647
15    31050
16    28765
17    26077
18    23495
19    20997
20    18362
21    15815
22    13172
23    10572
24     7803
25     5341
Name: consumidor, dtype: int64

In [104]:
dados_categorizados.groupby('categoria', as_index=False).consumidor.nunique().assign(
    hll=dados_hll.consumidor.apply(lambda x: x.count().astype(int)),
    delta=lambda x: (np.divide(x.hll, x.consumidor) - 1).apply(lambda y: f'{y:.10%}'.replace('.', ','))
)

Unnamed: 0,categoria,consumidor,hll,delta
0,A,68145,68097,"-0,0704380365%"
1,B,66358,66326,"-0,0482232738%"
2,C,63829,63608,"-0,3462376036%"
3,D,61038,61261,"0,3653461778%"
4,E,59490,59554,"0,1075811061%"
5,F,56209,56206,"-0,0053372236%"
6,G,53767,53568,"-0,3701154984%"
7,H,51489,51734,"0,4758297889%"
8,I,48542,48608,"0,1359647316%"
9,J,46277,46482,"0,4429846360%"


In [60]:
hyper_log_log_merge_pandas(dados_hll, 'consumidor').count().astype(int)

573951

In [62]:
dados_categorizados.consumidor.nunique()

570953

In [107]:
print(f'''{(np.divide(hyper_log_log_merge_pandas(
    dados_hll, 'consumidor').count().astype(int),
    dados_categorizados.consumidor.nunique()
) - 1):.10%}'''.replace('.', ','))

0,5250870037%


In [None]:
# Calcula a média e o desvio padrão amostral a partir dos dados originais
inicio_df_bruto = time.time()
media_e_desvio_padrao_amostral = dados_categorizados.groupby('categoria', as_index=False).agg(
    media=('quantidade', lambda x: x.mean()),
    desvio_padrao=('quantidade', lambda x: x.std(ddof=1))
)
fim_df_bruto = time.time()
tempo_df_bruto = fim_df_bruto - inicio_df_bruto

print(f'Tempo de execução: {tempo_df_bruto:.6f} segundos'.replace('.', ',') + '.')
display(media_e_desvio_padrao_amostral)

In [None]:
# Sumariza os dados para ocupar menos espaço na memória
dados_sumarizados = dados_categorizados.groupby('categoria', as_index=False).agg(
    soma=('quantidade', lambda x: x.sum()),
    soma_ao_quadrado=('quantidade', lambda x: np.power(x, 2).sum()),
    quantidade=('quantidade', lambda x: len(x)))

dados_sumarizados = dados_sumarizados.astype({
    'categoria': str,
    'soma': float,
    'soma_ao_quadrado': float,
    'quantidade': float
})

dados_sumarizados

In [None]:
# Quantidade de bytes que os DataFrames estão ocupando na memória e percentual de redução de um DataFrame para o outro
print(obtemDiferencaBytesEntreDataFrames(dados_categorizados, dados_sumarizados))

In [None]:
# Calcula a média e o desvio padrão amostral a partir dos dados sumarizados
inicio_df_sumarizado = time.time()
media_e_desvio_padrao_amostral_sumarizados = dados_sumarizados.apply(lambda x: obtemMediaDesvioPadraoDadosSumarizados(
    soma=x['soma'],
    soma_dados_ao_quadrado=x['soma_ao_quadrado'],
    quantidade=x['quantidade'],
    amostra=True), axis=1, result_type='expand')
fim_df_sumarizado = time.time()
tempo_df_sumarizado = fim_df_sumarizado - inicio_df_sumarizado
delta_tempo = tempo_df_sumarizado / tempo_df_bruto

print(f'Tempo de execução: {tempo_df_sumarizado:.6f} segundos'.replace('.', ',') + '.')
print(f'A execução no DataFrame sumarizado gastou {delta_tempo:.6%} do tempo de execução no DataFrame bruto, uma diferença de {(1 - delta_tempo):.6%}'.replace('.', ',') + '.')
display(media_e_desvio_padrao_amostral_sumarizados)

In [None]:
# Insere a categoria no DataFrame
media_e_desvio_padrao_amostral_sumarizados.insert(0, 'categoria', dados_sumarizados.categoria)
media_e_desvio_padrao_amostral_sumarizados.rename(columns={
    0: 'media_dados_sumarizados',
    1: 'desvio_padrao_dados_sumarizados'
}, inplace=True)
media_e_desvio_padrao_amostral_sumarizados

In [None]:
# Verifica se os valores batem
media_e_desvio_padrao_amostral = media_e_desvio_padrao_amostral.merge(media_e_desvio_padrao_amostral_sumarizados, how='inner', on='categoria')
media_e_desvio_padrao_amostral['valores_iguais'] = (media_e_desvio_padrao_amostral.media.round(2) == media_e_desvio_padrao_amostral.media_dados_sumarizados.round(2)) & \
                                                   (media_e_desvio_padrao_amostral.desvio_padrao.round(2) == media_e_desvio_padrao_amostral.desvio_padrao_dados_sumarizados.round(2))
media_e_desvio_padrao_amostral

## Vantagens

Além de ser possível calcular a média e o desvio padrão a partir de dados sumarizados, o que garante uma redução absurda na alocação de memória (conforme visto acima) e também no tempo de execução, podemos aproveitar a tabela sumarizada para calcular mais facilmente e rapidamente a média e o desvio padrão entre outros grupos a partir dos dados sumarizados. Seguindo o exemplo acima, vamos agrupar as categorias em cinco super categorias, e então calcular suas médias e desvio padrão.

In [None]:
# Obtém as cinco super categorias
super_categorias = dict(enumerate(ascii_uppercase, 0))
super_categorias = {x: f'Categoria {i % 5}' for i, x in super_categorias.items()}
super_categorias

In [None]:
dados_sumarizados.insert(1, 'super_categorias', dados_sumarizados.categoria.map(super_categorias))
dados_sumarizados

In [None]:
# Calcula a média e o desvio padrão de cada super categoria
inicio_sumarizado = time.time()

media_e_desvio_padrao_super_categorias_sumarizados = dados_sumarizados.groupby('super_categorias', as_index=False).sum(numeric_only=True).apply(lambda x: obtemMediaDesvioPadraoDadosSumarizados(
    soma=x['soma'],
    soma_dados_ao_quadrado=x['soma_ao_quadrado'],
    quantidade=x['quantidade'],
    amostra=True), axis=1, result_type='expand')
fim_df_sumarizado = time.time()
tempo_df_sumarizado = fim_df_sumarizado - inicio_sumarizado

print(f'Tempo de execução: {tempo_df_sumarizado:.6f} segundos'.replace('.', ',') + '.')
display(media_e_desvio_padrao_super_categorias_sumarizados)

In [None]:
# Insere as categorias no DataFrame
media_e_desvio_padrao_super_categorias_sumarizados.insert(0, 'categorias', ['Categoria 0', 'Categoria 1', 'Categoria 2', 'Categoria 3', 'Categoria 4'])
media_e_desvio_padrao_super_categorias_sumarizados.rename(columns={
    0: 'media_dados_sumarizados',
    1: 'desvio_padrao_dados_sumarizados'
}, inplace=True)
media_e_desvio_padrao_super_categorias_sumarizados

In [None]:
# Calcula a média e o desvio padrão de cada super categoria a partir dos dados brutos
