In [None]:
import pandas as pd
import seaborn as sns
import numpy as np

In [None]:
# Read the dataset used in this notebook. ('Data_Latest.csv' is an alternative
# input file that was tried earlier and can be loaded the same way.)
df = pd.read_csv('dataCompressed3_125_1000.csv')
print(df.columns.tolist())

# Remove the 'series_id' column; everything that follows works on the remaining columns.
df = df.drop(columns='series_id')

# Cell output: number of missing values in each remaining column.
df.isna().sum()

In [None]:
# Preview the first rows of the frame as loaded (before any scaling is applied).
df.head()

In [None]:
# Show the column index of the current frame. A commented-out drop of 'series_id'
# used to sit here; that column is already removed in the loading cell.
df.columns

In [None]:
# Feature columns: every column of the frame except the label column "target".
features = list(df.columns)
features.remove("target")  # raises ValueError if the frame has no "target" column
features

In [None]:
from sklearn.preprocessing import MinMaxScaler
# Rescale all feature columns with a default MinMaxScaler and write the result
# back into df (the "target" column is left untouched).
# NOTE(review): the scaler is fitted on the whole frame, i.e. before the
# train/test split made further down, so test rows influence the scaling -
# confirm this is acceptable for the reported metrics.
scaler = MinMaxScaler()
df[features] = scaler.fit_transform(df[features])
df.head()

In [None]:
import numpy as np
import pandas as pd
def create_sequences(data, seq_length):
    """Cut ``data`` into half-overlapping windows of ``seq_length`` rows.

    Windows start every ``round(seq_length / 2)`` rows (but never less than one
    row apart). For a window starting at row ``i`` the features are the columns
    ``'x_0'`` and ``'x_1'`` of rows ``i .. i + seq_length - 1``; the label is the
    ``'target'`` value of row ``i + seq_length``, i.e. the row just past the
    window. A window is kept only when that label equals the ``'target'`` of
    the window's first row.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain the columns ``'x_0'``, ``'x_1'`` and ``'target'``. Rows are
        addressed by position, so the index values do not matter.
    seq_length : int
        Number of rows per window; must be >= 1.

    Returns
    -------
    tuple of numpy.ndarray
        ``(xs, ys)``: the stacked windows with shape ``(n, seq_length, 2)`` and
        their ``n`` labels. Both arrays have shape ``(0,)`` when no window
        qualifies.

    Raises
    ------
    ValueError
        If ``seq_length`` is smaller than 1.
    """
    if seq_length < 1:
        raise ValueError(f"seq_length must be >= 1, got {seq_length}")

    # round(0.5) == 0 in Python, so seq_length == 1 used to hand range() a zero
    # step and crash; clamp the stride to at least one row.
    step = max(1, round(seq_length / 2))

    # Extract the feature matrix once instead of re-slicing the frame per window.
    feature_values = data[['x_0', 'x_1']].values

    xs, ys = [], []
    for i in range(0, len(data) - seq_length, step):
        y0 = data.iloc[i]['target']
        y = data.iloc[i + seq_length]['target']
        # NOTE(review): only the first row and the row after the window are
        # compared; labels of the rows in between are not checked.
        if y0 == y:
            xs.append(feature_values[i:i + seq_length])
            ys.append(y)
    return np.array(xs), np.array(ys)
# Window length in rows; also used below for the per-class split and as the
# time dimension of the LSTM input shape.
SEQ_LENGTH = 20
# Windows cut from the whole frame (all classes together).
x_data, y_data = create_sequences(df, SEQ_LENGTH)
print(len(y_data))  # number of windows that were kept

In [None]:
# Inspect the second window (index 1); fails with IndexError if fewer than two windows exist.
print(x_data[1])

In [None]:
# Container type returned by create_sequences for the windows.
type(x_data)

In [None]:
# Split every class 80/20 on its own: the first 80% of a class's windows go to
# the training set, the remaining windows to the test set.
# NOTE(review): consecutive windows overlap (create_sequences strides by half a
# window), so the last training window and the first test window of a class
# share rows - confirm whether a gap between the two parts is wanted.
x_train_list, x_test_list = [], []
y_train_list, y_test_list = [], []
for label in df['target'].unique():
    # Windows are cut from the rows of this class only, in their original order.
    x_data_class, y_data_class = create_sequences(df[df['target'] == label], SEQ_LENGTH)

    # A class without any usable window comes back as shape-(0,) arrays, which
    # np.concatenate cannot combine with the 3-D window arrays of the other
    # classes; skip it instead of failing at the concatenation step.
    if len(x_data_class) == 0:
        print(f"Skipping label {label}: no sequences of length {SEQ_LENGTH} available")
        continue

    train_size = int(len(x_data_class) * 0.8)

    # Split the data for this class
    x_train_class, x_test_class = x_data_class[:train_size], x_data_class[train_size:]
    y_train_class, y_test_class = y_data_class[:train_size], y_data_class[train_size:]

    # Append to the final lists
    x_train_list.append(x_train_class)
    x_test_list.append(x_test_class)
    y_train_list.append(y_train_class)
    y_test_list.append(y_test_class)

if not x_train_list:
    raise ValueError(f"No sequences of length {SEQ_LENGTH} could be created for any class")

# Concatenate data from all classes to get the final training and test sets
x_train = np.concatenate(x_train_list, axis=0)
x_test = np.concatenate(x_test_list, axis=0)
y_train = np.concatenate(y_train_list, axis=0)
y_test = np.concatenate(y_test_list, axis=0)

In [None]:
# Class balance check: number of windows per label in the training split.
unique_labels_train, counts_train = np.unique(y_train, return_counts=True)
for label, count in zip(unique_labels_train, counts_train):
    print(f"Label {label} in training data: {count} instances")

print("\n")  # visual gap between the two listings

# Same count for the test split.
unique_labels_test, counts_test = np.unique(y_test, return_counts=True)
for label, count in zip(unique_labels_test, counts_test):
    print(f"Label {label} in test data: {count} instances")

In [None]:
from tensorflow.keras.metrics import SparseCategoricalAccuracy
from tensorflow.keras import Sequential
from tensorflow.keras.layers import LSTM, BatchNormalization, Dense, Flatten

# Explicit sparse-accuracy metric reported under the name "accuracy". Note that
# the compile call at the end of this cell passes the 'accuracy' string alias
# rather than this list.
metrics = [SparseCategoricalAccuracy(name="accuracy")]

# Stacked-LSTM classifier: four LSTM layers of decreasing width, each followed
# by batch normalisation, then a dense head ending in a 10-way softmax.
# Input is one window of SEQ_LENGTH time steps with 2 features per step.
model = Sequential([
    LSTM(300, activation='relu', return_sequences=True, input_shape=(SEQ_LENGTH, 2)),
    BatchNormalization(),
    LSTM(200, activation='relu', return_sequences=True),
    BatchNormalization(),
    LSTM(100, activation='relu', return_sequences=True),
    BatchNormalization(),
    # Last recurrent layer returns only its final output (no return_sequences).
    LSTM(50, activation='relu'),
    BatchNormalization(),
    Flatten(),
    Dense(200, activation='relu'),
    Dense(10, activation='softmax'),
])

model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
#!pip install keras-tuner

In [None]:
from tensorflow.python.client import device_lib
import tensorflow as tf
def get_available_devices():
    """Return the ``name`` of every device reported by ``device_lib.list_local_devices()``."""
    device_names = []
    for device in device_lib.list_local_devices():
        device_names.append(device.name)
    return device_names

# List every local device, then only the GPUs TensorFlow can see
# (an empty list on the second line means no GPU is visible).
print(get_available_devices())
print(tf.config.list_physical_devices('GPU'))

In [None]:
from keras_tuner import RandomSearch

def build_model(hp):
    """Build and compile one candidate LSTM classifier for the hyperparameter search.

    Tunable values, registered on ``hp`` in this order:

    * ``units_1``     - width of the first LSTM layer (200..500, step 50)
    * ``units_2``     - width of the second LSTM layer (100..300, step 50)
    * ``units_3``     - width of the third LSTM layer (50..200, step 50)
    * ``dense_units`` - width of the hidden dense layer (100..300, step 50)
    * ``optimizer``   - one of ``'adam'``, ``'sgd'``, ``'rmsprop'``

    Parameters
    ----------
    hp
        Hyperparameter container offering ``Int`` and ``Choice`` (keras-tuner
        passes its ``HyperParameters`` object here).

    Returns
    -------
    A compiled ``Sequential`` model taking windows of shape ``(SEQ_LENGTH, 2)``
    and producing a 10-way softmax.
    """
    model = Sequential()
    model.add(LSTM(
        units=hp.Int('units_1', min_value=200, max_value=500, step=50),
        activation='relu',
        return_sequences=True,
        input_shape=(SEQ_LENGTH, 2)
    ))
    model.add(BatchNormalization())
    model.add(LSTM(
        units=hp.Int('units_2', min_value=100, max_value=300, step=50),
        activation='relu',
        return_sequences=True
    ))
    model.add(BatchNormalization())
    # Last recurrent layer: returns only its final output, not the full sequence.
    model.add(LSTM(
        units=hp.Int('units_3', min_value=50, max_value=200, step=50),
        activation='relu'
    ))
    model.add(BatchNormalization())
    # The LSTM above already emits a 2-D (batch, units) tensor, so Flatten
    # leaves it unchanged; kept so the layer stack stays as before.
    model.add(Flatten())
    model.add(Dense(
        units=hp.Int('dense_units', min_value=100, max_value=300, step=50),
        activation='relu'
    ))
    model.add(Dense(10, activation='softmax'))

    model.compile(
        optimizer=hp.Choice('optimizer', values=['adam', 'sgd', 'rmsprop']),
        loss='sparse_categorical_crossentropy',
        # A fresh metric object per model: the function no longer depends on a
        # mutable module-level list whose single stateful metric instance was
        # shared by every trial. The name stays "accuracy", so the tuner
        # objective 'val_accuracy' is unaffected.
        metrics=[SparseCategoricalAccuracy(name="accuracy")]
    )
    return model

In [None]:
# Random search over the hyperparameters declared in build_model, ranked by
# validation accuracy.
# NOTE(review): the test arrays are passed as validation data, so the
# hyperparameters are selected on the test set - confirm this is intended.
# NOTE(review): the training cell further down fits the separately defined
# `model`; the visible cells never pull the best model out of this tuner.
tuner = RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=10,  # number of hyperparameter combinations to try
    # directory=...  (optional results folder; a machine-specific absolute path was commented out here)
)

tuner.search(x_train, y_train, epochs=10, validation_data=(x_test, y_test))

In [None]:
from tensorflow.keras.callbacks import Callback, CSVLogger, EarlyStopping, LearningRateScheduler, ReduceLROnPlateau
import tensorflow as tf
# Write per-epoch metrics to logs.csv; append=True keeps the rows of earlier
# runs in the same file instead of overwriting them.
csv_logger = CSVLogger('logs.csv', separator = ',', append = True)
# Multiply the learning rate by 0.01 once val_loss has not improved for 3 epochs.
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=3, verbose=1)
def custom_lr_schedule(epoch):
    """Learning rate for ``epoch``: constant at first, then exponential decay.

    Epochs 0..29 use 0.001. From epoch 30 on the rate is
    ``0.001 * exp(0.1 * (30 - epoch))``, which equals 0.001 at epoch 30 and
    shrinks by a factor of ``exp(-0.1)`` with every further epoch.

    Parameters
    ----------
    epoch : int
        Zero-based epoch index as passed by ``LearningRateScheduler``.

    Returns
    -------
    float
        Always a plain Python float. The decay branch is computed with NumPy
        rather than ``tf.math.exp`` because a tensor return value is rejected
        by Keras 3's ``LearningRateScheduler`` ("output of the schedule
        function should be a float"), which would abort training at epoch 30.
    """
    if epoch < 30:
        return 0.001
    return float(0.001 * np.exp(0.1 * (30 - epoch)))
# Apply custom_lr_schedule at the start of every epoch.
lr_scheduler = LearningRateScheduler(custom_lr_schedule, verbose=1)
# NOTE(review): reduce_lr and lr_scheduler both control the learning rate; the
# scheduler sets it from custom_lr_schedule each epoch, which presumably undoes
# any reduction made by reduce_lr - confirm which of the two is intended.
# NOTE(review): the test arrays are used as validation data during training.
lstm_history = model.fit(x_train, y_train, epochs=100, validation_data=(x_test, y_test), batch_size=16, callbacks=[reduce_lr, lr_scheduler, csv_logger])

# Save model and history
model.save('model_lstm.keras')
# history is a dict, so np.save stores it as a pickled object array; reading it
# back needs np.load(..., allow_pickle=True).
np.save('history_lstm.npy',lstm_history.history)

In [None]:
# Class scores for every test window, then the index of the highest score per window.
predictions = model.predict(x_test)
classes_x=np.argmax(predictions,axis=1)

# Sanity checks: shape of one input window, shape of the score matrix, predicted labels.
print(x_test[1].shape)
print(predictions.shape)
print(classes_x)

In [None]:
# 10x10 confusion matrix of true test labels against predicted labels
# (num_classes=10 matches the width of the model's softmax output).
cm = tf.math.confusion_matrix(labels=y_test, predictions=classes_x, num_classes=10)
cm

In [None]:
import matplotlib.pyplot as plt
# Font used for the two axis titles.
font_style = {'family' : 'sans-serif', # alternative: 'Times New Roman'
        'weight' : 'normal',
        'size'   : 14}

# Slightly smaller font used for cell annotations and tick labels.
font_style_nr = {'family' : 'sans-serif',
        'weight' : 'normal',
        'size'   : 12}

# Annotated heatmap of the confusion matrix; fmt='g' prints the counts without
# scientific notation.
hm = sns.heatmap(cm, annot=True, cmap='Blues',fmt='g', annot_kws={'fontdict': font_style_nr})
# Re-apply the existing tick labels with the smaller font.
hm.set_xticklabels(hm.get_xticklabels(), fontdict=font_style_nr)
hm.set_yticklabels(hm.get_yticklabels(), fontdict=font_style_nr)
# The colorbar belongs to the first collection drawn on the heatmap axes.
cbar = hm.collections[0].colorbar
# Give the colorbar tick labels the same 12 pt sans-serif look.
for label in cbar.ax.get_yticklabels():
    label.set_fontsize(12)
    label.set_fontname('sans-serif')
    label.set_fontweight('normal')

# Axis titles are set through pyplot, i.e. on the current axes.
plt.xlabel('Predicted Labels', fontdict=font_style)
plt.ylabel('True Labels', fontdict=font_style)
plt.show()

In [None]:
from sklearn.metrics import classification_report
# Text report from scikit-learn comparing the true test labels with the predicted labels.
print(classification_report(y_test, classes_x))

In [None]:
history = lstm_history

# Training and validation accuracy per epoch, drawn on an explicit Axes object.
fig, ax = plt.subplots()
ax.plot(history.history['accuracy'])
ax.plot(history.history['val_accuracy'])
ax.set_title('')
ax.set_ylabel('Accuracy')
ax.set_xlabel('Epochs')
ax.legend(['train', 'validation'], loc='best')
plt.show()

# Export settings for the figure; only used by the savefig call below, which is
# currently disabled.
image_format = 'pdf'  # other options: .png, .svg, ...
image_name = 'accuracy.pdf'

#fig.savefig(image_name, format=image_format, dpi=1200)