In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly 

import datetime 
import os
import re

In [None]:
class DataManager:
    def __init__(self, data_dir: str):
        self.data_dir = data_dir
        self.list_of_files = self.get_list_of_files()

    def split_filename(self, name):
        base = name.replace('.csv', '')
        if '_' in base:            return base.split('_')
        elif '-' in base:          return base.split('-')
        else:
            match = re.match(r'([a-z0-9]+?)(usd|usdt|btc|btcf0|gbp|eth|ust|ustf0|eutf0|testusdt|testusdtf0|jpy|eur|xaut|eut|cnht|mxnt|try|mim|xch)$', base)
            if match:
                return [match.group(1), match.group(2)]
            else:
                return [base, None]
            
    def filter_by_currency(self, currency=None):
        if currency is None:
            return self.list_of_files
        if not currency:
            return []
        if not isinstance(currency, str):
            raise ValueError("Currency must be a string")
        else:
            return [f for f in self.list_of_files if f[0][1] == currency.lower()]
        
    def get_list_of_files(self):
        return [(self.split_filename(f),f) for f in os.listdir(self.data_dir) if f.endswith('.csv')]
    
    def load_selected(self, selected_files, number_of_files=None, nrows = None, time_indexed=True):
        if number_of_files is not None:
            selected_files = selected_files[:number_of_files]
        if not selected_files:
            raise ValueError("No files selected for loading.")
        data = {}
        for file in selected_files:
            file_path = os.path.join(self.data_dir, file[1])
            if not os.path.exists(file_path):
                raise FileNotFoundError(f"File {file_path} does not exist.")
            if nrows is not None:
                df = pd.read_csv(file_path, nrows=nrows)
            else:
                df = pd.read_csv(file_path)
            df['time'] = pd.to_datetime(df['time'], unit='ms')
            if time_indexed:
                df.set_index('time', inplace=True)
            # df.loc[:, 'time'] = df['time'].apply(lambda x: datetime.datetime.fromtimestamp(x/1000.0).strftime('%Y-%m-%d %H:%M:%S')).astype('datetime64[ns]')
            df = self.validate_and_clean(df)
            if df.empty:
                print(f"Warning: DataFrame for {file[0][0]}/{file[0][1]} is empty after loading.")
                continue

            data[file[0][0]+"/"+file[0][1]] = df
        return data
    
    def validate_and_clean(self, df, freq='1min', max_gap_minutes=10):
        print("Duplicate index check:", df.index.duplicated().any())
        df = df[~df.index.duplicated(keep='first')]
    
        print("Convert to numeric and reindexing to full range.")
        for col in ['open', 'high', 'low', 'close', 'volume']:
            df[col] = pd.to_numeric(df[col], errors='coerce')
        full_index = pd.date_range(start=df.index.min(), end=df.index.max(), freq=freq)
        df = df.reindex(full_index)

        print("Forward filling missing values.")
        missing = df.isnull().any(axis=1)
        gap_lengths = missing.astype(int).groupby((~missing).cumsum()).sum()
        long_gaps = gap_lengths[gap_lengths > max_gap_minutes]
        if not long_gaps.empty:
            print(f"Found long gaps longer than {max_gap_minutes} minutes: {long_gaps.index.tolist()}")
        else:
            print("No long gaps found.")
        long_gap_indices = gap_lengths[gap_lengths > max_gap_minutes].index
        
        print("Interpolating missing values.")
        df.interpolate(method='linear', limit=max_gap_minutes, inplace=True)
        df.ffill(inplace=True)

        df['long_gap_flag'] = False
        for idx in long_gap_indices:
            df.loc[(~missing).cumsum() == idx, 'long_gap_flag'] = True

        print("Flagging outliers.")
        for col in ['open', 'high', 'low', 'close']:
            df[f'{col}_outlier'] = (df[col] <= 0) | (df[col] > df[col].shift(1) * 5)

        return df

    def synchronize_dataframes(self, df1, df2, how='inner'):
        common_index = df1.index.union(df2.index) if how == 'outer' else df1.index.intersection(df2.index)
        df1 = df1.reindex(common_index).ffill()
        df2 = df2.reindex(common_index).ffill()
        return df1, df2

    def resample_data(self, df, freq):
        resampled = df.resample(freq).agg({
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        })
        return resampled.dropna()

    def split_data(self, df, train_frac, val_frac):
        assert train_frac + val_frac < 1.0, "Train + Val fractions must be < 1.0"
        n = len(df)
        train_end = int(n * train_frac)
        val_end = train_end + int(n * val_frac)

        train = df.iloc[:train_end]
        val = df.iloc[train_end:val_end]
        test = df.iloc[val_end:]

        return train, val, test

    


In [201]:
data_dir = '/Users/macsnaxx/Personal Projects/Crypto_Arbitrage/crypto_data_2020-2024'

datapack = DataManager(data_dir)
gab = datapack.filter_by_currency('usd')
gab = datapack.load_selected(gab, 5, 1000)


Duplicate index check: False
Convert to numeric and reindexing to full range.
Forward filling missing values.
Found long gaps longer than 10 minutes: [0]
Interpolating missing values.
Flagging outliers.
Duplicate index check: False
Convert to numeric and reindexing to full range.
Forward filling missing values.
Found long gaps longer than 10 minutes: [526, 919, 954, 993]
Interpolating missing values.
Flagging outliers.
Duplicate index check: False
Convert to numeric and reindexing to full range.
Forward filling missing values.
Found long gaps longer than 10 minutes: [2, 3, 6, 7, 10, 16, 18, 19, 20, 25, 26, 31, 32, 34, 35, 36, 37, 42, 43, 44, 45, 47, 49, 51, 52, 53, 54, 56, 58, 59, 67, 69, 70, 75, 76, 77, 86, 92, 93, 102, 103, 105, 107, 109, 112, 113, 114, 118, 119, 120, 121, 123, 124, 126, 127, 128, 130, 134, 135, 138, 140, 141, 142, 145, 149, 151, 154, 157, 166, 169, 173, 174, 177, 178, 179, 182, 189, 191, 196, 200, 202, 203, 204, 209, 211, 212, 214, 217, 219, 224, 237, 238, 241, 243,

In [3]:
import os
import re
import pandas as pd
import numpy as np

class FileReader:
    def __init__(self, data_dir: str):
        self.data_dir = data_dir
        self.list_of_files = self.get_list_of_files()
        self.number_of_files = len(self.list_of_files)
        
    def get_list_of_files(self):
        return [(self.split_filename(f), f) for f in os.listdir(self.data_dir) if f.endswith('.csv')]


    def split_filename(self, name):
        base = name.replace('.csv', '')
        if '_' in base:
            return base.split('_')
        elif '-' in base:
            return base.split('-')
        else:
            match = re.match(r'([a-z0-9]+?)(usd|usdt|btc|btcf0|gbp|eth|ust|ustf0|eutf0|testusdt|testusdtf0|jpy|eur|xaut|eut|cnht|mxnt|try|mim|xch)$', base)
            return [match.group(1), match.group(2)] if match else [base, None]


    def filter_by_currency(self, currency=None):
        if currency is None:
            return self.list_of_files
        if not isinstance(currency, str):
            raise ValueError("Currency must be a string")
        return [f for f in self.list_of_files if f[0][1] == currency.lower()]

    def load_latest_data(self, selected_files, number_of_files=None, nrows=None, min_rows=105040):
        if number_of_files is not None:
            selected_files = selected_files[:number_of_files]
        if not selected_files:
            raise ValueError("No files selected for loading.")

        data = {}
        for file in selected_files:
            path = os.path.join(self.data_dir, file[1])
            if not os.path.exists(path):
                continue

            df = pd.read_csv(path)
            if len(df) < min_rows:
                print(f"Skipping {file[1]}: not enough data.")
                continue
            df['time'] = pd.to_datetime(df['time'], unit='ms')
            df.set_index('time', inplace=True)
            if nrows is not None:
                df = df.tail(nrows)
            df.sort_index(inplace=True)
            data[file[0][0] + "/" + file[0][1]] = df

        return data
    
    
    

In [4]:
data_dir = '/Users/macsnaxx/Personal Projects/Crypto_Arbitrage/crypto_data_2020-2024'

files = FileReader(data_dir)
selc = files.filter_by_currency('usd')
raw_dataset = files.load_latest_data(selc)

Skipping tonusd.csv: not enough data.
Skipping galausd.csv: not enough data.
Skipping velo-usd.csv: not enough data.
Skipping waxusd.csv: not enough data.
Skipping planetsusd.csv: not enough data.
Skipping boousd.csv: not enough data.
Skipping astusd.csv: not enough data.
Skipping briseusd.csv: not enough data.
Skipping exousd.csv: not enough data.
Skipping suku-usd.csv: not enough data.
Skipping band-usd.csv: not enough data.
Skipping chzusd.csv: not enough data.
Skipping boson-usd.csv: not enough data.
Skipping arbusd.csv: not enough data.
Skipping testalgotestusd.csv: not enough data.
Skipping xautusd.csv: not enough data.
Skipping genusd.csv: not enough data.
Skipping reefusd.csv: not enough data.
Skipping luxousd.csv: not enough data.
Skipping pngusd.csv: not enough data.
Skipping gmtusd.csv: not enough data.
Skipping laiusd.csv: not enough data.
Skipping sidus_usd.csv: not enough data.
Skipping dora_usd.csv: not enough data.
Skipping swmusd.csv: not enough data.
Skipping polisusd

In [None]:
class DataPreprocessor:
    def __init__(self, data):
        self.data = data
        # self.close_data = self.create_close_data(data)

    def get_dataframe_in_date_range(self, start, end, freq, pairs=None, margin = 50):
        return_data = {}
        expected = pd.date_range(start, end, freq=freq)
        for pair, df in self.data.items():
            if pairs is not None and pair not in pairs:
                continue
            actual = df.reindex(expected)[start:end].resample(freq).last()
            fre  = round((actual.isna().sum().max()/len(actual))*100, 2)
            if fre > margin:
                continue
            print("Getting pair - ", pair)
            print("Expected - ", len(actual), " | Actual - ", len(actual.dropna()))
            print('Missing in range: ',fre, "%")
            return_data[pair] = actual.dropna(how = 'all')
            print("---")
        return return_data


    def validate_and_clean(self, df, max_gap_minutes=10):
        print("Duplicate index check:", df.index.duplicated().any())
        df = df[~df.index.duplicated(keep='first')]
    
        print("Convert to numeric and reindexing to full range.")
        for col in ['open', 'high', 'low', 'close', 'volume']:
            df[col] = pd.to_numeric(df[col], errors='coerce')
        full_index = pd.date_range(start=df.index.min(), end=df.index.max(), freq=freq)
        df = df.reindex(full_index)

        print("Forward filling missing values.")
        missing = df.isnull().any(axis=1)
        gap_lengths = missing.astype(int).groupby((~missing).cumsum()).sum()
        long_gaps = gap_lengths[gap_lengths > max_gap_minutes]
        if not long_gaps.empty:
            print(f"Found long gaps longer than {max_gap_minutes} minutes: {long_gaps.index.tolist()}")
        else:
            print("No long gaps found.")
        long_gap_indices = gap_lengths[gap_lengths > max_gap_minutes].index
        
        print("Interpolating missing values.")
        df.interpolate(method='linear', limit=max_gap_minutes, inplace=True)
        df.ffill(inplace=True)

        df['long_gap_flag'] = False
        for idx in long_gap_indices:
            df.loc[(~missing).cumsum() == idx, 'long_gap_flag'] = True

        print("Flagging outliers.")
        for col in ['open', 'high', 'low', 'close']:
            df[f'{col}_outlier'] = (df[col] <= 0) | (df[col] > df[col].shift(1) * 5)

        return df

    def synchronize_dataframes(self, df1, df2, how='inner'):
        common_index = df1.index.union(df2.index) if how == 'outer' else df1.index.intersection(df2.index)
        df1 = df1.reindex(common_index).ffill()
        df2 = df2.reindex(common_index).ffill()
        return df1, df2

    def resample_data(self, df, freq):
        resampled = df.resample(freq).agg({
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        })
        return resampled.dropna()

    def split_data(self, df, train_frac, val_frac):
        assert train_frac + val_frac < 1.0, "Train + Val fractions must be < 1.0"
        n = len(df)
        train_end = int(n * train_frac)
        val_end = train_end + int(n * val_frac)

        train = df.iloc[:train_end]
        val = df.iloc[train_end:val_end]
        test = df.iloc[val_end:]

        return train, val, test

    


    def prepare_for_model():
        pass 

    def create_close_data(self, data):
        close_data = {}
        for pair, df in data.items():
            if 'close' in df.columns:
                close_data[pair] = df['close']
            else:
                print(f"Warning: 'close' column not found in {pair}. Skipping.")
        return pd.DataFrame(close_data)


In [7]:
data = DataPreprocessor(raw_dataset)

In [9]:
st = pd.Timestamp('2021-01-01')
ed = pd.Timestamp('2022-01-01')
fred = data.get_dataframe_in_date_range(st, ed, '1h')

Getting pair -  xrp/usd
Expected -  8761  | Actual -  8655
Missing in range:  1.21 %
---
Getting pair -  eos/usd
Expected -  8761  | Actual -  7266
Missing in range:  17.06 %
---
Getting pair -  zec/usd
Expected -  8761  | Actual -  5739
Missing in range:  34.49 %
---
Getting pair -  trx/usd
Expected -  8761  | Actual -  6909
Missing in range:  21.14 %
---
Getting pair -  ust/usd
Expected -  8761  | Actual -  8019
Missing in range:  8.47 %
---
Getting pair -  eth/usd
Expected -  8761  | Actual -  8725
Missing in range:  0.41 %
---
Getting pair -  neo/usd
Expected -  8761  | Actual -  5771
Missing in range:  34.13 %
---
Getting pair -  ltc/usd
Expected -  8761  | Actual -  8393
Missing in range:  4.2 %
---
Getting pair -  xmr/usd
Expected -  8761  | Actual -  4929
Missing in range:  43.74 %
---
Getting pair -  omg/usd
Expected -  8761  | Actual -  5739
Missing in range:  34.49 %
---
Getting pair -  bsv/usd
Expected -  8761  | Actual -  4627
Missing in range:  47.19 %
---
Getting pair - 

In [161]:
all_data = data.create_close_data(fred)
# all_data = (all_data - all_data.mean())/all_data.std()

In [176]:
from statsmodels.tsa.stattools import coint
import pandas as pd
from itertools import combinations

def test_cointegration_and_rank(base_data, significance_level=0.05):
    results = []

    for pair1, pair2 in combinations(base_data.columns,2):
        try:
            
            x = base_data[pair1]
            y =  base_data[pair2]

            if len(x.dropna()) < 30 or len(y.dropna()) < 30:
                continue

            score, pvalue, _ = coint(x, y)
        

            results.append({
                'pair_1': pair1.rstrip("/usd"),
                'pair_2': pair2.rstrip("/usd"),
                'score': score,
                'p_value': pvalue,
                'cointegrated': pvalue < significance_level
            })

        except Exception as e:
            print(f"Error testing {pair1} vs {pair2}: {e}")
            continue

    results_df = pd.DataFrame(results).sort_values(by='p_value').reset_index(drop=True)
    return results_df

st = pd.Timestamp('2021-05-10 09:00:00')
ed = pd.Timestamp('2021-05-11 09:00:00')

test_cointegration_and_rank(all_data[st:ed].fillna(0))

Unnamed: 0,pair_1,pair_2,score,p_value,cointegrated
0,neo,ltc,-18.085699,1.754755e-29,True
1,neo,btc,-18.031197,1.825176e-29,True
2,neo,dsh,-17.899075,2.030105e-29,True
3,neo,omg,-17.882449,2.059746e-29,True
4,etc,xtz,-17.472872,3.181394e-29,True
...,...,...,...,...,...
100,ust,dsh,-0.794067,9.354790e-01,False
101,ust,xtz,-0.753544,9.405412e-01,False
102,ust,omg,-0.753438,9.405538e-01,False
103,ust,neo,-0.682181,9.485271e-01,False


In [4]:
class DataPreprocessor:
    def __init__(self):
        pass

    def validate_and_clean(self, df, freq='1min', max_gap=100):
        print("Validating and cleaning data...")
        print("Duplicate index check:", df.index.duplicated().any())
        df = df[~df.index.duplicated(keep='first')]
        df = df[['open', 'high', 'low', 'close', 'volume']].copy()

        print("Convert to numeric and reindexing to full range.")
        for col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
        full_index = pd.date_range(start=df.index.min(), end=df.index.max(), freq=freq)
        df = df.reindex(full_index)

        # print("Forward filling missing values.")
        # missing = df.isnull().any(axis=1)
        # print("Missing values detected:", missing.sum())
        # gap_lengths = missing.astype(int).groupby((~missing).cumsum()).sum()
        # print("Maximum gap length:", max(gap_lengths), "minutes")
        # print("Allowable gaps:", max_gap, "minutes")
        # print("Gaps longer than max_gap:", gap_lengths[gap_lengths > max_gap].index)
        # if any(gap_lengths > max_gap):
        #     return None  

        # df.interpolate(method='linear', limit=max_gap, inplace=True)
        # df.ffill(inplace=True)

        print("Flagging outliers.")
        for col in ['open', 'high', 'low', 'close', 'volume']:
            df[f'{col}_outlier'] = (df[col] <= 0)
            df[f'{col}_outlier'] |= ((df[col] > df[col].shift(1) * 5) | (df[col] < df[col].shift(1) / 5))
            returns = df[col].pct_change()
            z_scores = (returns - returns.rolling(24).mean()) / returns.rolling(24).std()
            df[f'{col}_outlier'] |= (np.abs(z_scores) > 5)

        return df

    def resample_data(self, df, freq='5min'):
        return df.resample(freq).agg({
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        }).dropna()

    def split_data(self, df, train_frac=0.6, val_frac=0.2):
        assert train_frac + val_frac < 1.0
        n = len(df)
        train_end = int(n * train_frac)
        val_end = train_end + int(n * val_frac)
        return df.iloc[:train_end], df.iloc[train_end:val_end], df.iloc[val_end:]
    
    def check_missing_ohlcv(self,
        df: pd.DataFrame,
        start_time: pd.Timestamp,
        end_time: pd.Timestamp,
        freq: str,
        pair_name: str,
    ):

        df = df.copy()
        if not isinstance(df.index, pd.DatetimeIndex):
            if 'timestamp' in df.columns:
                df.index = pd.to_datetime(df['timestamp'])
            else:
                raise ValueError("DataFrame must have a DatetimeIndex or a 'timestamp' column")

        # 2. Define what timestamps we *should* have
        expected_index = pd.date_range(start=start_time, end=end_time, freq=freq)
        expected_count = len(expected_index)

        # 3. Restrict to the requested window
        windowed = df.loc[start_time:end_time]

        # 4. Reindex to “expose” any completely missing rows
        reindexed = windowed.reindex(expected_index)

        # 5. Count row‐level gaps (all‐NaN rows == missing timestamp)
        missing_mask = reindexed.isnull().all(axis=1)
        missing_timestamps = missing_mask.sum()
        missing_list = reindexed[missing_mask].index.to_list()

        # 6. Count column‐wise nulls (for partial gaps)
        missing_by_column = reindexed.isnull().sum().to_dict()

        # 7. Pack up
        result = {
            'pair': pair_name,
            'expected_timestamps': expected_count,
            'actual_timestamps': expected_count - missing_timestamps,
            'missing_timestamps': missing_timestamps,
            'missing_timestamps_list': missing_list,
            'missing_by_column': missing_by_column,
        }

        # 8. (Optional) print a quick summary
        print(f"[{pair_name}] Expected: {expected_count:,} rows, "
            f"Found: {result['actual_timestamps']:,}, "
            f"Missing rows: {missing_timestamps:,}")
        for col, n in missing_by_column.items():
            if n:
                print(f"  – {col!r}: {n:,} nulls")

        return result
    


    def prepare_data_for_modeling(self, currency, number_of_files=5, nrows='7d', resample_freq='5min', start_time=None, end_time=None, freq='1min'):
        if isinstance(nrows, str):
            nrows = self.duration_to_rows(nrows, freq=resample_freq)
            print(f"Converted nrows='{nrows}' based on duration for frequency='{resample_freq}'")

        files = self.filter_by_currency(currency)
        raw_data = self.load_latest_data(files, number_of_files, nrows)
        for pair_name, df in raw_data.items():
            self.check_missing_data_in_range(
                df=df,
                start_time=start_time,
                end_time=end_time,
                freq=freq,
                pair_name=pair_name
            )
        
        prepared = {}
        for symbol, df in raw_data.items():
            print(f"Processing {symbol} with {len(df)} rows.")
            cleaned = self.validate_and_clean(df, freq='1min')  # always validate at 1min base
            if cleaned is None:
                print(f"Skipping {symbol}: too many missing values.")
                continue
            resampled = self.resample_data(cleaned, freq=resample_freq)
            if len(resampled) < 100:
                print(f"Skipping {symbol}: not enough data after resampling.")
                continue
            train, val, test = self.split_data(resampled)
            prepared[symbol] = {
                'train': train,
                'val': val,
                'test': test
            }

        return prepared

    def duration_to_rows(self, duration_str: str, freq: str) -> int:
        """
        Converts a time duration like '1d', '2w', '3mo' into a number of rows based on frequency.
        freq: a pandas-compatible frequency string like '1min', '5min', '1H', '1D'
        """
        freq_offset = pd.tseries.frequencies.to_offset(freq)
        freq_timedelta = freq_offset.delta or pd.to_timedelta(freq_offset)

        duration_str = duration_str.strip().lower()
        duration_map = {
            'd': 'D',
            'w': 'W',
            'mo': 'M',
            'y': 'Y'
        }

        match = re.match(r'(\d+)([a-z]+)', duration_str)
        if not match:
            raise ValueError(f"Invalid duration format: {duration_str}")

        num, unit = int(match.group(1)), match.group(2)
        if unit not in duration_map:
            raise ValueError(f"Unsupported duration unit: {unit}")

        now = pd.Timestamp.utcnow()
        if unit == 'mo':
            start = now - pd.DateOffset(months=num)
        elif unit == 'y':
            start = now - pd.DateOffset(years=num)
        elif unit == 'w':
            start = now - pd.Timedelta(weeks=num)
        else:  # 'd'
            start = now - pd.Timedelta(days=num)

        delta = now - start
        row_count = int(delta.total_seconds() // freq_timedelta.total_seconds())
        return row_count

        

        # return consistent_pairs


In [9]:
data_dir = '/Users/macsnaxx/Personal Projects/Crypto_Arbitrage/crypto_data_2020-2024'
manager = FileReader(data_dir)
data = manager.load_latest_data(selected_files=manager.filter_by_currency('usd'))

Skipping tonusd.csv: not enough data.
Skipping galausd.csv: not enough data.
Skipping velo-usd.csv: not enough data.
Skipping waxusd.csv: not enough data.
Skipping planetsusd.csv: not enough data.
Skipping boousd.csv: not enough data.
Skipping astusd.csv: not enough data.
Skipping briseusd.csv: not enough data.
Skipping exousd.csv: not enough data.
Skipping suku-usd.csv: not enough data.
Skipping band-usd.csv: not enough data.
Skipping chzusd.csv: not enough data.
Skipping boson-usd.csv: not enough data.
Skipping arbusd.csv: not enough data.
Skipping testalgotestusd.csv: not enough data.
Skipping xautusd.csv: not enough data.
Skipping genusd.csv: not enough data.
Skipping reefusd.csv: not enough data.
Skipping luxousd.csv: not enough data.
Skipping pngusd.csv: not enough data.
Skipping gmtusd.csv: not enough data.
Skipping laiusd.csv: not enough data.
Skipping sidus_usd.csv: not enough data.
Skipping dora_usd.csv: not enough data.
Skipping swmusd.csv: not enough data.
Skipping polisusd

# updated pythonic script

In [None]:
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, Iterable
import re
import pandas as pd
import logging

@dataclass(frozen=True)
class FileMeta:
    pair: str
    currency: Optional[str]
    path: Path

    @property
    def key(self) -> str:
        return f"{self.pair}/{self.currency or 'base'}"

class FilenameParser:
    _pattern = re.compile(r"^(?P<pair>[a-z0-9]+?)(?P<quote>usd|usdt|btc|...)?$", re.I)

    def parse(self, path: Path) -> Optional[FileMeta]:
        m = self._pattern.fullmatch(path.stem)
        if not m:
            return None
        return FileMeta(pair=m.group('pair').lower(),
                        currency=(m.group('quote') or '').lower() or None,
                        path=path)

class FileScanner:
    def __init__(self, directory: Path):
        self.directory = directory

    def scan(self) -> list[FileMeta]:
        files = []
        for path in self.directory.iterdir():
            if path.suffix.lower() == ".csv" and path.is_file():
                meta = FilenameParser().parse(path)
                if meta:
                    files.append(meta)
        return files

class CsvLoader:
    def __init__(self, parse_dates: list[str] = ['time'], date_unit='ms', index_col='time'):
        self.parse_dates = parse_dates
        self.date_unit = date_unit
        self.index_col = index_col

    def load(self, metas: Iterable[FileMeta], nrows: Optional[int] = None) -> dict[str, pd.DataFrame]:
        out = {}
        for meta in metas:
            df = pd.read_csv(meta.path,
                             nrows=nrows,
                             parse_dates=self.parse_dates,
                             date_unit=self.date_unit,
                             index_col=self.index_col)
            out[meta.key] = df
        return out

class DataManager:
    def __init__(self, data_dir: str):
        self.data_dir = Path(data_dir)
        self._index: list[FileMeta] = None
        self.loader = CsvLoader()

    @property
    def index(self) -> list[FileMeta]:
        if self._index is None:
            self._index = FileScanner(self.data_dir).scan()
        return self._index

    def filter(self, currency: Optional[str] = None) -> list[FileMeta]:
        if currency:
            c = currency.lower()
            return [f for f in self.index if f.currency == c]
        return list(self.index)

    def load(self, currency: Optional[str] = None,
             number_of_files: Optional[int] = None,
             nrows: Optional[int] = None) -> dict[str, pd.DataFrame]:
        selected = self.filter(currency)
        if number_of_files:
            selected = selected[:number_of_files]
        if not selected:
            raise ValueError(f"No files selected for currency={currency}")
        return self.loader.load(selected, nrows=nrows)
