In [1]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F

In [2]:
# Read the whole poem corpus into memory as one string (decoded as UTF-8).
# NOTE(review): the path looks like a mounted Google Drive location; this cell
# will fail wherever that mount is not available.
file_path = '/content/drive/Shareddrives/LovePoem/lovepoem_baseline.txt'
with open(file_path, 'r', encoding='utf-8') as file:
    poems = file.read()

In [3]:
# Preview the first 200 characters of the corpus.
print(poems[:200])

A million stars up in the sky
One shines brighter   I can't deny
A love so precious a love so true
a love that comes from me to you
The angels sing when you are near
Within your arms I have nothing to


In [4]:
# Build the character vocabulary and integer-encode the corpus.
#
# The vocabulary is sorted so that the char <-> index mapping is deterministic.
# Iterating a bare set of strings can produce a different order in every
# Python process (string hash randomization), which would silently change
# the meaning of every index between kernel restarts.
chars = tuple(sorted(set(poems)))
# index -> character
int2char = dict(enumerate(chars))
# character -> index (inverse of int2char)
char2int = {ch: ii for ii, ch in int2char.items()}
# The corpus as a 1-D array of vocabulary indices, one entry per character.
encoded = np.array([char2int[ch] for ch in poems])

In [5]:
def one_hot_encode(arr, n_labels):
    '''One-hot encode an integer array.

       Arguments
       ---------
       arr: NumPy array of integer labels (any shape)
       n_labels: Number of distinct labels, i.e. the length of each one-hot vector

       Returns a float32 array of shape (*arr.shape, n_labels).
    '''
    # Work on a flat view of the labels: one output row per label
    flat_labels = arr.flatten()
    row_index = np.arange(arr.size)

    # Start from all zeros and switch on one column per row
    encoded_rows = np.zeros((arr.size, n_labels), dtype=np.float32)
    encoded_rows[row_index, flat_labels] = 1.

    # Restore the input's shape, with the one-hot axis appended last
    return encoded_rows.reshape((*arr.shape, n_labels))

In [6]:
# Sanity check: encode a 1 x 3 sequence of labels using 8 possible labels.
# Each row of the printed result should contain a single 1. at the label's index.
test_seq = np.array([[0, 5, 1]])
one_hot = one_hot_encode(test_seq, 8)
print(one_hot)

[[[1. 0. 0. 0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 1. 0. 0.]
  [0. 1. 0. 0. 0. 0. 0. 0.]]]


In [7]:
def get_batches(arr, batch_size, seq_length):
    '''Create a generator that returns batches of size
       batch_size x seq_length from arr.

       Arguments
       ---------
       arr: 1-D NumPy array you want to make batches from
       batch_size: Batch size, the number of sequences per batch
       seq_length: Number of encoded chars in a sequence

       Yields
       ------
       (x, y) pairs of shape (batch_size, seq_length). y holds, for every
       position of x, the element that follows it in arr. Only as many
       elements as fill whole batches are used; if arr is shorter than one
       batch, nothing is yielded.
    '''

    batch_size_total = batch_size * seq_length
    # total number of batches we can make
    n_batches = len(arr) // batch_size_total
    if n_batches == 0:
        # Not enough data for a single full batch: yield nothing instead of
        # failing while reshaping an empty array.
        return

    # Keep only enough characters to make full batches
    n_used = n_batches * batch_size_total
    features = arr[:n_used]

    # The targets are the stream shifted by one. Computing them on the flat
    # stream (before splitting into rows) keeps the target of the last
    # element of every row correct: it is the first element of the next row.
    targets = np.roll(features, -1)
    if len(arr) > n_used:
        # The element right after the kept region is the true final target.
        targets[-1] = arr[n_used]
    # Otherwise np.roll has already wrapped the final target around to the
    # first element of the stream.

    # Reshape into batch_size rows
    features = features.reshape((batch_size, -1))
    targets = targets.reshape((batch_size, -1))

    # iterate through the arrays, one sequence at a time
    for start in range(0, features.shape[1], seq_length):
        stop = start + seq_length
        # Copy the targets so callers get a C-contiguous array (safe to
        # flatten with a tensor view after torch.from_numpy).
        yield features[:, start:stop], targets[:, start:stop].copy()

In [23]:
# Whether a CUDA device is available; other cells use this flag to decide
# where to place the model and its inputs.
train_on_gpu = torch.cuda.is_available()


class CharRNN(nn.Module):
    ''' Character-level LSTM: one-hot characters in, per-step scores over
        the same character set out.

        Arguments
        ---------
        tokens: sequence of the distinct characters (the vocabulary)
        n_hidden: size of the LSTM hidden state
        n_layers: number of stacked LSTM layers
        drop_prob: dropout probability (between LSTM layers and before the
                   output layer)
        lr: learning rate; only stored on the instance, not used by this class
    '''

    def __init__(self, tokens, n_hidden=256, n_layers=2,
                               drop_prob=0.5, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr

        # creating character dictionaries
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        # lstm layer; inter-layer dropout only exists when layers are stacked,
        # so pass 0 for a single layer to avoid PyTorch's warning
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers,
                            dropout=drop_prob if n_layers > 1 else 0.0,
                            batch_first=True)

        # dropout layer
        self.dropout = nn.Dropout(drop_prob)

        # output layer
        self.fc = nn.Linear(n_hidden, len(self.chars))

    def forward(self, x, hidden):
        ''' Forward pass through the network.
            These inputs are x, and the hidden/cell state `hidden`.

            Returns the output scores, flattened to one row per
            (sequence, step) position with len(self.chars) columns,
            and the new hidden state. '''
        ## Get the outputs and the new hidden state from the lstm
        r_output, hidden = self.lstm(x, hidden)

        ## pass through a dropout layer
        out = self.dropout(r_output)

        # Stack up LSTM outputs: merge the batch and step axes.
        # contiguous() is needed before view on a possibly strided tensor.
        out = out.contiguous().view(-1, self.n_hidden)

        ## put the stacked outputs through the fully-connected layer
        out = self.fc(out)
        return out, hidden


    def init_hidden(self, batch_size):
        ''' Initializes hidden state.

            Returns a (hidden state, cell state) tuple of zero tensors, each
            of size n_layers x batch_size x n_hidden. '''
        # Allocate from an existing parameter so the state always has the
        # same dtype and device as the model itself, wherever the model
        # currently lives.
        weight = next(self.parameters())
        state_shape = (self.n_layers, batch_size, self.n_hidden)

        return (weight.new_zeros(state_shape),
                weight.new_zeros(state_shape))

In [21]:
def _validation_loss(net, val_data, criterion, batch_size, seq_length, n_chars):
    ''' Return the mean loss of `net` over the batches of `val_data`.

        The network is put in eval mode for the pass and returned to train
        mode afterwards. Gradients are not tracked. Returns NaN when
        `val_data` produces no batches.
    '''
    val_h = net.init_hidden(batch_size)
    val_losses = []
    net.eval()
    # No backward pass happens here, so skip building the autograd graph
    with torch.no_grad():
        for x, y in get_batches(val_data, batch_size, seq_length):
            # One-hot encode our data and make them Torch tensors
            inputs = torch.from_numpy(one_hot_encode(x, n_chars))
            targets = torch.from_numpy(y)

            if(train_on_gpu):
                inputs, targets = inputs.cuda(), targets.cuda()

            output, val_h = net(inputs, val_h)
            val_loss = criterion(output, targets.reshape(-1).long())
            val_losses.append(val_loss.item())

    net.train() # reset to train mode after iterating through validation data

    # Avoid taking the mean of an empty list
    return float(np.mean(val_losses)) if val_losses else float('nan')


def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    ''' Training a network

        Arguments
        ---------

        net: CharRNN network
        data: text data to train the network
        epochs: Number of epochs to train
        batch_size: Number of mini-sequences per mini-batch, aka batch size
        seq_length: Number of character steps per mini-batch
        lr: learning rate
        clip: gradient clipping
        val_frac: Fraction of data to hold out for validation
        print_every: Number of steps for printing training and validation loss

    '''
    net.train()

    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # create training and validation data: the last val_frac of the data
    # is held out for validation
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    if(train_on_gpu):
        net.cuda()

    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):
        # initialize hidden state
        h = net.init_hidden(batch_size)

        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1

            # One-hot encode our data and make them Torch tensors
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)

            if(train_on_gpu):
                inputs, targets = inputs.cuda(), targets.cuda()

            # Detach the hidden state from its history, otherwise
            # we'd backprop through the entire training history
            h = tuple(each.detach() for each in h)

            # zero accumulated gradients
            net.zero_grad()

            # get the output from the model
            output, h = net(inputs, h)

            # calculate the loss and perform backprop; the targets are
            # flattened to line up with the network's stacked outputs
            loss = criterion(output, targets.reshape(-1).long())
            loss.backward()
            # `clip_grad_norm_` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()

            # loss stats
            if counter % print_every == 0:
                mean_val_loss = _validation_loss(net, val_data, criterion,
                                                 batch_size, seq_length, n_chars)

                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(mean_val_loss))

In [29]:
# Model hyperparameters
n_hidden = 256
n_layers = 2

# Build the network over the corpus vocabulary and show its layers
net = CharRNN(chars, n_hidden=n_hidden, n_layers=n_layers, drop_prob=0.5)
print(net)

# Training hyperparameters
batch_size = 25
seq_length = 50
n_epochs = 30

# Train on the integer-encoded corpus, reporting losses every 100 steps
train(
    net,
    encoded,
    epochs=n_epochs,
    batch_size=batch_size,
    seq_length=seq_length,
    lr=0.001,
    val_frac=0.12,
    print_every=100,
)

CharRNN(
  (lstm): LSTM(72, 256, num_layers=2, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=256, out_features=72, bias=True)
)
Epoch: 2/30... Step: 100... Loss: 3.0678... Val Loss: 3.0786
Epoch: 3/30... Step: 200... Loss: 2.6642... Val Loss: 2.6415
Epoch: 4/30... Step: 300... Loss: 2.4926... Val Loss: 2.3557
Epoch: 5/30... Step: 400... Loss: 2.2000... Val Loss: 2.2270
Epoch: 7/30... Step: 500... Loss: 2.2513... Val Loss: 2.1563
Epoch: 8/30... Step: 600... Loss: 2.1307... Val Loss: 2.0932
Epoch: 9/30... Step: 700... Loss: 2.1090... Val Loss: 2.0596
Epoch: 10/30... Step: 800... Loss: 1.9575... Val Loss: 2.0038
Epoch: 12/30... Step: 900... Loss: 2.0175... Val Loss: 1.9902
Epoch: 13/30... Step: 1000... Loss: 1.9700... Val Loss: 1.9528
Epoch: 14/30... Step: 1100... Loss: 1.9989... Val Loss: 1.9285
Epoch: 15/30... Step: 1200... Loss: 1.8185... Val Loss: 1.9095
Epoch: 17/30... Step: 1300... Loss: 1.9247... Val Loss: 1.8976
Epoch: 18/30..

In [30]:
# Persist the trained network to the shared drive.
# NOTE(review): this saves the module object itself rather than a state_dict;
# loading it presumably requires the CharRNN class to be defined in the
# loading session - confirm, and consider saving net.state_dict() instead.
torch.save(net, '/content/drive/Shareddrives/LovePoem/baselineModel')

In [31]:
def predict(net, char, h=None, top_k=None):
    ''' Given a character, predict the next character.
        Returns the predicted character and the hidden state.

        Arguments
        ---------
        net: network exposing `chars`, `char2int`, `int2char` and `init_hidden`
        char: the input character; must be a key of net.char2int
        h: hidden state to continue from; a fresh zero state for a single
           sequence is used when None
        top_k: if given, sample only among the top_k most likely characters;
               otherwise sample from the full distribution
    '''

    # tensor inputs: a 1 x 1 sequence holding the one-hot encoded character
    x = np.array([[net.char2int[char]]])
    x = one_hot_encode(x, len(net.chars))
    inputs = torch.from_numpy(x)

    if(train_on_gpu):
        inputs = inputs.cuda()

    # the default h=None means "start from a fresh state"
    if h is None:
        h = net.init_hidden(1)

    # detach hidden state from history
    h = tuple(each.detach() for each in h)

    # get the output of the model; no gradients are needed for sampling
    with torch.no_grad():
        out, h = net(inputs, h)

    # get the character probabilities, on the cpu for NumPy
    p = F.softmax(out, dim=1).cpu()

    # get top characters
    if top_k is None:
        top_ch = np.arange(len(net.chars))
    else:
        p, top_ch = p.topk(top_k)
        # reshape(-1) keeps a 1-D array even when top_k == 1; squeeze()
        # would collapse it to 0-d, which np.random.choice rejects
        top_ch = top_ch.numpy().reshape(-1)

    # select the likely next character with some element of randomness;
    # the probabilities are renormalised over the candidates
    p = p.numpy().reshape(-1)
    char = np.random.choice(top_ch, p=p/p.sum())

    # return the predicted character and the hidden state
    return net.int2char[int(char)], h
def sample(net, size, prime='The', top_k=None):
    ''' Generate text from the network.

        Arguments
        ---------
        net: trained network, passed through to `predict`
        size: number of sampling steps to run after the prime
        prime: non-empty seed text used to warm up the hidden state
        top_k: forwarded to `predict`; limits sampling to the top_k most
               likely characters

        Returns the prime followed by the generated characters: the one
        predicted right after the prime plus `size` more.

        Raises ValueError if `prime` is empty. The network is left in eval
        mode.
    '''
    # Without at least one prime character there is nothing to predict from
    # (the character after the prime would be undefined).
    if not prime:
        raise ValueError("prime must contain at least one character")

    if(train_on_gpu):
        net.cuda()
    else:
        net.cpu()

    net.eval() # eval mode

    # First off, run through the prime characters to build up the hidden
    # state; only the prediction after the last prime character is kept
    chars = list(prime)
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)
    chars.append(char)

    # Now pass in the previous character and get a new one
    for _ in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)
    return ''.join(chars)

In [32]:
# Generate text seeded with 'Love': 5000 sampling steps, each choosing
# between the 2 most likely next characters.
print(sample(net, 5000, prime='Love', top_k = 2))

Love to me

In an my harl that stars of my lone 
I love your say 
I said the sky of me the bed 
I love thee word a song to me
And the was and shall that have the bed

In a song the brows and some there
I love to me a dow to than shall me to me 
And to to more to the soft to more

The shade they beat to the wild will shall breath and to me

The stars a seaden the beet of the song
A love a seading and star a dow

The songe the beat the way of stall that
When the stillen to the song and to be

I love thee to the song that
I love the softed to me 

I love thee to me to me touck to me

In the soft and there the brows and song
And shadow and she with me

I live the seeting there the stars
I love it the houth to the song

The sky to to she with the bed that some
The shalow the song the wild with me and the brows
And shall still be and that some and stalled to this beaut
And the stars a song the soft
I shall the seath and the soul to may

I shall be to she the soully betoren ther
And that the 