<a href="https://colab.research.google.com/github/TimOgden/RoastBot/blob/master/RoastTransformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
import numpy as np

In [2]:
def get_angles(pos, i, d_model):
  angle_rates = 1 / np.power(1000, (2*(i//2)) / np.float32(d_model))
  return pos * angle_rates

In [3]:
def positional_encoding(position, d_model):
  angle_rads = get_angles(np.arange(position)[:, np.newaxis],
                          np.arange(d_model)[np.newaxis, :],
                          d_model)
  angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
  angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

  pos_encoding = angle_rads[np.newaxis, ...]
  return tf.cast(pos_encoding, dtype=tf.float32)

In [21]:
class EncoderLayer(tf.keras.layers.Layer):
  def __init__(self, num_heads, d_model=256, dff=2048, dropout=.1, name=None):
    super().__init__(name=name)
    self.self_attention = tf.keras.layers.MultiHeadAttention(num_heads, key_dim=d_model)
    self.fc1 = tf.keras.layers.Dense(dff, activation=tf.keras.activations.gelu)
    self.fc2 = tf.keras.layers.Dense(d_model)
    self.dropout = tf.keras.layers.Dropout(dropout)
    self.layer_norm1 = tf.keras.layers.LayerNormalization()
    self.layer_norm2 = tf.keras.layers.LayerNormalization()

  def call(self, x, training, mask):
    
    attention_weights, _ = self.self_attention(x, x, attention_mask=mask,
                                                return_attention_scores=True)
    normalized = self.layer_norm1(x + attention_weights)
    x = self.fc2(self.dropout(self.fc1(normalized), training=training))
    return self.layer_norm2(x + normalized)

class DecoderLayer(tf.keras.layers.Layer):
  def __init__(self, d_model, num_heads, dff, rate=0.1):
    super(DecoderLayer, self).__init__()

    self.mha1 = tf.keras.layers.MultiHeadAttention(num_heads, key_dim=d_model)
    self.mha2 = tf.keras.layers.MultiHeadAttention(num_heads, key_dim=d_model)

    self.fc1 = tf.keras.layers.Dense(dff, activation=tf.keras.activations.gelu)
    self.fc2 = tf.keras.layers.Dense(d_model)

    self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
    self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
    self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    self.dropout1 = tf.keras.layers.Dropout(rate)
    self.dropout2 = tf.keras.layers.Dropout(rate)
    self.dropout3 = tf.keras.layers.Dropout(rate)

  def call(self, x, enc_output, training,
           look_ahead_mask, padding_mask):
    # enc_output.shape == (batch_size, input_seq_len, d_model)

    attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask,
                                           return_attention_scores=True)  # (batch_size, target_seq_len, d_model)
    attn1 = self.dropout1(attn1, training=training)
    out1 = self.layernorm1(attn1 + x)

    attn2, attn_weights_block2 = self.mha2(
        enc_output, enc_output, out1, padding_mask,
                     return_attention_scores=True)  # (batch_size, target_seq_len, d_model)
    attn2 = self.dropout2(attn2, training=training)
    out2 = self.layernorm2(attn2 + out1)  # (batch_size, target_seq_len, d_model)

    ffn_output = self.fc2(self.dropout3(self.fc1(out2), training=training))
    out3 = self.layernorm3(ffn_output + out2)  # (batch_size, target_seq_len, d_model)

    return out3, attn_weights_block1, attn_weights_block2

In [15]:
encoder_layer = EncoderLayer(8, name='test_layer')

In [16]:
input_tensor = tf.random.uniform((1,16,16), dtype=tf.float32, minval=0, maxval=256)

In [52]:
class Encoder(tf.keras.layers.Layer):
  def __init__(self, num_layers, d_model, num_heads, dff, input_size,
               maximum_positional_encoding, rate=0.1):
    super(Encoder, self).__init__()

    self.d_model = d_model
    self.num_layers = num_layers
    self.flatten = tf.keras.layers.Flatten()
    self.embedding = tf.keras.layers.Embedding(input_size, d_model)
    self.pos_encoding = positional_encoding(maximum_positional_encoding,
                                            self.d_model)
    self.enc_layers = [EncoderLayer(num_heads, d_model, dff, dropout=rate)
                      for _ in range(num_layers)]
    self.dropout = tf.keras.layers.Dropout(rate)
  
  def call(self, x, training, mask):
    seq_len = tf.shape(x)[1]
    #x = self.flatten(x)
    x = self.embedding(x)
    x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
    x += self.pos_encoding[:, :seq_len, :]

    x = self.dropout(x, training=training)

    for i in range(self.num_layers):
      x = self.enc_layers[i](x, training, mask)
    return x

In [54]:
sample_encoder = Encoder(num_layers=2, d_model=256, num_heads=8,
                         dff=2048, input_size=9*16*16*3,
                         maximum_positional_encoding=10000)
temp_input = tf.random.uniform((64,256), dtype=tf.int64, minval=0, maxval=200)
sample_encoder_output = sample_encoder(temp_input, training=False, mask=None)
print(sample_encoder_output.shape)

(64, 256, 256)


In [22]:
class Decoder(tf.keras.layers.Layer):
  def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size,
               maximum_position_encoding, rate=0.1):
    super(Decoder, self).__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
    self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)

    self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate)
                       for _ in range(num_layers)]
    self.dropout = tf.keras.layers.Dropout(rate)

  def call(self, x, enc_output, training,
           look_ahead_mask, padding_mask):

    seq_len = tf.shape(x)[1]
    attention_weights = {}

    x = self.embedding(x)  # (batch_size, target_seq_len, d_model)
    x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
    x += self.pos_encoding[:, :seq_len, :]

    x = self.dropout(x, training=training)

    for i in range(self.num_layers):
      x, block1, block2 = self.dec_layers[i](x, enc_output, training,
                                             look_ahead_mask, padding_mask)

      attention_weights[f'decoder_layer{i+1}_block1'] = block1
      attention_weights[f'decoder_layer{i+1}_block2'] = block2

    # x.shape == (batch_size, target_seq_len, d_model)
    return x, attention_weights

In [27]:
sample_decoder = Decoder(num_layers=2, d_model=256, num_heads=8,
                         dff=2048, target_vocab_size=8000,
                         maximum_position_encoding=5000)
temp_input = tf.random.uniform((64, 26), dtype=tf.int64, minval=0, maxval=200)

output, attn = sample_decoder(temp_input,
                              enc_output=sample_encoder_output,
                              training=False,
                              look_ahead_mask=None,
                              padding_mask=None)

output.shape, attn['decoder_layer2_block2'].shape

(TensorShape([64, 26, 256]), TensorShape([64, 8, 26, 26]))

In [57]:
class Transformer(tf.keras.Model):
  def __init__(self, num_encoders, num_decoders, d_model, num_heads, dff, input_size,
               target_vocab_size, img_input, target, rate=0.1):
    super(Transformer, self).__init__()

    self.tokenizer = Encoder(num_encoders, d_model, num_heads, dff,
                             input_size, img_input, rate)
    self.decoder = Decoder(num_decoders, d_model, num_heads, dff,
                           target_vocab_size, target, rate)
    
    self.final_layer = tf.keras.layers.Dense(target_vocab_size)
  
  def call(self, inp, tar, training, enc_padding_mask,
           look_ahead_mask, dec_padding_mask):
    enc_output = self.tokenizer(inp, training, enc_padding_mask)

    dec_output, attention_weights = self.decoder(
        tar, enc_output, training, look_ahead_mask, dec_padding_mask
    )

    final_output = self.final_layer(dec_output)

    return final_output, attention_weights

In [49]:
sample_transformer = Transformer(
    num_encoders=1, num_decoders=1, d_model = 256, num_heads = 8, dff = 2048,
    input_vocab_size = 8500, target_vocab_size = 8000,
    img_input=10000, target=6000)

temp_input = tf.random.uniform((64,36), dtype=tf.int64, minval=0, maxval=200)
temp_target = tf.random.uniform((64, 36), dtype=tf.int64, minval=0, maxval=200)

fn_out, _ = sample_transformer(temp_input, temp_target, training=False,
                               enc_padding_mask=None,
                               dec_padding_mask=None,
                               look_ahead_mask=None)

fn_out.shape

TensorShape([64, 36, 8000])

In [50]:
def create_padding_mask(seq):
  seq = tf.cast(tf.math.equal(seq,0),tf.float32)
  return seq[:, tf.newaxis, tf.newaxis, :] # (batch_size, 1, 1, seq_len)

In [51]:
def create_look_ahead_mask(size):
  mask = 1 - tf.linalg.band_part(tf.ones((size,size)), -1, 0)
  return mask # (seq_len, seq_len)

In [58]:
transformer = Transformer(
    num_encoders=12, num_decoders=4, d_model=768, num_heads=12, dff=2048,
    input_size=9*16*16*3, target_vocab_size=8000, img_input=10000, target=5000
)

In [59]:
import tensorflow_datasets as tfds
mscoco_val = tfds.load('coco/2014', split=['validation'])

[1mDownloading and preparing dataset coco/2014/1.1.0 (download: 37.57 GiB, generated: Unknown size, total: 37.57 GiB) to /root/tensorflow_datasets/coco/2014/1.1.0...[0m


HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Dl Completed...', max=1.0, style=Progre…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Dl Size...', max=1.0, style=ProgressSty…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Extraction completed...', max=1.0, styl…






ExtractError: ignored