In [13]:
import numpy as np
import matplotlib.pyplot as plt
from pandas import read_csv
import math
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Bidirectional, Dense, Dropout
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from pathlib import Path
from sklearn.utils import compute_class_weight
from sklearn.metrics import precision_score
from sklearn.metrics import accuracy_score

In [3]:
X_attacker = np.load(Path("ClosestCarCSVsModel2/np_attacker.npy"))
X_normal = np.load(Path("ClosestCarCSVsModel2/np_normal.npy"))
X_normal_split, p = train_test_split(X_normal, test_size=0.70, random_state=42, shuffle=True)
print(X_normal_split.shape)
print(X_attacker.shape)

(11386, 100, 12)
(9583, 100, 12)


In [4]:
y = np.asarray([0]*X_normal_split.shape[0] + [1]*X_attacker.shape[0]).astype('float32')
X = np.concatenate((X_normal_split, X_attacker), axis=0)
print(X.shape)
print(y.shape)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42, shuffle=True)

(20969, 100, 12)
(20969,)


In [7]:
n_feats = 12

# define the model
model = Sequential()
model.add(Bidirectional(LSTM(64, input_shape=(None, n_feats)))) # "None" allows for varying input lengths
model.add(Dropout(0.5))
model.add(Dense(1, activation='sigmoid'))

# compile the model
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
class_weights = compute_class_weight(class_weight = "balanced", classes= np.unique(y_train), y= y_train)
class_weights = dict(zip(np.unique(y_train), class_weights))

mc = ModelCheckpoint('best_model2.h5', monitor='val_loss', mode='min', save_best_only=True)

model.fit(X_train, y_train, 
          epochs=3000, 
          batch_size=2000, 
          class_weight=class_weights,
          validation_split=0.2.
          callbacks=[mc])
model.save('model2_V1.h5')
test_loss, test_acc = model.evaluate(X_test, y_test)

print(test_loss, test_acc)

(14049,)
0.6963749527931213 0.5034682154655457


In [14]:
y_pred = model.predict(X_test, batch_size=2000)
y_pred[y_pred > 0.5] = 1
y_pred[y_pred <= 0.5] = 0
y_pred.flatten()

print('Accuracy:', accuracy_score(y_test, y_pred.flatten()))
print('Precision:', precision_score(y_test, y_pred.flatten()))

Accuracy: 0.5034682080924856
Precision: 0.46440281030444963


# Other Transformer

https://keras.io/examples/nlp/text_classification_with_transformer/

In [None]:
class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)


In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout, GlobalMaxPooling1D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
# from tensorflow.keras.layers import TransformerBlock

# Prepare the data
input_shape = (10, 1)  # input shape of each time series sequence
num_classes = 2  # number of classes for classification

x_train = ...  # input time series data of shape (num_samples, input_shape)
y_train = ...  # labels for the input data, shape (num_samples,)

# Define the model
inputs = Input(shape=input_shape)
x = inputs

embedding_dim = 128
num_heads = 8
ff_dim = 128

# Time series embedding layer
x = TransformerBlock(embed_dim=embedding_dim, num_heads=num_heads, ff_dim=ff_dim)(x)
x = GlobalMaxPooling1D()(x)
x = Dropout(0.1)(x)
x = Dense(128, activation="relu")(x)
x = Dropout(0.1)(x)

outputs = Dense(num_classes, activation="softmax")(x)

model = Model(inputs=inputs, outputs=outputs)

# Compile the model
model.compile(loss="categorical_crossentropy", optimizer=Adam(lr=1e-4), metrics=["accuracy"])

# Train the model
y_train = tf.keras.utils.to_categorical(y_train)  # convert labels to one-hot encoded vectors
model.fit(x_train, y_train, batch_size=32, epochs=10, validation_split=0.1)

# Transformer

https://keras.io/examples/timeseries/timeseries_transformer_classification/

In [None]:
import numpy as np


def readucr(filename):
    data = np.loadtxt(filename, delimiter="\t")
    y = data[:, 0]
    x = data[:, 1:]
    return x, y.astype(int)


root_url = "https://raw.githubusercontent.com/hfawaz/cd-diagram/master/FordA/"

x_train, y_train = readucr(root_url + "FordA_TRAIN.tsv")
x_test, y_test = readucr(root_url + "FordA_TEST.tsv")

x_train = x_train.reshape((x_train.shape[0], x_train.shape[1], 1))
x_test = x_test.reshape((x_test.shape[0], x_test.shape[1], 1))

n_classes = len(np.unique(y_train))

idx = np.random.permutation(len(x_train))
x_train = x_train[idx]
y_train = y_train[idx]

y_train[y_train == -1] = 0
y_test[y_test == -1] = 0


In [None]:
from tensorflow import keras
from tensorflow.keras import layers

In [None]:
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    # Normalization and Attention
    x = layers.LayerNormalization(epsilon=1e-6)(inputs)
    x = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(x, x)
    x = layers.Dropout(dropout)(x)
    res = x + inputs

    # Feed Forward Part
    x = layers.LayerNormalization(epsilon=1e-6)(res)
    x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
    return x + res

def build_model(
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0,
    mlp_dropout=0,
):
    inputs = keras.Input(shape=input_shape)
    x = inputs
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    x = layers.GlobalAveragePooling1D(data_format="channels_first")(x)
    for dim in mlp_units:
        x = layers.Dense(dim, activation="relu")(x)
        x = layers.Dropout(mlp_dropout)(x)
    outputs = layers.Dense(n_classes, activation="softmax")(x)
    return keras.Model(inputs, outputs)




In [None]:
y_train.shape

In [None]:
input_shape = x_train.shape[1:]

model = build_model(
    input_shape,
    head_size=256,
    num_heads=4,
    ff_dim=4,
    num_transformer_blocks=4,
    mlp_units=[128],
    mlp_dropout=0.4,
    dropout=0.25,
)

model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=keras.optimizers.Adam(learning_rate=1e-4),
    metrics=["sparse_categorical_accuracy"],
)
model.summary()

callbacks = [keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)]

model.fit(
    x_train,
    y_train,
    validation_split=0.2,
    epochs=200,
    batch_size=64,
    callbacks=callbacks,
)

model.evaluate(x_test, y_test, verbose=1)
