## Trabalhando com Big Data usando Dask

In [None]:
# Instalando dask
#!pip install dask

#Link para download do dataframe usado nesta aula (kaggle)
# https://www.kaggle.com/datasets/ealtman2019/ibm-transactions-for-anti-money-laundering-aml

### Bibliotecas

In [None]:
import pandas as pd
import dask.dataframe as dd
import numpy as np

In [None]:
# Configurando pandas pra que mostre somente float com 2 casas decimais e evite notação científica (1min) 50.000.000.000 5e10
pd.options.display.float_format = "{:,.2f}".format

### Lendo Bigdata com Pandas
- Podemos passar o parametro nrows=x para ler somente x linhas em um read_csv
- Mas vamos tentar ler toda a base de dados pra ver o que acontece

In [None]:
caminho = r'C:\Users\devda\Documents\GitHub\01-DataFrames\Big Data financeiro'

dfp = pd.read_csv(f'{caminho}\LI-Large_Trans.csv', nrows=5000) # Chamamos de chunk = pedaço
dfp.head(3)

### Fazendo análise com pandas por chunks

In [None]:
colunas = ['Payment Format', 'Receiving Currency', 'Amount Received']

dfp_conc = pd.DataFrame() # dataframe vazio
cont = 0

for dfp_chunk in pd.read_csv(f'{caminho}\LI-Large_Trans.csv', chunksize=5000):

    dfp_chunk = dfp_chunk.groupby(['Payment Format'])['Amount Received'].sum().rename('total').to_frame().reset_index()
    dfp_conc = pd.concat([dfp_conc, dfp_chunk])
    display(dfp_chunk)
    cont += 1
    if cont = 2:
        break

print("---------------------Concatenou-----------------------")
display(dfp_conc)
print("------------------Agrupou novamente-------------------")
tabela_final = dfp_conc.groupby(['Payment Format'])['total'].sum().rename('total').to_frame().reset_index()
display(tabela_final)


### Objetivo (Primeiro faça com Dask):
- Somar todos os  valores recebidos em dolar dos diversos formatos de pagamento.

In [None]:
# separando colunas que iremos trabalhar
colunas = ['Payment Format', 'Receiving Currency', 'Amount Received']
dfp = dfp[colunas]

dfp_dolar = dfp.loc[dfp['Receiving Currency'] == 'US Dollar']

dfp_objetivo = dfp_dolar.groupby(['Payment Format', 'Receiving Currency'])['Amount Received'].sum().rename("total received").to_frame().reset_index()

dfp_sorted = dfp_objetivo.sort_values(by=["total received"], ascending=True)

display(dfp_sorted)



### O que é o dask?

Dask DataFrame é uma estrutura de dados distribuída e paralelizada para manipulação de dados em grandes conjuntos de dados. O Dask DataFrame é semelhante ao pandas DataFrame, mas com a diferença de que ele é projetado para trabalhar em conjunto com o Dask, um framework de computação paralela.

O Dask DataFrame é composto por várias partições, que são fragmentos dos dados que são distribuídos em vários nós ou máquinas em um cluster. Cada partição contém uma seção dos dados, e essas partições são processadas em paralelo para acelerar o processamento de grandes conjuntos de dados.

As partições no Dask DataFrame são criadas automaticamente com base no tamanho dos dados e nos recursos do sistema disponíveis. Quanto mais recursos estiverem disponíveis, mais partições serão criadas, o que pode acelerar ainda mais o processamento. No entanto, muitas partições também podem levar a um overhead excessivo, pois há um custo associado à coordenação de muitas tarefas.

O Dask DataFrame permite que os usuários trabalhem com grandes conjuntos de dados que não cabem na memória de um único computador. Ele também oferece recursos para manipulação de dados distribuídos, processamento paralelo, cálculo em cluster e escalabilidade, tornando-o uma ferramenta útil para análise de dados em larga escala.

Lembre-se de que, ao usar o Dask, é importante escolher a estrutura de dados correta com base no tipo de operação que você deseja realizar. Para trabalhar com arrays distribuídos, use o dask.array, para dataframes distribuídos, use o dask.dataframe e para bags distribuídos, use o dask.bag.

### Lendo Bigdata com dask
- Agora vamos tentar ler com dask mas usando computer()
- Sem compute() vai haver estouro de memoria como no pandas

In [None]:
df = dd.read_csv(f'{caminho}\LI-Large_Trans.csv')

In [None]:
# carrega informações básicas do df (3min)
df.persist()

### Etapa inicial: Limpesa dos dados
- Tratar dados ausentes (missing values)
- Padronizar dados (conversão dos dados)
- Verificar consistência dos dados (dados condizentes, negativos, infinitos, etc)
- Remover Outliers
- Normalização (escala)


### Vamos começar verificando missing values
- Note que isnull() não vai servir, pois ainda não temos informações sobre os dados, a não ser que carregue todos os dados pra memória com o compute(), mas isso vai levar a estouro

In [None]:
df.isnull().sum()

### Solução
- Temos que definir o que queremos (objetivo) e separar somente os dados necessários da tabela, ai sim trabalhamos com aquele conjunto de dado restrito.

Objetivo:
- Somar todos os  valores recebidos em dolar dos diversos formatos de pagamento.

In [None]:
# separando colunas que iremos trabalhar
colunas = ['Payment Format', 'Receiving Currency', 'Amount Received']
# criando um df com o necessário
df_rec = df[colunas]

In [None]:
# Verificando nulos (1min)
df_rec.isnull().sum().compute()

Não detectamos missing values nas colunas, podemos prosseguir com a análise.
As colunas estão com tipos de dados corretos.


In [None]:
df_rec.describe().compute()

Os dados estão consistentes.

### Agora vamos executar o objetivo:
- Somar todos os  valores recebidos em dolar dos diversos formatos de pagamento.

In [None]:
# Filtrando somente dollar (1min)
df_dolar = df_rec.loc[df_rec['Receiving Currency'] == 'US Dollar'].compute()

In [None]:
# Salve o resultado em uma nova tabela assim poderá salva-la evitando nova consulta custosa. (6seg)
df_objetivo = df_dolar.groupby(['Payment Format', 'Receiving Currency'])['Amount Received'].sum().sort_values().rename('Total Received Amount').to_frame().reset_index()

In [None]:
display(df_objetivo)

OBS: ordenar uma tabela so pode ser realizado apos um compute() pois lembre-se que com dask não sabe quem são os dados, somente apos carregar em memória e "saber" sobre os dados podemos ordena-lo.