In [1]:
import os

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

In [None]:
#Data Preprocessing
def preprocess_data(data_path):
    """Read the trajectory CSV and rescale its GPS coordinates.

    Args:
        data_path: Location of the CSV file, passed straight to
            ``pd.read_csv``.

    Returns:
        The loaded DataFrame with the ``latitude`` and ``longitude`` columns
        replaced by their ``MinMaxScaler``-transformed values; every other
        column is returned as read.
    """
    frame = pd.read_csv(data_path)
    gps_columns = ['latitude', 'longitude']
    # Fit the scaler on both coordinate columns and write the result back.
    frame[gps_columns] = MinMaxScaler().fit_transform(frame[gps_columns])
    return frame

In [None]:
def create_look_back_dataset(data, look_back_period):
    """Slice column 0 of ``data`` into sliding windows and next-step targets.

    For every start index ``i`` the window is ``data[i:i + look_back_period, 0]``
    and its target is the value that immediately follows it,
    ``data[i + look_back_period, 0]``.

    Args:
        data: 2-D array-like of shape ``(n_rows, n_columns)``; only the first
            column is used.
        look_back_period: Number of consecutive rows in each window.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(n_rows - look_back_period,
        look_back_period)`` and ``y`` has shape ``(n_rows - look_back_period,)``.
        When there are not enough rows for a single window, empty arrays of
        shape ``(0, look_back_period)`` and ``(0,)`` are returned.
    """
    series = np.asarray(data)[:, 0]

    # Every start index whose target still lies inside the series is valid,
    # i.e. there are exactly len(series) - look_back_period windows. (The
    # previous bound subtracted one more and silently dropped the last one.)
    n_windows = len(series) - look_back_period
    if n_windows <= 0:
        return (
            np.empty((0, look_back_period), dtype=series.dtype),
            np.empty((0,), dtype=series.dtype),
        )

    X = np.array([series[start:start + look_back_period] for start in range(n_windows)])
    # Copy so the targets do not alias the caller's array.
    y = series[look_back_period:].copy()
    return X, y

# Construct the dataset path.
# `__file__` is not defined inside a notebook kernel, so resolve the path from
# the working directory instead.
# NOTE(review): assumes the kernel's working directory is the notebook's
# folder and that the CSV lives in ../data — confirm.
current_dir = os.getcwd()
dataset_path = os.path.join(current_dir, '..', 'data', 'ngsim_dataset.csv')

# Load and preprocess data from the path built above. (The previous literal
# '\path\ngsim_dataset.csv' contained a '\n' newline escape and ignored
# dataset_path.)
ngsim_data = preprocess_data(dataset_path)

# Create look-back dataset
# NOTE(review): the windows are built from column 0 of the frame only —
# confirm that column is the intended signal.
look_back_period = 5
X, y = create_look_back_dataset(ngsim_data.values, look_back_period)

# Add a trailing feature axis for the Conv1D models. Each window holds a
# single feature (column 0), so the last dimension is 1 regardless of how many
# columns the CSV has.
X = X.reshape((X.shape[0], look_back_period, 1))

print("Shape of X:", X.shape)
print("Shape of y:", y.shape)


In [None]:
#Model Architecture
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import RMSprop

# Training configuration
look_back_period = 5    # sliding window size
latent_dim = 10         # length of the latent vector
batch_size = 64         # batch size for training
epochs = 75             # number of training epochs
learning_rate = 1e-4    # learning rate for the optimizer

# Relative weights of the loss terms in the generator-discriminator model
gamma_1, gamma_2, gamma_3 = 0.8, 0.2, 0.1

def build_encoder(input_shape):
    """Assemble the convolutional encoder.

    Two ``Conv1D`` + ``MaxPooling1D`` stages (64, then 128 filters) are
    followed by a ``Flatten`` and a 128-unit ReLU ``Dense`` layer.

    Args:
        input_shape: Shape of one input sample, without the batch axis.

    Returns:
        An uncompiled ``Sequential`` model.
    """
    encoder_net = models.Sequential()
    encoder_net.add(layers.Input(shape=input_shape))
    for n_filters in (64, 128):
        encoder_net.add(
            layers.Conv1D(n_filters, kernel_size=3, activation='relu', padding='same')
        )
        encoder_net.add(layers.MaxPooling1D(pool_size=2))
    encoder_net.add(layers.Flatten())
    encoder_net.add(layers.Dense(128, activation='relu'))
    return encoder_net

def build_decoder(latent_dim, output_length=None):
    """Assemble the transposed-convolution decoder (latent vector -> window).

    The previous ``Reshape((latent_dim // 16, 16))`` could not work: for
    ``latent_dim = 10`` it asks for a ``(0, 16)`` tensor, and in general it
    never matched the 256 units produced by the layer before it. The reshape
    is now sized from the requested output length, and the Dense layer that
    feeds it is sized to match.

    Args:
        latent_dim: Length of the latent input vector.
        output_length: Number of time steps the decoder must emit. When
            ``None`` the notebook-level ``look_back_period`` is used so the
            output matches the ``(look_back_period, 1)`` windows the
            discriminator is built for.

    Returns:
        An uncompiled ``Sequential`` model mapping ``(latent_dim,)`` to
        ``(output_length, 1)`` with sigmoid outputs.

    Raises:
        ValueError: If ``output_length`` is smaller than 1.
    """
    if output_length is None:
        output_length = look_back_period
    if output_length < 1:
        raise ValueError(f"output_length must be >= 1, got {output_length}")

    # The two UpSampling1D(size=2) stages multiply the time axis by 4, so
    # start from enough steps to cover output_length and crop any surplus.
    upsampling_factor = 4
    seed_channels = 16
    seed_steps = -(-output_length // upsampling_factor)  # ceiling division
    surplus_steps = seed_steps * upsampling_factor - output_length

    decoder_layers = [
        layers.Input(shape=(latent_dim,)),
        layers.Dense(128, activation='relu'),
        # Sized so the Reshape below receives exactly seed_steps * seed_channels values.
        layers.Dense(seed_steps * seed_channels, activation='relu'),
        layers.Reshape((seed_steps, seed_channels)),
        layers.Conv1DTranspose(128, kernel_size=3, activation='relu', padding='same'),
        layers.UpSampling1D(size=2),
        layers.Conv1DTranspose(64, kernel_size=3, activation='relu', padding='same'),
        layers.UpSampling1D(size=2),
        layers.Conv1DTranspose(1, kernel_size=3, activation='sigmoid', padding='same'),
    ]
    if surplus_steps:
        # Trim the extra trailing steps introduced by rounding up to a multiple of 4.
        decoder_layers.append(layers.Cropping1D(cropping=(0, surplus_steps)))
    return models.Sequential(decoder_layers)

def build_discriminator(input_shape):
    """Assemble and compile the convolutional real/fake classifier.

    Args:
        input_shape: Shape of one input sample, without the batch axis.

    Returns:
        A ``Sequential`` model ending in a single sigmoid unit, compiled with
        RMSprop (using the notebook-level ``learning_rate``), binary
        cross-entropy loss and an accuracy metric.
    """
    disc_net = models.Sequential()
    disc_net.add(layers.Input(shape=input_shape))
    disc_net.add(layers.Conv1D(64, kernel_size=3, activation='relu', padding='same'))
    disc_net.add(layers.MaxPooling1D(pool_size=2))
    disc_net.add(layers.Conv1D(128, kernel_size=3, activation='relu', padding='same'))
    disc_net.add(layers.Flatten())
    disc_net.add(layers.Dense(128, activation='relu'))
    disc_net.add(layers.Dense(1, activation='sigmoid'))

    disc_net.compile(
        loss='binary_crossentropy',
        optimizer=RMSprop(learning_rate=learning_rate),
        metrics=['accuracy'],
    )
    return disc_net

def build_gan(generator, discriminator):
    """Chain generator and discriminator into one compiled model.

    Side effect: sets ``discriminator.trainable = False`` on the object that
    is passed in, before the combined model is compiled.

    Args:
        generator: Model whose output is fed to the discriminator.
        discriminator: Model that scores the generator's output.

    Returns:
        A ``Model`` taking the generator's input and returning the
        discriminator's output, compiled with RMSprop (notebook-level
        ``learning_rate``) and binary cross-entropy.
    """
    discriminator.trainable = False

    # Mirror the generator's per-sample input shape (batch axis dropped).
    stacked_input = layers.Input(shape=generator.input_shape[1:])
    verdict = discriminator(generator(stacked_input))

    combined = models.Model(stacked_input, verdict)
    combined.compile(
        loss='binary_crossentropy',
        optimizer=RMSprop(learning_rate=learning_rate),
    )
    return combined

input_shape = (look_back_period, 1)

# The encoder is still built so the name stays available to later cells, but
# it is not part of the generator (see below).
encoder = build_encoder(input_shape)
decoder = build_decoder(latent_dim)

# Generator and Discriminator
# The training loop and the evaluation cell both feed the generator latent
# noise of shape (batch, latent_dim), which is exactly the decoder's input.
# Chaining encoder -> decoder cannot work here: the encoder ends in a 128-unit
# Dense layer while the decoder's Input is (latent_dim,), and the stacked
# model would expect windows rather than noise.
generator = decoder
discriminator = build_discriminator(input_shape)

gan = build_gan(generator, discriminator)



In [None]:
#Loss Function

def gan_loss(y_true, y_pred):
    """Weighted sum of three Keras losses between ``y_true`` and ``y_pred``.

    Computes ``gamma_1 * binary_crossentropy + gamma_2 * mean_squared_error
    + gamma_3 * mean_absolute_error`` using the notebook-level gamma weights.

    Args:
        y_true: Target tensor.
        y_pred: Predicted tensor.

    Returns:
        The combined loss tensor.
    """
    keras_losses = tf.keras.losses
    total = gamma_1 * keras_losses.binary_crossentropy(y_true, y_pred)
    total = total + gamma_2 * keras_losses.mean_squared_error(y_true, y_pred)
    total = total + gamma_3 * keras_losses.mean_absolute_error(y_true, y_pred)
    return total

# Compile the generator with the weighted composite loss defined above.
generator.compile(
    loss=gan_loss,
    optimizer=RMSprop(learning_rate=learning_rate),
)


In [None]:
#Training

def train_gan(gan, generator, discriminator, data, epochs=epochs, batch_size=batch_size, log_every=None):
    """Alternate discriminator and generator updates on random batches.

    Each iteration (called an "epoch" here) processes a single batch: the
    discriminator is updated on one real and one generated batch, then the
    generator is updated through the combined ``gan`` model using "real"
    labels for its generated samples.

    Args:
        gan: Combined generator+discriminator model trained on latent noise.
        generator: Model turning ``(batch, latent_dim)`` noise into samples.
        discriminator: Compiled real/fake classifier.
        data: Array of real samples, indexed along axis 0.
        epochs: Number of training iterations.
        batch_size: Number of real and of generated samples per iteration.
        log_every: Print the losses every this many iterations. Defaults to
            roughly ten reports per run; the final iteration is always
            reported. (The previous fixed interval of 1000 only ever logged
            iteration 0 with the default 75 epochs.)

    Returns:
        A list with one ``(d_loss, g_loss)`` tuple per iteration, where
        ``d_loss`` is the mean of the discriminator's results on the real and
        generated batches.

    Raises:
        ValueError: If ``data`` is empty or ``log_every`` is smaller than 1.
    """
    n_samples = data.shape[0]
    if n_samples == 0:
        raise ValueError("train_gan received an empty dataset")
    if log_every is None:
        log_every = max(1, epochs // 10)
    if log_every < 1:
        raise ValueError(f"log_every must be >= 1, got {log_every}")

    # The label arrays are identical on every iteration, so build them once.
    real_labels = np.ones((batch_size, 1))
    fake_labels = np.zeros((batch_size, 1))
    history = []

    for epoch in range(epochs):
        # Sample random batch of real data (with replacement)
        real_data = data[np.random.randint(0, n_samples, size=batch_size)]

        # Generate fake data; verbose=0 keeps predict() from printing a
        # progress bar on every iteration.
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        fake_data = generator.predict(noise, verbose=0)

        # Train the discriminator
        d_loss_real = discriminator.train_on_batch(real_data, real_labels)
        d_loss_fake = discriminator.train_on_batch(fake_data, fake_labels)
        d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

        # Train the generator: it is rewarded when its samples are labelled real
        g_loss = gan.train_on_batch(noise, real_labels)
        history.append((d_loss, g_loss))

        if epoch % log_every == 0 or epoch == epochs - 1:
            print(f"{epoch}/{epochs} [D loss: {d_loss}] [G loss: {g_loss}]")

    return history

# Run training on the windowed dataset with the default epochs / batch size.
train_gan(gan=gan, generator=generator, discriminator=discriminator, data=X)


In [None]:
#Evaluation

from sklearn.metrics import precision_recall_curve, auc

def evaluate_model(generator, real_data, fake_data, scorer=None):
    """Area under the precision-recall curve for telling real from generated.

    Real samples are labelled 1 and generated samples 0; every sample is
    scored by ``scorer`` and the scores are compared against those labels.

    The previous version scored only ``fake_data`` (so the score vector was
    shorter than the label vector) and did so with the generator, which
    outputs samples rather than a real/fake probability.

    Args:
        generator: Unused; retained so existing positional calls keep working.
        real_data: Array of real samples, indexed along axis 0.
        fake_data: Array of generated samples with the same per-sample shape.
        scorer: Model whose ``predict`` returns one real/fake score per
            sample. When ``None`` the notebook-level ``discriminator`` is used.

    Returns:
        The AUC of the precision-recall curve as a float.
    """
    if scorer is None:
        scorer = discriminator

    samples = np.concatenate([real_data, fake_data], axis=0)
    labels = np.concatenate([np.ones(len(real_data)), np.zeros(len(fake_data))])

    # Flatten the (n, 1) score column so it lines up with the 1-D labels.
    scores = np.asarray(scorer.predict(samples, verbose=0)).ravel()

    precision, recall, _ = precision_recall_curve(labels, scores)
    return auc(recall, precision)

# Draw 1000 latent vectors, generate samples from them and report the AUC-PR.
fake_data = generator.predict(
    np.random.normal(loc=0, scale=1, size=(1000, latent_dim))
)
auc_pr = evaluate_model(generator, real_data=X, fake_data=fake_data)
print(f"AUC-PR: {auc_pr}")
