In [None]:
import argparse
import math
import sys
from dataclasses import dataclass
from typing import Optional, Tuple, Dict

import numpy as np

import pandas as pd


import matplotlib.pyplot as plt


In [None]:
def time_delay_embed(x: np.ndarray, l: int) -> np.ndarray:
    """
    l: Window.
    x: Time Series.
    """
    if l < 1:
        raise ValueError("Embedding length l must be >= 1.")
    
    T = x.shape[0] # time series length
    if T < l:
        raise ValueError(f"Series length {T} is shorter than embedding l={l}.")

    from numpy.lib.stride_tricks import as_strided
    n_rows = T - l + 1
    stride = x.strides[0] # Location in memory 
    X = as_strided(x, shape=(n_rows, l), strides=(stride, stride))
    return X.copy()  

In [None]:
def _mahalanobis_VI(T: np.ndarray, reg: float = 1e-6) -> np.ndarray:
    """
    Compute inverse covariance (VI) for Mahalanobis distance on sample T (n x l).
    Adds Tikhonov regularization reg*I for stability.
    """
    l = T.shape[1]
    cov = np.cov(T, rowvar=False)
    if cov.ndim == 0:  # l == 1 edge case: scalar var
        cov = np.array([[cov]])
    cov = cov + reg * np.eye(l)
    # Pseudoinverse is safer than inverse
    VI = np.linalg.pinv(cov)
    return VI

In [None]:

def _mahalanobis_distances(T: np.ndarray, x: np.ndarray, VI: np.ndarray) -> np.ndarray:
    """
    Vectorized Mahalanobis distances from x to each row in T.
    """
    diff = T - x  # (n, l)
    # sqrt( (x-y)^T VI (x-y) ) for each row
    d2 = np.einsum('ij,jk,ik->i', diff, VI, diff, optimize=True)
    d2 = np.maximum(d2, 0.0)  # guard tiny negatives from numerics
    return np.sqrt(d2)


In [None]:
def knn_nonconformity(T: np.ndarray,
                      x: np.ndarray,
                      k: int,
                      metric: str = "mahalanobis",
                      VI: Optional[np.ndarray] = None) -> float:
    """
    Average distance from x to its k nearest neighbors in T (excluding x itself; x is not in T here).
    metric: 'mahalanobis' (default) or 'euclidean'.
    For 'mahalanobis', pass VI (inverse covariance) for T, or it will be computed.
    """
    if T.ndim != 2 or x.ndim != 1:
        raise ValueError("Shapes must be T:(n,l), x:(l,).")
    n = T.shape[0]
    if n == 0:
        return np.nan
    k_eff = max(1, min(k, n))
    if metric == "mahalanobis":
        if VI is None:
            VI = _mahalanobis_VI(T)
        d = _mahalanobis_distances(T, x, VI)
    elif metric == "euclidean":
        diff = T - x
        d = np.sqrt(np.sum(diff * diff, axis=1))
    else:
        raise ValueError("metric must be 'mahalanobis' or 'euclidean'")
    # take k smallest distances
    idx = np.argpartition(d, k_eff - 1)[:k_eff]
    return float(np.mean(d[idx]))


In [None]:

@dataclass
class LDCDConfig:
    l: int = 19           # embedding dimension
    k: int = 27           # neighbors
    n: int = 600          # training window (embedded points)
    m: int = 600          # calibration window (scores)
    metric: str = "mahalanobis"  # or 'euclidean'
    reg: float = 1e-6     # covariance regularization
    prune: bool = False   # alarm thinning
    prune_quantile: float = 0.995
    cooldown_frac: float = 0.2  # n/5 as in the paper

In [None]:

def ldcd_conformal_knn(x: np.ndarray, cfg: LDCDConfig) -> Dict[str, np.ndarray]:
    """
    Run LDCD on univariate series x. Returns dict with arrays aligned to original time index.
    Keys:
      'anomaly_prob' : 1 - conformal p-values (NaN before first valid index)
      'p_value'      : conformal p-values (NaN before first valid index)
      'score'        : kNN nonconformity scores (embedded-time aligned)
      'valid_from'   : integer (raw index) where the first p-value is defined
      'offset'       : embedding offset (= l-1)
    """
    if x.ndim != 1:
        raise ValueError("x must be a 1-D array.")
    l, k, n, m = cfg.l, cfg.k, cfg.n, cfg.m
    if any(v < 1 for v in [l, k, n, m]):
        raise ValueError("l, k, n, m must all be >= 1")

    # 1) Embed
    X = time_delay_embed(x, l)                  # shape (N_emb, l)
    N_emb = X.shape[0]
    offset = l - 1  # raw index for embedded row t is t + offset

    # Check we have enough embedded samples
    if N_emb < n + m + 1:
        raise ValueError(f"Not enough data. Need at least l + n + m points: "
                         f"l={l} n={n} m={m} -> min raw length {l + n + m}")

    # Output arrays (embedded timeline)
    scores = np.full(N_emb, np.nan, dtype=float)     # nonconformity
    pvals  = np.full(N_emb, np.nan, dtype=float)     # conformal p-values
    aprobs = np.full(N_emb, np.nan, dtype=float)     # anomaly probability = 1 - p

    # 2) Initialize training block at t0 = n + m - 1 (embedded index), training indices [0, n-1]
    # Fill initial calibration queue with scores for s in [n, n+m-1] scored against that training block.
    t0 = n + m - 1
    T0 = X[0:n]  # (n, l)

    # Precompute VI if using Mahalanobis for the initial block
    VI0 = _mahalanobis_VI(T0, reg=cfg.reg) if cfg.metric == "mahalanobis" else None

    for s in range(n, n + m):
        scores[s] = knn_nonconformity(T0, X[s], k, metric=cfg.metric, VI=VI0)

    # 3) Main loop: starting at t = n + m (first index with m prior scores available)
    cooldown = 0
    for t in range(n + m, N_emb):
        # Build training window T_t: indices [t - (n + m), t - m - 1]  (length n)
        start = t - (n + m)
        end = t - m  # exclusive
        T_t = X[start:end]  # shape (n, l)
        if cfg.metric == "mahalanobis":
            VI_t = _mahalanobis_VI(T_t, reg=cfg.reg)
        else:
            VI_t = None

        # Nonconformity for current x_t against T_t
        a_t = knn_nonconformity(T_t, X[t], k, metric=cfg.metric, VI=VI_t)
        scores[t] = a_t

        # Calibration scores: last m scores (a_{t-m}..a_{t-1})
        cal = scores[t - m: t]
        if np.any(np.isnan(cal)):
            # Should not happen after initialization, but be safe
            valid = cal[~np.isnan(cal)]
            if valid.size < m:
                # Not enough calibration yet
                pvals[t] = np.nan
                aprobs[t] = np.nan
                continue
            cal = valid[-m:]

        # Conformal p-value (LDCD): (count of >= including current) / (m+1)
        ge = np.sum(cal >= a_t)  # number of previous >= current
        p = (ge + 1) / float(m + 1)
        pvals[t] = p
        anomaly_prob = 1.0 - p

        # Optional pruning: if high alarm, set next n/5 outputs to 0.5
        if cfg.prune and anomaly_prob > cfg.prune_quantile and cooldown <= 0:
            cooldown = max(1, int(round(cfg.cooldown_frac * n)))

        if cooldown > 0:
            aprobs[t] = 0.5
            cooldown -= 1
        else:
            aprobs[t] = anomaly_prob

    # Map to raw time axis (length len(x)), with NaNs before offset
    anomaly_prob_raw = np.full_like(x, np.nan, dtype=float)
    pvals_raw = np.full_like(x, np.nan, dtype=float)
    # Fill from embedded indices where p-values are defined (t >= n+m)
    first_valid_emb = n + m
    if first_valid_emb < N_emb:
        raw_idx = np.arange(first_valid_emb, N_emb) + offset
        anomaly_prob_raw[raw_idx] = aprobs[first_valid_emb:N_emb]
        pvals_raw[raw_idx] = pvals[first_valid_emb:N_emb]

    return dict(
        anomaly_prob=anomaly_prob_raw,
        p_value=pvals_raw,
        score=scores,           # note: embedded timeline
        valid_from=int(first_valid_emb + offset),
        offset=int(offset)
    )


In [None]:
# Let's implement a Python script that applies the LDCD conformal k-NN anomaly detector
# on a univariate time-series. We'll match the paper's method exactly.

import numpy as np
from sklearn.neighbors import NearestNeighbors

class LDCD_KNN:
    def __init__(self, l=19, k=27, n=100, m=50):
        """
        l: embedding dimension (time-delay window length)
        k: number of nearest neighbors
        n: training set size
        m: calibration set size
        """
        self.l = l
        self.k = k
        self.n = n
        self.m = m
        self.N = n + m
        self.training_set = None
        self.calibration_scores = None
        self.is_initialized = False

    def _embed(self, series):
        """Embed univariate series into l-dim time-delay vectors."""
        T = len(series)
        embedded = np.zeros((T - self.l + 1, self.l))
        for i in range(T - self.l + 1):
            embedded[i] = series[i:i + self.l]
        return embedded

    def _knn_score(self, point, reference):
        """Compute average distance to k nearest neighbors."""
        neigh = NearestNeighbors(n_neighbors=min(self.k, len(reference)))
        neigh.fit(reference)
        distances, _ = neigh.kneighbors([point])
        return np.mean(distances)

    def initialize(self, embedded_series):
        """Initialize training and calibration sets."""
        self.training_set = embedded_series[:self.n]
        self.calibration_scores = []
        for idx in range(self.n, self.N):
            score = self._knn_score(embedded_series[idx], self.training_set)
            self.calibration_scores.append(score)
        self.is_initialized = True

    def update(self, new_point):
        """Process a new embedded point and return anomaly score."""
        if not self.is_initialized:
            raise RuntimeError("LDCD not initialized.")
        # Compute score using current training set
        score = self._knn_score(new_point, self.training_set)
        # Compute conformal p-value
        count = sum(1 for s in self.calibration_scores if s >= score)
        p_value = (count + 1) / (self.m + 1)  # +1 includes current score
        anomaly_score = 1 - p_value
        # Update calibration queue
        self.calibration_scores.pop(0)
        self.calibration_scores.append(score)
        # Update training set (slide forward)
        self.training_set = np.vstack([self.training_set[1:], new_point])
        return anomaly_score

    def fit_predict(self, series):
        """Fit LDCD to a univariate series and return anomaly scores."""
        embedded = self._embed(series)
        scores = [None] * len(series)
        self.initialize(embedded)
        # Fill scores for burn-in period with NaN
        for t in range(self.N + self.l - 1):
            scores[t] = np.nan
        # Process the rest
        for idx in range(self.N, len(embedded)):
            s = self.update(embedded[idx])
            scores[idx + self.l - 1] = s
        return scores

# Example usage with synthetic data
if __name__ == "__main__":
    # Create synthetic data with anomalies
    np.random.seed(0)
    normal_part = np.sin(np.linspace(0, 20, 300)) + 0.1 * np.random.randn(300)
    anomalies = np.random.uniform(3, 4, size=10)
    series = np.copy(normal_part)
    anomaly_positions = np.random.choice(len(series), size=10, replace=False)
    series[anomaly_positions] = anomalies

    # Instantiate and run LDCD
    detector = LDCD_KNN(l=19, k=27, n=100, m=50)
    scores = detector.fit_predict(series)

    import pandas as pd
    df_results = pd.DataFrame({
        "value": series,
        "anomaly_score": scores,
        "is_anomaly": [1 if i in anomaly_positions else 0 for i in range(len(series))]
    })

    import ace_tools as tools; tools.display_dataframe_to_user(name="LDCD Anomaly Detection Results", dataframe=df_results)

