In [None]:
class TokenEmbedding(layers.Layer):
    """Embedding lookup for token ids that can also report a padding mask.

    Args:
        maxlen: Accepted for signature compatibility; this layer never reads it.
        vocab_size: Number of rows of the embedding table.
        embed_dim: Width of every embedding vector.
        mask_embedding: Token id that marks padding, or ``False`` to switch
            masking off. The test is an identity check against ``False``, so
            the id ``0`` is treated as a real padding id.
        w2v_init: Initializer forwarded to ``layers.Embedding``.
        pad_to_0: With masking on, multiply the embeddings of padded
            positions by zero.
    """

    def __init__(self, maxlen, vocab_size, embed_dim, mask_embedding=False, w2v_init="uniform", pad_to_0=False):
        super().__init__()
        self.embed_dim = embed_dim
        self.mask_embedding = mask_embedding
        self.pad_to_0 = pad_to_0
        self.token_emb = layers.Embedding(
            input_dim=vocab_size,
            output_dim=embed_dim,
            embeddings_initializer=w2v_init,
            #embeddings_constraint=tf.keras.constraints.UnitNorm(axis=0),
        )

    def call(self, x):
        """Embed ``x`` and return ``(embeddings, mask)``.

        With masking enabled ``mask`` is a float32 tensor that is 1 where
        ``x != mask_embedding`` and 0 elsewhere; otherwise it is the empty
        placeholder ``tf.zeros([0])``.
        """
        # Masking disabled: plain lookup plus the empty placeholder mask.
        if self.mask_embedding is False:
            return self.token_emb(x), tf.zeros([0])

        mask = tf.cast(x != self.mask_embedding, np.float32)
        embedded = self.token_emb(x)
        # Repeat the mask along a new trailing axis so it lines up with the
        # embeddings (the three tile multiples assume rank-2 token ids).
        mask_embedded = tf.tile(tf.expand_dims(mask, -1), [1, 1, self.embed_dim])
        if self.pad_to_0:
            return embedded * mask_embedded, mask
        return embedded, mask

In [None]:
class TransformerBlock(layers.Layer):
    """Post-norm transformer encoder block that also returns attention scores.

    Args:
        embed_dim: Width of the block's inputs/outputs; also used as the
            per-head ``key_dim`` of the attention layer.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward sub-network.
        rate: Dropout rate applied after attention and after the
            feed-forward network.
        p_enc: Positional-encoding mode. When it equals ``"relative"`` the
            block expects ``inputs`` to be a 4-tuple
            ``(x, x_q, x_k, x_v)``; any other value means plain
            self-attention on a single tensor.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, p_enc="w2v"):
        super(TransformerBlock, self).__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential(
            [layers.Dense(ff_dim, activation="gelu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)
        self.p_enc = p_enc

    def call(self, inputs, masked=None, training=None):
        """Run attention + feed-forward with residual connections.

        Args:
            inputs: A single tensor, or ``(x, x_q, x_k, x_v)`` when
                ``p_enc == "relative"`` (``x`` feeds the residual path).
            masked: Optional attention mask forwarded to the attention layer.
            training: Keras training flag, forwarded to both dropouts.

        Returns:
            ``(output, attention_scores)``.
        """
        if self.p_enc == "relative":
            residual, query, key, value = inputs
        else:
            residual = query = key = value = inputs

        # MultiHeadAttention.call is positionally (query, value, key), so key
        # and value are passed by keyword to keep x_k as the key and x_v as
        # the value. ``attention_mask=None`` is the layer's default, so one
        # call covers both the masked and the unmasked case.
        attn_output, att_scores = self.att(
            query,
            value=value,
            key=key,
            attention_mask=masked,
            return_attention_scores=True,
        )
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(residual + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output), att_scores

In [None]:
#only new_embedding
class PositionEmbedding(layers.Layer):
    """Add a learned position embedding to a sequence, then apply dropout.

    Args:
        maxlen: Number of positions in the embedding table.
        embed_dim: Width of each position vector (must match the input's
            last axis for the addition to broadcast).
        rate: Dropout rate applied to the sum.
    """

    def __init__(self, maxlen, embed_dim, rate):
        super().__init__()
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training=None):
        """Return ``dropout(x + position_embedding)``.

        ``training`` defaults to ``None`` so the layer can be invoked as
        ``layer(x)``, matching ``TransformerBlock.call``.
        """
        # Sequence length is read from the second-to-last axis of the input.
        seq_len = tf.shape(x)[-2]
        positions = tf.range(start=0, limit=seq_len, delta=1)
        x = x + self.pos_emb(positions)
        return self.dropout(x, training=training)

In [None]:
class prepare_AttentionMask(layers.Layer):
    """Turn a per-token validity mask into a pooled pairwise attention mask.

    Args:
        add_reg: When truthy, prepend one always-valid position (for a token
            that is concatenated in front of the pooled sequence).
        pool_size: Pooling window; the stride equals the window
            (``strides=None``) with ``"valid"`` padding.
        name: Optional Keras layer name.
    """

    def __init__(self, add_reg, pool_size, name=None):
        super(prepare_AttentionMask, self).__init__(name=name)
        self.add_reg = add_reg
        self.pool_size = pool_size
        # Created once here rather than on every call; the layer has no weights.
        self.maxpool = layers.MaxPool1D(pool_size=pool_size, strides=None, padding="valid")

    def call(self, x, training=None):
        """Build the attention mask.

        Args:
            x: Mask with 1 for valid positions and 0 for padding
                (assumed to be ``(batch, length)`` - TODO confirm with caller).
            training: Unused; kept (with a default) for Keras compatibility.

        Returns:
            The outer product of the pooled validity vector with itself.
        """
        # Flip to a padding indicator so max-pooling marks a window as padded
        # as soon as any position inside it is padded.
        x = 1.0 - x
        x = tf.expand_dims(x, -1)
        x = self.maxpool(x)
        if self.add_reg:
            # The prepended position is never padding.
            x = tf.concat([tf.zeros((tf.shape(x)[0], 1, 1), dtype=x.dtype), x], axis=1)
        # Back to a validity indicator, then pairwise product.
        x = 1.0 - x
        return tf.matmul(x, x, transpose_b=True)

In [None]:
import torch
from torch import nn
from torch.nn import functional as F

class prepare_AttentionMask(nn.Module):
    """Turn a per-token validity mask into a pooled pairwise attention mask.

    Args:
        add_reg: When truthy, prepend one always-valid position (for a token
            that is concatenated in front of the pooled sequence).
        pool_size: Max-pooling window; ``nn.MaxPool1d`` uses the same value
            as stride.
    """

    def __init__(self, add_reg, pool_size):
        super().__init__()
        self.add_reg = add_reg
        self.pool_size = pool_size
        self.maxpool = nn.MaxPool1d(pool_size)

    def forward(self, x):
        """Build the attention mask.

        Args:
            x: ``(batch, length)`` mask, 1 for valid positions and 0 for
                padding. Non-floating inputs are converted to float.

        Returns:
            ``(batch, L, L)`` tensor where ``L`` is the pooled length (plus
            one when ``add_reg`` is set); entry ``[b, i, j]`` is 1 only when
            both pooled positions ``i`` and ``j`` are fully valid.
        """
        if not torch.is_floating_point(x):
            x = x.float()
        # Padding indicator with a singleton channel axis: nn.MaxPool1d pools
        # over the LAST axis, so the sequence must sit there -> (B, 1, L).
        padded = (1 - x).unsqueeze(1)
        padded = self.maxpool(padded)        # (B, 1, L_pooled)
        padded = padded.transpose(1, 2)      # (B, L_pooled, 1)
        if self.add_reg:
            # The prepended position is never padding; new_zeros keeps the
            # device and dtype of the mask.
            front = padded.new_zeros(padded.size(0), 1, 1)
            padded = torch.cat([front, padded], dim=1)
        valid = 1 - padded
        return torch.matmul(valid, valid.transpose(-1, -2))

In [None]:
class Add_REG(layers.Layer):
    """Prepend a single learned "REG" embedding to every sequence in a batch.

    Args:
        embed_dim: Width of the learned vector (must match the input's last
            axis for the concatenation to succeed).
        rate: Dropout rate applied to the learned vector.
        name: Optional Keras layer name.
    """

    def __init__(self, embed_dim, rate=0.01, name=None):
        super(Add_REG, self).__init__(name=name)
        self.reg_emb = layers.Embedding(input_dim=1, output_dim=embed_dim)
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training=None):
        """Return ``x`` with the REG vector concatenated in front along axis 1.

        ``training`` defaults to ``None`` so the layer can be invoked as
        ``layer(x)``.
        """
        # The table has exactly one row; look it up with index 0.
        reg_index = tf.range(start=0, limit=1, delta=1)
        reg_emb = self.reg_emb(reg_index)
        reg_emb = self.dropout(reg_emb, training=training)
        # Add a batch axis and repeat the vector once per batch element.
        reg_emb = tf.tile(tf.expand_dims(reg_emb, 0), [tf.shape(x)[0], 1, 1])
        return tf.concat([reg_emb, x], 1)

In [None]:
import torch
from torch import nn

class Add_REG(nn.Module):
    """Prepend a single learned "REG" embedding to every sequence in a batch.

    Args:
        embed_dim: Width of the learned vector (must match the input's last
            dimension for the concatenation to succeed).
        rate: Dropout probability applied to the learned vector.
    """

    def __init__(self, embed_dim, rate=0.01):
        super().__init__()
        self.reg_emb = nn.Embedding(1, embed_dim)
        self.dropout = nn.Dropout(rate)

    def forward(self, x):
        """Return ``x`` of shape ``(batch, length, embed_dim)`` with the REG
        vector concatenated in front, giving ``(batch, length + 1, embed_dim)``.
        """
        # Build the lookup index on the same device as the embedding table so
        # the module keeps working after ``.to(device)`` / ``.cuda()``.
        reg_index = torch.zeros(1, dtype=torch.long, device=self.reg_emb.weight.device)
        reg_emb = self.dropout(self.reg_emb(reg_index))  # (1, embed_dim)
        # Explicit batch axis, then a broadcast view (no copy) over the batch.
        reg_emb = reg_emb.unsqueeze(0).expand(x.size(0), -1, -1)
        return torch.cat([reg_emb, x], dim=1)

In [None]:
def _build_model(self):
    """Assemble the Keras model, store it on ``self.model`` and report it.

    Pipeline: token embedding -> two parallel Conv1D branches (kernel sizes 6
    and 9) merged by a Dense layer -> residual add with the embedding ->
    average pooling -> batch norm -> position embedding -> optional REG token
    -> one transformer block -> pooled first position -> two dense layers ->
    linear regression output.

    Reads its hyperparameters from attributes of ``self`` (``maxlen``,
    ``vocab_size``, ``w2v_embdim``, ``embed_dim``, ``pool_size``, ...).
    Besides building the model it prints the summary, renders the graph to
    ``f"{self.model_type}.png"`` and prints every attribute of ``self``.
    """
    embedding_layer = TokenEmbedding(self.maxlen, self.vocab_size, self.w2v_embdim, self.mask_embedding, self.w2v_init, self.pad_to_0)
    pooler = layers.Dense(self.embed_dim)
    trans_layers = TransformerBlock(self.embed_dim, self.num_heads, self.ff_dim, self.t_rate, self.p_enc)
    #inputs
    inputs = []
    # ``shape`` must be a tuple; ``(self.maxlen)`` is just an int.
    input1 = layers.Input(shape=(self.maxlen,), name="sequence")
    inputs.append(input1)
    embedded, masked = embedding_layer(input1)

    x1 = layers.Conv1D(filters=self.embed_dim, kernel_size=6, strides=1, padding="same",
    dilation_rate=1, groups=1, activation="relu", kernel_initializer='glorot_normal')(embedded)
    x2 = layers.Conv1D(filters=self.embed_dim, kernel_size=9, strides=1, padding="same",
    dilation_rate=1, groups=1, activation="relu", kernel_initializer='glorot_normal')(embedded)
    x12 = layers.Concatenate()([x1,x2])
    x12 = layers.Dense(self.embed_dim, activation="relu")(x12)
    #x12 = layers.Dropout(self.dropout_rate)(x12)
    # NOTE(review): the residual add needs matching widths, i.e. it presumably
    # relies on self.w2v_embdim == self.embed_dim - confirm.
    skip = layers.Add()([x12, embedded])
    x = layers.AveragePooling1D(pool_size=self.pool_size, strides=None, padding="valid")(skip)
    x = layers.BatchNormalization()(x)
    # The position table is sized from the pooled sequence length.
    posemblen = x.shape[1]
    posenc_layer = PositionEmbedding(posemblen, self.embed_dim, self.t_rate)
    x = posenc_layer(x)
    if self.mask_embedding is not False:
        masked = prepare_AttentionMask(self.add_reg, self.pool_size)(masked)
    else:
        # With masking disabled the embedding layer returns an empty
        # placeholder instead of a mask, so run attention unmasked.
        masked = None
    if self.add_reg:
        add_reg = Add_REG(self.embed_dim)
        x = add_reg(x)
    att_scores = []
    # ``trans_layers`` is a single block (not a list), so call it directly.
    # NOTE(review): with p_enc == "relative" the block unpacks a 4-tuple of
    # inputs, which this call does not provide - confirm that mode is unused.
    x, atts = trans_layers(x, masked=masked)
    att_scores.append(atts)
    # Pool by taking the first sequence position.
    x = pooler(x[:, 0])
    x = tf.keras.activations.tanh(x)
    #dense1
    x = layers.Dense(self.dense, activation="relu")(x)
    x = layers.Dropout(self.dropout_rate)(x)
    #dense2
    x = layers.Dense(self.dense, activation="gelu")(x)
    x = layers.Dropout(self.dropout_rate)(x)
    #output
    output = layers.Dense(self.output_neurons, activation="linear", name="Regression_Output")(x)
    if len(att_scores) > 0:
        #att_scores = Forward(name="att_scores")(att_scores)
        self.model = tf.keras.Model(
            inputs=inputs,
            outputs={'Regression_Output': output, 'Attention_Scores': att_scores
                    #  , 'M_atts': M_atts
                        },
            )
    else:
        self.model = tf.keras.Model(
            inputs=inputs,
            outputs={'Regression_Output': output},
            )
    self.model.summary()
    img = tf.keras.utils.plot_model(self.model, f"{self.model_type}.png", show_shapes=True)
    display(img)
    print("\nParameters:\n")
    for k, v in vars(self).items():
        # Same padding as joining (25 - len(k)) empty strings with spaces.
        pad = " " * (24 - len(k))
        print(k, f" :{pad}", v)

In [None]:
import torch
from torch import nn
from torch.nn import functional as F

class MyModel(nn.Module):
    """Conv + transformer regression network written against ``torch.nn``.

    The layer sequence follows the one assembled in ``_build_model``; this
    looks like an in-progress port of that Keras graph.

    NOTE(review): as written the class cannot be constructed or run without
    outside help - see the review notes in ``__init__`` and ``forward``.
    """
    def __init__(self):
        super(MyModel, self).__init__()
        # NOTE(review): every hyperparameter below (self.maxlen,
        # self.vocab_size, self.embed_dim, ...) is read from ``self``, but
        # ``__init__`` takes no arguments and nothing in this class assigns
        # them - TODO pass them in or define them before use.
        # NOTE(review): the TokenEmbedding / TransformerBlock definitions
        # visible in this file subclass Keras ``layers.Layer``; a torch
        # implementation is presumably expected here - confirm.
        self.embedding_layer = TokenEmbedding(self.maxlen, self.vocab_size, self.w2v_embdim, self.mask_embedding, self.w2v_init, self.pad_to_0)
        self.pooler = nn.Linear(self.embed_dim, self.embed_dim)
        self.trans_layers = TransformerBlock(self.embed_dim, self.num_heads, self.ff_dim, self.t_rate, self.p_enc)
        # Two parallel convolutions with kernel sizes 6 and 9.
        self.conv1 = nn.Conv1d(self.embed_dim, self.embed_dim, kernel_size=6, stride=1, padding="same")
        self.conv2 = nn.Conv1d(self.embed_dim, self.embed_dim, kernel_size=9, stride=1, padding="same")
        # NOTE(review): ``forward`` feeds this layer the concatenation of both
        # conv outputs, so in_features == embed_dim looks too small - confirm.
        self.fc = nn.Linear(self.embed_dim, self.embed_dim)
        self.avgpool = nn.AvgPool1d(self.pool_size)
        self.batchnorm = nn.BatchNorm1d(self.embed_dim)
        # NOTE(review): ``posemblen`` is not defined in this method or class.
        self.posenc_layer = PositionEmbedding(posemblen, self.embed_dim, self.t_rate)
        # NOTE(review): ``self.add_reg`` holds a module here, yet ``forward``
        # also uses it as a boolean flag.
        self.add_reg = Add_REG(self.embed_dim)
        # NOTE(review): dense1 receives the pooler output (embed_dim
        # features) but declares in_features == self.dense.
        self.dense1 = nn.Linear(self.dense, self.dense)
        self.dropout1 = nn.Dropout(self.dropout_rate)
        self.dense2 = nn.Linear(self.dense, self.dense)
        self.dropout2 = nn.Dropout(self.dropout_rate)
        self.output = nn.Linear(self.dense, self.output_neurons)

    def forward(self, x):
        """Map input ``x`` to the regression output.

        Attention scores produced by the transformer block are computed but
        not returned.
        """
        embedded, masked = self.embedding_layer(x)
        # NOTE(review): nn.Conv1d convolves over the last axis and treats
        # axis 1 as channels; assumes ``embedded`` is (batch, channels,
        # length) - TODO confirm the layout the embedding layer returns.
        x1 = F.relu(self.conv1(embedded))
        x2 = F.relu(self.conv2(embedded))
        x12 = torch.cat((x1, x2), dim=1)
        x12 = F.relu(self.fc(x12))
        # Residual connection around the conv branch.
        skip = x12 + embedded
        x = self.avgpool(skip)
        x = self.batchnorm(x)
        x = self.posenc_layer(x)
        # NOTE(review): a new mask module is built on every forward pass and
        # receives the Add_REG module (always truthy) as its ``add_reg`` flag.
        masked = prepare_AttentionMask(self.add_reg, self.pool_size)(masked)
        # NOTE(review): always true, because ``self.add_reg`` is a module.
        if self.add_reg:
            x = self.add_reg(x)
        x, atts = self.trans_layers(x, masked=masked)
        # Pool by taking index 0 along axis 1.
        x = self.pooler(x[:, 0])
        x = torch.tanh(x)
        x = F.relu(self.dense1(x))
        x = self.dropout1(x)
        x = F.gelu(self.dense2(x))
        x = self.dropout2(x)
        output = self.output(x)
        return output
