# Dante

The following code is heavily inspired by these projects:
- https://github.com/mathematiguy/keras-char-rnn
- https://github.com/michaelrzhang/Char-RNN/
- https://github.com/mineshmathew/char_rnn_karpathy_keras
- https://github.com/ekzhang/char-rnn-keras
- https://github.com/michaelrzhang/Char-RNN
- https://github.com/yxtay/char-rnn-text-generation

In [None]:
%matplotlib inline

import numpy as np
import os
import sys
import random
import slabikar
from keras.models import Sequential, load_model
from keras.layers import Dense, Dropout, LSTM, TimeDistributed
from keras.callbacks import ModelCheckpoint, LambdaCallback
from keras.utils import np_utils, Sequence
from keras import backend as K
from keract import get_activations
from matplotlib import pyplot as plt

%matplotlib notebook

Hyperparameters are set here

In [None]:
# Hyperparameters shared by the data preparation, model building and training cells.
maxlen = 120 # number of atoms in every training sequence (also the model's seq_length)
step = 13 # stride between the starts of consecutive windows built by createInput
validation_split = 0.1 # fraction of the batches reserved for validation
batch_size = 10 # number of sequences per batch
rnn_size = 10# units in each LSTM layer (the earlier comment here read "128")
num_layers = 3 # layer count used by build_model; its final layer is the softmax output, not an LSTM
drop_prob = 0.1 # rate passed to the Dropout layer that follows each LSTM
epochs = 6 # number of training epochs passed to fit_generator
temperature=1.0 # NOTE(review): not referenced by the sampling calls visible in this notebook, which pass literals
sample_length = 1000 # NOTE(review): not referenced by the sampling calls visible in this notebook, which pass literals

Concatenate all text files from the `resources/` directory into one string. Text files are expected to be UTF-8 encoded.

In [None]:
# Read every UTF-8 ``.txt`` file found in ``resources/`` and join them into a
# single corpus string, terminating each file with a newline.
resource_dir = 'resources/'
corpus_parts = []
# os.listdir returns names in arbitrary order; sorting keeps the corpus (and
# therefore the seeded train/validation split built from it) reproducible.
for filename in sorted(name for name in os.listdir(resource_dir) if name.endswith(".txt")):
    print("loading file: %s" % filename)
    filepath = os.path.join(resource_dir, filename)
    with open(filepath, 'r', encoding='utf-8') as f:
        corpus_parts.append(f.read() + "\n")
# Join once instead of growing a string inside the loop.
text_data = ''.join(corpus_parts)


Since our dataset is pretty small, we convert everything to lowercase and remove diacritics. There are some characters that need to be tweaked beforehand.

In [None]:
# Swap typographic quotes and the figure dash for plain ASCII look-alikes
# before the generic transliteration step below.
for fancy, plain in (("’", "'"), ("„", '"'), ("“", '"'), ("‒", "-")):
    text_data = text_data.replace(fancy, plain)
import unidecode
# unidecode strips the diacritics; the result is then lowercased.
ascii_text = unidecode.unidecode(text_data)
text_data = ascii_text.lower()


Functions for processing the text. One uses characters as text atoms, the other uses syllables.

Return values:
- atom_to_int: (dict) Maps atoms (characters or syllables) of the vocabulary to ints.
- int_to_atom: (dict) Maps ints back to atoms of the vocabulary.
- n_atom: (int) The number of atoms in the text.
- n_vocab: (int) The number of unique atoms in the text.
- data: the preprocessed input (the text itself for characters, the syllabified text for syllables).

In [None]:
def process_text_char(text_data):
    """Build the character-level vocabulary tables for ``text_data``.

    Returns:
        A 5-tuple ``(char_to_int, int_to_char, n_chars, n_vocab, text_data)``:
        the character -> index dict, the index -> character dict, the length
        of the text, the number of distinct characters, and the text itself
        unchanged.
    """
    # The alphabet is sorted so that index assignment is identical between
    # sessions, which is needed to reuse checkpointed models later.
    alphabet = sorted(set(text_data))
    char_to_int = {}
    int_to_char = {}
    for index, char in enumerate(alphabet):
        char_to_int[char] = index
        int_to_char[index] = char
    return char_to_int, int_to_char, len(text_data), len(alphabet), text_data

def process_text_syllable(text_data):
    """Build the syllable-level vocabulary tables for ``text_data``.

    The text is first split by ``slabikar.slabikar``; the resulting items are
    the atoms of the vocabulary.

    Returns:
        A 5-tuple ``(syllable_to_int, int_to_syllable, n_syllables, n_vocab,
        syllable_data)``: the syllable -> index dict, the index -> syllable
        dict, the number of syllables in the text, the number of distinct
        syllables, and the syllabified text.
    """
    syllable_data = slabikar.slabikar(text_data)
    # Sorted so that the index assignment is the same on every run.
    vocabulary = sorted(set(syllable_data))
    indices = range(len(vocabulary))
    syllable_to_int = dict(zip(vocabulary, indices))
    int_to_syllable = dict(zip(indices, vocabulary))
    return (syllable_to_int, int_to_syllable,
            len(syllable_data), len(vocabulary), syllable_data)

Splits the data into overlapping sequences. Since this is a many-to-many RNN, each target is the source sequence shifted forward by one atom. Syllable data cannot be preprocessed as a whole; use the generator (or `BatchSequence`) instead.

In [None]:
def createInput(text, maxlen, step, n_vocab, atom_to_int):
    """One-hot encode overlapping windows of ``text`` and their targets.

    Windows of ``maxlen`` atoms start at ``0, step, 2*step, ...`` for every
    start strictly below ``len(text) - maxlen - 1``. The target of a window is
    the same span shifted forward by one atom.

    Args:
        text: Indexable sequence of atoms (a string, or a list of atoms).
        maxlen: Number of atoms per window.
        step: Distance between the starts of consecutive windows.
        n_vocab: Size of the one-hot axis.
        atom_to_int: Mapping from atom to its index on the one-hot axis.

    Returns:
        ``(X, y)``: two boolean arrays of shape
        ``(n_windows, maxlen, n_vocab)``; both are empty along the first axis
        when the text is too short for a single window.

    Raises:
        KeyError: If a windowed atom is missing from ``atom_to_int``.
    """
    starts = range(0, len(text) - maxlen - 1, step)
    # The builtin ``bool`` is used because the ``np.bool`` alias was removed
    # from NumPy (1.24) and raises AttributeError there.
    X = np.zeros((len(starts), maxlen, n_vocab), dtype=bool)
    y = np.zeros((len(starts), maxlen, n_vocab), dtype=bool)
    for i, start in enumerate(starts):
        for j in range(maxlen):
            X[i, j, atom_to_int[text[start + j]]] = True
            # The target is the atom that follows position j of the window.
            y[i, j, atom_to_int[text[start + j + 1]]] = True
    return X, y

def _one_hot_indices(indices, batch_size, seq_length, n_vocab):
    """Return a float32 ``(batch_size, seq_length, n_vocab)`` one-hot array."""
    idx = np.asarray(indices, dtype=int).reshape(batch_size, seq_length)
    encoded = np.zeros((batch_size, seq_length, n_vocab), dtype='float32')
    rows = np.arange(batch_size)[:, None]
    cols = np.arange(seq_length)[None, :]
    encoded[rows, cols, idx] = 1.0
    return encoded


def get_batch(batch, starts, text_data, seq_length, batch_size,
              atom_to_int, n_vocab):
    """Build the one-hot input/target arrays for batch number ``batch``.

    The batch uses the entries ``starts[batch_size * batch]`` up to (not
    including) ``starts[batch_size * (batch + 1)]`` as window start offsets
    into ``text_data``. Each target window is its input window shifted forward
    by one atom.

    The encoding is done with NumPy directly rather than through
    ``keras.utils.np_utils``, which is no longer shipped by current Keras
    releases; dtype (float32) and shape are the same as before.

    Args:
        batch: Zero-based batch number.
        starts: Sequence of window start offsets.
        text_data: Indexable sequence of atoms.
        seq_length: Number of atoms per window.
        batch_size: Number of windows per batch.
        atom_to_int: Mapping from atom to its one-hot index.
        n_vocab: Size of the one-hot axis.

    Returns:
        ``(X, y)``: float32 arrays of shape
        ``(batch_size, seq_length, n_vocab)``.

    Raises:
        IndexError: If the batch reaches past the end of ``starts``.
        KeyError: If an atom is missing from ``atom_to_int``.
    """
    first = batch_size * batch
    dataX = []
    dataY = []
    for position in range(first, first + batch_size):
        begin = starts[position]
        seq_in = text_data[begin:begin + seq_length]
        seq_out = text_data[begin + 1:begin + 1 + seq_length]
        dataX.append([atom_to_int[atom] for atom in seq_in])
        dataY.append([atom_to_int[atom] for atom in seq_out])

    X = _one_hot_indices(dataX, batch_size, seq_length, n_vocab)
    y = _one_hot_indices(dataY, batch_size, seq_length, n_vocab)
    return X, y

#use this for syllables
def generate_batches(mode, text_data, seq_length, validation_split,
                     batch_size, atom_to_int, n_atom, n_vocab,
                     random_seed=42, shuffle=True):
    """Endlessly yield ``(X, y)`` batches for training or validation.

    Every possible window start is enumerated, optionally shuffled with a
    fixed seed, and cut into batches. The first
    ``round(n_batches * validation_split)`` batches are the validation set,
    the remaining ones the training set. The generator cycles over its set
    forever.

    Args:
        mode: ``'train'`` or ``'validation'``.
        text_data: Indexable sequence of atoms.
        seq_length: Number of atoms per window.
        validation_split: Fraction of the batches used for validation.
        batch_size: Number of windows per batch.
        atom_to_int: Mapping from atom to its one-hot index.
        n_atom: Number of atoms in ``text_data``.
        n_vocab: Size of the one-hot axis.
        random_seed: Seed for the global ``random`` module.
        shuffle: Whether to shuffle the window starts.

    Yields:
        The ``(X, y)`` pair returned by ``get_batch`` for each batch.

    Raises:
        ValueError: On the first ``next()`` if ``mode`` is not recognised.
    """
    # The same seed yields the same shuffle for the 'train' and 'validation'
    # generators, which is what keeps the two sets disjoint.
    random.seed(random_seed)
    starts = list(range(n_atom - (seq_length + 1)))
    if shuffle:
        random.shuffle(starts)
    n_batches = len(starts) // batch_size
    validation_size = round(n_batches * validation_split)

    if mode == 'validation':
        batch_numbers = range(validation_size)
    elif mode == 'train':
        batch_numbers = range(validation_size, n_batches)
    else:
        raise ValueError("only 'validation' and 'train' modes accepted")

    while True:
        for batch in batch_numbers:
            yield get_batch(batch, starts, text_data, seq_length,
                            batch_size, atom_to_int, n_vocab)

class BatchSequence(Sequence):
    """Keras ``Sequence`` serving one-hot ``(X, y)`` batches of a text.

    All window starts are enumerated, optionally shuffled with a fixed seed,
    and cut into batches. The first ``round(n_batches * validation_split)``
    batches form the validation set and the rest the training set; ``mode``
    selects which of the two this instance serves.
    """

    def __init__(self,mode, text_data, seq_length, validation_split,
                     batch_size, atom_to_int, n_atom, n_vocab,
                     random_seed=42, shuffle=True):
        """Store the parameters and prepare the (shuffled) window starts.

        ``random_seed`` reseeds the global ``random`` module so that a 'train'
        and a 'validation' instance built with the same arguments share the
        same shuffle.
        """
        self.mode = mode
        self.text_data = text_data
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.atom_to_int = atom_to_int
        self.n_vocab = n_vocab

        self.starts = list(range(0, n_atom - (seq_length + 1)))
        random.seed(random_seed)
        if shuffle:
            random.shuffle(self.starts)
        self.n_batches = len(self.starts) // batch_size
        self.validation_size = round(self.n_batches * validation_split)

    def _batch_span(self):
        """Return ``(first, end)`` batch numbers owned by the current mode."""
        if self.mode == 'validation':
            return 0, self.validation_size
        if self.mode == 'train':
            return self.validation_size, self.n_batches
        raise ValueError("only 'validation' and 'train' modes accepted")

    def __len__(self):
        """Return the number of batches available in the current mode."""
        first, end = self._batch_span()
        return end - first

    def __getitem__(self, idx):
        """Return the ``(X, y)`` pair for the ``idx``-th batch of this mode."""
        first, _ = self._batch_span()
        # Training batches are stored after the validation batches, so the
        # index is offset by the first batch number of the mode.
        return get_batch(idx + first, self.starts, self.text_data,
                         self.seq_length, self.batch_size,
                         self.atom_to_int, self.n_vocab)

Model builder.

In [None]:
def build_model(batch_size, seq_length, n_vocab, rnn_size, num_layers, drop_prob):
    """Assemble and compile the stacked-LSTM sequence model.

    The network has ``num_layers - 1`` LSTM layers (each followed by dropout)
    and ends with a time-distributed softmax over the vocabulary, so
    ``num_layers`` counts the output layer as well.

    Args:
        batch_size: Unused; the batch dimension of the input is left open.
        seq_length: Number of timesteps of the input.
        n_vocab: Size of the one-hot axis of the input and output.
        rnn_size: Number of units in every LSTM layer.
        num_layers: Total number of layers, output layer included.
        drop_prob: Rate of the dropout layer after each LSTM.

    Returns:
        The compiled ``Sequential`` model (categorical cross-entropy, Adam,
        accuracy metric).
    """
    model = Sequential()
    for depth in range(num_layers - 1):
        if depth == 0:
            # Only the first layer declares the input shape.
            recurrent = LSTM(rnn_size,
                             batch_input_shape=(None, seq_length, n_vocab),
                             return_sequences=True)
        else:
            recurrent = LSTM(rnn_size, return_sequences=True)
        model.add(recurrent)
        model.add(Dropout(drop_prob))
    if num_layers >= 1:
        # Output layer: one softmax over the vocabulary per timestep.
        model.add(TimeDistributed(Dense(n_vocab, activation='softmax')))
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model


Use this to prepare character data (it contains some debug printing to check that everything looks good). `createInput` is risky to use with a small step because it uses a large amount of RAM; use the `Sequence` instead.

In [None]:
# Character-level preparation: build the vocabulary tables, then one-hot
# encode the whole text in memory.
atom_to_int, int_to_atom, n_atoms, n_vocab, data = process_text_char(text_data)
X, y = createInput(data, maxlen, step, n_vocab, atom_to_int)
# Debug output: both lookup tables, the text length and the vocabulary size.
for summary in (atom_to_int, int_to_atom, n_atoms, n_vocab):
    print(summary)

Use this to prepare syllable model

In [None]:
# Syllable-level preparation: build the vocabulary tables only; batches are
# produced on demand later on.
atom_to_int, int_to_atom, n_atoms, n_vocab, data = process_text_syllable(text_data)
# Debug output: number of syllables in the text, then the vocabulary size.
print(n_atoms, n_vocab, sep="\n")

Train syllable model

In [None]:
# Save a checkpoint whenever the validation accuracy reaches a new maximum.
checkpoint = ModelCheckpoint(
    'checkpoints/weights-{epoch:02d}-{val_acc:.2f}-{val_loss:.2f}.hdf5',
    monitor='val_acc',
    verbose=1,
    save_best_only=True,
    mode='max')
callbacks = [checkpoint]
model = build_model(batch_size, maxlen, n_vocab, rnn_size, num_layers, drop_prob)

# Number of whole batches that fit into the available window starts.
n_batches = (n_atoms - (maxlen+1)) // batch_size
# Positional arguments shared by the training and validation BatchSequence.
batch_params = (data, maxlen, validation_split, batch_size, atom_to_int, n_atoms, n_vocab)

In [None]:
# Train on the 'train' batches and validate on the 'validation' batches.
# steps_per_epoch / validation_steps are intentionally not passed: for a
# Sequence Keras falls back to len(sequence), which is exactly one pass over
# the data. The previous explicit values (n_batches-1 steps per epoch, and an
# int()-truncated validation count where BatchSequence uses round()) did not
# match the Sequence lengths, so an epoch ran past the end of the training
# set into a second pass.
train_batches = BatchSequence('train', *batch_params)
validation_batches = BatchSequence('validation', *batch_params)
history = model.fit_generator(
    generator=train_batches,
    validation_data=validation_batches,
    epochs=epochs,
    max_queue_size=10,
    workers=3,
    use_multiprocessing=True,
    verbose=1,
    callbacks=callbacks)

Plot training values

In [None]:
# One figure per metric, each showing the training and validation curves
# recorded in ``history``.
for metric, title in (('loss', 'loss'), ('acc', 'accuracy')):
    plt.figure()
    plt.plot(history.history[metric], label='training ' + title)
    plt.plot(history.history['val_' + metric], label='validation ' + title)
    plt.legend(loc='best')
plt.show()

Here we can save the last model in case it hasn't improved.

In [None]:
# Persist the model in its current (last-epoch) state; the checkpoint callback
# only writes a file when the validation accuracy improves.
model.save('checkpoints/last.hdf5')

Here we can load saved model to play around with it (make sure to use correct dataset preprocessing for loaded model).

In [None]:
# Replace ``model`` with a previously saved network. The path is hard-coded to
# one particular checkpoint file; adjust it to the file you want to load.
model = load_model('checkpoints/Fix-13-0.96-0.20.hdf5')

Training used a stateless model with fixed-length sequences. This is not what we want for generation, where the sequence length is theoretically unbounded. The function below converts a training model into one more suitable for generation. With a stateful model it should be possible to feed the network one character at a time, but poems generated this way seem to confuse the network and it outputs gibberish. So instead of that method (used in Karpathy's original article) we use a computationally more intensive and less elegant approach: we keep the network stateless and feed it the whole poem at every step.

In [None]:
def generative_model(model):
    """Clone ``model`` into a frozen, variable-length inference model.

    The layer configuration of ``model`` is copied with these changes:
    every ``stateful`` and ``trainable`` flag is switched off, and every
    ``batch_input_shape`` becomes ``(1, None, features)``, i.e. a batch of one
    sequence of arbitrary length. The weights are copied from ``model``.

    Args:
        model: The trained model; its inputs are expected to be 3-dimensional.

    Returns:
        A new ``Sequential`` model with the same weights.
    """
    config = model.get_config()

    for layer in config['layers']:
        settings = layer['config']
        # stateful would be True for feeding one char at a time.
        for flag in ('stateful', 'trainable'):
            if flag in settings:
                settings[flag] = False
        if 'batch_input_shape' in settings:
            # Keep only the feature dimension of the original 3-d shape;
            # (None, 1, features) would be used for one char at a time.
            feature_dim = settings['batch_input_shape'][2]
            settings['batch_input_shape'] = (1, None, feature_dim)

    inference_model = Sequential.from_config(config)
    inference_model.trainable = False
    inference_model.set_weights(model.get_weights())
    inference_model.reset_states()
    return inference_model

In [None]:
# Derive the inference model from the trained one and print both summaries so
# the two architectures can be compared.
gmodel = generative_model(model)
model.summary()
gmodel.summary()

This function chooses the atom output from prediction. The temperature controls the level of conservativeness of the network. Low temperature means the network chooses the highest scoring prediction, high temperature allows it to experiment more.

In [None]:
def pick_atom_index(predictions, temperature):
    """Sample an index from ``predictions`` after temperature scaling.

    The probabilities are raised to the power ``1 / temperature`` (done in log
    space) and renormalised, then a single index is drawn from the resulting
    distribution with NumPy's global random generator.

    Args:
        predictions: Probabilities over the vocabulary.
        temperature: Values below 1 sharpen the distribution, values above 1
            flatten it.

    Returns:
        The sampled index.
    """
    # float64 keeps the renormalised probabilities precise enough for
    # np.random.multinomial.
    probabilities = np.asarray(predictions).astype('float64')
    weights = np.exp(np.log(probabilities) / temperature)
    distribution = weights / np.sum(weights)
    # A single multinomial trial yields a one-hot row; argmax is its index.
    draw = np.random.multinomial(1, distribution, 1)
    return np.argmax(draw)

Following function is used to generate output in the less elegant way with better results. (Requires small tweaks to work with char model - syllable model uses list of strings, char just one string)

In [None]:
def _encode_atoms(atoms, atom_to_int, n_vocab):
    """Return a ``(1, len(atoms), n_vocab)`` one-hot array encoding ``atoms``."""
    encoded = np.zeros((1, len(atoms), n_vocab))
    for t, atom in enumerate(atoms):
        encoded[0, t, atom_to_int[atom]] = 1
    return encoded


def sample_model_slow(model, sample_length, int_to_atom, atom_to_int, n_vocab,
                      temperatures=(0.9,),
                      seed_text="vidim zem, jej jasot skvie sa "):
    """Generate poems by re-feeding the whole text to ``model`` at every step.

    For each temperature the syllabified ``seed_text`` is extended by
    ``sample_length`` atoms: the full sequence so far is one-hot encoded,
    passed through ``model``, and the next atom is sampled from the prediction
    for the last timestep. The finished poem is printed, and the LSTM
    activations for it are collected with ``get_activations``.

    Args:
        model: Model used both for prediction and for reading activations.
            It must accept a ``(1, length, n_vocab)`` input of any length.
        sample_length: Number of atoms to generate after the seed.
        int_to_atom: Mapping from index to atom.
        atom_to_int: Mapping from atom to index.
        n_vocab: Size of the one-hot axis.
        temperatures: Sampling temperatures; one poem is generated for each.
        seed_text: Text the poem starts with; it is split by
            ``slabikar.slabikar`` and every resulting atom must be present in
            ``atom_to_int``.

    Returns:
        A list with one dict per temperature: ``'poem'`` (list of atoms, seed
        included), ``'neurons'`` (per timestep, the activations of all layers
        whose name contains ``'lstm'``, concatenated) and ``'desc'`` (empty
        string).
    """
    outerlist = []
    for diversity in temperatures:
        poem = list(slabikar.slabikar(seed_text))
        for _ in range(sample_length):
            test_in = _encode_atoms(poem, atom_to_int, n_vocab)
            # Predict with the model that was passed in (previously the global
            # ``gmodel`` was used here, silently ignoring the argument).
            entire_prediction = model.predict(test_in, verbose=0)[0]  # unwrap the batch
            next_index = pick_atom_index(entire_prediction[-1], diversity)
            poem.append(int_to_atom[next_index])
        print("".join(poem))

        # Collect the activations for the finished poem.
        activations = get_activations(model, _encode_atoms(poem, atom_to_int, n_vocab))
        # One entry per timestep; the layers are merged into it one by one.
        neuronsSnapshot = [[] for _ in poem]
        # Sorted so that the layers are always merged in the same order.
        # NOTE(review): the sort is lexicographic, so e.g. 'lstm_10' would
        # come before 'lstm_2' - confirm for deep models.
        for key in sorted(activations.keys()):
            if 'lstm' not in key:
                continue
            # Iterate over the activations of this layer through time.
            for ix, act in enumerate(np.squeeze(activations[key])):
                neuronsSnapshot[ix] = np.concatenate((neuronsSnapshot[ix], act))
        outerlist.append({'poem': poem, 'neurons': np.array(neuronsSnapshot).tolist(), 'desc': ''})
    return outerlist
# Generate 10 atoms past the seed for each of four temperatures and keep the
# resulting list of samples in ``fout``.
fout = sample_model_slow(gmodel, 10, int_to_atom, atom_to_int, n_vocab,temperatures=[0.6,0.9,1.2,1.6])

Following function is used to generate some sample outputs from trained model in character by character fashion. It also saves activations of neurons.

In [None]:
def sample_model(model,sample_length, int_to_atom, atom_to_int, n_vocab, temperatures):
    """Generate text one atom at a time and record the LSTM activations.

    For each temperature the model state is reset, the hard-coded seed (all
    atoms but the last) is fed through the model one atom per call, and then
    ``sample_length`` atoms are sampled: the current atom is passed to
    ``get_activations``, the LSTM activations are stored, and the next atom is
    drawn from the output of the layer whose name contains ``'time'``.

    NOTE(review): feeding single atoms only carries context if ``model`` is
    stateful - confirm against how the model was built.

    Args:
        model: Model accepting a ``(1, 1, n_vocab)`` input.
        sample_length: Number of atoms to generate per temperature.
        int_to_atom: Mapping from index to atom.
        atom_to_int: Mapping from atom to index; must contain the seed atoms.
        n_vocab: Size of the one-hot axis.
        temperatures: Sampling temperatures; one sample is generated for each.

    Returns:
        ``{'data': [{'poem': ..., 'neurons': ...}, ...], 'neurons': n}``.
        ``'poem'`` starts at the last seed atom and is followed by the
        generated atoms. ``'neurons'`` inside ``'data'`` holds one activation
        vector per generated atom, recorded while the preceding atom of the
        poem was the input, so it is one entry shorter than the poem.
        The top-level ``'neurons'`` is the length of one activation vector
        (0 when nothing was generated).

    Raises:
        ValueError: If the activations contain no layer named ``'*time*'``.
    """
    outerlist = []
    for diversity in temperatures:
        print("generating with diversity: "+str(diversity))
        seed = ["no",","," ","na"," ","u","pa","tie"," ","vr","chu"," ","pri","duc"," ","za"]
        encoded = np.zeros((len(seed), n_vocab))
        for t, atom in enumerate(seed):
            encoded[t, atom_to_int[atom]] = 1.
        # Work on a copy: aliasing the seed list made ``len(seed)`` grow with
        # every generated atom, so the final slice kept only the last atom.
        generated = list(seed)
        model.reset_states()
        neuronsSnapshot = []
        for x in encoded[:-1]:
            # Saturate the net with the seed; input shape is (1, 1, n_vocab).
            model.predict(np.array([[x]]))

        next_input = encoded[-1]
        for _ in range(sample_length):
            x = np.array([[next_input]])
            # get_activations is used instead of predict because the hidden
            # activations are needed as well as the output.
            adict = get_activations(model, x)
            probs = None
            lstm_parts = []
            # Sorted so that the layers are always merged in the same order.
            for key in sorted(adict.keys()):
                if 'lstm' in key:
                    lstm_parts.append(np.ravel(adict[key]))
                elif 'time' in key:
                    # Output shape: (1, 1, n_vocab).
                    probs = adict[key]
            if probs is None:
                raise ValueError("no 'time' (TimeDistributed) layer found in the activations")
            alist = np.concatenate(lstm_parts) if lstm_parts else np.zeros(0)

            number = pick_atom_index(probs[0][0], diversity)
            # append, not +=: ``list += str`` would split a multi-character
            # atom into single characters.
            generated.append(int_to_atom[number])
            neuronsSnapshot.append(alist)
            # One-hot encode the sampled atom as the next input.
            next_input = np.zeros(n_vocab)
            next_input[number] = 1.
        print(generated)
        outerlist.append({'poem': generated[len(seed)-1:], 'neurons': np.array(neuronsSnapshot).tolist()})

    # Width of one activation vector, guarded against empty output.
    n_neurons = 0
    if outerlist and outerlist[0]['neurons']:
        n_neurons = len(outerlist[0]['neurons'][0])
    return {'data': outerlist, 'neurons': n_neurons}


    
# Sample 500 atoms at each of three temperatures with the atom-by-atom sampler.
# NOTE(review): this rebinds ``fout``; sample_model returns a dict
# ({'data': ..., 'neurons': ...}), unlike sample_model_slow, which returns a list.
fout = sample_model(gmodel, 500, int_to_atom, atom_to_int, n_vocab,[0.5, 1.0, 1.5])

Here the activations are saved for visualization

In [None]:
import json

# ``fout`` has two possible shapes: sample_model_slow returns a bare list of
# samples, while sample_model already returns the complete
# {"data": ..., "neurons": ...} payload. Indexing ``fout[0]`` on the latter
# raised KeyError, so only the list form is wrapped here.
if isinstance(fout, dict):
    payload = fout
else:
    payload = {"data": fout, "neurons": len(fout[0]["neurons"][0])}
# ensure_ascii=False writes characters as-is, so the file encoding is fixed
# explicitly instead of depending on the platform default.
with open('out/data.json', 'w', encoding='utf-8') as f:
    json.dump(payload, f, ensure_ascii=False)

A small tool to visually check how the network is doing so far.

In [None]:
# Take the second validation batch (unshuffled), run it through the model and
# print the decoded arg-max predictions of a few of its sequences.
g = BatchSequence('validation', *batch_params, shuffle=False)
separator = "_________"
print(separator)
X, y = g[1]
print(X[0])
preds = model.predict(X, verbose=1)
# Target indices of the first sequence, for comparison with its prediction.
print(list(np.argmax(y[0], axis=1)))
for position, sample in enumerate((0, 2, 3, 4)):
    if position:
        print(separator)
    print("".join(int_to_atom[ix] for ix in np.argmax(preds[sample], axis=1)))