# LOF 

In [2]:
import numpy as np
import pandas as pd
from sklearn.neighbors import LocalOutlierFactor
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, roc_auc_score
from sklearn.preprocessing import StandardScaler

# Load data
train_data = np.load("/home/haoqian/anomaly/Anomaly-TransformerX/dataset/SMD/SMD_train.npy")
test_data = np.load("/home/haoqian/anomaly/Anomaly-TransformerX/dataset/SMD/SMD_test.npy")
test_labels = np.load("/home/haoqian/anomaly/ml/fusion/SMD_test_label.npy")

# Function to preprocess data with scaling and time series features
def preprocess_data(data):
    # Standardization
    scaler = StandardScaler()
    data_scaled = scaler.fit_transform(data)

    # Convert to DataFrame for feature extraction
    df = pd.DataFrame(data_scaled)

    # Add time series features
    rolling_mean = df.rolling(window=5).mean().fillna(0)  # Rolling mean
    rolling_std = df.rolling(window=5).std().fillna(0)    # Rolling std
    df_features = pd.concat([df, rolling_mean.add_suffix('_mean'), rolling_std.add_suffix('_std')], axis=1)

    return df_features.values

# Preprocess training and test data
train_data_processed = preprocess_data(train_data)
test_data_processed = preprocess_data(test_data)

# Initialize and fit Local Outlier Factor
lof = LocalOutlierFactor()
lof.fit(train_data_processed)

# Predict anomalies in the test set
test_predictions = lof.fit_predict(test_data_processed)
test_predictions = np.where(test_predictions == -1, 1, 0)

# Evaluate the model
precision = precision_score(test_labels, test_predictions)
recall = recall_score(test_labels, test_predictions)
f1 = f1_score(test_labels, test_predictions)
accuracy = accuracy_score(test_labels, test_predictions)
auc_roc = roc_auc_score(test_labels, test_predictions)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"AUC ROC Score: {auc_roc:.4f}")
print(f"Accuracy: {accuracy:.4f}")


Precision: 0.1479
Recall: 0.0826
F1 Score: 0.1060
AUC ROC Score: 0.5310
Accuracy: 0.9421


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.neighbors import LocalOutlierFactor
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, roc_auc_score
from sklearn.preprocessing import StandardScaler

# Load data
train_data = np.load("/home/haoqian/anomaly/ml/fusion/fusion_train.npy")
test_data = np.load("/home/haoqian/anomaly/ml/fusion/fusion_test.npy")
test_labels = np.load("/home/haoqian/anomaly/ml/fusion/SMD_test_label.npy")

# Function to preprocess data with scaling and time series features
def preprocess_data(data):
    # Standardization
    scaler = StandardScaler()
    data_scaled = scaler.fit_transform(data)

    # Convert to DataFrame for feature extraction
    df = pd.DataFrame(data_scaled)

    # Add time series features
    rolling_mean = df.rolling(window=5).mean().fillna(0)  # Rolling mean
    rolling_std = df.rolling(window=5).std().fillna(0)    # Rolling std
    df_features = pd.concat([df, rolling_mean.add_suffix('_mean'), rolling_std.add_suffix('_std')], axis=1)

    return df_features.values

# Preprocess training and test data
train_data_processed = preprocess_data(train_data)
test_data_processed = preprocess_data(test_data)

# Initialize and fit Local Outlier Factor
lof = LocalOutlierFactor()
lof.fit(train_data_processed)

# Predict anomalies in the test set
test_predictions = lof.fit_predict(test_data_processed)
test_predictions = np.where(test_predictions == -1, 1, 0)

# Evaluate the model
precision = precision_score(test_labels, test_predictions)
recall = recall_score(test_labels, test_predictions)
f1 = f1_score(test_labels, test_predictions)
accuracy = accuracy_score(test_labels, test_predictions)
auc_roc = roc_auc_score(test_labels, test_predictions)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"AUC ROC Score: {auc_roc:.4f}")
print(f"Accuracy: {accuracy:.4f}")

# Calculate anomaly scores
test_scores = -lof.decision_function(test_data_processed)  # Convert to positive scores
threshold = np.quantile(test_scores, 0.99)  # Determine threshold

# Visualization of anomaly detection results
plt.figure(figsize=(14, 6))
plt.plot(test_scores[:1000], label='Anomaly Score', color='blue', alpha=0.7)
plt.scatter(np.where(test_labels[:1000] == 1)[0], 
            test_scores[test_labels[:1000] == 1], 
            color='red', label='True Anomalies', s=30, alpha=0.7)
plt.axhline(y=threshold, color='green', linestyle='--', 
            linewidth=1.5, label='Threshold')
plt.title('Anomaly Detection Results (First 1000 Samples)', fontsize=14)
plt.xlabel('Time Step', fontsize=12)
plt.ylabel('Anomaly Score', fontsize=12)
plt.legend(fontsize=10)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
