# HPCSeries Pipeline API

This notebook demonstrates the **Pipeline API** introduced in v0.8.0, which enables composable kernel execution for multi-stage data processing.

## Key Features

- **12 predefined stages** for common time-series transformations
- **Ping-pong buffer management** for efficient memory reuse
- **Fluent API** for chaining operations
- **Workspace support** for memory-intensive pipelines
- **Three execution modes**: SAFE, FAST, DETERMINISTIC
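
To make the ping-pong idea concrete, here is a minimal NumPy sketch. It is not the library's implementation: `run_ping_pong` and the three toy stages are hypothetical names used only for illustration. Two scratch buffers are allocated once, and each stage reads from one and writes to the other, so no per-stage temporaries are needed.

```python
import numpy as np

def run_ping_pong(x, stages):
    """Run stage(src, dst) callables while alternating between two buffers."""
    src = np.asarray(x, dtype=np.float64)
    buf_a = np.empty_like(src)
    buf_b = np.empty_like(src)
    dst = buf_a
    for stage in stages:
        stage(src, dst)
        # The buffer just written becomes the next input; the other one
        # becomes the next output. The caller's array is never written to.
        src, dst = dst, (buf_b if dst is buf_a else buf_a)
    return src.copy()

# Toy stages (hypothetical): each reads `src` and writes into `dst`
def scale(src, dst):
    np.multiply(src, 2.0, out=dst)

def shift(src, dst):
    np.add(src, 1.0, out=dst)

def square(src, dst):
    np.square(src, out=dst)

data = np.array([1.0, 2.0, 3.0])
out = run_ping_pong(data, [scale, shift, square])
print(out)  # ((x * 2) + 1) ** 2 for x = 1, 2, 3 gives 9, 25, 49
```

However many stages run, only two scratch arrays are allocated, which is the property the feature list refers to.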

In [None]:
import hpcs
import numpy as np
import time

print(f"HPCSeries version: {hpcs.__version__}")
print(f"Build features: {bin(hpcs.build_features())}")

## 1. Basic Pipeline Usage

Create a pipeline, add stages, and execute it on input data.

In [None]:
# Generate sample time series with trend, seasonality, and noise
np.random.seed(42)
n = 100_000
t = np.arange(n)
x = (
    0.001 * t +                           # Trend
    10 * np.sin(2 * np.pi * t / 1000) +   # Seasonality
    np.random.randn(n)                    # Noise
)

# Inject some outliers
outlier_idx = np.random.choice(n, size=50, replace=False)
x[outlier_idx] += np.random.choice([-1, 1], size=50) * 20

print(f"Data shape: {x.shape}")
print(f"Data range: [{x.min():.2f}, {x.max():.2f}]")

In [None]:
# Create a simple pipeline
pipe = hpcs.pipeline(mode='fast')

# Add stages one at a time (the calls can also be chained)
pipe.diff(order=1)        # Remove trend with first differencing
pipe.ewma(alpha=0.1)      # Smooth with exponential moving average
pipe.robust_zscore()      # Normalize using MAD-based z-score

# View pipeline summary
print(pipe.summary())

In [None]:
# Execute the pipeline
start = time.perf_counter()
result = pipe.execute(x)
elapsed = time.perf_counter() - start

print(f"Pipeline execution time: {elapsed*1000:.2f} ms")
print(f"Throughput: {n/elapsed/1e6:.2f} M elements/sec")
print(f"Result shape: {result.shape}")

## 2. Available Pipeline Stages

The pipeline supports 12 predefined operations:

In [None]:
# List all available stages
stages_info = [
    ("diff(order)", "Finite differencing: y[t] = x[t] - x[t-order]"),
    ("ewma(alpha)", "Exponential weighted moving average"),
    ("ewvar(alpha)", "Exponential weighted variance"),
    ("ewstd(alpha)", "Exponential weighted standard deviation"),
    ("rolling_mean(window)", "Rolling window mean"),
    ("rolling_std(window)", "Rolling window standard deviation"),
    ("rolling_median(window)", "Rolling window median (robust)"),
    ("rolling_mad(window)", "Rolling window MAD (robust)"),
    ("zscore()", "Global z-score normalization"),
    ("robust_zscore(eps)", "MAD-based z-score (outlier resistant)"),
    ("normalize_minmax()", "Scale to [0, 1] range"),
    ("clip(min, max)", "Clamp values to range"),
]

print("Available Pipeline Stages:")
print("=" * 60)
for stage, desc in stages_info:
    print(f"  {stage:30} - {desc}")
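
For intuition, the three stages used in Section 1 can be written as plain NumPy reference functions. This is a sketch under stated assumptions, not the library's definition: the `ref_*` names are hypothetical, and the boundary handling (NaN padding in `diff`, seeding the EWMA with the first sample, the 1.4826 MAD scale factor) is an assumption that HPCSeries may handle differently.

```python
import numpy as np

def ref_diff(x, order=1):
    """y[t] = x[t] - x[t-order] for order >= 1; leading values are NaN (assumed)."""
    x = np.asarray(x, dtype=np.float64)
    y = np.full_like(x, np.nan)
    y[order:] = x[order:] - x[:-order]
    return y

def ref_ewma(x, alpha):
    """y[0] = x[0]; y[t] = alpha * x[t] + (1 - alpha) * y[t-1] (assumed seeding)."""
    x = np.asarray(x, dtype=np.float64)
    y = np.empty_like(x)
    y[0] = x[0]  # Requires a non-empty input
    for t in range(1, len(x)):
        y[t] = alpha * x[t] + (1.0 - alpha) * y[t - 1]
    return y

def ref_robust_zscore(x, eps=1e-12):
    """(x - median) / (1.4826 * MAD + eps), ignoring NaNs in the statistics."""
    x = np.asarray(x, dtype=np.float64)
    med = np.nanmedian(x)
    mad = np.nanmedian(np.abs(x - med))
    return (x - med) / (1.4826 * mad + eps)

d = ref_diff([1, 4, 9, 16])             # NaN, then 3, 5, 7
s = ref_ewma([0, 10, 10], alpha=0.5)    # 0, 5, 7.5
z = ref_robust_zscore([1, 2, 3, 4, 100])
print(d, s, z)
```

In the last example the median is 3 and the MAD is 1, so the value 100 gets a z-score above 60 while the other four points stay within about 1.4 of zero. This is the sense in which the MAD-based score is outlier resistant.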

## 3. Execution Modes

The three modes trade off safety, speed, and reproducibility:

In [None]:
modes = ['safe', 'fast', 'deterministic']
times = {}

for mode in modes:
    pipe = hpcs.pipeline(mode=mode)
    pipe.diff(1).ewma(0.2).rolling_std(50).robust_zscore()
    
    # Warm-up
    _ = pipe.execute(x)
    
    # Benchmark
    start = time.perf_counter()
    for _ in range(10):
        result = pipe.execute(x)
    elapsed = (time.perf_counter() - start) / 10
    times[mode] = elapsed

print("Execution Mode Comparison:")
print("=" * 50)
for mode in modes:
    speedup = times['safe'] / times[mode]
    print(f"  {mode:15} {times[mode]*1000:8.2f} ms  ({speedup:.2f}x vs safe)")
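
This notebook does not spell out what each mode guarantees. One common reason numerical libraries offer a deterministic mode (an assumption here, not a statement about HPCSeries internals) is that floating-point addition is not associative, so a reduction that changes its summation order can change the last bits of the result. The effect is easy to reproduce in plain Python:

```python
import math

values = [0.1, 0.2, 0.3]

# Same three numbers, two different association orders
left_to_right = (values[0] + values[1]) + values[2]
right_to_left = values[0] + (values[1] + values[2])

print(left_to_right == right_to_left)              # False
print(math.isclose(left_to_right, right_to_left))  # True
```

When comparing results across modes, a tolerance-based check such as `np.allclose` is therefore more appropriate than exact equality.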

## 4. Workspace for Large Arrays

For memory-intensive operations, pre-allocate a workspace:

In [None]:
# Create workspace (64 MiB)
ws = hpcs.workspace(64 * 1024 * 1024)
print(f"Workspace size: {ws.size / 1024 / 1024:.1f} MiB")

# Use workspace with pipeline
pipe = hpcs.pipeline(ws=ws, mode='fast')
pipe.rolling_median(window=100)  # Memory-intensive operation
pipe.robust_zscore()

result = pipe.execute(x)
print("Result computed with workspace support")

In [None]:
# Grow workspace if needed
ws.reserve(128 * 1024 * 1024)  # Grow to 128 MiB
print(f"Workspace grown to: {ws.size / 1024 / 1024:.1f} MiB")
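
How much scratch memory a given stage needs is not stated here, so the following is only a rough sanity check. It assumes, hypothetically, that a pipeline needs at least two full-length `float64` buffers; the helper names `mib` and `ping_pong_bytes` are illustrative.

```python
import numpy as np

def mib(nbytes):
    """Convert a byte count to mebibytes."""
    return nbytes / (1024 * 1024)

def ping_pong_bytes(n_elements, dtype=np.float64):
    """Bytes for two full-length buffers (assumed lower bound, not a library figure)."""
    return 2 * n_elements * np.dtype(dtype).itemsize

need = ping_pong_bytes(100_000)
print(f"{need:,} bytes = {mib(need):.2f} MiB")  # 1,600,000 bytes = 1.53 MiB
```

For the 100,000-element series used above, this estimate is far below the 64 MiB workspace, which leaves headroom for any extra scratch space that stages such as `rolling_median` may use.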

## 5. Pipeline vs Manual Chaining

Compare pipeline execution to calling individual functions:

In [None]:
# Method 1: Pipeline (optimized buffer management)
pipe = hpcs.pipeline(mode='fast')
pipe.diff(1).ewma(0.2).zscore()

start = time.perf_counter()
for _ in range(100):
    result_pipe = pipe.execute(x)
time_pipe = (time.perf_counter() - start) / 100

# Method 2: Manual chaining (extra allocations)
start = time.perf_counter()
for _ in range(100):
    temp1 = hpcs.diff(x, order=1)
    temp2 = hpcs.ewma(temp1, alpha=0.2)
    result_manual = hpcs.zscore(temp2)
time_manual = (time.perf_counter() - start) / 100

print(f"Pipeline:      {time_pipe*1000:.3f} ms")
print(f"Manual chain:  {time_manual*1000:.3f} ms")
print(f"Pipeline speedup: {time_manual/time_pipe:.2f}x")
print(f"Results match: {np.allclose(result_pipe, result_manual, equal_nan=True)}")
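
Single-loop timings like the ones above can be noisy. A slightly more robust sketch uses only the standard library: it warms up once, times several batches, and reports the best one. The helper name `bench` is hypothetical.

```python
import timeit

def bench(fn, repeat=5, number=20):
    """Return the best per-call time in seconds over `repeat` timed batches."""
    fn()  # Warm-up call, excluded from the timing
    batches = timeit.repeat(fn, repeat=repeat, number=number)
    # The minimum is the batch least disturbed by other activity on the machine
    return min(batches) / number

# Stand-in workload so the example runs without any third-party package
per_call = bench(lambda: sum(range(1000)))
print(f"{per_call * 1e6:.2f} microseconds per call")
```

In this notebook, the same helper could be called as `bench(lambda: pipe.execute(x))` for the pipeline and with a small function wrapping the three individual calls for the manual chain.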

## 6. Real-World Example: Anomaly Detection Pipeline

Build a complete anomaly detection pipeline for sensor data:

In [None]:
# Simulate sensor data with anomalies
np.random.seed(123)
n_sensors = 500_000  # Number of readings from one simulated sensor
sensor_data = np.cumsum(np.random.randn(n_sensors) * 0.1)  # Random walk
sensor_data += 5 * np.sin(np.linspace(0, 50*np.pi, n_sensors))  # Seasonal

# Inject anomalies (sudden spikes)
anomaly_times = np.random.choice(n_sensors, size=100, replace=False)
sensor_data[anomaly_times] += np.random.choice([-1, 1], size=100) * 10

print(f"Sensor readings: {n_sensors:,}")

In [None]:
# Build anomaly detection pipeline
anomaly_pipe = hpcs.pipeline(mode='fast')

# Stage 1: Remove trend with differencing
anomaly_pipe.diff(order=1)

# Stage 2: Smooth short-term noise
anomaly_pipe.ewma(alpha=0.3)

# Stage 3: Compute robust z-scores
anomaly_pipe.robust_zscore(eps=1e-10)

print(anomaly_pipe.summary())

In [None]:
# Execute pipeline
start = time.perf_counter()
z_scores = anomaly_pipe.execute(sensor_data)
elapsed = time.perf_counter() - start

# Detect anomalies (|z| > 3)
threshold = 3.0
detected_anomalies = np.where(np.abs(z_scores) > threshold)[0]

print(f"Pipeline execution: {elapsed*1000:.2f} ms")
print(f"Throughput: {n_sensors/elapsed/1e6:.2f} M readings/sec")
print(f"Detected anomalies: {len(detected_anomalies)}")
print(f"True anomalies injected: {len(anomaly_times)}")
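
Comparing raw counts says little about whether the right points were flagged. After first differencing, a one-sample spike at index `t` produces a large value at both `t` and `t + 1`, and the EWMA spreads it further, so detections should be matched to injected anomalies with a small index tolerance. A minimal sketch with hypothetical helper names:

```python
import numpy as np

def nearest_distance(a, b):
    """Distance from each index in `a` to the closest index in `b`."""
    a = np.asarray(a, dtype=np.int64)
    b = np.sort(np.asarray(b, dtype=np.int64))
    if b.size == 0:
        return np.full(a.shape, np.iinfo(np.int64).max)
    pos = np.searchsorted(b, a)
    left = b[np.clip(pos - 1, 0, b.size - 1)]   # Closest candidate at or below
    right = b[np.clip(pos, 0, b.size - 1)]      # Closest candidate at or above
    return np.minimum(np.abs(a - left), np.abs(a - right))

def precision_recall(detected, injected, tolerance=1):
    """Fraction of detections near an injection, and of injections near a detection."""
    hit_det = nearest_distance(detected, injected) <= tolerance
    hit_inj = nearest_distance(injected, detected) <= tolerance
    precision = float(hit_det.mean()) if hit_det.size else 0.0
    recall = float(hit_inj.mean()) if hit_inj.size else 0.0
    return precision, recall

# Small worked example: 3 of 4 detections are near an injection,
# and 2 of 3 injections are near a detection
precision, recall = precision_recall([10, 11, 51, 70], [10, 50, 90])
print(f"precision={precision:.2f}, recall={recall:.2f}")
```

With the variables from the cell above, the call would be `precision_recall(detected_anomalies, anomaly_times)`. The tolerance of one sample is an assumption and may need widening for stronger smoothing.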

## 7. Custom Pre/Post Processing

The pipeline is limited to its 12 predefined stages. Apply custom transformations before or after the pipeline:

In [None]:
# Custom pre-processing: log transform for skewed data
np.random.seed(7)  # Seed for reproducibility, as in the earlier cells
x_raw = np.abs(np.random.randn(50_000)) + 0.1  # Positive, skewed
x_log = np.log1p(x_raw)  # Custom: log(1+x) transform

# Standard pipeline
pipe = hpcs.pipeline(mode='fast')
pipe.rolling_mean(window=20)
pipe.zscore()
result = pipe.execute(x_log)

# Custom post-processing: convert to binary signal
binary_signal = (np.abs(result) > 2.0).astype(float)

print(f"Flagged {binary_signal.sum():.0f} points as unusual")
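
When the same pre- and post-processing wraps a pipeline repeatedly, the steps can be bundled into one callable. The sketch below uses a hypothetical `compose` helper and a plain NumPy z-score as a stand-in for the pipeline stage, so it runs without HPCSeries.

```python
import numpy as np

def compose(*steps):
    """Return a function that applies `steps` left to right."""
    def run(x):
        for step in steps:
            x = step(x)
        return x
    return run

flag_unusual = compose(
    np.log1p,                             # Pre-processing
    lambda v: (v - v.mean()) / v.std(),   # Stand-in for the pipeline stage
    lambda z: np.abs(z) > 2.0,            # Post-processing
)

sample = np.array([0.0] * 9 + [100.0])
flags = flag_unusual(sample)
print(flags)  # Only the last element is flagged
```

In this notebook, the middle step could be replaced by `pipe.execute` so that the pipeline sits between the custom transforms.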

## 8. Feature Discovery

Query runtime information about the library build:

In [None]:
# Get build features bitmask
features = hpcs.build_features()

print("Build Features:")
print("=" * 40)
print(f"  OpenMP:     {'Yes' if features & hpcs.FEAT_OPENMP else 'No'}")
print(f"  AVX2:       {'Yes' if features & hpcs.FEAT_SIMD_AVX2 else 'No'}")
print(f"  AVX-512:    {'Yes' if features & hpcs.FEAT_SIMD_AVX512 else 'No'}")
print(f"  NEON:       {'Yes' if features & hpcs.FEAT_SIMD_NEON else 'No'}")
print(f"  Fast Math:  {'Yes' if features & hpcs.FEAT_FAST_MATH else 'No'}")
print(f"  GPU:        {'Yes' if features & hpcs.FEAT_GPU_OFFLOAD else 'No'}")
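
The checks above follow the usual bitmask pattern: each feature occupies one bit, and `mask & FLAG` is non-zero when that bit is set. The standalone sketch below shows the same pattern with `enum.IntFlag`. The bit positions are hypothetical and are not the values of the `hpcs.FEAT_*` constants.

```python
from enum import IntFlag

class Feature(IntFlag):
    # Hypothetical bit positions, for illustration only
    OPENMP = 1 << 0
    AVX2 = 1 << 1
    AVX512 = 1 << 2
    NEON = 1 << 3

def enabled(mask):
    """Return the names of all features whose bit is set in `mask`."""
    return [f.name for f in Feature if mask & f]

print(enabled(0b0101))  # ['OPENMP', 'AVX512']
```

Collecting the enabled names in a list like this can be handy for logging the build configuration alongside benchmark results.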

In [None]:
# Check for errors (thread-local)
error = hpcs.last_error()
if error:
    print(f"Last error: {error}")
else:
    print("No errors")

## Summary

The Pipeline API provides:

1. **Efficient Chaining**: Ping-pong buffers minimize allocations
2. **12 Built-in Stages**: Common time-series transformations
3. **Three Execution Modes**: SAFE (default), FAST, DETERMINISTIC
4. **Workspace Support**: Pre-allocated memory for large arrays
5. **Fluent API**: Readable method chaining

For custom operations not in the 12 predefined stages, apply them before or after the pipeline using standard NumPy or individual HPCSeries functions.