In [1]:
# !pip install pandas pyarrow --upgrade >/dev/null 2>&1
# !pip install chdb --upgrade >/dev/null 2>&1
# !pip show chdb

In [2]:
import chdb
from datetime import datetime, timedelta
from chdb.session import Session
import chdb.dataframe as cdf
from pathlib import Path
import time
from zoneinfo import ZoneInfo
import pandas as pd
pd.options.mode.copy_on_write = True
df = None

## runProcessRawPipeline
- dropOptimisedTable() - Drops the Table from the ChDb Instance
- createOptimisedTable() - Create the Table to hold the sample data optimised for Pandas datatypes
- insertSamplesIntoOptimisedTable() - Inserts teh Raw Samples into the new Table
- exportToParquet() - Writes out the Optimised Table as a local Parquet File (Could be generalised?)
- readbackParquetFile() - Test function to see if the paquet file can be read back succesfully
- readbackSmallSample() -  Test function to see if the paquet file can be read back succesfully
- importFromPaquetintoDataFrame() - Reads the Parquet File into a dataframe and sets the correct types, timezones and index
- writeOptimisedDataFrameAsParquet(dftemp) - used native DataFrame method to write out final data frame to preserve the df settings.

## runFullAggregationPipeline
- loadOptimisedSampleData() - REads in the Paruet File Writen at the end of the runProcessRawPipeline()
- aggregate_samples_to_interval() - SQL Windowing function to aggregate the samples in the input df by the specified timeframe and get the sample timestamp when the High and Low were formed - reutrns a Clickhouse DB df result
- storeIntermediateData() - Stores the current batch into an intermediate Parquet file for insertion into the final results set Table
- insertOHLCs() - Writes the intermediate Parquet results file into a chDB Table 
- outputFinalDataSet() - Reads the whole chDB Results table for the instrument and timeframe and outputs the final Parquet file for further analysis

In [3]:
# %load ../process_OHLC.py
import chdb
from datetime import datetime, timedelta
from chdb.session import Session
import chdb.dataframe as cdf
from pathlib import Path
import time
from zoneinfo import ZoneInfo
import pandas as pd
pd.options.mode.copy_on_write = True
df = None

#Enrtry Points:

# runProcessRawPipeline()
    # dropOptimisedTable() - Drops the Table from the ChDb Instance
    # createOptimisedTable() - Create the Table to hold the sample data optimised for Pandas datatypes
    # insertSamplesIntoOptimisedTable() - Inserts teh Raw Samples into the new Table
    # exportToParquet() - Writes out the Optimised Table as a local Parquet File (Could be generalised?)
    # readbackParquetFile() - Test function to see if the paquet file can be read back succesfully
    # readbackSmallSample() - Test function to see if the paquet file can be read back succesfully
    # importFromPaquetintoDataFrame() - Reads the Parquet File into a dataframe and sets the correct types, timezones and index
    # writeOptimisedDataFrameAsParquet(dftemp) - used native DataFrame method to write out final data frame to preserve the df settings.

# runFullAggregationPipeline()
    # loadOptimisedSampleData() - REads in the Paruet File Writen at the end of the runProcessRawPipeline()
    # aggregate_samples_to_interval() - SQL Windowing function to aggregate the samples in the input df by the specified timeframe and get the sample timestamp when the High and Low were formed - reutrns a Clickhouse DB df result
    # storeIntermediateData() - Stores the current batch into an intermediate Parquet file for insertion into the final results set Table
    # insertOHLCs() - Writes the intermediate Parquet results file into a chDB Table
    # outputFinalDataSet() - Reads the whole chDB Results table for the instrument and timeframe and outputs the final Parquet file for further analysis

# GenerateDailyOHLC()
    # creates Daily Alinged OHLC Bars

def dropOptimisedTable():
    db = Session(path="/tmp/oanda_data")

    res = db.query("select count(*) from quant.ohlc_opt")
    print(res)
    db.query("drop table quant.ohlc_opt")
    
def createOptimisedTable():
    db = Session(path="/tmp/oanda_data")

    db.query("create database IF NOT EXISTS quant")
    db.query("""
                create table IF NOT EXISTS quant.ohlc_opt (
                instrument LowCardinality(String),
                tf LowCardinality(String),
                open Float64,
                high Float64,
                low Float64,
                close Float64,
                timestamp DateTime64,
                ) engine MergeTree
                PRIMARY KEY (instrument, tf, timestamp);
                """)
    res = db.query("select count(*) from quant.ohlc_opt")
    print(res)
    
def insertSamplesIntoOptimisedTable():
    db = Session(path="/tmp/oanda_data")

    sql = """INSERT INTO quant.ohlc_opt (instrument, tf, open, high, low, close, timestamp) select instrument, tf,
            max(open) as open, max(high) as high, max(low) as low, max(close) as close, timestamp 
            from quant.ohlc group by instrument, tf, timestamp Order by timestamp desc"""
    res = db.query(sql)
    res = db.query("select count(*) from quant.ohlc_opt")
    print(res)
    
def exportToParquet():
    start = time.time()

    print("Starting chDB...")
    # create a persistent session
    db = Session(path="/tmp/oanda_data") #/tmp/quantDB")

    res = db.query("""select instrument, tf, max(open) as open, max(high) as high, max(low) as low, max(close) as close,
                   timestamp from quant.ohlc_opt group by instrument, tf, timestamp Order by timestamp desc""", "Parquet")

    print(
        f"{res.rows_read()} rows | "
        f"{res.bytes_read()} bytes | "
        f"{res.elapsed()} seconds"
    )

    # export to Parquet
    path = Path("./oanda_data_opt.parquet")
    path.write_bytes(res.bytes())

    end = time.time()
    print(end - start)
    
def readbackParquetFile():
    # Readback the file and see if it has the same number of rows
    db = Session()
    start = time.time()
    res = db.query("select * from './oanda_data_opt.parquet' order by timestamp desc", "Parquet")

    end = time.time()
    print(end - start)
    print(
        f"{res.rows_read()} rows | "
        f"{res.bytes_read()} bytes | "
        f"{res.elapsed()} seconds"
    )
    
def readbackSmallSample():
    # import from Parquet
    db = Session()

    query_sql = "select * from './oanda_data_opt.parquet' order by timestamp desc limit 5"
    res = chdb.query(query_sql, "PrettyCompactNoEscapes")
    print(res, end="")
    
def importFromPaquetintoDataFrame():
    # import from Parquet and convert to pandas/numpy data types
    db = Session()
    start = time.time()

    datafile = './oanda_data_opt.parquet'
    data = f"file('{datafile}', Parquet)"

    sql = f"""SELECT * FROM {data} order by timestamp desc"""

    dfETL = chdb.query(sql, 'Dataframe')

    # Sort out Data Types to work with Pandas / Numpy
    dfETL['instrument'] = dfETL['instrument'].astype(pd.StringDtype())
    dfETL['tf'] = dfETL['tf'].astype(pd.StringDtype())
    dfETL['timestamp'] = dfETL['timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
    dfETL = dfETL.set_index(['timestamp'])

    end = time.time()
    print(end - start)

    print(dfETL)
    dfETL.describe(include='all')
    dfETL.info()
    return dfETL
    
def writeOptimisedDataFrameAsParquet(dfETL):    
    #Write out optimised dataframe as parquet
    start = time.time()
    dfFile = './oanda_data_pandas.parquet'
    dfETL.to_parquet(path=dfFile, index=True)
    end = time.time()
    print(end - start)
    dfETL = None
    
def runProcessRawPipeline():
#     dropOptimisedTable()
    createOptimisedTable()
    insertSamplesIntoOptimisedTable()
    exportToParquet()
#     readbackParquetFile()
#     readbackSmallSample()
    dftemp = importFromPaquetintoDataFrame()
    writeOptimisedDataFrameAsParquet(dftemp)

def loadOptimisedSampleData(parquetFile):
    #Load optimised dataframe from parquet file
    start = time.time()
    dfFile = parquetFile #'./oanda_data_pandas.parquet'
    df = pd.read_parquet(path=dfFile)
    end = time.time()
    print("Loaded Optimised Sample Data:", end - start)
    return df

def insertOHLCs(instrument, tf, parquetFile):

    data = f"file('{parquetFile}', Parquet)"
    db = Session(path="/tmp/aggregated_oanda")
    
    db.query("create database IF NOT EXISTS quant")
    
    db.query(f"""
    create table IF NOT EXISTS quant.ohlc_agg (
    instrument LowCardinality(String),
    tf LowCardinality(String),
    open Float64,
    high Float64,
    low Float64,
    close Float64,
    interval DateTime64,
    high_timestamp DateTime64,
    low_timestamp DateTime64
    ) engine MergeTree
    PRIMARY KEY (instrument, tf, interval);
    """)

    query = f"""
    INSERT INTO quant.ohlc_agg (instrument, tf, open, high, low, close, high_timestamp, low_timestamp, interval) 
    select '{instrument}' as instrument, '{tf}' as tf, max(open) as open, max(high) as high, max(low) as low, max(close) as close, 
    max(high_timestamp) as high_timestamp, max(low_timestamp) as low_timestamp,
    interval from {data} group by instrument, tf, interval Order by interval
    """
    
    res = db.query(query)
#     res = cdf.query(sql=query, df=intermediate_df)

    res = db.query(f"select count(*) from quant.ohlc_agg where instrument='{instrument}' and tf='{tf}'")
    print(datetime.today().isoformat(" ","seconds"), ":","chDB", "has", res, "rows" )
    
def storeIntermediateData(instrument, tf, bars):
    # export to Parquet
    start = time.time()
    dfFile = f"./oanda_data_intermediate_{instrument}_{tf}.parquet"
    bars.to_parquet(path=dfFile, index=True)
    end = time.time()
    print(end - start)
    return dfFile

def outputFinalDataSet(instrument, tf):
    db = Session(path="/tmp/aggregated_oanda")
    
    query = f"select * from quant.ohlc_agg where instrument='{instrument}' and tf='{tf}' order by interval"
    dfres = db.query(query, 'dataframe')
    
    dfres['instrument'] = dfres['instrument'].astype(pd.StringDtype())
    dfres['tf'] = dfres['tf'].astype(pd.StringDtype())
    dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
    
    dfres['high_timestamp']  = dfres['high_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
    dfres['low_timestamp'] = dfres['low_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
    dfres = dfres.set_index(['interval'])


    start = time.time()
    dfFile = f'./oanda_{instrument}_{tf}.parquet'
    dfres.to_parquet(path=dfFile, index=True)
    end = time.time()
    print(end - start)
    dfres = None

def dropFinalResultschDB():
    db = Session(path="/tmp/aggregated_oanda")
    db.query("drop table quant.ohlc_agg")
    
    
def aggregate_samples_to_interval(sample_df, interval_min, start, end, tzOffset):
    
    query_sql = f"""
    select sq.interval, sq.open, sq.high, sq.low, sq.close, sq.high_timestamp, sq.low_timestamp, count(*) as samples 
    from
        (select 
        --toStartOfInterval(addHours(timestamp,2), INTERVAL {interval_min} MINUTE, '{tzOffset}') as interval,
        toStartOfInterval(s5.timestamp, INTERVAL {interval_min} MINUTE, '{tzOffset}') as interval,
        FIRST_VALUE(s5.open) OVER (PARTITION BY interval ORDER BY s5.timestamp asc) open,
        FIRST_VALUE(s5.high) OVER (PARTITION BY interval ORDER BY s5.high desc) high,
        FIRST_VALUE(s5.low)  OVER (PARTITION BY interval ORDER BY s5.low asc) low,
        FIRST_VALUE(s5.close) OVER (PARTITION BY interval ORDER BY s5.timestamp desc) close,
        FIRST_VALUE(s5.timestamp) OVER (PARTITION BY interval ORDER BY s5.high desc, s5.timestamp) high_timestamp,
        FIRST_VALUE(s5.timestamp) OVER (PARTITION BY interval ORDER BY s5.low asc, s5.timestamp) low_timestamp
        from __s5__ as s5
        where s5.timestamp >= toDateTime('{start}', '{tzOffset}') and s5.timestamp < toDateTime('{end}', '{tzOffset}')
        --group by interval, open, high, low, close, high_timestamp, low_timestamp
        order by s5.timestamp
        ) as sq
    group by sq.interval, sq.open, sq.high , sq.low, sq.close, sq.high_timestamp, sq.low_timestamp
    order by sq.interval desc
    ;
    """
    res = cdf.query(sql=query_sql, s5=sample_df)
    dfres = res.to_pandas()
    dfres['interval'] = dfres['interval'] * 1e9
    dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo(tzOffset)))
    dfres['high_timestamp'] = dfres['high_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo(tzOffset)))
    dfres['low_timestamp'] = dfres['low_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo(tzOffset)))
#     dfres = dfres.set_index('interval', drop=False)
    return dfres
    
def runFullAggregationPipeline():
    #Process samples to aggregate
    day_increment = 1
    interval_min = 1440
    start = datetime(2017, 1, 19)
    end = datetime(2024, 1, 9)
    increment_end =  start + timedelta(days=day_increment)
    instrument = 'EUR_USD'
    tf = 'D1'
    parquetSampleFile ='./oanda_data_pandas.parquet'

    df = loadOptimisedSampleData(parquetSampleFile)

    while increment_end < end:
        agg_df = aggregate_samples_to_interval(df, interval_min, start, increment_end, tzOffset)
        start =  start + timedelta(days=day_increment)
        increment_end =  start + timedelta(days=day_increment)
        parquetFile = storeIntermediateData(instrument, tf, agg_df)
        insertOHLCs(instrument, tf, parquetFile)

    outputFinalDataSet(instrument, tf)


#### Experimental
    
def e_runFullAggregationPipeline(df, day_increment, interval_min, start, end, instrument, tf):
    #Process samples to aggregate
    tzOffset = 'EST'
    increment_end =  start + timedelta(days=day_increment)

    while increment_end < end:
        agg_df = aggregate_samples_to_interval(df, interval_min, start, increment_end, tzOffset)
        
        if(interval_min == 1440):
#             agg_df.index = agg_df.index.date
#             agg_df.index = agg_df.index + timedelta(days=1)
            dfres['interval'] = dfres['interval'].date()
            dfres['interval'] = dfres['interval'] + timedelta(days=1)
            agg_df['high_timestamp'] = agg_df['high_timestamp'] - timedelta(hours=2)
            agg_df['low_timestamp'] = agg_df['low_timestamp'] - timedelta(hours=2)
        
        start =  start + timedelta(days=day_increment)
        increment_end =  start + timedelta(days=day_increment)
#         agg_df = agg_df.reset_index()
        parquetFile = storeIntermediateData(instrument, tf, agg_df)
        insertOHLCs(instrument, tf, parquetFile)

    outputFinalDataSet(instrument, tf)

def GenerateDailyOHLC():
    day_increment = 14
    interval_min = 1440
    start = datetime(2017, 1, 19)
    end = datetime(2024, 1, 12)
    instrument = 'EUR_USD'
    tf = 'D1'
    
    parquetSampleFile ='./oanda_data_pandas.parquet' 
    dfSample = loadOptimisedSampleData(parquetSampleFile)
    dfInput = None
    dfInput = dfSample.copy()

    #Daily Specific:
    dfInput.index = dfInput.index + timedelta(hours=2)

    e_runFullAggregationPipeline(dfInput, day_increment, interval_min, start, end, instrument, tf)
    print("Finished Daily OHLC")

In [4]:
# runProcessRawPipeline()
# runFullAggregationPipeline()

## Experimental

In [None]:
# dropFinalResultschDB()
# createOptimisedTable()

GenerateDailyOHLC()

Loaded Optimised Sample Data: 5.704396486282349
0.0960233211517334
2024-01-17 17:42:58 : chDB has 11
 rows
0.011168956756591797
2024-01-17 17:43:07 : chDB has 22
 rows
0.010779619216918945
2024-01-17 17:43:16 : chDB has 33
 rows
0.010656356811523438
2024-01-17 17:43:24 : chDB has 44
 rows
0.010475873947143555
2024-01-17 17:43:32 : chDB has 55
 rows
0.01253199577331543
2024-01-17 17:43:39 : chDB has 66
 rows
0.009798765182495117
2024-01-17 17:43:50 : chDB has 77
 rows
0.01617121696472168
2024-01-17 17:44:01 : chDB has 88
 rows
0.014719009399414062
2024-01-17 17:44:10 : chDB has 99
 rows
0.011312007904052734
2024-01-17 17:44:22 : chDB has 110
 rows
0.01059579849243164
2024-01-17 17:44:33 : chDB has 121
 rows
0.00990605354309082
2024-01-17 17:44:41 : chDB has 132
 rows
0.011230707168579102
2024-01-17 17:44:51 : chDB has 143
 rows
0.013871908187866211
2024-01-17 17:45:00 : chDB has 154
 rows
0.009880781173706055
2024-01-17 17:45:09 : chDB has 165
 rows
0.010694742202758789
2024-01-17 17:45

In [None]:
day_increment = 14
interval_min = 1440
tzOffset = 'EST'
start = datetime(2023, 12, 10) #, tzinfo=ZoneInfo(tzOffset))
print("Start Date:", start)
# start = start - timedelta(hours=2)
# end = datetime(2023, 12, 14)
# end = end - timedelta(hours=2)

increment_end =  start + timedelta(hours=day_increment*24)
instrument = 'EUR_USD'
tf = 'D1'

parquetSampleFile ='./oanda_data_pandas.parquet' 
dfSample = loadOptimisedSampleData(parquetSampleFile)

dfInput = None
dfInput = dfSample.copy()

#Daily Specific:
dfInput.index = dfInput.index + timedelta(hours=2)
dfInput.info()

agg_df = aggregate_samples_to_interval(dfInput, interval_min, start, increment_end, tzOffset)

agg_df.index = agg_df.index.date
agg_df.index = agg_df.index + timedelta(days=1)
agg_df['high_timestamp'] = agg_df['high_timestamp'] - timedelta(hours=2)
agg_df['low_timestamp'] = agg_df['low_timestamp'] - timedelta(hours=2)
agg_df.info()
agg_df.tail(40)

# 2023-12-11: 1.07610, 1.07790, 1.07416, 1.07644
# 2023-12-12: 1.07644, 1.08282, 1.07603, 1.07946
# 2023-12-13: 1.07946, 1.08966, 1.07732, 1.08751

# delta = 0 (time hours =  16, 19)
# 2023-12-14 19:00:00-05:00	1.09936	1.10037	1.08886	1.08948	2023-12-14 23:43:35-05:00	2023-12-15 13:55:35-05:00	14546
# 2023-12-13 19:00:00-05:00	1.08828	1.10094	1.08792	1.09934	2023-12-14 12:58:20-05:00	2023-12-13 19:13:20-05:00	15671
# 2023-12-12 19:00:00-05:00	1.07984	1.08966	1.07732	1.08827	2023-12-13 15:09:50-05:00	2023-12-13 05:48:30-05:00	14587
# 2023-12-11 19:00:00-05:00	1.07654	1.08282	1.07613	1.07984	2023-12-12 08:30:15-05:00	2023-12-11 22:00:50-05:00	14147
# 2023-12-10 19:00:00-05:00	1.07654	1.07790	1.07416	1.07654	2023-12-11 05:18:55-05:00	2023-12-11 11:32:05-05:00	13903
# 2023-12-09 19:00:00-05:00	1.07588	1.07678	1.07584	1.07654	2023-12-10 18:07:15-05:00	2023-12-10 17:03:00-05:00	619

# 2023-12-15 07:00:00-05:00	1.09686	1.09695	1.08886	1.08948	2023-12-15 07:02:50-05:00	2023-12-15 13:55:35-05:00	6692
# 2023-12-14 19:00:00-05:00	1.09936	1.10037	1.09462	1.09684	2023-12-14 23:43:35-05:00	2023-12-15 03:30:55-05:00	7854
# 2023-12-14 07:00:00-05:00	1.09296	1.10094	1.09068	1.09934	2023-12-14 12:58:20-05:00	2023-12-14 08:15:15-05:00	7521
# 2023-12-13 19:00:00-05:00	1.08828	1.09320	1.08792	1.09297	2023-12-14 06:52:15-05:00	2023-12-13 19:13:20-05:00	8150
# 2023-12-13 07:00:00-05:00	1.07840	1.08966	1.07800	1.08827	2023-12-13 15:09:50-05:00	2023-12-13 13:07:15-05:00	7545
# 2023-12-12 19:00:00-05:00	1.07984	1.07999	1.07732	1.07838	2023-12-12 19:07:25-05:00	2023-12-13 05:48:30-05:00	7042
# 2023-12-12 07:00:00-05:00	1.07965	1.08282	1.07672	1.07984	2023-12-12 08:30:15-05:00	2023-12-12 09:35:15-05:00	7145
# 2023-12-11 19:00:00-05:00	1.07654	1.08074	1.07613	1.07966	2023-12-12 05:34:30-05:00	2023-12-11 22:00:50-05:00	7002
# 2023-12-11 07:00:00-05:00	1.07684	1.07706	1.07416	1.07654	2023-12-11 07:07:20-05:00	2023-12-11 11:32:05-05:00	6738
# 2023-12-10 19:00:00-05:00	1.07654	1.07790	1.07514	1.07684	2023-12-11 05:18:55-05:00	2023-12-11 03:03:15-05:00	7165
# 2023-12-10 07:00:00-05:00	1.07588	1.07678	1.07584	1.07654	2023-12-10 18:07:15-05:00	2023-12-10 17:03:00-05:00	619

# delta = 1 time =16
# 2023-12-14 19:00:00-05:00	1.09920	1.10037	1.08886	1.08948	2023-12-15 00:43:35-05:00	2023-12-15 14:55:35-05:00	15017
# 2023-12-13 19:00:00-05:00	1.08786	1.10094	1.08759	1.09920	2023-12-14 13:58:20-05:00	2023-12-13 19:03:30-05:00	15691
# 2023-12-12 19:00:00-05:00	1.07974	1.08966	1.07732	1.08786	2023-12-13 16:09:50-05:00	2023-12-13 06:48:30-05:00	14408
# 2023-12-11 19:00:00-05:00	1.07650	1.08282	1.07613	1.07968	2023-12-12 09:30:15-05:00	2023-12-11 23:00:50-05:00	14035
# 2023-12-10 19:00:00-05:00	1.07616	1.07790	1.07416	1.07650	2023-12-11 06:18:55-05:00	2023-12-11 12:32:05-05:00	14120
# 2023-12-09 19:00:00-05:00	1.07588	1.07619	1.07584	1.07617	2023-12-10 18:58:10-05:00	2023-12-10 18:03:00-05:00	202

# 2023-12-15 07:00:00-05:00	1.09645	1.09696	1.08886	1.08948	2023-12-15 07:59:30-05:00	2023-12-15 14:55:35-05:00	7373
# 2023-12-14 19:00:00-05:00	1.09920	1.10037	1.09462	1.09644	2023-12-15 00:43:35-05:00	2023-12-15 04:30:55-05:00	7644
# 2023-12-14 07:00:00-05:00	1.09134	1.10094	1.09068	1.09920	2023-12-14 13:58:20-05:00	2023-12-14 09:15:15-05:00	7747
# 2023-12-13 19:00:00-05:00	1.08786	1.09154	1.08759	1.09134	2023-12-13 23:43:10-05:00	2023-12-13 19:03:30-05:00	7944
# 2023-12-13 07:00:00-05:00	1.07777	1.08966	1.07740	1.08786	2023-12-13 16:09:50-05:00	2023-12-13 07:04:50-05:00	7697
# 2023-12-12 19:00:00-05:00	1.07974	1.07999	1.07732	1.07778	2023-12-12 20:07:25-05:00	2023-12-13 06:48:30-05:00	6711
# 2023-12-12 07:00:00-05:00	1.08007	1.08282	1.07672	1.07968	2023-12-12 09:30:15-05:00	2023-12-12 10:35:15-05:00	7448
# 2023-12-11 19:00:00-05:00	1.07650	1.08074	1.07613	1.08011	2023-12-12 06:34:30-05:00	2023-12-11 23:00:50-05:00	6587
# 2023-12-11 07:00:00-05:00	1.07733	1.07782	1.07416	1.07650	2023-12-11 07:33:35-05:00	2023-12-11 12:32:05-05:00	7201
# 2023-12-10 19:00:00-05:00	1.07616	1.07790	1.07514	1.07734	2023-12-11 06:18:55-05:00	2023-12-11 04:03:15-05:00	6919
# 2023-12-10 07:00:00-05:00	1.07588	1.07619	1.07584	1.07617	2023-12-10 18:58:10-05:00	2023-12-10 18:03:00-05:00	202

# delta = 2
# 2023-12-14 19:00:00-05:00	1.09957	1.10037	1.08886	1.08948	2023-12-15 01:43:35-05:00	2023-12-15 15:55:35-05:00	15306
# 2023-12-13 19:00:00-05:00	1.08772	1.10094	1.08736	1.09918	2023-12-14 14:58:20-05:00	2023-12-13 19:32:15-05:00	15781
# 2023-12-12 19:00:00-05:00	1.07934	1.08966	1.07732	1.08751	2023-12-13 17:09:50-05:00	2023-12-13 07:48:30-05:00	14404
# 2023-12-11 19:00:00-05:00	1.07624	1.08282	1.07603	1.07946	2023-12-12 10:30:15-05:00	2023-12-11 19:10:05-05:00	13922
# 2023-12-10 19:00:00-05:00	1.07588	1.07790	1.07416	1.07644	2023-12-11 07:18:55-05:00	2023-12-11 13:32:05-05:00	14060

# 2023-12-15 07:00:00-05:00	1.09602	1.09696	1.08886	1.08948	2023-12-15 08:59:30-05:00	2023-12-15 15:55:35-05:00	8046
# 2023-12-14 19:00:00-05:00	1.09957	1.10037	1.09462	1.09604	2023-12-15 01:43:35-05:00	2023-12-15 05:30:55-05:00	7260
# 2023-12-14 07:00:00-05:00	1.09039	1.10094	1.08983	1.09918	2023-12-14 14:58:20-05:00	2023-12-14 07:02:00-05:00	8155
# 2023-12-13 19:00:00-05:00	1.08772	1.09154	1.08736	1.09041	2023-12-14 00:43:10-05:00	2023-12-13 19:32:15-05:00	7626
# 2023-12-13 07:00:00-05:00	1.07876	1.08966	1.07732	1.08751	2023-12-13 17:09:50-05:00	2023-12-13 07:48:30-05:00	7991
# 2023-12-12 19:00:00-05:00	1.07934	1.07999	1.07802	1.07875	2023-12-12 21:07:25-05:00	2023-12-13 04:00:20-05:00	6413
# 2023-12-12 07:00:00-05:00	1.07934	1.08282	1.07672	1.07946	2023-12-12 10:30:15-05:00	2023-12-12 11:35:15-05:00	7751
# 2023-12-11 19:00:00-05:00	1.07624	1.07977	1.07603	1.07933	2023-12-12 06:56:05-05:00	2023-12-11 19:10:05-05:00	6171
# 2023-12-11 07:00:00-05:00	1.07670	1.07790	1.07416	1.07644	2023-12-11 07:18:55-05:00	2023-12-11 13:32:05-05:00	7610
# 2023-12-10 19:00:00-05:00	1.07588	1.07740	1.07514	1.07672	2023-12-10 22:38:15-05:00	2023-12-11 05:03:15-05:00	6450

In [None]:
# dfsample = df.iloc[1:100]

# query_sql = """
# select
#   *
# from __s5__ as s5
# order by timestamp desc limit 5;
# """

# res = cdf.query(sql=query_sql, s5=dfsample)
# print(res, end="")

In [None]:
# dfsample = df.iloc[1:100]

# query_sql = """
# select top 100
# s5.timestamp,
# s5.open,
# s5.high,
# s5.low,
# s5.close,
# RANK() OVER (ORDER BY s5.high) high_rank
# from __s5__ as s5
# order by high_rank desc, timestamp;
# """

# res = cdf.query(sql=query_sql, s5=dfsample)
# dfres = res.to_pandas()
# dfres.head(30)


In [None]:
# import datetime
# interval_min = 60
# start = datetime.datetime(2024, 1, 2)#, tzinfo=ZoneInfo('EST'))
# end =  datetime.datetime(2024, 1, 3) #, tzinfo=ZoneInfo('EST'))

# query_sql = f"""
#     select sq.interval, sq.open, sq.high, sq.low, sq.close, sq.high_timestamp, sq.low_timestamp, count(*) as samples 
#     from
#         (select 
#         toStartOfInterval(timestamp, INTERVAL {interval_min} MINUTE) as interval,
#         FIRST_VALUE(s5.open) OVER (PARTITION BY interval ORDER BY s5.timestamp asc) open,
#         FIRST_VALUE(s5.high) OVER (PARTITION BY interval ORDER BY s5.high desc) high,
#         FIRST_VALUE(s5.low)  OVER (PARTITION BY interval ORDER BY s5.low asc) low,
#         FIRST_VALUE(s5.close) OVER (PARTITION BY interval ORDER BY s5.timestamp desc) close,
#         FIRST_VALUE(timestamp) OVER (PARTITION BY interval ORDER BY s5.high desc, timestamp) high_timestamp,
#         FIRST_VALUE(timestamp) OVER (PARTITION BY interval ORDER BY s5.low asc, timestamp) low_timestamp
#         from __s5__ as s5
#         where timestamp > toDateTime('{start}', 'EST') and timestamp < toDateTime('{end}', 'EST')
#         ) as sq
#     group by sq.interval, sq.open, sq.high , sq.low, sq.close, sq.high_timestamp, sq.low_timestamp
#     order by sq.interval desc
#     ;
#     """
# sample_df = df.iloc[1:100000]
# res = cdf.query(sql=query_sql, s5=sample_df)
# dfres = res.to_pandas()
# dfres['interval'] = dfres['interval'] * 1e9
# dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
# dfres['high_timestamp'] = dfres['high_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
# dfres['low_timestamp'] = dfres['low_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
# dfres.head(30)

In [None]:
# db = Session()
# query = 'SELECT * FROM system.time_zones LIMIT 100'
# dtz = db.query(query, 'dataframe')
# print(dtz)

In [None]:
# db = Session(path="/tmp/aggregated_oanda")
# query = f"select * from quant.ohlc_agg where instrument='{instrument}' and tf='{tf}'"
# dfres = db.query(query, 'dataframe')
# dfres['instrument'] = dfres['instrument'].astype(pd.StringDtype())
# dfres['tf'] = dfres['tf'].astype(pd.StringDtype())
# # dfres['interval'] = dfres['interval'] * 1e9
# dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
# # dfres['high_timestamp'] = dfres['high_timestamp'] * 1e9
# dfres['high_timestamp']  = dfres['high_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
# # dfres['low_timestamp'] = dfres['low_timestamp'] * 1e9
# dfres['low_timestamp'] = dfres['low_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
# dfres = dfres.set_index(['interval'])
# # dfres.info()
# # dfres.head(50)

# instrument = 'EUR_USD'
# tf = 'H1'
# start = time.time()
# dfFile = f'./oanda_{instrument}_{tf}.parquet'
# dfres.to_parquet(path=dfFile, index=True)
# end = time.time()
# print(end - start)
# dfres = None

In [None]:
# parquetFile = './oanda_EUR_USD_D1.parquet'
# df = loadOptimisedSampleData(parquetFile)
# df.info()
# df.tail(40)

In [None]:
date1 = datetime(2024, 1, 15, 19,1)
tzOffset = 'EST'
dtstart = datetime(2024, 1, 15) #, tzinfo=ZoneInfo(tzOffset))

print(dtstart)

shifted = start + timedelta(hours=1)
print(shifted)
x=1
num_of_samples = 288

date_list = [dtstart + timedelta(minutes=x*15) for x in range(num_of_samples)]
value_list = [0 + x for x in range(num_of_samples)]

d = {'timestamp':date_list, 'open':value_list}
dfDates = pd.DataFrame(data=d)
dfDates.head(20)

start = datetime(2024, 1, 2)#, tzinfo=ZoneInfo('EST'))
end =  datetime(2024, 1, 3) #, tzinfo=ZoneInfo('EST'))
interval_min = 1440

dfDates['timestamp'] = dfDates['timestamp'] + timedelta(hours=2)

query_sql = f"""
    select sq.interval, sq.open, count(*) as samples 
    from
        (select 
        --toStartOfInterval(addHours(timestamp,2), INTERVAL {interval_min} MINUTE, '{tzOffset}') as interval,
        toStartOfInterval(timestamp, INTERVAL {interval_min} MINUTE, '{tzOffset}') as interval,
        FIRST_VALUE(s5.open) OVER (PARTITION BY interval ORDER BY s5.timestamp asc) open
        from __s5__ as s5
        --where timestamp > '{start}' and timestamp < '{end}'
        --where s5.timestamp > toDateTime('{start}', '{tzOffset}') and s5.timestamp < toDateTime('{end}', '{tzOffset}')
        order by s5.timestamp
        ) as sq
    group by sq.interval, sq.open
    order by sq.interval desc
    ;
    """
res = cdf.query(sql=query_sql, s5=dfDates)
dfres = res.to_pandas()
dfres['interval'] = dfres['interval'] * 1e9
dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo(tzOffset)))
dfres = dfres.set_index('interval')

dfres.head(20)

In [None]:
# db = Session(path="/tmp/oanda_data")

# # res = db.query("select * from quant.ohlc order by timestamp desc limit 5")

# # print(res)

# #23342449
# res = db.query("select count(*) from quant.ohlc --group by instrument, tf, timestamp ")

# print(res)

In [7]:
instrument = 'EUR_USD'
tf = 'D1'
datafile = f"./oanda_data_intermediate_{instrument}_{tf}.parquet"
data = f"file('{datafile}', Parquet)"
# data = f"file('./oanda_data_intermediate_EUR_USD_H1.parquet', Parquet)"
db = Session()

query = f"""
select '{instrument}' as instrument, '{tf}' as tf, max(open) as open, max(high) as high, max(low) as low, max(close) as close, 
max(high_timestamp) as high_timestamp, max(low_timestamp) as low_timestamp,
interval from {data} group by instrument, tf, interval Order by interval
"""

query = f"""
select * from {data}
"""

res = db.query(query, "PrettyCompactNoEscapes")
print(res)
# print(datetime.today().isoformat(" ","seconds"), ":","chDB", "has", res, "rows" )

dfi = loadOptimisedSampleData(datafile)
dfi.head(10)

┌────open─┬────high─┬─────low─┬───close─┬────────────────high_timestamp─┬─────────────────low_timestamp─┬─samples─┬─__index_level_0__─┐
│ 1.07687 │ 1.07882 │ 1.07626 │ 1.07814 │ 2017-02-02 01:01:25.000000000 │ 2017-02-01 22:34:35.000000000 │    2191 │        2017-02-02 │
│ 1.07974 │ 1.08076 │ 1.07316 │ 1.07688 │ 2017-02-01 12:31:15.000000000 │ 2017-02-01 19:00:00.000000000 │   13180 │        2017-02-01 │
│  1.0697 │ 1.08124 │ 1.06844 │ 1.07974 │ 2017-01-31 16:02:30.000000000 │ 2017-01-31 07:44:15.000000000 │   13835 │        2017-01-31 │
│ 1.07227 │ 1.07404 │ 1.06202 │ 1.06958 │ 2017-01-30 01:45:55.000000000 │ 2017-01-30 13:01:50.000000000 │   12477 │        2017-01-30 │
│ 1.06794 │ 1.07252 │ 1.06583 │ 1.06978 │ 2017-01-27 14:10:50.000000000 │ 2017-01-27 07:48:45.000000000 │   12420 │        2017-01-27 │
│ 1.07493 │ 1.07655 │ 1.06577 │ 1.06825 │ 2017-01-26 01:42:20.000000000 │ 2017-01-26 16:44:10.000000000 │   12999 │        2017-01-26 │
│ 1.07297 │   1.077 │ 1.07112 │ 1.07487 │ 2017-0

Unnamed: 0,open,high,low,close,high_timestamp,low_timestamp,samples
2017-02-02,1.07687,1.07882,1.07626,1.07814,2017-02-01 20:01:25-05:00,2017-02-01 17:34:35-05:00,2191
2017-02-01,1.07974,1.08076,1.07316,1.07688,2017-02-01 07:31:15-05:00,2017-02-01 14:00:00-05:00,13180
2017-01-31,1.0697,1.08124,1.06844,1.07974,2017-01-31 11:02:30-05:00,2017-01-31 02:44:15-05:00,13835
2017-01-30,1.07227,1.07404,1.06202,1.06958,2017-01-29 20:45:55-05:00,2017-01-30 08:01:50-05:00,12477
2017-01-27,1.06794,1.07252,1.06583,1.06978,2017-01-27 09:10:50-05:00,2017-01-27 02:48:45-05:00,12420
2017-01-26,1.07493,1.07655,1.06577,1.06825,2017-01-25 20:42:20-05:00,2017-01-26 11:44:10-05:00,12999
2017-01-25,1.07297,1.077,1.07112,1.07487,2017-01-25 07:26:15-05:00,2017-01-25 02:13:50-05:00,13139
2017-01-24,1.0767,1.07752,1.07204,1.07313,2017-01-24 10:02:50-05:00,2017-01-24 15:01:40-05:00,13393
2017-01-23,1.07031,1.07694,1.06948,1.07639,2017-01-23 16:54:40-05:00,2017-01-22 17:22:00-05:00,13593
2017-01-20,1.06628,1.07095,1.06252,1.07012,2017-01-20 14:59:15-05:00,2017-01-20 08:01:45-05:00,13767


In [10]:
readbackSmallSample()

┌─instrument─┬─tf─┬────open─┬────high─┬─────low─┬───close─┬───────────────timestamp─┐
│ EUR_USD    │ S5 │ 1.09508 │ 1.09508 │ 1.09508 │ 1.09508 │ 2024-01-12 21:58:00.000 │
│ EUR_USD    │ S5 │ 1.09508 │ 1.09508 │ 1.09506 │ 1.09508 │ 2024-01-12 21:57:55.000 │
│ EUR_USD    │ S5 │ 1.09508 │ 1.09508 │ 1.09508 │ 1.09508 │ 2024-01-12 21:57:50.000 │
│ EUR_USD    │ S5 │   1.095 │   1.095 │ 1.09498 │ 1.09498 │ 2024-01-12 21:57:45.000 │
│ EUR_USD    │ S5 │ 1.09498 │ 1.09498 │ 1.09498 │ 1.09498 │ 2024-01-12 21:57:40.000 │
└────────────┴────┴─────────┴─────────┴─────────┴─────────┴─────────────────────────┘


In [21]:
instrument = 'EUR_USD'
tf = 'D1'
db = Session(path="/tmp/aggregated_oanda")
    
query = f"select * from quant.ohlc_agg where instrument='{instrument}' and tf='{tf}' order by interval"
# query = f"select count(*) from (select * from quant.ohlc_agg where instrument='{instrument}' and tf='{tf}' group by instrument, tf, open, high,low, close, interval, high_timestamp, low_timestamp)"
dfres = db.query(query, 'dataframe')

dfres['instrument'] = dfres['instrument'].astype(pd.StringDtype())
dfres['tf'] = dfres['tf'].astype(pd.StringDtype())
dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
dfres['high_timestamp']  = dfres['high_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
dfres['low_timestamp'] = dfres['low_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
dfres = dfres.set_index(['interval'])
dfres.info()
dfres.tail(50)

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 30 entries, 2023-10-31 19:00:00-05:00 to 2023-12-06 19:00:00-05:00
Data columns (total 8 columns):
 #   Column          Non-Null Count  Dtype              
---  ------          --------------  -----              
 0   instrument      30 non-null     string             
 1   tf              30 non-null     string             
 2   open            30 non-null     float64            
 3   high            30 non-null     float64            
 4   low             30 non-null     float64            
 5   close           30 non-null     float64            
 6   high_timestamp  30 non-null     datetime64[ns, EST]
 7   low_timestamp   30 non-null     datetime64[ns, EST]
dtypes: datetime64[ns, EST](2), float64(4), string(2)
memory usage: 2.1 KB


Unnamed: 0_level_0,instrument,tf,open,high,low,close,high_timestamp,low_timestamp
interval,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2023-10-31 19:00:00-05:00,EUR_USD,D1,1.05692,1.05776,1.05166,1.0569,2023-11-01 03:02:55-05:00,2023-11-01 14:42:30-05:00
2023-11-01 19:00:00-05:00,EUR_USD,D1,1.05704,1.06678,1.0567,1.06224,2023-11-02 08:53:50-05:00,2023-11-01 17:08:25-05:00
2023-11-02 19:00:00-05:00,EUR_USD,D1,1.06197,1.07469,1.06149,1.07286,2023-11-03 13:05:35-05:00,2023-11-02 20:50:30-05:00
2023-11-05 19:00:00-05:00,EUR_USD,D1,1.0725,1.07564,1.07165,1.07177,2023-11-06 03:39:10-05:00,2023-11-06 16:24:25-05:00
2023-11-06 19:00:00-05:00,EUR_USD,D1,1.07164,1.07224,1.07052,1.07066,2023-11-06 18:28:30-05:00,2023-11-06 23:46:55-05:00
2023-11-06 19:00:00-05:00,EUR_USD,D1,1.07113,1.07142,1.06642,1.07,2023-11-06 22:34:50-05:00,2023-11-07 09:38:45-05:00
2023-11-07 19:00:00-05:00,EUR_USD,D1,1.07006,1.07162,1.06594,1.07093,2023-11-08 11:57:55-05:00,2023-11-08 05:38:00-05:00
2023-11-08 19:00:00-05:00,EUR_USD,D1,1.07072,1.07256,1.06603,1.06681,2023-11-09 09:58:25-05:00,2023-11-09 14:43:50-05:00
2023-11-09 19:00:00-05:00,EUR_USD,D1,1.06668,1.06929,1.06562,1.06847,2023-11-10 08:56:30-05:00,2023-11-10 02:33:00-05:00
2023-11-12 19:00:00-05:00,EUR_USD,D1,1.06842,1.06908,1.06812,1.06833,2023-11-12 18:59:00-05:00,2023-11-12 20:25:05-05:00


In [None]:
# 2023-11-30 19:00:00-05:00	EUR_USD	D1	1.08862	1.09128	1.08860	1.09078	2023-11-30 21:43:35-05:00	2023-11-30 17:05:00-05:00

# 2023-11-30 19:00:00-05:00	EUR_USD	D1	1.09078	1.09124	1.08289	1.08833	2023-12-01 00:59:25-05:00	2023-12-01 11:01:15-05:00
# 2023-11-30 19:00:00-05:00	EUR_USD	D1	1.08862	1.09128	1.08860	1.09078	2023-11-30 21:43:35-05:00	2023-11-30 17:05:00-05:00

In [24]:
instrument = 'EUR_USD'
tf = 'D1'
datafile = f"../data/oanda_{instrument}_{tf}.parquet"


dfi = loadOptimisedSampleData(datafile)
dfi.head(100)

Loaded Optimised Sample Data: 0.008879423141479492


Unnamed: 0_level_0,instrument,tf,open,high,low,close,high_timestamp,low_timestamp
interval,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2023-10-31 19:00:00-05:00,EUR_USD,D1,1.05692,1.05776,1.05166,1.0569,2023-11-01 03:02:55-05:00,2023-11-01 14:42:30-05:00
2023-11-01 19:00:00-05:00,EUR_USD,D1,1.05704,1.06678,1.0567,1.06224,2023-11-02 08:53:50-05:00,2023-11-01 17:08:25-05:00
2023-11-02 19:00:00-05:00,EUR_USD,D1,1.06197,1.07469,1.06149,1.07286,2023-11-03 13:05:35-05:00,2023-11-02 20:50:30-05:00
2023-11-05 19:00:00-05:00,EUR_USD,D1,1.0725,1.07564,1.07165,1.07177,2023-11-06 03:39:10-05:00,2023-11-06 16:24:25-05:00
2023-11-06 19:00:00-05:00,EUR_USD,D1,1.07164,1.07224,1.07052,1.07066,2023-11-06 18:28:30-05:00,2023-11-06 23:46:55-05:00
2023-11-06 19:00:00-05:00,EUR_USD,D1,1.07113,1.07142,1.06642,1.07,2023-11-06 22:34:50-05:00,2023-11-07 09:38:45-05:00
2023-11-07 19:00:00-05:00,EUR_USD,D1,1.07006,1.07162,1.06594,1.07093,2023-11-08 11:57:55-05:00,2023-11-08 05:38:00-05:00
2023-11-08 19:00:00-05:00,EUR_USD,D1,1.07072,1.07256,1.06603,1.06681,2023-11-09 09:58:25-05:00,2023-11-09 14:43:50-05:00
2023-11-09 19:00:00-05:00,EUR_USD,D1,1.06668,1.06929,1.06562,1.06847,2023-11-10 08:56:30-05:00,2023-11-10 02:33:00-05:00
2023-11-12 19:00:00-05:00,EUR_USD,D1,1.06842,1.06908,1.06812,1.06833,2023-11-12 18:59:00-05:00,2023-11-12 20:25:05-05:00


In [25]:
instrument = 'EUR_USD'
tf = 'D1'
datafile = f"../data/oanda_data_intermediate_{instrument}_{tf}.parquet"
dfi = loadOptimisedSampleData(datafile)
dfi.head(10)

Loaded Optimised Sample Data: 0.0878593921661377


Unnamed: 0,interval,open,high,low,close,high_timestamp,low_timestamp,samples
2023-12-13,2023-12-12 19:00:00-05:00,1.07934,1.07999,1.07843,1.07874,2023-12-12 19:07:25-05:00,2023-12-12 21:53:00-05:00,2318
2023-12-12,2023-12-11 19:00:00-05:00,1.07624,1.08282,1.07603,1.07946,2023-12-12 08:30:15-05:00,2023-12-11 17:10:05-05:00,13922
2023-12-11,2023-12-10 19:00:00-05:00,1.07588,1.0779,1.07416,1.07644,2023-12-11 05:18:55-05:00,2023-12-11 11:32:05-05:00,14060
2023-12-08,2023-12-07 19:00:00-05:00,1.07944,1.08008,1.07237,1.0761,2023-12-07 20:02:40-05:00,2023-12-08 08:31:05-05:00,15237
2023-12-07,2023-12-06 19:00:00-05:00,1.07651,1.08178,1.07551,1.0793,2023-12-07 12:50:05-05:00,2023-12-07 02:00:35-05:00,12105


In [56]:
instrument = 'EUR_USD'
tf = 'D1'
tzOffset = 'EST'

datafile = f"../data/oanda_{instrument}_{tf}.parquet"
data = f"file('{datafile}', Parquet)"

db = Session()

db.query("create database IF NOT EXISTS quant1")

db.query(f"""
create table IF NOT EXISTS quant1.ohlc_agg (
instrument LowCardinality(String),
tf LowCardinality(String),
open Float64,
high Float64,
low Float64,
close Float64,
interval DateTime64,
high_timestamp DateTime64,
low_timestamp DateTime64
) engine MergeTree
PRIMARY KEY (instrument, tf, interval);
""")

query = f"""
INSERT INTO quant1.ohlc_agg (instrument, tf, open, high, low, close, high_timestamp, low_timestamp, interval) 
select '{instrument}' as instrument, '{tf}' as tf, max(open) as open, max(high) as high, max(low) as low, max(close) as close, 
toDateTime(max(high_timestamp), '{tzOffset}') as high_timestamp, toDateTime(max(low_timestamp), '{tzOffset}') as low_timestamp,
interval from {data} group by instrument, tf, interval Order by interval
"""

res = db.query(query)
# #     res = cdf.query(sql=query, df=intermediate_df)
query = f"select * from quant1.ohlc_agg where instrument='{instrument}' and tf='{tf}'"
# res = db.query()
# print(res)

dfres = db.query(query, 'dataframe')

dfres['instrument'] = dfres['instrument'].astype(pd.StringDtype())
dfres['tf'] = dfres['tf'].astype(pd.StringDtype())
# dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))

dfres['interval'] = pd.to_datetime(dfres['interval']).dt.date
dfres['high_timestamp']  = dfres['high_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
dfres['low_timestamp'] = dfres['low_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
dfres = dfres.set_index(['interval'])
dfres.info()
dfres.tail(50)

<class 'pandas.core.frame.DataFrame'>
Index: 13 entries, 2023-12-01 to 2023-12-19
Data columns (total 8 columns):
 #   Column          Non-Null Count  Dtype              
---  ------          --------------  -----              
 0   instrument      13 non-null     string             
 1   tf              13 non-null     string             
 2   open            13 non-null     float64            
 3   high            13 non-null     float64            
 4   low             13 non-null     float64            
 5   close           13 non-null     float64            
 6   high_timestamp  13 non-null     datetime64[ns, EST]
 7   low_timestamp   13 non-null     datetime64[ns, EST]
dtypes: datetime64[ns, EST](2), float64(4), string(2)
memory usage: 936.0+ bytes


Unnamed: 0_level_0,instrument,tf,open,high,low,close,high_timestamp,low_timestamp
interval,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2023-12-01,EUR_USD,D1,1.09078,1.09124,1.08289,1.08833,2023-12-01 00:59:25-05:00,2023-12-01 11:01:15-05:00
2023-12-04,EUR_USD,D1,1.08786,1.0895,1.08041,1.08363,2023-12-03 18:27:55-05:00,2023-12-04 10:43:40-05:00
2023-12-05,EUR_USD,D1,1.08354,1.08476,1.0778,1.07983,2023-12-05 00:59:55-05:00,2023-12-05 13:42:10-05:00
2023-12-06,EUR_USD,D1,1.0794,1.08048,1.07587,1.07644,2023-12-06 09:27:50-05:00,2023-12-06 15:39:50-05:00
2023-12-07,EUR_USD,D1,1.07686,1.08178,1.07578,1.0793,2023-12-07 12:50:05-05:00,2023-12-07 02:00:35-05:00
2023-12-08,EUR_USD,D1,1.07944,1.08008,1.07237,1.0761,2023-12-07 20:02:40-05:00,2023-12-08 08:31:05-05:00
2023-12-11,EUR_USD,D1,1.07588,1.0779,1.07416,1.07644,2023-12-11 05:18:55-05:00,2023-12-11 11:32:05-05:00
2023-12-12,EUR_USD,D1,1.07624,1.08282,1.07603,1.07946,2023-12-12 08:30:15-05:00,2023-12-11 17:10:05-05:00
2023-12-13,EUR_USD,D1,1.07934,1.08966,1.07843,1.08751,2023-12-13 15:09:50-05:00,2023-12-13 05:48:30-05:00
2023-12-14,EUR_USD,D1,1.08772,1.10094,1.08736,1.09918,2023-12-14 12:58:20-05:00,2023-12-13 17:32:15-05:00


In [60]:
instrument = 'EUR_USD'
tf = 'D1'
tzOffset = 'EST'

datafile = f"../data/oanda_{instrument}_{tf}.parquet"
data = f"file('{datafile}', Parquet)"

db = Session()

db.query("create database IF NOT EXISTS quant1")
# db.query(f"""drop table quant1.ohlc_agg""")
db.query(f"""
create table IF NOT EXISTS quant1.ohlc_agg (
instrument LowCardinality(String),
tf LowCardinality(String),
open Float64,
high Float64,
low Float64,
close Float64,
interval DateTime64,
high_timestamp DateTime64,
low_timestamp DateTime64
) engine MergeTree
PRIMARY KEY (instrument, tf, interval);
""")

query = f"""
INSERT INTO quant1.ohlc_agg (instrument, tf, open, high, low, close, high_timestamp, low_timestamp, interval) 
select '{instrument}' as instrument, '{tf}' as tf, max(open) as open, max(high) as high, max(low) as low, max(close) as close, 
toDateTime(max(high_timestamp), '{tzOffset}') as high_timestamp, toDateTime(max(low_timestamp), '{tzOffset}') as low_timestamp,
interval from {data} group by instrument, tf, interval Order by interval
"""

res = db.query(query)
# #     res = cdf.query(sql=query, df=intermediate_df)
query = f"select * from quant1.ohlc_agg where instrument='{instrument}' and tf='{tf}'"
# res = db.query()
# print(res)

dfres = db.query(query, 'dataframe')

dfres['instrument'] = dfres['instrument'].astype(pd.StringDtype())
dfres['tf'] = dfres['tf'].astype(pd.StringDtype())
# dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))

dfres['interval'] = pd.to_datetime(dfres['interval']).dt.date
dfres['high_timestamp']  = dfres['high_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
dfres['low_timestamp'] = dfres['low_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
dfres = dfres.set_index(['interval'])
dfres.info()
dfres.tail(50)

<class 'pandas.core.frame.DataFrame'>
Index: 13 entries, 2023-12-01 to 2023-12-19
Data columns (total 8 columns):
 #   Column          Non-Null Count  Dtype              
---  ------          --------------  -----              
 0   instrument      13 non-null     string             
 1   tf              13 non-null     string             
 2   open            13 non-null     float64            
 3   high            13 non-null     float64            
 4   low             13 non-null     float64            
 5   close           13 non-null     float64            
 6   high_timestamp  13 non-null     datetime64[ns, EST]
 7   low_timestamp   13 non-null     datetime64[ns, EST]
dtypes: datetime64[ns, EST](2), float64(4), string(2)
memory usage: 936.0+ bytes


Unnamed: 0_level_0,instrument,tf,open,high,low,close,high_timestamp,low_timestamp
interval,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2023-12-01,EUR_USD,D1,1.09078,1.09124,1.08289,1.08833,2023-12-01 00:59:25-05:00,2023-12-01 11:01:15-05:00
2023-12-04,EUR_USD,D1,1.08786,1.0895,1.08041,1.08363,2023-12-03 18:27:55-05:00,2023-12-04 10:43:40-05:00
2023-12-05,EUR_USD,D1,1.08365,1.08476,1.08305,1.08366,2023-12-05 00:59:55-05:00,2023-12-05 13:42:10-05:00
2023-12-06,EUR_USD,D1,1.0794,1.08048,1.07587,1.07644,2023-12-06 09:27:50-05:00,2023-12-06 15:39:50-05:00
2023-12-07,EUR_USD,D1,1.07686,1.08178,1.07621,1.0793,2023-12-07 12:50:05-05:00,2023-12-07 02:00:35-05:00
2023-12-08,EUR_USD,D1,1.07944,1.08008,1.07237,1.0761,2023-12-07 20:02:40-05:00,2023-12-08 08:31:05-05:00
2023-12-11,EUR_USD,D1,1.07661,1.0779,1.07584,1.07662,2023-12-11 05:18:55-05:00,2023-12-11 11:32:05-05:00
2023-12-12,EUR_USD,D1,1.07624,1.08282,1.07603,1.07946,2023-12-12 08:30:15-05:00,2023-12-11 17:10:05-05:00
2023-12-13,EUR_USD,D1,1.07934,1.08966,1.07843,1.08751,2023-12-13 15:09:50-05:00,2023-12-13 05:48:30-05:00
2023-12-14,EUR_USD,D1,1.08772,1.10094,1.08736,1.09918,2023-12-14 12:58:20-05:00,2023-12-13 17:32:15-05:00


In [64]:
instrument = 'EUR_USD'
tf = 'D1'
# datafile = f"../data/oanda_data_intermediate_{instrument}_{tf}.parquet"
datafile = f"../data/oanda_{instrument}_{tf}.parquet"
dfi = loadOptimisedSampleData(datafile)
dfi.head(10)

Loaded Optimised Sample Data: 0.020707130432128906


Unnamed: 0_level_0,instrument,tf,open,high,low,close,high_timestamp,low_timestamp
interval,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2023-11-30 19:00:00-05:00,EUR_USD,D1,1.09078,1.09124,1.08289,1.08833,2023-12-01 00:59:25-05:00,2023-12-01 11:01:15-05:00
2023-12-03 19:00:00-05:00,EUR_USD,D1,1.08786,1.0895,1.08041,1.08363,2023-12-03 18:27:55-05:00,2023-12-04 10:43:40-05:00
2023-12-04 19:00:00-05:00,EUR_USD,D1,1.08354,1.08464,1.08305,1.08366,2023-12-04 19:39:15-05:00,2023-12-04 17:05:00-05:00
2023-12-04 19:00:00-05:00,EUR_USD,D1,1.08365,1.08476,1.0778,1.07983,2023-12-05 00:59:55-05:00,2023-12-05 13:42:10-05:00
2023-12-05 19:00:00-05:00,EUR_USD,D1,1.0794,1.08048,1.07587,1.07644,2023-12-06 09:27:50-05:00,2023-12-06 15:39:50-05:00
2023-12-06 19:00:00-05:00,EUR_USD,D1,1.07686,1.07724,1.07621,1.0765,2023-12-06 19:38:20-05:00,2023-12-06 21:55:00-05:00
2023-12-06 19:00:00-05:00,EUR_USD,D1,1.07651,1.08178,1.07551,1.0793,2023-12-07 12:50:05-05:00,2023-12-07 02:00:35-05:00
2023-12-07 19:00:00-05:00,EUR_USD,D1,1.07944,1.08008,1.07237,1.0761,2023-12-07 20:02:40-05:00,2023-12-08 08:31:05-05:00
2023-12-10 19:00:00-05:00,EUR_USD,D1,1.07588,1.0774,1.07584,1.07662,2023-12-10 20:38:15-05:00,2023-12-10 17:03:00-05:00
2023-12-10 19:00:00-05:00,EUR_USD,D1,1.07661,1.0779,1.07416,1.07644,2023-12-11 05:18:55-05:00,2023-12-11 11:32:05-05:00


In [68]:
instrument = 'EUR_USD'
tf = 'D1'
# datafile = f"../data/oanda_data_intermediate_{instrument}_{tf}.parquet"
datafile = f"../data/oanda_{instrument}_{tf}.parquet"
dfi = loadOptimisedSampleData(datafile)
dfi.head(10)

Loaded Optimised Sample Data: 0.15960240364074707


Unnamed: 0_level_0,instrument,tf,open,high,low,close,high_timestamp,low_timestamp
interval,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2023-11-30 19:00:00-05:00,EUR_USD,D1,1.09067,1.09124,1.08289,1.08833,2023-12-01 00:59:25-05:00,2023-12-01 11:01:15-05:00
2023-12-03 19:00:00-05:00,EUR_USD,D1,1.08786,1.0895,1.08041,1.08363,2023-12-03 18:27:55-05:00,2023-12-04 10:43:40-05:00
2023-12-04 19:00:00-05:00,EUR_USD,D1,1.08354,1.08464,1.08305,1.08442,2023-12-04 19:39:15-05:00,2023-12-04 17:05:00-05:00
2023-12-04 19:00:00-05:00,EUR_USD,D1,1.08441,1.08476,1.0778,1.07983,2023-12-05 00:59:55-05:00,2023-12-05 13:42:10-05:00
2023-12-05 19:00:00-05:00,EUR_USD,D1,1.0794,1.08048,1.07587,1.07644,2023-12-06 09:27:50-05:00,2023-12-06 15:39:50-05:00
2023-12-06 19:00:00-05:00,EUR_USD,D1,1.07686,1.07724,1.07578,1.07604,2023-12-06 19:38:20-05:00,2023-12-06 23:53:25-05:00
2023-12-06 19:00:00-05:00,EUR_USD,D1,1.07604,1.08178,1.07551,1.0793,2023-12-07 12:50:05-05:00,2023-12-07 02:00:35-05:00
2023-12-07 19:00:00-05:00,EUR_USD,D1,1.07944,1.08008,1.07237,1.0761,2023-12-07 20:02:40-05:00,2023-12-08 08:31:05-05:00
2023-12-10 19:00:00-05:00,EUR_USD,D1,1.07588,1.0774,1.07584,1.07631,2023-12-10 20:38:15-05:00,2023-12-10 17:03:00-05:00
2023-12-10 19:00:00-05:00,EUR_USD,D1,1.0763,1.0779,1.07416,1.07644,2023-12-11 05:18:55-05:00,2023-12-11 11:32:05-05:00


In [51]:
instrument = 'EUR_USD'
tf = 'D1'
# datafile = f"../data/oanda_data_intermediate_{instrument}_{tf}.parquet"
datafile = f"../data/oanda_{instrument}_{tf}.parquet"
dfi = loadOptimisedSampleData(datafile)
display = dfi[['open','high','low', 'close','first_sample', 'last_sample', 'samples']]
display.head(20)

# 5 delta

Loaded Optimised Sample Data: 0.00843358039855957


Unnamed: 0,open,high,low,close,first_sample,last_sample,samples
2023-12-11,1.07588,1.0779,1.07416,1.07644,2023-12-10 17:03:00-05:00,2023-12-11 16:57:25-05:00,14060
2023-12-12,1.07624,1.08282,1.07603,1.07946,2023-12-11 17:03:00-05:00,2023-12-12 16:58:00-05:00,13922
2023-12-13,1.07934,1.08966,1.07732,1.08751,2023-12-12 17:03:00-05:00,2023-12-13 16:58:00-05:00,14404
2023-12-14,1.08772,1.10094,1.08736,1.09918,2023-12-13 17:03:00-05:00,2023-12-14 16:58:00-05:00,15781
2023-12-15,1.09957,1.10037,1.08886,1.08948,2023-12-14 17:03:00-05:00,2023-12-15 16:57:40-05:00,15306
2023-12-18,1.08976,1.09312,1.08921,1.09236,2023-12-17 17:03:00-05:00,2023-12-18 16:57:55-05:00,14199
2023-12-19,1.09188,1.09876,1.09148,1.09809,2023-12-18 17:03:00-05:00,2023-12-19 16:57:55-05:00,14694
2023-12-20,1.09809,1.09852,1.09298,1.09434,2023-12-19 17:03:00-05:00,2023-12-20 16:57:50-05:00,14349
2023-12-21,1.09404,1.10127,1.09352,1.10122,2023-12-20 17:03:00-05:00,2023-12-21 16:57:55-05:00,14732
2023-12-22,1.10102,1.10404,1.0994,1.10146,2023-12-21 17:03:00-05:00,2023-12-22 16:57:55-05:00,14533


In [69]:
instrument = 'EUR_USD'
tf = 'D1'
# datafile = f"../data/oanda_data_intermediate_{instrument}_{tf}.parquet"
datafile = f"../data/oanda_{instrument}_{tf}.parquet"
dfi = loadOptimisedSampleData(datafile)
display = dfi[['open','high','low', 'close','first_sample', 'last_sample', 'samples']]
display.head(20)

# 5 delta

Loaded Optimised Sample Data: 0.00909113883972168


Unnamed: 0,open,high,low,close,first_sample,last_sample,samples
2023-12-11,1.07588,1.0779,1.07416,1.07644,2023-12-10 19:03:00-05:00,2023-12-11 18:57:25-05:00,14060
2023-12-12,1.07624,1.08282,1.07603,1.07946,2023-12-11 19:03:00-05:00,2023-12-12 18:58:00-05:00,13922
2023-12-13,1.07934,1.08966,1.07732,1.08751,2023-12-12 19:03:00-05:00,2023-12-13 18:58:00-05:00,14404
2023-12-14,1.08772,1.10094,1.08736,1.09918,2023-12-13 19:03:00-05:00,2023-12-14 18:58:00-05:00,15781
2023-12-15,1.09957,1.10037,1.08886,1.08948,2023-12-14 19:03:00-05:00,2023-12-15 18:57:40-05:00,15306
2023-12-18,1.08976,1.09312,1.08921,1.09236,2023-12-17 19:03:00-05:00,2023-12-18 18:57:55-05:00,14199
2023-12-19,1.09188,1.09876,1.09148,1.09809,2023-12-18 19:03:00-05:00,2023-12-19 18:57:55-05:00,14694
2023-12-20,1.09809,1.09852,1.09298,1.09434,2023-12-19 19:03:00-05:00,2023-12-20 18:57:50-05:00,14349
2023-12-21,1.09404,1.10127,1.09352,1.10122,2023-12-20 19:03:00-05:00,2023-12-21 18:57:55-05:00,14732
2023-12-22,1.10102,1.10404,1.0994,1.10146,2023-12-21 19:03:00-05:00,2023-12-22 18:57:55-05:00,14533


In [57]:
instrument = 'EUR_USD'
tf = 'W1'
datafile = f"../data/oanda_data_intermediate_EUR_USD_W1.parquet"
# datafile = f"../data/oanda_{instrument}_{tf}.parquet"
dfi = loadOptimisedSampleData(datafile)
# dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
# dfi.index = dfi.index.date
dfi.head(20)

# 4 delta

Loaded Optimised Sample Data: 0.00610804557800293


Unnamed: 0,interval,open,high,low,close,high_timestamp,low_timestamp,samples,first_sample,last_sample
0,2024-01-07 19:00:00-05:00,1.09487,1.09996,1.09102,1.09508,2024-01-11 10:30:00-05:00,2024-01-09 13:09:15-05:00,71947,2024-01-07 19:03:00-05:00,2024-01-12 18:58:00-05:00


In [66]:
instrument = 'EUR_USD'
tf = 'W1'
# datafile = f"../data/oanda_data_intermediate_{instrument}_{tf}.parquet"
datafile = f"../data/oanda_{instrument}_{tf}.parquet"
dfi = loadOptimisedSampleData(datafile)
# dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
# dfi.index = dfi.index.date
display = dfi[['open','high','low', 'close','first_sample', 'last_sample', 'samples']]
display.head(20)

# 4 delta

Loaded Optimised Sample Data: 0.005912303924560547


Unnamed: 0,open,high,low,close,first_sample,last_sample,samples
2023-12-11,1.07588,1.10094,1.07416,1.08948,2023-12-10 17:03:00-05:00,2023-12-15 16:57:40-05:00,73473
2023-12-18,1.08976,1.10404,1.08921,1.10146,2023-12-17 17:03:00-05:00,2023-12-22 16:57:55-05:00,72507
2023-12-25,1.10086,1.11396,1.10079,1.10374,2023-12-25 17:03:00-05:00,2023-12-29 16:57:55-05:00,52829
2024-01-01,1.1044,1.10453,1.0877,1.09423,2024-01-01 17:03:00-05:00,2024-01-05 16:57:55-05:00,57366
2024-01-08,1.09487,1.09996,1.09102,1.09508,2024-01-07 17:03:00-05:00,2024-01-12 16:58:00-05:00,71947


In [72]:
instrument = 'EUR_USD'
tf = 'M15'
# datafile = f"../data/oanda_data_intermediate_{instrument}_{tf}.parquet"
datafile = f"../data/oanda_{instrument}_{tf}.parquet"
dfi = loadOptimisedSampleData(datafile)
# dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
# dfi.index = dfi.index.date
display = dfi[['open','high','low', 'close','first_sample', 'last_sample', 'samples']]
display.head(20)

Loaded Optimised Sample Data: 0.009447813034057617


Unnamed: 0_level_0,open,high,low,close,first_sample,last_sample,samples
interval,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2023-12-10 17:00:00-05:00,1.07588,1.07614,1.07584,1.0761,2023-12-10 17:03:00-05:00,2023-12-10 17:14:35-05:00,55
2023-12-10 17:15:00-05:00,1.07608,1.07611,1.07593,1.076,2023-12-10 17:15:00-05:00,2023-12-10 17:29:45-05:00,28
2023-12-10 17:30:00-05:00,1.076,1.07616,1.07584,1.07616,2023-12-10 17:30:00-05:00,2023-12-10 17:44:55-05:00,72
2023-12-10 17:45:00-05:00,1.07615,1.07619,1.07614,1.07617,2023-12-10 17:45:05-05:00,2023-12-10 17:59:30-05:00,47
2023-12-10 18:00:00-05:00,1.07616,1.07678,1.07615,1.07658,2023-12-10 18:00:00-05:00,2023-12-10 18:14:55-05:00,121
2023-12-10 18:15:00-05:00,1.07658,1.0767,1.07644,1.0765,2023-12-10 18:15:00-05:00,2023-12-10 18:29:50-05:00,101
2023-12-10 18:30:00-05:00,1.07649,1.07658,1.07626,1.07638,2023-12-10 18:30:00-05:00,2023-12-10 18:44:55-05:00,108
2023-12-10 18:45:00-05:00,1.07639,1.07654,1.07634,1.07654,2023-12-10 18:45:00-05:00,2023-12-10 18:59:55-05:00,87
2023-12-10 19:00:00-05:00,1.07654,1.07692,1.07652,1.07686,2023-12-10 19:00:00-05:00,2023-12-10 19:14:55-05:00,156
2023-12-10 19:15:00-05:00,1.07684,1.07702,1.07664,1.07682,2023-12-10 19:15:00-05:00,2023-12-10 19:29:55-05:00,150


In [73]:
dropFinalResultschDB()

In [37]:
    instrument = 'EUR_USD'
    tf = 'W1'
    db = Session(path="/tmp/aggregated_oanda")
    
    query = f"select * from quant.ohlc_agg where instrument='{instrument}' and tf='{tf}' order by interval"
    dfres = db.query(query, 'dataframe')
    dfres['instrument'] = dfres['instrument'].astype(pd.StringDtype())
    dfres['tf'] = dfres['tf'].astype(pd.StringDtype())
    dfres['high_timestamp']  = dfres['high_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
    dfres['low_timestamp'] = dfres['low_timestamp'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
    dfres['first_sample'] = dfres['first_sample'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
    dfres['last_sample'] = dfres['last_sample'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
    
    dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
    # if tf not in ['D1', 'W1', 'M1']:
    #     dfres['interval'] = dfres['interval'].astype(pd.DatetimeTZDtype(tz=ZoneInfo('EST')))
    dfres = dfres.set_index(['interval'])
    dfres.index = dfres.index.date
    print(dfres)

           instrument  tf     open     high      low    close  \
2023-12-09    EUR_USD  W1  1.07588  1.10094  1.07416  1.08948   
2023-12-16    EUR_USD  W1  1.08976  1.10404  1.08921  1.10146   
2023-12-23    EUR_USD  W1  1.10086  1.11396  1.10079  1.10374   
2023-12-30    EUR_USD  W1  1.10440  1.10453  1.08770  1.09423   
2024-01-06    EUR_USD  W1  1.09487  1.09996  1.09102  1.09508   

                      high_timestamp             low_timestamp  \
2023-12-09 2023-12-14 12:58:20-05:00 2023-12-11 11:32:05-05:00   
2023-12-16 2023-12-22 10:03:40-05:00 2023-12-17 18:38:20-05:00   
2023-12-23 2023-12-28 04:37:50-05:00 2023-12-25 17:03:00-05:00   
2023-12-30 2024-01-01 17:59:45-05:00 2024-01-05 08:31:35-05:00   
2024-01-06 2024-01-11 08:30:00-05:00 2024-01-09 11:09:15-05:00   

                        first_sample               last_sample  samples  
2023-12-09 2023-12-10 17:03:00-05:00 2023-12-15 16:57:40-05:00    73473  
2023-12-16 2023-12-17 17:03:00-05:00 2023-12-22 16:57:55-05:00  