# DensityAD Real-Time Simulation

Watch DensityAD detect anomalies in real-time with severity color coding.

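**Key concept:** Each detection call uses historical data as context for pattern learning, but evaluates only the latest data point(s) for anomalies. For real-time monitoring, you append each new data point to the end of the history and pass that history with every call. This notebook passes the full history collected so far; a long-running monitor can cap it to a sliding window of the most recent points.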

**Features:**
- Streaming simulation (one datapoint at a time)
- Severity-based color coding
- Animated visualization
- GIF export

In [None]:
import sys
sys.path.append('../..')

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.patches import Patch
from IPython.display import HTML
from datetime import datetime, timedelta
from gradientcast import GradientCastDenseAD

# Palette used for every plot in this notebook, keyed by severity label.
# Insertion order (none -> critical) is preserved by dict().
SEVERITY_COLORS = dict(
    none='#3498db',      # Blue
    low='#f1c40f',       # Yellow
    medium='#e67e22',    # Orange
    high='#e74c3c',      # Red
    critical='#8e44ad',  # Purple
)

# Replace with your API key
GRADIENTCAST_API_KEY = "your-api-key-here"

# Shared detector client used by the simulation below.
dense_ad = GradientCastDenseAD(api_key=GRADIENTCAST_API_KEY)

---
## Generate Simulation Data

In [None]:
def generate_simulation_data(n_points=120, base_value=1500000):
    """Generate an hourly time series with injected drops of four severities.

    The baseline is ``base_value`` modulated by a 24-point sine cycle
    (+/-10%), a linear trend rising to +5% and Gaussian noise (sigma 2%).
    Fixed positions are then scaled down to imitate anomalies. Positions
    that fall outside a series shorter than the default are skipped instead
    of raising ``IndexError``.

    Args:
        n_points: Number of hourly points to generate.
        base_value: Baseline level of the series.

    Returns:
        A ``(timestamps, values, anomaly_info)`` tuple where ``timestamps``
        is a list of ``"%m/%d/%Y, %I:%M %p"`` strings ending one hour before
        now, ``values`` is a list of ints, and ``anomaly_info`` is a list of
        ``(position, severity)`` pairs for the anomalies actually injected.

    Note:
        Reseeds NumPy's global random generator with 42 so the values are
        reproducible between calls. Timestamps depend on the current clock.
    """
    np.random.seed(42)

    # Timestamps
    start = datetime.now() - timedelta(hours=n_points)
    timestamps = [(start + timedelta(hours=i)).strftime("%m/%d/%Y, %I:%M %p")
                  for i in range(n_points)]

    # Base values with trend and daily pattern
    t = np.arange(n_points)
    daily_pattern = 0.1 * np.sin(2 * np.pi * t / 24)  # Daily cycle
    trend = np.linspace(0, 0.05, n_points)
    noise = np.random.normal(0, 0.02, n_points)
    values = base_value * (1 + daily_pattern + trend + noise)

    # (severity, multiplier, positions) for each injected anomaly group
    injections = (
        ('low', 0.85, (50, 51)),           # 10-20% drop
        ('medium', 0.70, (70, 71)),        # 25-35% drop
        ('high', 0.55, (90, 91, 92)),      # 40-50% drop
        ('critical', 0.35, (110, 111)),    # 60%+ drop
    )

    anomaly_info = []
    for severity, factor, positions in injections:
        for pos in positions:
            if pos >= n_points:
                # Series is too short to hold this anomaly; skip it.
                continue
            values[pos] *= factor
            anomaly_info.append((pos, severity))

    return timestamps, values.astype(int).tolist(), anomaly_info

# Build the demo series once; the later cells reuse these three names.
timestamps, values, anomaly_info = generate_simulation_data()

# Report the series size and where each anomaly was injected.
print(f"Generated {len(values)} data points")
print("\nInjected anomalies:")
for pos, sev in anomaly_info:
    print(f"  Position {pos}: {sev}")

---
## Run Simulation

In [None]:
def run_simulation(timestamps, values, min_history=24):
    """Replay the series one point at a time through the detector.

    Starting at index ``min_history``, every step sends all points from the
    beginning of the series up to and including the current one to
    ``dense_ad.detect``. The verdict recorded for the step is read from the
    last entry of the returned timeline.

    Returns:
        A list with one dict per processed point holding its ``index``,
        ``value``, ``is_anomaly`` flag and ``severity`` (``'none'`` when the
        point is not a confirmed anomaly).
    """
    records = []

    for current in range(min_history, len(values)):
        # Earlier points are context; the point being judged goes last.
        payload = []
        for k in range(current + 1):
            payload.append({"timestamp": timestamps[k], "value": values[k]})

        detection = dense_ad.detect(
            payload,
            contamination=0.05,
            n_neighbors=12,
            min_contiguous_anomalies=2,
        )

        # Defaults apply when the timeline is empty or its last entry is falsy.
        flagged = False
        level = 'none'
        if detection.timeline:
            newest = detection.timeline[-1]
            if newest:
                flagged = newest.confirmed_anomaly
                if flagged:
                    level = newest.magnitude.severity

        records.append({
            'index': current,
            'value': values[current],
            'is_anomaly': flagged,
            'severity': level,
        })

        # Alert on the two highest severities; otherwise emit a progress line
        # every 30th index.
        if flagged and level in ('high', 'critical'):
            print(f"[ALERT] Point {current}: {values[current]:,} - {level.upper()} severity")
        elif current % 30 == 0:
            print(f"Processing point {current}...")

    return records

# Run the replay over the generated series
print("Running simulation...\n")
simulation_results = run_simulation(timestamps, values)

# Tally confirmed anomalies per severity label
severity_counts = {}
for r in simulation_results:
    if not r['is_anomaly']:
        continue
    sev = r['severity']
    severity_counts.setdefault(sev, 0)
    severity_counts[sev] += 1

print("\nSimulation complete!")
print("Detected by severity:")
# Fixed order so the report always reads from least to most severe.
for sev in ['low', 'medium', 'high', 'critical']:
    count = severity_counts.get(sev)
    if count is not None:
        print(f"  {sev.capitalize()}: {count}")

---
## Animated Visualization

In [None]:
def create_severity_animation(values, simulation_results, min_history=24):
    """Create an animated plot of the series with severity-coloured anomalies.

    Each frame reveals one more data point. Confirmed anomalies are drawn as
    large markers coloured by ``SEVERITY_COLORS``.

    Every frame is rendered purely from its frame index, with no state carried
    over between calls. The same animation can therefore be looped or rendered
    more than once (for example displayed and then saved) without points
    being duplicated.

    Args:
        values: Sequence of series values; one frame is produced per value.
        simulation_results: Result dicts with ``is_anomaly`` and ``severity``
            keys. Entry ``k`` is taken to describe the point at index
            ``k + min_history``.
        min_history: Index of the first point covered by
            ``simulation_results``.

    Returns:
        The ``matplotlib.animation.FuncAnimation`` object. The backing figure
        is closed so it is not also shown as a static plot.

    Raises:
        ValueError: If ``values`` is empty.
    """
    n_frames = len(values)
    if n_frames == 0:
        # Fail before a figure is created so nothing is left open.
        raise ValueError("values must contain at least one data point")

    fig, ax = plt.subplots(figsize=(14, 6))

    # Initialize plot elements
    line, = ax.plot([], [], 'b-', lw=1.5, alpha=0.7, label='Values')
    scatter_normal = ax.scatter([], [], c='blue', s=20, alpha=0.5)

    # Scatter for each severity level
    scatter_plots = {}
    for sev, color in SEVERITY_COLORS.items():
        if sev != 'none':
            scatter_plots[sev] = ax.scatter([], [], c=color, s=150, zorder=5,
                                            edgecolors='black', linewidths=1.5)

    # Set limits
    ax.set_xlim(0, n_frames)
    ax.set_ylim(min(values) * 0.8, max(values) * 1.1)
    ax.set_xlabel('Time (hours)')
    ax.set_ylabel('Value')
    ax.set_title('DensityAD Real-Time Detection with Severity')
    ax.grid(alpha=0.3)

    # Legend
    legend_elements = [
        Patch(facecolor=SEVERITY_COLORS['low'], edgecolor='black', label='Low'),
        Patch(facecolor=SEVERITY_COLORS['medium'], edgecolor='black', label='Medium'),
        Patch(facecolor=SEVERITY_COLORS['high'], edgecolor='black', label='High'),
        Patch(facecolor=SEVERITY_COLORS['critical'], edgecolor='black', label='Critical'),
    ]
    ax.legend(handles=legend_elements, loc='upper right')

    # Frame indices of the confirmed anomalies, grouped by severity and
    # computed once up front. Result k maps to frame k + min_history.
    anomaly_frames = {sev: [] for sev in scatter_plots}
    for offset, r in enumerate(simulation_results):
        frame_idx = offset + min_history
        if not 0 <= frame_idx < n_frames:
            continue
        if r['is_anomaly'] and r['severity'] in anomaly_frames:
            anomaly_frames[r['severity']].append(frame_idx)

    def artists():
        # All artists touched per frame; required by blit=True.
        return [line, scatter_normal] + list(scatter_plots.values())

    def init():
        line.set_data([], [])
        scatter_normal.set_offsets(np.empty((0, 2)))
        for scat in scatter_plots.values():
            scat.set_offsets(np.empty((0, 2)))
        return artists()

    def update(frame):
        # Everything up to and including the current frame is visible.
        xs = np.arange(frame + 1)
        ys = np.asarray(values[:frame + 1])

        # Update line and normal scatter
        line.set_data(xs, ys)
        scatter_normal.set_offsets(np.column_stack([xs, ys]))

        # Update severity scatters. Clear them explicitly when nothing is
        # visible yet, so a replay starts from a clean plot.
        for sev, scat in scatter_plots.items():
            shown = [idx for idx in anomaly_frames[sev] if idx <= frame]
            if shown:
                scat.set_offsets(np.column_stack([
                    shown,
                    [values[idx] for idx in shown]
                ]))
            else:
                scat.set_offsets(np.empty((0, 2)))

        return artists()

    anim = animation.FuncAnimation(
        fig, update, frames=n_frames,
        init_func=init, blit=True, interval=80
    )

    plt.close(fig)
    return anim

# Create animation
anim = create_severity_animation(values, simulation_results)

# Display
# to_jshtml() renders the animation to an HTML/JavaScript player string.
# The HTML(...) call is left as the cell's final bare expression; in a
# notebook that is what shows the player inline.
HTML(anim.to_jshtml())

---
## Save as GIF

In [None]:
# Save animation
# Build a new animation object for the export (this rebinds `anim`).
anim = create_severity_animation(values, simulation_results)
# Write the GIF to the current working directory at 12 frames per second,
# using matplotlib's 'pillow' writer.
anim.save('densityAD_simulation.gif', writer='pillow', fps=12)
print("Animation saved as 'densityAD_simulation.gif'")

---
## Static Summary

In [None]:
# Final static visualization
plt.figure(figsize=(14, 6))

# Plot time series
plt.plot(values, 'b-', lw=1.5, alpha=0.5, label='Values')

# Plot anomalies by severity
for r in simulation_results:
    if r['is_anomaly']:
        color = SEVERITY_COLORS.get(r['severity'], 'gray')
        plt.scatter(r['index'], r['value'], c=color, s=150, zorder=5,
                    edgecolors='black', linewidths=1.5)

# Detection start line: the first index the simulation evaluated.
# Falls back to 24 when there are no results to read it from.
detection_start = simulation_results[0]['index'] if simulation_results else 24
start_line = plt.axvline(x=detection_start, color='gray', linestyle='--',
                         alpha=0.5, label='Detection starts')

# Legend
legend_elements = [
    Patch(facecolor=SEVERITY_COLORS['low'], edgecolor='black', label='Low'),
    Patch(facecolor=SEVERITY_COLORS['medium'], edgecolor='black', label='Medium'),
    Patch(facecolor=SEVERITY_COLORS['high'], edgecolor='black', label='High'),
    Patch(facecolor=SEVERITY_COLORS['critical'], edgecolor='black', label='Critical'),
]
# With an explicit handle list, only the listed artists appear in the legend,
# so the start line is added here for its label to show.
plt.legend(handles=legend_elements + [start_line], loc='upper right')

plt.xlabel('Time (hours)')
plt.ylabel('Value')
plt.title('DensityAD Simulation - Final Results')
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

# Summary table: count and mean value of confirmed anomalies per severity
print("\nSeverity Summary:")
print(f"{'Severity':>10} | {'Count':>6} | {'Avg Value':>12}")
print("-" * 35)
for sev in ['low', 'medium', 'high', 'critical']:
    sev_results = [r for r in simulation_results if r['is_anomaly'] and r['severity'] == sev]
    if sev_results:
        avg_val = np.mean([r['value'] for r in sev_results])
        print(f"{sev:>10} | {len(sev_results):>6} | {avg_val:>12,.0f}")

---
## Key Takeaways

1. **Severity classification** helps prioritize alerts
2. **Color coding** makes anomalies visually distinct
3. **Contiguous filtering** reduces isolated false positives
4. **Real-time capable** - each detection is independent

**See also:** [PulseAD Simulation](../pulseAD/03_simulation.ipynb) for threshold-based detection