### Libraries

In [None]:
import numpy as np
from numpy import genfromtxt
from numpy import ones
from numpy import zeros
from numpy.random import randint
from numpy.random import randn

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from imblearn.over_sampling import SMOTE

### SMOTE

In [None]:
# Target commodity: selects both the input folder and the input file name.
commodity = 'Co'
features_labels_file = f'./{commodity}/features_labels_{commodity}.csv'

# Load the table; every column but the last is a feature, the last is the label.
features_labels = genfromtxt(features_labels_file, delimiter=',')
features, labels = features_labels[:, :-1], features_labels[:, -1]

# Oversample with SMOTE (fixed seed so the resampling is repeatable).
smote = SMOTE(random_state=1)
X_sm, y_sm = smote.fit_resample(features, labels)

# Resampled features with the resampled label appended as the final column.
smote_samples = np.column_stack((X_sm, y_sm))
# Optional export of the resampled table:
# np.savetxt('SMOTE_Samples.csv', smote_samples, delimiter=',')

# Feature rows of the resampled set whose label equals 1; these are the
# "real" examples handed to the GAN discriminator further down.
X_positive = X_sm[y_sm == 1]

### GAN

In [None]:
# define the standalone discriminator model
def define_discriminator(n_inputs):
    """Build and compile the standalone discriminator.

    Architecture: one 25-unit ReLU hidden layer (he_uniform initialisation)
    followed by a single sigmoid output unit.

    Args:
        n_inputs: Number of input features per sample.

    Returns:
        A compiled Keras ``Sequential`` model (binary cross-entropy loss,
        Adam optimizer, accuracy metric).
    """
    hidden = Dense(25, activation='relu', kernel_initializer='he_uniform', input_dim=n_inputs)
    score = Dense(1, activation='sigmoid')
    net = Sequential([hidden, score])
    net.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return net

# define the standalone generator model
def define_generator(latent_dim, n_outputs):
    """Build the standalone generator (left uncompiled).

    Architecture: one 15-unit ReLU hidden layer (he_uniform initialisation)
    followed by a linear output layer with ``n_outputs`` units.

    Args:
        latent_dim: Size of each latent input vector.
        n_outputs: Number of values produced per generated sample.

    Returns:
        An uncompiled Keras ``Sequential`` model.
    """
    hidden = Dense(15, activation='relu', kernel_initializer='he_uniform', input_dim=latent_dim)
    projection = Dense(n_outputs, activation='linear')
    return Sequential([hidden, projection])

# define the combined generator and discriminator model for updating the generator
def define_gan(generator, discriminator):
    """Stack generator and discriminator into the model used to update the generator.

    Args:
        generator: Generator model; becomes the first stage of the stack.
        discriminator: Discriminator model; becomes the second stage. Its
            ``trainable`` flag is set to ``False`` on the object passed in.

    Returns:
        A compiled Keras ``Sequential`` model (binary cross-entropy loss,
        Adam optimizer).
    """
    # Mark the discriminator as not trainable before stacking it behind the generator.
    discriminator.trainable = False
    stacked = Sequential([generator, discriminator])
    stacked.compile(loss='binary_crossentropy', optimizer='adam')
    return stacked

# sample real data
def sample_real_data(n):
    """Draw ``n`` random rows (with replacement) from the module-level ``X_positive``.

    Args:
        n: Number of rows to draw.

    Returns:
        Tuple ``(X_rand, y_rand)``: the selected rows and an ``(n, 1)``
        array of ones used as their "real" target.
    """
    row_ids = randint(X_positive.shape[0], size=n)
    return X_positive[row_ids, :], ones((n, 1))

# generate points in the latent space as input for the generator
def generate_latent_points(latent_dim, n=None):
    """Draw a batch of standard-normal latent vectors for the generator.

    Args:
        latent_dim: Length of each latent vector.
        n: Number of vectors to draw. When omitted, it is computed at call
            time as ``features.shape[0] - 2 * labels.sum()`` from the
            module-level ``features`` and ``labels`` (with 0/1 labels this
            would be the negative-minus-positive row count; TODO confirm the
            label encoding).

    Returns:
        Array of shape ``(n, latent_dim)``.
    """
    if n is None:
        # Resolve the default lazily so it always reflects the data currently
        # loaded, rather than whatever was loaded when this function was defined.
        n = int(features.shape[0] - (2 * labels.sum()))
    # Draw directly in the (batch, latent_dim) layout the network expects.
    return randn(n, latent_dim)

# use the generator to generate n fake examples with class labels
def generate_fake_samples(generator, latent_dim, n=None):
    """Generate ``n`` fake samples with the generator, each paired with target 0.

    Args:
        generator: Model exposing ``predict``; it receives an
            ``(n, latent_dim)`` batch of latent points.
        latent_dim: Length of each latent vector.
        n: Number of samples to generate. When omitted, it is computed at
            call time as ``features.shape[0] - 2 * labels.sum()`` from the
            module-level ``features`` and ``labels``.

    Returns:
        Tuple ``(X_fake, y_fake)``: the generator output and an ``(n, 1)``
        array of zeros used as the "fake" target.
    """
    if n is None:
        # Resolve the default lazily so it always reflects the data currently
        # loaded, rather than whatever was loaded when this function was defined.
        n = int(features.shape[0] - (2 * labels.sum()))
    x_input = generate_latent_points(latent_dim, n)
    # verbose=0 matches the evaluate() calls in this file and avoids one
    # progress bar per training iteration.
    X_fake = generator.predict(x_input, verbose=0)
    y_fake = zeros((n, 1))
    return X_fake, y_fake

# evaluate the discriminator
def summarize_performance(epoch, generator, discriminator, latent_dim, save_epoch=999):
    """Print discriminator accuracy and, at ``save_epoch``, export the augmented dataset.

    Args:
        epoch: Zero-based index of the training iteration being summarised.
        generator: Generator model used to produce the fake samples.
        discriminator: Compiled discriminator whose ``evaluate`` returns
            ``(loss, accuracy)``.
        latent_dim: Length of each latent vector fed to the generator.
        save_epoch: Iteration index at which the generated rows are written
            to disk. The default of 999 is the last iteration of a
            1000-iteration run; pass ``n_epochs - 1`` for other run lengths.

    Side effects:
        Prints ``epoch, acc_real, acc_fake``. When ``epoch == save_epoch``,
        writes ``./{commodity}/smote_gan_{commodity}.csv`` containing the
        original ``features_labels`` rows followed by the generated rows
        labelled 1.
    """
    # NOTE(review): `train` feeds the discriminator only X_positive rows with
    # target 1 as "real", yet here it is scored against the class labels y_sm
    # of the whole SMOTE-resampled set - confirm this is the intended metric.
    _, acc_real = discriminator.evaluate(X_sm, y_sm, verbose=0)
    # The default sample count of generate_fake_samples is used here.
    x_fake, y_fake = generate_fake_samples(generator, latent_dim)
    _, acc_fake = discriminator.evaluate(x_fake, y_fake, verbose=0)
    print(epoch, acc_real, acc_fake)
    if epoch == save_epoch:
        # Generated rows are exported with label 1 in the last column.
        positive_labels = ones((x_fake.shape[0], 1))
        labelled_fake = np.concatenate((x_fake, positive_labels), axis=1)
        smote_gan_samples = np.concatenate((features_labels, labelled_fake), axis=0)
        np.savetxt(f'./{commodity}/smote_gan_{commodity}.csv', smote_gan_samples, delimiter=',')

# train the generator and discriminator
def train(g_model, d_model, gan_model, latent_dim, n_epochs=1000, n_batch=128, n_eval=100):
    """Alternate discriminator and generator updates for ``n_epochs`` iterations.

    Args:
        g_model: Generator model.
        d_model: Compiled discriminator model.
        gan_model: Compiled stacked model used to update the generator.
        latent_dim: Length of each latent vector.
        n_epochs: Number of training iterations.
        n_batch: Batch size for the generator update; the discriminator is
            updated on two half-size batches (one real, one fake).
        n_eval: ``summarize_performance`` is called every ``n_eval`` iterations.
    """
    half_batch = int(n_batch / 2)
    # Inverted labels: generated samples are presented as "real" (1) so the
    # generator is updated through the discriminator's error.
    inverted_labels = ones((n_batch, 1))
    for epoch in range(n_epochs):
        # Discriminator step: one half batch of real rows, one of generated rows.
        real_rows, real_targets = sample_real_data(half_batch)
        fake_rows, fake_targets = generate_fake_samples(g_model, latent_dim, half_batch)
        d_model.train_on_batch(real_rows, real_targets)
        d_model.train_on_batch(fake_rows, fake_targets)
        # Generator step: fresh latent points pushed through the stacked model.
        latent_batch = generate_latent_points(latent_dim, n_batch)
        gan_model.train_on_batch(latent_batch, inverted_labels)
        # Periodic report; the zero-based iteration index is passed on.
        is_eval_step = (epoch + 1) % n_eval == 0
        if is_eval_step:
            summarize_performance(epoch, g_model, d_model, latent_dim)

In [None]:
# Dimensionality of the latent space sampled for the generator.
latent_dim = 20
# Both networks are sized from the number of feature columns.
n_features = features.shape[1]
# Discriminator first, then generator, then the stacked model that trains the generator.
discriminator = define_discriminator(n_inputs=n_features)
generator = define_generator(latent_dim=latent_dim, n_outputs=n_features)
gan_model = define_gan(generator=generator, discriminator=discriminator)
# Run training with the default schedule.
train(g_model=generator, d_model=discriminator, gan_model=gan_model, latent_dim=latent_dim)