# **Afan Oromoo Text Generation Using RNN LSTM**

# Segni Dessalegn

# UGR/8961/12

Afaan Oromoo text generation implemented using Pytorch

# import libraries

In [4]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list the files in the input directory

import os
print(os.listdir("./"))

# Any results you write to the current directory are saved as output.


['assignment_6.ipynb', 'rnn.net', 'data.txt']


In [5]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from torch.autograd import Variable


In [6]:
!ls -al ./


total 17360
drwxrwxr-x 2 segni segni     4096 Feb  3 14:33 .
drwxrwxr-x 4 segni segni     4096 Feb  3 14:26 ..
-rw-rw-r-- 1 segni segni    39681 Feb  3 14:59 assignment_6.ipynb
-rw-rw-r-- 1 segni segni  3866497 Feb  1 15:57 data.txt
-rw-rw-r-- 1 segni segni 13857793 Feb  3 14:08 rnn.net


# Load dataset

In [8]:
text = (open("data.txt").read())


In [9]:
text[:100]


"Dhaabbanni keenya Waldaan Aksiyoona Faanaa Broodkaastiing Koorporeet bara 1987 yammuu hundaa'u\nQajee"

In [10]:
chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}
encoded = np.array([char2int[ch] for ch in text])


# One hot encoder

In [11]:
def one_hot_encode(arr, n_labels):

    # Initialize the the encoded array
    one_hot = np.zeros((np.multiply(*arr.shape), n_labels), dtype=np.float32)

    # Fill the appropriate elements with ones
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.

    # Finally reshape it to get back to the original array
    one_hot = one_hot.reshape((*arr.shape, n_labels))

    return one_hot


# Get batches of data

In [12]:
def get_batches(arr, n_seqs, n_steps):
    '''Create a generator that returns mini-batches of size
       n_seqs x n_steps from arr.
    '''

    batch_size = n_seqs * n_steps
    n_batches = len(arr)//batch_size

    # Keep only enough characters to make full batches
    arr = arr[:n_batches * batch_size]
    # Reshape into n_seqs rows
    arr = arr.reshape((n_seqs, -1))

    for n in range(0, arr.shape[1], n_steps):
        # The features
        x = arr[:, n:n+n_steps]
        # The targets, shifted by one
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+n_steps]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y


# Implement RNN class

In [13]:
class CharRNN(nn.Module):
    def __init__(self, tokens, n_steps=100, n_hidden=256, n_layers=2,
                               drop_prob=0.6, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr

        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        self.dropout = nn.Dropout(drop_prob)
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers,
                            dropout=drop_prob, batch_first=True)
        self.fc = nn.Linear(n_hidden, len(self.chars))

        self.init_weights()

    def forward(self, x, hc):
        ''' Forward pass through the network '''

        x, (h, c) = self.lstm(x, hc)
        x = self.dropout(x)

        # Stack up LSTM outputs
        x = x.view(x.size()[0]*x.size()[1], self.n_hidden)

        x = self.fc(x)

        return x, (h, c)

    def predict(self, char, h=None, cuda=False, top_k=None):
        ''' Given a character, predict the next character.

            Returns the predicted character and the hidden state.
        '''
        if cuda:
            self.cuda()
        else:
            self.cpu()

        if h is None:
            h = self.init_hidden(1)

        x = np.array([[self.char2int[char]]])
        x = one_hot_encode(x, len(self.chars))
        inputs = Variable(torch.from_numpy(x), volatile=True)
        if cuda:
            inputs = inputs.cuda()

        h = tuple([Variable(each.data, volatile=True) for each in h])
        out, h = self.forward(inputs, h)

        p = F.softmax(out).data
        if cuda:
            p = p.cpu()

        if top_k is None:
            top_ch = np.arange(len(self.chars))
        else:
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.numpy().squeeze()

        p = p.numpy().squeeze()
        char = np.random.choice(top_ch, p=p/p.sum())

        return self.int2char[char], h

    def init_weights(self):
        ''' Initialize weights for fully connected layer '''
        initrange = 0.1

        # Set bias tensor to all zeros
        self.fc.bias.data.fill_(0)
        # FC weights as random uniform
        self.fc.weight.data.uniform_(-1, 1)

    def init_hidden(self, n_seqs):
        ''' Initializes hidden state '''
        # Create two new tensors with sizes n_layers x n_seqs x n_hidden,
        # initialized to zero, for hidden state and cell state of LSTM
        weight = next(self.parameters()).data
        return (Variable(weight.new(self.n_layers, n_seqs, self.n_hidden).zero_()),
                Variable(weight.new(self.n_layers, n_seqs, self.n_hidden).zero_()))


# A function to train the model

Arguments
- net: CharRNN network
- data: text data to train the network
- epochs: Number of epochs to train
- n_seqs: Number of mini-sequences per mini-batch, aka batch size
- n_steps: Number of character steps per mini-batch
- lr: learning rate
- clip: gradient clipping
- val_frac: Fraction of data to hold out for validation
- cuda: Train with CUDA on a GPU
- print_every: Number of steps for printing training and validation loss

In [15]:
def train(net, data, epochs=10, n_seqs=10, n_steps=50, lr=0.001, clip=5, val_frac=0.1, cuda=False, print_every=10):
    net.train()
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # create training and validation data
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    if cuda:
        net.cuda()

    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):
        h = net.init_hidden(n_seqs)
        for x, y in get_batches(data, n_seqs, n_steps):
            counter += 1

            # One-hot encode our data and make them Torch tensors
            x = one_hot_encode(x, n_chars)
            x, y = torch.from_numpy(x), torch.from_numpy(y)

            inputs, targets = Variable(x), Variable(y)
            if cuda:
                inputs, targets = inputs.cuda(), targets.cuda()

            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([Variable(each.data) for each in h])

            net.zero_grad()

            output, h = net.forward(inputs, h)
            loss = criterion(output, targets.view(n_seqs*n_steps))

            loss.backward()

            # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm(net.parameters(), clip)

            opt.step()

            if counter % print_every == 0:

                # Get validation loss
                val_h = net.init_hidden(n_seqs)
                val_losses = []
                for x, y in get_batches(val_data, n_seqs, n_steps):
                    # One-hot encode our data and make them Torch tensors
                    x = one_hot_encode(x, n_chars)
                    x, y = torch.from_numpy(x), torch.from_numpy(y)

                    # Creating new variables for the hidden state, otherwise
                    # we'd backprop through the entire training history
                    val_h = tuple([Variable(each.data, volatile=True) for each in val_h])

                    inputs, targets = Variable(x, volatile=True), Variable(y, volatile=True)
                    if cuda:
                        inputs, targets = inputs.cuda(), targets.cuda()

                    output, val_h = net.forward(inputs, val_h)
                    val_loss = criterion(output, targets.view(n_seqs*n_steps))

                    val_losses.append(val_loss.data[0])

                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.data[0]),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))


In [16]:
if 'net' in locals():
    del net


In [17]:
net = CharRNN(chars, n_hidden=512, n_layers=2)


# Start the training process

In [None]:
n_seqs, n_steps = 128, 100
train(net, encoded, epochs=10, n_seqs=n_seqs, n_steps=n_steps, lr=0.001, cuda=True, print_every=10)




Epoch: 1/10... Step: 10... Loss: 3.3574... Val Loss: 3.3108
Epoch: 1/10... Step: 20... Loss: 3.2052... Val Loss: 3.1832
Epoch: 1/10... Step: 30... Loss: 3.0958... Val Loss: 3.0593
Epoch: 1/10... Step: 40... Loss: 2.9181... Val Loss: 2.9155
Epoch: 1/10... Step: 50... Loss: 2.7964... Val Loss: 2.7698
Epoch: 1/10... Step: 60... Loss: 2.6432... Val Loss: 2.6392
Epoch: 1/10... Step: 70... Loss: 2.5528... Val Loss: 2.5388
Epoch: 1/10... Step: 80... Loss: 2.5145... Val Loss: 2.4706
Epoch: 1/10... Step: 90... Loss: 2.4507... Val Loss: 2.4283
Epoch: 1/10... Step: 100... Loss: 2.3650... Val Loss: 2.3909
Epoch: 1/10... Step: 110... Loss: 2.4027... Val Loss: 2.3572
Epoch: 1/10... Step: 120... Loss: 2.3391... Val Loss: 2.3339
Epoch: 1/10... Step: 130... Loss: 2.3419... Val Loss: 2.3089
Epoch: 1/10... Step: 140... Loss: 2.2672... Val Loss: 2.2827
Epoch: 1/10... Step: 150... Loss: 2.2627... Val Loss: 2.2641
Epoch: 1/10... Step: 160... Loss: 2.2437... Val Loss: 2.2469
Epoch: 1/10... Step: 170... Loss:

# Save the model parameters

In [21]:
checkpoint = {'n_hidden': net.n_hidden,
              'n_layers': net.n_layers,
              'state_dict': net.state_dict(),
              'tokens': net.chars}
with open('rnn.net', 'wb') as f:
    torch.save(checkpoint, f)


# Load the saved model

In [20]:
def load_model(model_path, model_class=CharRNN):
    model_dict = torch.load(model_path, map_location=torch.device('cpu'))

    state_dict = model_dict['state_dict']

    model = model_class(tokens=model_dict['tokens'],
                        n_hidden=model_dict['n_hidden'],
                        n_layers=model_dict['n_layers'])

    model.load_state_dict(state_dict)

    return model


In [21]:
net = load_model("rnn.net")


In [22]:
def sample(net, size, prime='Ani', top_k=None, cuda=False):

    if cuda:
        net.cuda()
    else:
        net.cpu()

    net.eval()

    # First off, run through the prime characters
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = net.predict(ch, h, cuda=cuda, top_k=top_k)

    chars.append(char)


    # Now pass in the previous character and get a new one
    for ii in range(size):
        char, h = net.predict(chars[-1], h, cuda=cuda, top_k=top_k)
        chars.append(char)

    return ''.join(chars)


# Lets see how it works

In [23]:
print(sample(net, 500, prime='Haala', top_k=5, cuda=False))


  inputs = Variable(torch.from_numpy(x), volatile=True)
  h = tuple([Variable(each.data, volatile=True) for each in h])
  p = F.softmax(out).data


Haala akka harka keenyaa beeksiseera.
Ati hin jedhama nannicha bulchina isinuu dhiiste
Ani abbaa isaa argatee jecha taee aramaa akkamaa keessa jirtin balleessiisan kaan duula haadha isaa akka isaa irratti gabbisan
Kanaan akkumi keenyaa bara beekamuutii isaani arraba jiru kaayya hundi kanumaa kan hidhaman kee irraa bulchu kijibaa
akka isiin goota koomoo inni baatee biyya keenyaa keessaa dhalachuf harreen kana hiidaa keessa jedhe  Akka keetin garee kijibaa kuni biyya balleessa kana hoo argame miti ammoo
