In [122]:
import torch
import numpy as np
import utils
import os


In [None]:
class RBM:
    """Restricted Boltzmann machine with sigmoid units, trained by k-step
    contrastive divergence (CD-k).

    The implementation builds a static computation graph with TensorFlow
    1.x-style calls (``tf.placeholder``, ``tf.assign``, ``tf.Session``).

    NOTE(review): every method refers to a module-level name ``tf``, but the
    notebook's visible import cell does not import TensorFlow. Presumably
    ``import tensorflow as tf`` was executed in the kernel beforehand --
    confirm and add it to the import cell so the notebook survives a restart.
    """

    # Index of the visible unit flipped by ``pseudo_likelihood``; advanced by
    # one (modulo ``n_visible``) every time that method is called.
    i = 0

    @staticmethod
    def init_weights(shape, name = 'weights'):
        """Return a variable of ``shape`` initialised from a truncated normal (stddev 0.1)."""
        return tf.Variable(tf.truncated_normal(shape, stddev = 0.1), name = name)

    @staticmethod
    def init_biases(shape, name = 'biases'):
        """Return a variable of ``shape`` with every entry initialised to 0.1."""
        return tf.Variable(tf.constant(0.1, shape = shape), name = name)

    def __init__(self, n_visible = 784, n_hidden = 500, k = 1):
        """Create the model parameters.

        Args:
            n_visible: Number of visible units.
            n_hidden: Number of hidden units.
            k: Number of Gibbs steps performed by ``CD_k``.
        """
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        self.k = k

        print('Start building computation graph...')

        # Fed at run time so the step size can be chosen per ``train`` call.
        self.learning_rate = tf.placeholder(dtype = tf.float32)

        # weights
        self.w = RBM.init_weights([n_visible, n_hidden], name = 'w')
        # biases
        self.vb = RBM.init_biases([n_visible], name = 'vb')
        self.hb = RBM.init_biases([n_hidden], name = 'hb')

    def forward(self, visible):
        """Return the hidden activation probabilities ``sigmoid(visible @ w + hb)``."""
        f_ = tf.matmul(visible, self.w) + self.hb
        return tf.nn.sigmoid(f_)

    def backward(self, hidden):
        """Return the visible activation probabilities ``sigmoid(hidden @ w^T + vb)``."""
        b_ = tf.matmul(hidden, tf.transpose(self.w)) + self.vb
        return tf.nn.sigmoid(b_)

    def sample_h_given_v(self, visible_sample):
        """Draw a binary hidden sample given visible states."""
        hidden_probs = self.forward(visible_sample)
        # relu(sign(p - u)) is 1 where p > u and 0 elsewhere, with u ~ U[0, 1).
        hidden_sample = tf.nn.relu(tf.sign(hidden_probs - tf.random_uniform(tf.shape(hidden_probs))))
        return hidden_sample

    def sample_v_given_h(self, hidden_sample):
        """Draw a binary visible sample given hidden states."""
        visible_probs = self.backward(hidden_sample)
        # Same thresholding trick as in ``sample_h_given_v``.
        visible_sample = tf.nn.relu(tf.sign(visible_probs - tf.random_uniform(tf.shape(visible_probs))))
        return visible_sample

    def CD_k(self, visible):
        """Build the CD-k update directions for a batch of visible vectors.

        Runs ``self.k`` alternating Gibbs steps starting from ``visible``.

        Returns:
            Tuple ``(w_grad, vb_grad, hb_grad)`` of batch-averaged differences
            between data-driven and chain-driven statistics.
        """
        visible_sample = visible
        hidden_sample = self.sample_h_given_v(visible_sample)

        for _ in range(self.k):
            visible_sample = self.sample_v_given_h(hidden_sample)
            hidden_sample = self.sample_h_given_v(visible_sample)

        # The data-driven statistics use probabilities rather than samples.
        hidden_probs = self.forward(visible)

        # gradients for weights
        positive_phase = tf.matmul(tf.transpose(visible), hidden_probs)
        negative_phase = tf.matmul(tf.transpose(visible_sample), hidden_sample)
        w_grad = (positive_phase - negative_phase) / tf.to_float(tf.shape(visible)[0])

        # gradients for biases
        vb_grad = tf.reduce_mean(visible - visible_sample, axis = 0)
        hb_grad = tf.reduce_mean(hidden_probs - hidden_sample, axis = 0)

        return w_grad, vb_grad, hb_grad

    def updater(self, visible):
        """Return the assign ops that add ``learning_rate * gradient`` to ``w``, ``vb`` and ``hb``."""
        w_grad, vb_grad, hb_grad = self.CD_k(visible)

        # operators
        update_w = tf.assign(self.w, self.w + self.learning_rate * w_grad)
        update_vb = tf.assign(self.vb, self.vb + self.learning_rate * vb_grad)
        update_hb = tf.assign(self.hb, self.hb + self.learning_rate * hb_grad)

        return [update_w, update_vb, update_hb]

    def free_energy(self, visible):
        """Return the free energy of each row of ``visible`` as a rank-1 tensor.

        Computes ``-(visible . vb) - sum_j softplus((visible @ w + hb)_j)`` per row.
        """
        # Keep the visible term rank-1 ([batch]). A [batch, 1] column here would
        # broadcast against the rank-1 hidden term into a [batch, batch] matrix
        # that mixes the terms of different examples.
        visible_term = tf.reduce_sum(visible * self.vb, axis = 1)
        # softplus(x) == log(1 + exp(x)) without overflowing for large x.
        hidden_term = tf.reduce_sum(tf.nn.softplus(tf.matmul(visible, self.w) + self.hb), axis = 1)
        return - visible_term - hidden_term

    def pseudo_likelihood(self, visible):
        """Build a pseudo-likelihood estimate based on flipping one visible unit.

        The input is rounded to binary values, column ``self.i`` is flipped,
        and the batch mean of ``n_visible * log(sigmoid(FE(flipped) - FE(x)))``
        is returned.

        NOTE(review): ``self.i`` is a Python integer read while the graph is
        being built, so an op that is built once (as ``train`` does) always
        flips the same column; the increment below only affects later calls
        of this method.
        """
        x = tf.round(visible)
        free_energy_x = self.free_energy(x)
        split0, split1, split2 = tf.split(x, [self.i, 1, tf.shape(x)[1] - self.i - 1], axis = 1)
        xi = tf.concat([split0, 1 - split1, split2], axis = 1)
        free_energy_xi = self.free_energy(xi)
        self.i = (self.i + 1) % self.n_visible
        return tf.reduce_mean(self.n_visible * tf.log(tf.nn.sigmoid(free_energy_xi - free_energy_x)), axis = 0)

    def sampler(self, visible, steps = 15):
        """Return the visible sample obtained after ``steps`` Gibbs steps from ``visible``."""
        v_sample = visible
        for _ in range(steps):
            v_sample = self.sample_v_given_h(self.sample_h_given_v(v_sample))
        return v_sample

    def train(self, data, epochs, batch_size, learning_rate):
        """Train the model and periodically write sampled images to ``./sample``.

        Args:
            data: Object providing ``sample_batch()``, ``next_batch()`` and the
                attributes ``batch_num``, ``batch_size`` and ``batch_index``.
            epochs: Number of passes; the loop runs ``epochs * data.batch_num``
                iterations.
            batch_size: Not used by this method (the batch size comes from
                ``data``); kept for interface compatibility.
            learning_rate: Value fed to the learning-rate placeholder.
        """
        sample_dir = './sample'
        # Make sure the output directory exists before the first image is written.
        os.makedirs(sample_dir, exist_ok = True)

        x = tf.placeholder(dtype = tf.float32, shape = [None, self.n_visible])
        test_x, test_y = data.sample_batch()

        # operators
        updates = self.updater(x)
        pseudo_likelihood = self.pseudo_likelihood(x)
        sampler = self.sampler(x)

        epoch_cost = []
        epoch = 1
        with tf.Session() as sess:

            init = tf.global_variables_initializer()
            sess.run(init)
            print('Computation graph complied...')

            for i in range(epochs * data.batch_num):

                # Every 500 iterations, dump samples generated from the fixed test batch.
                if i % 500 == 0:
                    samples = sess.run(sampler, feed_dict = {x: test_x})
                    samples = samples.reshape([data.batch_size, 28, 28])
                    utils.save_images(samples, [8, 8], os.path.join(sample_dir, 'iteration_%d.png' % i))

                batch_x, _ = data.next_batch()
                sess.run(updates, feed_dict = {x: batch_x, self.learning_rate: learning_rate})
                iter_cost = sess.run(pseudo_likelihood, feed_dict = {x: batch_x})
                epoch_cost.append(iter_cost)

                # Compare by value: ``is`` checks object identity, which is not
                # a reliable equality test for integers.
                if i != 0 and data.batch_index == 0:
                    print('Epoch %d Cost %g' % (epoch, np.mean(epoch_cost)))
                    epoch_cost = []
                    epoch += 1
        print(test_y)
                
# Paths of the gzipped MNIST training images ('X') and labels ('Y').
train_dir = dict(
    X='./mnist/train-images-idx3-ubyte.gz',
    Y='./mnist/train-labels-idx1-ubyte.gz',
)

# Batched view over the training set, using integer (non one-hot) labels.
data = utils.DataSet(data_dir=train_dir, batch_size=64, one_hot=False)

# Build the model with 15 Gibbs steps per update, then run training.
rbm = RBM(n_visible=784, n_hidden=500, k=15)
rbm.train(data=data, epochs=30, batch_size=64, learning_rate=0.01)

        
        
        

In [149]:
import numpy as np
import random



def prepare_data(dataset_dir='/home/libedev/mute/mute-hero/download/dataset/'):
    """Load the train/test sample-name lists and label arrays into module globals.

    Reads ``new_train_samples.txt`` and ``new_test_samples.txt`` (split into
    one entry per line) and ``new_train_labels.txt`` and
    ``new_test_labels.txt`` (parsed as integers) from ``dataset_dir`` and
    stores them in the globals ``train_samples``, ``train_labels``,
    ``test_samples`` and ``test_labels``.

    Args:
        dataset_dir: Directory containing the four index files. Defaults to
            the location this notebook was originally written against.

    Returns:
        None; the results are exposed through the module-level globals.
    """
    global train_samples, train_labels, test_samples, test_labels

    def read_names(filename):
        # A context manager closes the handle instead of leaking it.
        with open(os.path.join(dataset_dir, filename)) as handle:
            return handle.read().strip().split('\n')

    def read_labels(filename):
        # genfromtxt returns a 0-d array for a one-line file; atleast_1d keeps
        # the result indexable in that case.
        path = os.path.join(dataset_dir, filename)
        return np.atleast_1d(np.genfromtxt(path, encoding='ascii', dtype=int))

    train_samples = read_names('new_train_samples.txt')
    train_labels = read_labels('new_train_labels.txt')

    test_samples = read_names('new_test_samples.txt')
    test_labels = read_labels('new_test_labels.txt')


    
def get_random_sample(part, option, dataset_dir='/home/libedev/mute/mute-hero/download/dataset/'):
    """Load one randomly chosen sample array and its integer label.

    The four index files under ``dataset_dir`` are re-read on every call and
    stored in the globals ``train_samples``, ``train_labels``,
    ``test_samples`` and ``test_labels``, so the function works even if
    nothing has populated those globals yet. Callers that draw many samples
    in a loop pay that file-reading cost on each call.

    Args:
        part: ``'new_train'`` or ``'new_test'``; also the name of the
            sub-directory the array is loaded from.
        option: Name of the sub-directory below ``part`` holding the
            ``.npy`` files.
        dataset_dir: Root directory of the dataset. Defaults to the location
            this notebook was originally written against.

    Returns:
        Tuple ``(spectrum, label)``: the array loaded from
        ``<dataset_dir>/<part>/<option>/<sample>.npy`` and its label as ``int``.

    Raises:
        ValueError: If ``part`` is neither ``'new_train'`` nor ``'new_test'``.
    """
    global train_samples, train_labels, test_samples, test_labels

    # Fail fast with a clear error instead of printing a hint and then
    # crashing on an unbound local further down.
    if part not in ('new_train', 'new_test'):
        raise ValueError("Please use new_train or new_test for the part name, got %r" % (part,))

    def read_names(filename):
        # A context manager closes the handle instead of leaking it.
        with open(os.path.join(dataset_dir, filename)) as handle:
            return handle.read().strip().split('\n')

    def read_labels(filename):
        # atleast_1d keeps a one-line label file indexable.
        path = os.path.join(dataset_dir, filename)
        return np.atleast_1d(np.genfromtxt(path, encoding='ascii', dtype=int))

    train_samples = read_names('new_train_samples.txt')
    train_labels = read_labels('new_train_labels.txt')
    test_samples = read_names('new_test_samples.txt')
    test_labels = read_labels('new_test_labels.txt')

    if part == 'new_train':
        samples, labels = train_samples, train_labels
    else:
        samples, labels = test_samples, test_labels

    i = random.randrange(len(samples))
    spectrum = np.load(os.path.join(dataset_dir, str(part), str(option), samples[i] + '.npy'))

    return spectrum, int(labels[i])

    
    
def get_random_batch(part, option, dataset_dir='/home/libedev/mute/mute-hero/download/dataset/'):
    """Assemble a batch of randomly drawn samples (with replacement) and labels.

    Relies on the module globals ``train_samples`` / ``train_labels`` /
    ``test_samples`` / ``test_labels`` having been populated beforehand
    (a ``NameError`` is raised otherwise). The batch has as many entries as
    the chosen split has sample names, and every array is cropped to the
    shape of the split's first sample.

    As a side effect the label vector is written to
    ``<dataset_dir>/<option>.npy``.

    Args:
        part: ``'new_train'`` or ``'new_test'``.
        option: Name of the sub-directory below ``part`` holding the
            ``.npy`` files.
        dataset_dir: Root directory of the dataset. Defaults to the location
            this notebook was originally written against.

    Returns:
        Tuple ``(X, Y)``: ``X`` has shape ``(n, rows, cols, 1)`` and ``Y`` has
        shape ``(n,)``; both are float arrays created with ``np.zeros``.

    Raises:
        ValueError: If ``part`` is neither ``'new_train'`` nor ``'new_test'``.
    """
    global train_samples, train_labels, test_samples, test_labels

    if part == 'new_train':
        samples, labels = train_samples, train_labels
    elif part == 'new_test':
        samples, labels = test_samples, test_labels
    else:
        raise ValueError("Please use new_train or new_test for the part name, got %r" % (part,))

    # Guard against a 0-d label array (one-line label file).
    labels = np.atleast_1d(labels)

    data_amount = len(samples)
    sample_dir = os.path.join(dataset_dir, str(part), str(option))

    # The first sample of the split fixes the spatial size of the batch.
    example_data = np.load(os.path.join(sample_dir, samples[0] + '.npy'))
    rows, cols = example_data.shape[0], example_data.shape[1]

    X = np.zeros((data_amount, rows, cols, 1))
    Y = np.zeros((data_amount,))

    for i in range(data_amount):
        # Draw from the in-memory index rather than re-reading the index
        # files for every item; one ``randrange`` call per item, as before.
        idx = random.randrange(data_amount)
        spectrum = np.load(os.path.join(sample_dir, samples[idx] + '.npy'))

        X[i, :, :, 0] = spectrum[:rows, :cols]
        Y[i] = int(labels[idx])

    # Persist the labels once, after the loop, instead of rewriting and
    # reloading the whole array on every iteration.
    np.save(os.path.join(dataset_dir, str(option) + '.npy'), Y)

    return X, Y




In [150]:
# Populate the module-level sample/label indexes, then draw a training batch.
prepare_data()
a, b = get_random_batch(part='new_train', option='Mel_S_pca')

# Cast the labels to int16 and wrap them in a torch tensor.
b = np.array(b, np.int16)
train_label = torch.as_tensor(b)

# Show the tensor followed by its dtype, one per line.
print(train_label, train_label.dtype, sep='\n')


tensor([1, 1, 0, 3, 3, 6, 2, 7, 6, 7, 3, 6, 0, 4, 3, 7, 0, 5, 1, 4, 7, 7, 6, 3,
        0, 4, 6, 5, 7, 7, 1, 4, 4, 2, 4, 4, 4, 7, 2, 0, 7, 6, 6, 2, 2, 0, 0, 4,
        3, 7, 6, 5, 5, 4, 1, 3, 2, 4, 6, 6, 6, 1, 4, 1, 7, 6, 0, 5, 7, 4, 1, 7,
        6, 6, 1, 1, 1, 5, 0, 5, 2, 1, 0, 1, 4, 4, 2, 3, 5, 3, 3, 5, 7, 1, 2, 3,
        5, 6, 5, 1, 2, 4, 6, 0, 7, 4, 7, 7, 6, 7, 6, 6, 7, 7, 0, 5, 6, 0, 0, 3,
        7, 0, 2, 3, 4, 7, 2, 6, 1, 2, 7, 5, 0, 5, 6, 7, 3, 2, 6, 4, 1, 7, 7, 2,
        0, 4, 1, 4, 7, 7, 3, 4, 4, 1, 7, 2, 4, 7, 1, 1, 6, 7, 1, 1, 1, 2, 1, 1,
        1, 3, 7, 2, 0, 4, 4, 2, 7, 0, 4, 1, 1, 0, 4, 7, 2, 4, 1, 4, 5, 2, 4, 6,
        0, 1, 6, 6, 0, 4, 0, 7, 6, 5, 4, 1, 6, 5, 0, 4, 7, 1, 6, 0, 7, 2, 7, 5,
        4, 4, 5, 5, 2, 4, 2, 7, 2, 7, 6, 3, 4, 0, 7, 6, 5, 6, 5, 6, 0, 7, 0, 6,
        1, 3, 2, 7, 7, 2, 1, 0, 5, 6, 2, 4, 2, 1, 2, 0, 6, 1, 5, 2, 0, 1, 4, 1,
        3, 6, 2, 7, 7, 2, 2, 4, 5, 5, 4, 6, 1, 6, 0, 3, 1, 0, 2, 6, 2, 5, 7, 7,
        3, 2, 3, 5, 1, 7, 3, 4, 7, 5, 6,

In [129]:
# Load the array stored in new_train_label.npy.
data = np.load('/home/libedev/mute/mute-hero/download/dataset/new_train_label.npy')

In [147]:
# Show the loaded array, convert it to a torch tensor, and show it again.
print(data)
data = torch.as_tensor(np.array(data))
print(data)

# Cell result: dtype of the first element of the tensor.
data[0].dtype

tensor([7, 7, 4, 4, 0, 6, 7, 5, 1, 2, 7, 6, 6, 2, 5, 3, 0, 7, 7, 4, 5, 3, 3, 1,
        0, 1, 3, 7, 3, 5, 6, 0, 3, 0, 1, 3, 0, 7, 0, 2, 2, 0, 3, 4, 4, 4, 4, 7,
        3, 4, 2, 7, 0, 7, 0, 0, 5, 2, 3, 5, 7, 5, 1, 7, 3, 6, 4, 5, 2, 0, 4, 3,
        1, 4, 6, 1, 7, 1, 2, 2, 2, 1, 3, 7, 7, 6, 0, 7, 0, 3, 6, 3, 1, 0, 5, 4,
        4, 3, 5, 3, 1, 7, 5, 2, 0, 6, 0, 7, 4, 4, 1, 1, 6, 3, 0, 1, 1, 3, 5, 7,
        7, 5, 0, 5, 7, 2, 6, 4, 3, 4, 3, 1, 7, 2, 4, 4, 7, 1, 0, 4, 5, 7, 3, 4,
        0, 1, 7, 3, 0, 7, 6, 4, 3, 6, 2, 2, 2, 6, 2, 0, 7, 7, 6, 6, 1, 5, 4, 5,
        5, 4, 3, 4, 4, 4, 6, 7, 5, 3, 3, 6, 5, 6, 2, 1, 7, 3, 2, 0, 0, 1, 7, 0,
        0, 0, 6, 6, 1, 6, 1, 2, 1, 6, 5, 7, 2, 5, 3, 3, 3, 1, 4, 0, 2, 3, 7, 7,
        5, 3, 1, 7, 1, 0, 1, 7, 6, 0, 4, 2, 2, 0, 5, 0, 7, 1, 4, 6, 7, 3, 5, 6,
        0, 0, 4, 5, 4, 3, 2, 6, 0, 2, 5, 3, 4, 4, 2, 2, 3, 5, 2, 6, 1, 4, 1, 6,
        3, 7, 2, 5, 2, 6, 7, 5, 6, 4, 2, 5, 6, 2, 2, 3, 5, 2, 4, 2, 5, 6, 4, 0,
        5, 1, 2, 3, 5, 4, 5, 0, 1, 7, 1,

torch.int64