<a href="https://colab.research.google.com/github/Mukil-Git/Synthetic_data_generation/blob/main/Synthetic_data_generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import os

In [None]:
class VAE(keras.Model):
    """Variational autoencoder wrapping an encoder and a decoder model.

    The encoder must return ``(z_mean, z_log_var, z)`` and the decoder must
    map ``z`` back to an image batch. The loss is the per-image sum of the
    pixel-wise binary cross-entropy plus the KL divergence of the latent
    distribution, each averaged over the batch.

    Both ``train_step`` and ``test_step`` are overridden so the model can be
    used with ``fit(..., validation_data=...)`` and ``evaluate``.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        # Listing the trackers here lets Keras reset them automatically.
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def call(self, inputs, training=None):
        """Reconstruct ``inputs`` by decoding the mean of the latent distribution."""
        z_mean, _, _ = self.encoder(inputs, training=training)
        return self.decoder(z_mean, training=training)

    @staticmethod
    def _unpack_inputs(data):
        """Return the image batch from whatever structure Keras passes in.

        Keras may hand a bare tensor or an ``(x, ...)`` tuple (for example
        when ``validation_data=(x, x)`` is used); only ``x`` is needed.
        """
        if isinstance(data, (tuple, list)):
            return data[0]
        return data

    def _compute_losses(self, images, training):
        """Return ``(total_loss, reconstruction_loss, kl_loss)`` for a batch."""
        z_mean, z_log_var, z = self.encoder(images, training=training)
        reconstruction = self.decoder(z, training=training)
        # binary_crossentropy averages over the channel axis, leaving
        # (batch, height, width); sum over the two spatial axes per image.
        reconstruction_loss = tf.reduce_mean(
            tf.reduce_sum(
                keras.losses.binary_crossentropy(images, reconstruction), axis=(1, 2)
            )
        )
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        return reconstruction_loss + kl_loss, reconstruction_loss, kl_loss

    def _update_trackers(self, total_loss, reconstruction_loss, kl_loss):
        """Feed the batch losses to the metric trackers and return the logs dict."""
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def train_step(self, data):
        """Run one optimisation step and return the running loss metrics."""
        images = self._unpack_inputs(data)
        with tf.GradientTape() as tape:
            total_loss, reconstruction_loss, kl_loss = self._compute_losses(
                images, training=True
            )
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return self._update_trackers(total_loss, reconstruction_loss, kl_loss)

    def test_step(self, data):
        """Evaluate one batch without updating weights.

        The returned keys are the same as in ``train_step``, so Keras reports
        them during validation as ``val_loss``, ``val_reconstruction_loss``
        and ``val_kl_loss``.
        """
        images = self._unpack_inputs(data)
        total_loss, reconstruction_loss, kl_loss = self._compute_losses(
            images, training=False
        )
        return self._update_trackers(total_loss, reconstruction_loss, kl_loss)

In [None]:
class SyntheticImageGenerator:
    """Train a convolutional VAE on a small image set and sample new images.

    Typical use: ``load_client_data`` -> ``train_model`` ->
    ``generate_synthetic_images`` / ``interpolate_images`` -> ``plot_results``.

    Attributes are plain values: ``input_shape`` is ``(height, width,
    channels)``, ``latent_dim`` is the size of the latent vector, and
    ``encoder`` / ``decoder`` / ``vae`` are ``None`` until the corresponding
    ``build_*`` method has been called.
    """

    # The encoder applies four stride-2 convolutions and the decoder mirrors
    # them with four stride-2 transposed convolutions, so the spatial size is
    # divided (then multiplied) by 2 ** 4.
    _NUM_DOWNSAMPLES = 4
    # Channel count of the last encoder convolution / first decoder feature map.
    _BOTTLENECK_FILTERS = 256

    def __init__(self, input_shape=(64, 64, 3), latent_dim=128):
        self.input_shape = tuple(input_shape)
        self.latent_dim = latent_dim
        self.encoder = None
        self.decoder = None
        self.vae = None

    def build_encoder(self):
        """Build the CNN encoder mapping images to (z_mean, z_log_var, z)."""
        encoder_inputs = keras.Input(shape=self.input_shape)

        # CNN layers for feature extraction; each one halves height and width.
        x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
        x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
        x = layers.Conv2D(128, 3, activation="relu", strides=2, padding="same")(x)
        x = layers.Conv2D(
            self._BOTTLENECK_FILTERS, 3, activation="relu", strides=2, padding="same"
        )(x)

        x = layers.Flatten()(x)
        x = layers.Dense(512, activation="relu")(x)

        # Latent space parameters
        z_mean = layers.Dense(self.latent_dim, name="z_mean")(x)
        z_log_var = layers.Dense(self.latent_dim, name="z_log_var")(x)
        z = Sampling()([z_mean, z_log_var])

        self.encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
        return self.encoder

    def build_decoder(self):
        """Build the decoder mapping a latent vector back to an image.

        Raises:
            ValueError: if the height or width of ``input_shape`` is not
                divisible by 16. The decoder upsamples by exactly 16, so any
                other size could not be reproduced.
        """
        height, width, channels = self.input_shape
        factor = 2 ** self._NUM_DOWNSAMPLES
        if height % factor or width % factor:
            raise ValueError(
                f"input_shape height and width must be divisible by {factor}, "
                f"got {self.input_shape}"
            )

        # Spatial size of the feature map the decoder starts from, derived
        # from input_shape instead of being hard-coded for 64x64 inputs.
        conv_shape = (height // factor, width // factor, self._BOTTLENECK_FILTERS)

        latent_inputs = keras.Input(shape=(self.latent_dim,))
        x = layers.Dense(int(np.prod(conv_shape)), activation="relu")(latent_inputs)
        x = layers.Reshape(conv_shape)(x)

        # Transpose convolution layers for upsampling; each doubles the size.
        x = layers.Conv2DTranspose(256, 3, activation="relu", strides=2, padding="same")(x)
        x = layers.Conv2DTranspose(128, 3, activation="relu", strides=2, padding="same")(x)
        x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
        x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)

        # Output layer: sigmoid keeps pixel values in [0, 1].
        decoder_outputs = layers.Conv2DTranspose(
            channels, 3, activation="sigmoid", padding="same"
        )(x)

        self.decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
        return self.decoder

    def build_vae(self):
        """Build a fresh encoder and decoder and wrap them in a VAE model."""
        self.build_encoder()
        self.build_decoder()
        self.vae = VAE(self.encoder, self.decoder)
        return self.vae

    def preprocess_data(self, images):
        """Scale pixel values by 1/255 and resize to the model's input size.

        Args:
            images: batch of images, assumed to be ``(N, H, W, C)`` with
                pixel values in ``[0, 255]``.

        Returns:
            A float32 NumPy array (always NumPy, even when resizing went
            through TensorFlow, so it can be split and indexed downstream).
        """
        images = np.asarray(images).astype("float32") / 255.0

        target_size = tuple(self.input_shape[:2])
        if images.shape[1:3] != target_size:
            images = np.asarray(tf.image.resize(images, target_size))

        return images

    def _read_images(self, data_path, max_images):
        """Read raw images from a directory of image files or a NumPy file."""
        if not os.path.isdir(data_path):
            # Anything that is not a directory is treated as a NumPy file.
            return np.load(data_path)

        # Sorted so the subset picked by max_images is reproducible;
        # os.listdir order is arbitrary.
        image_files = sorted(
            f for f in os.listdir(data_path)
            if f.lower().endswith(('.png', '.jpg', '.jpeg'))
        )
        if not image_files:
            raise ValueError(f"No .png/.jpg/.jpeg images found in {data_path!r}")

        # Load with the channel layout the model expects.
        color_mode = {1: "grayscale", 3: "rgb", 4: "rgba"}.get(self.input_shape[2], "rgb")
        return np.array([
            keras.preprocessing.image.img_to_array(
                keras.preprocessing.image.load_img(
                    os.path.join(data_path, file),
                    target_size=self.input_shape[:2],
                    color_mode=color_mode,
                )
            )
            for file in image_files[:max_images]
        ])

    def load_client_data(self, data_path, augment=True, max_images=100,
                         fallback_to_random=True):
        """Load, preprocess and optionally augment client images.

        Args:
            data_path: directory of ``.png``/``.jpg``/``.jpeg`` files, or the
                path of a NumPy file holding an image batch.
            augment: when true and fewer than 1000 images were loaded, expand
                the set with ``augment_data``.
            max_images: maximum number of files read from a directory.
            fallback_to_random: when true (the default), a loading failure is
                printed and 100 random-noise images are used instead, which is
                only useful for demonstration. Pass ``False`` to let the
                loading error propagate.

        Returns:
            Float32 NumPy array of images scaled to ``[0, 1]``.
        """
        try:
            images = self._read_images(data_path, max_images)
        except Exception as e:
            # Deliberately broad: this is the demo fallback boundary.
            if not fallback_to_random:
                raise
            print(f"Error loading data: {e}")
            # Generate synthetic data for demonstration
            images = np.random.rand(100, *self.input_shape) * 255

        images = self.preprocess_data(images)

        # Data augmentation for limited dataset
        if augment and len(images) < 1000:
            images = self.augment_data(images)

        return images

    def augment_data(self, images):
        """Expand a small image batch with simple geometric/brightness variants.

        The result contains, in order: the originals, rotations by 90, 180
        and 270 degrees, a horizontal flip, and brightness shifts of -0.2 and
        +0.2 clipped to ``[0, 1]``. Quarter-turn rotations (90/270) are
        skipped for non-square images because they would change the image
        shape and could not be stacked with the rest.

        Args:
            images: ``(N, H, W, C)`` batch with values in ``[0, 1]``.

        Returns:
            NumPy array of shape ``(k * N, H, W, C)`` with ``k == 7`` for
            square images and ``k == 5`` otherwise.
        """
        images = np.asarray(images)
        is_square = images.shape[1] == images.shape[2]
        variants = [images]

        # Rotations (counter-clockwise in the height/width plane).
        for angle in (90, 180, 270):
            quarter_turns = angle // 90
            if quarter_turns % 2 and not is_square:
                continue
            variants.append(np.rot90(images, k=quarter_turns, axes=(1, 2)))

        # Horizontal flip: reverse the width axis.
        variants.append(images[:, :, ::-1, :])

        # Brightness variations
        for brightness_delta in (-0.2, 0.2):
            variants.append(np.clip(images + brightness_delta, 0.0, 1.0))

        return np.concatenate(variants, axis=0)

    def train_model(self, train_data, epochs=100, batch_size=32, validation_split=0.2,
                    learning_rate=1e-4):
        """Train the VAE, building it first if necessary.

        Args:
            train_data: preprocessed image batch.
            epochs: maximum number of epochs (early stopping may end sooner).
            batch_size: mini-batch size.
            validation_split: fraction of ``train_data`` held out for
                validation. With ``0`` (or ``None``) no data is held out and
                the callbacks monitor the training ``loss`` instead of
                ``val_loss``.
            learning_rate: Adam learning rate.

        Returns:
            The Keras ``History`` object returned by ``fit``.

        NOTE: validation requires the VAE class to support evaluation (a
        ``test_step``/``call``); if it only defines ``train_step``, pass
        ``validation_split=0``.
        """
        if self.vae is None:
            self.build_vae()

        # Compile model
        self.vae.compile(optimizer=keras.optimizers.Adam(learning_rate=learning_rate))

        # Split data (NumPy array so sklearn can index it).
        train_data = np.asarray(train_data)
        if validation_split:
            train_images, val_images = train_test_split(
                train_data, test_size=validation_split, random_state=42
            )
            validation_data = (val_images, val_images)
            monitor = "val_loss"
        else:
            train_images = train_data
            validation_data = None
            monitor = "loss"

        # Callbacks
        callbacks = [
            keras.callbacks.EarlyStopping(
                monitor=monitor, patience=10, restore_best_weights=True
            ),
            keras.callbacks.ReduceLROnPlateau(monitor=monitor, factor=0.5, patience=5),
            # Weights only: a subclassed model cannot be written as a full
            # HDF5 model, and the ".weights.h5" suffix is the one newer Keras
            # versions require for weight checkpoints.
            keras.callbacks.ModelCheckpoint(
                'vae_best.weights.h5',
                monitor=monitor,
                save_best_only=True,
                save_weights_only=True,
            ),
        ]

        # Train model
        history = self.vae.fit(
            train_images,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=validation_data,
            callbacks=callbacks,
            verbose=1
        )

        return history

    def hyperparameter_tuning(self, train_data, latent_dims=(64, 128, 256),
                              learning_rates=(1e-3, 1e-4, 1e-5),
                              batch_sizes=(16, 32, 64), epochs=10):
        """Grid-search latent size, learning rate and batch size.

        Every combination is trained from scratch for ``epochs`` epochs and
        ranked by its final training loss.

        Note that this rebuilds ``self.vae`` for each combination and leaves
        the generator holding the last combination tested, not the best one;
        callers should rebuild with the returned parameters.

        Returns:
            Dict with keys ``'latent_dim'``, ``'learning_rate'`` and
            ``'batch_size'`` of the best run (empty if no run produced a
            finite, comparable loss).
        """
        best_loss = float('inf')
        best_params = {}

        for latent_dim in latent_dims:
            for lr in learning_rates:
                for batch_size in batch_sizes:
                    print(f"Testing: latent_dim={latent_dim}, lr={lr}, batch_size={batch_size}")

                    # Rebuild model with new parameters
                    self.latent_dim = latent_dim
                    self.build_vae()
                    self.vae.compile(optimizer=keras.optimizers.Adam(learning_rate=lr))

                    # Quick training for evaluation
                    history = self.vae.fit(
                        train_data,
                        epochs=epochs,
                        batch_size=batch_size,
                        verbose=0
                    )

                    final_loss = history.history['loss'][-1]

                    if final_loss < best_loss:
                        best_loss = final_loss
                        best_params = {
                            'latent_dim': latent_dim,
                            'learning_rate': lr,
                            'batch_size': batch_size
                        }

        print(f"Best parameters: {best_params}")
        print(f"Best loss: {best_loss}")

        return best_params

    def generate_synthetic_images(self, num_images=10, add_noise=True, noise_factor=0.1):
        """Decode random latent vectors into synthetic images.

        Args:
            num_images: number of images to generate.
            add_noise: when true, extra Gaussian noise scaled by
                ``noise_factor`` is added to the sampled latent vectors.
            noise_factor: standard deviation of that extra noise.

        Returns:
            The decoder's prediction for the sampled latent vectors.

        Raises:
            ValueError: if no model has been built yet.
        """
        if self.vae is None or self.decoder is None:
            raise ValueError("Model not trained yet!")

        # Sample random points in latent space
        random_latent_vectors = tf.random.normal(shape=(num_images, self.latent_dim))

        # Add noise if requested
        if add_noise:
            noise = tf.random.normal(shape=random_latent_vectors.shape) * noise_factor
            random_latent_vectors += noise

        # Generate images
        return self.decoder.predict(random_latent_vectors)

    def interpolate_images(self, img1_idx=0, img2_idx=1, steps=10, source_images=None):
        """Decode a straight line between two points in latent space.

        Args:
            img1_idx, img2_idx: indices into ``source_images`` of the two
                endpoints.
            steps: number of images along the path, endpoints included.
                ``steps == 1`` yields only the first endpoint.
            source_images: images to encode as endpoints; when ``None`` or
                shorter than two images, two random latent vectors are used.

        Returns:
            Array with one decoded image per step (empty when ``steps < 1``).

        Raises:
            ValueError: if no decoder has been built yet.
        """
        if self.decoder is None:
            raise ValueError("Model not trained yet!")
        if steps < 1:
            return np.empty((0, *self.input_shape), dtype="float32")

        if source_images is None or len(source_images) < 2:
            # Use random latent vectors
            z1 = np.asarray(tf.random.normal(shape=(1, self.latent_dim)))
            z2 = np.asarray(tf.random.normal(shape=(1, self.latent_dim)))
        else:
            # Encode source images to latent space and use the means.
            z1, _, _ = self.encoder.predict(source_images[img1_idx:img1_idx + 1])
            z2, _, _ = self.encoder.predict(source_images[img2_idx:img2_idx + 1])

        # linspace handles steps == 1 (a single alpha of 0) without dividing
        # by zero; broadcasting builds the whole path so it is decoded in one
        # predict call instead of one call per step.
        alphas = np.linspace(0.0, 1.0, steps, dtype="float32").reshape(-1, 1)
        latent_path = (1.0 - alphas) * z1 + alphas * z2

        return np.asarray(self.decoder.predict(latent_path))

    def plot_results(self, original_images, synthetic_images, save_path=None):
        """Show up to 10 original images above up to 10 synthetic ones.

        Args:
            original_images: images for the top row.
            synthetic_images: images for the bottom row.
            save_path: when given, the figure is also saved there.
        """
        fig, axes = plt.subplots(2, 10, figsize=(20, 4))

        # Hide every axis up front so unused slots stay blank when fewer
        # than 10 images are supplied.
        for ax in axes.ravel():
            ax.axis('off')

        rows = (("Original", original_images), ("Synthetic", synthetic_images))
        for row_axes, (title, row_images) in zip(axes, rows):
            for ax, image in zip(row_axes, row_images[:10]):
                ax.imshow(image)
                ax.set_title(title)

        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()

In [None]:
class Sampling(layers.Layer):
    """Sample a latent vector z from (z_mean, z_log_var).

    Implements the reparameterisation ``z = z_mean + exp(0.5 * z_log_var) *
    epsilon`` with ``epsilon`` drawn from a standard normal distribution, so
    gradients can flow through ``z_mean`` and ``z_log_var``.
    """

    def call(self, inputs):
        z_mean, z_log_var = inputs
        # Draw noise with exactly the shape and dtype of z_mean so the layer
        # does not depend on the input rank or on the global float type.
        epsilon = tf.random.normal(shape=tf.shape(z_mean), dtype=z_mean.dtype)
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

# Main execution example
def main(data_path="path/to/client/data", tune=True, epochs=100, num_images=50):
    """Run the full pipeline: load data, tune, train, generate, plot, save.

    Args:
        data_path: location of the client images (replace the placeholder
            default with a real path).
        tune: when true, run the hyperparameter grid search before training.
            This trains many short models and can take a long time.
        epochs: maximum number of training epochs.
        num_images: number of synthetic images to generate.

    Returns:
        Tuple ``(generator, synthetic_images)``.

    Side effects: writes ``vae_results.png`` and ``synthetic_images.npy`` to
    the current working directory.
    """
    # Initialize the generator
    generator = SyntheticImageGenerator(input_shape=(64, 64, 3), latent_dim=128)

    # Load client data
    print("Loading client data...")
    client_data = generator.load_client_data(data_path)
    print(f"Loaded {len(client_data)} images")

    # Optional: Hyperparameter tuning
    best_params = {}
    if tune:
        print("Performing hyperparameter tuning...")
        best_params = generator.hyperparameter_tuning(client_data)

    # Rebuild the model with the best latent size (tuning leaves the
    # generator holding whichever configuration it tried last).
    generator.latent_dim = best_params.get('latent_dim', 128)
    generator.build_vae()

    # Train the model.
    # NOTE: only the tuned latent_dim and batch_size are applied here; the
    # tuned learning_rate is reported by the search but not forwarded.
    print("Training VAE model...")
    generator.train_model(
        client_data,
        epochs=epochs,
        batch_size=best_params.get('batch_size', 32)
    )

    # Generate synthetic images
    print("Generating synthetic images...")
    synthetic_images = generator.generate_synthetic_images(
        num_images=num_images,
        add_noise=True,
        noise_factor=0.1
    )

    # Visualize results
    generator.plot_results(client_data, synthetic_images, save_path="vae_results.png")

    # Save synthetic data
    np.save("synthetic_images.npy", synthetic_images)
    print("Synthetic images saved successfully!")

    return generator, synthetic_images

if __name__ == "__main__":
    # Run the whole pipeline when this file is executed directly, and keep
    # the trained generator and its output as module-level names so they can
    # be inspected afterwards.
    generator, synthetic_images = main()