In [6]:
import numpy as np

class WeightSum:
    """Weighted sum over the time axis of a batch of hidden states.

    The layer has no trainable parameters, so ``params`` and ``grads``
    stay empty.
    """

    def __init__(self):
        self.params, self.grads = [], []

    def forward(self, hs, a):
        """Collapse ``hs`` along time using the weights ``a``.

        Args:
            hs: array of shape (N, T, H).
            a: weights reshapeable to (N, T, 1).

        Returns:
            Array of shape (N, H): ``sum_t a[n, t] * hs[n, t, :]``.
        """
        batch, steps, hidden = hs.shape

        # Cached for backward.
        self.hs = hs
        self.T = steps
        # Weights copied along the hidden axis -> N x T x H.
        self.ar = np.repeat(a.reshape(batch, steps, 1), hidden, axis=2)

        return (hs * self.ar).sum(axis=1)

    def backward(self, dout):
        """Back-propagate through the weighted sum.

        Args:
            dout: upstream gradient of shape (N, H).

        Returns:
            Tuple ``(dhs, da)`` with shapes (N, T, H) and (N, T).
        """
        batch, hidden = dout.shape

        # The sum over time fans the gradient out to every step -> N x T x H.
        d_weighted = np.repeat(dout.reshape(batch, 1, hidden), self.T, axis=1)

        dhs = d_weighted * self.ar
        # The repeat along H in forward becomes a sum over H here.
        da = (d_weighted * self.hs).sum(axis=2)

        return dhs, da

In [None]:
from util import softmax

class AttentionWeight:
    """Dot-product attention weights between a query and a sequence.

    The layer has no trainable parameters, so ``params`` and ``grads``
    stay empty.
    """

    def __init__(self):
        self.params, self.grads = [], []

    def forward(self, hs, h):
        """Score every time step of ``hs`` against ``h`` and apply softmax.

        Args:
            hs: array of shape (N, T, H).
            h: query reshapeable to (N, 1, H).

        Returns:
            The result of ``softmax`` on the (N, T) dot-product scores.
        """
        batch, steps, hidden = hs.shape
        self.hs = hs
        self.H = hidden

        # Query copied along the time axis -> N x T x H.
        hr = np.repeat(h.reshape(batch, 1, hidden), steps, axis=1)
        scores = (hs * hr).sum(axis=2)  # N x T
        weights = softmax(scores)

        self.hr = hr
        self.a = weights
        return weights

    def backward(self, da):
        """Back-propagate through the softmax and the dot product.

        Args:
            da: upstream gradient of shape (N, T).

        Returns:
            Tuple ``(dhs, dh)`` with shapes (N, T, H) and (N, H).
        """
        batch, steps = da.shape

        # Softmax backward: ds = a * da - sum_t(a * da) * a   (N x T)
        weighted = self.a * da
        ds = weighted - (np.sum(da * self.a, axis=1, keepdims=True) * self.a)

        # Fan the score gradient out over the hidden axis -> N x T x H.
        d_prod = np.repeat(ds.reshape(batch, steps, 1), self.H, axis=2)

        dhs = d_prod * self.hr
        # The repeat along T in forward becomes a sum over T here.
        dh = (d_prod * self.hs).sum(axis=1)

        return dhs, dh

In [8]:
class Attention:
    """Single-step attention: AttentionWeight followed by WeightSum.

    Owns no trainable parameters; ``params`` and ``grads`` stay empty.
    The weights of the most recent forward pass are kept in
    ``attention_weight``.
    """

    def __init__(self):
        self.params, self.grads = [], []
        self.attention_weight_layer = AttentionWeight()
        self.weight_sum_layer = WeightSum()
        self.attention_weight = None

    def forward(self, hs, h):
        """Return the weighted sum of ``hs`` for the query ``h``.

        Args:
            hs: sequence of hidden states, N x T x H.
            h: query vector, N x H.
        """
        weights = self.attention_weight_layer.forward(hs, h)
        self.attention_weight = weights
        return self.weight_sum_layer.forward(hs, weights)

    def backward(self, dout):
        """Return ``(dhs, dh)`` for the upstream gradient ``dout``.

        ``hs`` feeds both sub-layers in forward, so its gradient is the
        sum of the two branches.
        """
        dhs_from_sum, da = self.weight_sum_layer.backward(dout)
        dhs_from_weight, dh = self.attention_weight_layer.backward(da)
        return dhs_from_sum + dhs_from_weight, dh

In [9]:
class TimeAttention:
    """Runs one Attention layer per decoder time step.

    Owns no trainable parameters; ``params`` and ``grads`` stay empty.
    After ``forward``, ``layers[t]`` and ``attention_weights[t]`` hold the
    Attention layer and its weights for decoder step ``t`` of that call.
    """

    def __init__(self):
        self.params = []
        self.grads = []
        self.layers = []
        self.attention_weights = []
        # Shape of the encoder states seen by the latest forward call;
        # backward needs it to size the encoder gradient.
        self._hs_encoder_shape = None

    def forward(self, hs_encoder, hs_decoder):
        """Attend over the encoder states once per decoder time step.

        Args:
            hs_encoder: encoder hidden states, N x T_enc x H.
            hs_decoder: decoder hidden states, N x T_dec x H.  T_dec may
                differ from T_enc (e.g. T_dec == 1 during generation).

        Returns:
            Array of shape N x T_dec x H with one attention output per
            decoder step.
        """
        # Iterate over the DECODER steps: that is the axis being indexed.
        # Using the encoder length here fails as soon as the lengths differ.
        _, T_dec, _ = hs_decoder.shape

        # Start from fresh lists so backward pairs step t with the layer
        # created by THIS forward call, and so the lists do not grow
        # without bound across calls.
        self.layers = []
        self.attention_weights = []
        self._hs_encoder_shape = hs_encoder.shape

        out = []

        for t in range(T_dec):
            attention_layer = Attention()
            out.append(attention_layer.forward(hs_encoder, hs_decoder[:, t, :]))

            self.layers.append(attention_layer)
            self.attention_weights.append(attention_layer.attention_weight)

        return np.array(out).transpose(1, 0, 2)  # T x N x H -> N x T x H

    def backward(self, dout):
        """Back-propagate through every per-step Attention layer.

        ``forward`` must have been called first.

        Args:
            dout: upstream gradient, N x T_dec x H.

        Returns:
            Tuple ``(dhs_encoder, dhs_decoder)`` with shapes
            N x T_enc x H and N x T_dec x H.
        """
        _, T_dec, _ = dout.shape

        # The encoder gradient has the encoder's shape, not dout's.
        dhs_encoder = np.zeros(self._hs_encoder_shape, dtype=dout.dtype)
        dhs_decoder = np.zeros_like(dout)

        for t in range(T_dec - 1, -1, -1):
            dhs, dh = self.layers[t].backward(dout[:, t, :])
            dhs_decoder[:, t, :] = dh
            # Every decoder step reads the same encoder states, so their
            # gradients accumulate.
            dhs_encoder += dhs

        return dhs_encoder, dhs_decoder

In [None]:
from layer import Encoder, TimeEmbedding, TimeAffine, TimeLSTM

class AttentionEncoder(Encoder):
    """Encoder whose forward returns the last layer's output unchanged.

    The output is also kept in ``self.hs``.  It is assumed to be
    N x T x H, which is how AttentionDecoder consumes it.
    """

    def forward(self, xs):
        """Feed ``xs`` through ``self.layers`` in order and return the result."""
        activations = xs
        for stage in self.layers:
            activations = stage.forward(activations)

        self.hs = activations
        return activations

    def backward(self, dhs):
        """Propagate ``dhs`` through ``self.layers`` in reverse order.

        Returns whatever the first layer's ``backward`` returns.
        """
        grad = dhs
        for stage in reversed(self.layers):
            grad = stage.backward(grad)
        return grad
    
class AttentionDecoder:
    """Decoder stack: TimeEmbedding -> TimeLSTM -> TimeAttention -> TimeAffine.

    The affine layer receives the attention output concatenated with the
    LSTM hidden state, so its input width is 2 * hidden_size.
    """

    def __init__(self, vocab_size, wordvec_size, hidden_size):
        """Create the weights, build the layers and gather params/grads.

        Args:
            vocab_size: number of tokens (V).
            wordvec_size: embedding width (D).
            hidden_size: LSTM hidden width (H).
        """
        V, D, H = vocab_size, wordvec_size, hidden_size
        self.V, self.D, self.H = V, D, H

        self.params = []
        self.grads = []

        # The random draws happen in this exact order; changing it would
        # change the initial weights under a fixed seed.
        randn = np.random.randn
        self.W_embed = (0.01 * randn(V, D)).astype('float32')
        self.W_x_lstm = ((1 / np.sqrt(D)) * randn(D, 4 * H)).astype('float32')
        self.W_h_lstm = ((1 / np.sqrt(H)) * randn(H, 4 * H)).astype('float32')
        self.b_lstm = np.zeros(4 * H).astype('float32')
        self.W_affine = ((1 / np.sqrt(2 * H)) * randn(2 * H, V)).astype('float32')
        self.b_affine = np.zeros(V).astype('float32')

        # Index order matters: 0=embedding, 1=LSTM, 2=attention, 3=affine.
        self.layers = [
            TimeEmbedding(self.W_embed),
            TimeLSTM(self.W_x_lstm, self.W_h_lstm, self.b_lstm),
            TimeAttention(),
            TimeAffine(self.W_affine, self.b_affine),
        ]

        for layer in self.layers:
            paired = list(zip(layer.params, layer.grads))
            self.params.extend(param for param, _ in paired)
            self.grads.extend(grad for _, grad in paired)

    def forward(self, xs, hs):
        """Compute logits for the token ids ``xs`` given encoder states ``hs``.

        Args:
            xs: decoder input ids (embedded to N x T x D).
            hs: encoder hidden states; the last time step seeds the
                LSTM's ``h`` attribute.

        Returns:
            Logits, N x T x V.
        """
        embed, lstm, attention, affine = self.layers[:4]

        lstm.h = hs[:, -1, :]

        word_vecs = embed.forward(xs)                # N x T x D
        hs_decoder = lstm.forward(word_vecs)         # N x T x H
        context = attention.forward(hs, hs_decoder)  # N x T x H

        # N x T x 2H -> N x T x V
        joined = np.concatenate([context, hs_decoder], axis=2)
        return affine.forward(joined)

    def backward(self, dout):
        """Back-propagate ``dout`` and return the gradient w.r.t. ``hs``.

        The encoder states receive gradient from two places: the attention
        layer (every time step) and the LSTM's ``dh`` attribute (last time
        step only, since that step seeded the LSTM in forward).
        """
        embed, lstm, attention, affine = self.layers[:4]

        dconcat = affine.backward(dout)
        # Undo the concatenation: first H columns are the attention
        # output, the remaining ones the LSTM hidden state.
        dcontext = dconcat[:, :, :self.H]
        dh_from_affine = dconcat[:, :, self.H:]

        dhs_from_attention, dh_from_attention = attention.backward(dcontext)

        dword_vecs = lstm.backward(dh_from_affine + dh_from_attention)
        embed.backward(dword_vecs)

        dh_initial = lstm.dh  # N x H
        dhs = np.zeros_like(dhs_from_attention)
        dhs[:, -1, :] = dh_initial
        dhs += dhs_from_attention

        return dhs

    def generate(self, hs, start_id, sample_size):
        """Greedily decode token ids, starting from ``start_id``.

        Each step feeds the previous id as a 1 x 1 batch and picks the
        argmax of the flattened logits.

        Args:
            hs: encoder hidden states; the last time step seeds the
                LSTM's ``h`` attribute.
            start_id: id of the first token.
            sample_size: decoding stops once this many ids (including
                ``start_id``) have been collected.

        Returns:
            List of token ids beginning with ``start_id``.
        """
        embed, lstm, attention, affine = self.layers[:4]

        lstm.h = hs[:, -1, :]  # N x H

        word_ids = [start_id]
        current_id = start_id

        while len(word_ids) < sample_size:
            step_input = np.array(current_id, dtype=np.int32).reshape(1, 1)

            word_vecs = embed.forward(step_input)        # N x T x D
            hs_decoder = lstm.forward(word_vecs)         # N x T x H
            context = attention.forward(hs, hs_decoder)  # N x T x H

            # N x T x 2H -> N x T x V
            joined = np.concatenate([context, hs_decoder], axis=2)
            logit = affine.forward(joined)

            current_id = int(np.argmax(logit.flatten()))
            word_ids.append(current_id)

        return word_ids

In [None]:
from layer import Seq2seq, TimeSoftmaxWithLoss

class AttentionSeq2seq(Seq2seq):
    """Seq2seq model built from AttentionEncoder and AttentionDecoder."""

    def __init__(self, vocab_size, wordvec_size, hidden_size):
        """Build encoder, decoder and loss layer, then gather params/grads.

        The base-class ``__init__`` is not called; every attribute is set
        here.

        Args:
            vocab_size: number of tokens (V).
            wordvec_size: embedding width (D).
            hidden_size: hidden-state width (H).
        """
        self.V, self.D, self.H = vocab_size, wordvec_size, hidden_size

        self.params = []
        self.grads = []

        self.encoder = AttentionEncoder(vocab_size, wordvec_size, hidden_size)
        self.decoder = AttentionDecoder(vocab_size, wordvec_size, hidden_size)
        self.softmax_with_loss = TimeSoftmaxWithLoss()

        # Encoder entries first, then decoder entries.
        for component in (self.encoder, self.decoder):
            for param, grad in zip(component.params, component.grads):
                self.params.append(param)
                self.grads.append(grad)