In [None]:
import numpy as np
import theano
import theano.tensor as T
from theano import shared 
from collections import OrderedDict

# Float type used for every NumPy array handed to Theano in this notebook,
# read from Theano's floatX configuration value.
dtype=T.config.floatX
# Select Theano's 'fast_compile' optimizer setting; this is assigned before any
# Theano function is compiled.
# NOTE(review): presumably chosen to shorten compilation at the cost of
# runtime speed -- confirm against the Theano configuration docs.
theano.config.optimizer='fast_compile'

In [None]:
# Embedded Reber Grammar
# http://christianherta.de/lehre/dataScience/machineLearning/neuralNetworks/reberGrammar.php

# Alphabet of the grammar. A symbol's position in this string is its index in
# the 7-element one-hot / multi-hot vectors built below.
chars = 'BTSXPVE'

# Transition table indexed by state. Each entry is
# [successor_states, emitted_symbols]: choosing position k emits
# emitted_symbols[k] and moves to successor_states[k]. State 6 has no entry;
# it is only reached by emitting 'E'.
# Every symbol collection is a real tuple (note the trailing comma in ('E',)),
# so all entries have the same type.
graph = [[(1, 5), ('T', 'P')], [(1, 2), ('S', 'X')],
         [(3, 5), ('S', 'X')], [(6,), ('E',)],
         [(3, 2), ('V', 'P')], [(4, 5), ('V', 'T')]]


def in_grammar(word):
    """
    Return True when `word` is a complete string of the grammar in `graph`.

    A word is accepted only if it starts with 'B', every following symbol is
    a legal transition from the current state, and the walk finishes in the
    final state (state 6, reached by emitting 'E'). Empty words, truncated
    words, and words with symbols after the final state are rejected.
    """
    final_state = 6  # same terminal state generateSequences stops at
    if not word or word[0] != 'B':
        return False
    node = 0
    for c in word[1:]:
        if node == final_state:
            # The final state has no row in `graph`; any further symbol makes
            # the word invalid (indexing graph here would raise IndexError).
            return False
        successors, symbols = graph[node]
        try:
            node = successors[symbols.index(c)]
        except ValueError:  # c is not a legal symbol in the current state
            return False
    # A walk that stops early is only a prefix, not a word of the grammar.
    return node == final_state
      
def sequenceToWord(sequence):
    """
    Decode a sequence of one-hot vectors into the Reber string it spells.

    Each vector contributes the symbol of `chars` located at the position of
    its first entry equal to 1.
    """
    return ''.join(chars[np.where(vec == 1.)[0][0]] for vec in sequence)
    
def generateSequences(minLength):
    """
    Random-walk `graph` from state 0 until a long enough string is produced.

    Returns (inchars, outchars). inchars is the list of emitted symbols,
    starting with 'B'. outchars[k] is the collection of symbols that were
    allowed to follow inchars[k]. Walks whose inchars holds minLength symbols
    or fewer are thrown away and a new walk is started.
    """
    while True:
        emitted = ['B']
        allowed_next = []
        state = 0
        while state != 6:  # state 6 is the end of the walk
            successors, symbols = graph[state]
            pick = np.random.randint(0, len(successors))
            emitted.append(symbols[pick])
            allowed_next.append(symbols)
            state = successors[pick]
        if len(emitted) > minLength:
            return emitted, allowed_next


def get_one_example(minLength):
    """
    Build one training example from a freshly generated Reber string.

    Returns (inseq, outseq): two equally long lists of 7-element vectors.
    inseq[k] is the one-hot code of the k-th input symbol; outseq[k] has a 1
    for every symbol that may follow it. zip() stops at the shorter list, so
    the last generated symbol, which has no successor set, is not used as input.
    """
    def encode(symbols):
        # Set the slot of every symbol in `symbols`; a one-character string
        # yields a plain one-hot vector.
        vec = np.zeros(7)
        for sym in symbols:
            vec[chars.find(sym)] = 1.
        return vec

    inchars, outchars = generateSequences(minLength)
    pairs = list(zip(inchars, outchars))
    inseq = [encode(sym) for sym, _ in pairs]
    outseq = [encode(allowed) for _, allowed in pairs]
    return inseq, outseq


def get_char_one_hot(char):
    """
    Encode one or more symbols as a single 7-element multi-hot vector.

    char: iterable of symbols from `chars` (a string or a tuple of
        one-character strings); the slot of every listed symbol is set to 1.
    Returns a one-element list holding the vector, so that results can be
    concatenated into a sequence with `+`.
    Raises ValueError if a symbol does not occur in `chars`.
    """
    char_oh = np.zeros(7)
    for c in char:
        # str.index raises on an unknown symbol; str.find would return -1 and
        # silently set the last slot of the vector instead.
        char_oh[chars.index(c)] = 1.
    return [char_oh]
    
def get_n_examples(n, minLength=10):
    """
    Generate `n` independent (non-embedded) Reber examples.

    Returns a list of (inseq, outseq) pairs as produced by get_one_example.
    """
    # range (not xrange) so the helper runs on both Python 2 and Python 3.
    return [get_one_example(minLength) for _ in range(n)]

# Symbols that can wrap an inner Reber string; get_one_embedded_example picks
# one of them at random for each example.
emb_chars = "TP"


def get_one_embedded_example(minLength=10):
    """
    Build one embedded Reber example around a plain one.

    A randomly chosen symbol of `emb_chars` wraps the inner example:
      inputs : B, emb, <inner inputs>,  E,   emb
      targets: {emb_chars}, B, <inner targets>, emb, E
    Returns (new_in, new_out), two equally long lists of 7-element vectors.
    """
    # Generate the inner example first, then draw the wrapper symbol, so the
    # random number stream is consumed in the same order as before.
    inner_in, inner_out = get_one_example(minLength)
    emb_char = emb_chars[np.random.randint(0, len(emb_chars))]

    prefix_in = get_char_one_hot(('B',)) + get_char_one_hot((emb_char,))
    suffix_in = get_char_one_hot(('E',)) + get_char_one_hot((emb_char,))

    prefix_out = get_char_one_hot(emb_chars) + get_char_one_hot('B')
    suffix_out = get_char_one_hot((emb_char,)) + get_char_one_hot(('E',))

    new_in = prefix_in + inner_in + suffix_in
    new_out = prefix_out + inner_out + suffix_out
    return new_in, new_out
    
def get_n_embedded_examples(n, minLength=10):
    """
    Generate `n` independent embedded Reber examples.

    Returns a list of (inputs, targets) pairs as produced by
    get_one_embedded_example.
    """
    # range (not xrange) so the helper runs on both Python 2 and Python 3.
    return [get_one_embedded_example(minLength) for _ in range(n)]

In [None]:
# Training set: 1000 randomly generated embedded Reber examples, each an
# (inputs, targets) pair; minLength is left at its default.
train_data = get_n_embedded_examples(1000)

In [None]:
def sample_weights(sizeX, sizeY):
    """
    Create a Theano shared weight matrix of shape (sizeX, sizeY).

    Entries are drawn uniformly from [-1, 1); the matrix is then divided by
    the first singular value returned by np.linalg.svd and cast to `dtype`.
    """
    raw = np.random.uniform(low=-1., high=1., size=(sizeX, sizeY))
    singular_values = np.linalg.svd(raw)[1]
    scaled = np.asarray(raw / singular_values[0], dtype=dtype)
    return shared(scaled, borrow=True)

class LSTM:
    """
    Single-layer LSTM whose gates also read the cell state, trained by plain
    gradient descent on one sequence at a time.

    Constructor arguments:
        n_in   -- width of each input vector
        n_lstm -- number of LSTM units (hidden and cell state width)
        n_out  -- width of each output vector

    Attributes set by the constructor:
        params -- list of all trainable shared variables
        output -- symbolic per-step outputs (sigmoid activations)
        train  -- compiled function train(X, Y) -> cost; applies one gradient
                  step (learning rate 0.1) to every entry of `params`
        pred   -- compiled function pred(X) -> per-step outputs
    X and Y are matrices holding one vector per time step.
    """
    def __init__(self, n_in, n_lstm, n_out):
        self.n_in = n_in
        self.n_lstm = n_lstm
        self.n_out = n_out

        # Input gate: weights for input, previous hidden state, previous cell state.
        self.W_xi = sample_weights(n_in, n_lstm)
        self.W_hi = sample_weights(n_lstm, n_lstm)
        self.W_ci = sample_weights(n_lstm, n_lstm)
        self.b_i = shared(np.cast[dtype](np.random.uniform(-0.5,.5,size = n_lstm)))
        # Forget gate; its bias is drawn from [0, 1) rather than [-0.5, 0.5).
        self.W_xf = sample_weights(n_in, n_lstm)
        self.W_hf = sample_weights(n_lstm, n_lstm)
        self.W_cf = sample_weights(n_lstm, n_lstm)
        self.b_f = shared(np.cast[dtype](np.random.uniform(0, 1.,size = n_lstm)))
        # Candidate cell input.
        self.W_xc = sample_weights(n_in, n_lstm)
        self.W_hc = sample_weights(n_lstm, n_lstm)
        self.b_c = shared(np.zeros(n_lstm, dtype=dtype))
        # Output gate: reads input, previous hidden state and the *new* cell state.
        self.W_xo = sample_weights(n_in, n_lstm)
        self.W_ho = sample_weights(n_lstm, n_lstm)
        self.W_co = sample_weights(n_lstm, n_lstm)
        self.b_o = shared(np.cast[dtype](np.random.uniform(-0.5,.5,size = n_lstm)))
        # Read-out layer from hidden state to output.
        self.W_hy = sample_weights(n_lstm, n_out)
        self.b_y = shared(np.zeros(n_out, dtype=dtype))

        # Every variable listed here receives a gradient update in `train`.
        # W_xo has to be part of the list: it used to be missing while W_co
        # was listed twice, so the output gate's input weights were never
        # trained.
        self.params = [self.W_xi, self.W_hi, self.W_ci, self.b_i,
                       self.W_xf, self.W_hf, self.W_cf, self.b_f,
                       self.W_xc, self.W_hc, self.b_c,
                       self.W_xo, self.W_ho, self.W_co, self.b_o,
                       self.W_hy, self.b_y]


        def step_lstm(x_t, h_tm1, c_tm1):
            """One time step: returns [new hidden state, new cell state, output]."""
            i_t = T.nnet.sigmoid(T.dot(x_t, self.W_xi) + T.dot(h_tm1, self.W_hi) + T.dot(c_tm1, self.W_ci) + self.b_i)
            f_t = T.nnet.sigmoid(T.dot(x_t, self.W_xf) + T.dot(h_tm1, self.W_hf) + T.dot(c_tm1, self.W_cf) + self.b_f)
            c_t = f_t * c_tm1 + i_t * T.tanh(T.dot(x_t, self.W_xc) + T.dot(h_tm1, self.W_hc) + self.b_c)
            # The output gate uses the freshly computed c_t, not c_tm1.
            o_t = T.nnet.sigmoid(T.dot(x_t, self.W_xo)+ T.dot(h_tm1, self.W_ho) + T.dot(c_t, self.W_co)  + self.b_o)
            h_t = o_t * T.tanh(c_t)
            y_t = T.nnet.sigmoid(T.dot(h_t, self.W_hy) + self.b_y)
            return [h_t, c_t, y_t]

        X = T.matrix() # X is a sequence of vectors
        Y = T.matrix() # Y is a sequence of vectors
        h0 = shared(np.zeros(self.n_lstm, dtype=dtype)) # initial hidden state
        c0 = shared(np.zeros(self.n_lstm, dtype=dtype)) # initial cell state

        # h and c are fed back into the next step; y (outputs_info None) is
        # only collected.
        [h_vals, c_vals, y_vals], _ = theano.scan(fn=step_lstm,
                                                  sequences=X,
                                                  outputs_info=[h0, c0, None])

        self.output = y_vals

        # Element-wise binary cross-entropy, averaged over all steps and outputs.
        cost = -T.mean(Y * T.log(y_vals)+ (1.- Y) * T.log(1. - y_vals))
        lr = shared(np.cast[dtype](0.1))
        gparams = T.grad(cost, self.params)
        updates = OrderedDict()
        for param, gparam in zip(self.params, gparams):
            updates[param] = param - gparam * lr
        self.train = theano.function(inputs = [X, Y], outputs = cost, updates=updates)

        self.pred = theano.function(inputs = [X], outputs = self.output)

In [None]:
# LSTM(n_in, n_lstm, n_out): 7 inputs and 7 outputs (the data helpers build
# 7-element vectors) with 50 LSTM units.
model = LSTM(7, 50, 7)

In [None]:
nb_epochs = 100
# Naive SGD: each epoch performs len(train_data) single-example updates on
# examples drawn uniformly at random (with replacement) from train_data.
for x in range(nb_epochs):
    error = 0.  # summed training cost over this epoch
    for j in range(len(train_data)):
        index = np.random.randint(0, len(train_data))
        i, o = train_data[index]
        train_cost = model.train(i, o)
        error += train_cost
    if x%10==0:
        # A single parenthesised argument prints the same on Python 2 and 3.
        print("epoch "+str(x)+ " error: "+str(error))

In [None]:
# Ten freshly generated embedded examples used for a quick visual check.
test_data = get_n_embedded_examples(10)

def print_out(test_data):
    """
    Print target and prediction at the second-to-last step of every example.

    test_data: iterable of (inputs, targets) pairs. Predictions come from the
    module-level `model`. For each pair the target vector at index -2 is
    printed, followed by a 7-element one-hot vector marking the largest model
    output at that step, followed by a blank line.
    """
    for i, o in test_data:
        p = model.pred(i)
        print(o[-2])  # target
        best = np.argmax(p[-2])  # computed once, not once per output slot
        prediction = np.zeros(7)
        prediction[best] = 1.
        print(prediction)  # prediction
        print("")  # blank separator line; same output on Python 2 and 3
# Show target vs. prediction for each of the test examples.
print_out(test_data)