In [18]:
# import
import pandas as pd
import numpy as np
from scipy.stats import norm
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import os
from typing import Dict, Tuple, List, Optional, Union, Callable
import warnings
warnings.filterwarnings('ignore')

In [2]:
# create output folders
os.makedirs("anomaly_plots", exist_ok=True)

def load_and_preprocess_data(file_path: str) -> pd.DataFrame:
    """
    Load options data from parquet file and perform preprocessing

    Args:
        file_path: Path to the parquet file

    Returns:
        Preprocessed DataFrame
    """
    df = pd.read_parquet(file_path)

    if not isinstance(df.index, pd.DatetimeIndex):
        try:
            df.index = pd.to_datetime(df.index)
        except:
            print("Warning: Could not convert index to datetime, using as-is")

    for col in df.columns:
        df[col] = df[col].ffill(limit=5)

    print(f"Data timeframe: {df.index[0]} to {df.index[-1]}")

    return df

In [None]:
def bs_price(S: pd.Series, K: float, T: pd.Series, r: float, sigma: pd.Series, option_type: str) -> pd.Series:
    """
    Calculate Black-Scholes option price

    Args:
        S: Underlying price series
        K: Strike price
        T: Time to expiry in years
        r: Risk-free rate
        sigma: Implied volatility series
        option_type: 'call' or 'put'

    Returns:
        Option price series
    """
    idx = S.index
    S_arr = S.values
    T_arr = T.values
    sigma_arr = sigma.values

    S_arr = np.asarray(S_arr, dtype=np.float64)
    T_arr = np.asarray(T_arr, dtype=np.float64)
    sigma_arr = np.asarray(sigma_arr, dtype=np.float64)
    
    mask = (sigma_arr > 0) & (T_arr > 0) & (S_arr > 0)
    
    d1 = np.full_like(S_arr, np.nan, dtype=np.float64)
    d2 = np.full_like(S_arr, np.nan, dtype=np.float64)
    price = np.full_like(S_arr, np.nan, dtype=np.float64)

    d1[mask] = (
        np.log(S_arr[mask] / K)
        + (r + 0.5 * sigma_arr[mask]**2) * T_arr[mask]
    ) / (sigma_arr[mask] * np.sqrt(T_arr[mask]))
    
    d2[mask] = d1[mask] - sigma_arr[mask] * np.sqrt(T_arr[mask])

    if option_type.lower() == 'call':
        price[mask] = S_arr[mask] * norm.cdf(d1[mask]) - K * np.exp(-r * T_arr[mask]) * norm.cdf(d2[mask])
    else:
        price[mask] = K * np.exp(-r * T_arr[mask]) * norm.cdf(-d2[mask]) - S_arr[mask] * norm.cdf(-d1[mask])

    return pd.Series(price, index=idx)

def calculate_price_jump(price: pd.Series, window: int = 5) -> pd.Series:
    """Calculate price jumps as percentage changes over specified window"""
    return price.pct_change(window).abs()

def calculate_spread_width(bid: pd.Series, ask: pd.Series) -> pd.Series:
    """Calculate bid-ask spread as percentage of mid price"""
    mid = (bid + ask) / 2
    return (ask - bid) / mid

def calculate_z_score(series: pd.Series, window: int = 60, min_periods: int = 10) -> pd.Series:
    """Calculate z-score for a time series using rolling window"""
    mean = series.rolling(window, min_periods=min_periods).mean()
    std = series.rolling(window, min_periods=min_periods).std()

    # for cases where std is 0 or NaN
    std = std.replace(0, series.std() if series.std() > 0 else 1e-6)
    std = std.fillna(series.std() if series.std() > 0 else 1e-6)

    return (series - mean) / std

In [19]:
def detect_anomalies(df: pd.DataFrame,
                    threshold: float = 3.0,
                    r: float = 0.0588,  # risk-free rate
                    window_size: int = 60,
                    plot_anomalies: bool = True) -> Tuple[pd.DataFrame, Dict]:
    """
    Detect anomalies in options data

    Args:
        df: DataFrame with options data
        threshold: Z-score threshold for anomaly detection
        r: Risk-free rate for BS model
        window_size: Window size for rolling statistics
        plot_anomalies: Whether to create plots

    Returns:
        Tuple of (anomalies DataFrame, results dictionary)
    """
    anomalies = []

    # filter out columns that are not actual instruments
    instruments = [col for col in df.columns.get_level_values(1).unique()
                if '_' in col and (col.endswith('CE') or col.endswith('PE'))]
    print(f"Found {len(instruments)} valid option instruments")

    detection_results = {}

    for inst in instruments:
        try:
            bid = df.get(('best_bid', inst), pd.Series(index=df.index, dtype=float))
            ask = df.get(('best_offer', inst), pd.Series(index=df.index, dtype=float))
            iv = df.get(('iv', inst), pd.Series(index=df.index, dtype=float))
            S = df.get(('additional', 'underlying_price'), pd.Series(index=df.index, dtype=float))
            T = df.get(('additional', 'tte'), pd.Series(index=df.index, dtype=float))
            gamma = df.get(('gamma', inst), pd.Series(index=df.index, dtype=float))

            # align indexes & NaN handling
            bid = bid.reindex(df.index).fillna(method='ffill', limit=5)
            ask = ask.reindex(df.index).fillna(method='ffill', limit=5)
            iv = iv.reindex(df.index).fillna(method='ffill', limit=5)
            S = S.reindex(df.index).fillna(method='ffill')
            T = T.reindex(df.index).fillna(method='ffill')
            gamma = gamma.reindex(df.index).fillna(method='ffill', limit=5).fillna(0)

            # for too many NaN values
            if (bid.isna().sum() > 0.5 * len(bid)) or (ask.isna().sum() > 0.5 * len(ask)):
                print(f"Skipping {inst} due to insufficient data")
                continue

            mid = (bid + ask) / 2

            parts = inst.split('_')
            if len(parts) < 3:
                print(f"Skipping {inst}: invalid instrument name format")
                continue

            try:
                strike = float(parts[2])
                option_type = 'call' if inst.endswith('CE') else 'put'
                expiry = parts[1]
            except:
                print(f"Skipping {inst}: could not parse strike price")
                continue

            # feature 1: mispricing
            theory = bs_price(S, strike, T, r, iv, option_type)
            mispricing = (mid - theory).abs()
            mispricing_pct = mispricing / theory
            z_mis = calculate_z_score(mispricing, window=window_size)

            # feature 2: iv changes
            z_iv = calculate_z_score(iv, window=window_size)

            # feature 3: price jumps
            price_jump = calculate_price_jump(mid, window=5)
            z_price_jump = calculate_z_score(price_jump, window=window_size)

            # feature 4: spread widening
            spread = calculate_spread_width(bid, ask)
            z_spread = calculate_z_score(spread, window=window_size)

            # feature 5: gamma spikes
            z_gamma = calculate_z_score(gamma, window=window_size)

            iv_spike_mask = (z_iv > threshold).fillna(False)
            mispricing_mask = (z_mis > threshold).fillna(False)
            price_jump_mask = (z_price_jump > threshold).fillna(False)
            spread_widening_mask = (z_spread > threshold).fillna(False)
            gamma_spike_mask = (z_gamma > threshold).fillna(False)

            detection_results[inst] = {
                'iv_spike_mask': iv_spike_mask,
                'mispricing_mask': mispricing_mask,
                'price_jump_mask': price_jump_mask,
                'spread_widening_mask': spread_widening_mask,
                'gamma_spike_mask': gamma_spike_mask,
                'iv': iv,
                'mid': mid,
                'bid': bid,
                'ask': ask,
                'spread': spread,
                'theory': theory,
                'mispricing_pct': mispricing_pct,
                'gamma': gamma,
                'z_iv': z_iv,
                'z_spread': z_spread,
                'z_gamma': z_gamma,
                'z_price_jump': z_price_jump,
                'expiry': expiry,
                'option_type': option_type,
                'strike': strike
            }

            has_anomalies = (
                iv_spike_mask.sum() > 0 or
                mispricing_mask.sum() > 0 or
                price_jump_mask.sum() > 0 or
                spread_widening_mask.sum() > 0 or
                gamma_spike_mask.sum() > 0
            )

            if has_anomalies:
                print(f"{inst}: IV spikes: {iv_spike_mask.sum()}, "
                      f"Mispricings: {mispricing_mask.sum()}, "
                      f"Price jumps: {price_jump_mask.sum()}, "
                      f"Spread widenings: {spread_widening_mask.sum()}, "
                      f"Gamma spikes: {gamma_spike_mask.sum()}")

            for ts in iv_spike_mask[iv_spike_mask].index:
                anomalies.append((ts, inst, "IV spike", round(float(z_iv.loc[ts]), 2)))

            for ts in mispricing_mask[mispricing_mask].index:
                anomalies.append((ts, inst, "Mispricing", f"{round(float(mispricing_pct.loc[ts]*100), 2)}%"))

            for ts in price_jump_mask[price_jump_mask].index:
                anomalies.append((ts, inst, "Price jump", f"{round(float(price_jump.loc[ts]*100), 2)}%"))

            for ts in spread_widening_mask[spread_widening_mask].index:
                anomalies.append((ts, inst, "Spread widening", round(float(z_spread.loc[ts]), 2)))

            for ts in gamma_spike_mask[gamma_spike_mask].index:
                anomalies.append((ts, inst, "Gamma spike", round(float(z_gamma.loc[ts]), 2)))

            # plots
            if has_anomalies and plot_anomalies:
                fig, axs = plt.subplots(5, 1, figsize=(14, 18), sharex=True)
                plot_idx = df.index

                axs[0].plot(plot_idx, iv, label="IV", linewidth=1, marker='.', markersize=1)
                axs[0].scatter(plot_idx[iv_spike_mask],
                              iv[iv_spike_mask],
                              color='red', label="IV Spike", s=30)
                axs[0].set_ylabel("IV")
                axs[0].legend(loc='upper right')
                axs[0].grid(True, alpha=0.3)

                axs[1].plot(plot_idx, mid, label="Mid Price", linewidth=1, marker='.', markersize=1)
                axs[1].plot(plot_idx, theory, label="BS Price", linestyle='--', linewidth=1)
                axs[1].scatter(plot_idx[mispricing_mask],
                              mid[mispricing_mask],
                              color='purple', label="Mispricing", s=30)
                axs[1].set_ylabel("Price")
                axs[1].legend(loc='upper right')
                axs[1].grid(True, alpha=0.3)

                axs[2].plot(plot_idx, price_jump * 100, label="Price Jump %", linewidth=1, marker='.', markersize=1)
                axs[2].scatter(plot_idx[price_jump_mask],
                              price_jump[price_jump_mask] * 100,
                              color='green', label="Jump Anomaly", s=30)
                axs[2].set_ylabel("Price Jump %")
                axs[2].legend(loc='upper right')
                axs[2].grid(True, alpha=0.3)

                axs[3].plot(plot_idx, spread * 100, label="Spread %", linewidth=1, marker='.', markersize=1)
                axs[3].scatter(plot_idx[spread_widening_mask],
                              spread[spread_widening_mask] * 100,
                              color='blue', label="Spread Widening", s=30)
                axs[3].set_ylabel("Spread %")
                axs[3].legend(loc='upper right')
                axs[3].grid(True, alpha=0.3)

                axs[4].plot(plot_idx, gamma, label="Gamma", linewidth=1, marker='.', markersize=1)
                axs[4].scatter(plot_idx[gamma_spike_mask],
                              gamma[gamma_spike_mask],
                              color='magenta', label="Gamma Spike", s=30)
                axs[4].set_ylabel("Gamma")
                axs[4].legend(loc='upper right')
                axs[4].grid(True, alpha=0.3)

                date_fmt = mdates.DateFormatter('%H:%M')
                axs[4].xaxis.set_major_locator(mdates.HourLocator(interval=1))
                axs[4].xaxis.set_major_formatter(date_fmt)
                axs[4].xaxis.set_minor_locator(mdates.MinuteLocator(byminute=[15,30,45]))

                plt.gcf().autofmt_xdate()

                for ax in axs:
                    ax.xaxis.set_tick_params(rotation=45)
                plt.subplots_adjust(hspace=0.3)
                trading_date = plot_idx[0].strftime('%Y-%m-%d')
                fig.suptitle(f"Anomalies for {inst} (Date: {trading_date})", fontsize=14)

                plt.tight_layout()
                plt.savefig(f"anomaly_plots/{inst}.png")
                plt.close()

        except Exception as e:
            print(f"Error processing {inst}: {e}")
            continue

    if anomalies:
        anomalies_df = pd.DataFrame(anomalies, columns=["timestamp", "instrument", "anomaly_type", "value"])
        anomalies_df = anomalies_df.sort_values(["timestamp", "instrument"])
    else:
        anomalies_df = pd.DataFrame(columns=["timestamp", "instrument", "anomaly_type", "value"])

    return anomalies_df, detection_results

In [None]:
""" Note that the summary and result csv files and plots are for ad1.parquet in order to get results for 
    ad2.parquet replace the data_file below and run this cell """


data_file = "ad1.parquet"
df = load_and_preprocess_data(data_file)

print("\nColumn structure:")
print(f"Level 0 (metrics): {df.columns.get_level_values(0).unique()}")
print(f"Level 1 (instruments): {len(df.columns.get_level_values(1).unique())} unique values")

anomalies_df, detection_results = detect_anomalies(df, threshold=3.0, plot_anomalies=True)
anomalies_df.to_csv("anomaly_results.csv", index=False)

# summary
if not anomalies_df.empty:
    anomaly_summary = anomalies_df.groupby(['instrument','anomaly_type']).size().reset_index(name='count')
    anomaly_summary = anomaly_summary.sort_values(['instrument','count'], ascending=[True,False])
    anomaly_summary.to_csv("anomaly_summary.csv", index=False)

    anomaly_type_summary = anomalies_df.groupby('anomaly_type').size().reset_index(name='count')
    anomaly_type_summary = anomaly_type_summary.sort_values('count', ascending=False)
    anomaly_type_summary.to_csv("anomaly_type_summary.csv", index=False)

    print(f"\nDetected {len(anomalies_df)} individual anomalies")

print("\nPlots saved in ./anomaly_plots/")
print("Results saved in anomaly_results.csv and anomaly_summary.csv")

Data timeframe: 2025-01-13 09:15:00 to 2025-01-13 15:30:00

Column structure:
Level 0 (metrics): Index(['best_bid', 'best_offer', 'ltp', 'traded_volume', 'additional',
       'bid_iv', 'ask_iv', 'iv', 'iv_ema60', 'iv7', 'iv7_ema60', 'delta',
       'gamma', 'theta', 'vega'],
      dtype='object')
Level 1 (instruments): 162 unique values
Found 156 valid option instruments
X_25123_21800_CE: IV spikes: 212, Mispricings: 254, Price jumps: 185, Spread widenings: 90, Gamma spikes: 488
X_25123_21800_PE: IV spikes: 339, Mispricings: 278, Price jumps: 395, Spread widenings: 176, Gamma spikes: 133
X_25123_21850_CE: IV spikes: 191, Mispricings: 320, Price jumps: 284, Spread widenings: 165, Gamma spikes: 508
X_25123_21850_PE: IV spikes: 319, Mispricings: 466, Price jumps: 745, Spread widenings: 383, Gamma spikes: 138
X_25123_21900_CE: IV spikes: 230, Mispricings: 238, Price jumps: 258, Spread widenings: 139, Gamma spikes: 545
X_25123_21900_PE: IV spikes: 339, Mispricings: 342, Price jumps: 429, Sp