<a href="https://colab.research.google.com/github/hdilab/hpm/blob/master/Char-LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
import os
# Set to True to mount Google Drive and read the corpus from there.
Colab = False
# Number of active bits in each character's SDR.
NumOnBits = 10
# Total number of bits in an SDR; also the LSTM input/output width.
NumBits = 512
# Seed passed to the SDR generator.
Seed = 42

In [2]:
# NOTE: torch.cuda.device(...) is a context manager; creating it without a
# `with` block does not switch devices, so the current device reported
# below is unchanged.
torch.cuda.device(2)
torch.cuda.current_device()

0

In [3]:
# Make CUDA device index 2 the current device.
torch.cuda.set_device(2)

In [4]:
# Show which CUDA device index is current.
torch.cuda.current_device()

2

In [6]:
# Pick the corpus location first, then read it with a single open() call.
if Colab:
    from google.colab import drive
    drive.mount('/content/drive')
    _corpus_path = '/content/drive/My Drive/Colab/data/short.txt'
else:
    _corpus_path = 'data/medium.txt'

with open(_corpus_path, 'r') as f:
    text = f.read()

In [7]:
# Peek at the first 100 characters of the loaded text.
text[:100]

'The Project Gutenberg EBook of Pride and Prejudice, by Jane Austen\n\nThis eBook is for the use of any'

In [8]:
# Vocabulary: the 128 ASCII characters, indexed by their code point.
asc_chars = list(map(chr, range(128)))
chars = tuple(asc_chars)
int2char = {index: symbol for index, symbol in enumerate(chars)}
char2int = {symbol: index for index, symbol in enumerate(chars)}

# Integer-encode the whole text (raises KeyError on any non-ASCII character).
encoded = np.array([char2int[symbol] for symbol in text])
encoded[:100]

array([ 84, 104, 101,  32,  80, 114, 111, 106, 101,  99, 116,  32,  71,
       117, 116, 101, 110,  98, 101, 114, 103,  32,  69,  66, 111, 111,
       107,  32, 111, 102,  32,  80, 114, 105, 100, 101,  32,  97, 110,
       100,  32,  80, 114, 101, 106, 117, 100, 105,  99, 101,  44,  32,
        98, 121,  32,  74,  97, 110, 101,  32,  65, 117, 115, 116, 101,
       110,  10,  10,  84, 104, 105, 115,  32, 101,  66, 111, 111, 107,
        32, 105, 115,  32, 102, 111, 114,  32, 116, 104, 101,  32, 117,
       115, 101,  32, 111, 102,  32,  97, 110, 121])

In [9]:
"""
SDR class
Handles issues with SDR
Given a char input, generate SDR
"""

import random


class SDR(object):
    """
      Random sparse distributed representation (SDR) codebook.

      Every value in ``input_list`` is assigned a fixed, randomly drawn list of
      ``numOnBits`` distinct bit indices in ``range(numBits)``.

      :param input_list: (List) List for input_values.
            For ASCII it will be [chr(0), chr(1), ... chr(127)]

      :param numBits: (int) Number of bits for SDR. Default value ``512``

      :param numOnBits: (int) Number of Active bits for SDR. Default value ``10``.
            It is about 2% sparsity for 512 bits

      :param seed: (int) Seed for the random number generator. Default value ``42``.

      :param inputNoise: (float) Noise level used by ``getNoisySDR``.
            Default value ``0.1``.
    """

    def __init__(self,
                 input_list,
                 numBits=512,
                 numOnBits=10,
                 seed=42,
                 inputNoise=0.1):

        # Seeds the module-level ``random`` generator, so the codebook is
        # reproducible (and every later ``random`` call is affected too).
        random.seed(seed)
        self.population = list(range(numBits))
        self.numOnBits = numOnBits
        self.inputNoise = inputNoise
        self.sdr_dict = {i: random.sample(self.population, numOnBits) for i in input_list}

    def getSDR(self, input):
        """Return the list of active bit indices assigned to ``input``."""
        return self.sdr_dict[input]

    def getNoisySDR(self, input):
        """
        Return a noisy copy of the SDR for ``input``.

        Each active bit is kept only when a fresh random draw exceeds
        ``inputNoise``; then ``int(numOnBits * inputNoise)`` bits drawn at
        random from the whole population are appended. The result can
        therefore be shorter or longer than ``numOnBits`` and may contain a
        repeated index.
        """
        inputSDR = self.sdr_dict[input]
        inputSDR = [i for i in inputSDR if random.random() > self.inputNoise]
        noise = random.sample(self.population, int(self.numOnBits * self.inputNoise))
        return inputSDR + noise

    def getInput(self, sdr):
        """
        Need to implement the function which returns the corresponding input from SDR
        This requires a probabilistic approach. Count the number of overlapping bit and nonoverlapping field.

        Placeholder: currently always returns 0.
        """
        return 0

    def getCollisionProb(self, n, a, s, theta):
        """
        Calculating the probability for the cases where more than theta synapses are activated
        for different cell activation pattern
        :param n: Number of cells
        :param a: Number of active cells
        :param s: Number of synapses
        :param theta: Threshold for the dendritic activation
        :return: The probability where dendritic activation for the different cell activation pattern
        """
        # Relies on the module-level ``combinatorial`` helper.
        numerator = 0
        for b in range(theta, s+1):
            numerator += combinatorial(s, b) * combinatorial(n-s, a-b)

        denominator = combinatorial(n, a)

        return numerator*1.0/denominator

    def getRandomSDR(self):
        """Return ``numOnBits`` distinct bit indices drawn at random."""
        # Use the instance attribute: a bare ``numOnBits`` is not defined in
        # this scope and raised NameError.
        noise = random.sample(self.population, self.numOnBits)
        return noise


def combinatorial(a, b):
    """
    Return the binomial coefficient "a choose b" as a float.

    The previous formula divided by ``factorial(a)`` instead of
    ``factorial(b)`` and so returned ``1 / (a-b)!``. This version uses the
    multiplicative form, which stays in exact integer arithmetic until the
    final conversion and needs no helper.

    :param a: (int) Size of the set.
    :param b: (int) Number of items chosen.
    :return: (float) C(a, b); ``0.0`` when ``b < 0`` or ``b > a``.
    """
    if b < 0 or b > a:
        return 0.0
    # C(a, b) == C(a, a-b); iterate over the smaller of the two.
    b = min(b, a - b)
    result = 1
    for i in range(1, b + 1):
        # After step i, result == C(a - b + i, i), so the division is exact.
        result = result * (a - b + i) // i
    return float(result)

def factorial(a):
    """
    Return ``a!`` for a non-negative integer ``a``.

    Iterative, so ``factorial(0)`` returns 1 (the recursive version never
    reached its ``a == 1`` base case for 0) and large inputs do not hit the
    recursion limit.

    :param a: (int) Non-negative integer.
    :return: (int) The factorial of ``a``.
    :raises ValueError: if ``a`` is negative.
    """
    if a < 0:
        raise ValueError("factorial() is not defined for negative values")
    result = 1
    for k in range(2, a + 1):
        result *= k
    return result



In [10]:
# Build the character -> SDR codebook shared by the encoders below.
char_sdr = SDR(asc_chars,
                numBits=NumBits,
                numOnBits=NumOnBits,
                seed=Seed,
                inputNoise=0.1)

In [11]:
def one_hot_encoder(arr, n_labels):
    """
    One-hot encode an integer array of any rank.

    The element count was previously taken from ``np.multiply(*arr.shape)``,
    which only works for 2-D input. Flattening first handles every rank.

    :param arr: (np.ndarray) Integer class indices, each in ``[0, n_labels)``.
    :param n_labels: (int) Number of classes (size of the new last axis).
    :return: (np.ndarray) float32 array of shape ``arr.shape + (n_labels,)``.
    """
    flat = arr.reshape(-1)
    one_hot = np.zeros((flat.size, n_labels), dtype=np.float32)
    # One row per element; set the column named by that element's value.
    one_hot[np.arange(flat.size), flat] = 1.
    return one_hot.reshape((*arr.shape, n_labels))

def multi_hot_encoder(arr, n_labels):
    """
    Encode a 2-D array of character codes as noisy multi-hot SDR vectors.

    For every position the character is looked up in ``int2char`` and a noisy
    SDR is drawn from ``char_sdr``; the listed bit indices are set to 1.

    :param arr: (np.ndarray) 2-D array of integer character codes.
    :param n_labels: (int) Width of each SDR vector.
    :return: (np.ndarray) float32 array of shape
        ``(arr.shape[0], arr.shape[1], n_labels)``.
    """
    n_rows, n_cols = arr.shape[0], arr.shape[1]
    encoded_bits = np.zeros((n_rows, n_cols, n_labels), dtype=np.float32)
    # Row-major traversal keeps the order of random draws unchanged.
    for row, col in np.ndindex(n_rows, n_cols):
        active = char_sdr.getNoisySDR(int2char[arr[row][col]])
        encoded_bits[row, col, np.array(active)] = 1
    return encoded_bits

In [12]:
def get_batches(arr, batch_size, seq_length):
    '''Yield (x, y) mini-batches of shape batch_size x seq_length from arr.

       y is x shifted one step to the left; the last column of y is the
       element following the window, or column 0 of the same row when the
       window is the last one.

       Arguments
       ---------
       arr: Array you want to make batches from
       batch_size: Batch size, the number of sequences per batch
       seq_length: Number of encoded chars in a sequence
    '''
    chars_per_batch = batch_size * seq_length
    full_batches = len(arr) // chars_per_batch

    # Drop the tail that does not fill a whole batch, one row per sequence.
    arr = arr[:full_batches * chars_per_batch].reshape((batch_size, -1))
    total_cols = arr.shape[1]

    for start in range(0, total_cols, seq_length):
        stop = start + seq_length
        x = arr[:, start:stop]
        # Past the final window there is no "next" column: wrap to column 0.
        next_col = stop if stop < total_cols else 0
        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]
        y[:, -1] = arr[:, next_col]
        yield x, y
        

In [13]:
# Quick check of the generator: one sequence of three characters.
batches = get_batches(encoded, 1, 3)
x, y = next(batches)

In [14]:
# Detect CUDA once and report which device training will use.
train_on_gpu = torch.cuda.is_available()
print('Training on GPU!' if train_on_gpu
      else 'No GPU available, training on CPU; consider making n_epochs very small.')

Training on GPU!


In [15]:
def accuracySDR(output, target, num_on_bits=None):
    """
    Mean fraction of the target's top bits that the output also ranks on top.

    For each row the ``num_on_bits`` largest entries of ``output`` and of
    ``target`` are compared; the row score is the size of the overlap divided
    by ``num_on_bits``.

    :param output: (torch.Tensor) 2-D scores, one row per sample.
    :param target: (torch.Tensor) 2-D targets with the same shape as ``output``.
    :param num_on_bits: (int) Number of top entries compared per row.
        Defaults to the module-level ``NumOnBits``.
    :return: (numpy float) Mean overlap ratio over all rows.
    """
    if num_on_bits is None:
        num_on_bits = NumOnBits

    output, target = output.detach().cpu(), target.detach().cpu()
    _, output_idx = output.topk(num_on_bits, dim=1)
    _, target_idx = target.topk(num_on_bits, dim=1)

    # Compare every output index with every target index of the same row.
    # topk indices are distinct within a row, so the number of equal pairs is
    # exactly the size of the intersection.
    pairs = output_idx.unsqueeze(2) == target_idx.unsqueeze(1)
    overlap = pairs.sum(dim=(1, 2)).numpy()

    return np.mean(overlap * 1.0 / num_on_bits)

In [16]:
class CharRNN(nn.Module):
    """
    Stacked LSTM followed by a linear layer that emits one logit per SDR bit.

    :param tokens: Sequence of characters; used to build ``int2char`` /
        ``char2int`` lookup tables on the instance.
    :param n_hidden: (int) LSTM hidden size.
    :param n_layers: (int) Number of stacked LSTM layers.
    :param drop_prob: (float) Dropout probability, used between LSTM layers
        and on the LSTM output.
    :param lr: (float) Stored on the instance; not read inside this class.
    :param n_bits: (int) Width of the input and output vectors. Defaults to
        the module-level ``NumBits``.
    """

    def __init__(self, tokens, n_hidden=612, n_layers=4, drop_prob=0.5, lr=0.001, n_bits=None):
        super().__init__()
        if n_bits is None:
            n_bits = NumBits

        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr
        self.n_bits = n_bits

        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        self.lstm = nn.LSTM(n_bits, n_hidden, n_layers,
                            dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)

        self.fc = nn.Linear(n_hidden, n_bits)

    def forward(self, x, hidden):
        """
        Run the network on a batch of sequences.

        :param x: (torch.Tensor) Input of shape ``(batch, seq, n_bits)``.
        :param hidden: Tuple ``(h, c)`` as returned by ``init_hidden``.
        :return: ``(out, hidden)`` where ``out`` has shape
            ``(batch * seq, n_bits)`` and ``hidden`` is the new LSTM state.
        """
        r_output, hidden = self.lstm(x, hidden)

        out = self.dropout(r_output)

        # Merge the batch and time axes so the linear layer sees one row per step.
        out = out.contiguous().view(-1, self.n_hidden)

        out = self.fc(out)

        return out, hidden

    def init_hidden(self, batch_size):
        """
        Return a zeroed ``(h, c)`` LSTM state for ``batch_size`` sequences.

        The tensors are created with the dtype and device of the model's own
        parameters, so the state always lives where the model lives instead
        of depending on a global GPU flag.
        """
        weight = next(self.parameters())
        shape = (self.n_layers, batch_size, self.n_hidden)

        return (weight.new_zeros(shape), weight.new_zeros(shape))

In [17]:
def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    ''' Train a network to predict the SDR of the next character.

        Arguments
        ---------

        net: CharRNN network
        data: integer-encoded text data to train the network
        epochs: Number of epochs to train
        batch_size: Number of mini-sequences per mini-batch, aka batch size
        seq_length: Number of character steps per mini-batch
        lr: learning rate
        clip: gradient clipping
        val_frac: Fraction of data to hold out for validation
        print_every: Number of steps for printing training and validation loss

        Side effects
        ------------
        Keeps an exponentially smoothed SDR accuracy in ``train.accuracy``
        (started at 0 if the caller has not set it) and prints progress
        every ``print_every`` steps.

    '''
    # The running accuracy lives on the function object; make sure it exists
    # so a first call does not fail with AttributeError.
    if not hasattr(train, 'accuracy'):
        train.accuracy = 0

    net.train()

    # Move the model first so the optimizer is built over the final parameters.
    if train_on_gpu:
        net.cuda()

    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.BCEWithLogitsLoss()

    # create training and validation data
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    counter = 0
    n_chars = NumBits
    for e in range(epochs):
        # initialize hidden state
        h = net.init_hidden(batch_size)

        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1

            # Multi-hot (SDR) encode our data and make them Torch tensors
            x = multi_hot_encoder(x, n_chars)
            y = multi_hot_encoder(y, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)

            if train_on_gpu:
                inputs, targets = inputs.cuda(), targets.cuda()

            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([each.detach() for each in h])

            # zero accumulated gradients
            net.zero_grad()

            # get the output from the model
            output, h = net(inputs, h)

            # calculate the loss and perform backprop
            flat_targets = targets.view(batch_size*seq_length, NumBits)
            loss = criterion(output, flat_targets)
            loss.backward()
            # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()

            # SDR accuracy, exponentially smoothed across steps
            accuracy = accuracySDR(output, flat_targets)
            train.accuracy = 0.999*train.accuracy + 0.001*accuracy

            # loss stats
            if counter % print_every == 0:
                # Get validation loss
                val_h = net.init_hidden(batch_size)
                val_losses = []
                net.eval()
                # No gradients are needed for validation; skipping the graph
                # saves memory and time.
                with torch.no_grad():
                    for val_x, val_y in get_batches(val_data, batch_size, seq_length):
                        # Multi-hot (SDR) encode our data and make them Torch tensors
                        val_x = multi_hot_encoder(val_x, n_chars)
                        val_y = multi_hot_encoder(val_y, n_chars)
                        val_inputs = torch.from_numpy(val_x)
                        val_targets = torch.from_numpy(val_y)

                        if train_on_gpu:
                            val_inputs, val_targets = val_inputs.cuda(), val_targets.cuda()

                        val_output, val_h = net(val_inputs, val_h)
                        val_loss = criterion(val_output,
                                             val_targets.view(batch_size*seq_length, NumBits))

                        val_losses.append(val_loss.item())

                net.train() # reset to train mode after iterating through validation data

                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)),
                      "SDR Acc: {:.3f}".format(train.accuracy))
                

In [18]:
# Define the network: hidden size and number of stacked LSTM layers.
n_hidden=1024
n_layers=4

# Build the model over the ASCII vocabulary and print its layer summary.
net = CharRNN(chars, n_hidden, n_layers)
print(net)

CharRNN(
  (lstm): LSTM(512, 1024, num_layers=4, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=1024, out_features=512, bias=True)
)


In [20]:
batch_size = 4
seq_length = 10 # number of character steps per training sequence
n_epochs = 3000 # start smaller if you are just testing initial behavior

# Reset the running SDR accuracy that train() keeps on its own function
# object, then train the model.
train.accuracy = 0 
train(net, encoded, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.0001, print_every=500)

Epoch: 12/3000... Step: 500... Loss: 0.0798... Val Loss: 0.1250 SDR Acc: 0.084
Epoch: 24/3000... Step: 1000... Loss: 0.0829... Val Loss: 0.1415 SDR Acc: 0.135
Epoch: 35/3000... Step: 1500... Loss: 0.0784... Val Loss: 0.1282 SDR Acc: 0.165
Epoch: 47/3000... Step: 2000... Loss: 0.0793... Val Loss: 0.1322 SDR Acc: 0.186
Epoch: 59/3000... Step: 2500... Loss: 0.0750... Val Loss: 0.1353 SDR Acc: 0.199
Epoch: 70/3000... Step: 3000... Loss: 0.0783... Val Loss: 0.1392 SDR Acc: 0.205
Epoch: 82/3000... Step: 3500... Loss: 0.0805... Val Loss: 0.1289 SDR Acc: 0.211
Epoch: 94/3000... Step: 4000... Loss: 0.0232... Val Loss: 0.1471 SDR Acc: 0.214
Epoch: 105/3000... Step: 4500... Loss: 0.0779... Val Loss: 0.1507 SDR Acc: 0.217
Epoch: 117/3000... Step: 5000... Loss: 0.0750... Val Loss: 0.1479 SDR Acc: 0.220
Epoch: 128/3000... Step: 5500... Loss: 0.0785... Val Loss: 0.1488 SDR Acc: 0.220
Epoch: 140/3000... Step: 6000... Loss: 0.0778... Val Loss: 0.1532 SDR Acc: 0.223
Epoch: 152/3000... Step: 6500... Loss

Epoch: 1187/3000... Step: 51000... Loss: 0.0231... Val Loss: 0.1617 SDR Acc: 0.405
Epoch: 1198/3000... Step: 51500... Loss: 0.0798... Val Loss: 0.1600 SDR Acc: 0.408
Epoch: 1210/3000... Step: 52000... Loss: 0.0367... Val Loss: 0.1641 SDR Acc: 0.405
Epoch: 1221/3000... Step: 52500... Loss: 0.0812... Val Loss: 0.1605 SDR Acc: 0.410
Epoch: 1233/3000... Step: 53000... Loss: 0.0760... Val Loss: 0.1660 SDR Acc: 0.418
Epoch: 1245/3000... Step: 53500... Loss: 0.0229... Val Loss: 0.1611 SDR Acc: 0.420
Epoch: 1256/3000... Step: 54000... Loss: 0.0784... Val Loss: 0.1602 SDR Acc: 0.420
Epoch: 1268/3000... Step: 54500... Loss: 0.0474... Val Loss: 0.1523 SDR Acc: 0.433
Epoch: 1280/3000... Step: 55000... Loss: 0.0225... Val Loss: 0.1556 SDR Acc: 0.419
Epoch: 1291/3000... Step: 55500... Loss: 0.0754... Val Loss: 0.1631 SDR Acc: 0.426
Epoch: 1303/3000... Step: 56000... Loss: 0.0646... Val Loss: 0.1644 SDR Acc: 0.434
Epoch: 1314/3000... Step: 56500... Loss: 0.0810... Val Loss: 0.1598 SDR Acc: 0.431
Epoc

Epoch: 2338/3000... Step: 100500... Loss: 0.0248... Val Loss: 0.1558 SDR Acc: 0.662
Epoch: 2349/3000... Step: 101000... Loss: 0.0368... Val Loss: 0.1592 SDR Acc: 0.699
Epoch: 2361/3000... Step: 101500... Loss: 0.0418... Val Loss: 0.1596 SDR Acc: 0.723
Epoch: 2373/3000... Step: 102000... Loss: 0.0237... Val Loss: 0.1613 SDR Acc: 0.725
Epoch: 2384/3000... Step: 102500... Loss: 0.0358... Val Loss: 0.1664 SDR Acc: 0.745
Epoch: 2396/3000... Step: 103000... Loss: 0.0227... Val Loss: 0.1502 SDR Acc: 0.732
Epoch: 2407/3000... Step: 103500... Loss: 0.0380... Val Loss: 0.1615 SDR Acc: 0.746
Epoch: 2419/3000... Step: 104000... Loss: 0.0319... Val Loss: 0.1678 SDR Acc: 0.758
Epoch: 2431/3000... Step: 104500... Loss: 0.0238... Val Loss: 0.1557 SDR Acc: 0.759
Epoch: 2442/3000... Step: 105000... Loss: 0.0664... Val Loss: 0.1548 SDR Acc: 0.751
Epoch: 2454/3000... Step: 105500... Loss: 0.0251... Val Loss: 0.1678 SDR Acc: 0.743
Epoch: 2466/3000... Step: 106000... Loss: 0.0236... Val Loss: 0.1611 SDR Acc

In [None]:
# Run a single forward pass on one small batch (2 sequences of 16 chars).
batches = get_batches(encoded, 2, 16)
x, y = next(batches)

x = multi_hot_encoder(x, NumBits)
y = multi_hot_encoder(y, NumBits)

inputs, targets = torch.from_numpy(x), torch.from_numpy(y)

if train_on_gpu:
    inputs, targets = inputs.cuda(), targets.cuda()

# Size the hidden state from this batch (2 sequences), not from the global
# training `batch_size`; a mismatch makes the LSTM reject the state.
h = net.init_hidden(inputs.shape[0])

output, h = net(inputs, h)

In [None]:
# Disabled: print the positions of the active bits in the targets.
# a = targets.view(batch_size*seq_length, NumBits)
# a = a.cpu()
# print(np.argwhere(a>0))
# Inspect the model output: the NumOnBits highest-scoring bit indices per
# row, followed by the positions of all positive logits.
b = output.cpu()
values, indices = b.topk(NumOnBits, dim=1)
print(indices.shape)
print(indices)
print(np.argwhere(b>0))

In [None]:
# File name the checkpoint is written to.
model_dante = 'rnn_20_epoch.net'

# Save the weights together with the constructor arguments stored on the net.
checkpoint = {'n_hidden': net.n_hidden,
              'n_layers': net.n_layers,
              'state_dict': net.state_dict(),
              'tokens': net.chars}

with open(model_dante, 'wb') as f:
    torch.save(checkpoint, f)

In [None]:
def predict(net, char, h=None, top_k=None):
    ''' Given a character, predict the next character.
        Returns the predicted character and the hidden state.

        The network consumes and emits SDR bit vectors of width NumBits, so
        the input is encoded with ``char_sdr`` and the output is decoded by
        scoring every character with the mean predicted activation of its
        SDR bits. (The earlier one-hot input / softmax-over-characters code
        did not match the NumBits-wide model.)

        Arguments
        ---------
        net: trained CharRNN
        char: input character (must be in the ``char_sdr`` codebook)
        h: hidden state tuple; a fresh zero state is used when None
        top_k: if given, sample only among the k best-scoring characters
    '''
    # Encode the input character as a clean (noise-free) multi-hot SDR.
    x = np.zeros((1, 1, NumBits), dtype=np.float32)
    x[0, 0, char_sdr.getSDR(char)] = 1.
    inputs = torch.from_numpy(x)

    if train_on_gpu:
        inputs = inputs.cuda()

    # Start from a zero state when the caller did not supply one.
    if h is None:
        h = net.init_hidden(1)
    # detach hidden state from history
    h = tuple([each.data for each in h])

    # get the output of the model
    with torch.no_grad():
        out, h = net(inputs, h)

    # Per-bit activation probabilities for the single predicted step.
    bit_prob = torch.sigmoid(out).view(-1).cpu()

    # Score each character by how strongly its SDR bits are predicted.
    p = torch.stack([bit_prob[char_sdr.getSDR(ch)].mean() for ch in net.chars])

    # get top characters
    if top_k is None:
        top_ch = np.arange(len(net.chars))
    else:
        p, top_ch = p.topk(top_k)
        top_ch = top_ch.numpy().reshape(-1)

    # select the likely next character with some element of randomness
    p = p.numpy().astype(np.float64).reshape(-1)
    char = np.random.choice(top_ch, p=p/p.sum())

    # return the predicted char and the hidden state
    return net.int2char[int(char)], h

In [None]:
def sample(net, size, prime='Il', top_k=None):
    ''' Generate text from the network.

        Arguments
        ---------
        net: trained CharRNN; switched to eval mode by this call
        size: number of characters to generate after the first prediction
        prime: non-empty seed string fed through the network first
        top_k: passed to predict(); restricts sampling to the k best characters

        Returns the prime followed by ``size + 1`` generated characters.
        Raises ValueError if ``prime`` is empty.
    '''
    # Without at least one prime character there is no first prediction to
    # append, which used to surface as an UnboundLocalError on `char`.
    if not prime:
        raise ValueError("prime must contain at least one character")

    if train_on_gpu:
        net.cuda()
    else:
        net.cpu()

    net.eval() # eval mode

    # First off, run through the prime characters
    chars = list(prime)
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)

    # Keep only the prediction that follows the last prime character.
    chars.append(char)

    # Now pass in the previous character and get a new one
    for _ in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

In [None]:
# Generate text seeded with 'This ', sampling among the 5 best characters.
print(sample(net, 1000, prime='This ', top_k=5))


In [None]:
# Display the current value of `y`.
y

In [None]:
# Pull the next (x, y) pair from the `batches` generator and print it.
x, y = next(batches)
print(x,y)