In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Conv1D, Flatten
from tensorflow.keras.optimizers import Adam

In [9]:
# -- TadGAN Components --

# Generator (Encoder)
def build_encoder(input_shape, latent_dim):
    inputs = Input(shape=input_shape)
    x = LSTM(100, return_sequences=True)(inputs)
    x = LSTM(latent_dim)(x)  # Latent space representation
    model = Model(inputs, x, name='encoder')
    return model

# Generator (Decoder)
def build_decoder(latent_dim, output_shape):
    inputs = Input(shape=(latent_dim,))
    x = Dense(output_shape[0])(inputs)
    x = LSTM(100, return_sequences=True)(tf.expand_dims(x, -1))
    outputs = LSTM(output_shape[1], return_sequences=True)(x)
    model = Model(inputs, outputs, name='decoder')
    return model

# Critic (for time series)
def build_critic(input_shape):
    inputs = Input(shape=input_shape)
    x = Conv1D(64, kernel_size=3, strides=1, padding='same', activation='relu')(inputs)
    x = Flatten()(x)
    outputs = Dense(1)(x)
    model = Model(inputs, outputs, name='critic')
    return model


In [3]:
# -- Loss Functions --

# Wasserstein loss
def wasserstein_loss(y_true, y_pred):
    return tf.reduce_mean(y_true * y_pred)

# Cycle consistency loss (L2 norm)
def cycle_consistency_loss(real, reconstructed):
    return tf.reduce_mean(tf.square(real - reconstructed))


In [12]:
# -- TadGAN Model --

class TadGAN:
    def __init__(self, input_shape, latent_dim):
        self.input_shape = input_shape
        self.latent_dim = latent_dim
        
        # Build Generator (Encoder and Decoder)
        self.encoder = build_encoder(input_shape, latent_dim)
        self.decoder = build_decoder(latent_dim, input_shape)
        
        # Build Critics
        self.critic_x = build_critic(input_shape)
        self.critic_z = build_critic((latent_dim,1))

        # Optimizers
        self.critic_optimizer = Adam(learning_rate=1e-4)
        self.generator_optimizer = Adam(learning_rate=1e-4)

    # Training step
    def train_step(self, real_data):
        batch_size = tf.shape(real_data)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        
        with tf.GradientTape(persistent=True) as tape:
            # Forward pass through the encoder and decoder (Generator)
            encoded = self.encoder(real_data)
            reconstructed = self.decoder(encoded)
            
            # Forward pass through the critics
            fake_data = self.decoder(random_latent_vectors)
            critic_real_x = self.critic_x(real_data)
            critic_fake_x = self.critic_x(fake_data)
            critic_real_z = self.critic_z(random_latent_vectors)
            critic_fake_z = self.critic_z(encoded)
            
            # Compute losses
            cycle_loss = cycle_consistency_loss(real_data, reconstructed)
            critic_loss_x = wasserstein_loss(tf.ones_like(critic_real_x), critic_real_x) + \
                            wasserstein_loss(-tf.ones_like(critic_fake_x), critic_fake_x)
            critic_loss_z = wasserstein_loss(tf.ones_like(critic_real_z), critic_real_z) + \
                            wasserstein_loss(-tf.ones_like(critic_fake_z), critic_fake_z)
            generator_loss = -wasserstein_loss(tf.ones_like(critic_fake_x), critic_fake_x) - \
                             wasserstein_loss(tf.ones_like(critic_fake_z), critic_fake_z) + cycle_loss

        # Apply gradients to the critics
        critic_grads_x = tape.gradient(critic_loss_x, self.critic_x.trainable_weights)
        critic_grads_z = tape.gradient(critic_loss_z, self.critic_z.trainable_weights)
        self.critic_optimizer.apply_gradients(zip(critic_grads_x, self.critic_x.trainable_weights))
        self.critic_optimizer.apply_gradients(zip(critic_grads_z, self.critic_z.trainable_weights))
        
        # Apply gradients to the generator
        generator_grads = tape.gradient(generator_loss, self.encoder.trainable_weights + self.decoder.trainable_weights)
        self.generator_optimizer.apply_gradients(zip(generator_grads, self.encoder.trainable_weights + self.decoder.trainable_weights))
        
        return {"critic_loss_x": critic_loss_x, "critic_loss_z": critic_loss_z, "generator_loss": generator_loss}


In [13]:

# -- Data Preparation and Training Example --

# Generate synthetic data for time series (e.g., sine wave data)
def generate_synthetic_data(samples, timesteps, features):
    t = np.linspace(0, 100, timesteps)
    data = np.sin(t) + 0.1 * np.random.randn(samples, timesteps, features)
    return data

# Instantiate the model
input_shape = (100, 1)  # 100 timesteps, 1 feature
latent_dim = 20  # Latent space dimension

tadgan = TadGAN(input_shape, latent_dim)

# Generate synthetic training data
X_train = generate_synthetic_data(1000, 100, 1)

# Compile training step into a TensorFlow function
@tf.function
def training_step(real_data):
    return tadgan.train_step(real_data)

# Train the model for some epochs
epochs = 1000
for epoch in range(epochs):
    losses = training_step(X_train)
    if epoch % 100 == 0:
        print(f"Epoch {epoch}: {losses}")

# -- Testing Example --

# Test the trained model on new data
X_test = generate_synthetic_data(10, 100, 1)
encoded_test = tadgan.encoder(X_test)
reconstructed_test = tadgan.decoder(encoded_test)

# Compute reconstruction error
reconstruction_error = np.mean(np.abs(X_test - reconstructed_test), axis=(1, 2))
print("Reconstruction Error:", reconstruction_error)




ValueError: in user code:

    File "C:\Users\farra\AppData\Local\Temp\ipykernel_9936\2502222530.py", line 21, in training_step  *
        return tadgan.train_step(real_data)
    File "C:\Users\farra\AppData\Local\Temp\ipykernel_9936\341070802.py", line 27, in train_step  *
        encoded = self.encoder(real_data)
    File "c:\Users\farra\anaconda3\lib\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler  **
        raise e.with_traceback(filtered_tb) from None
    File "c:\Users\farra\anaconda3\lib\site-packages\keras\engine\input_spec.py", line 295, in assert_input_compatibility
        raise ValueError(

    ValueError: Exception encountered when calling layer "encoder" "                 f"(type Functional).
    
    Input 0 of layer "lstm_11" is incompatible with the layer: expected shape=(None, None, 1), found shape=(1000, 100, 100)
    
    Call arguments received by layer "encoder" "                 f"(type Functional):
      • inputs=tf.Tensor(shape=(1000, 100, 100), dtype=float64)
      • training=None
      • mask=None
