In [5]:
# Importing necessary libraries
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Bidirectional
from tensorflow.keras.metrics import Precision, Recall
from tensorflow.keras import backend as K
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import itertools
import seaborn as sns


# Mount Google Drive (specific to Google Colab)


data = pd.read_csv('01_tracks.csv')

# Data preprocessing steps
def get_lane_changed_ID(data):
    unique_ID = len(data['id'].unique())
    r_to_l = list()
    l_to_r = list()
    for i in range(1, unique_ID):
        vehicle = data.loc[data["id"] == i]
        if vehicle['laneId'].iloc[0] < 3.5 and vehicle['laneId'].iloc[0] - vehicle['laneId'].iloc[-1] > 0:
            r_to_l.append(i)
        if vehicle['laneId'].iloc[0] < 3.5 and vehicle['laneId'].iloc[0] - vehicle['laneId'].iloc[-1] < 0:
            l_to_r.append(i)
        if vehicle['laneId'].iloc[0] > 3.5 and vehicle['laneId'].iloc[0] - vehicle['laneId'].iloc[-1] > 0:
            l_to_r.append(i)
        if vehicle['laneId'].iloc[0] > 3.5 and vehicle['laneId'].iloc[0] - vehicle['laneId'].iloc[-1] < 0:
            r_to_l.append(i)

    return r_to_l, l_to_r

r_to_l, l_to_r = get_lane_changed_ID(data)

# Mapping 'lane_change' to numeric labels

data['lane_change'] = 0
data.loc[data['id'].isin(r_to_l), 'lane_change'] = 1
data.loc[data['id'].isin(l_to_r), 'lane_change'] = 2

# Drop unwanted columns
columns_to_drop = [
    'frontSightDistance', 'backSightDistance', 'dhw', 'thw', 'ttc',
    'precedingXVelocity', 'precedingId', 'followingId', 'leftPrecedingId',
    'leftAlongsideId', 'leftFollowingId', 'rightPrecedingId', 'rightAlongsideId',
    'rightFollowingId', 'laneId', 'width', 'height', 'frame'
]
data = data.drop(columns_to_drop, axis=1)

# Group data by ID and create sequences
grouped = data.groupby('id')
sequences = [group.drop('id', axis=1).values for _, group in grouped]


max_sequence_length = max(len(s) for s in sequences)
padded_sequences = pad_sequences(sequences, maxlen=max_sequence_length, padding='post', dtype='float32')

# Convert labels to one-hot encoded format
labels = np.array([group['lane_change'].iloc[0] for _, group in grouped])
one_hot_labels = to_categorical(labels, num_classes=3)

# Split the data
X_train, X_test, y_train, y_test = train_test_split(padded_sequences, one_hot_labels, test_size=0.2, random_state=42)

# Build the LSTM model
model = tf.keras.Sequential([
    Bidirectional(tf.keras.layers.LSTM(50, return_sequences=True, input_shape=(max_sequence_length, padded_sequences.shape[2]), dropout=0.2)),
    Bidirectional(tf.keras.layers.LSTM(30, dropout=0.2)),
    tf.keras.layers.Dense(3, activation='softmax')
])

# Custom F1 Score Metric
def F1Score(y_true, y_pred):
    # Calculating Precision
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    # Calculating Recall
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    # Calculating F1 Score
    f1_val = 2*(precision*recall)/(precision+recall+K.epsilon())
    return f1_val

# Compile the model
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy', Precision(), Recall(), F1Score])

# Define class weights
class_weights = {0: 1, 1: 2, 2: 2}

# Train the model
history = model.fit(X_train, y_train, epochs=10, batch_size=12, validation_split=0.2, class_weight=class_weights,)

# Evaluate the model
loss, accuracy, precision, recall, f1_score = model.evaluate(X_test, y_test)
print(f"Test Loss: {loss:.4f}")
print(f"Test Accuracy: {accuracy*100:.2f}%")
print(f"Test Precision: {precision*100:.2f}%")
print(f"Test Recall: {recall*100:.2f}%")
print(f"Test F1 Score: {f1_score:.4f}")


# Calculate F1 Score
f1_score = F1Score(y_test, model.predict(X_test))
print(f"Test F1 Score: {f1_score:.4f}")
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true_classes = np.argmax(y_test, axis=1)
cm = confusion_matrix(y_true_classes, y_pred_classes)

plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=True, fmt='g', cmap='Blues')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()

Epoch 1/10


AttributeError: in user code:

    File "/Users/andrashertelendy/miniforge3/lib/python3.10/site-packages/keras/engine/training.py", line 1249, in train_function  *
        return step_function(self, iterator)
    File "/Users/andrashertelendy/miniforge3/lib/python3.10/site-packages/keras/engine/training.py", line 1233, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/Users/andrashertelendy/miniforge3/lib/python3.10/site-packages/keras/engine/training.py", line 1222, in run_step  **
        outputs = model.train_step(data)
    File "/Users/andrashertelendy/miniforge3/lib/python3.10/site-packages/keras/engine/training.py", line 1023, in train_step
        y_pred = self(x, training=True)
    File "/Users/andrashertelendy/miniforge3/lib/python3.10/site-packages/keras/utils/traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "/Users/andrashertelendy/miniforge3/lib/python3.10/site-packages/keras/layers/rnn/gru_lstm_utils.py", line 213, in generate_defun_backend
        return tf.__internal__.function.defun_with_attributes(

    AttributeError: Exception encountered when calling layer 'forward_lstm_6' (type LSTM).
    
    module 'tensorflow._api.v2.compat.v2.__internal__.function' has no attribute 'defun_with_attributes'
    
    Call arguments received by layer 'forward_lstm_6' (type LSTM):
      • inputs=tf.Tensor(shape=(None, 486, 7), dtype=float32)
      • mask=None
      • training=True
      • initial_state=None
