In [None]:
import numpy as np
import pandas as pd
from keras.layers import Input, Dense, LSTM, TimeDistributed, Reshape, Bidirectional
from keras.models import Model, Sequential
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from oldslidingWindow import read_data, segment_data_by_day, sliding_window
FILE_PATH = '../Processed Data/Aruba_17/processed_data.csv'

# Read the processed CSV and cut it into per-day segments, then into
# overlapping windows. read_data / segment_data_by_day / sliding_window are
# project helpers from oldslidingWindow; their exact semantics are not
# visible in this notebook.
data_df = read_data(FILE_PATH)
daily_segments = segment_data_by_day(data_df)
# Optional: restrict to the first N daily segments for quicker experiments.
# daily_segments = daily_segments[:120]
window_size = 7816
overlap_ratio = 0.2
windows = sliding_window(daily_segments, window_size=window_size, overlap_ratio=overlap_ratio)

# Stack the windows into a single 3-D array (window, row, feature).
# np.stack is used instead of np.asarray so that windows of unequal shape
# fail here with a clear error rather than producing a ragged object array
# that breaks the reshape/scaling steps further down.
# NOTE(review): presumably every window has exactly `window_size` rows --
# confirm against sliding_window.
windows = np.stack([window.to_numpy() for window in windows])

# Training hyperparameters
batch_size = 32
validation_split = 0.2
# Sequence length seen by the models. Derived from window_size so the two
# values cannot drift apart when the window length is tuned.
timesteps = window_size
input_dim = windows[0].shape[1]

In [None]:
# Hold out the last `validation_split` fraction of the windows for validation.
# shuffle=False keeps the original window order, so the validation windows are
# the trailing ones.
window_train, window_val = train_test_split(windows, test_size=validation_split, shuffle=False)

# MinMaxScaler works on 2-D input, so join each set of windows end to end
# along the row axis before scaling.
X_train = np.concatenate(window_train, axis=0)
X_val = np.concatenate(window_val, axis=0)

# Learn the per-feature min/max from the training rows only, then apply the
# same scaling to both sets so no validation statistics leak into training.
scaler = MinMaxScaler()
scaler.fit(X_train)
X_train = scaler.transform(X_train)
X_val = scaler.transform(X_val)

# Restore the (window, timestep, feature) layout expected by the models.
sequence_shape = (-1, timesteps, input_dim)
X_train = X_train.reshape(sequence_shape)
X_val = X_val.reshape(sequence_shape)

# Generator: maps a noise sequence of shape (timesteps, input_dim) to a
# sequence of the same shape. The final per-timestep Dense layer has no
# activation, so outputs are unbounded.
# NOTE(review): the training data is min-max scaled, while this output is
# linear -- confirm that an unbounded output range is intended.
generator = Sequential([
    Dense(256, activation='relu', input_shape=(timesteps, input_dim)),
    Bidirectional(LSTM(128, return_sequences=True)),
    TimeDistributed(Dense(input_dim)),
])

# Discriminator: emits one sigmoid score per timestep, so its targets have
# shape (batch, timesteps, 1).
discriminator = Sequential([
    Bidirectional(LSTM(128, return_sequences=True), input_shape=(timesteps, input_dim)),
    TimeDistributed(Dense(1, activation='sigmoid')),
])
discriminator.compile(optimizer='adam', loss='binary_crossentropy')

# Combined model: generator followed by the discriminator.
# The discriminator is compiled above while still trainable and only frozen
# afterwards, so the stacked model is meant to update the generator alone.
# NOTE(review): this relies on Keras capturing `trainable` at compile time --
# confirm for the installed Keras version.
discriminator.trainable = False
gan_input = Input(shape=(timesteps, input_dim))
generated = generator(gan_input)
gan_output = discriminator(generated)
gan = Model(gan_input, gan_output)
gan.compile(optimizer='adam', loss='binary_crossentropy')

# Train the GAN
epochs = 300
num_train = X_train.shape[0]
# Ceiling division: the final, possibly smaller, batch is trained on too.
# With floor division the trailing windows were never used, and a training
# set smaller than batch_size produced zero batches (no training at all).
num_batches = -(-num_train // batch_size)

for epoch in range(epochs):
    # print(f'Epoch: {epoch + 1}/{epochs}')
    for batch in range(num_batches):
        real_sequences = X_train[batch * batch_size:(batch + 1) * batch_size]
        # The last batch may hold fewer than batch_size windows; size the
        # noise and the labels from the real batch so shapes always agree.
        current_batch_size = real_sequences.shape[0]

        # Train the discriminator on real windows (label 1) and generated
        # windows (label 0); labels are per timestep to match its output.
        noise = np.random.normal(0, 1, size=(current_batch_size, timesteps, input_dim))
        # predict_on_batch runs a single forward pass without the per-call
        # setup and progress output of predict(), which matters inside a loop.
        generated_sequences = generator.predict_on_batch(noise)

        x_combined = np.concatenate((real_sequences, generated_sequences))
        y_combined = np.concatenate((
            np.ones((current_batch_size, timesteps, 1)),
            np.zeros((current_batch_size, timesteps, 1)),
        ))

        d_loss = discriminator.train_on_batch(x_combined, y_combined)

        # Train the generator through the combined model: fresh noise is
        # labelled as "real" so the generator is pushed to fool the
        # discriminator.
        noise = np.random.normal(0, 1, size=(current_batch_size, timesteps, input_dim))
        y_mislabeled = np.ones((current_batch_size, timesteps, 1))

        g_loss = gan.train_on_batch(noise, y_mislabeled)

        # print(f'Batch {batch + 1}/{num_batches} - D loss: {d_loss:.4f} - G loss: {g_loss:.4f}')

In [None]:
# Generate as many synthetic sequences as there are validation windows,
# one noise sequence per output sequence.
n_synthetic = X_val.shape[0]
noise = np.random.normal(0, 1, size=(n_synthetic, timesteps, input_dim))
generated_data = generator.predict(noise)

# The scaler was fit on 2-D rows, so flatten to (rows, features) before
# mapping back to the original feature scale, then round every value to the
# nearest integer.
flat_generated = generated_data.reshape(-1, input_dim)
generated_data_inverse = np.rint(scaler.inverse_transform(flat_generated))

# Write one row per timestep as comma-separated values.
# NOTE(review): values are already rounded, so '%.8f' writes them with eight
# zero decimals -- confirm whether downstream readers expect this format.
np.savetxt('generated_data.txt', generated_data_inverse, fmt='%.8f', delimiter=',')