# **Anomaly Detection in Continuous Data Streams**

## **Project Description**
This project involves developing a Python script that detects anomalies in a continuous data stream. This data stream simulates real-time sequences of floating-point numbers, representing various metrics, such as financial transactions or system metrics. The primary goal is to identify unusual patterns, such as exceptionally high values or deviations from the norm.

## **Objectives**
The main objectives of this project are:

1. **Algorithm Selection**: Identify and implement a suitable algorithm for anomaly detection that can adapt to concept drift and seasonal variations.
2. **Data Stream Simulation**: Design a function that simulates a continuous data stream, incorporating regular patterns, seasonal elements, and random noise.
3. **Anomaly Detection**: Develop a real-time mechanism to accurately flag anomalies as the data is streamed.
4. **Optimization**: Ensure that the algorithm is optimized for speed and efficiency.
5. **Visualization**: Create a straightforward real-time visualization tool to display both the data stream and any detected anomalies.

## **Explanation of the Chosen Algorithm**

- ### Exponential Moving Average (EMA)
The EMA smooths time-series data by computing a weighted average of past observations in which the weights decay exponentially with age, so the most recent points count the most. Anomalies then show up as points that deviate sharply from the smoothed value.
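As a minimal sketch of that recurrence (the helper name `exponential_moving_average` and the sample values are illustrative and not part of the project code), each new EMA value blends the latest observation with the previous EMA through a smoothing factor `alpha`:

```python
from typing import Iterable, List


def exponential_moving_average(values: Iterable[float], alpha: float) -> List[float]:
    """Return the running EMA of `values` (illustrative helper, not project code).

    Recurrence: ema_t = alpha * x_t + (1 - alpha) * ema_{t-1},
    seeded with the first observation.
    """
    if not 0 < alpha <= 1:
        raise ValueError("alpha must be in (0, 1]")
    smoothed: List[float] = []
    ema = None
    for x in values:
        # The first value seeds the average; later values are blended in.
        ema = x if ema is None else alpha * x + (1 - alpha) * ema
        smoothed.append(ema)
    return smoothed


if __name__ == "__main__":
    # A single spike (10.0) is damped, and its influence fades step by step.
    print(exponential_moving_average([1.0, 1.0, 10.0, 1.0, 1.0], alpha=0.5))
    # -> [1.0, 1.0, 5.5, 3.25, 2.125]
```

A larger `alpha` follows the data more closely but smooths less; a smaller `alpha` smooths more but reacts more slowly.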

- ### Isolation Forest
Isolation Forest is an algorithm designed specifically for anomaly detection. It isolates observations by recursively partitioning the data with randomly chosen splits, organized as a collection of trees. Anomalies are expected to be easier to isolate, so they end up with shorter average path lengths in the trees. The method scales well to high-dimensional datasets, although in this project it is applied to a single feature: the stream value.
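The isolation idea can be illustrated without any library. The sketch below is a deliberately simplified, one-dimensional toy and not scikit-learn's implementation: the helpers `isolation_depth` and `average_depth` are hypothetical names, and real Isolation Forests also subsample the data and normalize path lengths into a score. It only shows that an outlier tends to be separated from the rest in fewer random splits than a point inside the cluster.

```python
import random
from statistics import mean
from typing import List


def isolation_depth(point: float, data: List[float], rng: random.Random) -> int:
    """Count the random splits needed until `point` is alone in its partition."""
    depth = 0
    current = list(data)
    while len(current) > 1:
        lo, hi = min(current), max(current)
        if lo == hi:  # only duplicates remain, so no further split is possible
            break
        split = rng.uniform(lo, hi)
        # Keep only the side of the split that still contains `point`.
        if point < split:
            current = [v for v in current if v < split]
        else:
            current = [v for v in current if v >= split]
        depth += 1
    return depth


def average_depth(point: float, data: List[float], trees: int = 200, seed: int = 0) -> float:
    """Average isolation depth of `point` over `trees` independent random partitionings."""
    rng = random.Random(seed)
    return mean(isolation_depth(point, data, rng) for _ in range(trees))


if __name__ == "__main__":
    # A tight cluster in [-2, 2] plus one obvious outlier at 10.0.
    values = [0.1 * i for i in range(-20, 21)] + [10.0]
    print("outlier depth:", average_depth(10.0, values))
    print("inlier depth: ", average_depth(0.0, values))
```

The outlier's average depth should come out clearly lower than the inlier's, which is the signal an Isolation Forest turns into an anomaly score.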

- ### Effectiveness
The combination of EMA and Isolation Forest allows the system to adapt to seasonal variations and detect both point anomalies and batch anomalies efficiently. This is particularly useful in scenarios where the data is continuously generated, as it balances real-time detection with accuracy.

### **Algorithm Selection Explanation**

In this project, I chose to implement two algorithms for anomaly detection: the Exponential Moving Average (EMA) and the Isolation Forest. The EMA is particularly effective for real-time data streams, as it adapts quickly to changes in data trends and provides a smoothing mechanism that allows for the detection of deviations from typical patterns. This is essential when monitoring metrics that may have seasonal fluctuations or concept drift. On the other hand, the Isolation Forest is a robust machine learning model specifically designed for anomaly detection in high-dimensional datasets. It excels at identifying anomalies based on the concept of isolating observations in the feature space, making it highly effective for batch processing of data points. By combining these two approaches, the system leverages the strengths of both real-time adaptation and batch anomaly detection, thus ensuring a comprehensive and effective anomaly detection mechanism.
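To make "adapts quickly" concrete: after a sudden level shift, the gap between an EMA and the new level shrinks by a factor of `(1 - alpha)` on every update. The sketch below works through that arithmetic for `alpha = 0.3`, the default used by `AnomalyDetectionSystem`; the helper names `remaining_gap` and `steps_to_adapt` are illustrative and not part of the project code.

```python
import math


def remaining_gap(alpha: float, steps: int) -> float:
    """Fraction of a level shift not yet absorbed by the EMA after `steps` updates."""
    return (1 - alpha) ** steps


def steps_to_adapt(alpha: float, tolerance: float) -> int:
    """Smallest number of updates after which the remaining gap is <= `tolerance`."""
    if not 0 < alpha < 1:
        raise ValueError("alpha must be in (0, 1)")
    if not 0 < tolerance < 1:
        raise ValueError("tolerance must be in (0, 1)")
    return math.ceil(math.log(tolerance) / math.log(1 - alpha))


if __name__ == "__main__":
    # With alpha = 0.3, the EMA closes 95% of a level shift within 9 updates.
    print(steps_to_adapt(0.3, 0.05))       # -> 9
    print(round(remaining_gap(0.3, 9), 4))  # -> 0.0404
```

The same property is a trade-off: an anomaly that persists for many steps is gradually absorbed into the baseline and stops being flagged.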

### **Data Stream Simulation**
The `DataPointGenerator` class simulates a real-time data stream by generating floating-point numbers that combine a base sinusoidal pattern, a seasonal component, and random noise. The values stand in for typical metrics such as financial transactions. Each point also has a 1% chance of receiving an injected anomaly of -10 or +10, which gives the detectors known outliers to find.
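For experiments that need repeatable runs, the same signal can be written as a seeded generator. This is a sketch of a variant, not the project's `DataPointGenerator`: the function name `simulated_stream` and the `anomaly_rate` and `seed` parameters are assumptions added for illustration, while the periods (50 and 30), noise scale (0.1), and anomaly magnitudes (-10 or +10) follow the description above.

```python
import math
import random
from itertools import islice
from typing import Iterator


def simulated_stream(seasonal_factor: float = 5.0,
                     anomaly_rate: float = 0.01,
                     seed: int = 42) -> Iterator[float]:
    """Yield an endless, reproducible stream (illustrative variant, not project code)."""
    rng = random.Random(seed)  # private RNG so runs with the same seed match
    t = 0
    while True:
        base = math.sin(2 * math.pi * t / 50)                        # base sinusoidal pattern
        seasonal = seasonal_factor * math.sin(2 * math.pi * t / 30)  # seasonal component
        noise = rng.gauss(0, 0.1)                                    # small Gaussian noise
        # Inject a spike of -10 or +10 with probability `anomaly_rate`.
        anomaly = rng.choice([-10, 10]) if rng.random() < anomaly_rate else 0
        yield base + seasonal + noise + anomaly
        t += 1


if __name__ == "__main__":
    # Take a finite slice of the endless stream.
    for value in islice(simulated_stream(), 5):
        print(round(value, 3))
```

Because the generator is lazy, a consumer pulls one point at a time, which mirrors how a detector would read from a live feed.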

### **Visualization**
The `plot_real_time` method in the `AnomalyDetectionSystem` class visualizes the data stream and the detected anomalies. It uses Plotly to create an interactive line plot of the data points, with separate markers for the anomalies flagged by the Exponential Moving Average (EMA) method and for those flagged by the Isolation Forest model.

### **Environment Setup**
Before running the project, ensure you have the required libraries installed. You can use the following command to install them in Google Colab:

```python
!pip install numpy plotly scikit-learn
```

## **Code Implementation**
The complete implementation follows, with docstrings and comments explaining each component.

import numpy as np
import random
import plotly.graph_objects as go
from sklearn.ensemble import IsolationForest

class DataPointGenerator:
    """Class to generate data points with seasonal patterns and random anomalies."""

    def __init__(self, seasonal_factor: float = 5.0):
        self.seasonal_factor = seasonal_factor

    def generate(self, time: int) -> float:
        """
        Generate a data point based on sinusoidal patterns with optional anomalies.

        Args:
            time (int): Current time index to generate the data point.

        Returns:
            float: The generated data point.
        """
        base_signal = np.sin(2 * np.pi * time / 50)  # Base sinusoidal pattern
        seasonal_signal = self.seasonal_factor * np.sin(2 * np.pi * time / 30)  # Seasonal component
        noise = 0.1 * np.random.randn()  # Random noise
        anomaly = 0

        # Introduce a random anomaly with a 1% chance
        if random.random() < 0.01:
            anomaly = random.choice([-10, 10])  # Anomaly can be -10 or +10

        return base_signal + seasonal_signal + noise + anomaly


class EMAAnomalyDetector:
    """Class to detect anomalies using Exponential Moving Average (EMA)."""

    def __init__(self, alpha: float, threshold: float):
        """
        Initialize the EMA Anomaly Detector.

        Args:
            alpha (float): Smoothing factor for the EMA.
            threshold (float): Z-score threshold for anomaly detection.
        """
        self.alpha = alpha
        self.threshold = threshold
        self.ema = None
        self.variance = None

    def update(self, x: float) -> bool:
        """
        Check whether the current point is an anomaly, then update the EMA.

        Args:
            x (float): New data point.

        Returns:
            bool: True if the data point is an anomaly, False otherwise.
        """
        if self.ema is None:
            # First observation: seed the statistics; there is no baseline to score against yet.
            self.ema = x
            self.variance = 0.0
            return False

        # Score the point against the statistics from before it arrived.
        # Scoring after the update caps |z| at (1 - alpha) / sqrt(alpha),
        # because the point inflates its own variance estimate.
        deviation = x - self.ema
        std_dev = np.sqrt(self.variance)
        z_score = deviation / (std_dev if std_dev != 0 else 1)
        is_anomaly = bool(abs(z_score) > self.threshold)

        # Fold the new point into the running mean and variance.
        self.ema = self.alpha * x + (1 - self.alpha) * self.ema
        self.variance = self.alpha * deviation ** 2 + (1 - self.alpha) * self.variance
        return is_anomaly


class AnomalyDetectionSystem:
    """Anomaly Detection System using EMA and Isolation Forest."""

    def __init__(self, alpha: float = 0.3, ema_threshold: float = 3.0, contamination: float = 0.01):
        """
        Initialize the Anomaly Detection System.

        Args:
            alpha (float): Smoothing factor for EMA.
            ema_threshold (float): Z-score threshold for EMA anomaly detection
                (3.0 is a conventional starting point; tune it for your data).
            contamination (float): Proportion of anomalies in the data for Isolation Forest.
        """
        self.alpha = alpha
        self.ema_threshold = ema_threshold
        self.contamination = contamination
        self.data_stream = []  # Store the generated data points
        self.ema_anomalies = []  # Store EMA anomalies
        self.batch_anomalies = []  # Store batch anomalies
        self.model = IsolationForest(contamination=self.contamination)  # Isolation Forest model
        self.data_generator = DataPointGenerator(seasonal_factor=5)  # Data generator instance

    def detect_anomalies_batch(self):
        """Detect anomalies using Isolation Forest on the batch data."""
        if len(self.data_stream) < 10:
            print("Not enough data points for batch detection.")
            return

        data = np.array(self.data_stream).reshape(-1, 1)  # Reshape for model
        self.batch_anomalies = self.model.fit_predict(data)  # Detect anomalies

    def plot_real_time(self):
        """Visualize the data stream and detected anomalies."""
        if len(self.data_stream) == 0:
            print("No data to plot.")
            return

        try:
            fig = go.Figure()
            fig.add_trace(go.Scatter(x=np.arange(len(self.data_stream)), y=self.data_stream, mode='lines', name='Data Stream'))

            # Plot EMA anomalies
            ema_anomaly_indices = np.flatnonzero(self.ema_anomalies)
            if len(ema_anomaly_indices) > 0:
                fig.add_trace(go.Scatter(x=ema_anomaly_indices,
                                         y=np.array(self.data_stream)[ema_anomaly_indices],
                                         mode='markers', marker=dict(color='orange', size=10),
                                         name='EMA Anomalies'))

            # Plot batch anomalies
            if len(self.batch_anomalies) > 0:
                batch_anomaly_indices = np.where(self.batch_anomalies == -1)[0]
                if len(batch_anomaly_indices) > 0:
                    fig.add_trace(go.Scatter(x=batch_anomaly_indices,
                                             y=np.array(self.data_stream)[batch_anomaly_indices],
                                             mode='markers', marker=dict(color='red', size=10),
                                             name='Batch Anomalies (Isolation Forest)'))

            # Update layout and show the plot
            fig.update_layout(
                title='Anomaly Detection in Data Stream',
                xaxis_title='Time',
                yaxis_title='Value',
                hovermode='closest',
                template='plotly_white',
                width=800,
                height=500
            )
            fig.show()
        except Exception as e:
            print(f"Error in plotting data: {e}")

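    def run(self, refresh_every: int = 50):
        """
        Run the anomaly detection system until interrupted.

        Args:
            refresh_every (int): Number of new points between Isolation Forest
                refits and plot refreshes.
        """
        # Use one detector for the whole run so each new point costs O(1),
        # instead of replaying the entire stream on every iteration.
        ema_detector = EMAAnomalyDetector(self.alpha, self.ema_threshold)
        self.ema_anomalies = [ema_detector.update(point) for point in self.data_stream]

        try:
            while True:
                new_data_point = self.data_generator.generate(len(self.data_stream))  # Generate new data point
                self.data_stream.append(new_data_point)  # Append to data stream

                # Score only the new point with the EMA detector
                self.ema_anomalies.append(ema_detector.update(new_data_point))

                # Refitting the model and redrawing the plot are expensive,
                # so do both periodically rather than on every point
                if len(self.data_stream) % refresh_every == 0:
                    self.detect_anomalies_batch()
                    self.plot_real_time()

        except KeyboardInterrupt:
            print("Stopping the anomaly detection system.")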


if __name__ == "__main__":
    system = AnomalyDetectionSystem()  # Initialize the system
    system.run()  # Start the anomaly detection system



### **Results**
The system flags anomalies as the stream is generated and plots them alongside the data. Orange markers show points flagged by the Exponential Moving Average (EMA) method, while red markers show points flagged by the Isolation Forest algorithm. Because the two detectors apply different criteria, they can disagree on individual points; comparing both sets of markers against the injected spikes of -10 and +10 gives a quick visual check of how each detector behaves.
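A visual check can be complemented with simple metrics. The sketch below assumes the indices of the injected anomalies are known, which the `DataPointGenerator` shown above does not record, so both the `injected` input and the helper name `precision_recall` are hypothetical. The index values in the usage example are made up purely to show the arithmetic and are not results from this project.

```python
from typing import Iterable, Tuple


def precision_recall(flagged: Iterable[int], injected: Iterable[int]) -> Tuple[float, float]:
    """Compare flagged indices with known anomaly indices (illustrative helper).

    precision = share of flagged points that were truly injected anomalies
    recall    = share of injected anomalies that were flagged
    By the convention chosen here, an empty denominator yields 0.0.
    """
    flagged_set, injected_set = set(flagged), set(injected)
    true_positives = len(flagged_set & injected_set)
    precision = true_positives / len(flagged_set) if flagged_set else 0.0
    recall = true_positives / len(injected_set) if injected_set else 0.0
    return precision, recall


if __name__ == "__main__":
    # Made-up indices: 2 of the 4 flagged points are real, 2 of the 3 real ones were found.
    precision, recall = precision_recall(flagged=[12, 40, 77, 90], injected=[40, 77, 150])
    print(f"precision={precision:.2f}, recall={recall:.2f}")  # -> precision=0.50, recall=0.67
```

Computing these values separately for the EMA flags and the Isolation Forest flags would show where each detector is stronger.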

### **Conclusion**
This project demonstrates the ability to detect anomalies in continuous data streams using Python. By leveraging algorithms such as Exponential Moving Average (EMA) and Isolation Forest, we can effectively identify unusual patterns in data. The implementation also provides a visual representation of the data and anomalies, making it easier to understand the dynamics of the data stream. Future work could include refining the algorithms, optimizing performance further, and exploring additional anomaly detection techniques.

**Author:** Omar Salous

**Email:** omar.salous2@gmail.com