In [10]:
# Cell 1: Imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [11]:
# Cell 2: Load Parquet Data
def load_data(file_path):
    """Read a single parquet file, order it by index and plug missing values.

    Parameters
    ----------
    file_path : str or path-like
        Parquet file to read (via the ``fastparquet`` engine).

    Returns
    -------
    pd.DataFrame
        The file's frame sorted by index, with gaps forward-filled and any
        remaining leading NaNs back-filled.

    NOTE(review): filling happens per file, before ``main`` concatenates the
    files, so leading gaps in a later file are back-filled from that same file
    rather than carried forward from the previous one — confirm that is intended.
    """
    raw = pd.read_parquet(file_path, engine='fastparquet')
    # ffill first so gaps take the latest known value; bfill then only touches
    # leading NaNs that have no earlier observation.
    return raw.sort_index().ffill().bfill()

In [12]:
# Cell 3: Enhanced Preprocess
def preprocess(df):
    df = df.copy()
    df.index = pd.to_datetime(df.index)
    start, end = df.index.min(), df.index.max()
    master_index = pd.date_range(start=start, end=end, freq='1s')
    df = df.reindex(master_index)
    df = df.ffill().bfill()
    smoothed_df = df.rolling(window=5, min_periods=1).mean()
    return smoothed_df

In [13]:
# Cell 4: Rolling z-score function
def rolling_zscore(series, window=60, min_periods=None):
    """Return the rolling z-score of ``series``.

    Each point is scored against the mean and sample std of the trailing
    ``window`` observations (the window includes the point itself).

    Parameters
    ----------
    series : pd.Series
        Values to score.
    window : int or offset, default 60
        Passed straight to ``Series.rolling``.
    min_periods : int, optional
        Observations required before a score is produced. Defaults to 20,
        clamped to ``window`` for integer windows so that small windows do not
        trip pandas' ``min_periods <= window`` check.

    Returns
    -------
    pd.Series
        Same index as ``series``. NaN during warm-up and wherever the window's
        std is zero, because a flat window has no meaningful z-score.
    """
    if min_periods is None:
        min_periods = min(20, window) if isinstance(window, (int, np.integer)) else 20
    roller = series.rolling(window=window, min_periods=min_periods)
    mean = roller.mean()
    std = roller.std()
    # Mask zero std to NaN so flat windows cannot produce +/-inf scores.
    return (series - mean) / std.where(std > 0)

In [14]:
# Cell 5: Feature Engineering
def feature_engineering(df, window=60):
    """Build rolling per-instrument features from the preprocessed frame.

    Parameters
    ----------
    df : pd.DataFrame
        Frame with two-level columns ``(field, instrument)``. The fields read
        are ``'ltp'``, ``'iv'`` and ``'traded_volume'``; instruments missing
        any of them are skipped.
    window : int, default 60
        Rolling window (rows) used for every feature.

    Returns
    -------
    pd.DataFrame
        Indexed like ``df`` with ``(instrument, feature)`` columns:
        ``ltp_ma`` (rolling mean of ltp), ``iv_zscore`` (``rolling_zscore`` of
        forward-filled iv) and ``volume_spike`` (rolling mean of the one-step
        relative change in forward-filled traded_volume).
    """
    features = {}
    for instrument in df.columns.get_level_values(1).unique():
        try:
            ltp = df[('ltp', instrument)]
            iv = df[('iv', instrument)]
            volume = df[('traded_volume', instrument)]
        except KeyError:
            # Best-effort: instruments lacking any required field get no features.
            continue

        features[(instrument, 'ltp_ma')] = ltp.rolling(window=window).mean()
        features[(instrument, 'iv_zscore')] = rolling_zscore(iv.ffill(), window=window)

        # Same as pct_change() with its (deprecated) default padding, written
        # out explicitly. A move off zero volume divides by zero; map those
        # +/-inf values to NaN so a single step cannot saturate the rolling
        # mean (and flag every row of the following window) downstream.
        filled_volume = volume.ffill()
        volume_change = filled_volume / filled_volume.shift(1) - 1
        volume_change = volume_change.replace([np.inf, -np.inf], np.nan)
        features[(instrument, 'volume_spike')] = volume_change.rolling(window=window).mean()
    return pd.DataFrame(features, index=df.index)

In [15]:
# Cell 6: Anomaly Detection
def detect_anomalies(df, features, iv_z_threshold=3, vol_spike_threshold=2,
                     ltp_sigma=3, warmup_rows=10):
    """Flag per-instrument IV, volume and LTP anomalies.

    Parameters
    ----------
    df : pd.DataFrame
        Preprocessed frame with two-level columns ``(field, instrument)``.
        Only instruments that have all of ``'ltp'``, ``'iv'`` and
        ``'traded_volume'`` are scored.
    features : pd.DataFrame or mapping
        Keyed by ``(instrument, feature)``. ``'iv_zscore'`` and
        ``'volume_spike'`` are used when present (looked up with ``.get``).
    iv_z_threshold : float, default 3
        An "IV spike" is ``iv_zscore`` strictly above this (upward moves only).
    vol_spike_threshold : float, default 2
        A "Volume spike" is ``volume_spike`` strictly above this.
    ltp_sigma : float, default 3
        An "LTP jump" is a one-step ``|diff(ltp)|`` above ``ltp_sigma`` times
        the std of that instrument's ltp diffs over the whole series.
    warmup_rows : int, default 10
        Timestamps earlier than ``df.index[warmup_rows]`` are ignored. If
        ``df`` has no more than ``warmup_rows`` rows, nothing is flagged.

    Returns
    -------
    pd.DataFrame
        Columns ``timestamp``, ``instrument``, ``reason``. Rows are stably
        sorted by timestamp and carry a fresh 0..n-1 index.
    """
    columns = ['timestamp', 'instrument', 'reason']
    anomalies = []
    # Too short to get past warm-up (and df.index[warmup_rows] would raise).
    if len(df) <= warmup_rows:
        return pd.DataFrame(anomalies, columns=columns)
    min_valid_time = df.index[warmup_rows]

    def _record(flags, instrument, reason):
        # Append every post-warm-up timestamp where the boolean `flags` is True.
        # NaN comparisons are already False, so no separate isna() mask is needed.
        for t in flags[flags].index:
            if t >= min_valid_time:
                anomalies.append((t, instrument, reason))

    required_fields = ('ltp', 'iv', 'traded_volume')
    for instrument in df.columns.get_level_values(1).unique():
        if any((field, instrument) not in df.columns for field in required_fields):
            continue

        iv_z = features.get((instrument, 'iv_zscore'))
        if iv_z is not None:
            _record(iv_z > iv_z_threshold, instrument, 'IV spike')

        vol_spike = features.get((instrument, 'volume_spike'))
        if vol_spike is not None:
            _record(vol_spike > vol_spike_threshold, instrument, 'Volume spike')

        ltp_diff = df[('ltp', instrument)].diff()
        ltp_std = ltp_diff.std()
        # A zero (or NaN) std leaves nothing to compare against; without this
        # guard a constant-step series would flag every non-zero diff.
        if ltp_std > 0:
            _record(ltp_diff.abs() > ltp_sigma * ltp_std, instrument, 'LTP jump')

    result_df = pd.DataFrame(anomalies, columns=columns)
    # Stable sort keeps the per-instrument IV / Volume / LTP order for ties.
    return result_df.sort_values(by='timestamp', kind='mergesort').reset_index(drop=True)

In [16]:
# Cell 7: Main Execution for Two Local Files
def main(local_paths, output_path="results.csv"):
    """Run load -> preprocess -> features -> anomaly detection and save the result.

    Parameters
    ----------
    local_paths : iterable of str or path-like
        Parquet files to load. Each goes through ``load_data``, then all of
        them are concatenated and sorted by index.
    output_path : str or path-like, default "results.csv"
        Destination of the anomaly table (CSV, written without the index).

    Returns
    -------
    pd.DataFrame
        The anomaly table produced by ``detect_anomalies``.

    Raises
    ------
    ValueError
        If ``local_paths`` is empty.
    """
    local_paths = list(local_paths)
    if not local_paths:
        # Fail fast with a clear message instead of pd.concat's "No objects to concatenate".
        raise ValueError("main() needs at least one parquet file path")

    all_data = []
    for path in local_paths:
        print(f"Loading {path}...")
        all_data.append(load_data(path))

    print("Concatenating datasets...")
    combined_df = pd.concat(all_data).sort_index()

    print("Preprocessing combined data...")
    processed_df = preprocess(combined_df)

    print("Generating features...")
    features = feature_engineering(processed_df)

    print("Detecting anomalies...")
    results = detect_anomalies(processed_df, features)

    results.to_csv(output_path, index=False)
    print(f"Anomalies saved to {output_path}")

    return results

In [17]:
# Cell 8: Example Usage with Local Files
# Parquet inputs for this run; swap in the real filenames before executing.
local_files = [
    "ad1.parquet",
    "ad2.parquet",
]
results = main(local_files)

Loading ad1.parquet...
Loading ad2.parquet...
Concatenating datasets...
Preprocessing combined data...
Generating features...
Detecting anomalies...
Anomalies saved to results.csv
