The purpose of this notebook is to test the data augmentation technique from this paper: https://onlinelibrary.wiley.com/doi/full/10.1002/int.23013

The topics for investigation are:
- How effective is this technique at generating new samples for our dataset
- How long does it take to train

In [1]:
import os
import pickle
import numpy as np
import math
import matplotlib.pyplot as plt
from datetime import datetime

In [2]:
# Loading ex-vivo data
path_to_data = "Data\\Ex-Vivo\\"
files = os.listdir(path_to_data)

# Removing readme
files.remove("READ_ME.txt")

data = []

for file in files:
    with open(path_to_data + file, 'rb') as f:
        data.append(pickle.load(f))

In [6]:
print(data[1]['samplematrix'])

sample 1 bare


In [7]:
# These values were found by experimenting and inspecting the resulting pulses
window_start = 3750
window_end = 4850

data = [d for d in data if 'sample' in d['samplematrix']]
X = [d['scan'][0]['forward_scan']['signal'][window_start:window_end] for d in data]
Y = [d['samplematrix'].split()[2] for d in data]

X = np.array(X)
Y = np.array(Y)

print("X shape: ", X.shape)
print("Y shape: ", Y.shape)

from sklearn.preprocessing import LabelEncoder

le = LabelEncoder()
Y = le.fit_transform(Y)

print("y values first 10: ", Y[:10])
print("distinct y values: ", np.unique(Y))

X = X.astype(np.float32)

print("X dtype: ", X.dtype)
print("Y dtype: ", Y.dtype)

X shape:  (92, 1100)
Y shape:  (92,)
y values first 10:  [2 2 2 2 2 2 2 2 3 1]
distinct y values:  [0 1 2 3]
X dtype:  float32
Y dtype:  int64


In [8]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LeakyReLU, Dropout, BatchNormalization, Reshape, Flatten
from tensorflow.keras.optimizers import Adam
import uuid
from pathlib import Path

# Load your dataset
# For demonstration, let's create a dummy dataset
# Replace this with loading your actual dataset
pulses = np.random.rand(44, 1100)  # 44 pulses, each with 1100 features

# Normalize the dataset
pulses = (pulses - 0.5) / 0.5

# Define GAN components
latent_dim = 100

# Generator
def build_generator():
    model = Sequential()
    model.add(Dense(256, input_dim=latent_dim))
    model.add(LeakyReLU(alpha=0.2))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Dense(512))
    model.add(LeakyReLU(alpha=0.2))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Dense(1024))
    model.add(LeakyReLU(alpha=0.2))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Dense(1100, activation='tanh'))
    return model

# Discriminator
def build_discriminator():
    model = Sequential()
    model.add(Dense(512, input_shape=(1100,)))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.4))
    model.add(Dense(256))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.4))
    model.add(Dense(1, activation='sigmoid'))
    return model


            

# Build and compile the discriminator
discriminator = build_discriminator()
discriminator.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5), metrics=['accuracy'])

# Build the generator
generator = build_generator()

# The generator takes noise as input and generates pulses
z = tf.keras.Input(shape=(latent_dim,))
pulse = generator(z)

# For the combined model, only train the generator
discriminator.trainable = False

# The discriminator takes generated pulses as input and determines validity
validity = discriminator(pulse)

# Combined model (stacked generator and discriminator)
combined = tf.keras.Model(z, validity)
combined.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5))

# Training the GAN
def train(epochs, batch_size=32, save_interval=50):
    valid = np.ones((batch_size, 1))
    fake = np.zeros((batch_size, 1))

    for epoch in range(epochs):
        # Train Discriminator
        idx = np.random.randint(0, pulses.shape[0], batch_size)
        real_pulses = pulses[idx]

        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        gen_pulses = generator.predict(noise)

        d_loss_real = discriminator.train_on_batch(real_pulses, valid)
        d_loss_fake = discriminator.train_on_batch(gen_pulses, fake)
        d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

        # Train Generator
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        g_loss = combined.train_on_batch(noise, valid)

        # Print the progress
        print(f"{epoch} [D loss: {d_loss[0]}, acc.: {100*d_loss[1]}%] [G loss: {g_loss}]")

        # Save generated pulse samples
        if epoch % save_interval == 0:
            save_pulses(epoch)

def save_pulses(epoch, n=5):
    noise = np.random.normal(0, 1, (n, latent_dim))
    gen_pulses = generator.predict(noise)
    for pulse in gen_pulses:
        save_pulse(epoch, pulse)

def save_pulse(epoch, pulse):
    x = treated_data[0]['time_cut']
    y = pulse

    plt.figure(figsize=(15, 5))

    plt.plot(x, y, label="signal")
    plt.xlabel("Time(s)")
    plt.ylabel("Signal (nA)")
    plt.title(f'Generated Signal from epoch: {epoch}')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()

    path = f'generated/epoch_{epoch}'

    Path(path).mkdir(parents=True, exist_ok=True)

    plt.savefig(f"{path}/{str(uuid.uuid4())[:8]}")
    plt.close()

# Train the GAN
train(epochs=1000, batch_size=16, save_interval=50)


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 250ms/step




0 [D loss: 0.7829387187957764, acc.: 45.3125%] [G loss: [array(0.78807676, dtype=float32), array(0.78807676, dtype=float32), array(0.40625, dtype=float32)]]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 113ms/step


NameError: name 'treated_data' is not defined

Does only output random noise, does no seem to learn any patterns. But at least, the noise is not identical