In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow import keras
import tensorflow as tf
from sklearn.preprocessing import StandardScaler

import plotly.graph_objects as go

# Seed NumPy's and TensorFlow's global random generators so that repeated
# runs of the notebook draw the same random numbers.
np.random.seed(1)
tf.random.set_seed(1)

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Dropout, RepeatVector, TimeDistributed

# Load the price history and keep only the two columns used below.
# .copy() yields an independent frame, so overwriting the 'Date' column does
# not trigger pandas' SettingWithCopyWarning.
df = pd.read_csv('AA.csv')
df = df[['Date', 'Close']].copy()
# Unparseable dates become NaT (errors='coerce'); those rows are dropped.
df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
df = df.dropna(subset=['Date'])

print(df['Date'].min(), df['Date'].max())
cutoff_date = '2015-01-01'

# Rows dated strictly before the cutoff form the training split, the rest the
# test split. Both get their 'Close' column overwritten below, so take
# explicit copies instead of assigning into slices of `df`.
train = df.loc[df['Date'] < cutoff_date].copy()
test = df.loc[df['Date'] >= cutoff_date].copy()

# A bare expression in the middle of a cell is never displayed; print it.
print(train.shape, test.shape)

# Fit the scaler on the training rows only, then apply the same transform to
# both splits.
scaler = StandardScaler()
scaler = scaler.fit(train[['Close']])

train['Close'] = scaler.transform(train[['Close']])
test['Close'] = scaler.transform(test[['Close']])
TIME_STEPS = 30


def create_sequences(X, y, time_steps=TIME_STEPS):
    """Cut sliding windows out of ``X`` and pair each with the value after it.

    Window ``i`` holds the rows at positions ``i`` .. ``i + time_steps - 1``
    of ``X``; its target is the element of ``y`` at position ``i + time_steps``.

    Parameters
    ----------
    X : pandas object
        Sliced positionally with ``.iloc``; each slice is stored via ``.values``.
    y : pandas object
        Indexed positionally with ``.iloc``.
    time_steps : int, optional
        Window length; defaults to ``TIME_STEPS``.

    Returns
    -------
    tuple of numpy.ndarray
        ``(windows, targets)`` with ``len(X) - time_steps`` entries each. Both
        arrays are empty when ``X`` has no more than ``time_steps`` rows.
    """
    n_windows = len(X) - time_steps
    windows = [X.iloc[start:start + time_steps].values for start in range(n_windows)]
    targets = [y.iloc[start + time_steps] for start in range(n_windows)]
    return np.array(windows), np.array(targets)

# Sliding windows of TIME_STEPS scaled closes for both splits. The second
# return value is the close immediately following each window; it is not used
# for training here but is kept so the names remain available.
X_train, y_train = create_sequences(train[['Close']], train['Close'])
X_test, y_test = create_sequences(test[['Close']], test['Close'])

print(f'Training shape: {X_train.shape}')
print(f'Testing shape: {X_test.shape}')

# LSTM autoencoder:
#   encoder LSTM      -> one 128-dim vector per window
#   RepeatVector      -> that vector repeated once per time step
#   decoder LSTM      -> one 128-dim vector per time step
#   TimeDistributed   -> one output value per feature and time step
# The output therefore has the same (time_steps, features) shape as the input.
model = Sequential()
model.add(LSTM(128, input_shape=(X_train.shape[1], X_train.shape[2])))
model.add(Dropout(rate=0.2))
model.add(RepeatVector(X_train.shape[1]))
model.add(LSTM(128, return_sequences=True))
model.add(Dropout(rate=0.2))
model.add(TimeDistributed(Dense(X_train.shape[2])))
model.compile(optimizer='adam', loss='mae')
model.summary()

# An autoencoder is trained to reproduce its own input, so the target is
# X_train itself. The 1-D y_train does not match the (samples, time_steps,
# features) output: it only "works" through broadcasting and does not yield a
# reconstruction loss.
# shuffle=False keeps the windows in chronological order; validation_split
# holds out the last 10% of the training windows.
history = model.fit(X_train, X_train, epochs=10, batch_size=32, validation_split=0.1,
                    callbacks=[keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, mode='min')], shuffle=False)

# Mean absolute reconstruction error on the held-out windows.
model.evaluate(X_test, X_test)


In [None]:
# The model emits one value per time step of every window, so y_pred carries
# a time axis: (windows, time_steps, features).
y_pred = model.predict(X_test)

# Plot one point per window. Flattening the full prediction would produce
# time_steps points per window and could not be compared with the
# one-point-per-window reference series. Use the final step of each
# reconstructed window and the final step of the corresponding input window,
# both mapped back to price units.
# (`y_test_inverse` keeps its original name; it now holds the window's own
# last close rather than the close that follows the window.)
y_pred_inverse = scaler.inverse_transform(y_pred[:, -1, :].reshape(-1, 1))
y_test_inverse = scaler.inverse_transform(X_test[:, -1, :].reshape(-1, 1))

plt.figure(figsize=(14, 7))
plt.plot(y_test_inverse, label='Original Close Prices', color='blue')
plt.plot(y_pred_inverse, label='Reconstructed Close Prices', color='orange')
plt.title('Original vs Reconstructed Close Prices')
plt.xlabel('Time Steps')
plt.ylabel('Close Price')
plt.legend()
plt.show()


In [None]:
# Reconstruct every test window with the trained model.
X_test_pred = model.predict(X_test)

# Per-window error: absolute difference between reconstruction and input,
# averaged over the time axis (axis=1).
absolute_difference = np.abs(X_test_pred - X_test)
reconstruction_error = absolute_difference.mean(axis=1)

# The cut-off is the 95th percentile of these same errors; windows strictly
# above it are marked as anomalies.
threshold = np.percentile(reconstruction_error, 95)
anomalies = reconstruction_error > threshold

# Error curve, threshold line, and the flagged windows highlighted in red.
plt.figure(figsize=(12, 6))
plt.plot(reconstruction_error, label='Reconstruction Error')
plt.axhline(y=threshold, color='r', linestyle='--', label='Threshold')
plt.scatter(
    np.where(anomalies)[0],
    reconstruction_error[anomalies],
    color='red',
    label='Anomalies',
)
plt.title('Reconstruction Error on Test Data')
plt.xlabel('Time Step')
plt.ylabel('Reconstruction Error')
plt.legend()
plt.show()


In [None]:
# Reconstruct the test windows and keep the final time step of each one, so
# there is exactly one reconstructed price per window.
y_pred = model.predict(X_test)
y_pred_last = y_pred[:, -1, :]
y_pred_inverse = scaler.inverse_transform(y_pred_last.reshape(-1, 1))
# Reference series: the final step of each *input* window, i.e. the value the
# last output step is reconstructing. (`y_test_inverse` keeps its original
# name; it previously held the close that follows the window, which adds a
# one-step price move to every "reconstruction" error.)
y_test_inverse = scaler.inverse_transform(X_test[:, -1, :].reshape(-1, 1))

# Absolute reconstruction error in price units, one value per window.
reconstruction_errors = np.abs(y_test_inverse - y_pred_inverse)

# Windows whose error exceeds the 95th percentile of these errors are flagged.
threshold = np.percentile(reconstruction_errors, 95)

anomalies = reconstruction_errors.flatten() > threshold
anomaly_indices = np.where(anomalies)[0]

plt.figure(figsize=(14, 7))
plt.plot(y_test_inverse, label='Original Close Prices', color='blue')
plt.plot(y_pred_inverse, label='Reconstructed Close Prices', color='orange')
plt.scatter(anomaly_indices, y_test_inverse[anomalies].ravel(), color='red', label='Anomalies', marker='o')
# `threshold` is an error magnitude, not a price level, so a horizontal line
# at y=threshold is meaningless on this axis. Show it as a band of
# +/- threshold around the reconstruction: prices outside the band are the
# flagged anomalies.
steps = np.arange(len(y_pred_inverse))
plt.fill_between(
    steps,
    (y_pred_inverse - threshold).ravel(),
    (y_pred_inverse + threshold).ravel(),
    color='r',
    alpha=0.15,
    label='Anomaly Threshold',
)
plt.title('Anomaly Detection in Stock Prices')
plt.xlabel('Time Steps')
plt.ylabel('Close Price')
plt.legend()
plt.show()

print("Indices of detected anomalies:", anomaly_indices)
print("Anomalies (Close Prices):", y_test_inverse[anomalies])


In [None]:
# Positional 80% / 10% / 10% split of the full price history.
# NOTE(review): this split is independent of the cutoff_date split used to
# train `model`, so the validation slice may overlap rows the model was
# trained on - confirm that this is intended.
train_size = int(len(df) * 0.8)
val_size = int(len(df) * 0.1)

# Explicit copies: the 'Close' column of each part is overwritten below, and
# assigning into a slice of `df` raises SettingWithCopyWarning.
train_data = df.iloc[:train_size].copy()
val_data = df.iloc[train_size:train_size + val_size].copy()
test_data = df.iloc[train_size + val_size:].copy()

# Reuse the scaler that was fitted for the data `model` was trained on.
# Fitting a new scaler here would feed the model inputs on a different scale,
# and rebinding the global `scaler` would make the earlier cells produce
# different prices when they are re-run.
train_data['Close'] = scaler.transform(train_data[['Close']])
val_data['Close'] = scaler.transform(val_data[['Close']])
test_data['Close'] = scaler.transform(test_data[['Close']])

X_val, y_val = create_sequences(val_data[['Close']], val_data['Close'])

y_val_pred = model.predict(X_val)

# One reconstructed price per window: the final output step, compared with
# the final step of the matching input window (the value being reconstructed).
# (`y_val_inverse` keeps its original name for compatibility.)
y_val_pred_last = y_val_pred[:, -1, :]
y_val_pred_inverse = scaler.inverse_transform(y_val_pred_last.reshape(-1, 1))
y_val_inverse = scaler.inverse_transform(X_val[:, -1, :].reshape(-1, 1))

# Absolute reconstruction error in price units.
reconstruction_errors_val = np.abs(y_val_inverse - y_val_pred_inverse)

plt.figure(figsize=(14, 7))
plt.plot(reconstruction_errors_val, label='Reconstruction Errors', color='purple')
plt.title('Reconstruction Errors on Validation Dataset')
plt.xlabel('Time Steps')
plt.ylabel('Reconstruction Error')
plt.axhline(y=np.percentile(reconstruction_errors_val, 95), color='red', linestyle='--', label='95th Percentile Threshold')
plt.legend()
plt.show()


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM, RepeatVector, TimeDistributed, Dense, Input
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt

def generate_data(num_sequences, sequence_length, feature_dim):
    """Return a random batch of sequences.

    The values are drawn with ``np.random.rand`` (NumPy's global random
    state), i.e. uniformly from ``[0, 1)``.

    Parameters
    ----------
    num_sequences : int
        Size of the first axis.
    sequence_length : int
        Size of the second axis.
    feature_dim : int
        Size of the third axis.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(num_sequences, sequence_length, feature_dim)``.
    """
    shape = (num_sequences, sequence_length, feature_dim)
    return np.random.rand(*shape)

# Settings for the sequence-length sweep in this cell.
# Number of random sequences generated for every sequence length.
num_sequences = 1000
# Sequence lengths to compare; one autoencoder is built and trained per entry.
sequence_lengths = [10, 20, 30, 40, 50]
# Number of features per time step of the generated data.
feature_dim = 5
# Number of units of the encoder LSTM.
latent_dim = 16
# Training batch size and number of training epochs per autoencoder.
batch_size = 32
epochs = 1

def create_lstm_autoencoder(sequence_length, feature_dim, latent_dim):
    """Build an (uncompiled) functional-API LSTM autoencoder.

    The encoder LSTM (ReLU activation) turns each input sequence into a single
    ``latent_dim`` vector; ``RepeatVector`` repeats that vector
    ``sequence_length`` times and a second LSTM with ``feature_dim`` units and
    ``return_sequences=True`` produces the output sequence.

    Note: another cell of this notebook defines a function of the same name
    that takes ``(sequence_length, n_features)``; whichever definition ran
    last is the one in effect.

    Parameters
    ----------
    sequence_length : int
        Number of time steps per input sequence.
    feature_dim : int
        Number of features per time step, and units of the decoder LSTM.
    latent_dim : int
        Number of units of the encoder LSTM.

    Returns
    -------
    tensorflow.keras.models.Model
        Model mapping ``(sequence_length, feature_dim)`` inputs to the
        decoder's output sequence. The caller is responsible for compiling it.
    """
    sequence_in = Input(shape=(sequence_length, feature_dim))
    latent_vector = LSTM(latent_dim, activation="relu")(sequence_in)
    repeated_latent = RepeatVector(sequence_length)(latent_vector)
    sequence_out = LSTM(feature_dim, return_sequences=True)(repeated_latent)
    return Model(sequence_in, sequence_out)

# Train one autoencoder per sequence length on random data and record how well
# it reconstructs that same data.
mse_scores = []
for seq_len in sequence_lengths:
    data = generate_data(num_sequences, seq_len, feature_dim)
    # Use a loop-local name: rebinding the global `model` here would discard
    # the trained stock-price model that earlier cells call, breaking them if
    # they are re-run after this cell.
    sweep_model = create_lstm_autoencoder(seq_len, feature_dim, latent_dim)
    sweep_model.compile(optimizer="adam", loss="mse")
    sweep_model.fit(data, data, epochs=epochs, batch_size=batch_size, verbose=0)

    # verbose=0 matches the silent training above and keeps progress bars out
    # of the cell output.
    reconstructed_data = sweep_model.predict(data, verbose=0)
    # Flatten (sequences, steps, features) to (sequences * steps, features)
    # because mean_squared_error expects at most 2-D input.
    mse = mean_squared_error(data.reshape(-1, feature_dim), reconstructed_data.reshape(-1, feature_dim))
    mse_scores.append(mse)

plt.plot(sequence_lengths, mse_scores, marker='o')
plt.xlabel("Sequence Length")
plt.ylabel("Mean Squared Error (Reconstruction)")
plt.title("Reconstruction Error vs Sequence Length")
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Reseed NumPy's global generator so the synthetic signal is reproducible.
np.random.seed(42)

time_steps = 1000
anomaly_fraction = 0.1

# Clean signal: a sine wave with a small amount of Gaussian noise added.
t = np.arange(time_steps)
noise = 0.05 * np.random.normal(size=time_steps)
normal_data = np.sin(0.02 * t) + noise

# Pick distinct positions (replace=False) and add a spike drawn from
# uniform(3, 5) to a copy of the clean signal at each of them.
num_anomalies = int(anomaly_fraction * time_steps)
anomaly_indices = np.random.choice(time_steps, num_anomalies, replace=False)
spikes = np.random.uniform(3, 5, size=num_anomalies)
anomalous_data = normal_data.copy()
anomalous_data[anomaly_indices] += spikes

# Ground truth per time step: 1.0 where a spike was injected, 0.0 elsewhere.
labels = np.zeros(time_steps)
labels[anomaly_indices] = 1

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, RepeatVector, TimeDistributed, Dense

def create_lstm_autoencoder(sequence_length, n_features):
    """Build and compile a stacked LSTM autoencoder.

    Layers, in order: LSTM(64) and LSTM(32) as encoder (the second returns
    only its final output), ``RepeatVector(sequence_length)``, LSTM(32) and
    LSTM(64) as decoder, and a ``TimeDistributed(Dense(n_features))`` output.
    All LSTM layers use ReLU activation. The model is compiled with the Adam
    optimizer and mean-squared-error loss.

    Note: this replaces the three-argument function of the same name defined
    in an earlier cell of this notebook.

    Parameters
    ----------
    sequence_length : int
        Number of time steps per input window.
    n_features : int
        Number of features per time step.

    Returns
    -------
    tensorflow.keras.models.Sequential
        The compiled model.
    """
    autoencoder = Sequential()
    # Encoder
    autoencoder.add(LSTM(64, activation="relu", input_shape=(sequence_length, n_features), return_sequences=True))
    autoencoder.add(LSTM(32, activation="relu", return_sequences=False))
    # Repeat the encoded vector once per output time step
    autoencoder.add(RepeatVector(sequence_length))
    # Decoder
    autoencoder.add(LSTM(32, activation="relu", return_sequences=True))
    autoencoder.add(LSTM(64, activation="relu", return_sequences=True))
    autoencoder.add(TimeDistributed(Dense(n_features)))
    autoencoder.compile(optimizer="adam", loss="mse")
    return autoencoder

sequence_length = 50
n_features = 1


def create_sequences(data, seq_length):
    """Return every window of ``seq_length`` consecutive items of ``data``.

    Window ``i`` is ``data[i:i + seq_length]`` for ``i`` in
    ``range(len(data) - seq_length)``.

    Note: this replaces the ``create_sequences(X, y, time_steps)`` defined in
    an earlier cell of this notebook, which returns two arrays.

    Parameters
    ----------
    data : sequence supporting ``len`` and slicing
        For example a NumPy array.
    seq_length : int
        Window length.

    Returns
    -------
    numpy.ndarray
        The stacked windows; an empty array when ``data`` has no more than
        ``seq_length`` items.
    """
    n_windows = len(data) - seq_length
    return np.array([data[start:start + seq_length] for start in range(n_windows)])

# Sliding windows over the clean signal, the signal with injected spikes, and
# the per-step ground-truth labels.
normal_sequences = create_sequences(normal_data.reshape(-1, 1), sequence_length)
anomalous_sequences = create_sequences(anomalous_data.reshape(-1, 1), sequence_length)
labels_sequences = create_sequences(labels, sequence_length)

# Train the autoencoder to reproduce windows of the clean signal only.
model = create_lstm_autoencoder(sequence_length, n_features)
history = model.fit(normal_sequences, normal_sequences, epochs=10, batch_size=32, validation_split=0.1, verbose=1)

plt.plot(history.history['loss'], label="Training Loss")
plt.plot(history.history['val_loss'], label="Validation Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

# Per-window mean squared reconstruction error on the spiked signal.
reconstructed_sequences = model.predict(anomalous_sequences)
mse = np.mean(np.power(anomalous_sequences - reconstructed_sequences, 2), axis=(1, 2))

# Derive the threshold from reconstruction errors on the *clean* windows.
# `mse` holds only errors of the spiked signal and has the same length as
# `normal_sequences`, so slicing it with [:len(normal_sequences)] selected
# every anomalous error rather than any normal one.
normal_reconstructed = model.predict(normal_sequences)
normal_mse = np.mean(np.power(normal_sequences - normal_reconstructed, 2), axis=(1, 2))
threshold = np.percentile(normal_mse, 95)

# A window is flagged when its error exceeds what 95% of clean windows reach.
predicted_anomalies = mse > threshold