In [None]:
"""
Name : Aaditya Kulkarni
ID no : 2021A7PS0426P
University Email ID : f20210426@pilani.bits-pilani.ac.in
Personal Email ID : adityakulk0301@gmail.com
BITS Pilani - Pilani Campus
B.E. Computer Science
"""

In [20]:
# This file needs some imports to function properly - mentioned in the requirements.txt file
import dash
from dash import dcc, html
import plotly.graph_objs as go
from dash.dependencies import Input, Output
import numpy as np
from sklearn.neighbors import LocalOutlierFactor

In [21]:
"""
We create a Dash app that displays a live-updating graph of simulated financial transaction data.
The user can select between three different anomaly detection methods: Z-Score, Exponential Moving Average, and Local Outlier Factor.
"""

# Initialise the Dash application.
app = dash.Dash(__name__)

# Selector for the detector used by the live-update callback; 'zscore' is pre-selected.
anomaly_method_selector = dcc.RadioItems(
    id='anomaly-method',
    options=[
        {'label': label, 'value': value}
        for label, value in (
            ('Z-Score', 'zscore'),
            ('Exponential Moving Average', 'ema'),
            ('Local Outlier Factor', 'lof'),
        )
    ],
    value='zscore',
    inline=True,
)

# Page layout: the live graph, a 1000 ms ticker that drives its updates, and the method selector.
app.layout = html.Div(
    children=[
        dcc.Graph(id='live-graph'),
        dcc.Interval(id='graph-update', interval=1000, n_intervals=0),
        html.Div("Anomaly Detection Method:"),
        anomaly_method_selector,
    ]
)

In [22]:
"""
We initialize the data that will be used to generate the graph, along with the anomalies that will be detected.
"""
# Module-level stream state; the live-update callback appends to these lists in place.
#   x_data    - time points on the X-axis
#   y_data    - transaction values on the Y-axis
#   anomalies - (time, value) pairs marked as anomalies
x_data, y_data, anomalies = [], [], []


In [29]:
"""
We define the function that will generate the financial data.
The function generates a random transaction value at time t, with a seasonal effect and random noise.
Occasionally, an anomaly is introduced, which is a random spike or drop in the transaction value.
We also add data validation to ensure that the generated values are valid.
We have also added a try-except block to catch any errors that may occur during data generation.
The function returns the transaction value and a boolean indicating whether it is an anomaly.

For the data stream, we are just appending the newly generated data at time t to the existing data
- this is because we are simulating a live data stream where new data points are continuously added.
This type of data streaming makes more sense when we are dealing with real-time data, such as financial transactions.

NOTE:
An alternative style of real-time streaming regenerates every point from 0 to t on each tick. That is a poor fit for financial transaction data, because transactions that were already recorded would change on every refresh.

"""

def generate_financial_data(t, rng=None):
    """Generate one simulated transaction value for time step ``t``.

    The value is ``base + seasonal + noise + anomaly`` where ``base ~ N(100, 10)``,
    ``seasonal = 20 * sin(t / 60)``, ``noise ~ N(0, 5)`` and, with probability 0.1,
    ``anomaly ~ U(-100, 200)`` (otherwise 0).

    Args:
        t: Time step of the new point (the live-update callback passes the Interval tick count).
        rng: Source of randomness exposing ``normal``, ``random`` and ``uniform``
            (e.g. ``np.random.default_rng(seed)``). Defaults to the global ``np.random``
            state, which draws exactly as before; pass a seeded generator for a
            reproducible stream.

    Returns:
        ``(value, is_anomaly)``: the transaction value and a ``bool`` telling whether an
        anomaly was injected. On any error the message is printed and ``(None, False)``
        is returned instead.
    """
    rng = np.random if rng is None else rng
    try:
        # Regular transaction amount.
        base_value = rng.normal(100, 10)
        # Predictable, slow fluctuation. Note: sin(t / 60) repeats every 2*pi*60 (about 377) time units.
        seasonal_effect = 20 * np.sin(t / 60)
        # Unpredictable random fluctuation.
        noise = rng.normal(0, 5)
        anomaly = 0

        # Occasionally (10% chance) inject a random spike or drop.
        if rng.random() < 0.1:
            anomaly = rng.uniform(-100, 200)

        transaction_value = base_value + seasonal_effect + noise + anomaly

        # Data validation: reject NaN and +/- infinity.
        if not np.isfinite(transaction_value):
            raise ValueError(f"Invalid transaction value generated at time {t}: {transaction_value}")

        return transaction_value, bool(anomaly != 0)

    except Exception as e:
        # Best effort: a bad sample is reported and skipped rather than stopping the stream.
        print(f"Error generating data at time {t}: {e}")
        return None, False

In [24]:
"""
Detecting anomalies using Z-Score method
The Z-score method detects anomalies by measuring how far each data point deviates from the mean of the data in terms of standard deviations.
Z-score is calculated as the difference between the data point and the mean divided by the standard deviation.
If the absolute Z-score of a data point is greater than a threshold (e.g., 3), it is considered an anomaly.
We have also added a try-except block to catch any errors that may occur during anomaly detection.

Reason for choosing Z-Score:
Z-score is simple and effective for identifying large deviations from the mean, which can indicate anomalies in financial data.
It is a statistical approach that is well-suited to data with a relatively stable mean and standard deviation.
"""
def detect_anomalies_zscore(y_data, threshold=3.0):
    """Return the indices of points whose absolute z-score exceeds ``threshold``.

    Args:
        y_data: Sequence of numeric values (the whole stream so far).
        threshold: Cut-off on ``|value - mean| / std``; defaults to 3.

    Returns:
        List of int indices into ``y_data``. Empty when there are fewer than two points,
        when the series is constant (std == 0), or on any error (which is printed).

    Note:
        For n points, no |z| can exceed (n - 1) / sqrt(n). So with the default threshold
        of 3, nothing can be flagged until at least 11 points are available.
    """
    try:
        if len(y_data) < 2:  # We need at least two points to compute mean and std
            return []

        values = np.asarray(y_data, dtype=float)
        mean = values.mean()
        std_dev = values.std()  # population std (ddof=0), same as np.std

        if std_dev == 0:
            return []  # Constant series: avoid division by zero

        z_scores = (values - mean) / std_dev
        # .tolist() converts numpy integers to plain Python ints for the caller.
        return np.flatnonzero(np.abs(z_scores) > threshold).tolist()
    except Exception as e:
        print(f"Error detecting anomalies with Z-score: {e}")
        return []

In [25]:
"""
Detecting anomalies using Exponential Moving Average (EMA) method
The EMA method detects anomalies by comparing the original data with the exponentially smoothed data.
EMA is calculated as a weighted average of the current data point and the previous EMA value, more specifically:
EMA(t) = alpha * data(t) + (1 - alpha) * EMA(t-1) where alpha is the smoothing factor (e.g., 0.1).
The residuals are calculated as the absolute difference between the original data and the EMA.
If the residual of a data point is greater than a threshold (e.g., 2 times the standard deviation of residuals), it is considered an anomaly.
We have also added a try-except block to catch any errors that may occur during anomaly detection.

Reason for choosing EMA:
EMA adapts well to trends and shifts in data, making it ideal for time series with short-term fluctuations.
It helps capture gradual shifts in financial transaction patterns and highlights deviations from expected trends.
"""
def detect_anomalies_ema(y_data, alpha=0.1, threshold=2.0):
    """Return the indices of points that deviate strongly from their exponential moving average.

    The EMA is seeded with the first value and updated as
    ``ema[i] = alpha * y[i] + (1 - alpha) * ema[i - 1]``. A point is anomalous when
    ``|y[i] - ema[i]| > threshold * std(residuals)``.

    Args:
        y_data: Sequence of numeric values (the whole stream so far).
        alpha: Smoothing factor; larger values track the data more closely.
        threshold: Multiple of the residuals' standard deviation used as cut-off (default 2).

    Returns:
        List of int indices into ``y_data``. Empty for fewer than two points or on any
        error (which is printed).
    """
    try:
        if len(y_data) < 2:
            return []

        ema = [y_data[0]]  # Start EMA with the first value of the series
        for value in y_data[1:]:
            ema.append(alpha * value + (1 - alpha) * ema[-1])

        residuals = np.abs(np.asarray(y_data, dtype=float) - np.asarray(ema, dtype=float))
        # Compute the cut-off once; it used to be recomputed per element (quadratic time).
        cutoff = threshold * residuals.std()
        return np.flatnonzero(residuals > cutoff).tolist()
    except Exception as e:
        print(f"Error detecting anomalies with EMA: {e}")
        return []

In [26]:
"""
Detecting anomalies using Local Outlier Factor (LOF) method
The LOF method detects anomalies by comparing the local density of a data point with the local densities of its neighbors.
A data point is considered an anomaly if its local density is significantly lower than the local densities of its neighbors.
We use n_neighbors=5 (the number of nearest neighbours considered) for LOF.
We have also added a try-except block to catch any errors that may occur during anomaly detection.
"""
def detect_anomalies_lof(y_data, n_neighbors=5):
    """Return the indices of points that scikit-learn's LocalOutlierFactor labels as outliers.

    The values are treated as 1-D features (reshaped to a single column). The time
    position of each point is not used.

    Args:
        y_data: Sequence of numeric values (the whole stream so far).
        n_neighbors: Neighbourhood size passed to ``LocalOutlierFactor`` (default 5).

    Returns:
        List of int indices into ``y_data``. Empty when there are fewer than
        ``n_neighbors`` points, or on any error (which is printed).
    """
    try:
        # Require at least n_neighbors points before fitting (5 for the default, as before).
        if len(y_data) < n_neighbors:
            return []

        lof = LocalOutlierFactor(n_neighbors=n_neighbors)
        y_data_reshaped = np.array(y_data).reshape(-1, 1)  # one sample per row, one feature
        y_pred = lof.fit_predict(y_data_reshaped)

        # fit_predict labels outliers as -1 and inliers as 1.
        return np.where(y_pred == -1)[0].tolist()
    except Exception as e:
        print(f"Error detecting anomalies with LOF: {e}")
        return []

In [27]:
"""
This callback updates the real-time graph each second (n_intervals) and applies the selected anomaly detection method.
It appends the new transaction value to the data stream, detects anomalies using the chosen method (Z-score, EMA, or LOF), and stores any detected anomalies in a list. Anomalies injected by the data generator are stored in the same list.
The graph is updated with the new transaction data and anomalies, if any, marking anomalies as red dots. 
The graph dynamically adjusts its x-axis (time) and y-axis (transaction value) to show the last 60 seconds.
Error handling ensures smooth graph updates even in case of data issues.
"""
@app.callback(
    Output('live-graph', 'figure'),
    [Input('graph-update', 'n_intervals'), Input('anomaly-method', 'value')]
)
def update_graph(n, method):
    """Append one simulated transaction, run the selected detector and redraw the live graph.

    Args:
        n: ``n_intervals`` of the ``graph-update`` Interval; used as the time stamp of the new point.
        method: Value of the ``anomaly-method`` radio items ('zscore', 'ema' or 'lof').
            Any other value detects nothing.

    Returns:
        A Plotly figure with the transaction line and the accumulated anomaly markers.
        Returns ``dash.no_update`` (the current figure stays on screen) when no valid
        transaction could be generated, and an empty figure if anything else fails.

    Note:
        Mutates the module-level ``x_data``, ``y_data`` and ``anomalies`` lists in place.
        The stream state is therefore process-wide, not per browser session.
    """
    try:
        transaction_value, is_anomaly = generate_financial_data(n)

        # Skip invalid samples entirely. Appending the time stamp without a value would
        # leave x_data longer than y_data, misaligning the plot and every later anomaly index.
        if transaction_value is None:
            return dash.no_update

        # We just append the new point at the end of the stream (t-th second).
        x_data.append(n)
        y_data.append(transaction_value)

        # Detect anomalies based on the selected method (unknown method -> none).
        detectors = {
            'zscore': detect_anomalies_zscore,
            'ema': detect_anomalies_ema,
            'lof': detect_anomalies_lof,
        }
        detector = detectors.get(method)
        anomaly_indices = detector(y_data) if detector is not None else []

        # Anomalies stay on the plot once recorded (injected ones first, then detected
        # ones), but each (time, value) point is stored only once. The detectors rescan
        # the whole history and re-report old points on every tick.
        candidates = [(n, transaction_value)] if is_anomaly else []
        candidates += [(x_data[idx], y_data[idx]) for idx in anomaly_indices]
        already_recorded = set(anomalies)
        for point in candidates:
            if point not in already_recorded:
                already_recorded.add(point)
                anomalies.append(point)

        fig = go.Figure()

        # Main data line
        fig.add_trace(go.Scatter(x=x_data, y=y_data, mode='lines', name='Transactions'))

        if anomalies:
            anomaly_times, anomaly_values = zip(*anomalies)  # Unzip into times and values
            fig.add_trace(go.Scatter(
                x=anomaly_times,
                y=anomaly_values,
                mode='markers',
                marker=dict(color='red', size=10),
                name='Anomalies'
            ))

        # Keep the view on the last 60 ticks; the y-range follows the last 60 values.
        recent_values = y_data[-60:]
        fig.update_layout(
            title='Real-Time Financial Transactions with Anomalies',
            xaxis_title='Time',
            yaxis_title='Transaction Value',
            xaxis=dict(range=[n - 60, n]),
            yaxis=dict(range=[min(recent_values) - 10, max(recent_values) + 10])
        )

        return fig

    except Exception as e:
        print(f"Error updating graph at interval {n}: {e}")
        return go.Figure()  # Return an empty figure on error


In [28]:
# Running the dash app
if __name__ == '__main__':
    # `Dash.run_server` was deprecated and then removed in Dash 3.0; `Dash.run` replaces it.
    # Fall back to `run_server` only on old Dash releases that predate `run`.
    launch = getattr(app, 'run', None) or app.run_server
    launch(debug=True, port=8068)
