In [1]:
import pandas as pd 
import numpy as np 
import dask.dataframe as dd
from pathlib import PurePath
from pathlib import PurePath, Path
import sys
import time
import os
import json
from tqdm import tqdm, tqdm_notebook
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from statsmodels.graphics.tsaplots import plot_acf
%matplotlib inline
import seaborn as sns
sns.set_theme()

In [97]:
def procesar_csv(filepath):
    # Leemos el CSV
    df = pd.read_csv(filepath, header=None)
    
    # Asignamos los nombres de columnas
    df.columns = ['id', 'price', 'qty', 'base_qty', 'time', 'buy_sell']
    
    df = df[df['id'] != 'id']
    
    # Convertimos columnas a numéricas
    df['price'] = pd.to_numeric(df['price'], errors='coerce')
    df['qty'] = pd.to_numeric(df['qty'], errors='coerce')
    df['time'] = pd.to_numeric(df['time'], errors='coerce')
    
    df['buy_sell'] = df['buy_sell'].apply(lambda x: 1 if x in ['True', 'tr'] else (-1 if x in ['False', 'fs'] else float('nan')))
    
    # Eliminamos valores NaN
    df.dropna(subset=['price', 'qty', 'time'], inplace=True)
    
    if df['time'].max() > 1e10:  
        df['time'] = df['time'] / 1000
    
    # Value
    df['value'] = df['price'] * df['qty']
    
    # Datetime
    df['dates'] = pd.to_datetime(df['time'], unit='s')
    
    df['dates'] = df['dates'].astype('datetime64[us]')
    
    # Índice
    df.set_index('dates', inplace=True)
    
    # Eliminamos columnas
    df.drop(columns=['time', 'base_qty', 'buy_sell'], inplace=True)
    
    # Eliminamos duplicados
    df.drop_duplicates(inplace=True)
    
    df['id'] = df['id'].astype(str)
    df['id'] = pd.to_numeric(df['id'], errors='coerce')
    
    return df

# Uso
filepath = r"D:\BTCUSDT-trades-2022-2024\BTCUSDT-trades-2023-10.csv"
df = procesar_csv(filepath)

data_dir = PurePath(r"D:\parquet")

save_par = data_dir / 'BTCUSDT2023-10.parq'

# Verificamos si el archivo ya existe
if not os.path.exists(save_par):
    df.to_parquet(save_par)
    print(f"Archivo guardado en {save_par}")
else:
    print(f"El archivo ya existe en {save_par}. No se guardó una nueva copia.")


  df = pd.read_csv(filepath, header=None)


Archivo guardado en D:\parquet\BTCUSDT2023-10.parq


In [5]:
df = pd.read_parquet(save_par)


df.head(50)

Unnamed: 0_level_0,id,price,qty,value
dates,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2023-07-01 00:00:04.059000,3898066116,30460.2,0.004,121.8408
2023-07-01 00:00:04.062000,3898066117,30460.3,0.017,517.8251
2023-07-01 00:00:04.068999,3898066118,30460.3,0.015,456.9045
2023-07-01 00:00:04.076000,3898066119,30460.3,0.012,365.5236
2023-07-01 00:00:04.076999,3898066120,30460.2,0.023,700.5846
2023-07-01 00:00:04.078000,3898066121,30460.2,0.051,1553.4702
2023-07-01 00:00:04.078000,3898066122,30460.2,0.049,1492.5498
2023-07-01 00:00:04.079000,3898066123,30460.2,0.4,12184.08
2023-07-01 00:00:04.082000,3898066124,30460.2,0.211,6427.1022
2023-07-01 00:00:04.082000,3898066125,30460.2,0.158,4812.7116


In [6]:
df.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 80040560 entries, 2023-07-01 00:00:04.059000 to 2023-07-31 23:59:59.928000
Data columns (total 4 columns):
 #   Column  Dtype  
---  ------  -----  
 0   id      object 
 1   price   float64
 2   qty     float64
 3   value   float64
dtypes: float64(3), object(1)
memory usage: 3.0+ GB


In [94]:
df = pd.read_parquet(r"D:\parquet\BTCUSDT2023-11.parq")

df['id'] = pd.to_numeric(df['id'], errors='coerce')

df = df.sort_values(by = 'id')

df.head(10)

Unnamed: 0_level_0,id,price,qty,value
dates,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2023-11-01 00:00:05.027000,4244484021,34651.4,0.01,346.514
2023-11-01 00:00:05.028999,4244484022,34651.4,0.05,1732.57
2023-11-01 00:00:05.028999,4244484023,34651.4,0.062,2148.3868
2023-11-01 00:00:05.028999,4244484024,34651.4,0.003,103.9542
2023-11-01 00:00:05.028999,4244484025,34651.4,0.116,4019.5624
2023-11-01 00:00:05.032999,4244484026,34651.4,0.001,34.6514
2023-11-01 00:00:05.051000,4244484027,34651.4,0.004,138.6056
2023-11-01 00:00:05.051000,4244484028,34651.4,0.012,415.8168
2023-11-01 00:00:05.055000,4244484029,34651.3,0.001,34.6513
2023-11-01 00:00:05.056000,4244484030,34651.4,0.002,69.3028


In [98]:
df = df.sort_values(by = 'id')

df.head(10)

Unnamed: 0_level_0,id,price,qty,value
dates,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2023-10-01 00:00:04.732000,4130731517,26951.0,0.003,80.853
2023-10-01 00:00:04.757999,4130731518,26950.9,0.005,134.7545
2023-10-01 00:00:04.760000,4130731519,26951.0,0.001,26.951
2023-10-01 00:00:04.768999,4130731520,26950.9,0.023,619.8707
2023-10-01 00:00:04.770999,4130731521,26951.0,0.001,26.951
2023-10-01 00:00:04.796000,4130731522,26950.9,0.301,8112.2209
2023-10-01 00:00:04.796000,4130731523,26950.9,0.013,350.3617
2023-10-01 00:00:04.796000,4130731524,26950.9,0.004,107.8036
2023-10-01 00:00:04.796000,4130731525,26950.9,0.028,754.6252
2023-10-01 00:00:04.796000,4130731526,26950.9,0.001,26.9509


In [101]:
df.to_parquet(save_par)

In [119]:
df = pd.read_parquet(r"D:\parquet\BTCUSDT2023-01.parq")

df.head()

Unnamed: 0_level_0,id,price,qty,value
dates,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2023-01-01 00:00:04.118000,3166744581,16537.5,0.004,66.15
2023-01-01 00:00:04.177999,3166744582,16537.6,0.116,1918.3616
2023-01-01 00:00:04.177999,3166744583,16537.6,0.034,562.2784
2023-01-01 00:00:04.197999,3166744584,16537.6,0.082,1356.0832
2023-01-01 00:00:04.209000,3166744585,16537.5,0.002,33.075


### Juntar los parquet en un solo df 

In [3]:
# Dask job Monitor Dashboard
from dask.distributed import Client
client = Client() 
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 16,Total memory: 31.89 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:53031,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 31.89 GiB

0,1
Comm: tcp://127.0.0.1:53052,Total threads: 4
Dashboard: http://127.0.0.1:53056/status,Memory: 7.97 GiB
Nanny: tcp://127.0.0.1:53034,
Local directory: C:\Users\oliva\AppData\Local\Temp\dask-scratch-space\worker-z97gr7d8,Local directory: C:\Users\oliva\AppData\Local\Temp\dask-scratch-space\worker-z97gr7d8

0,1
Comm: tcp://127.0.0.1:53060,Total threads: 4
Dashboard: http://127.0.0.1:53061/status,Memory: 7.97 GiB
Nanny: tcp://127.0.0.1:53035,
Local directory: C:\Users\oliva\AppData\Local\Temp\dask-scratch-space\worker-e2bw990k,Local directory: C:\Users\oliva\AppData\Local\Temp\dask-scratch-space\worker-e2bw990k

0,1
Comm: tcp://127.0.0.1:53051,Total threads: 4
Dashboard: http://127.0.0.1:53054/status,Memory: 7.97 GiB
Nanny: tcp://127.0.0.1:53036,
Local directory: C:\Users\oliva\AppData\Local\Temp\dask-scratch-space\worker-c9tfxk6p,Local directory: C:\Users\oliva\AppData\Local\Temp\dask-scratch-space\worker-c9tfxk6p

0,1
Comm: tcp://127.0.0.1:53053,Total threads: 4
Dashboard: http://127.0.0.1:53058/status,Memory: 7.97 GiB
Nanny: tcp://127.0.0.1:53037,
Local directory: C:\Users\oliva\AppData\Local\Temp\dask-scratch-space\worker-azc3a0ye,Local directory: C:\Users\oliva\AppData\Local\Temp\dask-scratch-space\worker-azc3a0ye


In [3]:
%%time
# Lista de archivos parquet
parquet_files = [
    r"D:\parquet\BTCUSDT2023-01.parq",
    r"D:\parquet\BTCUSDT2023-02.parq",
    r"D:\parquet\BTCUSDT2023-03.parq",
    r"D:\parquet\BTCUSDT2023-04.parq",
    r"D:\parquet\BTCUSDT2023-05.parq",
    r"D:\parquet\BTCUSDT2023-06.parq",
    r"D:\parquet\BTCUSDT2023-07.parq",
    r"D:\parquet\BTCUSDT2023-08.parq",
    r"D:\parquet\BTCUSDT2023-09.parq",
    r"D:\parquet\BTCUSDT2023-10.parq",
    r"D:\parquet\BTCUSDT2023-11.parq",
    r"D:\parquet\BTCUSDT2023-12.parq"
]

# Leer cada archivo Parquet en un DataFrame de Dask
dfs = [dd.read_parquet(file) for file in parquet_files]

# Combinar todos los DataFrames en uno solo
ddf = dd.concat(dfs, axis=0)

# Ordenar por id
ddf = ddf.sort_values(by = 'id')

# Verificar la estructura del DataFrame
print(ddf.head())

                                    id    price    qty      value
dates                                                            
2023-01-01 00:00:04.118000  3166744581  16537.5  0.004    66.1500
2023-01-01 00:00:04.177999  3166744582  16537.6  0.116  1918.3616
2023-01-01 00:00:04.177999  3166744583  16537.6  0.034   562.2784
2023-01-01 00:00:04.197999  3166744584  16537.6  0.082  1356.0832
2023-01-01 00:00:04.209000  3166744585  16537.5  0.002    33.0750
CPU times: total: 1min 42s
Wall time: 6min 32s


In [4]:
# Número filas
num_rows = ddf.shape[0].compute()

# Número columnas
num_columns = ddf.shape[1]

computed_shape = (num_rows, num_columns)
print(computed_shape)

(1258263601, 4)


In [5]:
ddf.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 4 entries, id to value
dtypes: float64(3), int64(1)

### Las divisiones facilitan, y evitan errores, en operaciones posteriores

In [9]:
ddf.index = ddf.index.astype('datetime64[ns]')

# Calculamos divisiones
start_date = ddf.index.min().compute()
end_date = ddf.index.max().compute()
divisions = pd.date_range(start=start_date, end=end_date, periods=ddf.npartitions + 1).astype('datetime64[ns]')

In [10]:
divisions

DatetimeIndex([   '2023-01-01 00:00:04.118000',
               '2023-01-23 19:30:03.856062500',
                  '2023-02-15 15:00:03.594125',
               '2023-03-10 10:30:03.332187500',
                  '2023-04-02 06:00:03.070250',
               '2023-04-25 01:30:02.808312500',
                  '2023-05-17 21:00:02.546375',
               '2023-06-09 16:30:02.284437500',
                  '2023-07-02 12:00:02.022500',
               '2023-07-25 07:30:01.760562500',
                  '2023-08-17 03:00:01.498625',
               '2023-09-08 22:30:01.236687500',
                  '2023-10-01 18:00:00.974750',
               '2023-10-24 13:30:00.712812500',
                  '2023-11-16 09:00:00.450875',
               '2023-12-09 04:30:00.188937500',
                  '2023-12-31 23:59:59.927000'],
              dtype='datetime64[ns]', freq=None)

In [11]:
divisions = divisions.tolist()

In [12]:
divisions

[Timestamp('2023-01-01 00:00:04.118000'),
 Timestamp('2023-01-23 19:30:03.856062500'),
 Timestamp('2023-02-15 15:00:03.594125'),
 Timestamp('2023-03-10 10:30:03.332187500'),
 Timestamp('2023-04-02 06:00:03.070250'),
 Timestamp('2023-04-25 01:30:02.808312500'),
 Timestamp('2023-05-17 21:00:02.546375'),
 Timestamp('2023-06-09 16:30:02.284437500'),
 Timestamp('2023-07-02 12:00:02.022500'),
 Timestamp('2023-07-25 07:30:01.760562500'),
 Timestamp('2023-08-17 03:00:01.498625'),
 Timestamp('2023-09-08 22:30:01.236687500'),
 Timestamp('2023-10-01 18:00:00.974750'),
 Timestamp('2023-10-24 13:30:00.712812500'),
 Timestamp('2023-11-16 09:00:00.450875'),
 Timestamp('2023-12-09 04:30:00.188937500'),
 Timestamp('2023-12-31 23:59:59.927000')]

In [28]:
ddf.npartitions

12

In [13]:
# Índice ajustado
ddf = ddf.reset_index(drop=True).set_index(ddf.index, sorted=True, divisions=divisions)

print(ddf)
print(ddf.divisions)

Dask DataFrame Structure:
                                  id    price      qty    value
npartitions=16                                                 
2023-01-01 00:00:04.118000000  int64  float64  float64  float64
2023-01-23 19:30:03.856062500    ...      ...      ...      ...
...                              ...      ...      ...      ...
2023-12-09 04:30:00.188937500    ...      ...      ...      ...
2023-12-31 23:59:59.927000000    ...      ...      ...      ...
Dask Name: set_index, 20 graph layers
(Timestamp('2023-01-01 00:00:04.118000'), Timestamp('2023-01-23 19:30:03.856062500'), Timestamp('2023-02-15 15:00:03.594125'), Timestamp('2023-03-10 10:30:03.332187500'), Timestamp('2023-04-02 06:00:03.070250'), Timestamp('2023-04-25 01:30:02.808312500'), Timestamp('2023-05-17 21:00:02.546375'), Timestamp('2023-06-09 16:30:02.284437500'), Timestamp('2023-07-02 12:00:02.022500'), Timestamp('2023-07-25 07:30:01.760562500'), Timestamp('2023-08-17 03:00:01.498625'), Timestamp('2023-09-08

In [14]:
def is_sorted(df):
    return df.index.is_monotonic_increasing

sorted_check = ddf.map_partitions(is_sorted).compute()
print(sorted_check)

0     True
1     True
2     True
3     True
4     True
5     True
6     True
7     True
8     True
9     True
10    True
11    True
12    True
13    True
14    True
15    True
dtype: bool


In [15]:
ddf.npartitions

16

In [16]:
ddf.divisions

(Timestamp('2023-01-01 00:00:04.118000'),
 Timestamp('2023-01-23 19:30:03.856062500'),
 Timestamp('2023-02-15 15:00:03.594125'),
 Timestamp('2023-03-10 10:30:03.332187500'),
 Timestamp('2023-04-02 06:00:03.070250'),
 Timestamp('2023-04-25 01:30:02.808312500'),
 Timestamp('2023-05-17 21:00:02.546375'),
 Timestamp('2023-06-09 16:30:02.284437500'),
 Timestamp('2023-07-02 12:00:02.022500'),
 Timestamp('2023-07-25 07:30:01.760562500'),
 Timestamp('2023-08-17 03:00:01.498625'),
 Timestamp('2023-09-08 22:30:01.236687500'),
 Timestamp('2023-10-01 18:00:00.974750'),
 Timestamp('2023-10-24 13:30:00.712812500'),
 Timestamp('2023-11-16 09:00:00.450875'),
 Timestamp('2023-12-09 04:30:00.188937500'),
 Timestamp('2023-12-31 23:59:59.927000'))

Dask requiere que las particiones del DataFrame estén definidas explícitamente para poder realizar operaciones de series temporales como el resampling de manera eficiente. Cuando trabajas con Dask, es fundamental que el DataFrame tenga divisiones conocidas para que el resampling funcione correctamente.

In [17]:
ddf.known_divisions

True

## Barras de dolares y dias (resample) una vez tenemos divisiones

In [5]:
ddf['index'] = 1
ddf['index'] = ddf['index'].cumsum()-1

In [7]:
ddf = ddf.reset_index()

In [9]:
ddf.head()

Unnamed: 0,dates,id,price,qty,value,index
0,2023-01-01 00:00:04.118000,3166744581,16537.5,0.004,66.15,0
1,2023-01-01 00:00:04.177999,3166744582,16537.6,0.116,1918.3616,1
2,2023-01-01 00:00:04.177999,3166744583,16537.6,0.034,562.2784,2
3,2023-01-01 00:00:04.197999,3166744584,16537.6,0.082,1356.0832,3
4,2023-01-01 00:00:04.209000,3166744585,16537.5,0.002,33.075,4


In [10]:
ddf.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 6 entries, dates to index
dtypes: datetime64[ns](1), float64(3), int64(2)

In [13]:
%%time
# Parámetros
traded_dollar_v = 100000000

# Creamos la columna 'group' basada en el volumen de dolar acumulado (qty)
ddf['group'] = (ddf['value'].cumsum() / traded_dollar_v).astype(np.int64)

# Agrupamos y calculamos
ddf_dollar_bars = ddf.groupby('group').agg({
    'price': ['first', 'last', 'max', 'min'],
    'qty': 'sum',  
    'value': 'sum',
    'dates': 'min'  
})

# Columnas resultantes
ddf_dollar_bars.columns = ['open', 'close', 'high', 'low', 'volume', 'dollar_volume', 'timestamp']

# Computamos el resultado -> pandas df
df_dollar_bars_dd = ddf_dollar_bars.compute()

print(df_dollar_bars_dd.head())

          open    close     high      low    volume  dollar_volume  \
group                                                                
0      16537.5  16538.5  16540.9  16504.0  6051.460   9.999960e+07   
1      16538.5  16524.0  16557.1  16519.5  6045.730   9.999421e+07   
2      16524.0  16510.1  16532.3  16508.8  6051.973   9.997919e+07   
3      16510.0  16521.8  16545.7  16508.0  6052.334   1.000267e+08   
4      16521.8  16502.6  16530.3  16488.0  6057.291   9.999053e+07   

                       timestamp  
group                             
0     2023-01-01 00:00:04.118000  
1     2023-01-01 01:10:16.415000  
2     2023-01-01 03:05:26.072000  
3     2023-01-01 05:10:45.694999  
4     2023-01-01 07:06:36.842999  
CPU times: total: 6min 28s
Wall time: 7min 36s


In [15]:
df_dollar_bars_dd.info()

<class 'pandas.core.frame.DataFrame'>
Index: 41958 entries, 0 to 41957
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   open           41958 non-null  float64       
 1   close          41958 non-null  float64       
 2   high           41958 non-null  float64       
 3   low            41958 non-null  float64       
 4   volume         41958 non-null  float64       
 5   dollar_volume  41958 non-null  float64       
 6   timestamp      41958 non-null  datetime64[ns]
dtypes: datetime64[ns](1), float64(6)
memory usage: 2.6 MB


In [19]:
df_dollar_bars_dd.iloc[-1]

open                                42260.4
close                               42314.0
high                                42315.6
low                                 42222.2
volume                             1829.044
dollar_volume                 77321647.5879
timestamp        2023-12-31 23:34:56.260000
Name: 41957, dtype: object

In [24]:
df_dollar_bars_dd.to_parquet(r'D:\\bars\dol_vol_bars.parquet',index=False)

## Dollar bars desde aquí

In [5]:
dollar_bars = pd.read_parquet(r"D:\bars\dollar_bars.parquet")

In [7]:
dollar_bars = dollar_bars.set_index('timestamp')

dollar_bars.head()

Unnamed: 0_level_0,open,close,high,low,volume,dollar_volume
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2023-01-01 00:00:04.118000,16537.5,16538.5,16540.9,16504.0,6051.46,99999600.0
2023-01-01 01:10:16.415000,16538.5,16524.0,16557.1,16519.5,6045.73,99994210.0
2023-01-01 03:05:26.072000,16524.0,16510.1,16532.3,16508.8,6051.973,99979190.0
2023-01-01 05:10:45.694999,16510.0,16521.8,16545.7,16508.0,6052.334,100026700.0
2023-01-01 07:06:36.842999,16521.8,16502.6,16530.3,16488.0,6057.291,99990530.0
