In [1]:
### Version 1: loads tick data only for the day and hour you select.

from pathlib import Path
import pandas as pd
import numpy as np
import gzip

pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)


class DWX_TICK_DATA_IO():
    """Read gzipped BID/ASK tick logs for one symbol and merge them into one DataFrame.

    Version 1: files are selected by symbol, date and hour.

    NOTE: a later cell of this notebook defines a second class with the same
    name; once that cell has run, the name refers to that later class.
    """

    def __init__(self,
                 _format='{}_{}_{}_{}',
                 _extension='.log.gz',
                 _delimiter=',',
                 _path='D:/Tick data/EURUSD/EURUSD'): #Input the directory address that has the gzip file to unzip and merge
        """
        Parameters
        ----------
        _format : str
            Stored on the instance; no method of this class reads it.
        _extension : str
            File-name suffix used in the glob patterns.
        _delimiter : str
            Field separator inside each tick line.
        _path : str
            Directory that holds the gzip files to read and merge.
        """
        self._format = _format
        self._extension = _extension
        self._delimiter = _delimiter
        self._path = _path
        self._symbol_df = None  # placeholder; never assigned or read by the methods below

    ##########################################################################

    # Return list of files for BID and ASK each.
    def _find_symbol_files_(self, _symbol='EURUSD',
                                  _date='2022-10-02',
                                  _hour='15'):
        """Glob ``self._path`` for the symbol's files and split them into BID and ASK.

        An empty ``_date`` matches every file of the symbol; an empty ``_hour``
        matches every hour of ``_date``.

        Returns
        -------
        (list[str], list[str]) or (None, None)
            Paths of the BID files and of the ASK files, or ``(None, None)``
            when the pattern matched nothing. Either list can be empty when
            only one side exists on disk.
        """
        print(f"Searching in path: {self._path}")

        if _date == '':
            _pattern = f'{_symbol}_*{self._extension}'
        elif _hour == '':
            _pattern = f'{_symbol}_*{_date}_*{self._extension}'
        else:
            _pattern = f'{_symbol}_*{_date}_{_hour}*{self._extension}'

        # glob() yields files in arbitrary order; sort so runs are reproducible.
        _fs = sorted(filename.name for filename in Path(self._path).glob(_pattern))

        print(f"Files found: {_fs}")

        if len(_fs) > 0:
            return ([f'{self._path}/{_f}' for _f in _fs if 'BID' in _f],
                    [f'{self._path}/{_f}' for _f in _fs if 'ASK' in _f])

        print('[WARNING] No files found for {} - {} - {}'.format(_symbol, _date, _hour))
        return None, None

    ##########################################################################

    def _construct_data_(self, _filename):
        """Parse one gzipped tick file into a numeric DataFrame indexed by timestamp.

        The side (BID or ASK) is taken from the file name, not from its
        directory, so a folder called e.g. ``ASKS`` cannot mislabel a file.
        Lines of 10 bytes or fewer are skipped.

        Raises
        ------
        ValueError
            If the file name contains neither ``BID`` nor ``ASK``.
        """
        _name = Path(_filename).name
        if 'BID' in _name:
            _columns = ['timestamp', 'bid_price', 'bid_size']
        elif 'ASK' in _name:
            _columns = ['timestamp', 'ask_price', 'ask_size']
        else:
            raise ValueError(f"Unexpected filename format: {_filename}")

        # Context manager so the gzip handle is closed even if parsing fails.
        with gzip.open(_filename) as _fh:
            _rows = [line.strip().decode().split(self._delimiter)
                     for line in _fh if len(line) > 10]

        # Passing columns= up front keeps an empty file from raising a length mismatch.
        _df = pd.DataFrame(_rows, columns=_columns)

        # Convert *before* set_index so the timestamp becomes numeric too;
        # otherwise the index stays a string and later epoch conversion,
        # sorting and diff() all operate on text.
        return _df.apply(pd.to_numeric).set_index('timestamp')

    ##########################################################################

    def _load_side_(self, _files, _label):
        """Load and stack every file of one side, printing one-line progress."""
        _frames = []
        for i, _file in enumerate(_files, start=1):
            print('\r{}: {} / {} - {}'.format(_label, i, len(_files), _file), end="", flush=True)
            _frames.append(self._construct_data_(_file))
        return pd.concat(_frames, axis=0, sort=True)

    ##########################################################################

    @staticmethod
    def _daily_start_offset_(_precision, _daily_start):
        """Translate ``_daily_start`` into a value for ``resample(offset=...)``.

        ``resample(base=...)`` no longer exists in pandas 2.x. This applies the
        same conversion pandas used while ``base`` was deprecated: for
        fixed-length rules the offset is ``base * (rule length / rule multiple)``
        and for every other rule there is no offset. For rule ``'24H'`` and
        ``_daily_start=22`` that is 22 hours; for ``'D'`` it is a whole number
        of days, which leaves the bin edges at midnight.
        """
        _freq = pd.tseries.frequencies.to_offset(_precision)
        if _daily_start and isinstance(_freq, pd.offsets.Tick):
            return pd.Timedelta(_daily_start * _freq.nanos // _freq.n)
        return None

    ##########################################################################

    def _get_symbol_as_dataframe_(self, _symbol='EURUSD',
                                        _date='',
                                        _hour='',
                                        _convert_epochs=True,
                                        _check_integrity=False,
                                        _calc_spread = False,
                                        _reindex=['ask_price','bid_price'],
                                        _precision='tick',
                                        _daily_start=22,
                                        _symbol_digits=5):

        """
        Load, merge and optionally resample the ticks of one symbol.

        See http://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html
        for .resample() rule / frequency strings.

        Parameters
        ----------
        _symbol, _date, _hour : str
            Forwarded to ``_find_symbol_files_``.
        _convert_epochs : bool
            Convert the millisecond-epoch index to datetimes.
        _check_integrity : bool
            Print the ``_integrity_check_`` report for the result.
        _calc_spread : bool
            Add a ``spread`` column, ``abs(ask_price - bid_price)``.
        _reindex : list[str]
            Columns to keep, in order; an empty list keeps everything.
            Include ``'spread'`` here if it should survive.
        _precision : str
            ``'tick'`` for raw ticks, otherwise a resample rule; the result is
            then the open/high/low/close of the mid price.
        _daily_start : int
            Bin offset for the rules 'B', 'C', 'D', 'W' and '24H'
            (see ``_daily_start_offset_``).
        _symbol_digits : int
            Decimal places of the mid price.

        Returns
        -------
        pandas.DataFrame or None
            ``None`` when the BID files, the ASK files, or both are missing.
        """

        print('[INFO] Finding symbol files.. please wait..')

        _bid_files, _ask_files = self._find_symbol_files_(_symbol,_date,_hour)

        if _bid_files is None or _ask_files is None:
            return None

        # Files were found but only for one side: pd.concat([]) would raise.
        if len(_bid_files) == 0 or len(_ask_files) == 0:
            print('[WARNING] Missing BID or ASK files for {} - {} - {}'.format(_symbol, _date, _hour))
            return None

        print('[INFO] Processing BID ({}) / ASK ({}) files.. please wait..'.format(len(_bid_files), len(_ask_files)))

        # BIDS
        _bids = self._load_side_(_bid_files, 'BIDS')
        print('')

        # ASKS
        _asks = self._load_side_(_ask_files, 'ASKS')

        # Outer-join on timestamp, carry the last known quote of each side
        # forward, then drop the leading rows where one side has no quote yet.
        _df = _asks.merge(_bids, how='outer', left_index=True, right_index=True).ffill().dropna()

        # Calculate spread?
        if _calc_spread:
            _df['spread'] = (_df['ask_price'] - _df['bid_price']).abs()

        # Convert timestamps?
        if _convert_epochs:
            _df.index = pd.to_datetime(_df.index, unit='ms')

        # Reindex to selected columns?
        if _reindex is not None and len(_reindex) > 0:
            _df = _df.reindex(_reindex, axis=1)

        # Resample?
        if _precision != 'tick':
            _df['mid_price'] = round((_df.ask_price + _df.bid_price) / 2, _symbol_digits)

            if _precision not in ['B','C','D','W','24H']:
                _df = _df.mid_price.resample(rule=_precision).ohlc()
            else:
                _offset = self._daily_start_offset_(_precision, _daily_start)
                _df = _df.mid_price.resample(rule=_precision, offset=_offset).ohlc().dropna()

        # Check data integrity?
        if _check_integrity:

            print('\n\n[INFO] Checking data integrity..')
            self._integrity_check_(_df)

        return _df

    ##########################################################################

    def _integrity_check_(self, _df):
        """Print gap statistics for the index and plot the mean spread per hour.

        The plot needs a ``spread`` column and a datetime index; when either
        is missing (the default ``_reindex`` drops ``spread``, and resampled
        frames hold OHLC columns) that test is skipped instead of raising.
        """
        if not isinstance(_df, pd.DataFrame):
            print('[ERROR] Input must be a Pandas DataFrame')
            return

        _diff = _df.index.to_series().diff()

        print('\n[TEST #1] Data Frequency Statistics\n--')
        print(_diff.describe())

        print('\n[TEST #2] Mode of Gap Distribution\n--')
        print(_diff.value_counts().head(1))

        print('\n[TEST #3] Hourly Spread Distribution\n--')
        if 'spread' in _df.columns and isinstance(_df.index, pd.DatetimeIndex):
            _df.groupby(_df.index.hour)['spread'].mean().plot(
                    xticks=range(0,24),
                    title='Average Spread by Hour (UTC)')
        else:
            print("[INFO] Skipped: requires a 'spread' column and a datetime index "
                  "(_calc_spread=True, 'spread' in _reindex, _convert_epochs=True, _precision='tick').")

    ##########################################################################


In [2]:
### Version 2: merges a whole year at a time. The _date and _hour inputs are replaced by a single _year input.

from pathlib import Path
import pandas as pd
import numpy as np
import gzip
from datetime import datetime, timedelta

pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

class DWX_TICK_DATA_IO:
    """Read a full year of gzipped BID/ASK tick logs and merge them into one DataFrame.

    Version 2: files are selected by symbol and year, and each file is read
    in chunks.

    NOTE: an earlier cell of this notebook defines a class with the same name;
    running this cell replaces it.
    """

    def __init__(self,
                 _format='{}_{}_{}_{}',
                 _extension='.log.gz',
                 _delimiter=',',
                 _path='D:/Tick data/EURUSD/EURUSD'):
        """
        Parameters
        ----------
        _format : str
            Stored on the instance; no method of this class reads it.
        _extension : str
            File-name suffix used in the glob patterns.
        _delimiter : str
            Field separator inside each tick file.
        _path : str
            Directory that holds the gzip files.
        """
        self._format = _format
        self._extension = _extension
        self._delimiter = _delimiter
        self._path = _path
        self._symbol_df = None  # placeholder; never assigned or read by the methods below

    ##########################################################################

    def _find_symbol_files_(self, _symbol='EURUSD', _year='2024'):
        """Glob ``self._path`` for the symbol's BID and ASK files of one year.

        Returns
        -------
        (list[str], list[str]) or (None, None)
            Paths of the BID files and of the ASK files, or ``(None, None)``
            when neither pattern matched. Either list can be empty when only
            one side exists on disk.
        """
        print(f"Searching in path: {self._path}")

        _dir = Path(self._path)
        # glob() yields files in arbitrary order; sort so runs are reproducible.
        bid_names = sorted(p.name for p in _dir.glob(f'{_symbol}_BID_{_year}-*{self._extension}'))
        ask_names = sorted(p.name for p in _dir.glob(f'{_symbol}_ASK_{_year}-*{self._extension}'))
        files = bid_names + ask_names

        print(f"Files found: {files}")

        if len(files) > 0:
            return ([f'{self._path}/{_f}' for _f in bid_names],
                    [f'{self._path}/{_f}' for _f in ask_names])

        print(f'[WARNING] No files found for {_symbol} in {_year}')
        return None, None

    ##########################################################################

    def _construct_data_(self, _filename, chunksize=100000):
        """Read one gzipped tick file in chunks into a DataFrame indexed by timestamp.

        Best-effort by design: any failure (unreadable or empty file, name
        containing neither ``BID`` nor ``ASK``, non-numeric values) is printed
        and an empty DataFrame is returned so the remaining files still load.
        """
        try:
            # Decide the side from the file name only, so a directory called
            # e.g. 'ASKS' cannot mislabel a file.
            _name = Path(_filename).name
            if 'BID' in _name:
                columns = ['timestamp', 'bid_price', 'bid_size']
            elif 'ASK' in _name:
                columns = ['timestamp', 'ask_price', 'ask_size']
            else:
                raise ValueError(f"Unexpected filename format: {_filename}")

            # Read data from gzip file in chunks
            with gzip.open(_filename, 'rt') as f:
                chunk_list = []
                chunk_iter = pd.read_csv(f, delimiter=self._delimiter, chunksize=chunksize, header=None)
                for chunk in chunk_iter:
                    chunk.columns = columns
                    chunk_list.append(chunk.set_index('timestamp', drop=True).apply(pd.to_numeric))

            return pd.concat(chunk_list, axis=0)

        except Exception as e:
            print(f"[ERROR] Failed to construct data from {_filename}: {e}")
            return pd.DataFrame()

    ##########################################################################

    def _load_side_(self, _files, _label):
        """Load and stack every file of one side; return None if nothing loaded."""
        frames = []
        for i, _file in enumerate(_files, start=1):
            frame = self._construct_data_(_file)
            # _construct_data_ signals a failed file with an empty frame.
            if not frame.empty:
                frames.append(frame)
            print(f'Processed {_label} file {i}/{len(_files)}')

        if not frames:
            return None
        return pd.concat(frames, axis=0, sort=True)

    ##########################################################################

    @staticmethod
    def _daily_start_offset_(_precision, _daily_start):
        """Translate ``_daily_start`` into a value for ``resample(offset=...)``.

        Uses the conversion pandas applied for its former ``base`` argument:
        for fixed-length rules the offset is
        ``_daily_start * (rule length / rule multiple)``; other rules get no
        offset. For rule ``'24H'`` and ``_daily_start=22`` that is 22 hours;
        for ``'D'`` it is a whole number of days, which leaves the bin edges
        at midnight.
        """
        _freq = pd.tseries.frequencies.to_offset(_precision)
        if _daily_start and isinstance(_freq, pd.offsets.Tick):
            return pd.Timedelta(_daily_start * _freq.nanos // _freq.n)
        return None

    ##########################################################################

    def _get_symbol_as_dataframe_(self, _symbol='EURUSD', _year='2024', _convert_epochs=True, _check_integrity=False, _calc_spread=False, _reindex=['ask_price', 'bid_price'], _precision='tick', _daily_start=22, _symbol_digits=5):
        """Load, merge and optionally resample one year of ticks for a symbol.

        Parameters
        ----------
        _symbol, _year : str
            Forwarded to ``_find_symbol_files_``.
        _convert_epochs : bool
            Convert the millisecond-epoch index to datetimes.
        _check_integrity : bool
            Print the ``_integrity_check_`` report for the result.
        _calc_spread : bool
            Add a ``spread`` column, ``abs(ask_price - bid_price)``.
        _reindex : list[str]
            Columns to keep, in order; an empty list keeps everything.
            Include ``'spread'`` here if it should survive.
        _precision : str
            ``'tick'`` for raw ticks, otherwise a resample rule; the result is
            then the open/high/low/close of the mid price.
        _daily_start : int
            Bin offset for the rules 'B', 'C', 'D', 'W' and '24H'
            (see ``_daily_start_offset_``).
        _symbol_digits : int
            Decimal places of the mid price.

        Returns
        -------
        pandas.DataFrame or None
            ``None`` when the BID side, the ASK side, or both could not be loaded.
        """
        print('[INFO] Finding symbol files.. please wait..')

        _bid_files, _ask_files = self._find_symbol_files_(_symbol, _year)

        if _bid_files is None or _ask_files is None:
            return None

        # Files were found but only for one side: there is nothing to merge.
        if len(_bid_files) == 0 or len(_ask_files) == 0:
            print(f'[WARNING] Missing BID or ASK files for {_symbol} in {_year}')
            return None

        print(f'[INFO] Processing BID ({len(_bid_files)}) / ASK ({len(_ask_files)}) files.. please wait..')

        # Process BIDS in chunks
        _bids = self._load_side_(_bid_files, 'BID')
        print('')

        # Process ASKS in chunks
        _asks = self._load_side_(_ask_files, 'ASK')

        if _bids is None or _asks is None:
            print(f'[WARNING] No readable BID or ASK data for {_symbol} in {_year}')
            return None

        # Outer-join on timestamp, carry the last known quote of each side
        # forward, then drop the leading rows where one side has no quote yet.
        _df = _asks.merge(_bids, how='outer', left_index=True, right_index=True).ffill().dropna()

        # Calculate spread?
        if _calc_spread:
            _df['spread'] = abs(_df['ask_price'] - _df['bid_price'])

        # Convert timestamps?
        if _convert_epochs:
            _df.index = pd.to_datetime(_df.index, unit='ms')

        # Reindex to selected columns?
        if _reindex is not None and len(_reindex) > 0:
            _df = _df.reindex(_reindex, axis=1)

        # Resample?
        if _precision != 'tick':
            _df['mid_price'] = round((_df.ask_price + _df.bid_price) / 2, _symbol_digits)

            if _precision not in ['B', 'C', 'D', 'W', '24H']:
                _df = _df.mid_price.resample(rule=_precision).ohlc()
            else:
                # origin='start_day' alone is resample's default and left
                # _daily_start unused; the offset makes the parameter effective.
                _offset = self._daily_start_offset_(_precision, _daily_start)
                _df = _df.mid_price.resample(rule=_precision, origin='start_day', offset=_offset).ohlc().dropna()

        # Check data integrity? (previously accepted but never acted on)
        if _check_integrity:
            print('\n\n[INFO] Checking data integrity..')
            self._integrity_check_(_df)

        return _df

    ##########################################################################

    def _integrity_check_(self, _df):
        """Print gap statistics for the index and plot the mean spread per hour.

        The plot needs a ``spread`` column and a datetime index; when either
        is missing that test is skipped instead of raising.
        """
        if not isinstance(_df, pd.DataFrame):
            print('[ERROR] Input must be a Pandas DataFrame')
            return

        _diff = _df.index.to_series().diff()

        print('\n[TEST #1] Data Frequency Statistics\n--')
        print(_diff.describe())

        print('\n[TEST #2] Mode of Gap Distribution\n--')
        print(_diff.value_counts().head(1))

        print('\n[TEST #3] Hourly Spread Distribution\n--')
        if 'spread' in _df.columns and isinstance(_df.index, pd.DatetimeIndex):
            _df.groupby(_df.index.hour)['spread'].mean().plot(
                    xticks=range(0, 24),
                    title='Average Spread by Hour (UTC)')
        else:
            print("[INFO] Skipped: requires a 'spread' column and a datetime index "
                  "(_calc_spread=True, 'spread' in _reindex, _convert_epochs=True, _precision='tick').")

In [3]:
### Load tick data from the existing files

# Build a loader with the default path, extension and delimiter.
io = DWX_TICK_DATA_IO()

# Request every 2022 EURUSD tick, keeping ask, bid and the computed spread.
_load_options = dict(
    _symbol='EURUSD',
    _year='2022',
    _convert_epochs=True,
    _calc_spread=True,
    _reindex=['ask_price', 'bid_price', 'spread'],
    _precision='tick',
)
_eurusd = io._get_symbol_as_dataframe_(**_load_options)

# The loader returns None when it finds no files; report that instead of the frame.
print("No data available for the specified parameters." if _eurusd is None else _eurusd)

[INFO] Finding symbol files.. please wait..
Searching in path: D:/Tick data/EURUSD/EURUSD
Files found: ['EURUSD_BID_2022-01-02_22.log.gz', 'EURUSD_BID_2022-01-02_23.log.gz', 'EURUSD_BID_2022-01-03_00.log.gz', 'EURUSD_BID_2022-01-03_01.log.gz', 'EURUSD_BID_2022-01-03_02.log.gz', 'EURUSD_BID_2022-01-03_03.log.gz', 'EURUSD_BID_2022-01-03_04.log.gz', 'EURUSD_BID_2022-01-03_05.log.gz', 'EURUSD_BID_2022-01-03_06.log.gz', 'EURUSD_BID_2022-01-03_07.log.gz', 'EURUSD_BID_2022-01-03_08.log.gz', 'EURUSD_BID_2022-01-03_09.log.gz', 'EURUSD_BID_2022-01-03_10.log.gz', 'EURUSD_BID_2022-01-03_11.log.gz', 'EURUSD_BID_2022-01-03_12.log.gz', 'EURUSD_BID_2022-01-03_13.log.gz', 'EURUSD_BID_2022-01-03_14.log.gz', 'EURUSD_BID_2022-01-03_15.log.gz', 'EURUSD_BID_2022-01-03_16.log.gz', 'EURUSD_BID_2022-01-03_17.log.gz', 'EURUSD_BID_2022-01-03_18.log.gz', 'EURUSD_BID_2022-01-03_19.log.gz', 'EURUSD_BID_2022-01-03_20.log.gz', 'EURUSD_BID_2022-01-03_21.log.gz', 'EURUSD_BID_2022-01-03_22.log.gz', 'EURUSD_BID_2022-01-0

In [5]:
### Checking data integrity
# The loader returns None when no files were found; guard so this cell
# reports that instead of failing with an AttributeError.
if _eurusd is not None:
    print(_eurusd.shape)

    print(_eurusd.tail(10))
    print(_eurusd.head(10))
else:
    print("No data available for the specified parameters.")

(46047960, 3)
                         ask_price  bid_price   spread
timestamp                                             
2022-12-30 21:54:51.793    1.07024    1.06985  0.00039
2022-12-30 21:54:52.093    1.07024    1.06982  0.00042
2022-12-30 21:54:52.194    1.07028    1.06978  0.00050
2022-12-30 21:54:52.894    1.07023    1.06981  0.00042
2022-12-30 21:54:52.995    1.07026    1.06980  0.00046
2022-12-30 21:54:53.095    1.07035    1.06974  0.00061
2022-12-30 21:54:53.295    1.07021    1.06981  0.00040
2022-12-30 21:54:55.298    1.07039    1.06975  0.00064
2022-12-30 21:54:55.498    1.07039    1.06970  0.00069
2022-12-30 21:54:57.146    1.07042    1.06976  0.00066
                         ask_price  bid_price   spread
timestamp                                             
2022-01-02 22:05:00.458    1.13772    1.13678  0.00094
2022-01-02 22:05:14.681    1.13780    1.13669  0.00111
2022-01-02 22:05:14.712    1.13790    1.13660  0.00130
2022-01-02 22:05:16.123    1.13775    1.13664  0.00

In [14]:
# Name the export after the year actually held in the frame: the hard-coded
# "2017" did not match the 2022 data loaded above. Assumes the index holds
# datetimes (i.e. the frame was loaded with _convert_epochs=True).
if _eurusd is not None and len(_eurusd) > 0:
    _export_year = _eurusd.index[0].year
    _eurusd.to_csv(f'_tick_EURUSD_{_export_year}', sep=',', encoding='utf-8', index=True)
else:
    print("No data available to export.")