**This Jupyter notebook, together with two other Jupyter notebooks (the three files are: dataset-builder, utils, and tester), contains the code for reproducing the tests conducted in the following paper:**


> Nazari, Ehsan and Branco, Paula. "On Oversampling via Generative Adversarial Networks under Different Data Difficulty Factors." International Workshop on Learning with Imbalanced Domains: Theory and Applications. PMLR, 2021.

This notebook contains the code for building a CGAN and the functions needed for conducting our tests.
The code for building the CGAN is obtained from: https://github.com/eriklindernoren/Keras-GAN/tree/master/cgan

In [None]:
from keras.datasets import mnist
from keras.layers import Input, Dense, Reshape, Flatten, Dropout, multiply
from keras.layers import BatchNormalization, Activation, Embedding, ZeroPadding2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import Adam

import matplotlib.pyplot as plt

import numpy as np

import os
from math import sqrt

class CGANRAW():
    """Conditional GAN (CGAN) for two-class data stored as flat feature vectors.

    The generator maps a noise vector, multiplied element-wise by a learned
    embedding of the class label, to a sample of shape ``(FEATURES, 1)`` with
    ``tanh`` outputs.  The discriminator multiplies the flattened sample by a
    second label embedding and outputs a single sigmoid "is real" score.
    """

    def __init__(self,FEATURES, dominant_class_count, imbalance_rate):
        """Build and compile the discriminator, the generator and the stacked model.

        Args:
            FEATURES: number of features per sample; must be a perfect square
                because ``sample_images`` draws each sample as a square image.
            dominant_class_count: only used to build the output directory name.
            imbalance_rate: only used to build the output directory name.

        Raises:
            ValueError: if ``FEATURES`` is not a perfect square.

        Side effects:
            Creates the directory ``self.name`` if it does not exist yet.
        """
        # NOTE: despite its name this attribute holds the square ROOT of FEATURES
        # (the side length of the image drawn by sample_images).
        self.features_squared = int(sqrt(FEATURES))
        # Compare by squaring the integer root: comparing against the float
        # sqrt() accepts large non-squares whose root rounds to a whole float.
        if self.features_squared * self.features_squared != FEATURES:
            raise ValueError("number of features must be a perfect square")
        self.img_shape = (FEATURES,1)
        self.num_classes = 2
        self.latent_dim = 50
        self.classes_list = [0,1]
        self.name = 'class0_quantity_of_'+str(dominant_class_count)+'_with_imbalance_rate_of_'+str(imbalance_rate)+'_images'
        os.makedirs(self.name, exist_ok=True)

        optimizer = Adam(0.0002, 0.5)

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss=['binary_crossentropy'],
            optimizer=optimizer,
            metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()

        # The generator takes noise and the target label as input
        # and generates a sample conditioned on that label
        noise = Input(shape=(self.latent_dim,))
        label = Input(shape=(1,))
        img = self.generator([noise, label])

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes the generated sample and its label as input
        # and outputs the validity score
        valid = self.discriminator([img, label])

        # The combined model  (stacked generator and discriminator)
        # Trains generator to fool discriminator
        self.combined = Model([noise, label], valid)
        self.combined.compile(loss=['binary_crossentropy'],
            optimizer=optimizer)

    def build_generator(self):
        """Return the generator: ``[noise, label] -> sample`` of shape ``img_shape``."""

        model = Sequential()

        model.add(Dense(256, input_dim=self.latent_dim))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(1024))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(np.prod(self.img_shape), activation='tanh'))
        model.add(Reshape(self.img_shape))

        noise = Input(shape=(self.latent_dim,))
        label = Input(shape=(1,), dtype='int32')
        # The label is embedded into a latent_dim-sized vector so it can be
        # multiplied element-wise with the noise.
        label_embedding = Flatten()(Embedding(self.num_classes, self.latent_dim)(label))

        model_input = multiply([noise, label_embedding])
        img = model(model_input)

        return Model([noise, label], img)

    def build_discriminator(self):
        """Return the discriminator: ``[sample, label] -> validity`` in (0, 1)."""

        model = Sequential()

        model.add(Dense(512, input_dim=np.prod(self.img_shape)))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.4))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.4))
        model.add(Dense(1, activation='sigmoid'))

        img = Input(shape=self.img_shape)
        label = Input(shape=(1,), dtype='int32')

        # The label is embedded into a vector as long as the flattened sample
        # so the two can be multiplied element-wise.
        label_embedding = Flatten()(Embedding(self.num_classes, np.prod(self.img_shape))(label))
        flat_img = Flatten()(img)

        model_input = multiply([flat_img, label_embedding])

        validity = model(model_input)

        return Model([img, label], validity)

    def train(self, epochs,X_train,y_train, batch_size=32):
        """Run ``epochs`` training steps; each step uses one random batch.

        Args:
            epochs: number of training iterations (one batch each, not full
                passes over ``X_train``).
            X_train: training samples, indexed along axis 0.
                NOTE(review): presumably shaped (n, FEATURES) or
                (n, FEATURES, 1) -- confirm against the caller.
            y_train: class labels aligned with ``X_train``.
            batch_size: number of samples drawn (with replacement) per step.
        """

        # Adversarial ground truths
        valid = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        for epoch in range(epochs):

            # ---------------------
            #  Train Discriminator
            # ---------------------

            # Select a random batch of real samples (indices drawn with replacement)
            idx = np.random.randint(0, X_train.shape[0], batch_size)
            imgs, labels = X_train[idx], y_train[idx]

            # Sample noise as generator input
            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))

            # Generate a batch of fake samples carrying the same labels
            gen_imgs = self.generator.predict([noise, labels])

            # Train the discriminator on the real batch, then on the fake batch.
            # The loss values returned by train_on_batch are not used.
            self.discriminator.train_on_batch([imgs, labels], valid)
            self.discriminator.train_on_batch([gen_imgs, labels], fake)

            # ---------------------
            #  Train Generator
            # ---------------------

            # Condition on random labels drawn from the known classes
            sampled_labels = np.random.randint(0, self.num_classes, batch_size).reshape(-1, 1)

            # Train the generator to make the discriminator answer "valid"
            self.combined.train_on_batch([noise, sampled_labels], valid)

    def sample_images(self, epoch):
        """Save a 2x5 grid of generated samples to ``<self.name>/<epoch>.png``.

        The first row shows class 0 and the second row class 1.

        Args:
            epoch: value used as the file name (without extension).
        """
        r, c = 2, 5 #do not change
        noise = np.random.normal(0, 1, (r * c, self.latent_dim))
        sampled_labels0 = np.zeros(5).reshape(-1, 1)
        sampled_labels1 = np.ones(5).reshape(-1, 1)
        sampled_labels = np.concatenate((sampled_labels0,sampled_labels1))

        gen_imgs = self.generator.predict([noise, sampled_labels])

        # Rescale the tanh output from [-1, 1] to [0, 1]
        gen_imgs = 0.5 * gen_imgs + 0.5

        fig, axs = plt.subplots(r, c)
        cnt = 0
        for i in range(r):
            for j in range(c):
                axs[i,j].imshow(gen_imgs[cnt,:,:].reshape(self.features_squared,self.features_squared), cmap='gray')
                # Index the scalar explicitly: formatting a 1-element array
                # with %d relies on a deprecated array-to-scalar conversion.
                axs[i,j].set_title("class %d" % sampled_labels[cnt, 0])
                axs[i,j].axis('off')
                cnt += 1
        fig.savefig("{}/{}.png".format(self.name,epoch))
        # Close this specific figure rather than whichever one is current.
        plt.close(fig)



Imbalancing the given dataset:

In [None]:
from sklearn.utils import shuffle

def imbalancer(data, label, dominant_class_count, imbalance_rate):
  """Sub-sample a two-class dataset down to a requested class imbalance.

  Keeps the first ``dominant_class_count`` rows labelled 0 and the first
  ``int(dominant_class_count * imbalance_rate)`` rows labelled 1, rebuilds the
  label vector (zeros followed by ones) and shuffles both with a fixed seed.

  Raises:
    Exception: if ``dominant_class_count`` is greater than 10,000.

  Returns:
    The shuffled data and labels, as produced by ``shuffle``.
  """
  if dominant_class_count > 10000:
    raise Exception("Maximum number of samples for class0 is 10,000")

  majority = data[label == 0][:dominant_class_count]
  minority_pool = data[label == 1]
  minority = minority_pool[:int(dominant_class_count * imbalance_rate)]

  subset = np.concatenate((majority, minority))
  subset_labels = np.concatenate(
      (np.zeros(majority.shape[0]), np.ones(minority.shape[0])))

  # Fixed seed so the same subset always comes back in the same order.
  return shuffle(subset, subset_labels, random_state=0)


Balancing the dataset by generating additional samples for the second class with the given CGAN:

In [None]:
def balancer(generator, dominant_class_count, imbalance_rate,
             epochs, train_x, train_y):
  """Balance a training set by adding GAN-generated class-1 samples.

  Args:
    generator: GAN class; called as
      ``generator(n_features, dominant_class_count, imbalance_rate)``. The
      instance must provide ``train``, ``latent_dim`` and ``generator.predict``.
    dominant_class_count: forwarded to the GAN constructor.
    imbalance_rate: forwarded to the GAN constructor.
    epochs: number of GAN training iterations.
    train_x: training samples; the last axis is the feature axis.
    train_y: labels (0 or 1) aligned with ``train_x``.

  Returns:
    ``(train_x, train_y)`` with enough generated class-1 samples appended for
    both classes to have the same count, shuffled with a fixed seed.

  Raises:
    Exception: if class 1 already has more samples than class 0.
  """
  # Validate before building and training the GAN so that bad input fails
  # fast instead of after the expensive training run.
  class0_count = np.count_nonzero(train_y == 0)
  class1_count = np.count_nonzero(train_y == 1)
  if class0_count < class1_count:
    raise Exception("class 1 has more samples than class zero! (should be vice versa)")
  how_many_class1_samples_should_be_generated = class0_count - class1_count

  # Already balanced: nothing to generate, so skip training and avoid asking
  # the generator to predict on an empty batch.
  if how_many_class1_samples_should_be_generated == 0:
    train_x, train_y = shuffle(train_x, train_y, random_state=0)
    return train_x, train_y

  gan = generator(train_x.shape[-1], dominant_class_count, imbalance_rate)
  gan.train(epochs, train_x, train_y)

  labels = np.ones(how_many_class1_samples_should_be_generated).reshape(-1,1)
  noise = np.random.normal(0, 1, (how_many_class1_samples_should_be_generated, gan.latent_dim))

  gen_imgs = gan.generator.predict([noise, labels])

  # Flatten the generated samples to the feature layout of train_x before appending.
  train_x = np.append(train_x, gen_imgs.reshape(-1,train_x.shape[-1],), axis=0)
  train_y = np.append(train_y, labels.reshape(-1,))

  train_x, train_y = shuffle(train_x, train_y, random_state=0)

  return train_x, train_y

In [None]:
def print_result(precision_class_0, recall_class_0, precision_class_1, recall_class_1):
  """Print the four per-class scores, one per line, each after its caption."""
  report = (
      ('precision of class zero: ', precision_class_0),
      ('recall   of  class zero: ', recall_class_0),
      ('precision of  class one: ', precision_class_1),
      ('recall of   class   one: ', recall_class_1),
  )
  for caption, score in report:
    print(caption, score)

In [None]:

def five_fold_train_gan_then_balance_then_train_classifier(data,
                                                           label,
                                                           dominant_class_count,
                                                           imbalance_rate,
                                                           classifier,
                                                           epochs):
  '''
  Imbalance the dataset, then run 5-fold cross-validation in which every
  training fold is re-balanced with CGAN-generated samples before the
  classifier is trained.

  data, label: per the original author's note, a balanced two-class dataset
    with 10,000 samples per class is expected.
  dominant_class_count, imbalance_rate: passed to imbalancer and balancer.
  classifier: called as classifier(X_train, y_train, X_test, y_test); must
    return six values: precision, recall and F1 for class 0, then for class 1.
  epochs: number of CGAN training iterations per fold.

  Balancing is skipped for every fold when the imbalanced dataset as a whole
  already has equally many samples in both classes.

  Prints the averaged precision/recall values and returns the six averaged
  scores (precision_0, recall_0, f1_0, precision_1, recall_1, f1_1).
  '''

  data, label = imbalancer(data, label, dominant_class_count, imbalance_rate)

  # Decided once on the whole imbalanced dataset, not per fold.
  needs_balancing = np.count_nonzero(label == 0) != np.count_nonzero(label == 1)

  from sklearn.model_selection import KFold
  n_folds = 5
  folds = KFold(n_splits=n_folds, random_state=0, shuffle=True)

  # Running sums in the order: precision_0, recall_0, f1_0, precision_1, recall_1, f1_1
  totals = [0, 0, 0, 0, 0, 0]
  for train_index, test_index in folds.split(data):
    X_train, y_train = data[train_index], label[train_index]
    X_test, y_test = data[test_index], label[test_index]

    if needs_balancing:
      # Data augmentation: add generated class-1 samples to the training fold only.
      X_train, y_train = balancer(CGANRAW, dominant_class_count,
                                  imbalance_rate, epochs, X_train, y_train)

    p0, r0, f0, p1, r1, f1 = classifier(X_train, y_train, X_test, y_test)
    totals = [running + score
              for running, score in zip(totals, (p0, r0, f0, p1, r1, f1))]

  precision_0, recall_0, f1_0, precision_1, recall_1, f1_1 = [
      total / n_folds for total in totals]

  print_result(precision_0, recall_0, precision_1, recall_1)
  return precision_0, recall_0, f1_0, precision_1, recall_1, f1_1

In [None]:
def five_fold_train_classifier(data,
                              label,
                              dominant_class_count,
                              imbalance_rate,
                              classifier):
  '''
  Imbalance the dataset, then run 5-fold cross-validation of the classifier
  on it without any re-balancing.

  data, label: per the original author's note, a balanced two-class dataset
    with 10,000 samples per class is expected.
  dominant_class_count, imbalance_rate: passed to imbalancer.
  classifier: called as classifier(X_train, y_train, X_test, y_test); must
    return six values: precision, recall and F1 for class 0, then for class 1.

  Prints the averaged precision/recall values and returns the six averaged
  scores (precision_0, recall_0, f1_0, precision_1, recall_1, f1_1).
  '''

  data, label = imbalancer(data, label, dominant_class_count, imbalance_rate)

  from sklearn.model_selection import KFold
  n_folds = 5
  folds = KFold(n_splits=n_folds, random_state=0, shuffle=True)

  # Running sums in the order: precision_0, recall_0, f1_0, precision_1, recall_1, f1_1
  totals = [0, 0, 0, 0, 0, 0]
  for train_index, test_index in folds.split(data):
    X_train, y_train = data[train_index], label[train_index]
    X_test, y_test = data[test_index], label[test_index]

    p0, r0, f0, p1, r1, f1 = classifier(X_train, y_train, X_test, y_test)
    totals = [running + score
              for running, score in zip(totals, (p0, r0, f0, p1, r1, f1))]

  precision_0, recall_0, f1_0, precision_1, recall_1, f1_1 = [
      total / n_folds for total in totals]

  print_result(precision_0, recall_0, precision_1, recall_1)
  return precision_0, recall_0, f1_0, precision_1, recall_1, f1_1

In [None]:
import matplotlib.pyplot as plt
import os
def save_imgs(data, label,name):
  """Save a 2x5 grid of dataset samples to ``sample_images_<name>/1.png``.

  The first row shows the first five samples labelled 0 and the second row the
  first five samples labelled 1.  If a class has fewer than five samples the
  remaining cells of its row are left empty.

  Args:
    data: samples indexed along axis 0; each sample is drawn with ``imshow``.
      NOTE(review): values are rescaled with 0.5 * x + 0.5, so they are
      presumably in [-1, 1] -- confirm against the dataset builder.
    label: class labels (0 or 1) aligned with ``data``.
    name: suffix of the output directory name.

  Side effects:
    Creates the directory ``sample_images_<name>`` if it does not exist yet.
  """
  out_dir = 'sample_images_'+name
  os.makedirs(out_dir, exist_ok=True)
  r, c = 2, 5 #do not change

  # One entry per grid row: row index == class id.
  samples_per_class = [data[label == 0][0:c], data[label == 1][0:c]]

  fig, axs = plt.subplots(r, c)
  for i in range(r):
      class_samples = samples_per_class[i]
      for j in range(c):
          if j < len(class_samples):
              # Rescale images 0 - 1
              axs[i,j].imshow(0.5 * class_samples[j,:,:] + 0.5, cmap='gray')
              # Use the integer row index as the class id instead of formatting
              # a 1-element array with %d (deprecated array-to-scalar conversion).
              axs[i,j].set_title("class %d" % i)
          axs[i,j].axis('off')
  fig.savefig("{}/1.png".format(out_dir))
  # Close this specific figure rather than whichever one is current.
  plt.close(fig)