In [1]:
import pandas as pd

In [2]:
# Load the first dataset from a Parquet file, resolved relative to the
# current working directory.
# NOTE(review): the detector functions defined later unpack df.columns as
# (field, instrument) pairs, so this frame presumably has two-level
# columns - confirm against the file.
ad1 = pd.read_parquet("ad1.parquet")

In [3]:
# Load the second dataset from a Parquet file, resolved relative to the
# current working directory.
# NOTE(review): presumably the same column layout as the first dataset,
# since both are passed to the same detection pipeline - confirm.
ad2 = pd.read_parquet("ad2.parquet")

In [7]:
import numpy as np
from scipy.stats import zscore
import warnings
# Silence every warning category for the rest of the kernel session.
# NOTE(review): this also hides deprecation warnings raised by library
# calls in later cells; a narrower filter would keep those visible.
warnings.filterwarnings("ignore")
# Module-level accumulator. detect_anomalies() creates its own local list,
# so nothing visible in this notebook reads this one.
anomalies = []

In [9]:
def detect_anomalous_price_change(df, anomalies, threshold=0.05, resample_freq='1s'):
    """
    Flags timestamps where an instrument's last traded price (LTP) changes by
    more than `threshold` between consecutive points of a resampled series.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose column labels unpack as (field, instrument) pairs. Only
        the ('ltp', instrument) columns are read. The index must be usable
        with `Series.resample`.
    anomalies : list
        Output accumulator, mutated in place. One dict is appended per
        detection with keys 'datetime', 'stock_id' and 'anomaly_type'.
    threshold : float
        Minimum absolute fractional change (0.05 == 5 %) that is reported.
    resample_freq : str
        pandas offset alias of the resampling grid. The default is the
        lowercase '1s': the uppercase 'S' alias is deprecated in recent
        pandas, and because per-instrument failures are skipped below, a
        rejected alias would disable this detector without a visible error.

    Returns
    -------
    None
    """
    instruments = [inst for field, inst in df.columns if field == 'ltp']

    for inst in instruments:
        try:
            # A price of zero is treated as a missing tick.
            ltp_series = df[('ltp', inst)].replace(0, np.nan).dropna()

            # Put the ticks on a uniform grid, carrying the last seen price
            # forward over empty grid points.
            ltp_series = ltp_series.resample(resample_freq).ffill().dropna()

            if len(ltp_series) < 2:
                continue

            # Fractional change between consecutive grid points; infinite
            # ratios are discarded rather than reported.
            price_change_rate = (
                ltp_series.pct_change()
                .replace([np.inf, -np.inf], np.nan)
                .dropna()
            )

            anomaly_times = price_change_rate[price_change_rate.abs() > threshold].index

            for ts in anomaly_times:
                anomalies.append({
                    'datetime': ts,
                    'stock_id': inst,
                    'anomaly_type': 'Anomalous Price Change'
                })

        except Exception:
            # Deliberate best-effort: an instrument whose series cannot be
            # processed is skipped so the remaining instruments still run.
            continue


In [11]:
from scipy.stats import zscore
import numpy as np

def detect_anomalous_volume_zscore(df, anomalies, threshold=3):
    """
    Flags timestamps whose traded volume lies more than `threshold` standard
    scores away from the instrument's mean traded volume.

    Only ('traded_volume', instrument) columns are read. Zero volumes are
    treated as missing and dropped before scoring, and instruments with
    fewer than two remaining observations are skipped. Each detection is
    appended to `anomalies` as a dict with keys 'datetime', 'stock_id' and
    'anomaly_type'. Returns None.
    """
    volume_instruments = [name for field, name in df.columns if field == 'traded_volume']

    for inst in volume_instruments:
        try:
            volumes = df[('traded_volume', inst)].replace(0, np.nan).dropna()

            if len(volumes) >= 2:
                # Standard score of every remaining observation.
                scores = zscore(volumes, nan_policy='omit')
                outlier_mask = np.abs(scores) > threshold

                anomalies.extend(
                    {
                        'datetime': ts,
                        'stock_id': inst,
                        'anomaly_type': 'Volume Spike Z-Score'
                    }
                    for ts in volumes.index[outlier_mask]
                )

        except Exception:
            # Best-effort: skip instruments that cannot be scored.
            continue


In [13]:
def detect_bid_ask_spread_spike(df, anomalies, threshold=0.05):
    """
    Flags timestamps where the bid-ask spread, relative to the mid price,
    exceeds `threshold`.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose column labels unpack as (field, instrument) pairs. Only
        instruments that have both a ('best_bid', instrument) and a
        ('best_offer', instrument) column are analysed.
    anomalies : list
        Output accumulator, mutated in place. One dict is appended per
        detection with keys 'datetime', 'stock_id' and 'anomaly_type'.
    threshold : float
        Minimum (ask - bid) / mid ratio that is reported.

    Returns
    -------
    None
    """
    # Keep only instruments that really have both sides of the book, in
    # first-seen column order. A plain set of names would also include
    # one-sided instruments and would iterate in an order that can differ
    # between interpreter runs, making the append order unpredictable.
    column_keys = set(df.columns)
    instruments = list(dict.fromkeys(
        inst for field, inst in df.columns
        if field == 'best_bid' and ('best_offer', inst) in column_keys
    ))

    for inst in instruments:
        try:
            # Zero quotes are treated as missing.
            bid_series = df[('best_bid', inst)].replace(0, np.nan).dropna()
            ask_series = df[('best_offer', inst)].replace(0, np.nan).dropna()

            # Compare only timestamps where both quotes are present.
            common_index = bid_series.index.intersection(ask_series.index)
            bid_series = bid_series.loc[common_index]
            ask_series = ask_series.loc[common_index]

            # Negative quotes are invalid as well.
            valid_mask = (bid_series > 0) & (ask_series > 0)
            bid_series = bid_series[valid_mask]
            ask_series = ask_series[valid_mask]

            # Both series share one index after masking, so one length check
            # covers both.
            if len(bid_series) < 2:
                continue

            mid_price = (bid_series + ask_series) / 2
            spread = ask_series - bid_series

            spread_ratio = (spread / mid_price).replace([np.inf, -np.inf], np.nan).dropna()

            # Only wide positive spreads are reported; the comparison is
            # one-sided on purpose and ignores negative spreads.
            anomaly_times = spread_ratio[spread_ratio > threshold].index

            for ts in anomaly_times:
                anomalies.append({
                    'datetime': ts,
                    'stock_id': inst,
                    'anomaly_type': 'Bid-Ask Spread Spike'
                })

        except Exception:
            # Deliberate best-effort: skip instruments with structural issues.
            continue


In [15]:
def detect_delta_flip(df, anomalies, epsilon=1e-6):
    """
    Detects flips in Delta from positive to negative or vice versa,
    excluding noise around zero using a small threshold (epsilon).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose column labels unpack as (field, instrument) pairs. Only
        the ('delta', instrument) columns are read.
    anomalies : list
        Output accumulator, mutated in place. One dict is appended per flip
        with keys 'datetime', 'stock_id' and 'anomaly_type'; 'datetime' is
        the first timestamp that carries the new sign.
    epsilon : float
        Values with an absolute size below this are treated as noise and
        take over the previous value's sign.

    Returns
    -------
    None
    """
    instruments = [inst for field, inst in df.columns if field == 'delta']

    for inst in instruments:
        try:
            # Exact zeros are treated as missing observations.
            delta = df[('delta', inst)].replace(0, np.nan).dropna()

            if len(delta) < 2:
                continue

            # Blank out near-zero noise, then carry the last meaningful value
            # forward. `.ffill()` replaces `fillna(method='ffill')`, which is
            # deprecated; where that keyword is rejected the error would be
            # swallowed below and no flip would ever be reported.
            delta_filtered = delta.copy()
            delta_filtered[np.abs(delta_filtered) < epsilon] = np.nan
            delta_filtered = delta_filtered.ffill().dropna()

            # A non-zero step in the sign series marks a flip; the first
            # element has no predecessor and is never reported.
            sign_change = np.sign(delta_filtered).diff().fillna(0)
            flip_times = sign_change[sign_change != 0].index

            for ts in flip_times:
                anomalies.append({
                    'datetime': ts,
                    'stock_id': inst,
                    'anomaly_type': 'Delta Flip'
                })

        except Exception:
            # Deliberate best-effort: skip instruments that cannot be processed.
            continue


In [17]:
def detect_vega_sensitivity_spike(df, anomalies, z_threshold=3, epsilon=1e-6):
    """
    Flags timestamps whose Vega lies more than `z_threshold` standard scores
    away from the instrument's mean Vega.

    Only ('vega', instrument) columns are read. Zeros are treated as missing.
    An instrument is skipped when fewer than 10 observations remain or when
    the standard deviation of its Vega is below `epsilon`. Each detection is
    appended to `anomalies` as a dict with keys 'datetime', 'stock_id' and
    'anomaly_type'. Returns None.
    """
    from scipy.stats import zscore

    vega_instruments = [name for field, name in df.columns if field == 'vega']

    for inst in vega_instruments:
        try:
            vega = df[('vega', inst)].replace(0, np.nan).dropna()

            # Too few points, or practically constant: nothing to score.
            if len(vega) < 10 or vega.std() < epsilon:
                continue

            scores = zscore(vega, nan_policy='omit')
            spike_mask = np.abs(scores) > z_threshold

            for ts in vega.index[spike_mask]:
                record = {
                    'datetime': ts,
                    'stock_id': inst,
                    'anomaly_type': 'Vega Sensitivity Spike'
                }
                anomalies.append(record)

        except Exception:
            # Best-effort: skip instruments that cannot be scored.
            continue


In [19]:
def detect_iv_bid_ask_divergence(df, anomalies, threshold=0.15, epsilon=1e-6):
    """
    Flags timestamps where an instrument's IV deviates from the midpoint of
    its bid IV and ask IV by more than `threshold`, measured relative to
    that midpoint.

    Requires two-level columns; only instruments that have all of the 'iv',
    'bid_iv' and 'ask_iv' columns are analysed. Zeros are treated as
    missing. An instrument is skipped when the three series share fewer than
    10 timestamps, or when fewer than 5 of those have a mid IV above
    `epsilon`. Each detection is appended to `anomalies` as a dict with keys
    'datetime', 'stock_id' and 'anomaly_type'. Returns None.
    """
    required_fields = ['iv', 'bid_iv', 'ask_iv']

    _fields, stock_level = df.columns.levels
    instruments = []
    for stock in stock_level:
        if all((field, stock) in df.columns for field in required_fields):
            instruments.append(stock)

    for inst in instruments:
        try:
            cleaned = {
                field: df[(field, inst)].replace(0, np.nan).dropna()
                for field in required_fields
            }

            # Timestamps at which all three series have a value.
            shared_times = (
                cleaned['iv'].index
                .intersection(cleaned['bid_iv'].index)
                .intersection(cleaned['ask_iv'].index)
            )
            if len(shared_times) < 10:
                continue

            bid_part = cleaned['bid_iv'].loc[shared_times]
            ask_part = cleaned['ask_iv'].loc[shared_times]
            mid_iv = (bid_part + ask_part) / 2

            # Rows with a near-zero (or negative) mid IV are excluded from
            # the ratio below.
            usable = mid_iv > epsilon
            if usable.sum() < 5:
                continue

            relative_gap = ((cleaned['iv'].loc[shared_times] - mid_iv) / mid_iv).abs()
            relative_gap = relative_gap[usable]
            flagged = relative_gap[relative_gap > threshold]

            anomalies.extend(
                {
                    'datetime': ts,
                    'stock_id': inst,
                    'anomaly_type': 'IV-BidIV-AskIV Divergence'
                }
                for ts in flagged.index
            )

        except Exception:
            # Best-effort: skip instruments that cannot be processed.
            continue


In [21]:
from scipy.stats import zscore

def detect_iv_spike(df, anomalies, z_threshold=3):
    """
    Flags timestamps whose implied volatility (IV) lies more than
    `z_threshold` standard scores away from the instrument's mean IV.

    Only ('iv', instrument) columns are read. Zeros are treated as missing
    and non-positive values are discarded; instruments with fewer than 10
    remaining observations are skipped. Each detection is appended to
    `anomalies` as a dict with keys 'datetime', 'stock_id' and
    'anomaly_type'. Returns None.
    """
    iv_instruments = [name for field, name in df.columns if field == 'iv']

    for inst in iv_instruments:
        try:
            cleaned = df[('iv', inst)].replace(0, np.nan).dropna()
            positive_iv = cleaned[cleaned > 0]

            # Proceed only when there are enough points to score.
            if len(positive_iv) >= 10:
                scores = zscore(positive_iv, nan_policy='omit')
                spike_times = positive_iv.index[np.abs(scores) > z_threshold]

                for ts in spike_times:
                    record = {
                        'datetime': ts,
                        'stock_id': inst,
                        'anomaly_type': 'IV Spike'
                    }
                    anomalies.append(record)

        except Exception:
            # Best-effort: skip instruments that cannot be scored.
            continue


In [33]:

def detect_anomalies(df, output_file='result.csv'):
    """
    Runs every anomaly detector over `df` and writes the combined findings
    to a CSV file.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame handed to each detector. Its index is converted with
        `pd.to_datetime` on a shallow copy, so the caller's frame keeps its
        original index.
    output_file : str
        Path of the CSV to write. The file is only written when at least one
        anomaly was found; an existing file is left untouched otherwise.

    Returns
    -------
    None
        The outcome is reported with a printed message.
    """
    # Shallow copy: the column data is shared, only the index assignment
    # below is kept away from the caller's object.
    df = df.copy(deep=False)
    df.index = pd.to_datetime(df.index)
    anomalies = []

    # Each detector appends its findings to the shared list.
    detect_iv_bid_ask_divergence(df, anomalies, threshold=0.15)
    detect_anomalous_price_change(df, anomalies)
    detect_anomalous_volume_zscore(df, anomalies)
    detect_iv_spike(df, anomalies)
    detect_vega_sensitivity_spike(df, anomalies)
    detect_delta_flip(df, anomalies)
    detect_bid_ask_spread_spike(df, anomalies, threshold=0.05)

    result_df = pd.DataFrame(anomalies)

    if result_df.empty:
        print("No anomalies detected.")
        return

    # A stable sort keeps rows that share a timestamp in the order the
    # detectors produced them, so the CSV row order is reproducible.
    result_df = result_df.sort_values(by="datetime", kind="stable")
    result_df.to_csv(output_file, index=False)
    print(f"Saved {len(result_df)} anomalies to {output_file}")



In [35]:
detect_anomalies(ad1, output_file='result_set1.csv')

Saved 79896 anomalies to result_set1.csv


In [36]:
detect_anomalies(ad2, output_file='result_set2.csv')

Saved 122659 anomalies to result_set2.csv
