# Torch Poet

In [None]:
import numpy as np
import os
import json
import random
import torch
import torch.nn as nn
from torch.autograd import Variable

from IPython.core.display import display, HTML

## Text library

In [None]:
# TextLibrary class: text library for training, encoding, batch generation,
# and formatted source display
class TextLibrary:
    """In-memory text corpus with character encoding and batch sampling.

    The constructor reads every file in ``filenames`` and concatenates the
    contents into ``self.data``.  Each successfully read file is recorded in
    ``self.files`` as a dict with the keys ``"name"`` (base file name without
    extension), ``"data"`` (the text read) and ``"index"`` (1-based position
    among the readable files).  ``self.c2i`` / ``self.i2c`` map characters to
    integer codes and back; codes are assigned in order of first appearance
    in ``self.data``.  ``self.ptr`` is the read cursor used by ``get_slice``.
    """

    def __init__(self, filenames, max=100000000):
        """Load the corpus.

        Args:
            filenames: iterable of paths to text files.
            max: maximum number of characters read from each file.

        Files that cannot be opened or read (``OSError``) are reported on
        stdout and skipped; other errors propagate.
        """
        self.filenames = filenames
        self.files = []
        # Created up front so both maps exist even when no file is readable
        # or `filenames` is empty.
        self.c2i = {}
        self.i2c = {}
        parts = []
        for filename in filenames:
            try:
                # `with` guarantees the handle is closed even if read() fails.
                with open(filename) as f:
                    dat = f.read(max)
            except OSError:
                print("  ERROR: Cannot read: ", filename)
                continue
            parts.append(dat)
            self.files.append({
                "name": os.path.splitext(os.path.basename(filename))[0],
                "data": dat,
                "index": len(self.files) + 1,
            })
        self.data = ''.join(parts)
        # Iterate the text itself (not a set) so the encoding is deterministic.
        for c in self.data:
            if c not in self.c2i:
                ind = len(self.c2i)
                self.c2i[c] = ind
                self.i2c[ind] = c
        self.ptr = 0

    def print_colored_IPython(self, textlist, pre='', post=''):
        """Render ``(text, index)`` pairs as HTML in the notebook.

        Pairs with index 0 are emitted as plain text; any other index gets a
        background colour and a superscript ``[index]`` marker.  ``pre`` and
        ``post`` are wrapped around the generated HTML.
        """
        bgcolors = ['#d4e6f1', '#d8daef', '#ebdef0', '#eadbd8', '#e2d7d5', '#edebd0',
                    '#ecf3cf', '#d4efdf', '#d0ece7', '#d6eaf8', '#d4e6f1', '#d6dbdf',
                    '#f6ddcc', '#fae5d3', '#fdebd0', '#e5e8e8', '#eaeded', '#A9CCE3']
        pieces = []
        for txt, ind in textlist:
            # NOTE(review): the text is not HTML-escaped; '<' or '&' in the
            # corpus or in generated text is interpreted as markup.
            txt = txt.replace('\n', '<br>')
            if ind == 0:
                pieces.append(txt)
            else:
                # Cycle through the whole palette, whatever its length.
                color = bgcolors[ind % len(bgcolors)]
                pieces.append("<span style=\"background-color:" + color + ";\">" + txt +
                              "</span>" + "<sup>[" + str(ind) + "]</sup>")
        display(HTML(pre + ''.join(pieces) + post))

    # Backward-compatible alias for the camelCase spelling of the method.
    printColoredIPython = print_colored_IPython

    def _longest_quote(self, txt, start, minQuoteSize):
        """Find the longest library quote that begins at ``txt[start]``.

        Returns ``(length, file_dict)``; ``file_dict`` is ``None`` when no
        file contains at least ``minQuoteSize`` characters of ``txt``
        starting at ``start``.  On ties the earliest file wins.
        """
        remaining = len(txt) - start
        best_len = 0
        best_file = None
        for f in self.files:
            data = f["data"]
            p = minQuoteSize
            if p <= remaining and txt[start:start + p] in data:
                p += 1
                # Grow the candidate until it no longer occurs in this file.
                while p <= remaining and txt[start:start + p] in data:
                    p += 1
                if p - 1 > best_len:
                    best_len = p - 1
                    best_file = f
        return best_len, best_file

    def source_highlight(self, txt, minQuoteSize=10):
        """Display ``txt`` with passages copied from the library highlighted.

        Scans ``txt`` left to right; at each position the longest substring
        of at least ``minQuoteSize`` characters found verbatim in a library
        file is marked with that file's index.  The result is rendered with
        ``print_colored_IPython``, followed by a right-aligned list of the
        quoted sources when there is at least one.
        """
        out = []
        qts = []
        txsrc = [("Sources: ", 0)]
        noquote = []
        pos = 0
        while pos < len(txt):
            length, src = self._longest_quote(txt, pos, minQuoteSize)
            if src is None:
                noquote.append(txt[pos])
                pos += 1
                continue
            if noquote:
                out.append((''.join(noquote), 0))
                noquote = []
            out.append((txt[pos:pos + length], src["index"]))
            pos += length
            if src["index"] not in qts:  # first quote from this source
                if qts:
                    txsrc.append((", ", 0))
                qts.append(src["index"])
                txsrc.append((src["name"], src["index"]))
        if noquote:
            out.append((''.join(noquote), 0))
        self.print_colored_IPython(out)
        if qts:  # print references only if at least one source was quoted
            self.print_colored_IPython(txsrc, pre="<small><p style=\"text-align:right;\">",
                                       post="</p></small>")

    def get_slice(self, length):
        """Return the next ``length`` characters and advance the cursor.

        Returns ``(text, rewind)``.  The cursor is reset to 0 when the slice
        would reach the end of the data; ``rewind`` is True whenever the
        slice starts at position 0 (which includes the very first call).
        """
        if self.ptr + length >= len(self.data):
            self.ptr = 0
        rewind = self.ptr == 0
        sl = self.data[self.ptr:self.ptr + length]
        self.ptr += length
        return sl, rewind

    def decode(self, ar):
        """Turn an iterable of integer codes back into a string."""
        return ''.join([self.i2c[ic] for ic in ar])

    def get_random_slice(self, length):
        """Return ``length`` consecutive characters from a random position.

        Raises:
            ValueError: if ``length`` exceeds the size of the corpus.
        """
        last_start = len(self.data) - length
        if last_start < 0:
            raise ValueError("slice length %d exceeds corpus size %d"
                             % (length, len(self.data)))
        # +1 so the final valid start position can be drawn as well.
        p = random.randrange(0, last_start + 1)
        return self.data[p:p + length]

    def get_slice_array(self, length):
        """Return the next ``length`` characters as an array of integer codes."""
        text, _ = self.get_slice(length)
        # Encode through c2i; casting raw characters to int fails on text.
        return np.array([self.c2i[c] for c in text], dtype=int)

    def _encode_pair(self, s):
        """Encode ``s`` into (input, target) arrays shifted by one character."""
        codes = [self.c2i[c] for c in s]
        X = np.array(codes[:-1], dtype=int)
        y = np.array(codes[1:], dtype=int)
        return X, y

    def get_sample(self, length):
        """Return ``(X, y, rewind)`` for the next sequential training sample.

        ``X`` holds the codes of ``length`` characters and ``y`` the codes of
        the same window shifted one character ahead.
        """
        s, rewind = self.get_slice(length + 1)
        X, y = self._encode_pair(s)
        return (X, y, rewind)

    def get_random_sample(self, length):
        """Return ``(X, y)`` like ``get_sample`` but from a random position."""
        X, y = self._encode_pair(self.get_random_slice(length + 1))
        return (X, y)

    def get_sample_batch(self, batch_size, length):
        """Return ``(X, y)`` arrays of shape ``(batch_size, length)`` built
        from consecutive sequential samples."""
        smpX = np.zeros((batch_size, length), dtype=int)
        smpy = np.zeros((batch_size, length), dtype=int)
        for i in range(batch_size):
            smpX[i, :], smpy[i, :], _ = self.get_sample(length)
        return smpX, smpy

    def get_random_sample_batch(self, batch_size, length):
        """Return ``(X, y)`` arrays of shape ``(batch_size, length)`` built
        from independent random samples."""
        smpX = np.zeros((batch_size, length), dtype=int)
        smpy = np.zeros((batch_size, length), dtype=int)
        for i in range(batch_size):
            smpX[i, :], smpy[i, :] = self.get_random_sample(length)
        return smpX, smpy

## Model parameters and data sources

In [None]:
# Default corpus description: display name, short description and the list
# of text files that make up the library.
libdesc = dict(
    name="TinyShakespeare",
    description="Small Shakespeare 'standard' corpus",
    lib=['data/tiny-shakespeare.txt'],
)

textlib = TextLibrary(libdesc["lib"])

# Model parameters for the default corpus:
model_params_shakespeare = dict(
    model_name="shakespeare",
    vocab_size=len(textlib.i2c),
    neurons=256,
    layers=3,
    learning_rate=1.e-3,
    steps=80,
    batch_size=128,
)

# An optional JSON library description, when present, replaces the default
# corpus and selects its own model parameters.
if not os.path.exists('bk/lib-phil-deen.json'):
    model_params = model_params_shakespeare
else:
    with open('bk/lib-phil-deen.json') as data_file:
        libdescphil = json.load(data_file)
    textlib = TextLibrary(libdescphil["lib"])
    model_params_phil = dict(
        model_name="phil",
        vocab_size=len(textlib.i2c),
        neurons=256,
        layers=8,
        learning_rate=1.e-3,
        steps=128,
        batch_size=80,
    )
    model_params = model_params_phil

In [None]:
def one_hot(p, dim):
    """One-hot encode an integer index array.

    Args:
        p: integer array (any rank) of class indices.
        dim: number of classes, i.e. the size of the new trailing axis.

    Returns:
        Integer array of shape ``p.shape + (dim,)`` that is 1 at position
        ``p[...]`` along the last axis and 0 elsewhere.

    Raises:
        IndexError: if an index lies outside ``[-dim, dim)``.
    """
    p = np.asarray(p)
    o = np.zeros(p.shape + (dim,), dtype=int)
    # A single vectorised scatter replaces the per-element Python loop.
    np.put_along_axis(o, p[..., np.newaxis], 1, axis=-1)
    return o

In [None]:
# Unpack the training hyper-parameters from the selected configuration.
batch_size, vocab_size, steps = (
    model_params[key] for key in ('batch_size', 'vocab_size', 'steps')
)

use_cuda = torch.cuda.is_available()

# Float tensor type used for the model inputs: CUDA when available, else CPU.
dtype = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor

def get_data():
    """Draw one random training batch from ``textlib``.

    Returns:
        ``(Xt, yt)``: ``Xt`` is the one-hot encoded input as a float tensor
        of type ``dtype`` with shape ``(batch_size, steps, vocab_size)``;
        ``yt`` holds the target character codes as a long tensor of shape
        ``(batch_size, steps)``, moved to the GPU when ``use_cuda`` is set.
    """
    X, y = textlib.get_random_sample_batch(batch_size, steps)
    Xo = one_hot(X, vocab_size)

    # One conversion to the target tensor type is enough.
    Xt = Variable(torch.from_numpy(Xo.astype(np.float32)).type(dtype), requires_grad=False)
    # .long() also handles platforms where NumPy's default int is 32-bit.
    yt = Variable(torch.from_numpy(y).long(), requires_grad=False)
    if use_cuda:
        yt = yt.cuda()
    return Xt, yt

## The char-rnn model (deep LSTMs)

In [None]:
class Poet(nn.Module):
    """Character-level language model: stacked LSTM plus a linear read-out.

    Inputs are one-hot encoded sequences of shape
    ``(batch, steps, input_size)`` (the LSTM is created with
    ``batch_first=True``); outputs are unnormalised scores of shape
    ``(batch, steps, output_size)``.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(Poet, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size

        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

        self.demb = nn.Linear(hidden_size, output_size)
        # Explicit dim: softmax is applied to (N, output_size) score matrices.
        self.softmax = nn.Softmax(dim=1)

        # Recurrent state; set by init_hidden() and updated by forward().
        self.h0 = None
        self.c0 = None

    def init_hidden(self, batch_size):
        """Reset the stored hidden and cell state to zeros for ``batch_size``
        sequences, using the dtype and device of the model parameters."""
        ref = next(self.parameters())
        shape = (self.num_layers, batch_size, self.hidden_size)
        self.h0 = ref.new_zeros(shape)
        self.c0 = ref.new_zeros(shape)

    def forward(self, input, steps, h0=None, c0=None):
        """Run the LSTM over ``input`` and project every step to scores.

        Args:
            input: tensor of shape ``(batch, steps, input_size)``.
            steps: sequence length used to reshape the output.
            h0, c0: optional initial states; the stored states are used for
                whichever is omitted (zero-initialised on first use).

        Returns:
            Tensor of shape ``(batch, steps, output_size)``.  The final LSTM
            state is stored in ``self.h0`` / ``self.c0``.
        """
        if h0 is None or c0 is None:
            if self.h0 is None or self.c0 is None:
                self.init_hidden(input.size(0))
            h0 = self.h0 if h0 is None else h0
            c0 = self.c0 if c0 is None else c0
        hn, (self.h0, self.c0) = self.lstm(input, (h0, c0))
        hnr = hn.contiguous().view(-1, self.hidden_size)
        op = self.demb(hnr)
        opr = op.view(-1, steps, self.output_size)
        return opr

    def _next_char_probs(self, index):
        """Feed one character code through the model and return the
        ``(1, output_size)`` probability distribution of the next character."""
        x = next(self.parameters()).new_zeros((1, 1, self.lstm.input_size))
        x[0, 0, int(index)] = 1.0
        scores = self.forward(x, 1)
        return self.softmax(scores.view(-1, self.output_size))

    def generate(self, n, start=None):
        """Sample ``n`` characters from the model.

        Args:
            n: number of characters to generate.
            start: priming text fed to the model first; a single space is
                used when it is empty or None.

        Returns:
            The generated string (the priming text is not included).

        Raises:
            KeyError: if ``start`` contains a character missing from
                ``textlib.c2i``.

        Uses the module-level ``textlib`` for the character encoding and
        resets the stored hidden state.
        """
        if not start:
            start = ' '
        self.init_hidden(1)
        chars = []
        # Sampling needs no gradients; without no_grad the autograd graph
        # would keep growing through the carried hidden state.
        with torch.no_grad():
            for c in start:
                yp = self._next_char_probs(textlib.c2i[c])
            for _ in range(n):
                y_pred = yp.cpu().numpy()
                ind = int(np.random.choice(self.output_size, p=y_pred.ravel()))
                chars.append(textlib.i2c[ind])
                yp = self._next_char_probs(ind)
        return ''.join(chars)

## Create a poet

In [None]:
# Input and output size are both the vocabulary size: one-hot characters go
# in, per-character scores come out.
poet = Poet(
    input_size=vocab_size,
    hidden_size=model_params['neurons'],
    num_layers=model_params['layers'],
    output_size=vocab_size,
)
if use_cuda:
    # Move the model parameters to the GPU.
    poet = poet.cuda()

## Training helpers

In [None]:
# Optimisation setup: Adam over all model parameters, cross-entropy loss on
# the per-character scores.
learning_rate = model_params['learning_rate']
criterion = nn.CrossEntropyLoss()

opti = torch.optim.Adam(poet.parameters(), lr=learning_rate)

bok = 0

def train(Xt, yt, bPr=False):
    """Run one optimisation step on a single batch.

    Args:
        Xt: one-hot input tensor of shape ``(batch, steps, vocab_size)``.
        yt: target character codes of shape ``(batch, steps)``.
        bPr: when True, also compute the fraction of correctly predicted
            characters for this batch.

    Returns:
        ``(loss, precision)`` as Python floats; ``precision`` is 0.0 when
        ``bPr`` is False.

    Uses the module-level ``poet``, ``criterion`` and ``opti``.
    """
    poet.zero_grad()

    poet.init_hidden(Xt.size(0))
    # Take the sequence length from the batch itself.
    output = poet(Xt, Xt.size(1))

    olin = output.view(-1, output.size(-1))
    ytlin = yt.view(-1)

    pr = 0.0
    if bPr and ytlin.numel() > 0:  # Calculate precision
        with torch.no_grad():
            _, ytp = torch.max(olin, 1)
            # Vectorised comparison instead of a per-element Python loop.
            pr = (ytp == ytlin).sum().item() / ytlin.numel()

    loss = criterion(olin, ytlin)
    # .item() reads the scalar loss; indexing a 0-dim tensor with [0] is an
    # error on current PyTorch.
    ls = loss.item()
    loss.backward()
    opti.step()

    return ls, pr

## The actual training

In [None]:
# Running loss sum and step count since the last report.
ls = 0
nrls = 0
# Report less often on the GPU, where steps are much faster.
intv = 2500 if use_cuda else 100
for e in range(200000):
    Xt, yt = get_data()
    # Precision is only computed on the steps that print a report.
    report = (e + 1) % intv == 0
    l, pr = train(Xt, yt, report)
    ls = ls + l
    nrls = nrls + 1
    if not report:
        continue
    print("Loss:", ls / nrls, " Precision:", pr)
    nrls = 0
    ls = 0
    # Show a sample of what the model currently generates.
    print(poet.generate(200, "\n\n"))