In [10]:
import tensorflow as tf
from keras import initializers, regularizers, constraints
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Layer
import keras.backend as K


In [11]:
class Attention(Layer):
    def __init__(self,
                 W_regularizer=None, b_regularizer=None,
                 W_constraint=None, b_constraint=None,
                 bias=True, return_attention=False,
                 **kwargs):
        self.supports_masking = True
        self.return_attention = return_attention
        self.init = initializers.get('glorot_uniform')

        self.W_regularizer = regularizers.get(W_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias
        super(Attention, self).__init__(**kwargs)


    def build(self, input_shape):
        assert len(input_shape) == 3

        self.W = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name=f'{self.name}_Weight',
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        if self.bias:
            self.b = self.add_weight(shape=(input_shape[1],),
                                     initializer='zero',
                                     name=f'{self.name}_bias',
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)
        else:
            self.b = None

        self.built = True

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        eij = K.squeeze(K.dot(x, K.expand_dims(self.W)), axis=-1)

        if self.bias:
            eij += self.b

        eij = K.tanh(eij)

        a = K.exp(eij)

        # apply mask after the exp. will be re-normalized next
        if mask is not None:
            # Cast the mask to floatX to avoid float64 upcasting in theano
            a *= K.cast(mask, K.floatx())

        # in some cases especially in the early stages of training the sum may be almost zero
        # and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
        # a /= K.cast(K.sum(a, axis=1, keepdims=True), K.floatx())
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        weighted_input = x * K.expand_dims(a)

        result = K.sum(weighted_input, axis=1)

        if self.return_attention:
            return [result, a]
        return result

    def compute_output_shape(self, input_shape):
        if self.return_attention:
            return [(input_shape[0], input_shape[-1]),
                    (input_shape[0], input_shape[1])]
        else:
            return input_shape[0], input_shape[-1]


In [14]:
from dataclasses import dataclass

@dataclass
class BiGruSettings:
    name : str
    input_shape: tuple[int]
    n_units: int

@dataclass 
class DistanceSettings:
    name : str
    mode: str

@dataclass
class InnerModelSettings:
    name: str
    embedding_input_dim : int
    embedding_output_dim : int
    char_level_settings: BiGruSettings
    word_level_settings: BiGruSettings

@dataclass
class OuterModelSettings:
    name: str
    inner_settings: InnerModelSettings
    distance_settings: DistanceSettings

In [37]:
class BiGruWithAttention(Layer):
    def __init__(self,settings:BiGruSettings,**kwargs):
        super().__init__(**kwargs)
        self.settings = settings
        self.gru_layer = tf.keras.layers.GRU(self.settings.n_units,return_sequences=True,name=self.settings.name+"_gru")
        self.bidirectional_layer = tf.keras.layers.Bidirectional(self.gru_layer, input_shape=self.settings.input_shape, name=self.settings.name+"_BiGru")
        self.attention_layer = Attention(return_attention=True, name = self.settings.name+"_attention_vec")
    
    def __call__(self,x,training=False):
        x = self.bidirectional_layer(x,training=training)
        sequence, element_scores = self.attention_layer(x,training=training)

        return sequence

class DistanceLayer(tf.keras.layers.Layer):
    """
    Layer responsible for computation of cosine similarity
    """
    def __init__(self,settings:DistanceSettings,**kwargs):
        super().__init__(**kwargs)
        self.settings = settings

    def call(self,input_a,input_b):
        if self.settings.mode == "abs":
            dist = tf.math.abs(tf.keras.slosses.cosine_similarity(input_a,input_b))
        elif self.settings.mode == "zero_to_one":
            dist = ( 1-tf.keras.losses.cosine_similarity(input_a,input_b) ) / 2
        else:
            dist = tf.keras.losses.cosine_similarity(input_a,input_b)

        return (dist)


class InnerModel(Layer):
    def __init__(self,settings:InnerModelSettings,**kwargs):
        super().__init__(**kwargs)

        self.settings = settings
        self.embedding_layer = tf.keras.layers.Embedding(input_dim=self.settings.embedding_input_dim,output_dim=self.settings.embedding_output_dim)
        self.char_level_attention = BiGruWithAttention(self.settings.char_level_settings)
        self.word_level_attention = BiGruWithAttention(self.settings.word_level_settings)

    def __call__(self, x, training=False):
        print(f"Input: {x}")
        x = self.embedding_layer(x,training=training)
        print(f"X after embedding: {x}")
        x = self.char_level_attention(x)
        print(f"X char level attention: {x}")



In [16]:
test_string = ["Grigory Sharkov","Boris Jonson Junior"]
tokenizer = Tokenizer(char_level=True,lower=False)
tokenizer.fit_on_texts(test_string)
print(tokenizer.word_index)

{'o': 1, 'r': 2, 'i': 3, ' ': 4, 'n': 5, 's': 6, 'J': 7, 'G': 8, 'g': 9, 'y': 10, 'S': 11, 'h': 12, 'a': 13, 'k': 14, 'v': 15, 'B': 16, 'u': 17}


In [26]:
from typing import Iterable
def preprocess_list(lst:Iterable, tokenizer:Tokenizer, max_words:int=None, max_char:int=None):
    """
    Function preprocesses a given list. A string is turned into a sequence of words of length max_words,
    then each word is turned into a sequence of characters of length max_char, if any of maximum parameters is 
    not specified, they are calcualted based on the provided list

    function returns a tf.DataSet
    """
    if max_words is None:
        word_counts = [len(x.split()) for x in lst]
        max_words = max(word_counts)

    if max_char is None:
        words = list(set([word for name in lst for word in name.split()]))
        word_lengths = [len(word) for word in words]
        max_char = max(word_lengths)

    padded_test_string = [x + " "*(max_words-len(x.split())) for x in lst]
    test_split = [x.split(" ") for x in padded_test_string]
    test_sequences = [tokenizer.texts_to_sequences(x) for x in test_split]
    padded_test_sequences = [pad_sequences(x,maxlen=max_char,padding="post") for x in test_sequences]
    return_matrix = tf.data.Dataset.from_tensor_slices(padded_test_sequences)
    return return_matrix

test_names = preprocess_list(test_string,tokenizer,3,4).batch(2)
test_names

<BatchDataset shapes: (None, 3, 4), types: tf.int32>

In [38]:
experiment_settings = OuterModelSettings(
    name="two_level_rnn_with_attention",
    inner_settings=InnerModelSettings(
        name="inner_model",
        embedding_input_dim=len(tokenizer.word_index),
        embedding_output_dim=5,
        char_level_settings=BiGruSettings(
            name="char_bigru",
            input_shape=(3,4,5),
            n_units=6
        ),
        word_level_settings=BiGruSettings(
            name="word_bigru",
            input_shape=(3,6),
            n_units=7
        ),
    ),
    distance_settings = DistanceSettings("distance","zero_to_one")
)


inner_model = InnerModel(experiment_settings.inner_settings)
for el in test_names:
    print(inner_model(el))

Input: [[[ 9  1  2 10]
  [ 2 14  1 15]
  [ 0  0  0  0]]

 [[ 1  2  3  6]
  [ 5  6  1  5]
  [ 5  3  1  2]]]
X after embedding: [[[[ 0.01586528  0.04389241 -0.00771624 -0.01105056  0.01934382]
   [ 0.01433593  0.00120996  0.0304316   0.03688076  0.02319697]
   [ 0.02539683  0.02100693 -0.0137306   0.01544705  0.03589076]
   [ 0.00408237  0.04511049  0.04383305 -0.01469549  0.03973918]]

  [[ 0.02539683  0.02100693 -0.0137306   0.01544705  0.03589076]
   [-0.01071814 -0.00616956 -0.00104443 -0.0175782   0.01681907]
   [ 0.01433593  0.00120996  0.0304316   0.03688076  0.02319697]
   [ 0.01163561 -0.02268995 -0.04409087 -0.03580417  0.01963869]]

  [[-0.03303801 -0.00533998 -0.00573512  0.02302686  0.03536339]
   [-0.03303801 -0.00533998 -0.00573512  0.02302686  0.03536339]
   [-0.03303801 -0.00533998 -0.00573512  0.02302686  0.03536339]
   [-0.03303801 -0.00533998 -0.00573512  0.02302686  0.03536339]]]


 [[[ 0.01433593  0.00120996  0.0304316   0.03688076  0.02319697]
   [ 0.02539683  0.02

ValueError: Input 0 of layer char_bigru_BiGru is incompatible with the layer: expected ndim=3, found ndim=4. Full shape received: (2, 3, 4, 5)