In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.neighbors import LocalOutlierFactor
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from sklearn.preprocessing import MinMaxScaler
import os

In [None]:
# Set workspace.
# The location of the NAB checkout is machine-specific, so it can be overridden
# through the NAB_WORKSPACE environment variable. When the variable is unset the
# original hard-coded location is used, so existing setups behave as before.
workspace_path = os.environ.get(
    "NAB_WORKSPACE",
    r"C:\Users\kaefer\Desktop\Schulemappe\Master\MasterEBusiness\E Business Seminar\NAB-master",
)
# The relative data path used when loading the CSV is resolved against this directory.
os.chdir(workspace_path)
print(f"Current working directory: {os.getcwd()}")

In [None]:
# Read the time series CSV into `df`. The path is relative to the current working
# directory, and pandas is asked to parse the "timestamp" column as datetimes.
path = "data//artificialNoAnomaly//art_daily_small_noise.csv"
df = pd.read_csv(
    filepath_or_buffer=path,
    parse_dates=["timestamp"],
)


In [None]:
# Index the frame by timestamp; the time-based interpolation later in the notebook
# relies on a datetime index.
# After the first run "timestamp" is the index rather than a column, so the guard
# turns a re-execution of this cell into a no-op instead of a KeyError.
if "timestamp" in df.columns:
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df.set_index("timestamp", inplace=True)

In [None]:
# Rescale the "value" column with a MinMaxScaler (default settings) and store the
# result as "scaled_value". The fitted `scaler` is kept so the forecasts can be
# mapped back to the original units later.
scaler = MinMaxScaler()
raw_values = df[["value"]]  # 2-D (single-column frame), as the scaler expects
scaler.fit(raw_values)
df["scaled_value"] = scaler.transform(raw_values)


In [None]:
sequence_length = 10  # number of consecutive past observations per training sample

# Pull the scaled series out of pandas once; slicing a NumPy array inside the loop
# avoids a pandas .iloc lookup on every iteration.
scaled_series = df["scaled_value"].to_numpy()
n_samples = len(scaled_series) - sequence_length
if n_samples <= 0:
    # Fail here with a clear message instead of later, inside model fitting.
    raise ValueError(
        f"Need more than {sequence_length} rows to build training windows, "
        f"got {len(scaled_series)}"
    )

# X[i] holds observations i .. i + sequence_length - 1; y[i] is the observation
# immediately after that window (the one-step-ahead target).
X = np.stack([scaled_series[i:i + sequence_length] for i in range(n_samples)])
y = scaled_series[sequence_length:].copy()

# The LSTM layer is declared with input_shape=(sequence_length, 1), so add the
# trailing feature axis explicitly -> (samples, sequence_length, 1) rather than
# relying on Keras to expand the rank implicitly.
X = X[..., np.newaxis]

In [None]:
# Seed the Python, NumPy and TensorFlow random number generators before the model
# is built so that weight initialisation and batch shuffling are repeatable from
# run to run. (Bit-exact results may still vary across hardware — not verified here.)
tf.keras.utils.set_random_seed(42)

# One LSTM layer (50 units) reading windows of `sequence_length` steps with a single
# feature per step, followed by a Dense layer producing the one-step-ahead value.
model = Sequential([
    LSTM(50, activation="relu", return_sequences=False, input_shape=(sequence_length, 1)),
    Dense(1)
])
model.compile(optimizer="adam", loss="mse")

# Keep the History object so the loss curve can be inspected; assigning it also
# stops the notebook from echoing the object's repr as the cell output.
history = model.fit(X, y, epochs=50, batch_size=8, verbose=0)


In [None]:
# Run the trained model over every input window and map its outputs back from the
# scaled range to the original units of "value".
predictions = model.predict(X)
forecast_values = scaler.inverse_transform(predictions.reshape(-1, 1)).ravel()

# The first `sequence_length` rows have no preceding full window, so they keep NaN;
# each remaining row receives the forecast made from the window that ends just before it.
df["forecast"] = np.nan
forecast_col = df.columns.get_loc("forecast")
df.iloc[sequence_length:, forecast_col] = forecast_values


In [None]:
# Rows without a forecast cannot be scored, so drop them (in place) first.
df.dropna(subset=["forecast"], inplace=True)

# Fit a Local Outlier Factor model on the forecast column; fit_predict labels
# outliers as -1, which is turned into the boolean "anomaly" column.
# NOTE(review): LOF is fitted on the forecast values themselves, not on the
# forecast error (value - forecast) — confirm this is the intended signal.
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.05)
lof_labels = lof.fit_predict(df[["forecast"]])
df["anomaly"] = lof_labels == -1

In [None]:
# Anomaly mitigation via interpolation.
# Start from the observed values, blank out every row flagged as anomalous, then
# fill the gaps by interpolating against the datetime index.
# limit_direction="both" also fills flagged rows at the very start of the series;
# the default (forward only) would leave those as NaN.
df["smoothed_value"] = (
    df["value"]
    .mask(df["anomaly"])
    .interpolate(method="time", limit_direction="both")
)

In [None]:
# Plot the observed series, the LSTM forecast, the rows flagged as anomalies and
# the interpolated ("smoothed") series on a single set of axes.
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(df.index, df["value"], label="Original Values", color="blue", alpha=0.6)
ax.plot(df.index, df["forecast"], label="LSTM Forecast", linestyle="dashed", color="green", alpha=0.6, linewidth=0.7)

# Anomaly markers are drawn at the forecast value of each flagged row.
flagged = df[df["anomaly"]]
ax.scatter(flagged.index, flagged["forecast"], color="red", label="Anomalies", marker="x")

# "smoothed_value" is built from the observed "value" column (anomalous rows
# replaced by interpolation), not from the forecast, so it is labelled accordingly.
ax.plot(df.index, df["smoothed_value"], label="Interpolated Values", color="orange", linewidth=0.5)

ax.set_title("Trend and Anomalies Detection using LSTM_LOF_Interpolation")
ax.set_xlabel("Timestamp")
ax.set_ylabel("Value")
ax.legend()
ax.tick_params(axis="x", labelrotation=45)
plt.show()


In [None]:
# Summary statistics: how many rows were flagged, and how the variance of the
# series changes once the flagged rows are replaced by interpolated values.
original_anomalies_count = df["anomaly"].sum()

# ddof=0 gives the population variance, matching what np.var computes.
original_variance = df["value"].var(ddof=0)
smoothed_variance = df["smoothed_value"].var(ddof=0)

report_lines = (
    f"Original Anomalies Detected: {original_anomalies_count}",
    f"Original Variance: {original_variance:.4f}",
    f"Smoothed Variance: {smoothed_variance:.4f}",
    f"Variance Reduction: {original_variance - smoothed_variance:.4f}",
)
for report_line in report_lines:
    print(report_line)