In [None]:
import pandas as pd

In [None]:
# Relative path of the tracks CSV that the rest of the notebook works on.
file = './highD/data/01_tracks.csv'

# Read the whole CSV into the working DataFrame.
df = pd.read_csv(filepath_or_buffer=file)

In [None]:

# List of columns to drop
columns_to_drop = [
    'precedingId', 'followingId', 'leftPrecedingId', 'leftAlongsideId',
    'leftFollowingId', 'rightPrecedingId', 'rightAlongsideId', 'rightFollowingId', 'precedingXVelocity'
]

# Drop the columns from the DataFrame.
# errors='ignore' keeps this cell re-runnable: `df` is rebound to the reduced
# frame, so on a second run the columns are already gone and a plain drop()
# would raise KeyError.
df = df.drop(columns=columns_to_drop, errors='ignore')
print(df.head())

In [None]:
# DataFrame.info() writes its summary to stdout itself and returns None, so it
# is called directly; wrapping it in print() would append a stray "None" line.
df.info()


In [None]:
# Order the rows by 'id' and then by 'frame', and group the ordered rows by 'id'.
df_sorted = df.sort_values(['id', 'frame'])
grouped = df_sorted.groupby(by='id')

In [None]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Rebuild the ordered frame (by 'id', then 'frame') and the per-'id' grouping.
# group_keys=False keeps the group label out of the index of apply() results.
df_sorted = df.sort_values(['id', 'frame'])
grouped = df_sorted.groupby(by='id', group_keys=False)

# Feature engineering
def create_lagged_features(group, n_lags=5, features=None):
    """Return a copy of ``group`` extended with lagged versions of selected columns.

    For every feature and every lag ``k`` in ``1..n_lags`` a column named
    ``'<feature>_lag_<k>'`` is added, holding the feature shifted down by ``k``
    rows. The first ``k`` rows of each such column are therefore NaN.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows to derive the lags from; the row order of the frame is the order
        used for shifting. The frame itself is not modified.
    n_lags : int, default 5
        Number of lag steps per feature. ``0`` adds no columns.
    features : sequence of str, optional
        Columns to lag. Defaults to the ten position / velocity / acceleration /
        distance columns this function has always used.

    Returns
    -------
    pandas.DataFrame
        A new frame with the original columns followed by the lag columns,
        ordered feature by feature and, within a feature, by increasing lag.

    Raises
    ------
    KeyError
        If one of the requested features is not a column of ``group``.
    """
    if features is None:
        features = ['x', 'y', 'xVelocity', 'yVelocity', 'xAcceleration', 'yAcceleration',
                    'frontSightDistance', 'backSightDistance', 'dhw', 'thw']

    lagged_columns = {
        f'{feature}_lag_{lag}': group[feature].shift(lag)
        for feature in features
        for lag in range(1, n_lags + 1)
    }

    # assign() works on a copy, so the frame handed in (e.g. by groupby.apply,
    # where mutating the passed object is documented as unsupported) stays
    # untouched. Existing columns with the same name are overwritten, as before.
    return group.assign(**lagged_columns)


# Add the lag columns group by group, then discard every row that contains a NaN
# (this includes the leading rows of each group, whose lags are undefined).
# NOTE(review): with include_groups=False the grouping column 'id' is not passed
# to the function, so 'id' is presumably absent from df_features - confirm this
# is intended if later cells need the vehicle id.
df_features = grouped.apply(create_lagged_features, include_groups=False)
df_features = df_features.dropna()

# Every column except 'frame', 'id' and 'laneId' is rescaled.
columns_to_scale = df_features.columns.difference(['frame', 'id', 'laneId'])

# Min-max scale the selected columns into the range [-1, 1].
# NOTE(review): the scaler is fitted on all rows, i.e. before any train/test
# split, and the selection above also contains the 'ttc' column - verify that
# this is acceptable for the evaluation that follows.
scaler = MinMaxScaler(feature_range=(-1, 1))
df_features[columns_to_scale] = scaler.fit_transform(df_features[columns_to_scale])



In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf

# split data: 20% held out for test, then 20% of the remainder for validation
train, test = train_test_split(df_features, test_size=0.2, random_state=42)
train, val = train_test_split(train, test_size=0.20, random_state=42)

# set x and y features: every column whose name contains 'lag' is a model input
feature_columns = [col for col in train.columns if 'lag' in col]

# Use a dedicated scaler for the model inputs instead of refitting the shared
# `scaler`: refitting would overwrite the fit that was made on the full column
# set (including 'ttc'), making it impossible to map values back afterwards.
# The scaler is fitted on the training rows only and merely applied to val/test.
feature_scaler = MinMaxScaler(feature_range=(-1, 1))
X_train = feature_scaler.fit_transform(train[feature_columns])
X_val = feature_scaler.transform(val[feature_columns])
X_test = feature_scaler.transform(test[feature_columns])

# set ttc to target
y_train = train['ttc'].values
y_val = val['ttc'].values
y_test = test['ttc'].values

def build_model(input_shape, learning_rate=0.0001):
    """Build and compile a small self-attention regression network.

    Layer stack: LayerNormalization -> MultiHeadAttention (3 heads, self-attention
    with query = value = the normalised input) -> GlobalAveragePooling1D ->
    Dropout(0.5) -> Dense(25, relu) -> Dense(1).

    Parameters
    ----------
    input_shape : tuple of int
        Shape of one sample without the batch axis. Its last entry is also used
        as ``key_dim`` of the attention layer.
        NOTE(review): with an input shape of ``(n, 1)`` this gives ``key_dim=1``
        - confirm that such a narrow attention projection is intended.
    learning_rate : float, default 0.0001
        Learning rate of the Adam optimizer the model is compiled with.
        Previously this argument was accepted but silently ignored.

    Returns
    -------
    tf.keras.Model
        The model, compiled with Adam(learning_rate) and mean-squared-error loss.
        Callers may call ``compile()`` again to override these settings.
    """
    inputs = tf.keras.Input(shape=input_shape)
    x = tf.keras.layers.LayerNormalization()(inputs)
    # Self-attention: the same tensor is passed as query and value.
    x = tf.keras.layers.MultiHeadAttention(num_heads=3, key_dim=input_shape[-1])(x, x)
    x = tf.keras.layers.GlobalAveragePooling1D()(x)
    x = tf.keras.layers.Dropout(0.5)(x)
    x = tf.keras.layers.Dense(25, activation='relu')(x)
    outputs = tf.keras.layers.Dense(1)(x)  # Assuming a regression problem
    model = tf.keras.models.Model(inputs=inputs, outputs=outputs)
    # Make the learning_rate argument effective.
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), loss='mse')
    return model

# One sample is (number of lag columns, 1); update the shape if the input features change.
LEARNING_RATE = 0.0001
model = build_model((X_train.shape[1], 1), learning_rate=LEARNING_RATE)

# Pass an explicit Adam instance: the string 'adam' would use the Keras default
# learning rate (0.001) instead of the one requested above.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE), loss='mse')

# train
history = model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=10, batch_size=32)

# evaluate
test_loss = model.evaluate(X_test, y_test)


In [None]:
import matplotlib.pyplot as plt

# 1. Training history: one curve per recorded loss series
for _hist_key, _curve_label in (('loss', 'Training Loss'), ('val_loss', 'Validation Loss')):
    plt.plot(history.history[_hist_key], label=_curve_label)
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

# 2. Test loss
print("Test Loss:", test_loss)
