In [102]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical
import numpy as np
import pandas as pd
import ast
import math

In [103]:
class RNN(nn.Module):
    def __init__(self, input_size, output_size, hidden_size, num_layers):
        super(RNN, self).__init__()
        self.embedding = nn.Embedding(input_size, input_size)
        self.rnn = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)
        self.decoder = nn.Linear(hidden_size, output_size)
    
    def forward(self, input_seq, hidden_state):
        embedding = self.embedding(input_seq)
        output, hidden_state = self.rnn(embedding, hidden_state)
        output = self.decoder(output)
        return output, (hidden_state[0].detach(), hidden_state[1].detach())
    

In [104]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [105]:

hidden_size = 512   # size of hidden state
seq_len = 100       # length of LSTM sequence
num_layers = 3      # num of layers in LSTM layer stack
lr = 0.002          # learning rate
epochs = 3          # max number of epochs
op_seq_len = 9    # total num of characters in output test sequence


In [114]:
def int_to_binary(num, width):
    if num >= 0:
        binary = bin(num)[2:].zfill(width)
        return "0" + binary 
    else:
        binary = bin(abs(num))[2:].zfill(width)
        return "1" + binary

int_to_binary(200, 16)


'00000000011001000'

In [115]:
def binary_to_int(str, width):
    sign = str[0]
    num = str[1:]
    num = int(num, 2)

    if sign == "0":
        return num
    else:
        return num * - 1

binary_to_int('00000000011001000', 16)

200

In [122]:


# load the text file

data = pd.read_csv('data/preprocessed-3-rnn.csv')
data = data[data.columns[0]].apply(ast.literal_eval)
data = data.apply(lambda seq: [int_to_binary(int(num), 16) for num in seq])
# data = data[data.columns[0]].str.split(',').to_numpy()
# data = [[int(num) for num in row] for row in data]

# data = data[data.columns[0]]
# data = [[int(num) for num in row] for row in data]
# # print(type(data[data.columns[0]].str.cat(sep = '')))
# chars = sorted(list(set(data)))
# # print(chars)
# data_size, vocab_size = len(data), len(chars)
# print("----------------------------------------")
# print("Data has {} characters, {} unique".format(data_size, vocab_size))
# print("----------------------------------------")
data.to_numpy()

array([list(['00000000000000001', '00000000000000010', '00000000000000001', '00000000000000101', '00000000000000101', '00000000000000001', '00000000000001011', '00000000000010000', '00000000000000111', '00000000000000001', '00000000000010111', '00000000000101100', '00000000000011110', '00000000000001001', '00000000000000001', '00000000000101111', '00000000001110000', '00000000001101000', '00000000000110000', '00000000000001011', '00000000000000001', '00000000001011111', '00000000100010000', '00000000101000000', '00000000011001000', '00000000001000110', '00000000000001101', '00000000000000001', '00000000010111111', '00000001010000000', '00000001110010000', '00000001011010000', '00000000101010100', '00000000001100000', '00000000000001111', '00000000000000001', '00000000101111111', '00000010111000000', '00000100110100000', '00000100100110000', '00000010101111000', '00000001000010100', '00000000001111110', '00000000000010001', '00000000000000001', '00000001011111111', '00000110100000000', 

In [146]:
all_bin_symbols = '01'
bin_n_symbols = len(all_bin_symbols)

def binToIndex(letter):
    return all_bin_symbols.find(letter)

# Just for demonstration, turn a letter into a <1 x n_letters> Tensor
def binToTensor(letter):
    tensor = torch.zeros(1, len(letter))
    for i, ch in enumerate(letter):
        tensor[:,i][binToIndex(letter)] = int(ch)
    return tensor

In [151]:
n = '00000000000000101'
binToTensor(n).size()


17

In [141]:
 torch.zeros(1, 10)[:,2]

tensor([0.])

In [125]:
# data tensor on device
data = torch.tensor(data).to(device)
data = torch.unsqueeze(data, dim=1)

# model instance
rnn = RNN(vocab_size, vocab_size, hidden_size, num_layers).to(device)

# loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(rnn.parameters(), lr=lr)


ValueError: too many dimensions 'str'

In [62]:
# training loop
for i_epoch in range(1, epochs+1):
    
    # random starting point (1st 100 chars) from data to begin
    data_ptr = np.random.randint(100)
    n = 0
    running_loss = 0
    hidden_state = None
    
    while True:
        print(data_ptr)
        input_seq = data[data_ptr : data_ptr+seq_len]
        target_seq = data[data_ptr+1 : data_ptr+seq_len+1]
        
        # forward pass
        output, hidden_state = rnn(input_seq, hidden_state)
        
        # compute loss
        loss = loss_fn(torch.squeeze(output), torch.squeeze(target_seq))
        running_loss += loss.item()
        
        # compute gradients and take optimizer step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # update the data pointer
        data_ptr += seq_len
        n +=1
        
        # if at end of data : break
        if data_ptr + seq_len + 1 > data_size:
            break
        
    # print loss and save weights after every epoch
    print("Epoch: {0} \t Loss: {1:.8f}".format(i_epoch, running_loss/n))
    
    # sample / generate a text sequence after every epoch
    data_ptr = 0
    hidden_state = None
    
    # random character from data to begin
    rand_index = np.random.randint(data_size-1)
    input_seq = data[rand_index : rand_index+1]
    
    print("----------------------------------------")
    while True:
        # forward pass
        output, hidden_state = rnn(input_seq, hidden_state)
        
        # construct categorical distribution and sample a character
        output = F.softmax(torch.squeeze(output), dim=0)
        dist = Categorical(output)
        index = dist.sample()
        
        # print the sampled character
        print(ix_to_char[index.item()], end='')
        
        # next input is current output
        input_seq[0][0] = index.item()
        data_ptr += 1
        
        if data_ptr > op_seq_len:
            break
        
    print("\n----------------------------------------")

66


TypeError: embedding(): argument 'indices' (position 2) must be Tensor, not list