# Start with the data analysis

#### This script detects anomalies in sensor data. It calculates a rolling average
#### and flags any point that is more than 3 standard deviations away.
#### It's very basic and needs to be turned into a real product.


In [None]:
import pandas as pd
import numpy as np

# --- Hardcoded Configuration & Paths ---
# NOTE: hardcoded for the POC; these should become configurable.
TRAIN_DATA_PATH = '../data/sensor_data_train.csv'
TEST_DATA_PATH = '../data/sensor_data_test.csv'
WINDOW_SIZE = 60  # Rolling window (number of rows) used for the baseline stats
THRESHOLD_MULTIPLIER = 3.0  # Number of std deviations for a point to be an anomaly

print("--- Starting Anomaly Detection POC ---")

# --- Step 1: "Training" ---
# Load the training data to establish a baseline of normal behavior.
# Only the file read sits inside the try block, so unrelated failures are
# never mistaken for (or hidden behind) a missing-file error.
try:
    train_df = pd.read_csv(TRAIN_DATA_PATH)
except FileNotFoundError:
    print(f"Error: Training data file not found at '{TRAIN_DATA_PATH}'")
    # Stop with a non-zero status so callers can tell the run failed;
    # the bare exit() helper would report success (status 0).
    raise SystemExit(1) from None

train_df['timestamp'] = pd.to_datetime(train_df['timestamp'])
train_df.set_index('timestamp', inplace=True)

# Calculate rolling statistics on the training data
rolling_mean = train_df['value'].rolling(window=WINDOW_SIZE).mean()
rolling_std = train_df['value'].rolling(window=WINDOW_SIZE).std()

# Use the last calculated values as our "model" for normal behavior.
# This is a naive approach but serves for the POC.
NORMAL_MEAN = rolling_mean.iloc[-1]
NORMAL_STD = rolling_std.iloc[-1]

# A NaN baseline (fewer than WINDOW_SIZE rows, or missing values inside the
# last window) would make every later range comparison fail and flag every
# test row as an anomaly, so stop here instead of producing a bogus report.
if pd.isna(NORMAL_MEAN) or pd.isna(NORMAL_STD):
    print(
        "Error: Could not compute a baseline from the training data "
        f"(need {WINDOW_SIZE} consecutive non-missing values at the end)."
    )
    raise SystemExit(1)

print(f"Training complete. Normal baseline: Mean={NORMAL_MEAN:.2f}, StdDev={NORMAL_STD:.2f}")


--- Starting Anomaly Detection POC ---
Training complete. Normal baseline: Mean=65.08, StdDev=0.23


In [None]:
# List the distinct sensor IDs present in the training data.
# The training set contains a single sensor (TEMP-01).
pd.unique(train_df['sensor_id'])

array(['TEMP-01'], dtype=object)

In [None]:
# Report the first and last timestamps covered by the training data.
print(f'Start date: {train_df.index.min()!s}')
print(f'End date: {train_df.index.max()!s}')

Start date: 2025-08-01 00:00:00+00:00
End date: 2025-08-21 23:59:00+00:00


In [None]:
# Verify that the training data has a row for every minute of its time span:
# build the complete minute-by-minute index and list whatever is absent.
train_time_index = pd.date_range(
    start=train_df.index.min(),
    end=train_df.index.max(),
    freq='min',
)
missing_times = train_time_index.difference(train_df.index)
if len(missing_times) > 0:
    print(f"Warning: Missing timestamps in training data: {missing_times}")

In [None]:
# Inspect the rolling mean. The first WINDOW_SIZE - 1 entries are NaN because
# a full window of observations is not yet available at those positions.
rolling_mean

timestamp
2025-08-01 00:00:00+00:00          NaN
2025-08-01 00:01:00+00:00          NaN
2025-08-01 00:02:00+00:00          NaN
2025-08-01 00:03:00+00:00          NaN
2025-08-01 00:04:00+00:00          NaN
                               ...    
2025-08-21 23:55:00+00:00    65.105667
2025-08-21 23:56:00+00:00    65.102833
2025-08-21 23:57:00+00:00    65.096333
2025-08-21 23:58:00+00:00    65.091167
2025-08-21 23:59:00+00:00    65.085000
Name: value, Length: 30240, dtype: float64

In [None]:
# Inspect the rolling standard deviation. The first WINDOW_SIZE - 1 entries
# are NaN because a full window of observations is not yet available there.
rolling_std

timestamp
2025-08-01 00:00:00+00:00         NaN
2025-08-01 00:01:00+00:00         NaN
2025-08-01 00:02:00+00:00         NaN
2025-08-01 00:03:00+00:00         NaN
2025-08-01 00:04:00+00:00         NaN
                               ...   
2025-08-21 23:55:00+00:00    0.227740
2025-08-21 23:56:00+00:00    0.225186
2025-08-21 23:57:00+00:00    0.229103
2025-08-21 23:58:00+00:00    0.228615
2025-08-21 23:59:00+00:00    0.229794
Name: value, Length: 30240, dtype: float64

In [None]:
# --- Step 2: "Prediction" ---
# Load the test data and find anomalies.
# This whole section should be refactored into a reusable class/function.
# Only the file read sits inside the try block, so unrelated failures are
# never mistaken for a missing-file error.
try:
    test_df = pd.read_csv(TEST_DATA_PATH)
except FileNotFoundError:
    print(f"Error: Test data file not found at '{TEST_DATA_PATH}'")
    # Stop with a non-zero status so callers can tell the run failed;
    # the bare exit() helper would report success (status 0).
    raise SystemExit(1) from None

test_df['timestamp'] = pd.to_datetime(test_df['timestamp'])

anomalies_found = []
print(f"\nScanning '{TEST_DATA_PATH}' for anomalies...")

# The bounds depend only on the trained baseline, so compute them once
# instead of once per row.
upper_bound = NORMAL_MEAN + THRESHOLD_MULTIPLIER * NORMAL_STD
lower_bound = NORMAL_MEAN - THRESHOLD_MULTIPLIER * NORMAL_STD

# Vectorized range check (bounds inclusive). Missing values are not "between"
# the bounds, so they are flagged, exactly as the scalar comparison would do.
outside_range = ~test_df['value'].between(lower_bound, upper_bound)
flagged_rows = test_df.loc[outside_range]

# Only the flagged rows are visited in Python, in their original order.
for timestamp, value in zip(flagged_rows['timestamp'], flagged_rows['value']):
    anomaly_details = {
        "timestamp": str(timestamp),
        "value": value,
        "reason": f"Value {value:.2f} is outside the normal range [{lower_bound:.2f}, {upper_bound:.2f}]"
    }
    anomalies_found.append(anomaly_details)
    print(f"  -> Anomaly Detected at {timestamp}: Value={value:.2f}")

# --- Step 3: Reporting ---
# Final summary of the results.
print("\n--- Scan Complete ---")
if anomalies_found:
    print(f"Total anomalies found: {len(anomalies_found)}")
    # In a real tool, this would be saved to a file, not just printed.
    # print("Details:", anomalies_found)
else:
    print("No anomalies were detected in the test data.")


Scanning '../data/sensor_data_test.csv' for anomalies...
  -> Anomaly Detected at 2025-08-22 01:43:00+00:00: Value=65.79
  -> Anomaly Detected at 2025-08-22 01:46:00+00:00: Value=65.82
  -> Anomaly Detected at 2025-08-22 01:55:00+00:00: Value=65.96
  -> Anomaly Detected at 2025-08-22 02:00:00+00:00: Value=66.02
  -> Anomaly Detected at 2025-08-22 02:01:00+00:00: Value=65.79
  -> Anomaly Detected at 2025-08-22 02:05:00+00:00: Value=65.93
  -> Anomaly Detected at 2025-08-22 02:07:00+00:00: Value=66.04
  -> Anomaly Detected at 2025-08-22 02:11:00+00:00: Value=65.87
  -> Anomaly Detected at 2025-08-22 02:12:00+00:00: Value=65.93
  -> Anomaly Detected at 2025-08-22 02:13:00+00:00: Value=65.79
  -> Anomaly Detected at 2025-08-22 02:15:00+00:00: Value=65.98
  -> Anomaly Detected at 2025-08-22 02:16:00+00:00: Value=66.06
  -> Anomaly Detected at 2025-08-22 02:17:00+00:00: Value=66.04
  -> Anomaly Detected at 2025-08-22 02:21:00+00:00: Value=66.08
  -> Anomaly Detected at 2025-08-22 02:22:00+0

In [None]:
import pandas as pd
import logging
from pathlib import Path

# Configure the root logger so that records at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)

class AnomalyDetector:
    """Flag values that fall outside a fixed band around a known baseline.

    The band is ``normal_mean ± threshold_multiplier * normal_std`` and both
    bounds are inclusive, i.e. a value exactly on a bound is considered normal.
    """

    def __init__(self, normal_mean, normal_std, threshold_multiplier=3):
        """Store the baseline statistics that define normal behavior.

        Args:
            normal_mean: Baseline mean of the monitored value.
            normal_std: Baseline standard deviation; must be non-negative.
            threshold_multiplier: Number of standard deviations a value may
                deviate from the mean before it is flagged; must be
                non-negative.

        Raises:
            ValueError: If any argument is NaN/missing, or if ``normal_std``
                or ``threshold_multiplier`` is negative. Such inputs yield an
                empty or inverted band that would flag every single row.
        """
        if pd.isna(normal_mean) or pd.isna(normal_std) or pd.isna(threshold_multiplier):
            raise ValueError(
                "normal_mean, normal_std and threshold_multiplier must not be NaN"
            )
        if normal_std < 0:
            raise ValueError(f"normal_std must be non-negative, got {normal_std}")
        if threshold_multiplier < 0:
            raise ValueError(
                f"threshold_multiplier must be non-negative, got {threshold_multiplier}"
            )

        self.normal_mean = normal_mean
        self.normal_std = normal_std
        self.threshold_multiplier = threshold_multiplier

    def detect(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the rows of *df* whose ``value`` lies outside the normal band.

        Args:
            df: Data to scan; must contain a ``value`` column. It is not
                modified.

        Returns:
            A copy of the offending rows (original index preserved) with an
            extra ``reason`` column describing the violated range. Rows with a
            missing ``value`` are reported as anomalies because they are not
            inside the band. The result is empty when nothing is flagged.

        Raises:
            KeyError: If *df* has no ``value`` column.
        """
        margin = self.threshold_multiplier * self.normal_std
        upper_bound = self.normal_mean + margin
        lower_bound = self.normal_mean - margin

        mask = ~df['value'].between(lower_bound, upper_bound)
        # Copy so that adding the 'reason' column never touches the caller's frame.
        anomalies = df.loc[mask].copy()
        # A plain list keeps the column well-defined even when no row is flagged.
        anomalies['reason'] = [
            f"Value {v:.2f} is outside range [{lower_bound:.2f}, {upper_bound:.2f}]"
            for v in anomalies['value']
        ]
        return anomalies

In [None]:
def load_data(path: str) -> pd.DataFrame:
    """Load a CSV file and parse its ``timestamp`` column into datetimes.

    Args:
        path: Location of the CSV file (a string or path-like object). The
            file must contain a ``timestamp`` column.

    Returns:
        The file contents as a DataFrame whose ``timestamp`` column has been
        converted with ``pd.to_datetime``.

    Raises:
        FileNotFoundError: If *path* does not point to an existing regular
            file (this includes the case where it points to a directory).
        KeyError: If the file has no ``timestamp`` column.
    """
    csv_path = Path(path)
    # is_file() rather than exists(): a directory "exists" but cannot be read
    # as CSV and would otherwise fail later with a less helpful error.
    if not csv_path.is_file():
        raise FileNotFoundError(f"Test data file not found at {csv_path}")
    df = pd.read_csv(csv_path)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    return df

def main(test_data_path, normal_mean, normal_std, threshold_multiplier=3, output_path="anomalies.csv"):
    """Scan a test data file for anomalies and write any findings to CSV.

    Args:
        test_data_path: Path of the CSV file to scan.
        normal_mean: Baseline mean handed to the detector.
        normal_std: Baseline standard deviation handed to the detector.
        threshold_multiplier: Width of the normal band in standard deviations.
        output_path: Destination CSV for the anomaly report; it is written
            only when at least one anomaly is found.
    """
    logging.info(f"Loading test data from {test_data_path}")
    test_frame = load_data(test_data_path)

    anomalies = AnomalyDetector(normal_mean, normal_std, threshold_multiplier).detect(test_frame)

    # Nothing to report: log it and leave without writing a file.
    if anomalies.empty:
        logging.info("No anomalies detected ✅")
        return

    logging.warning(f"{len(anomalies)} anomalies found ⚠️")
    anomalies.to_csv(output_path, index=False)
    logging.info(f"Anomaly report saved at {output_path}")