In [1]:
import numpy as np
from sklearn.preprocessing import LabelEncoder

def readucr(filename):
    data = np.loadtxt(filename, delimiter="\t")
    y = data[:, 0]
    x = data[:, 1:]
    return x, y.astype(int)

root_url = "https://raw.githubusercontent.com/hfawaz/cd-diagram/master/FordA/"

X_train, y_train = readucr(root_url + "FordA_TRAIN.tsv")
X_test, y_test = readucr(root_url + "FordA_TEST.tsv")

X_train=X_train.reshape((X_train.shape[0],X_train.shape[1],1))
X_test=X_test.reshape((X_test.shape[0],X_test.shape[1],1))

le = LabelEncoder()
y_train = le.fit_transform(y_train)
y_test = le.transform(y_test)

X_train1=X_train[:2000]
y_train1=y_train[:2000]

X_train2=X_train[2000:]
y_train2=y_train[2000:]

X_train.shape,y_train.shape

((3601, 500, 1), (3601,))

In [2]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.callbacks import LambdaCallback

class EWCLoss:
    def __init__(self, model):
        self.model = model
        self.star_vars = []

    def compute_fisher(self, x, n_samples=100):
        fisher = []
        for v in range(len(self.model.trainable_variables)):
            fisher.append(np.zeros(self.model.trainable_variables[v].shape))

        for _ in range(n_samples):
            # Sample data and compute gradients
            idx = np.random.choice(x.shape[0], 1, replace=False)
            sample = x[idx]
            with tf.GradientTape() as tape:
                preds = self.model(sample)
                loss = BinaryCrossentropy()(sample, preds)
            grads = tape.gradient(loss, self.model.trainable_variables)

            # Accumulate squared gradients
            for v, g in zip(range(len(grads)), grads):
                fisher[v] += np.square(g.numpy())

        # Average and normalize
        for v in range(len(fisher)):
            fisher[v] /= n_samples
            fisher[v] /= np.sum(fisher[v])

        self.star_vars = [v.numpy() for v in self.model.trainable_variables]
        self.fisher=fisher
        return fisher

    def ewc_loss(self, lam=10000):
        def loss(y_true, y_pred):
            base_loss = BinaryCrossentropy()(y_true, y_pred)
            ewc_loss = 0
            for v, var_star, fish in zip(self.model.trainable_variables, self.star_vars, self.fisher):
                ewc_loss += tf.reduce_sum(fish * tf.square(var_star - v))
            return base_loss + (lam / 2) * ewc_loss

        return loss

# Build the LSTM model for time series binary classification
def create_model(input_shape):
    model = Sequential([
        LSTM(128, input_shape=input_shape),
        Dense(16,activation='relu'),
        Dense(1, activation='sigmoid')
    ])
    return model

input_shape = X_train.shape[1:]



model = create_model(input_shape)
optimizer = Adam()


# Train the model on the first task
model.compile(optimizer, loss=BinaryCrossentropy(), metrics=['accuracy'])
model.fit(X_train1, y_train1, epochs=20, validation_split=0.2, batch_size=32)

ewc_loss = EWCLoss(model)
fisher = ewc_loss.compute_fisher(X_train)

model.compile(optimizer, loss=ewc_loss.ewc_loss(), metrics=['accuracy'])
model.fit(X_train2, y_train2, epochs=10, validation_split=0.2, batch_size=32)


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f4e6d632198>

In [9]:
from sklearn.metrics import accuracy_score,roc_auc_score,confusion_matrix,precision_score,recall_score,f1_score

predy=model.predict(X_test)

print('roc auc score:',roc_auc_score(y_test,predy))
print('accuracy score:',accuracy_score(y_test,predy.astype(int)))
print('confusion matrix:',confusion_matrix(y_test,predy.astype(int)))
print('precision score:',precision_score(y_test,predy.astype(int)))
print('recall score:',recall_score(y_test,predy.astype(int)))
print('f1 score:',f1_score(y_test,predy.astype(int)))

roc auc score: 0.492200551982149
accuracy score: 0.5159090909090909
confusion matrix: [[681   0]
 [639   0]]
precision score: 0.0
recall score: 0.0
f1 score: 0.0


  _warn_prf(average, modifier, msg_start, len(result))
