In [7]:
import pandas as pd
import numpy as np
import os

def add_colnames(input_data, column_names=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'trades']):
    """
    Add column names to a DataFrame without a header, from a CSV file or an existing DataFrame.
    input_data: String (CSV file path) or pandas DataFrame.
    """
    # Check if input_data is a string (CSV path)
    if isinstance(input_data, str):
        df = pd.read_csv(input_data, header=None)
    # Check if input_data is a DataFrame
    elif isinstance(input_data, pd.DataFrame):
        df = input_data.copy()  # Avoid modifying the original
    # Validate number of columns
    num_cols = df.shape[1]
    num_expected = len(column_names)
    
    if num_cols != num_expected:
        raise ValueError(f"DataFrame has {num_cols} columns, but {num_expected} column names were provided: {column_names}")
    df.columns = column_names
    return df


def add_index(df, start_time, timestep=3600):
    """
    Adds an 'index' column (consecutive integers, with 0 at start_time)
    """
    df['index'] = ((df['timestamp'] - start_time) // timestep).astype(int)
    return df

def add_index_truncate(df, start_time, end_time, timestep):
    """
    Adds an 'index' column (in range(T)) 
    Truncates the df to only keep the range [start_time-timestep, end_time] # keep -1 for return computation
    Return the df with a list of missing indices
    """
    df = df.copy()
    # Truncate to only keep rows within [start_time-timestep, end_time] (need -1 entry to compute return)
    df = df[(df['timestamp'] >= start_time-timestep) & (df['timestamp'] <= end_time)].reset_index(drop=True)
    df['index'] = ((df['timestamp'] - start_time) // timestep).astype(int)
    T = int((end_time - start_time) // timestep) + 1
    missing_indices = sorted(list(set(range(T)) - set(df[1:]['index'])))
    return df, missing_indices


def fill_truncate(input_path: str, folder: str = None, start_time: int = None, end_time: int = None, timestep: int = 3600):
    """
    Fill missing rows with NaN, using timestamp as index with timestep as gap.
    Only truncate if both start_time and end_time are provided.
    Use unix timestamps as input.

    Args:
        input_path: Path to input file (CSV or parquet)
        folder: Optional output folder for parquet file
        timestep: Time step in seconds between consecutive timestamps (default: 3600)
    """
    # Read input file based on extension
    if input_path.endswith('.csv'):
        df = pd.read_csv(input_path)
    elif input_path.endswith('.parquet'):
        df = pd.read_parquet(input_path)
    else:
        raise ValueError("Input file must be CSV or parquet format")
    
    # Only truncate if both start_time and end_time are provided
    if start_time is not None and end_time is not None:
        df = df[(df['timestamp'] >= start_time) & (df['timestamp'] <= end_time)]
    
    # Set timestamp as index
    df = df.set_index('timestamp')
    # The 'timestamp' column is moved from a regular column to become the index. 
    # It is no longer in df.columns. Instead, it's now df.index, and it will not appear in the df columns unless you reset it back with .reset_index().
    
    # Create full timestamp range
    full_timestamps = pd.RangeIndex(start=df.index.min(), stop=df.index.max() + timestep, step=timestep)
    df_filled = df.reindex(full_timestamps)
    df_filled.index.name = 'timestamp'  # Set index name
    df_filled = df_filled.reset_index()  # Moves it to a column called 'timestamp'  
    
    if folder is not None:
        os.makedirs(folder, exist_ok=True)
        # Save as parquet file
        output_path = input_path.replace('.csv', '_filled.parquet') if folder is None else \
                    f"{folder}/{input_path.split('/')[-1].replace('.csv', '.parquet').replace('.parquet', '.parquet')}"
        df_filled.to_parquet(output_path, index=False)
    return df_filled



def add_log_return(df):
    """
    Input: df with 'index' and 'close' columns.
    Add a 'log_return' column to the df
    Use open price if previous close is missing
    """
    df = df.copy()
    df['log_return'] = np.nan

    df.loc[0,'log_return'] = np.log(df.loc[0,'close'] / df.loc[0,'open'])    

    for i in range(1, len(df)):
        prev_close = df.loc[i-1, 'close']
        curr_close = df.loc[i, 'close']
        if df.loc[i,'index'] == df.loc[i-1,'index'] + 1 and not np.isnan(prev_close) and not np.isnan(curr_close):
            df.loc[i,'log_return'] = np.log(curr_close / prev_close)
        elif not np.isnan(df.loc[i,'open']): # use open price to compute log return for missing values
            df.loc[i,'log_return'] = np.log(curr_close / df.loc[i,'open'])
    return df


def add_return(df):
    """
    Input: df with 'index' and 'close' columns.
    Add a 'return' column to the df r_t = (P_t-P_{t-1})/P_{t-1}
    Use open price if previous close is missing
    """
    df['return'] = np.nan
    df.loc[0,'return'] = df.loc[0,'close'] / df.loc[0,'open'] - 1

    for i in range(1, len(df)):
        prev_close = df.loc[i-1, 'close']
        curr_close = df.loc[i, 'close']
        if df.loc[i,'index'] == df.loc[i-1,'index'] + 1 and not np.isnan(prev_close) and not np.isnan(curr_close):
            df.loc[i,'return'] = curr_close / prev_close - 1
        elif not np.isnan(df.loc[i,'open']): # use open price to compute log return for missing values
            df.loc[i,'return'] = df.loc[i,'close'] / df.loc[i,'open'] - 1
    return df


def add_return_logreturn(df):
    """
    Input: df with 'index', 'close', and 'open' columns.
    Adds 'log_return' and 'return' columns to the df.
    Log return: log(P_t / P_{t-1}), Simple return: (P_t - P_{t-1}) / P_{t-1}
    Uses open price if previous close is missing.
    """
    df = df.copy()
    df['log_return'] = np.nan
    df['return'] = np.nan

    # First row: use open price
    df.loc[0, 'log_return'] = np.log(df.loc[0, 'close'] / df.loc[0, 'open'])
    df.loc[0, 'return'] = df.loc[0, 'close'] / df.loc[0, 'open'] - 1

    for i in range(1, len(df)):
        prev_close = df.loc[i-1, 'close']
        curr_close = df.loc[i, 'close']
        if df.loc[i, 'index'] == df.loc[i-1, 'index'] + 1 and not np.isnan(prev_close) and not np.isnan(curr_close):
            df.loc[i, 'log_return'] = np.log(curr_close / prev_close)
            df.loc[i, 'return'] = curr_close / prev_close - 1
        elif not np.isnan(df.loc[i, 'open']):
            df.loc[i, 'log_return'] = np.log(curr_close / df.loc[i, 'open'])
            df.loc[i, 'return'] = curr_close / df.loc[i, 'open'] - 1

    return df


def add_return_logreturn_volume(df):
    """
    Input: df with 'index', 'close', 'open', and 'volume' columns.
    Adds:
      - 'log_return': log(P_t / P_{t-1})
      - 'return': (P_t - P_{t-1}) / P_{t-1}
      - 'volume_change': (V_t - V_{t-1}) / V_{t-1}
    Uses open price if previous close is missing.
    """
    df = df.copy()
    df['log_return'] = np.nan
    df['return'] = np.nan
    df['volume_change'] = np.nan

    # First row: use open price for return, volume_change remains NaN
    df.loc[0, 'log_return'] = np.log(df.loc[0, 'close'] / df.loc[0, 'open'])
    df.loc[0, 'return'] = df.loc[0, 'close'] / df.loc[0, 'open'] - 1

    for i in range(1, len(df)):
        prev_close = df.loc[i-1, 'close']
        curr_close = df.loc[i, 'close']
        prev_vol = df.loc[i-1, 'volume']
        curr_vol = df.loc[i, 'volume']

        # Compute return
        if df.loc[i, 'index'] == df.loc[i-1, 'index'] + 1 and not np.isnan(prev_close) and not np.isnan(curr_close):
            df.loc[i, 'log_return'] = np.log(curr_close / prev_close)
            df.loc[i, 'return'] = curr_close / prev_close - 1
        elif not np.isnan(df.loc[i, 'open']):
            df.loc[i, 'log_return'] = np.log(curr_close / df.loc[i, 'open'])
            df.loc[i, 'return'] = curr_close / df.loc[i, 'open'] - 1

        # Compute volume change
        if not np.isnan(prev_vol) and not np.isnan(curr_vol) and prev_vol != 0:
            df.loc[i, 'volume_change'] = curr_vol / prev_vol - 1

    return df


In [8]:
# Function for reference only (the same as handle_double_missing in infer_missing_data)
def handle_double_missing(df, row_idx): # current and previous row are missing
    df.loc[row_idx - 1, 'open'] = df.loc[row_idx - 2, 'close']
    df.loc[row_idx, 'close'] = df.loc[row_idx + 1, 'open']
    # Set O_t = C_t-1 = 1/2(O_t-1 + O_t+1) = 1/2(C_t-2 + C_t)
    avg = (df.loc[row_idx - 1, 'open'] + df.loc[row_idx + 1, 'open']) / 2
    df.loc[row_idx - 1, 'close'] = avg
    df.loc[row_idx, 'open'] = avg
    
    # Update return and log_return if they exist
    if 'return' in df.columns and 'log_return' in df.columns:
        # For first missing row
        df.loc[row_idx - 1, 'return'] = df.loc[row_idx - 1, 'close'] / df.loc[row_idx - 1, 'open'] - 1
        df.loc[row_idx - 1, 'log_return'] = np.log(df.loc[row_idx - 1, 'close'] / df.loc[row_idx - 1, 'open'])
        # For second missing row
        df.loc[row_idx, 'return'] = df.loc[row_idx, 'close'] / df.loc[row_idx, 'open'] - 1
        df.loc[row_idx, 'log_return'] = np.log(df.loc[row_idx, 'close'] / df.loc[row_idx, 'open'])


def infer_missing_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    If 1 or 2 rows have NaN open and close values, infer them based on surrounding data. Then compute return and log_return if these columns exist.
    Uses lookahead to ensure next rows exist before applying inference rules.
    """
    # Function to handle single missing row
    def handle_single_missing(row_idx):
        open_prev = df.loc[row_idx - 1, 'close']
        close_next = df.loc[row_idx + 1, 'open']
        
        df.loc[row_idx, 'open'] = open_prev
        df.loc[row_idx, 'close'] = close_next
        if 'return' in df.columns and 'log_return' in df.columns:
            df.loc[row_idx, 'return'] = close_next / open_prev - 1
            df.loc[row_idx, 'log_return'] = np.log(close_next / open_prev)


    # Function to handle two consecutive missing rows    
    def handle_double_missing(row_idx):  # if current and previous row are missing
        open_m2 = df.loc[row_idx - 2, 'close']
        open_p1 = df.loc[row_idx + 1, 'open']
        close_m2 = df.loc[row_idx - 2, 'close']
        # Fill open and close for row_idx - 1 and row_idx
        df.loc[row_idx - 1, 'open'] = close_m2
        df.loc[row_idx, 'close'] = open_p1

        avg = (close_m2 + open_p1) / 2
        df.loc[row_idx - 1, 'close'] = avg
        df.loc[row_idx, 'open'] = avg

        if 'return' in df.columns and 'log_return' in df.columns:
            # First missing row (row_idx - 1): use close[t-1]/open[t-1]
            df.loc[row_idx - 1, 'return'] = avg / close_m2 - 1 
            df.loc[row_idx - 1, 'log_return'] = np.log(avg / close_m2)
            # Second missing row (row_idx)
            df.loc[row_idx, 'return'] = open_p1 / avg - 1
            df.loc[row_idx, 'log_return'] = np.log(open_p1 / avg)

    # Initialize last existing row index
    last_existing_idx = None
    
    for i in range(1, len(df) - 1):
        # Update last existing row index if current row is not missing
        if not pd.isna(df.loc[i, 'close']):
            last_existing_idx = i
            continue
        # Skip if we don't have a previous existing row or if we have already processed this row
        if last_existing_idx is None or last_existing_idx >= i:
            continue    
        # Look ahead to check next rows
        if last_existing_idx == i-1 and i+1 < len(df) and not pd.isna(df.loc[i + 1, 'close']):
            # Next row exists, can handle single missing
            handle_single_missing(i)  
        elif last_existing_idx == i-1 and i+2 < len(df) and not pd.isna(df.loc[i + 2, 'close']):
            # Next row is missing but row after exists, can handle double missing
            handle_double_missing(i+1)
            last_existing_idx = i + 2
       
    return df

In [9]:
file = 'USD_60/BNTUSD_60.parquet'
#1640995200 1743379200
#2022-01-01 00:00:00 2025-03-31 00:00:00
bnt_filled = fill_truncate(file, start_time=1640995200, end_time=1743379200)
bnt_filled.head(20)

# timestamp	open	high	low	close	volume	trades	log_return	return	volume_change	close_to_high	close_to_low	log_price_range	amihud

Unnamed: 0,timestamp,open,high,low,close,volume,trades,log_return,return,volume_change,close_to_high,close_to_low,log_price_range,amihud
0,1640995200,3.262,3.262,3.262,3.262,1.537337,1.0,0.0,0.0,-0.998834,1.0,1.0,0.0,0.0
1,1640998800,,,,,,,,,,,,,
2,1641002400,,,,,,,,,,,,,
3,1641006000,,,,,,,,,,,,,
4,1641009600,,,,,,,,,,,,,
5,1641013200,,,,,,,,,,,,,
6,1641016800,3.266,3.266,3.266,3.266,4.451,1.0,0.0,0.0,1.895266,1.0,1.0,0.0,0.0
7,1641020400,3.246,3.246,3.246,3.246,4.451,1.0,-0.006143,-0.006124,0.0,1.0,1.0,0.0,0.00138
8,1641024000,3.264,3.264,3.264,3.264,12.556,1.0,0.00553,0.005545,1.820939,1.0,1.0,0.0,0.00044
9,1641027600,3.264,3.264,3.264,3.264,22.093,1.0,0.0,0.0,0.759557,1.0,1.0,0.0,0.0


In [7]:
bnt = infer_missing_data(bnt_filled)
bnt.head(20)

Unnamed: 0,timestamp,open,high,low,close,volume,trades,log_return,return,volume_change,close_to_high,close_to_low,log_price_range,amihud
0,1640995200,3.262,3.262,3.262,3.262,1.537337,1.0,0.0,0.0,-0.998834,1.0,1.0,0.0,0.0
1,1640998800,,,,,,,,,,,,,
2,1641002400,,,,,,,,,,,,,
3,1641006000,,,,,,,,,,,,,
4,1641009600,,,,,,,,,,,,,
5,1641013200,,,,,,,,,,,,,
6,1641016800,3.266,3.266,3.266,3.266,4.451,1.0,0.0,0.0,1.895266,1.0,1.0,0.0,0.0
7,1641020400,3.246,3.246,3.246,3.246,4.451,1.0,-0.006143,-0.006124,0.0,1.0,1.0,0.0,0.00138
8,1641024000,3.264,3.264,3.264,3.264,12.556,1.0,0.00553,0.005545,1.820939,1.0,1.0,0.0,0.00044
9,1641027600,3.264,3.264,3.264,3.264,22.093,1.0,0.0,0.0,0.759557,1.0,1.0,0.0,0.0


In [10]:
import numpy as np
import pandas as pd


# Can be computed with values at time t, no need to fill or check for missing values:

def close_to_high_ratio(close: pd.Series, high: pd.Series) -> pd.Series:
    return close / high

def close_to_low_ratio(close: pd.Series, low: pd.Series) -> pd.Series:
    return close / low

def log_price_range(high: pd.Series, low: pd.Series) -> pd.Series:
    return np.log(high / low)

def amihud_illiquidity(log_return: pd.Series, volume: pd.Series, window: int = 24) -> pd.Series:
    # Handle division by zero by setting those values to NaN
    illiquidity = np.where(volume != 0, np.abs(log_return) / volume, np.nan)
    # Compute rolling mean, requiring at least 1 non-NaN value
    return pd.Series(illiquidity).rolling(window=window, min_periods=1).mean()

# Need previous values, cannot be computed solely with values at time t:
def compute_ma_zscore(close: pd.Series, window: int) -> tuple[pd.Series, pd.Series]:
    ma = close.rolling(window=window, min_periods=1).mean()
    zscore = (close - ma) / close.rolling(window=window, min_periods=1).std()
    return ma, zscore

# Rem: EMA should to be used as a difference or ratio (normalized, compare fast/slow or compare against price), see below
def compute_ema(close: pd.Series, window: int) -> pd.Series:
    return close.ewm(span=window, min_periods=1).mean()


def ema_crossover(ema_short: pd.Series, ema_long: pd.Series) -> pd.Series:
    """
    Compute whether EMA crossover happened at t.
    Returns:
        pd.Series: Series with values:
                   1 for bullish crossover,
                  -1 for bearish crossover,
                   0 for no crossover.
    """
    prev_short = ema_short.shift(1)
    prev_long = ema_long.shift(1)

    bullish_idx = (prev_short < prev_long) & (ema_short >= ema_long)
    bearish_idx = (prev_short > prev_long) & (ema_short <= ema_long)

    signal = pd.Series(0, index=ema_short.index)
    signal[bullish_idx] = 1
    signal[bearish_idx] = -1
    return signal

def price_ema_diff(price: pd.Series, window: int) -> pd.Series:
    """
    Computes (Price - EMA) / Price
    e.g. Take EMA12
    """
    ema = price.ewm(span=window, adjust=False).mean()
    return (price - ema) / price

def ema_diff_normalized(price: pd.Series, fast: int, slow: int, normalize_by: str = 'price') -> pd.Series:
    """
    Computes (EMA_fast - EMA_slow) / normalizer
    normalize_by: 'price' or 'ema_slow'
    e.g. take EMA 12/48 and 24/120
    """
    ema_fast = price.ewm(span=fast, adjust=False).mean()
    ema_slow = price.ewm(span=slow, adjust=False).mean()
    
    if normalize_by == 'price':
        return (ema_fast - ema_slow) / price
    elif normalize_by == 'ema_slow':
        return (ema_fast - ema_slow) / ema_slow
    else:
        raise ValueError("normalize_by must be 'price' or 'ema_slow'")

def compute_macd(price: pd.Series, fast: int = 12, slow: int = 26) -> pd.Series:
    """
    Computes MACD = EMA_fast - EMA_slow
    """
    ema_fast = price.ewm(span=fast, adjust=False).mean()
    ema_slow = price.ewm(span=slow, adjust=False).mean()
    return ema_fast - ema_slow


def macd_hist(price: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.Series:
    """
    Computes MACD Histogram = MACD - Signal line
    """
    macd_line = compute_macd(price, fast, slow)
    signal_line = macd_line.ewm(span=signal, adjust=False).mean()
    return macd_line - signal_line


def compute_rolling_std(price: pd.Series, window: int = 24) -> pd.Series:
    return price.rolling(window=window, min_periods=1).std()

# Volume-Weighted Average Price
def compute_vwap(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series, window: int) -> pd.Series:
    typical_price = (high + low + close) / 3
    volume_sum = volume.rolling(window=window, min_periods=1).sum()
    return np.where(volume_sum != 0, 
                (typical_price * volume).rolling(window=window, min_periods=1).sum() / volume_sum,
                np.nan)

def volume_zscore(volume: pd.Series, window: int) -> pd.Series:
    # Compute standardized volume over a period of window
    mean = volume.rolling(window=window, min_periods=1).mean()
    std = volume.rolling(window=window, min_periods=1).std()
    return np.where(std != 0, (volume - mean) / std, 0)


def volume_weighted_volatility(returns: pd.Series, volume: pd.Series, window: int = 24) -> tuple[pd.Series, pd.Series]:
    squared_returns_weighted = (returns ** 2) * volume
    sum_sqreturns_weighted = squared_returns_weighted.rolling(window=window, min_periods=1).sum()
    sum_volume = volume.rolling(window=window, min_periods=1).sum()
    weighted_vol = np.sqrt(sum_sqreturns_weighted / sum_volume)
    
    sum_sqreturns = (returns ** 2).rolling(window=window, min_periods=1).sum()
    realized_vol = np.sqrt(sum_sqreturns)

    return realized_vol, weighted_vol


In [11]:
def stochastic_oscillator(high: pd.Series, low: pd.Series, close: pd.Series, window: int = 24) -> pd.Series:
    """
    Compute the Stochastic Oscillator %K = (C - L) / (H - L) * 100. 
    This gives a normalized value (from 0 to 100) indicating where the current close price sits relative to the recent price range.
    """
    lowest_low = low.rolling(window=window, min_periods=1).min()
    highest_high = high.rolling(window=window, min_periods=1).max()
    denom = highest_high - lowest_low
    
    percent_k = np.where(denom == 0, 50, ((close - lowest_low) / denom) * 100) 
    percent_k[close.isna()] = pd.NA
    return percent_k


def momentum(close: pd.Series, a: int, b: int) -> pd.Series:
    """ 
    Compute momentum r_a,b = P_{t-b}/P_{t-a} - 1, which is the return of period [t-a, t-b].
    Args:
        close: Series of closing prices
        t-a: start time, t-b: end time (e.g. r_2,1  r_12,2  r_12,7  r_36,6)
    """
    return close.shift(b) / close.shift(a) - 1



def rsi(close: pd.Series, window: int = 14) -> pd.Series:
    # Step 1: Forward fill NaNs in close prices
    close_filled = close.ffill().bfill()
    # Step 2: Calculate price changes on filled series
    delta = close_filled.diff()
    # Separate gains and losses
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)
    # Average gains and losses
    avg_gain = gain.rolling(window=window, min_periods=1).mean()
    avg_loss = loss.rolling(window=window, min_periods=1).mean()
    # RS and RSI calculations
    rs = pd.Series(index=close.index, dtype=float)
    mask = (avg_gain == 0) & (avg_loss == 0)
    rs[~mask] = avg_gain[~mask] / avg_loss[~mask]
    rs[mask] = 100  # handle zero division case
    rsi = 100 - (100 / (1 + rs))
    # Step 3: Restore NaNs where original close was NaN
    rsi[close.isna()] = pd.NA
    
    return rsi



In [10]:

def compute_all_indicators(df: pd.DataFrame, window: int = 24) -> pd.DataFrame:
    """
    Compute all technical indicators efficiently and add them to the DataFrame.
    
    Parameters:
        df (pd.DataFrame): DataFrame with columns: timestamp, open, high, low, close, volume, trades, log_return, return
        window (int): Window size for rolling calculations. Default is 24.
        
    Returns:
        pd.DataFrame: Original DataFrame with added technical indicators
    """
    # Compute all indicators as Series first
    ma, zscore = compute_ma_zscore(df['close'], window)
    ema_short = compute_ema(df['close'], window)
    ema_long = compute_ema(df['close'], window * 2)
    ema_cross = ema_crossover(ema_short, ema_long)
    rolling_std = compute_rolling_std(df['return'], window)
    vwap = compute_vwap(df['high'], df['low'], df['close'], df['volume'], window)
    vol_zscore = volume_zscore(df['volume'], window)
    realized_vol, weighted_vol = volume_weighted_volatility(df['return'], df['volume'], window)
    stoch_osc = stochastic_oscillator(df['high'], df['low'], df['close'], window)
    rsi_series = rsi(df['close'], window)
    
    # Compute momentum indicators
    momentum_series = {}
    for a, b in [(2,1), (12,2), (12,7), (36,6)]:
        momentum_series[f'momentum_{a}_{b}'] = momentum(df['close'], a, b)
    
    # Create dictionary of all new columns
    new_columns = {
        f'ma_{window}': ma,
        f'zscore_{window}': zscore,
        f'ema_short_{window}': ema_short,
        f'ema_long_{window*2}': ema_long,
        f'ema_crossover_{window}': ema_cross,
        f'rolling_std_{window}': rolling_std,
        f'vwap_{window}': vwap,
        f'volume_zscore_{window}': vol_zscore,
        f'realized_vol_{window}': realized_vol,
        f'weighted_vol_{window}': weighted_vol,
        f'stoch_osc_{window}': stoch_osc,
        f'rsi_{window}': rsi_series,
        **momentum_series
    }
    
    # Add all columns at once
    return pd.concat([df, pd.DataFrame(new_columns, index=df.index)], axis=1)

In [11]:
bnt2 = compute_all_indicators(bnt)

In [13]:

pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.width', None)        # Auto-detect display width
bnt2.head(20)


Unnamed: 0,timestamp,open,high,low,close,volume,trades,log_return,return,volume_change,close_to_high,close_to_low,log_price_range,amihud,ma_24,zscore_24,ema_short_24,ema_long_48,ema_crossover_24,rolling_std_24,vwap_24,volume_zscore_24,realized_vol_24,weighted_vol_24,stoch_osc_24,rsi_24,momentum_2_1,momentum_12_2,momentum_12_7,momentum_36_6
0,1640995200,3.262,3.262,3.262,3.262,1.537337,1.0,0.0,0.0,-0.998834,1.0,1.0,0.0,0.0,3.262,,3.262,3.262,0,,3.262,,0.0,0.0,50.0,99.009901,,,,
1,1640998800,,,,,,,,,,,,,,3.262,,3.262,3.262,0,,3.262,,0.0,0.0,50.0,,,,,
2,1641002400,,,,,,,,,,,,,,3.262,,3.262,3.262,0,,3.262,,0.0,0.0,50.0,,,,,
3,1641006000,,,,,,,,,,,,,,3.262,,3.262,3.262,0,,3.262,,0.0,0.0,50.0,,,,,
4,1641009600,,,,,,,,,,,,,,3.262,,3.262,3.262,0,,3.262,,0.0,0.0,50.0,,,,,
5,1641013200,,,,,,,,,,,,,,3.262,,3.262,3.262,0,,3.262,,0.0,0.0,50.0,,,,,
6,1641016800,3.266,3.266,3.266,3.266,4.451,1.0,0.0,0.0,1.895266,1.0,1.0,0.0,0.0,3.264,0.707107,3.26449,3.264249,0,0.0,3.264973,0.707107,0.0,0.0,100.0,100.0,,,,
7,1641020400,3.246,3.246,3.246,3.246,4.451,1.0,-0.006143,-0.006124,0.0,1.0,1.0,0.0,0.00138,3.258,-1.133893,3.257028,3.257505,-1,0.003536,3.256884,0.57735,0.006124,0.003999,0.0,16.666667,,,,
8,1641024000,3.264,3.264,3.264,3.264,12.556,1.0,0.00553,0.005545,1.820939,1.0,1.0,0.0,0.00044,3.2595,0.491967,3.259154,3.259312,0,0.004767,3.260769,1.435683,0.008261,0.004904,90.0,52.380952,-0.006124,,,
9,1641027600,3.264,3.264,3.264,3.264,22.093,1.0,0.0,0.0,0.759557,1.0,1.0,0.0,0.0,3.2604,0.440468,3.26036,3.260365,0,0.004129,3.262352,1.559606,0.008261,0.003502,90.0,52.380952,0.005545,,,


In [17]:
# Read the txt file and create a list of paths
with open('USD_60_2022_01_01-2025_03_31_filenames.txt', 'r') as f:
    lines = f.read().splitlines()

# Transform the paths
filepaths = [line.replace('Kraken_OHLCVT', 'USD_60').replace('.csv', '.parquet') for line in lines]

# Save the new file paths to a text file
with open('USD_60_filenames.txt', 'w') as f:
    for path in filepaths:
        f.write(f"{path}\n")

#### Process Market Cap Data

In [62]:
# Load marketcap data
marketcap_df = pd.read_csv('marketcap_data.csv')
# Display header and column names
# print("\nOriginal Column Names:")
# print(marketcap_df.columns.tolist())
# UNIT,TIMESTAMP,TYPE,OPEN,HIGH,LOW,CLOSE,TOP_TIER_VOLUME

# Convert column names to lowercase
marketcap_df.columns = marketcap_df.columns.str.lower()
marketcap_df = marketcap_df.drop(['unit', 'type'], axis=1)
# Filter out rows where top_tier_volume is 0.0
mcap_cleaned = marketcap_df[marketcap_df['top_tier_volume'] != 0.0].copy()
mcap_cleaned.to_csv('mcap_cleaned.csv', index=False)

mcap = add_index(mcap_cleaned, start_unix)
mcap = mcap.reset_index(drop=True) # original indexing doesn't start at 0
mcap.head()
# add return logreturn and save to csv
mcap = add_return_logreturn(mcap)
mcap.to_csv('mcap_processed.csv', index=False)


Original Column Names:
['UNIT', 'TIMESTAMP', 'TYPE', 'OPEN', 'HIGH', 'LOW', 'CLOSE', 'TOP_TIER_VOLUME']


Unnamed: 0,timestamp,open,high,low,close,top_tier_volume,index
0,1639306800,2460623000000.0,2487127000000.0,2454857000000.0,2473300000000.0,1453037000.0,-469
1,1639310400,2473826000000.0,2495821000000.0,2468145000000.0,2483743000000.0,1837831000.0,-468
2,1639314000,2483543000000.0,2502559000000.0,2480391000000.0,2494231000000.0,1783415000.0,-467
3,1639317600,2494327000000.0,2503654000000.0,2481405000000.0,2497112000000.0,1521798000.0,-466
4,1639321200,2496984000000.0,2500671000000.0,2478254000000.0,2484339000000.0,1386847000.0,-465


#### Compute beta

In [12]:
import pandas as pd
import numpy as np

# much faster than add_market_beta1 because it uses pd.rolling 
def add_market_beta(asset_df, market_df, window=24, min_periods=1, timestep=3600):
    """
    Compute market beta for each timestamp in asset_df using returns from asset and market
    over a rolling window (of size `window` * timestep). 
    Adds 'beta' column to asset_df.
    
    Parameters:
        asset_df (DataFrame): Must include 'timestamp' and 'return'. Missing rows filled with NaN.
        market_df (DataFrame): Must include 'timestamp' and 'return'. Missing rows filled with NaN.
        window (int): Number of steps in rolling window.
        min_periods: Minimum required non-NaN observations within window to compute beta.
        timestep (int): The time increment between timestamps (in seconds).
    """
    # Ensure timestamps are sorted
    asset_df = asset_df.sort_values('timestamp').reset_index(drop=True)
    market_df = market_df.sort_values('timestamp').reset_index(drop=True)

    # Merge dataframes on timestamp
    merged_df = pd.merge(
        asset_df[['timestamp', 'return']],
        market_df[['timestamp', 'return']],
        on='timestamp',
        how='left',  # Keep all asset_df timestamps
        suffixes=('_asset', '_market')
    )

    # Set timestamp as index for rolling operations
    merged_df.set_index('timestamp', inplace=True)

    # Define rolling window size in terms of rows (assuming timestamps are evenly spaced)
    # If timestamps are irregular, you might need to use a time-based rolling window
    
    # Compute rolling covariance and variance
    rolling_cov = merged_df['return_asset'].rolling(window=window, min_periods=min_periods).cov(merged_df['return_market'])
    rolling_var = merged_df['return_market'].rolling(window=window, min_periods=min_periods).var(ddof=1)

    # Compute beta: beta = cov(asset, market) / var(market)
    beta = rolling_cov / rolling_var.where(rolling_var != 0, np.nan)  # Avoid division by zero

    # Create beta dataframe
    beta_df = pd.DataFrame({'timestamp': merged_df.index, 'beta': beta}).reset_index(drop=True)

    # Merge beta back into asset_df
    asset_df = pd.merge(asset_df, beta_df, on='timestamp', how='left')

    return asset_df

In [21]:
import pandas as pd
import numpy as np

def compute_beta(asset_returns, market_returns, window=24, min_periods=1):
    """
    Compute market beta using rolling window calculations on aligned return series.
    More efficient than add_market_beta as it assumes uniform timesteps within each series.
    Ensure returns are pd.Series that have timestamp as indices (set_index('timestamp'))
    """
    # Align the series to asset_returns' index
    # This will create NaNs for any timestamps in asset_returns that don't exist in market_returns
    market_returns = market_returns.reindex(asset_returns.index)
    # print(f"Number of overlapping indices: {len(asset_returns.index.intersection(market_returns.index))}")
    # Compute rolling covariance and variance directly on the aligned series
    rolling_cov = asset_returns.rolling(window=window, min_periods=min_periods).cov(market_returns)
    rolling_var = market_returns.rolling(window=window, min_periods=min_periods).var(ddof=1)
    
    # Compute beta, handling division by zero
    beta = rolling_cov / rolling_var.where(rolling_var != 0, np.nan)
    
    return beta

In [43]:
df_btc = fill_truncate('USD_60/XBTUSD_60.parquet')
df_eth = fill_truncate('USD_60/ETHUSD_60.parquet')
#print(df_btc[1200:1220])
#print(df_eth[100:120])
df_market = pd.read_csv('mcap_processed.csv').set_index('timestamp')
bnt_filled.set_index('timestamp', inplace=True)
compute_beta(bnt_filled['log_return'], df_market['log_return'])


Number of overlapping indices: 28438


timestamp
1640995200         NaN
1640998800         NaN
1641002400         NaN
1641006000         NaN
1641009600         NaN
                ...   
1743354000    0.619950
1743357600    0.542741
1743361200    0.549446
1743364800    0.581137
1743368400    0.566861
Name: log_return, Length: 28438, dtype: float64

In [40]:
start = bnt_filled['timestamp'].iloc[0]
end = bnt_filled['timestamp'].iloc[-1]
print(start, end)

def count_rows(df, start_time, end_time):
    mask = (df['timestamp'] >= start_time) & (df['timestamp'] <= end_time)
    return df.loc[mask, 'close'].count() # count number of non-NaN entries

count_rows(df_btc, start, end)

np.int64(28422)

In [41]:
count_rows(bnt_filled, start, end)

np.int64(13585)

In [70]:
def compute_all_betas(df_asset, window=720, min_periods=240):
    """
    Compute market beta, BTC beta, and ETH beta for an asset.
    
    Parameters:
        df_asset (pd.DataFrame): Asset dataframe with timestamp index and 'log_return' column
        window (int): Number of periods in rolling window
        min_periods (int): Minimum required non-NaN observations within window
        
    Returns:
        df_asset with beta_market, beta_btc, and beta_eth columns
    """
    # Load market, BTC, and ETH data
    df_market = pd.read_csv('mcap_processed.csv')
    df_btc = fill_truncate('USD_60/XBTUSD_60.parquet')
    df_eth = fill_truncate('USD_60/ETHUSD_60.parquet')
    # Set timestamp as index for all dataframes
    df_market.set_index('timestamp', inplace=True)
    df_btc.set_index('timestamp', inplace=True)
    df_eth.set_index('timestamp', inplace=True)
    # Compute betas
    market_beta = compute_beta(df_asset['log_return'], df_market['log_return'], window, min_periods)
    btc_beta = compute_beta(df_asset['log_return'], df_btc['log_return'], window, min_periods)
    eth_beta = compute_beta(df_asset['log_return'], df_eth['log_return'], window, min_periods)

    # Print statistics for all betas
    print("\nBeta Statistics:")
    print(f"Market Beta - Total: {len(market_beta)}, Non-NaN: {pd.Series(market_beta).count()}")
    print(f"BTC Beta - Total: {len(btc_beta)}, Non-NaN: {pd.Series(btc_beta).count()}")
    print(f"ETH Beta - Total: {len(eth_beta)}, Non-NaN: {pd.Series(eth_beta).count()}")
    
    # Print statistics for each dataframe
    print("\nDataFrame Statistics:")
    print(f"Asset DataFrame - Total: {len(df_asset)}, Non-NaN close: {df_asset['close'].count()}")
    print(f"Market DataFrame - Total: {len(df_market)}, Non-NaN close: {df_market['close'].count()}")
    print(f"BTC DataFrame - Total: {len(df_btc)}, Non-NaN close: {df_btc['close'].count()}")
    print(f"ETH DataFrame - Total: {len(df_eth)}, Non-NaN close: {df_eth['close'].count()}")

    # Add all beta columns at once to the original dataframe
    df_asset[['beta_market', 'beta_btc', 'beta_eth']] = pd.DataFrame({
        'beta_market': market_beta,
        'beta_btc': btc_beta,
        'beta_eth': eth_beta
    })
    
    return df_asset


In [71]:
B = compute_all_betas(bnt_filled)
beta_btc = B['beta_market']
print(f"Total entries: {len(beta_btc)}")
print(f"Non-NaN entries: {beta_btc.count()}")
# Count non-NaN entries for returns
returns_count = B['return'].count()
print(f"Non-NaN returns: {returns_count}")


Number of overlapping indices: 28438
Number of overlapping indices: 28438
Number of overlapping indices: 28438

Beta Statistics:
Market Beta - Total: 28438, Non-NaN: 21720
BTC Beta - Total: 28438, Non-NaN: 21772
ETH Beta - Total: 28438, Non-NaN: 21772

DataFrame Statistics:
Asset DataFrame - Total: 28438, Non-NaN close: 13585
Market DataFrame - Total: 29981, Non-NaN close: 29981
BTC DataFrame - Total: 100659, Non-NaN close: 89787
ETH DataFrame - Total: 84586, Non-NaN close: 81096
Total entries: 28438
Non-NaN entries: 21720
Non-NaN returns: 13585


In [72]:
# Count non-NaN entries for each beta where both return and beta are not NaN
market_beta_count = B[['return', 'beta_market']].dropna().shape[0]
btc_beta_count = B[['return', 'beta_btc']].dropna().shape[0]
eth_beta_count = B[['return', 'beta_eth']].dropna().shape[0]
print(f"Non-NaN returns: {B['return'].count()}")
print(f"Non-NaN returns and market beta: {market_beta_count}")
print(f"Non-NaN returns and BTC beta: {btc_beta_count}")
print(f"Non-NaN returns and ETH beta: {eth_beta_count}")

Non-NaN returns: 13585
Non-NaN returns and market beta: 11559
Non-NaN returns and BTC beta: 11582
Non-NaN returns and ETH beta: 11582


#### Load files and check

In [74]:
bnt = pd.read_parquet('USD_60_indicators/BNTUSD_60.parquet')
bnt.head()

Unnamed: 0,timestamp,open,high,low,close,volume,trades,log_return,return,volume_change,close_to_high,close_to_low,log_price_range,amihud,ma_zscore_24,ema_diff_12_48,ema_diff_24_120,ema_diff_norm_12_48,ema_diff_norm_24_120,price_ema_diff_12,macd_hist,momentum_24_4,momentum_168_24,st_rev_24_1,st_rev_168_1,lt_rev_24_1,lt_rev_168_1,realized_vol_24,weighted_vol_24,volume_zscore_24,vwap_24,stoch_osc_24,rsi_24,amihud_24
0,1621868400,4.44,4.44,4.385,4.385,20.309077,5.0,-0.012465,-0.012387,,0.987613,1.0,0.012465,0.000614,,0.0,0.0,0.0,0.0,0.0,0.0,,,,,,,0.012387,0.012387,,4.403333,0.0,99.009901,0.000614
1,1621872000,4.437,10.211,4.384,4.43,406.1254,10.0,0.01021,0.010262,18.997236,0.433846,1.010493,0.845504,2.5e-05,0.707107,0.001148,0.000645,0.001159,0.000651,0.008595,0.002872,,,,,,,0.016086,0.010373,0.707107,6.249353,0.789429,100.0,0.000319
2,1621875600,4.43,4.48,4.43,4.48,13.288682,3.0,0.011223,0.011287,-0.967279,1.0,1.011287,0.011223,0.000845,1.017074,0.003311,0.001922,0.003379,0.001963,0.016635,0.007706,,,,,,,0.019651,0.010402,-0.592894,6.195378,1.647503,100.0,0.000494
3,1621879200,4.525,4.557,4.525,4.55,10.16821,4.0,0.015504,0.015625,-0.234822,0.998464,1.005525,0.007047,0.001525,1.254331,0.006717,0.004011,0.006951,0.004157,0.026877,0.014774,,,,,,,0.025106,0.010549,-0.522457,6.158055,2.848807,100.0,0.000752
4,1621882800,4.7,6.314,4.506,4.73,867.089671,18.0,0.038798,0.03956,84.274564,0.749129,1.049711,0.33736,4.5e-05,1.593686,0.013422,0.008112,0.014393,0.008729,0.054077,0.02986,,,,,,,0.046854,0.032687,1.5984,5.516306,5.937875,100.0,0.000611
