In [1]:
import pandas as pd
import os
from pathlib import Path
from typing import Union, List
import glob

# Value columns kept from every raw kline CSV (in output order), mapped to the
# suffix each one receives in the wide layout ("<symbol>_<suffix>").
_KLINE_COLUMN_SUFFIXES = {
    'open': 'open',
    'high': 'high',
    'low': 'low',
    'close': 'close',
    'volume': 'volume',
    'quote_asset_volume': 'quote_volume',
    'number_of_trades': 'num_of_trades',
    'taker_buy_base_asset_volume': 'taker_buy_base_vol',
    'taker_buy_quote_asset_volume': 'taker_buy_quote_vol',
}


def _discover_symbols(base_path: Path, freq: str) -> List[str]:
    """Return the sorted names of ``*USDT`` sub-folders of ``base_path`` that contain a ``freq`` folder."""
    return sorted(
        item.name
        for item in base_path.iterdir()
        if item.is_dir() and item.name.endswith("USDT") and (item / freq).is_dir()
    )


def _load_symbol_frame(base_path: Path, symbol: str, freq: str) -> Union[pd.DataFrame, None]:
    """Read every CSV under ``base_path/symbol/freq`` into one clean frame, or return None if nothing usable exists."""
    symbol_freq_path = base_path / symbol / freq
    if not symbol_freq_path.exists():
        print(f"  No {freq} folder found for {symbol}")
        return None

    csv_files = sorted(symbol_freq_path.glob("*.csv"))
    if not csv_files:
        print(f"  No CSV files found for {symbol} in {freq} folder")
        return None

    # A single unreadable file is reported and skipped; the remaining files are still used.
    dfs = []
    for file in csv_files:
        try:
            dfs.append(pd.read_csv(file))
        except Exception as e:
            print(f"  Error reading file {file.name}: {e}")

    if not dfs:
        print(f"  No data successfully loaded for {symbol}")
        return None

    symbol_df = pd.concat(dfs, ignore_index=True)
    symbol_df['open_time'] = pd.to_datetime(symbol_df['open_time'])

    # One row per timestamp (first occurrence wins), in chronological order.
    symbol_df = (
        symbol_df
        .drop_duplicates(subset=['open_time'], keep='first')
        .sort_values('open_time')
        .reset_index(drop=True)
    )

    # Keep only the known kline columns that this symbol's files actually provide.
    missing = [col for col in _KLINE_COLUMN_SUFFIXES if col not in symbol_df.columns]
    if missing:
        print(f"  Warning: Some columns are missing in {symbol} data - {missing}")
    kept = ['open_time'] + [col for col in _KLINE_COLUMN_SUFFIXES if col in symbol_df.columns]
    return symbol_df[kept]


def load_all_crypto_data(
    base_path: Union[str, Path],
    symbols: Union[List[str], None] = None,
    freq: str = "1d",
    format_type: str = "wide"  # "wide" or "long"
) -> pd.DataFrame:
    """
    Load and combine kline CSV data for multiple cryptocurrencies.

    Files are read from ``base_path/<symbol>/<freq>/*.csv``. For each symbol the
    CSVs are concatenated, ``open_time`` is parsed with ``pd.to_datetime``,
    duplicate timestamps are dropped (first kept) and rows are sorted by time.
    Progress and problems are reported with ``print``; an error while
    processing one symbol is printed and that symbol is skipped.

    Args:
        base_path (str/Path): Base path for data files
        symbols (List[str] or None): Trading pairs (e.g., ["BTCUSDT", "ETHUSDT"]).
            If None, every sub-folder of ``base_path`` whose name ends with
            "USDT" and that contains a ``freq`` folder is loaded.
        freq (str): Frequency folder name, e.g., "1m" or "1d"
        format_type (str): "wide" (default) or "long", case-insensitive. Any
            other value prints a warning and is treated as "wide".

    Returns:
        pd.DataFrame:
            * long: one row per (symbol, open_time) with the kline columns plus
              a ``symbol`` column, sorted by symbol then time.
            * wide: indexed by ``open_time`` with one ``<symbol>_<metric>``
              column per symbol and metric; timestamps missing for a symbol
              are NaN. The symbol with the most rows comes first.
            * an empty DataFrame when no symbol could be loaded.
    """
    base_path = Path(base_path)

    layout = format_type.lower()
    if layout not in ("wide", "long"):
        # Unknown values have always produced the wide layout; keep that, but say so.
        print(f"Warning: Unknown format_type {format_type!r}; falling back to 'wide'.")
        layout = "wide"

    if symbols is None:
        symbols = _discover_symbols(base_path, freq)
        print(f"Found {len(symbols)} available symbols with {freq} frequency: {symbols[:5]}...")

    # Per-symbol frames, already shaped for the requested layout.
    symbol_dfs = {}
    for i, symbol in enumerate(symbols):
        try:
            print(f"Processing {i+1}/{len(symbols)}: {symbol}")

            symbol_df = _load_symbol_frame(base_path, symbol, freq)
            if symbol_df is None:
                continue

            if layout == "long":
                symbol_df = symbol_df.assign(symbol=symbol)
            else:
                # rename() ignores keys for columns this symbol does not have.
                symbol_df = symbol_df.rename(
                    columns={col: f'{symbol}_{suffix}' for col, suffix in _KLINE_COLUMN_SUFFIXES.items()}
                )

            symbol_dfs[symbol] = symbol_df
            print(f"  Successfully loaded {symbol} with {len(symbol_df)} rows")
        except Exception as e:
            # Best effort: one broken symbol must not abort the whole load.
            print(f"  Error processing {symbol}: {e}")

    if not symbol_dfs:
        print(f"Warning: No data was successfully loaded for any symbol with frequency {freq}.")
        return pd.DataFrame()  # Return empty DataFrame instead of raising error

    if layout == "long":
        print("\nCombining all symbols into long format...")
        result_df = (
            pd.concat(list(symbol_dfs.values()), ignore_index=True)
            .sort_values(['symbol', 'open_time'])
            .reset_index(drop=True)
        )
        print(f"Final long-format dataset has {len(result_df)} rows with data for {len(symbol_dfs)} symbols")
        return result_df

    print("\nMerging data from all symbols into wide format...")
    # The symbol with the most rows leads the column order (ties: first loaded).
    first_symbol = max(symbol_dfs, key=lambda name: len(symbol_dfs[name]))
    print(f"Using {first_symbol} as the base for merging (has {len(symbol_dfs[first_symbol])} data points)")

    ordered = [first_symbol] + [name for name in symbol_dfs if name != first_symbol]
    for name in ordered[1:]:
        print(f"  Merging {name} ({len(symbol_dfs[name])} rows)")

    # A single outer alignment on open_time replaces the chain of pairwise outer
    # merges, which re-copied the growing result once per symbol. Timestamps are
    # unique per symbol after de-duplication, so the two are equivalent.
    result_df = pd.concat(
        [symbol_dfs[name].set_index('open_time') for name in ordered],
        axis=1,
    ).sort_index()

    print(f"Final wide-format dataset has {len(result_df)} rows with {len(result_df.columns)} columns")

    return result_df

def convert_wide_to_long(df, id_vars=None):
    """
    Convert a wide format DataFrame to long format

    Every column named ``<symbol>_<metric>`` (split at the first underscore)
    is melted and pivoted back so that each metric becomes a column and each
    row is one (identifier, symbol) combination. Values are aggregated by
    ``pivot_table`` with its default aggregation (mean).

    Args:
        df: Wide format DataFrame
        id_vars: Column name or list of column names to use as identifiers.
            If None, the index is used: a named index keeps its name, and an
            unnamed index is treated as ``open_time`` unless an ``open_time``
            column already exists, in which case that column is the identifier.

    Returns:
        pd.DataFrame: Long format DataFrame with the identifier column(s),
        ``symbol`` and one column per metric. The input is returned unchanged
        when it is empty; an empty frame with the identifier and ``symbol``
        columns is returned when no ``<symbol>_<metric>`` column exists.
    """
    if df.empty:
        return df

    if id_vars is None:
        index_name = df.index.name
        if index_name is None:
            index_name = 'open_time'
            if index_name not in df.columns:
                # Name the index so reset_index() produces the expected column.
                df = df.rename_axis(index_name)
        df = df.reset_index()
        id_vars = [index_name]
    elif isinstance(id_vars, str):
        # A bare string would turn the membership test below into a substring test.
        id_vars = [id_vars]
    else:
        id_vars = list(id_vars)

    # Only "<symbol>_<metric>" style columns carry values to reshape.
    value_vars = [
        col for col in df.columns
        if isinstance(col, str) and col not in id_vars and '_' in col
    ]
    if not value_vars:
        return pd.DataFrame(columns=id_vars + ['symbol'])

    long_df = pd.melt(df, id_vars=id_vars, value_vars=value_vars,
                      var_name='temp', value_name='value')

    # Split "<symbol>_<metric>" at the first underscore only, so metric names
    # such as "quote_volume" stay intact.
    long_df[['symbol', 'metric']] = long_df['temp'].str.split('_', n=1, expand=True)
    long_df = long_df.drop('temp', axis=1)

    # Pivot on the identifiers actually in use; 'open_time' used to be hard-coded here.
    long_df = long_df.pivot_table(index=id_vars + ['symbol'],
                                  columns='metric',
                                  values='value').reset_index()

    # Drop the leftover "metric" label on the column axis.
    long_df.columns.name = None

    return long_df

def check_data_quality(df, format_type="wide"):
    """
    Check quality of the cryptocurrency DataFrame

    Args:
        df: DataFrame to check. Long format needs ``open_time`` and ``symbol``
            columns; wide format is expected to be indexed by time with
            ``<symbol>_<metric>`` columns.
        format_type: "long" (case-insensitive) for long format; any other
            value is treated as wide format.

    Returns:
        dict: Dictionary with the keys
            * ``total_rows``: number of rows in ``df``
            * ``date_range``: (min, max) of ``open_time`` (long) or of the index (wide)
            * ``symbols_count`` / ``symbols``: symbols in order of first appearance
            * ``completeness``: per symbol, the percentage of non-null data as
              a ``"xx.xx%"`` string. Long format gives one entry per available
              OHLCV metric; wide format gives a single value counting rows in
              which every column of that symbol is non-null.
        For an empty ``df`` all counts are zero and the collections are empty.
    """
    if df.empty:
        return {
            'total_rows': 0,
            'date_range': (None, None),
            'symbols_count': 0,
            'symbols': [],
            'completeness': {}
        }

    def _pct_complete(incomplete_count, total):
        # Percentage of complete entries, formatted as in the report.
        return f"{100 - (incomplete_count / total * 100):.2f}%"

    quality_report = {
        'total_rows': len(df)
    }

    if format_type.lower() == "long":
        quality_report['date_range'] = (df['open_time'].min(), df['open_time'].max())
        symbols = df['symbol'].unique().tolist()
        quality_report['symbols_count'] = len(symbols)
        quality_report['symbols'] = symbols

        # Only report on the OHLCV metrics that are present in this frame.
        metrics = [m for m in ('open', 'high', 'low', 'close', 'volume') if m in df.columns]

        quality_report['completeness'] = {}
        for symbol in symbols:
            symbol_data = df[df['symbol'] == symbol]
            quality_report['completeness'][symbol] = {
                metric: _pct_complete(symbol_data[metric].isnull().sum(), len(symbol_data))
                for metric in metrics
            }
    else:
        quality_report['date_range'] = (df.index.min(), df.index.max())

        # De-duplicate symbol prefixes while preserving column order; a set
        # would make the order of the report vary from run to run.
        symbols = list(dict.fromkeys(
            col.split('_')[0]
            for col in df.columns
            if isinstance(col, str) and '_' in col
        ))
        quality_report['symbols_count'] = len(symbols)
        quality_report['symbols'] = symbols

        quality_report['completeness'] = {}
        for symbol in symbols:
            symbol_cols = [
                col for col in df.columns
                if isinstance(col, str) and col.startswith(f'{symbol}_')
            ]
            # A row counts as incomplete if any column of this symbol is null.
            incomplete_rows = df[symbol_cols].isnull().any(axis=1).sum()
            quality_report['completeness'][symbol] = _pct_complete(incomplete_rows, len(df))

    return quality_report

def save_crypto_data(df, output_path, format="parquet"):
    """
    Save cryptocurrency DataFrame to disk

    An empty DataFrame is not written; a warning is printed instead. The
    parent directory of ``output_path`` is created when missing, but only
    after ``format`` has been validated, so a rejected call leaves no
    directory behind.

    Args:
        df: DataFrame to save
        output_path: Path to save the file
        format: Format to save ('parquet' or 'csv', case-insensitive)

    Raises:
        ValueError: If ``format`` is neither 'parquet' nor 'csv' (only
            checked for a non-empty DataFrame).
    """
    if df.empty:
        print("Warning: DataFrame is empty, not saving to disk.")
        return

    # Resolve the writer before touching the filesystem.
    writers = {"parquet": df.to_parquet, "csv": df.to_csv}
    writer = writers.get(format.lower())
    if writer is None:
        raise ValueError(f"Unsupported format: {format}")

    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    writer(output_path)
    print(f"Saved data to {output_path}")



Load the saved Parquet file

In [2]:
def load_parquet_data(file_path):
    """
    Read a parquet file of cryptocurrency data and print a short summary.

    The summary covers the shape, the detected layout (long when a ``symbol``
    column is present, wide otherwise), the covered date range and the number
    of distinct cryptocurrencies.

    Args:
        file_path: Path to the parquet file

    Returns:
        pd.DataFrame: The loaded data, or None when loading or summarising
        raised an exception (the error is printed).
    """
    try:
        frame = pd.read_parquet(file_path)
        print(f"Successfully loaded data from {file_path}")
        print(f"Shape: {frame.shape} (rows, columns)")

        if 'symbol' not in frame.columns:
            # Wide layout: time lives in the index, symbols are column prefixes.
            print("Format: Wide format")
            print(f"Date range: {frame.index.min()} to {frame.index.max()}")
            symbol_count = len({name.split('_')[0] for name in frame.columns if '_' in name})
        else:
            # Long layout: one row per symbol and timestamp.
            print("Format: Long format")
            print(f"Date range: {frame['open_time'].min()} to {frame['open_time'].max()}")
            symbol_count = len(frame['symbol'].unique())

        print(f"Contains data for {symbol_count} cryptocurrencies")
        return frame
    except Exception as err:
        print(f"Error loading parquet file: {err}")
        return None

def explore_directory_structure(base_path, max_depth=3):
    """
    Print the contents of a directory as a tree.

    Entries of each directory are listed in sorted order; sub-directories are
    descended into until ``max_depth`` levels below ``base_path`` have been
    printed.

    Args:
        base_path: Base path to explore
        max_depth: Maximum depth to explore
    """
    root = Path(base_path)

    def _tree_lines(folder, level, indent):
        # Lazily yields one formatted line per entry, depth-first.
        if level > max_depth:
            return

        entries = sorted(folder.iterdir())
        final_position = len(entries) - 1

        for position, entry in enumerate(entries):
            at_end = position == final_position
            branch = "└── " if at_end else "├── "
            yield f"{indent}{branch}{entry.name}"

            if entry.is_dir() and level < max_depth:
                # Below the last entry the vertical guide line is dropped.
                child_indent = indent + ("    " if at_end else "│   ")
                yield from _tree_lines(entry, level + 1, child_indent)

    print(f"Directory structure of {root}:")
    for line in _tree_lines(root, 1, ""):
        print(line)

# Example usage: explore the data folder, load every discovered symbol in long
# format, print a short quality report, save the result as Parquet and show a sample.
# The recorded cell output shows that this guard is true when the cell is run
# in the notebook, so the whole pipeline executes on cell execution.
if __name__ == "__main__":
    # NOTE(review): absolute, machine-specific path; adjust before running elsewhere.
    base_path = "/Users/mouyasushi/Desktop/quantDevops/Research/Alpha-Research/kline/binance"
    
    # Explore directory structure to understand how files are organized
    # (prints the tree down to the default max_depth of 3).
    explore_directory_structure(base_path)
    
    # Frequency sub-folder to load from each symbol directory
    freq = "1d"  # Options might be "1d", "1m", etc.
    
    # Load all available symbols with the specified frequency in LONG format
    print("\n=== Loading data in LONG format ===")
    all_crypto_df_long = load_all_crypto_data(
        base_path=base_path,
        symbols=None,  # None -> discover symbols from the folder names
        freq=freq,
        format_type="long"  # one row per (symbol, open_time)
    )
    
    # load_all_crypto_data returns an empty DataFrame when nothing could be
    # loaded, so only report on and save non-empty results.
    if not all_crypto_df_long.empty:
        quality_report = check_data_quality(all_crypto_df_long, format_type="long")
        print("\nData Quality Report:")
        print(f"Total rows: {quality_report['total_rows']}")
        print(f"Date range: {quality_report['date_range'][0]} to {quality_report['date_range'][1]}")
        print(f"Total symbols: {quality_report['symbols_count']}")
        
        # Save the long format data next to the symbol folders, inside base_path
        output_file_long = Path(base_path) / f"all_crypto_data_{freq}_long.parquet"
        save_crypto_data(all_crypto_df_long, output_file_long, format="parquet")
        
        # Example: Show some sample data (first rows, plain-text rendering)
        print("\nSample of long-format data:")
        print(all_crypto_df_long.head())
    else:
        print(f"No data was loaded. Check file path and frequency.")
    
    

Directory structure of /Users/mouyasushi/Desktop/quantDevops/Research/Alpha-Research/kline/binance:
├── AAVEUSDT
│   └── 1d
│       ├── AAVEUSDT_2021-01-01_1d.csv
│       ├── AAVEUSDT_2021-01-02_1d.csv
│       ├── AAVEUSDT_2021-01-03_1d.csv
│       ├── AAVEUSDT_2021-01-04_1d.csv
│       ├── AAVEUSDT_2021-01-05_1d.csv
│       ├── AAVEUSDT_2021-01-06_1d.csv
│       ├── AAVEUSDT_2021-01-07_1d.csv
│       ├── AAVEUSDT_2021-01-08_1d.csv
│       ├── AAVEUSDT_2021-01-09_1d.csv
│       ├── AAVEUSDT_2021-01-10_1d.csv
│       ├── AAVEUSDT_2021-01-11_1d.csv
│       ├── AAVEUSDT_2021-01-12_1d.csv
│       ├── AAVEUSDT_2021-01-13_1d.csv
│       ├── AAVEUSDT_2021-01-14_1d.csv
│       ├── AAVEUSDT_2021-01-15_1d.csv
│       ├── AAVEUSDT_2021-01-16_1d.csv
│       ├── AAVEUSDT_2021-01-17_1d.csv
│       ├── AAVEUSDT_2021-01-18_1d.csv
│       ├── AAVEUSDT_2021-01-19_1d.csv
│       ├── AAVEUSDT_2021-01-20_1d.csv
│       ├── AAVEUSDT_2021-01-21_1d.csv
│       ├── AAVEUSDT_2021-01-22_1d.csv
│       ├── AAVEUS

In [3]:
# Location of the long-format Parquet file; this is the same directory and file
# name that the example pipeline in the previous cell writes for freq "1d".
# NOTE(review): absolute, machine-specific path; adjust before running elsewhere.
path = '/Users/mouyasushi/Desktop/quantDevops/Research/Alpha-Research/kline/binance/all_crypto_data_1d_long.parquet'

# Left as the cell's last expression so the notebook displays the returned
# DataFrame below the summary lines that load_parquet_data prints.
load_parquet_data(path)

Successfully loaded data from /Users/mouyasushi/Desktop/quantDevops/Research/Alpha-Research/kline/binance/all_crypto_data_1d_long.parquet
Shape: (31878, 11) (rows, columns)
Format: Long format
Date range: 2021-01-01 00:00:00 to 2025-02-26 00:00:00
Contains data for 21 cryptocurrencies


Unnamed: 0,open_time,open,high,low,close,volume,quote_asset_volume,number_of_trades,taker_buy_base_asset_volume,taker_buy_quote_asset_volume,symbol
0,2021-01-01,88.581,91.000,84.652,90.909,368132.6,3.236081e+07,95220,160420.1,1.411901e+07,AAVEUSDT
1,2021-01-02,90.940,92.200,82.573,85.888,350675.1,3.019238e+07,96482,141557.9,1.219980e+07,AAVEUSDT
2,2021-01-03,85.911,95.680,81.338,94.185,490412.4,4.367915e+07,120659,219913.4,1.964500e+07,AAVEUSDT
3,2021-01-04,94.183,115.590,85.932,114.322,1108493.1,1.136470e+08,301713,514642.8,5.291774e+07,AAVEUSDT
4,2021-01-05,114.344,124.279,98.653,119.800,843809.1,9.551636e+07,271640,385402.1,4.365867e+07,AAVEUSDT
...,...,...,...,...,...,...,...,...,...,...,...
31873,2025-02-22,0.870,0.899,0.867,0.893,9587461.5,8.503384e+06,46368,4670777.7,4.143709e+06,XTZUSDT
31874,2025-02-23,0.893,0.895,0.858,0.865,5712478.5,5.005713e+06,39536,2578248.6,2.258186e+06,XTZUSDT
31875,2025-02-24,0.865,0.866,0.761,0.773,19900237.0,1.618839e+07,91384,9703884.8,7.900364e+06,XTZUSDT
31876,2025-02-25,0.772,0.791,0.725,0.777,28215329.7,2.147710e+07,112628,14213387.9,1.081920e+07,XTZUSDT
