# Data augmentation using SMOTE-GAN

### Ehsan Farahbakhsh, Nathan Wake, R. Dietmar Müller

*EarthByte Group, School of Geosciences, University of Sydney, NSW 2006, Australia*

This notebook augments the positive data (known mineral occurrences) used in mineral prospectivity mapping. It takes a hybrid approach: the Synthetic Minority Oversampling Technique (SMOTE) first oversamples the positive class, and a Generative Adversarial Network (GAN) is then trained on those positive samples to generate additional synthetic ones.

### Libraries

In [None]:
from imblearn.over_sampling import SMOTE
import numpy as np
from numpy import ones
from numpy import zeros
from numpy.random import randint
from numpy.random import randn
import pandas as pd
from scipy import stats

from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
from tensorflow.keras.regularizers import L1

### SMOTE

In [None]:
Xy_train_file = './Datasets/Outputs/Xy_train.csv'
# Xy_train_file = './Datasets/Outputs/Xy_train_important.csv'
Xy_train = pd.read_csv(Xy_train_file, index_col=False)

# split the training table by the value of its 'label' column
Xy_train_positive = Xy_train.loc[Xy_train['label']==1]
Xy_train_unlabelled = Xy_train.loc[Xy_train['label']==0]

features_label_lst = Xy_train.columns.tolist()

# columns whose name starts with one of these prefixes are treated as categorical
# and are kept out of SMOTE and the GAN
cat_prefixes = ('Intrusions_', 'MetamorphicFacies', 'RockUnits')
cat_features_lst = [column for column in features_label_lst if column.startswith(cat_prefixes)]

# every other column is numerical; 'label' is placed last explicitly because the
# slicing below (and the synthetic-label column appended after GAN training)
# relies on the label being the final column of the numerical block
num_features_lst = [
    column for column in features_label_lst
    if not column.startswith(cat_prefixes) and column != 'label'
]
num_features_lst.append('label')

num_features_labels = Xy_train[num_features_lst]
cat_features = Xy_train[cat_features_lst]

Xy_train_num = num_features_labels.to_numpy()
num_features = Xy_train_num[:, :-1]
num_labels = Xy_train_num[:, -1]

# SMOTE: resample the numerical features so that both label values are balanced
smote = SMOTE(random_state=42)
X_sm, y_sm = smote.fit_resample(num_features, num_labels)
smote_samples = np.concatenate((X_sm, y_sm.reshape(-1, 1)), axis=1)
# keep only the feature rows whose resampled label is 1 (original + SMOTE positives)
X_positive = X_sm[y_sm == 1]

### GAN

In [None]:
# factor handed to L1(...) as the activity regulariser of every Dense layer built below
# NOTE(review): despite its name, this value is never passed to an optimizer in this file
learning_rate = 0.01

# build and compile the standalone discriminator
def define_discriminator(n_inputs):
    """Return a compiled binary classifier that takes `n_inputs` features.

    The network has one hidden Dense layer with `n_inputs*10` units and a
    single sigmoid output unit. Both layers carry an L1 activity regulariser
    whose factor is the module-level `learning_rate`.
    """
    hidden_units = int(n_inputs*10)
    hidden_layer = Dense(
        hidden_units,
        activation='LeakyReLU',
        activity_regularizer=L1(learning_rate),
        kernel_initializer='he_uniform',
        input_dim=n_inputs,
    )
    output_layer = Dense(1, activation='sigmoid', activity_regularizer=L1(learning_rate))
    classifier = Sequential([hidden_layer, output_layer])
    # compiled here because the discriminator is also trained on its own
    classifier.compile(loss='binary_crossentropy', optimizer='Adam', metrics=['accuracy'])
    return classifier

# build the standalone generator (left uncompiled; it is only trained through the GAN)
def define_generator(latent_dim, n_outputs):
    """Return an uncompiled network mapping `latent_dim` inputs to `n_outputs` values.

    The network has one hidden Dense layer with `n_outputs*5` units and a
    linear output layer. Both layers carry an L1 activity regulariser whose
    factor is the module-level `learning_rate`.
    """
    hidden_units = int(n_outputs*5)
    hidden_layer = Dense(
        hidden_units,
        activation='LeakyReLU',
        activity_regularizer=L1(learning_rate),
        kernel_initializer='he_uniform',
        input_dim=latent_dim,
    )
    output_layer = Dense(n_outputs, activation='linear', activity_regularizer=L1(learning_rate))
    return Sequential([hidden_layer, output_layer])

# stack generator and discriminator into the model used to update the generator
def define_gan(generator, discriminator):
    """Return a compiled model that feeds `generator` output into `discriminator`.

    The discriminator is flagged as non-trainable before the combined model is
    compiled.
    """
    # freeze the discriminator before it becomes part of the combined model
    discriminator.trainable = False
    # generator first, discriminator second
    combined = Sequential([generator, discriminator])
    combined.compile(loss='binary_crossentropy', optimizer='adam')
    return combined

# default number of samples to draw/generate: row count minus twice the label sum
# (i.e. label-0 rows minus label-1 rows, assuming the labels are only 0 and 1)
n_samples = int(len(num_features) - 2 * num_labels.sum())

# draw real (positive) rows for the discriminator
def sample_real_data(n=n_samples):
    """Return `n` rows of `X_positive` picked at random with replacement.

    Returns a tuple `(X_rand, y_rand)` where `y_rand` is an `(n, 1)` array of
    ones, the target used for real samples.
    """
    row_idx = randint(X_positive.shape[0], size=n)
    return X_positive[row_idx, :], ones((n, 1))

# draw standard-normal noise to feed the generator
def generate_latent_points(latent_dim, n=n_samples):
    """Return an `(n, latent_dim)` array of standard-normal random values."""
    # draw all values as one flat vector, then fold it into a batch of n latent vectors
    flat_noise = randn(latent_dim * n)
    return flat_noise.reshape(n, latent_dim)

# produce n generator outputs together with their "fake" targets
def generate_fake_samples(generator, latent_dim, n=n_samples):
    """Return `(X_fake, y_fake)` for `n` samples predicted by `generator`.

    `X_fake` is the generator's prediction on freshly drawn latent points and
    `y_fake` is an `(n, 1)` array of zeros, the target used for fake samples.
    """
    latent_batch = generate_latent_points(latent_dim, n)
    return generator.predict(latent_batch, verbose=0), zeros((n, 1))

# evaluate the discriminator
def summarize_performance(epoch, generator, discriminator, latent_dim):
    """Measure discriminator accuracy on real and on generated samples.

    Parameters:
        epoch: index of the current epoch (not used inside this function).
        generator: model used to produce the fake samples.
        discriminator: compiled model whose `evaluate` returns (loss, accuracy).
        latent_dim: size of the latent vectors fed to the generator.

    Returns:
        Tuple `(x_real, x_fake, acc_real, acc_fake)`: the sampled real rows, the
        generated rows, and the discriminator accuracy on each set.
    """
    # sample real data
    x_real, y_real = sample_real_data()
    # evaluate the discriminator on the sampled real examples (target 1); the
    # SMOTE table X_sm/y_sm carries class labels rather than real/fake targets,
    # so scoring against it does not measure accuracy on real samples
    _, acc_real = discriminator.evaluate(x_real, y_real, verbose=0)
    # prepare fake examples
    x_fake, y_fake = generate_fake_samples(generator, latent_dim)
    # evaluate the discriminator on fake examples (target 0)
    _, acc_fake = discriminator.evaluate(x_fake, y_fake, verbose=0)
    return x_real, x_fake, acc_real, acc_fake

# train the generator and discriminator
def train(g_model, d_model, gan_model, latent_dim, n_epochs, n_batch=128, n_eval=100):
    """Train the GAN and return the training table extended with synthetic rows.

    Parameters:
        g_model: generator model.
        d_model: compiled discriminator model.
        gan_model: compiled combined model used to update the generator.
        latent_dim: size of the latent vectors fed to the generator.
        n_epochs: number of training iterations (one batch each); must be >= 1.
        n_batch: batch size for the generator update; the discriminator is
            updated on half a batch of real and half a batch of fake samples.
        n_eval: discriminator accuracy is printed every `n_eval` epochs.

    Returns:
        Tuple `(smote_gan_samples, x_fake_num)`: a DataFrame with the columns of
        `num_features_lst` holding `Xy_train_num` followed by the retained
        synthetic rows, and the number of retained synthetic rows.

    Raises:
        ValueError: if `n_epochs` is smaller than 1.
    """
    if n_epochs < 1:
        raise ValueError('n_epochs must be at least 1')

    # determine half the size of one batch, for updating the discriminator
    half_batch = int(n_batch / 2)
    x_fake = None
    # manually enumerate epochs
    for i in range(n_epochs):
        # prepare real samples
        x_real, y_real = sample_real_data(half_batch)
        # prepare fake examples
        x_fake_batch, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
        # update discriminator
        d_model.train_on_batch(x_real, y_real)
        d_model.train_on_batch(x_fake_batch, y_fake)
        # prepare points in the latent space as input for the generator
        x_gan = generate_latent_points(latent_dim, n_batch)
        # create inverted labels for the fake samples
        y_gan = ones((n_batch, 1))
        # update the generator via the discriminator's error
        gan_model.train_on_batch(x_gan, y_gan)

        # evaluate the model every n_eval epochs
        if (i+1) % n_eval == 0:
            # summarize discriminator performance
            _, x_fake, acc_real, acc_fake = summarize_performance(i, g_model, d_model, latent_dim)
            print(i+1, acc_real, acc_fake)

    # when the last epoch was not an evaluation epoch, generate the full-size
    # synthetic set now instead of exporting a small discriminator half-batch
    if n_epochs % n_eval != 0:
        x_fake, _ = generate_fake_samples(g_model, latent_dim)

    # remove outliers: keep rows whose every column has |z-score| below 3
    x_fake = x_fake[(np.abs(stats.zscore(x_fake)) < 3).all(axis=1)]
    # append a label column filled with 2 to the generated rows
    # NOTE(review): presumably 2 distinguishes synthetic rows from the 0/1 input labels - confirm
    synthetic_label = np.full((x_fake.shape[0], 1), 2)
    x_fake = np.concatenate((x_fake, synthetic_label), axis=1)
    x_fake_num = x_fake.shape[0]
    smote_gan_samples = np.concatenate((Xy_train_num, x_fake), axis=0)
    smote_gan_samples = pd.DataFrame(smote_gan_samples, columns=num_features_lst)

    return smote_gan_samples, x_fake_num

### Multiple Iterations

In [None]:
# size of the latent space: half the number of numerical features
latent_dim = int(num_features.shape[1]*0.5)

# number of independent GAN trainings; each one writes its own CSV
n_iter = 10

Xy_train_cat = Xy_train[cat_features_lst]
Xy_train_cat_positive = Xy_train_positive[cat_features_lst]
# DataFrame.mode() returns one row per tied mode (padded with NaN), so keep only
# the first row; repeating several rows would yield more categorical rows than
# synthetic samples and misalign the column-wise concat below
Xy_train_cat_positive_mode = Xy_train_cat_positive.mode().head(1)

for i in range(n_iter):
    print('--------------------')
    print(f'Iteration {i+1}')
    print('--------------------')

    # create the discriminator
    discriminator = define_discriminator(num_features.shape[1])
    # create the generator
    generator = define_generator(latent_dim, num_features.shape[1])
    # create the gan
    gan_model = define_gan(generator, discriminator)
    # train model
    smote_gan_samples, x_fake_num = train(generator, discriminator, gan_model, latent_dim, n_epochs=5000)

    # give every synthetic row the most frequent categorical values of the positive samples
    Xy_train_cat_positive_mode_ = Xy_train_cat_positive_mode.loc[Xy_train_cat_positive_mode.index.repeat(x_fake_num)].reset_index(drop=True)
    # categorical block: original rows first, synthetic rows after, matching the numerical block
    Xy_train_cat_ = pd.concat([Xy_train_cat, Xy_train_cat_positive_mode_]).reset_index(drop=True)
    smote_gan_samples = pd.concat([smote_gan_samples, Xy_train_cat_], axis=1).reset_index(drop=True)
    # restore the column order of the input table
    smote_gan_samples = smote_gan_samples[features_label_lst]
    # NOTE(review): the file name says "important" while the active input path is Xy_train.csv - confirm
    smote_gan_samples.to_csv(f'./Datasets/Outputs/smote_gan_important_{i+1}.csv', index=False)