In [1]:
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from imblearn.over_sampling import  SMOTE, BorderlineSMOTE, SVMSMOTE
from sklearn.metrics import confusion_matrix
from sklearn.utils import shuffle
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier
from numpy import ones
from numpy import zeros
from numpy.random import rand
from numpy.random import randint
from tensorflow.keras.optimizers import Adam
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LeakyReLU
from sklearn.model_selection import train_test_split

In [2]:
data = pd.read_csv('heart_2020_cleaned.csv')


#//-------------------------------------------------------------

## Here the number of samples (the first parameter of the experiments) is chosen

# Select number of data points to experiment
data = data.iloc[0:1000]
# transform labels
labels = np.where(data['HeartDisease'] == 'Yes', 1, 0)
# drop columns
features = data.drop(columns = ['HeartDisease', 'KidneyDisease', 'SkinCancer'])

In [3]:
def preprocessing(dataframe):

    categorical_var = []
    bool_var = []
    numeric_var = []

    for col in data.columns:
        if data[col].dtypes == 'object':
            categorical_var.append(col)
        elif data[col].dtypes == 'bool':
            bool_var.append(col)
        else:
            numeric_var.append(col)

    # Initialize an encoder
    encoder_cat = OneHotEncoder(sparse=False)

    # Transform the categorical features
    one_hot_cat = encoder_cat.fit_transform(data[categorical_var])

    # Initialize encoder
    encoder_bool = OneHotEncoder(sparse=False, drop='first')

    # Transform the boolean features
    one_hot_bool = encoder_bool.fit_transform(data[bool_var])

    # Scale numeric features with minmax scaler
    scaler = MinMaxScaler()
    numeric_data = scaler.fit_transform(data[numeric_var])

    # Stack all the matrices
    new = np.hstack((one_hot_bool, one_hot_cat, numeric_data))

    # Remove nan values
    nan_indexes = np.argwhere(np.isnan(new))
    new = np.delete(new, nan_indexes[:,0], axis=0)

    return new

In [4]:
def generate_data_classical_models(original_data, original_labels, target_label, other_label, percentage, model_class):

    model = model_class()

    features_target_class = original_data[original_labels == target_label]

    number_new_samples = int(features_target_class.shape[0] * percentage)

    features_other_class = np.ones((features_target_class.shape[0] + number_new_samples, features_target_class.shape[1]))

    X_temp = np.vstack((features_other_class, features_target_class))

    y_temp = np.hstack((np.full(features_other_class.shape[0], other_label), np.full(features_target_class.shape[0], target_label)))


    X, y = model.fit_resample(X_temp, y_temp)


    return X[X_temp.shape[0]:,:], y[X_temp.shape[0]:]


In [5]:
class gan1:

    def __init__(self, latent_space):

        self.latent_dim = latent_space
        # create the discriminator
        self.d_model = self.define_discriminator()
        # create the generator
        self.g_model = self.define_generator()
        # create the gan
        self.gan_model = self.define_gan()


    # define the standalone discriminator model
    def define_discriminator(self):

        model = Sequential()

        model.add(Dense(16, activation='relu'))
        model.add(Dense(8, activation='relu'))

        model.add(Dense(1, activation='sigmoid'))
        # compile model
        opt = Adam(learning_rate=0.0002, beta_1=0.5)
        model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])
        return model


    # define the standalone generator model
    def define_generator(self):
        model = Sequential()

        model.add(Dense(self.latent_dim, input_dim=self.latent_dim))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(20, activation='relu'))
        model.add(Dense(30, activation='relu'))
        model.add(Dense(52, activation='sigmoid'))

        return model


    def define_gan(self):

        # make weights in the discriminator not trainable
        self.d_model.trainable = False
        # connect them
        model = Sequential()
        # add generator
        model.add(self.g_model)
        # add the discriminator
        model.add(self.d_model)
        # compile model
        opt = Adam(learning_rate=0.0002, beta_1=0.5)
        model.compile(loss='binary_crossentropy', optimizer=opt)
        return model


    # generate points in latent space as input for the generator
    def generate_latent_points(self, n_samples):
        # generate points in the latent space
        x_input = rand(self.latent_dim * n_samples)
        # reshape into a batch of inputs for the network
        x_input = x_input.reshape(n_samples, self.latent_dim)
        return x_input


    # use the generator to generate n fake examples, with class labels
    def generate_fake_samples(self, n_samples):
        # generate points in latent space
        x_input = self.generate_latent_points(n_samples)
        # predict outputs
        X = self.g_model.predict(x_input)
        # create 'fake' class labels (0)
        y = zeros((n_samples, 1))
        return X, y


    # select real samples
    def generate_real_samples(self, dataset, n_samples):
        # choose random instances
        ix = randint(0, dataset.shape[0], n_samples)
        # retrieve selected samples
        X = dataset[ix]
        # generate 'real' class labels (1)
        y = ones((n_samples, 1))
        return X, y


    def summarize_performance(self, epoch, dataset, n_samples=100):
        # prepare real samples
        X_real, y_real = self.generate_real_samples(dataset, n_samples)
        # evaluate discriminator on real examples
        _, acc_real = self.d_model.evaluate(X_real, y_real, verbose=0)
        # prepare fake examples
        x_fake, y_fake = self.generate_fake_samples(n_samples)
        # evaluate discriminator on fake examples
        _, acc_fake = self.d_model.evaluate(x_fake, y_fake, verbose=0)
        # summarize discriminator performance
        filename = 'generator_model_%03d.h5' % (epoch + 1)
        self.g_model.save(filename)
        #print('>Accuracy real: %.0f%%, fake: %.0f%%' % (acc_real*100, acc_fake*100))

    def train(self, dataset, n_epochs=100, n_batch=256):
        bat_per_epo = int(dataset.shape[0] / n_batch)
        half_batch = int(n_batch / 2)
        # manually enumerate epochs
        for i in range(n_epochs):
            # enumerate batches over the training set
            for j in range(bat_per_epo):
                # get randomly selected 'real' samples
                X_real, y_real = self.generate_real_samples(dataset, half_batch)
                # generate 'fake' examples
                X_fake, y_fake = self.generate_fake_samples(half_batch)
                # create training set for the discriminator
                X, y = np.vstack((X_real, X_fake)), np.vstack((y_real, y_fake))
                # update discriminator model weights
                d_loss, _ = self.d_model.train_on_batch(X, y)
                # prepare points in latent space as input for the generator
                X_gan = self.generate_latent_points(n_batch)
                # create inverted labels for the fake samples
                y_gan = ones((n_batch, 1))
                # update the generator via the discriminator's error
                g_loss = self.gan_model.train_on_batch(X_gan, y_gan)
                # summarize loss on this batch
                #print('>%d, %d/%d, d=%.3f, g=%.3f' % (i+1, j+1, bat_per_epo, d_loss, g_loss))

            if (i+1) % 50 == 0:
                self.summarize_performance(i, dataset)

    def bring_generator(self):

        return self.g_model


In [6]:
# Experimental procedure

# In this section the different configurations are implemented

##-------------------------------------------------------------------------------------------

features = preprocessing(features)

X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.15, random_state=42)


# Classical models to explore
classical_models = [SMOTE, SVMSMOTE]
classical_models_str = ['SMOTE',  'SVMSMOTE']

# Save the results in a dictionary
results = {}

# New proposed model
new_models_LS = [30, 50]
new_models_str = ['GAN with LS of 30', 'GAN with LS of 50']

# Iterate through the classical models
for i in range(len(classical_models)):


    new_features_yes, new_labels_yes = generate_data_classical_models(X_train, y_train, 1, 0, 0.8, classical_models[i])
    new_features_no, new_labels_no = generate_data_classical_models(X_train, y_train, 0, 1, 0.5, classical_models[i])

    synthetic_data = np.vstack((new_features_yes,new_features_no))
    synthetic_labels = np.hstack((new_labels_yes, new_labels_no))
    s_X, s_y = shuffle(synthetic_data, synthetic_labels)

    # train knn with synthetic data
    knn = KNeighborsClassifier(5)

    knn.fit(s_X, s_y)

    results[classical_models_str[i]] = knn.score(X_test, y_test)

for i in range(len(new_models_LS)):

    gan_yes = gan1(new_models_LS[i])

    gan_yes.train(X_train[y_train == 1], n_epochs=100, n_batch=256)

    generator_yes = gan_yes.bring_generator()

    num_new_samples = int(X_train[y_train == 1].shape[0] * 0.8)

    # generate points
    latent_points = gan_yes.generate_latent_points(num_new_samples)

    # generate samples
    s_X_yes = generator_yes.predict(latent_points)

    gan_no= gan1(new_models_LS[i])

    gan_no.train(X_train[y_train == 0], n_epochs=100, n_batch=256)

    generator_no = gan_no.bring_generator()

    num_new_samples = int(X_train[y_train == 0].shape[0] * 0.5)

    # generate points
    latent_points = gan_no.generate_latent_points(num_new_samples)

    # generate samples
    s_X_no = generator_no.predict(latent_points)

    s_X_temp = np.vstack((s_X_yes, s_X_no))


    labels_temp = np.zeros(s_X_temp.shape[0])
    labels_temp[0:s_X_yes.shape[0]] = 1

    s_X, s_y = shuffle(s_X_temp, labels_temp)

    # train knn with synthetic data
    knn = KNeighborsClassifier(5)

    knn.fit(s_X, s_y)

    results[new_models_str[i]] = knn.score(X_test, y_test)


print(results)



{'SMOTE': 0.9933333333333333, 'SVMSMOTE': 0.94, 'GAN with LS of 30': 0.12666666666666668, 'GAN with LS of 50': 0.6133333333333333}


In [7]:
print(results)

{'SMOTE': 0.9933333333333333, 'SVMSMOTE': 0.94, 'GAN with LS of 30': 0.12666666666666668, 'GAN with LS of 50': 0.6133333333333333}
