# In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
import math
from sklearn.metrics import precision_score, recall_score, f1_score, roc_curve, auc
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras import backend as K
# Make tf.function-wrapped code execute eagerly instead of as a traced graph.
# Uses the stable API name; `tf.config.experimental_run_functions_eagerly`
# is the deprecated alias of this call.
tf.config.run_functions_eagerly(True)

class PEFActivation(tf.keras.layers.Layer):
    """Keras layer that applies a scaled, softsign-shaped squashing function.

    The output is ``inputs * beta / (1 + |inputs|)``, where ``beta`` is a
    scalar ``tf.Variable`` owned by the layer and initialised to 0.1.
    """

    def __init__(self):
        super().__init__()
        # Scalar multiplier applied to the input before it is normalised.
        self.beta = tf.Variable(0.1)

    def call(self, inputs):
        """Return ``inputs * beta / (1 + |inputs|)``."""
        scaled = inputs * self.beta
        damping = 1 + tf.abs(inputs)
        return scaled / damping

def split_sequence(sequence, n_steps):
    """Slice a sequence into sliding windows and their next-step targets.

    Window ``i`` is ``sequence[i:i + n_steps]`` and its target is the element
    right after it, ``sequence[i + n_steps]``. A window is emitted for every
    start index whose target index still lies inside the sequence.

    Args:
        sequence: Sized object supporting integer indexing and slicing.
        n_steps: Number of consecutive elements in each window.

    Returns:
        A ``(windows, targets)`` pair, each converted with ``np.array``.
    """
    total = len(sequence)
    # A start index i is usable while i + n_steps <= total - 1; i itself
    # never reaches total, hence the upper clamp for non-positive n_steps.
    n_windows = max(0, min(total, total - n_steps))
    windows = [sequence[start:start + n_steps] for start in range(n_windows)]
    targets = [sequence[start + n_steps] for start in range(n_windows)]
    return np.array(windows), np.array(targets)

def QuantileLoss(perc, delta=1e-4):
    """Build a multi-quantile loss function with a Huber-smoothed error term.

    Args:
        perc: Quantile levels. They are copied, flattened, sorted ascending
            and reshaped to a ``(1, n_levels)`` row so they broadcast against
            the prediction columns.
        delta: Absolute-error threshold; errors at or below it use the
            quadratic branch, larger errors use the linear branch.

    Returns:
        A function ``_qloss(y, pred)`` that returns the level-weighted Huber
        error summed over the last axis, plus a penalty on adjacent
        prediction columns that are not in ascending order.
    """
    levels = np.sort(np.array(perc).reshape(-1)).reshape(1, -1)

    def _qloss(y, pred):
        abs_err = K.abs(y - pred)
        # 1.0 where the target does not exceed the prediction, else 0.0.
        below = tf.cast(y <= pred, tf.float32)
        weight = below * (1 - levels) + (1 - below) * levels
        smooth = tf.where(
            abs_err <= delta,
            0.5 * abs_err ** 2 / delta,
            abs_err - 0.5 * delta,
        )
        huber_loss = K.sum(weight * smooth, -1)
        # Penalise each adjacent pair of outputs whose earlier column is not
        # at least 1e-6 below the later one.
        crossing = pred[:, :-1] - pred[:, 1:] + 1e-6
        q_order_loss = K.sum(K.maximum(0.0, crossing), -1)
        return huber_loss + q_order_loss

    return _qloss

# Number of consecutive rows that make up one model input window.
n_steps = 3

# Un-cast read of the first CSV column only (not referenced again in this
# cell; presumably kept for other cells -- verify before removing).
dataframe1 = pd.read_csv('yahoo1.csv', usecols=[0])
dataset1 = dataframe1.values

# Full table as float32: every column but the last is model input, and the
# last column is `y` (compared against 0 and scored as ground truth below).
data = pd.read_csv('yahoo1.csv').astype('float32')
y = data.iloc[:, -1].values
X_data = data.iloc[:, :-1].values

# Skip the first n_steps entries so y_true has one entry per window that
# split_sequence produces from the full series.
y_true = y[n_steps:]

# Recurrent front end: 64-unit LSTM over windows of shape (n_steps, 1), with
# PEFActivation used for both the output and the recurrent activation.
recurrent_layer = LSTM(
    64,
    input_shape=(n_steps, 1),
    activation=PEFActivation(),
    recurrent_activation=PEFActivation(),
)

# Dense head narrowing to 5 outputs; each layer gets its own PEFActivation.
dense_stack = [
    Dense(units, activation=PEFActivation())
    for units in (256, 128, 64, 5)
]

model = Sequential([recurrent_layer] + dense_stack)

# The first 70% of the rows form the training range.
train_threshold = math.floor(0.7 * len(data))

# Train only on rows of that range whose label is 0; the windows are built
# after filtering, so they run over the remaining rows in order.
train_rows = X_data[:train_threshold]
normal_rows = y[:train_threshold] == 0
X_train, y_train = split_sequence(train_rows[normal_rows], n_steps)

# Evaluation windows cover the whole series.
X_test, y_test = split_sequence(X_data, n_steps)

# One quantile level per model output.
quantile_levels = [0.01, 0.25, 0.5, 0.75, 0.99]
model.compile(loss=QuantileLoss(quantile_levels), optimizer='adam')
model.fit(X_train, y_train, verbose=0, epochs=50)

# Predict the five model outputs for every evaluation window.
pred = model.predict(X_test)

# Per-window score: absolute gap between the first and the last output
# column (the columns trained against the lowest and highest quantile level).
# Computed as one array expression instead of a per-row Python loop, so the
# comparisons below operate on a real ndarray rather than a list.
y_pred = np.absolute(pred[:, 0] - pred[:, 4])

# Score cut-offs at the 0.9th and 99.1st percentiles; a window is flagged
# when its score lies at or beyond either cut-off.
lper = np.percentile(y_pred, 0.9)
uper = np.percentile(y_pred, 99.1)
y_pred_algo = (y_pred <= lper) | (y_pred >= uper)

# Identify anomalies based on percentiles. The mask is the single source of
# truth for both the count and the 0/1 prediction vector; boolean indexing
# raises if the mask and y_true ever disagree in length.
count = int(np.sum(y_pred_algo))
predictions = np.zeros(len(y_true))
predictions[y_pred_algo] = 1

print('number of anomalies: ', count)

# Evaluation metrics
precision_auto = precision_score(y_true, predictions)
recall_auto = recall_score(y_true, predictions)
f1_score_auto = f1_score(y_true, predictions)
# NOTE(review): roc_curve is given hard 0/1 predictions rather than a
# continuous score, so the AUC reflects a single operating point.
fpr, tpr, thresholds = roc_curve(y_true, predictions)
auc_roc_auto = auc(fpr, tpr)

print("Precision: {:.4f}".format(precision_auto))
print("Recall: {:.4f}".format(recall_auto))
print("F1-score: {:.4f}".format(f1_score_auto))
print("AUC-ROC: {:.4f}".format(auc_roc_auto))