# Imports

In [59]:
import pandas as pd
import random
import numpy as np
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import optuna

from itertools import product
from tqdm import tqdm 
from sklearn.metrics import confusion_matrix
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split


# Initial test

In [5]:
# Load the dataset
data = pd.read_csv('../data/synthetic_time_series.csv')

# Convert timestamp to datetime
data['timestamp'] = pd.to_datetime(data['timestamp'])

In [6]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=data['timestamp'], y=data['kpi'], mode='lines', name='KPI'))
fig.update_layout(title='Time-Series Data', xaxis_title='Time', yaxis_title='KPI Value')
fig.show()

In [7]:
# Normalize KPI values
scaler = MinMaxScaler()
data['kpi_normalized'] = scaler.fit_transform(data[['kpi']])


In [8]:
# Split into 80% training and 20% testing
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)

In [9]:
# Initialize and train the model
model = IsolationForest(contamination=0.02, random_state=42)
model.fit(train_data[['kpi_normalized']])

# Predict anomalies on the test set
test_data['anomaly'] = model.predict(test_data[['kpi_normalized']])

# Map predictions: 1 (normal), -1 (anomaly)
test_data['anomaly'] = test_data['anomaly'].map({1: 0, -1: 1})


In [10]:
# Extract anomalies
anomalies = test_data[test_data['anomaly'] == 1]

In [11]:
# Sort data by timestamp
test_data = test_data.sort_values(by='timestamp')

# Extract anomalies after sorting
anomalies = test_data[test_data['anomaly'] == 1]

In [12]:
# Plot the KPI values and anomalies
fig = go.Figure()
fig.add_trace(go.Scatter(x=test_data['timestamp'], y=test_data['kpi'], mode='lines', name='KPI'))
fig.add_trace(go.Scatter(x=anomalies['timestamp'], y=anomalies['kpi'], mode='markers', name='Anomalies', marker=dict(color='red')))
fig.update_layout(title='Detected Anomalies in KPI Data', xaxis_title='Time', yaxis_title='KPI Value')
fig.show()

# Refining

In [13]:
# Filter anomalies based on KPI value threshold (e.g., 0.6)
HIGH_KPI_THRESHOLD = 0.6
test_data['anomaly'] = test_data.apply(
    lambda row: 1 if row['anomaly'] == 1 and row['kpi'] > HIGH_KPI_THRESHOLD else 0,
    axis=1
)

# Extract anomalies
anomalies = test_data[test_data['anomaly'] == 1]

In [14]:
# Create the main line plot for KPI values
fig = go.Figure()

fig.add_trace(go.Scatter(
    x=test_data['timestamp'],
    y=test_data['kpi'],
    mode='lines',
    name='KPI',
    line=dict(color='blue', width=2)
))

# Add anomalies as a scatter plot
fig.add_trace(go.Scatter(
    x=anomalies['timestamp'],
    y=anomalies['kpi'],
    mode='markers',
    name='Anomalies',
    marker=dict(color='red', size=10, symbol='circle'),
    hoverinfo='x+y'
))

# Update layout for better readability
fig.update_layout(
    title='Detected Anomalies in KPI Data (High Values Only)',
    xaxis_title='Time',
    yaxis_title='KPI Value',
    legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1),
    template='plotly_white'
)

fig.show()

# False Alarm Rate

In [15]:
# Load the dataset
data = pd.read_csv('../data/synthetic_time_series.csv')
data['timestamp'] = pd.to_datetime(data['timestamp'])

# Set the False Alarm Rate (e.g., 1%)
FAR = 0.01

# Calculate the threshold based on FAR
threshold = np.quantile(data['kpi'], 1 - FAR)

# Identify anomalies based on the threshold
data['anomaly'] = data['kpi'].apply(lambda x: 1 if x > threshold else 0)

# Extract anomalies for visualization
anomalies = data[data['anomaly'] == 1]

# Display the threshold value
print(f"Anomaly threshold (based on FAR={FAR}): {threshold}")


Anomaly threshold (based on FAR=0.01): 0.7594836744255726


In [16]:
# Create the main line plot for KPI values
fig = go.Figure()

# Add the KPI line plot
fig.add_trace(go.Scatter(
    x=data['timestamp'],
    y=data['kpi'],
    mode='lines',
    name='KPI',
    line=dict(color='blue', width=2)
))

# Add anomalies as a scatter plot
fig.add_trace(go.Scatter(
    x=anomalies['timestamp'],
    y=anomalies['kpi'],
    mode='markers',
    name='Anomalies',
    marker=dict(color='red', size=10, symbol='circle'),
    hoverinfo='x+y'
))

# Add a horizontal line for the threshold
fig.add_trace(go.Scatter(
    x=data['timestamp'],
    y=[threshold] * len(data),
    mode='lines',
    name='Threshold',
    line=dict(color='green', width=2, dash='dash')
))

# Update layout for better visualization
fig.update_layout(
    title='Anomaly Detection Using False Alarm Rate',
    xaxis_title='Time',
    yaxis_title='KPI Value',
    legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1),
    template='plotly_white'
)

fig.show()


# Cell-Averaging Constant False Alarm Rate (CA-CFAR)

In [17]:
# Load the dataset
data = pd.read_csv('../data/synthetic_time_series.csv')
data['timestamp'] = pd.to_datetime(data['timestamp'])

# Define CA-CFAR parameters
REFERENCE_CELLS = 20  # Number of reference cells
GUARD_CELLS = 5       # Number of guard cells
FAR = 0.01            # False alarm rate

# Initialize variables
kpi_values = data['kpi'].values
num_points = len(kpi_values)
thresholds = np.zeros(num_points)
anomalies = np.zeros(num_points)

# CA-CFAR processing
for i in range(num_points):
    # Define the sliding window
    start_idx = max(0, i - REFERENCE_CELLS - GUARD_CELLS)
    end_idx = min(num_points, i + REFERENCE_CELLS + GUARD_CELLS + 1)
    guard_start_idx = max(0, i - GUARD_CELLS)
    guard_end_idx = min(num_points, i + GUARD_CELLS + 1)

    # Exclude guard cells and the current cell
    reference_cells = np.concatenate([
        kpi_values[start_idx:guard_start_idx],
        kpi_values[guard_end_idx:end_idx]
    ])
    
    # Calculate the threshold based on the reference cells
    if len(reference_cells) > 0:
        noise_level = np.mean(reference_cells)
        threshold = noise_level * (1 + FAR)  # Adjust with FAR
        thresholds[i] = threshold

        # Detect anomalies
        if kpi_values[i] > threshold:
            anomalies[i] = 1

# Add results to the DataFrame
data['threshold'] = thresholds
data['anomaly'] = anomalies

# Extract anomalies for visualization
anomalies_df = data[data['anomaly'] == 1]

In [22]:
# Create the main line plot for KPI values
fig = go.Figure()

# Add KPI line plot
fig.add_trace(go.Scatter(
    x=data['timestamp'],
    y=data['kpi'],
    mode='lines',
    name='KPI',
    line=dict(color='blue', width=2)
))

# Add anomalies as scatter points
fig.add_trace(go.Scatter(
    x=anomalies_df['timestamp'],
    y=anomalies_df['kpi'],
    mode='markers',
    name='Anomalies',
    marker=dict(color='red', size=10, symbol='circle'),
    hoverinfo='x+y'
))

# Add dynamic threshold plot
fig.add_trace(go.Scatter(
    x=data['timestamp'],
    y=data['threshold'],
    mode='lines',
    name='Dynamic Threshold',
    line=dict(color='green', width=2, dash='dash')
))

# Update layout for better visualization
fig.update_layout(
    title='Anomaly Detection Using CA-CFAR',
    xaxis_title='Time',
    yaxis_title='KPI Value',
    legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1),
    template='plotly_white'
)

fig.show()


In [19]:
num_anomalies = np.sum(anomalies)
print(f"Number of anomalies: {num_anomalies}")

Number of anomalies: 246.0


In [23]:
# Update CA-CFAR parameters
REFERENCE_CELLS = 40  # Increase reference cells for better noise estimation
GUARD_CELLS = 10      # Increase guard cells to exclude nearby anomalies
FAR = 0.005           # Lower False Alarm Rate for stricter threshold
THRESHOLD_MARGIN = 1.5  # Additional multiplier to make the threshold stricter

# Reinitialize variables
thresholds = np.zeros(num_points)
anomalies = np.zeros(num_points)

# CA-CFAR processing with updated parameters
for i in range(num_points):
    # Define the sliding window
    start_idx = max(0, i - REFERENCE_CELLS - GUARD_CELLS)
    end_idx = min(num_points, i + REFERENCE_CELLS + GUARD_CELLS + 1)
    guard_start_idx = max(0, i - GUARD_CELLS)
    guard_end_idx = min(num_points, i + GUARD_CELLS + 1)

    # Exclude guard cells and the current cell
    reference_cells = np.concatenate([
        kpi_values[start_idx:guard_start_idx],
        kpi_values[guard_end_idx:end_idx]
    ])
    
    # Calculate the threshold based on the reference cells
    if len(reference_cells) > 0:
        noise_level = np.mean(reference_cells)
        threshold = noise_level * (1 + FAR) * THRESHOLD_MARGIN  # Add margin multiplier
        thresholds[i] = threshold

        # Detect anomalies
        if kpi_values[i] > threshold:
            anomalies[i] = 1

# Update the DataFrame with results
data['threshold'] = thresholds
data['anomaly'] = anomalies

# Count the number of anomalies
num_anomalies = np.sum(anomalies)
print(f"Number of anomalies: {num_anomalies}")


Number of anomalies: 59.0


In [24]:
def visualize_cfar_results(data, title="CA-CFAR Anomaly Detection Results"):
    """
    Visualizes KPI values, dynamic thresholds, and anomalies.

    Parameters:
        data (DataFrame): DataFrame with 'timestamp', 'kpi', 'threshold', and 'anomaly' columns.
        title (str): Title for the plot.
    """
    # Extract anomalies
    anomalies = data[data['anomaly'] == 1]

    # Create the main line plot for KPI values
    fig = go.Figure()

    # Add KPI line plot
    fig.add_trace(go.Scatter(
        x=data['timestamp'],
        y=data['kpi'],
        mode='lines',
        name='KPI',
        line=dict(color='blue', width=2)
    ))

    # Add anomalies as scatter points
    fig.add_trace(go.Scatter(
        x=anomalies['timestamp'],
        y=anomalies['kpi'],
        mode='markers',
        name='Anomalies',
        marker=dict(color='red', size=10, symbol='circle'),
        hoverinfo='x+y'
    ))

    # Add dynamic threshold plot
    fig.add_trace(go.Scatter(
        x=data['timestamp'],
        y=data['threshold'],
        mode='lines',
        name='Dynamic Threshold',
        line=dict(color='green', width=2, dash='dash')
    ))

    # Update layout for better visualization
    fig.update_layout(
        title=title,
        xaxis_title='Time',
        yaxis_title='KPI Value',
        legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1),
        template='plotly_white'
    )

    # Show the plot
    fig.show()


In [25]:
visualize_cfar_results(data, title="CA-CFAR Tuned Results")

In [26]:
# Update GO-CFAR parameters
REFERENCE_CELLS = 20  # Number of reference cells on each side
GUARD_CELLS = 1      # Guard cells to isolate the target
FAR = 0.005           # False alarm rate
THRESHOLD_MARGIN = 1.7  # Additional margin multiplier

# Reinitialize variables
thresholds = np.zeros(num_points)
anomalies = np.zeros(num_points)

# GO-CFAR processing
for i in range(num_points):
    # Define the sliding window
    start_idx = max(0, i - REFERENCE_CELLS - GUARD_CELLS)
    guard_start_idx = max(0, i - GUARD_CELLS)
    guard_end_idx = min(num_points, i + GUARD_CELLS + 1)
    end_idx = min(num_points, i + REFERENCE_CELLS + GUARD_CELLS + 1)

    # Split reference cells into two groups
    left_reference_cells = kpi_values[start_idx:guard_start_idx]
    right_reference_cells = kpi_values[guard_end_idx:end_idx]

    # Use the maximum average noise level from both groups
    max_noise_level = max(
        np.mean(left_reference_cells) if len(left_reference_cells) > 0 else 0,
        np.mean(right_reference_cells) if len(right_reference_cells) > 0 else 0
    )

    # Calculate the threshold
    threshold = max_noise_level * (1 + FAR) * THRESHOLD_MARGIN
    thresholds[i] = threshold

    # Detect anomalies
    if kpi_values[i] > threshold:
        anomalies[i] = 1

# Update the DataFrame with results
data['threshold'] = thresholds
data['anomaly'] = anomalies

# Count the number of anomalies
num_anomalies = np.sum(anomalies)
print(f"Number of anomalies: {num_anomalies}")


Number of anomalies: 10.0


In [27]:
anomalies = data[data['anomaly'] == 1]
anomaly_timestamps = anomalies['timestamp']
anomaly_values = anomalies['kpi']
print(anomaly_timestamps)
print(anomaly_values)

15    2024-01-01 00:15:00
40    2024-01-01 00:40:00
124   2024-01-01 02:04:00
140   2024-01-01 02:20:00
187   2024-01-01 03:07:00
192   2024-01-01 03:12:00
317   2024-01-01 05:17:00
358   2024-01-01 05:58:00
393   2024-01-01 06:33:00
451   2024-01-01 07:31:00
Name: timestamp, dtype: datetime64[ns]
15     0.733400
40     0.967942
124    0.738538
140    0.780887
187    0.895000
192    0.814958
317    0.738781
358    0.476092
393    0.759267
451    0.921141
Name: kpi, dtype: float64


In [28]:
visualize_cfar_results(data, title="GO-CFAR Anomaly Detection Results")

In [29]:
# Define the synthetic anomaly indices (ground truth)
synthetic_anomalies_indices = [15, 40, 124, 140, 187, 192, 317, 358, 393, 451]

# Declare the ground_truth variable
ground_truth = [1 if i in synthetic_anomalies_indices else 0 for i in range(len(data))]

# Add ground truth as a column to the data DataFrame
data['ground_truth'] = ground_truth

In [40]:
# Calculate the confusion matrix
cm = confusion_matrix(data['ground_truth'], data['anomaly'])

# Display the confusion matrix
print("Confusion Matrix:")
print(cm)

# Calculate accuracy
accuracy = (cm[0, 0] + cm[1, 1]) / np.sum(cm)
print(f"Accuracy: {accuracy:.2f}")

Confusion Matrix:
[[490   0]
 [  0  10]]
Accuracy: 1.00


In [33]:
data

Unnamed: 0,timestamp,kpi,threshold,anomaly,ground_truth
0,2024-01-01 00:00:00,0.249816,0.511875,0.0,0
1,2024-01-01 00:01:00,0.480286,0.496845,0.0,0
2,2024-01-01 00:02:00,0.392798,0.488907,0.0,0
3,2024-01-01 00:03:00,0.339463,0.623689,0.0,0
4,2024-01-01 00:04:00,0.162407,0.639491,0.0,0
...,...,...,...,...,...
495,2024-01-01 08:15:00,0.241341,0.635184,0.0,0
496,2024-01-01 08:16:00,0.333462,0.840789,0.0,0
497,2024-01-01 08:17:00,0.131094,0.844826,0.0,0
498,2024-01-01 08:18:00,0.489758,0.548362,0.0,0


# Test 2 

In [34]:
# Load the dataset
data2 = pd.read_csv('../data/synthetic_time_series_hard.csv')
data2['timestamp'] = pd.to_datetime(data2['timestamp'])

# Define the visualization function
def plot_time_series(data, title="Time-Series Data", show_anom=False):
    """
    Plots the time-series data using Plotly.
    
    Parameters:
        data (DataFrame): A DataFrame with 'timestamp' and 'kpi' columns.
        title (str): Title of the plot.
        show_anom (bool): If True, show anomalies with red dots.
    """
    fig = go.Figure()
    
    # Add the KPI line plot
    fig.add_trace(go.Scatter(
        x=data['timestamp'],
        y=data['kpi'],
        mode='lines',
        name='KPI',
        line=dict(color='blue', width=2)
    ))
    
    # Add anomalies as scatter points if show_anom is True
    if show_anom and 'anomaly_true' in data.columns:
        anomalies = data[data['anomaly_true'] == 1]
        fig.add_trace(go.Scatter(
            x=anomalies['timestamp'],
            y=anomalies['kpi'],
            mode='markers',
            name='Anomalies',
            marker=dict(color='red', size=10, symbol='circle'),
            hoverinfo='x+y'
        ))
    
    # Update layout
    fig.update_layout(
        title=title,
        xaxis_title='Time',
        yaxis_title='KPI Value',
        template='plotly_white'
    )
    
    fig.show()

# Visualize the dataset
plot_time_series(data2, title="Synthetic Time-Series with Harder Anomalies")


In [35]:
data2

Unnamed: 0,timestamp,kpi
0,2024-01-01 00:00:00,0.249816
1,2024-01-01 00:01:00,0.480286
2,2024-01-01 00:02:00,0.392798
3,2024-01-01 00:03:00,0.339463
4,2024-01-01 00:04:00,0.962444
...,...,...
1435,2024-01-01 23:55:00,0.111989
1436,2024-01-01 23:56:00,0.223172
1437,2024-01-01 23:57:00,0.381872
1438,2024-01-01 23:58:00,0.180741


## Model

In [36]:
def ca_cfar(data_set=None, ref_cells=20, guard_cells=1, far=0.005, margin=1.7):
    if data_set is None:
        print("There has to be a data set on argument")
        return None
    
    kpi_values = data_set['kpi'].values
    num_points = len(kpi_values)
    thresholds = np.zeros(num_points)
    anomalies = np.zeros(num_points)

    for i in range(num_points):
        start_idx = max(0, i - ref_cells - guard_cells)
        guard_start_idx = max(0, i - guard_cells)
        guard_end_idx = min(num_points, i + guard_cells + 1)
        end_idx = min(num_points, i + ref_cells + guard_cells + 1)

        left_reference_cells = kpi_values[start_idx:guard_start_idx]
        right_reference_cells = kpi_values[guard_end_idx:end_idx]

        max_noise_level = max(
            np.mean(left_reference_cells) if len(left_reference_cells) > 0 else 0,
            np.mean(right_reference_cells) if len(right_reference_cells) > 0 else 0
        )

        threshold = max_noise_level * (1 + far) * margin
        thresholds[i] = threshold

        if kpi_values[i] > threshold:
            anomalies[i] = 1

    data_set['threshold'] = thresholds
    data_set['anomaly'] = anomalies
    return data_set

# Apply the function to data2
data2 = ca_cfar(data2, ref_cells=10, guard_cells=1, margin=1.69)

# Count the number of anomalies
num_anomalies = np.sum(data2['anomaly'])
print(f"Number of anomalies: {num_anomalies}")

# Visualize the results using visualize cfar function 
visualize_cfar_results(data2)

Number of anomalies: 99.0


In [37]:
data2

Unnamed: 0,timestamp,kpi,threshold,anomaly
0,2024-01-01 00:00:00,0.249816,0.834709,0.0
1,2024-01-01 00:01:00,0.480286,0.841533,0.0
2,2024-01-01 00:02:00,0.392798,0.815287,0.0
3,2024-01-01 00:03:00,0.339463,0.681158,0.0
4,2024-01-01 00:04:00,0.962444,0.683021,1.0
...,...,...,...,...
1435,2024-01-01 23:55:00,0.111989,0.614267,0.0
1436,2024-01-01 23:56:00,0.223172,0.637435,0.0
1437,2024-01-01 23:57:00,0.381872,0.627362,0.0
1438,2024-01-01 23:58:00,0.180741,0.594488,0.0


# Test 3

In [41]:
# Generate synthetic time-series data for 24 hours (1440 minutes)
time_steps = pd.date_range(start='2024-01-02', periods=1440, freq='min')  # 24 hours
kpi_values = np.random.uniform(0.3, 0.7, size=len(time_steps))  # Normal KPI values

# Introduce 5 harder-to-detect anomalies
anomaly_indices = random.sample(range(len(kpi_values)), 25)
kpi_values[anomaly_indices] = np.random.uniform(0.71, 0.8, size=len(anomaly_indices))  # Slightly higher than normal

# Create a DataFrame with true anomaly labels
synthetic_data_hard_5 = pd.DataFrame({
    'timestamp': time_steps,
    'kpi': kpi_values,
    'anomaly_true': [1 if i in anomaly_indices else 0 for i in range(len(time_steps))]
})

# Save the DataFrame to a CSV file
file_path = '../data/synthetic_time_series_hard_5.csv'
synthetic_data_hard_5.to_csv(file_path, index=False)

print("File created at:", file_path)


File created at: ../data/synthetic_time_series_hard_5.csv


In [44]:
data5 = pd.read_csv('../data/synthetic_time_series_hard_5.csv') 
plot_time_series(data5, title="Synthetic Time-Series with Harder Anomalies", show_anom=True)

In [45]:
data_5_cacfar = ca_cfar(data5, margin=1)
visualize_cfar_results(data_5_cacfar, title="CA-CFAR Anomaly Detection Results")

In [46]:
def calculate_accuracy(data):
    """
    Calculate the accuracy between the columns 'anomaly_true' and 'anomaly'.

    Parameters:
        data (DataFrame): DataFrame containing 'anomaly_true' and 'anomaly' columns.

    Returns:
        float: Accuracy value.
    """
    correct_predictions = (data['anomaly_true'] == data['anomaly']).sum()
    total_predictions = len(data)
    accuracy = correct_predictions / total_predictions
    return accuracy

# Calculate accuracy for data5
accuracy = calculate_accuracy(data5)
print(f"Accuracy: {accuracy}")

Accuracy: 0.5659722222222222


In [None]:
def grid_search_cafar(data, ref_cells_range, guard_cells_range, far_range, margin_range):
    """
    Effectue une recherche en grille pour optimiser les paramètres du modèle CA-CFAR avec une barre de progression.

    Parameters:
        data (DataFrame): Jeu de données contenant les colonnes 'kpi' et 'anomaly_true'.
        ref_cells_range (list): Liste des valeurs pour REFERENCE_CELLS.
        guard_cells_range (list): Liste des valeurs pour GUARD_CELLS.
        far_range (list): Liste des valeurs pour FAR.
        margin_range (list): Liste des valeurs pour THRESHOLD_MARGIN.

    Returns:
        dict: Meilleurs paramètres et accuracy correspondante.
    """
    best_accuracy = 0
    best_params = {}

    # Liste des combinaisons de paramètres
    param_combinations = list(product(ref_cells_range, guard_cells_range, far_range, margin_range))

    # Ajouter une barre de progression
    for ref_cells, guard_cells, far, margin in tqdm(param_combinations, desc="Grid Search Progress", unit="combination"):
        # Appliquer le modèle CA-CFAR avec les paramètres actuels
        data_cfar = ca_cfar(data.copy(), ref_cells=ref_cells, guard_cells=guard_cells, far=far, margin=margin)
        
        # Calculer l'accuracy
        correct_predictions = (data_cfar['anomaly_true'] == data_cfar['anomaly']).sum()
        total_predictions = len(data_cfar)
        accuracy = correct_predictions / total_predictions

        # Si l'accuracy est meilleure, sauvegarder les paramètres
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_params = {
                "REFERENCE_CELLS": ref_cells,
                "GUARD_CELLS": guard_cells,
                "FAR": far,
                "MARGIN": margin
            }

    return {"best_params": best_params, "best_accuracy": best_accuracy}


In [49]:
# Définir l'espace de recherche
ref_cells_range = range(10, 15, 10)  
guard_cells_range = range(1, 15)      
far_range = [0.001, 0.003, 0.005, 0.01]
margin_range = [round(x, 2) for x in np.arange(1.0, 2.0, 0.05)]

# Appliquer la recherche en grille
results = grid_search_cafar(data5, ref_cells_range, guard_cells_range, far_range, margin_range)

# Afficher les résultats
print("Meilleurs paramètres :")
print(results["best_params"])
print(f"Meilleure accuracy : {results['best_accuracy']:.4f}")

Grid Search Progress: 100%|██████████| 1120/1120 [00:10<00:00, 103.07combination/s]

Meilleurs paramètres :
{'REFERENCE_CELLS': 10, 'GUARD_CELLS': 14, 'FAR': 0.005, 'MARGIN': 1.45}
Meilleure accuracy : 0.9896





In [50]:
# Rename keys to match the function's parameters
adjusted_params = {
    'ref_cells': results['best_params']['REFERENCE_CELLS'],
    'guard_cells': results['best_params']['GUARD_CELLS'],
    'far': results['best_params']['FAR'],
    'margin': results['best_params']['MARGIN'],
}

# Apply the CA-CFAR model with the adjusted parameters
data5_cfar_best = ca_cfar(data5.copy(), **adjusted_params)

# Visualize the results
visualize_cfar_results(data5_cfar_best, title="CA-CFAR Anomaly Detection Results (Best Parameters)")

In [52]:
def objective(trial):
    ref_cells = trial.suggest_int("ref_cells", 0, 100)
    guard_cells = trial.suggest_int("guard_cells", 0, 100)
    far = trial.suggest_float("far", 0.0005, 0.01)
    margin = trial.suggest_float("margin", 1.0, 2.0)
    
    data_cfar = ca_cfar(data5.copy(), ref_cells=ref_cells, guard_cells=guard_cells, far=far, margin=margin)
    correct_predictions = (data_cfar['anomaly_true'] == data_cfar['anomaly']).sum()
    total_predictions = len(data_cfar)
    return correct_predictions / total_predictions

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=300)

print("Meilleurs paramètres :", study.best_params)
print(f"Meilleure accuracy : {study.best_value:.5f}")

[I 2025-03-03 22:32:46,406] A new study created in memory with name: no-name-28e22ce5-1afc-41d7-8f57-37381ce53fbd


[I 2025-03-03 22:32:46,437] Trial 0 finished with value: 0.575 and parameters: {'ref_cells': 66, 'guard_cells': 37, 'far': 0.003744056909040859, 'margin': 1.0232606817511538}. Best is trial 0 with value: 0.575.
[I 2025-03-03 22:32:46,472] Trial 1 finished with value: 0.9902777777777778 and parameters: {'ref_cells': 35, 'guard_cells': 47, 'far': 0.004462591410710941, 'margin': 1.4740881124424774}. Best is trial 1 with value: 0.9902777777777778.
[I 2025-03-03 22:32:46,491] Trial 2 finished with value: 0.9826388888888888 and parameters: {'ref_cells': 4, 'guard_cells': 88, 'far': 0.003676439618630518, 'margin': 1.9031675786627362}. Best is trial 1 with value: 0.9902777777777778.
[I 2025-03-03 22:32:46,502] Trial 3 finished with value: 0.9881944444444445 and parameters: {'ref_cells': 11, 'guard_cells': 83, 'far': 0.0050188267313081274, 'margin': 1.4933497933319444}. Best is trial 1 with value: 0.9902777777777778.
[I 2025-03-03 22:32:46,522] Trial 4 finished with value: 0.6923611111111111 an

Meilleurs paramètres : {'ref_cells': 100, 'guard_cells': 38, 'far': 0.004260663498173623, 'margin': 1.4073432099780447}
Meilleure accuracy : 0.99792


In [53]:
# Apply the CA-CFAR model with the best parameters with Optuna
data5_cfar_optuna = ca_cfar(data5.copy(), **study.best_params)

# Visualize the results
visualize_cfar_results(data5_cfar_optuna, title="CA-CFAR Anomaly Detection Results (Optuna)")

# Generation


In [54]:
def generate_synthetic_data(output_path, num_anomalies=10, num_minutes=1440, kpi_range=(0.3, 0.7), anomaly_range=(0.701, 0.75)):
    """
    Génère un jeu de données synthétique avec anomalies et l'enregistre dans un fichier CSV.

    Parameters:
        output_path (str): Chemin où le fichier CSV sera sauvegardé.
        num_anomalies (int): Nombre d'anomalies à inclure dans les données.
        num_minutes (int): Nombre total de points de données (minutes).
        kpi_range (tuple): Intervalle des valeurs normales du KPI.
        anomaly_range (tuple): Intervalle des valeurs d'anomalies.
    """
    # Génération des timestamps
    time_steps = pd.date_range(start='2024-01-01', periods=num_minutes, freq='min')
    
    # Génération des valeurs normales du KPI
    kpi_values = np.random.uniform(kpi_range[0], kpi_range[1], size=len(time_steps))
    
    # Introduire des anomalies
    anomaly_indices = random.sample(range(len(kpi_values)), num_anomalies)
    kpi_values[anomaly_indices] = np.random.uniform(anomaly_range[0], anomaly_range[1], size=num_anomalies)
    
    # Création du DataFrame
    data = pd.DataFrame({
        'timestamp': time_steps,
        'kpi': kpi_values,
        'anomaly_true': [1 if i in anomaly_indices else 0 for i in range(len(time_steps))]
    })
    
    # Enregistrement dans un fichier CSV
    data.to_csv(output_path, index=False)
    print(f"Données synthétiques générées et sauvegardées dans : {output_path}")

# Exemple d'utilisation
generate_synthetic_data('../data/generated_data.csv', num_anomalies=100)

Données synthétiques générées et sauvegardées dans : ../data/generated_data.csv


In [55]:
# Load the generated data and plot 
generated_data = pd.read_csv('../data/generated_data.csv')
plot_time_series(generated_data, title="Generated Time-Series Data", show_anom=True)

In [56]:
# Search for the best parameters using Optuna
def optuna_search(data, n_trials=300):
    """
    Search for the best parameters using Optuna.

    Parameters:
        data (DataFrame): The dataset containing 'kpi' and 'anomaly_true' columns.
        n_trials (int): Number of trials for the optimization.

    Returns:
        dict: Best parameters and best accuracy.
    """
    def objective(trial):
        ref_cells = trial.suggest_int("ref_cells", 0, 100)
        guard_cells = trial.suggest_int("guard_cells", 0, 100)
        far = trial.suggest_float("far", 0.0005, 0.01)
        margin = trial.suggest_float("margin", 1.0, 2.0)
        
        data_cfar = ca_cfar(data.copy(), ref_cells=ref_cells, guard_cells=guard_cells, far=far, margin=margin)
        correct_predictions = (data_cfar['anomaly_true'] == data_cfar['anomaly']).sum()
        total_predictions = len(data_cfar)
        return correct_predictions / total_predictions

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=n_trials, callbacks=[lambda study, trial: study.stop() if study.best_value == 1.0 else None])

    return {"best_params": study.best_params, "best_accuracy": study.best_value}


In [57]:
# Example usage
results = optuna_search(generated_data, n_trials=500)
print("Meilleurs paramètres :", results["best_params"])
print(f"Meilleure accuracy : {results['best_accuracy']:.5f}")

[I 2025-03-03 22:33:25,439] A new study created in memory with name: no-name-e8a22ff4-8b85-4429-92c8-6288f579fcad
[I 2025-03-03 22:33:25,457] Trial 0 finished with value: 0.9298611111111111 and parameters: {'ref_cells': 3, 'guard_cells': 99, 'far': 0.0020844545440869126, 'margin': 1.8268631854080135}. Best is trial 0 with value: 0.9298611111111111.


[I 2025-03-03 22:33:25,475] Trial 1 finished with value: 0.9305555555555556 and parameters: {'ref_cells': 43, 'guard_cells': 93, 'far': 0.0066440911159860675, 'margin': 1.9553917244497496}. Best is trial 1 with value: 0.9305555555555556.
[I 2025-03-03 22:33:25,497] Trial 2 finished with value: 0.9305555555555556 and parameters: {'ref_cells': 62, 'guard_cells': 0, 'far': 0.0023171018691767738, 'margin': 1.9384870839244888}. Best is trial 1 with value: 0.9305555555555556.
[I 2025-03-03 22:33:25,517] Trial 3 finished with value: 0.9638888888888889 and parameters: {'ref_cells': 40, 'guard_cells': 42, 'far': 0.0006474877026150438, 'margin': 1.3795071955186111}. Best is trial 3 with value: 0.9638888888888889.
[I 2025-03-03 22:33:25,532] Trial 4 finished with value: 0.9305555555555556 and parameters: {'ref_cells': 47, 'guard_cells': 34, 'far': 0.008233131729529499, 'margin': 1.6748874311680921}. Best is trial 3 with value: 0.9638888888888889.
[I 2025-03-03 22:33:25,549] Trial 5 finished with 

Meilleurs paramètres : {'ref_cells': 100, 'guard_cells': 94, 'far': 0.002308384445260484, 'margin': 1.3418950043440918}
Meilleure accuracy : 0.99444


In [58]:
# Apply the CA-CFAR model with the best parameters and visualize the results
generated_data_cfar_optuna = ca_cfar(generated_data.copy(), **results["best_params"])

# Visualize the results
visualize_cfar_results(generated_data_cfar_optuna, title="CA-CFAR Anomaly Detection Results (Optuna)")

# Test and Train Set

In [60]:
def train_test_cafar(data, n_trials=300, test_size=0.2, random_state=42):
    """
    Split the dataset into train and test sets, optimize CA-CFAR parameters on the train set, 
    and evaluate on the test set.

    Parameters:
        data (DataFrame): The dataset containing 'kpi' and 'anomaly_true' columns.
        n_trials (int): Number of trials for the optimization.
        test_size (float): Fraction of the data to be used for testing.
        random_state (int): Seed for reproducibility.

    Returns:
        dict: Results including best parameters, train accuracy, and test accuracy.
    """
    # Split the dataset into train and test sets
    train_data, test_data = train_test_split(data, test_size=test_size, random_state=random_state)

    # Optimize parameters using Optuna on the train set
    def objective(trial):
        ref_cells = trial.suggest_int("ref_cells", 0, 100)
        guard_cells = trial.suggest_int("guard_cells", 0, 100)
        far = trial.suggest_float("far", 0.0005, 0.01)
        margin = trial.suggest_float("margin", 1.0, 2.0)
        
        train_cfar = ca_cfar(train_data.copy(), ref_cells=ref_cells, guard_cells=guard_cells, far=far, margin=margin)
        correct_predictions = (train_cfar['anomaly_true'] == train_cfar['anomaly']).sum()
        total_predictions = len(train_cfar)
        return correct_predictions / total_predictions

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=n_trials, callbacks=[lambda study, trial: study.stop() if study.best_value == 1.0 else None])

    best_params = study.best_params
    best_train_accuracy = study.best_value

    # Evaluate on the test set using the best parameters
    test_cfar = ca_cfar(test_data.copy(), **best_params)
    correct_predictions_test = (test_cfar['anomaly_true'] == test_cfar['anomaly']).sum()
    total_predictions_test = len(test_cfar)
    test_accuracy = correct_predictions_test / total_predictions_test

    return {
        "best_params": best_params,
        "best_train_accuracy": best_train_accuracy,
        "test_accuracy": test_accuracy,
        "train_data": train_data,
        "test_data": test_cfar
    }

def visualize_cfar_results_fixed(data, title="CA-CFAR Anomaly Detection Results"):
    """
    Visualizes KPI values, dynamic thresholds, and anomalies with enhanced readability.

    Parameters:
        data (DataFrame): DataFrame with 'timestamp', 'kpi', 'threshold', and 'anomaly' columns.
        title (str): Title for the plot.
    """
    # Ensure the data is sorted by timestamp
    data = data.sort_values(by="timestamp")

    # Extract anomalies
    anomalies = data[data['anomaly'] == 1]

    # Create the main line plot for KPI values
    fig = go.Figure()

    # Add KPI line plot
    fig.add_trace(go.Scatter(
        x=data['timestamp'],
        y=data['kpi'],
        mode='lines',
        name='KPI',
        line=dict(color='blue', width=1),
        opacity=0.8  # Add transparency to reduce clutter
    ))

    # Add anomalies as scatter points
    fig.add_trace(go.Scatter(
        x=anomalies['timestamp'],
        y=anomalies['kpi'],
        mode='markers',
        name='Anomalies',
        marker=dict(color='red', size=8, symbol='circle'),
        hoverinfo='x+y'
    ))

    # Add dynamic threshold plot
    fig.add_trace(go.Scatter(
        x=data['timestamp'],
        y=data['threshold'],
        mode='lines',
        name='Dynamic Threshold',
        line=dict(color='green', width=2, dash='dash')
    ))

    # Update layout for better visualization
    fig.update_layout(
        title=title,
        xaxis_title='Time',
        yaxis_title='KPI Value',
        yaxis=dict(range=[data['kpi'].min() - 0.1, data['kpi'].max() + 0.1]),  # Adjust y-axis
        legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1),
        template='plotly_white'
    )

    # Show the plot
    fig.show()


In [61]:
# Execute the train-test CA-CFAR process
results_split = train_test_cafar(generated_data, n_trials=300)

# Display results
print("Meilleurs paramètres :", results_split["best_params"])
print(f"Accuracy sur l'ensemble d'entraînement : {results_split['best_train_accuracy']:.5f}")
print(f"Accuracy sur l'ensemble de test : {results_split['test_accuracy']:.5f}")

# Visualize the test results
visualize_cfar_results_fixed(results_split["test_data"], title="CA-CFAR Test Set Results")


[I 2025-03-03 22:33:57,424] A new study created in memory with name: no-name-a1843c55-b9ee-4d67-b3d0-6fdbfb629aa3
[I 2025-03-03 22:33:57,436] Trial 0 finished with value: 0.9722222222222222 and parameters: {'ref_cells': 43, 'guard_cells': 83, 'far': 0.005849633779940218, 'margin': 1.3615216991039443}. Best is trial 0 with value: 0.9722222222222222.
[I 2025-03-03 22:33:57,446] Trial 1 finished with value: 0.9348958333333334 and parameters: {'ref_cells': 85, 'guard_cells': 68, 'far': 0.004614365454535255, 'margin': 1.5007407536304456}. Best is trial 0 with value: 0.9722222222222222.
[I 2025-03-03 22:33:57,457] Trial 2 finished with value: 0.9348958333333334 and parameters: {'ref_cells': 45, 'guard_cells': 93, 'far': 0.0011487285546796684, 'margin': 1.7319498634559163}. Best is trial 0 with value: 0.9722222222222222.
[I 2025-03-03 22:33:57,466] Trial 3 finished with value: 0.9288194444444444 and parameters: {'ref_cells': 1, 'guard_cells': 97, 'far': 0.009607505118349735, 'margin': 1.85632

Meilleurs paramètres : {'ref_cells': 73, 'guard_cells': 6, 'far': 0.0035246599795660883, 'margin': 1.3494764675685749}
Accuracy sur l'ensemble d'entraînement : 0.99219
Accuracy sur l'ensemble de test : 0.96181


In [62]:
def visualize_train_test_results(train_data, test_data, train_title="Train Set Results", test_title="Test Set Results"):
    """
    Visualizes the train and test datasets with their respective CA-CFAR results.

    Parameters:
        train_data (DataFrame): DataFrame containing 'timestamp', 'kpi', 'threshold', and 'anomaly' columns for the train set.
        test_data (DataFrame): DataFrame containing 'timestamp', 'kpi', 'threshold', and 'anomaly' columns for the test set.
        train_title (str): Title for the train set plot.
        test_title (str): Title for the test set plot.
    """
    # Ensure the train and test data are sorted by timestamp
    train_data = train_data.sort_values(by="timestamp")
    test_data = test_data.sort_values(by="timestamp")

    # Create the figure for the train set
    train_fig = go.Figure()

    # Add KPI line for the train set
    train_fig.add_trace(go.Scatter(
        x=train_data['timestamp'],
        y=train_data['kpi'],
        mode='lines',
        name='Train KPI',
        line=dict(color='blue', width=1.5),
        opacity=0.8
    ))

    # Add anomalies for the train set
    train_anomalies = train_data[train_data['anomaly'] == 1]
    train_fig.add_trace(go.Scatter(
        x=train_anomalies['timestamp'],
        y=train_anomalies['kpi'],
        mode='markers',
        name='Train Anomalies',
        marker=dict(color='red', size=8, symbol='circle'),
        hoverinfo='x+y'
    ))

    # Add dynamic threshold for the train set
    train_fig.add_trace(go.Scatter(
        x=train_data['timestamp'],
        y=train_data['threshold'],
        mode='lines',
        name='Train Threshold',
        line=dict(color='green', width=2, dash='dash')
    ))

    # Update layout for train plot
    train_fig.update_layout(
        title=train_title,
        xaxis_title='Time',
        yaxis_title='KPI Value',
        template='plotly_white',
        legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1)
    )

    # Create the figure for the test set
    test_fig = go.Figure()

    # Add KPI line for the test set
    test_fig.add_trace(go.Scatter(
        x=test_data['timestamp'],
        y=test_data['kpi'],
        mode='lines',
        name='Test KPI',
        line=dict(color='purple', width=1.5),
        opacity=0.8
    ))

    # Add anomalies for the test set
    test_anomalies = test_data[test_data['anomaly'] == 1]
    test_fig.add_trace(go.Scatter(
        x=test_anomalies['timestamp'],
        y=test_anomalies['kpi'],
        mode='markers',
        name='Test Anomalies',
        marker=dict(color='orange', size=8, symbol='circle'),
        hoverinfo='x+y'
    ))

    # Add dynamic threshold for the test set
    test_fig.add_trace(go.Scatter(
        x=test_data['timestamp'],
        y=test_data['threshold'],
        mode='lines',
        name='Test Threshold',
        line=dict(color='green', width=2, dash='dash')
    ))

    # Update layout for test plot
    test_fig.update_layout(
        title=test_title,
        xaxis_title='Time',
        yaxis_title='KPI Value',
        template='plotly_white',
        legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1)
    )

    # Show both plots
    train_fig.show()
    test_fig.show()


In [63]:
results_split["train_data"] = ca_cfar(results_split["train_data"], **results_split["best_params"])


In [64]:
visualize_train_test_results(
    results_split["train_data"], 
    results_split["test_data"], 
    train_title="CA-CFAR Results on Train Set", 
    test_title="CA-CFAR Results on Test Set"
)

In [65]:
def adjust_test_timestamps(train_data, test_data):
    """
    Adjust the timestamps of the test set to maintain continuity with the train set.
    
    Parameters:
        train_data (DataFrame): The train dataset containing the 'timestamp' column.
        test_data (DataFrame): The test dataset containing the 'timestamp' column.

    Returns:
        DataFrame: The adjusted test dataset.
    """
    # Convert timestamp column to datetime if it's not already
    train_data['timestamp'] = pd.to_datetime(train_data['timestamp'])
    test_data['timestamp'] = pd.to_datetime(test_data['timestamp'])
    
    # Get the last timestamp from the train set
    last_train_timestamp = train_data['timestamp'].max()
    
    # Calculate the offset between the last train timestamp and the first test timestamp
    time_delta = pd.Timedelta(minutes=1)  # Assuming data frequency is 1 minute
    
    # Adjust the test timestamps to start after the last train timestamp
    test_data = test_data.copy()
    test_data['timestamp'] = [last_train_timestamp + (i + 1) * time_delta for i in range(len(test_data))]
    
    return test_data

results_split["test_data"] = adjust_test_timestamps(results_split["train_data"], results_split["test_data"])

visualize_train_test_results(
    results_split["train_data"], 
    results_split["test_data"], 
    train_title="CA-CFAR Results on Train Set", 
    test_title="CA-CFAR Results on Test Set with Adjusted Timestamps"
)
