In [None]:
from numpy import argmax
from tqdm import tqdm
from sklearn import metrics
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import MinMaxScaler
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve, f1_score, accuracy_score, precision_score, recall_score

import random
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

#SMOTE
def SMOTE(X, n_nbrs, n_samples):
    synthetic_data=[]
    nbrs = NearestNeighbors(n_neighbors=n_nbrs+1, algorithm='auto').fit(X)
    distances, indices = nbrs.kneighbors(X)
    for i in range(len(X)):
        for _ in range(n_samples):
            idx=random.randint(1, n_nbrs-1)
            selected_nbr=indices[i][idx]
            coef=np.random.uniform(0,1,1)[0]
            syn_sample=coef*X[i]+(1-coef)*X[selected_nbr]
            synthetic_data.append(syn_sample)
    return synthetic_data


# Simple RBM class
def sample(probabilities, mode='continuous'):
    ''' Sample a tensor based on the probabilities (A tensor given by get_probabilities)'''
    if mode=='continuous':
        return probabilities
    elif mode=='binary':
        return tf.floor(probabilities + tf.random.uniform(tf.shape(probabilities), 0, 1))



class RBM:

    def __init__(self, n_visible, n_hidden, lr, epochs, mode='continuous'):
        ''' Initialize a model for an RBM with one layer of hidden units '''
        self.mode = mode # bernoulli or gaussian RBM
        self.n_hidden = n_hidden #  Number of hidden nodes
        self.n_visible = n_visible # Number of visible nodes
        self.lr = lr # Learning rate for the CD algorithm
        self.epochs = epochs # Number of iterations to run the algorithm for

        # Initialize weights and biases
        with tf.name_scope('Weights'):
            self.W = tf.Variable(tf.random.normal([self.n_visible, self.n_hidden], mean=0., stddev=4 * np.sqrt(6. / (self.n_visible + self.n_hidden))), name="weights")
        tf.summary.histogram('weights',self.W)
        self.vb = tf.Variable(tf.zeros([1, self.n_visible]),tf.float32, name="visible_bias")
        self.hb = tf.Variable(tf.zeros([1, self.n_hidden]),tf.float32, name="hidden_bias")


    def get_probabilities(self, layer, val):
        ''' Return a tensor of probabilities associated with the layer specified'''
        if layer == 'hidden':
            with tf.name_scope("Hidden_Probabilities"):
                return tf.nn.sigmoid(tf.matmul(val, self.W) + self.hb)

        elif layer == 'visible':
            with tf.name_scope("Visible_Probabilities"):
                return tf.nn.sigmoid(tf.matmul(val, tf.transpose(self.W)) + self.vb)


    def CD(self, v, K=1):
        ''' K-step Contrastive Divergence using Gibbs sampling. Return parameters update. '''
        with tf.name_scope("Contrastive_Divergence"):
            h_prob = self.get_probabilities('hidden', v)
            h_state = sample(h_prob, mode=self.mode)
            pos_divergence = tf.matmul(tf.transpose(v), h_prob) # Positive Divergence + h(v).v^T

            fake_v_prob = self.get_probabilities('visible', h_state)
            fake_v_state = fake_v_prob #sample(fake_v_prob)

            fake_h_prob = self.get_probabilities('hidden', fake_v_state)
            fake_h_state = sample(fake_h_prob, mode=self.mode)

            for i in range(K-1): # Number of steps to run the algorithm

                fake_v_prob = self.get_probabilities('visible', fake_h_state)
                fake_v_state = fake_v_prob #sample(fake_v_prob)

                fake_h_prob = self.get_probabilities('hidden', fake_v_state)
                fake_h_state = sample(fake_h_prob, mode=self.mode)

            neg_divergence = tf.matmul(tf.transpose(fake_v_state), fake_h_prob) # Negative Divergence - h(v').v'^T

            dW = pos_divergence-neg_divergence
            dvb = v-fake_v_state
            dhb = h_prob-fake_h_prob

            # Similarity between reconstructed visible layer and input during training. 
            self.rec_error = tf.reduce_mean(tf.squared_difference(v, fake_v_state))
            tf.summary.scalar('reconstruction_error', self.rec_error)

            self.div = tf.reduce_mean(tf.abs(dW))
            tf.summary.scalar('weights_increment', self.div)

            return dW, dvb, dhb


    def update(self, v, K=1):
        batch_size = tf.cast(tf.shape(v)[0], tf.float32) # batch size
        dW, dvb, dhb = self.CD(v, K=K) # contrastive divergence

        delta_w = (self.lr/batch_size)*dW # weight gradient
        delta_vb = (self.lr/batch_size)*(tf.reduce_sum(dvb, 0, keep_dims=True)) # visible bias gradient
        delta_hb = (self.lr/batch_size)*(tf.reduce_sum(dhb, 0, keep_dims=True)) # hidden bias gradient

        train_op = [self.W.assign_add(delta_w), self.vb.assign_add(delta_vb), self.hb.assign_add(delta_hb)] 
        return train_op


    def gibbs(self, steps, v):
        ''' Use the Gibbs sampler for a network of hidden and visible units. Return a sampled version of the input'''
        with tf.name_scope("Gibbs_sampling"):
            for i in range(steps): # Number of steps to run the algorithm
                hidden_p = self.get_probabilities('hidden', v) # v: input data
                h = sample(hidden_p, mode=self.mode)

                visible_p = self.get_probabilities('visible', h)
                v = visible_p
                #v = sample(visible_p)
            return visible_p


    def free_energy(self, v):
        ''' Compute the free energy for a visible state'''
        vbias_term = tf.matmul(v, tf.transpose(self.vb))
        x_b = tf.matmul(v, self.W) + self.hb
        hidden_term = tf.reduce_sum(tf.log(1 + tf.exp(x_b)))
        return - hidden_term - vbias_term


    def get_feature_map(self):
        ''' Return hidden features'''
        ft_map = {}
        for k in range(self.n_hidden):
            ft_map[k] = self.get_probabilities('visible', tf.expand_dims(tf.one_hot(k+1, self.n_hidden),0))
        return ft_map

### Simulate data for classification

In [2]:
#simulate data
X, y = make_classification(n_samples=1000, n_features=30, n_informative=20, n_classes=2,weights=[0.9,0.1])

#train/test split
X_train, X_test, y_train, y_test=train_test_split(X, y, test_size=0.2, random_state=0)

#minmax scaling
scaler = MinMaxScaler()
scaler.fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

### Generate synthetic samples using SMOTE

In [3]:
# collect samples of the minority class
pos_class_train=[]
for i in range(len(y_train)):
    if y_train[i]==1:
        pos_class_train.append(X_train[i])
pos_class_train=np.array(pos_class_train)

pos_class_test=[]
for i in range(len(y_test)):
    if y_test[i]==1:
        pos_class_test.append(X_test[i])
pos_class_test=np.array(pos_class_test)

# generate synthetic samples
synthetic_data=SMOTE(pos_class_train, 5, 15)
synthetic_data=np.array(synthetic_data)

### Apply RBM to samples generated by SMOTE

In [4]:
# Initialize RBM Model
rbm_model = RBM(n_visible = 30, n_hidden = 5, lr = tf.constant(0.05, tf.float32), epochs = 1000, mode='continuous')

# Placeholder for the visible layer of the RBM computation graph.
v = tf.placeholder(tf.float32, shape=[None, rbm_model.n_visible], name="visible_layer")

# Update rule
k=1
train_op = rbm_model.update(v, K=k)

# Free energy
energy = rbm_model.free_energy(v=v)
tf.summary.scalar('free_energy', tf.reduce_mean(energy))

# Merge summaries for Tensorboard visualization
summary = tf.summary.merge_all()
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())

#Training
for epoch in tqdm(range(rbm_model.epochs)):
    if epoch % 100 == 0:
        result = sess.run([rbm_model.rec_error, summary], feed_dict = {v: pos_class_train.reshape(len(pos_class_train),-1).astype(np.float32)})
        if epoch % 10 == 0: 
            print("Reconstruction error at step {}: {:.3f}".format(epoch, result[0]))
    sess.run(train_op, feed_dict = {v: pos_class_test.reshape(len(pos_class_test),-1).astype(np.float32)})

Instructions for updating:
keep_dims is deprecated, use keepdims instead


  0%|          | 0/1000 [00:00<?, ?it/s]

Reconstruction error at step 0: 0.149
Reconstruction error at step 100: 0.027
Reconstruction error at step 200: 0.025


100%|██████████| 1000/1000 [00:00<00:00, 4494.78it/s]

Reconstruction error at step 300: 0.025
Reconstruction error at step 400: 0.025
Reconstruction error at step 500: 0.025
Reconstruction error at step 600: 0.025
Reconstruction error at step 700: 0.025
Reconstruction error at step 800: 0.025
Reconstruction error at step 900: 0.025





In [5]:
# Correct synthetic samples of SMOTE
rbm_samples=[]
for i in range(len(synthetic_data)):
    if i % 100 == 0:
        print((i,len(synthetic_data)))
    rbm_syn=rbm_model.gibbs(1, v=v).eval(session=sess, feed_dict={v: synthetic_data[i].reshape(1,-1).astype(np.float32)})
    rbm_samples.append(rbm_syn)

(0, 1215)
(100, 1215)
(200, 1215)
(300, 1215)
(400, 1215)
(500, 1215)
(600, 1215)
(700, 1215)
(800, 1215)
(900, 1215)
(1000, 1215)
(1100, 1215)
(1200, 1215)


### Classification without synthetic samples

In [6]:
def to_label(prob,thres):
    return (prob>=thres).astype(int)

logit=LogisticRegression(random_state=0).fit(X_train, y_train)
yhat=logit.predict_proba(X_test)[:,1]

print("f1_score of a logistic regression without synthetic samples is %s" % f1_score(y_test,to_label(yhat, 0.5)))


f1_score of a logistic regression without synthetic samples is 0.3448275862068966


### Classification with synthetic samples

In [7]:
rbm_s=[item[0] for item in rbm_samples]
rbm_s=np.array(rbm_s)
X_syn=np.concatenate((X_train, rbm_s))
y_syn=np.concatenate((y_train,np.array([1]*len(rbm_s))))

In [12]:
logit=LogisticRegression(random_state=0).fit(X_syn, y_syn)
yhat=logit.predict_proba(X_test)[:,1]
print("f1_score of a logistic regression with synthetic samples is %s" % f1_score(y_test,to_label(yhat, 0.5)))

f1_score of a logistic regression with synthetic samples is 0.6229508196721311
