### 1: Imports and Load Data

In [2]:
import pandas as pd
import numpy as np
from datetime import datetime

# Read the raw market feed; the 'timestamp' column is parsed into datetimes at load time.
df = pd.read_csv(
    'raw_market_data_with_errors.csv',
    parse_dates=['timestamp'],
)

# Quick sanity check of what was loaded: dimensions plus the leading rows.
print(
    "Data loaded for validation.",
    f"Shape: {df.shape}",
    "First 5 rows:",
    df.head(),
    sep="\n",
)

Data loaded for validation.
Shape: (20200, 4)
First 5 rows:
                      timestamp exchange       price  volume
0 2025-06-01 09:30:00.000000000     BATS  150.000507     288
1 2025-06-01 09:30:01.170058502   NASDAQ  150.000378      81
2 2025-06-01 09:30:02.340117005   NASDAQ  150.001036      18
3 2025-06-01 09:30:03.510175508   NASDAQ  150.002569     196
4 2025-06-01 09:30:04.680234011     NYSE  150.002345     291


### 2: Rule-Based Validation Flags

In [3]:
# Deterministic per-row checks: each one adds a boolean column to df.
df['is_missing_price'] = df['price'].isna()    # price is NaN
df['is_negative_price'] = df['price'].lt(0)    # strictly below zero (a price of exactly 0 is not flagged)
df['is_zero_volume'] = df['volume'].eq(0)      # volume equals zero

# Summarise how many rows each rule caught.
print("Rule-based flags added:")
for flag_label, flag_col in (
    ("Missing prices", 'is_missing_price'),
    ("Negative prices", 'is_negative_price'),
    ("Zero volumes", 'is_zero_volume'),
):
    print(f"- {flag_label}: {df[flag_col].sum()}")

Rule-based flags added:
- Missing prices: 608
- Negative prices: 20
- Zero volumes: 43


### 3: Statistical Anomaly Detection (Z-Score and IQR)

In [4]:
# --- Z-score screen for price spikes ---
# Reference statistics are taken over every non-null price, so NaNs cannot
# poison the mean/std. NOTE: negative prices and the spikes themselves are
# still part of this sample, so they influence the mean and std used below.
valid_prices = df['price'].dropna()
mean_price, std_price = valid_prices.mean(), valid_prices.std()

# Rows that actually carry a price; used to keep NaN rows out of both outlier flags.
has_price = df['price'].notna()

df['z_score'] = ((df['price'] - mean_price) / std_price).abs()
df['is_outlier_z'] = df['z_score'].gt(3) & has_price  # more than 3σ from the mean

# --- IQR screen: flag prices outside the 1.5 * IQR fences around the quartiles ---
Q1, Q3 = valid_prices.quantile(0.25), valid_prices.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

outside_fences = df['price'].lt(lower_bound) | df['price'].gt(upper_bound)
df['is_outlier_iqr'] = outside_fences & has_price

print("Statistical outlier detection completed:")
print(f"- Z-score outliers (>3σ): {df['is_outlier_z'].sum()}")
print(f"- IQR outliers: {df['is_outlier_iqr'].sum()}")

Statistical outlier detection completed:
- Z-score outliers (>3σ): 96
- IQR outliers: 135


### 4: Timestamp Gap Detection

In [5]:
# Put rows in chronological order and renumber the index from 0.
df_sorted = df.sort_values(by='timestamp').reset_index(drop=True)

# Seconds elapsed since the previous row; the first row has no predecessor, so its gap is set to 0.
elapsed = df_sorted['timestamp'].diff()
time_diffs = elapsed.dt.total_seconds().fillna(0)

# Attach the gap and a flag for gaps longer than 10 seconds.
df_sorted = df_sorted.assign(
    gap_seconds=time_diffs,
    is_gap_large=time_diffs.gt(10),
)

# From here on df is the chronologically sorted frame (original row order is not restored).
df = df_sorted

print(f"Large timestamp gaps (>10s) detected: {df['is_gap_large'].sum()}")
print("Top 5 largest gaps (seconds):")
print(df['gap_seconds'].nlargest(5).values)

Large timestamp gaps (>10s) detected: 5
Top 5 largest gaps (seconds):
[298.1700585 226.1700585 195.1700585 168.1700585 165.1700585]


### 5: Combined Anomaly Flag and Report

In [7]:
# Columns that each mark one kind of problem; a row is anomalous if any of them is True.
anomaly_cols = [
    'is_missing_price',
    'is_negative_price',
    'is_zero_volume',
    'is_outlier_z',
    'is_outlier_iqr',
    'is_gap_large',
]
df['is_anomaly'] = df.loc[:, anomaly_cols].any(axis='columns')

In [8]:
# Final report: overall anomaly rate followed by a per-rule breakdown.

# Duplicates must be counted on the raw input columns only. By this point df
# also carries derived columns, and gap_seconds differs between otherwise
# identical rows (a repeated row sits 0 seconds after its first occurrence),
# so df.duplicated() over every column would never match them.
raw_cols = ['timestamp', 'exchange', 'price', 'volume']
n_exact_duplicates = df.duplicated(subset=raw_cols).sum()

print("="*60)
print("ANOMALY DETECTION REPORT")
print("="*60)
print(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# .mean() of a boolean column is the fraction of True values, i.e. the anomaly rate.
print(f"Total rows flagged as anomalous: {df['is_anomaly'].sum()} ({df['is_anomaly'].mean()*100:.2f}%)")
print("\nDetailed breakdown:")
print(f"- Missing prices: {df['is_missing_price'].sum()}")
print(f"- Negative prices: {df['is_negative_price'].sum()}")
print(f"- Zero volumes: {df['is_zero_volume'].sum()}")
print(f"- Z-score outliers: {df['is_outlier_z'].sum()}")
print(f"- IQR outliers: {df['is_outlier_iqr'].sum()}")
print(f"- Large gaps: {df['is_gap_large'].sum()}")
print(f"- Exact duplicates: {n_exact_duplicates} (to be handled in cleaning)")

ANOMALY DETECTION REPORT
Generated on: 2025-12-31 12:45:53
Total rows flagged as anomalous: 789 (3.91%)

Detailed breakdown:
- Missing prices: 608
- Negative prices: 20
- Zero volumes: 43
- Z-score outliers: 96
- IQR outliers: 135
- Large gaps: 5
- Exact duplicates: 0 (to be handled in cleaning)


In [9]:
# Show the first ten flagged rows with the fields most useful for eyeballing the cause.
anomaly_preview_cols = ['timestamp', 'exchange', 'price', 'volume', 'z_score', 'gap_seconds']
print("\nSample anomalous rows:")
print(df.loc[df['is_anomaly'], anomaly_preview_cols].head(10))


Sample anomalous rows:
                        timestamp exchange        price  volume   z_score  \
12  2025-06-01 09:30:14.040702035     BATS          NaN     142       NaN   
38  2025-06-01 09:30:43.292164608     NYSE          NaN     192       NaN   
42  2025-06-01 09:30:47.972398619     NYSE          NaN     379       NaN   
72  2025-06-01 09:31:21.904095204     NYSE          NaN     290       NaN   
99  2025-06-01 09:31:53.495674783     NYSE          NaN     390       NaN   
128 2025-06-01 09:32:27.427371368     NYSE          NaN     149       NaN   
215 2025-06-01 09:34:09.222461123     ARCA          NaN     414       NaN   
225 2025-06-01 09:34:20.923046152   NASDAQ  2607.369361     281  19.30386   
242 2025-06-01 09:34:40.814040702     NYSE          NaN     266       NaN   
268 2025-06-01 09:35:10.065503275     NYSE          NaN     368       NaN   

     gap_seconds  
12      1.170059  
38      1.170059  
42      1.170059  
72      1.170059  
99      1.170059  
128     1.1700

In [10]:
# Show the first five rows that no rule flagged, for contrast with the anomalous sample.
print("\nSample clean rows:")
print(df.loc[~df['is_anomaly'], ['timestamp', 'price', 'volume']].head(5))


Sample clean rows:
                      timestamp       price  volume
0 2025-06-01 09:30:00.000000000  150.000507     288
1 2025-06-01 09:30:01.170058502  150.000378      81
2 2025-06-01 09:30:02.340117005  150.001036      18
3 2025-06-01 09:30:03.510175508  150.002569     196
4 2025-06-01 09:30:04.680234011  150.002345     291


### 6: Save Flagged Dataset

In [11]:
# Persist the frame, including every flag column, without writing the row index.
df.to_csv(
    path_or_buf='market_data_with_anomaly_flags.csv',
    index=False,
)
print("Flagged dataset saved as 'market_data_with_anomaly_flags.csv' for next phase.")

Flagged dataset saved as 'market_data_with_anomaly_flags.csv' for next phase.
