In [1]:
import numpy as np
import math
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [2]:
# One 256-dimensional "target" vector that will attend over the sources.
target = np.random.random(size=(256,))
# Ten 256-dimensional "source" vectors, kept as a list of separate rows.
sources = list(np.random.random(size=(10, 256)))

def softmax(x, axis=-1):
    """Return the softmax of `x` along `axis`.

    Python lists are converted to NumPy arrays first. The maximum along
    `axis` is subtracted before exponentiating so that large inputs do not
    overflow; this does not change the result.
    """
    x = np.array(x) if isinstance(x, list) else x
    shifted = x - np.max(x, axis=axis, keepdims=True)
    numerators = np.exp(shifted)
    denominators = np.sum(numerators, axis=axis, keepdims=True)
    return numerators / denominators

def score(target, source):
    """Return the dot product of `target` and `source` as a similarity score."""
    similarity = np.dot(target, source)
    return similarity

In [3]:
# Score every source vector against the target, then normalise the scores
# into weights that sum to one.
scores = softmax([score(target, candidate) for candidate in sources])

# Stack the sources into a (num_sources, dim) array and give the weights a
# trailing axis so they broadcast across the vector dimension.
sources_array = np.array(sources)
scores = np.expand_dims(scores, axis=1)
# Weighted sum of the source vectors.
combined = (scores * sources_array).sum(axis=0)
print("Combined shape:", combined.shape)

Combined shape: (256,)


In [4]:
# Batched inputs for the attention examples below, laid out as
# (batch, sequence length, vector size).
target = np.random.random(size=(2, 10, 256))
source = np.random.random(size=(2, 15, 256))

In [5]:
def dot_product_attention(target, source):
    """Attend from every target vector over all source vectors.

    Einsum index letters: b = batch, t = target position, s = source
    position, d = vector size.

    Returns an array with one weighted sum of source vectors per target
    position, i.e. indexed as (b, t, d).
    """
    # Pairwise dot products between target and source vectors.
    similarity = np.einsum("btd,bsd->bts", target, source)
    # Normalise over the source axis so each target's weights sum to one.
    attention_weights = softmax(similarity, axis=-1)
    # Mix the source vectors using those weights.
    return np.einsum("bts,bsd->btd", attention_weights, source)

# Bare expression at the end of the cell so the notebook displays the result.
dot_product_attention(target, source)

array([[[0.25031252, 0.18927063, 0.64347902, ..., 0.54960735,
         0.76482802, 0.72117876],
        [0.76833941, 0.68287171, 0.90521719, ..., 0.229577  ,
         0.77702188, 0.39594471],
        [0.32866232, 0.29630267, 0.69959404, ..., 0.48533089,
         0.68348005, 0.6437031 ],
        ...,
        [0.4636575 , 0.47372767, 0.68088346, ..., 0.45750899,
         0.63562687, 0.52901039],
        [0.26871919, 0.21942518, 0.59846786, ..., 0.52675242,
         0.69429453, 0.57669274],
        [0.52717543, 0.48146495, 0.73810563, ..., 0.42526169,
         0.64766292, 0.56734962]],

       [[0.74999776, 0.72400696, 0.48949992, ..., 0.31074765,
         0.72149453, 0.87038652],
        [0.64736422, 0.61682424, 0.68182485, ..., 0.29378775,
         0.7740831 , 0.8775753 ],
        [0.64398858, 0.6523471 , 0.6637557 , ..., 0.34248846,
         0.73355969, 0.8428566 ],
        ...,
        [0.7742948 , 0.70673866, 0.54593247, ..., 0.26383247,
         0.74783071, 0.88416031],
        [0.7

In [6]:
dim = 256
# Four independent learned projections of width `dim`, created in the order
# query, key, value, output.
query_dense, key_dense, value_dense, output_dense = (
    layers.Dense(units=dim) for _ in range(4)
)

def parameterized_attention(query, key, value):
    """Dot-product attention with learned input and output projections.

    `query`, `key` and `value` are each passed through their own dense
    projection, attention weights come from a softmax over query-key dot
    products, and the weighted sum of projected values goes through
    `output_dense`.
    """
    projected_query = query_dense(query)
    projected_key = key_dense(key)
    projected_value = value_dense(value)
    # b = batch, t = query position, s = key/value position, d = vector size.
    attention_weights = softmax(
        np.einsum("btd,bsd->bts", projected_query, projected_key), axis=-1
    )
    mixed_values = np.einsum("bts,bsd->btd", attention_weights, projected_value)
    return output_dense(mixed_values)

# Queries come from `target`; `source` is used as both key and value.
parameterized_attention(query=target, key=source, value=source)

<tf.Tensor: shape=(2, 10, 256), dtype=float32, numpy=
array([[[-0.45054394,  0.9943538 ,  0.5627296 , ...,  1.6444948 ,
         -1.4899708 ,  0.1546945 ],
        [-0.46762288,  0.82964355,  0.35091782, ...,  1.6376554 ,
         -1.3809798 ,  0.06882407],
        [-0.44683272,  0.9992876 ,  0.4425981 , ...,  1.7814057 ,
         -1.5250347 ,  0.03174876],
        ...,
        [-0.298494  ,  1.1188456 ,  0.4243595 , ...,  1.7610108 ,
         -1.7229068 , -0.09009187],
        [-0.42716572,  0.9536388 ,  0.45932785, ...,  1.7388272 ,
         -1.472808  ,  0.04370444],
        [-0.40345672,  1.0062622 ,  0.41577157, ...,  1.7403771 ,
         -1.4994948 ,  0.01196873]],

       [[-0.2851899 ,  0.8744449 , -0.0992508 , ...,  1.6036093 ,
         -0.98204273,  0.01995445],
        [-0.18265052,  0.86011195, -0.01766041, ...,  1.3403437 ,
         -0.9777156 , -0.04703699],
        [-0.33688113,  0.9273277 , -0.08073823, ...,  1.6231174 ,
         -1.0379088 , -0.04480076],
        ...,


In [7]:
num_heads = 8
head_dim = 32

# One query/key/value projection per head, each of width `head_dim`.
query_dense = [layers.Dense(units=head_dim) for _ in range(num_heads)]
key_dense = [layers.Dense(units=head_dim) for _ in range(num_heads)]
value_dense = [layers.Dense(units=head_dim) for _ in range(num_heads)]
# Output projection sized to the concatenation of all heads.
output_dense = layers.Dense(units=head_dim * num_heads)

def multi_head_attention(query, key, value):
    """Multi-head scaled dot-product attention built from per-head Dense layers.

    Each of the `num_heads` heads projects `query`, `key` and `value` with
    its own layers (`query_dense[i]`, `key_dense[i]`, `value_dense[i]`),
    computes softmax attention weights from the scaled query-key dot
    products, and takes the weighted sum of its projected values. The head
    outputs are concatenated on the last axis and passed through
    `output_dense`.

    Args:
        query: Query sequences, indexed as (batch, target position, features).
        key: Key sequences, indexed as (batch, source position, features).
        value: Value sequences, indexed like `key`.

    Returns:
        The result of `output_dense` applied to the concatenated head outputs.
    """
    head_outputs = []
    for i in range(num_heads):
        # Use per-head names: every head must project the ORIGINAL inputs,
        # not the projection produced by the previous head.
        head_query = query_dense[i](query)
        head_key = key_dense[i](key)
        head_value = value_dense[i](value)
        # Attention is computed from this head's projections, not from
        # module-level arrays.
        scores = np.einsum("btd,bsd->bts", head_query, head_key)
        # Scale by sqrt(head_dim) so the dot products do not grow with the
        # head size before the softmax.
        scores = softmax(scores / math.sqrt(head_dim), axis=-1)
        head_output = np.einsum("bts,bsd->btd", scores, head_value)
        head_outputs.append(head_output)
    outputs = tf.concat(head_outputs, axis=-1)
    return output_dense(outputs)

# Queries come from `target`; `source` is used as both key and value.
multi_head_attention(query=target, key=source, value=source)

<tf.Tensor: shape=(2, 10, 256), dtype=float32, numpy=
array([[[-0.18592608,  0.26163408,  0.4710325 , ..., -0.39132458,
          0.5226468 , -0.80184114],
        [-0.12332103,  0.27099016,  0.46379867, ..., -0.3735121 ,
          0.5170238 , -0.85234153],
        [-0.14606842,  0.26755506,  0.47601634, ..., -0.38170898,
          0.51592964, -0.8209633 ],
        ...,
        [-0.18730547,  0.21488252,  0.4682066 , ..., -0.34559852,
          0.54162973, -0.8756696 ],
        [-0.15994196,  0.23009714,  0.50274456, ..., -0.3487205 ,
          0.5180153 , -0.8553152 ],
        [-0.15560871,  0.2849512 ,  0.45941752, ..., -0.3650747 ,
          0.49716014, -0.81371766]],

       [[-0.05722441,  0.2031178 ,  0.7895407 , ..., -0.15143518,
          0.5269933 , -0.8113352 ],
        [-0.11641002,  0.1742965 ,  0.8289877 , ..., -0.16162756,
          0.5644667 , -0.83451295],
        [-0.12021776,  0.18370959,  0.8322437 , ..., -0.14184934,
          0.5456646 , -0.8362459 ],
        ...,


In [8]:
# Rebinds the name `multi_head_attention` to Keras' built-in layer. From this
# cell on the name no longer refers to the hand-written function defined
# earlier in the notebook.
multi_head_attention = keras.layers.MultiHeadAttention(
    num_heads=num_heads,
    key_dim=head_dim,
)
# Cross-attention: queries from `target`, keys and values from `source`.
multi_head_attention(query=target, key=source, value=source)

<tf.Tensor: shape=(2, 10, 256), dtype=float32, numpy=
array([[[-0.01955285, -0.06889678, -0.09262316, ...,  0.01603702,
          0.02627093,  0.10837913],
        [-0.01955827, -0.06883085, -0.09258158, ...,  0.01605407,
          0.0262757 ,  0.10841117],
        [-0.0195565 , -0.06890609, -0.09252397, ...,  0.01604028,
          0.02618441,  0.10839273],
        ...,
        [-0.01962097, -0.06879208, -0.0925836 , ...,  0.01603478,
          0.02625707,  0.10837299],
        [-0.01960058, -0.06887408, -0.09259455, ...,  0.01609693,
          0.0262844 ,  0.10842787],
        [-0.01957582, -0.06884725, -0.09255467, ...,  0.01600979,
          0.02627961,  0.10836946]],

       [[ 0.00014283, -0.07145255, -0.06483021, ...,  0.00980183,
          0.02426516,  0.09464476],
        [ 0.00023542, -0.07143644, -0.06478545, ...,  0.00982607,
          0.02421108,  0.09458698],
        [ 0.00013129, -0.07145785, -0.06478333, ...,  0.00984884,
          0.02421728,  0.09456387],
        ...,


In [9]:
# Self-attention: the same `source` tensor is used as query, key and value.
multi_head_attention(key=source, value=source, query=source)

<tf.Tensor: shape=(2, 15, 256), dtype=float32, numpy=
array([[[-0.0194429 , -0.06883302, -0.09258254, ...,  0.01603119,
          0.0262194 ,  0.10839305],
        [-0.01968635, -0.06889655, -0.09256397, ...,  0.01600354,
          0.02627062,  0.10844125],
        [-0.01961418, -0.06888583, -0.09260273, ...,  0.01605913,
          0.02629185,  0.10839355],
        ...,
        [-0.0196203 , -0.06884837, -0.0925591 , ...,  0.01605909,
          0.02624412,  0.10839172],
        [-0.01963696, -0.06886108, -0.09254189, ...,  0.01605135,
          0.02622814,  0.10843844],
        [-0.01958858, -0.06890504, -0.09257051, ...,  0.01601265,
          0.02619541,  0.10845219]],

       [[ 0.00018485, -0.07141755, -0.06482642, ...,  0.00985956,
          0.02425843,  0.09466682],
        [ 0.00017474, -0.07142326, -0.06480012, ...,  0.00971453,
          0.02423343,  0.09466703],
        [ 0.00026685, -0.07145889, -0.06487991, ...,  0.00986562,
          0.02421542,  0.09463257],
        ...,


In [10]:
class TransformerEncoder(layers.Layer):
    """Encoder block: masked self-attention followed by a feedforward stack.

    Both sub-blocks add their input back to their output (residual
    connection) and then apply layer normalization.
    """

    def __init__(self, hidden_dim, intermediate_dim, num_heads):
        super().__init__()
        # Self-attention layers; each head gets an equal share of hidden_dim.
        self.self_attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=hidden_dim // num_heads
        )
        self.self_attention_layernorm = layers.LayerNormalization()
        # Feedforward layers: expand to intermediate_dim, project back.
        self.feed_forward_1 = layers.Dense(intermediate_dim, activation="relu")
        self.feed_forward_2 = layers.Dense(hidden_dim)
        self.feed_forward_layernorm = layers.LayerNormalization()

    def call(self, source, source_mask):
        """Encode `source`, using `source_mask` as the attention mask."""
        # Insert an axis at position 1 so the per-token mask broadcasts
        # across the query positions.
        attention_mask = source_mask[:, None, :]
        # Self-attention sub-block with residual connection.
        attended = self.self_attention(
            query=source, key=source, value=source, attention_mask=attention_mask
        )
        attended = self.self_attention_layernorm(attended + source)
        # Feedforward sub-block with residual connection.
        transformed = self.feed_forward_2(self.feed_forward_1(attended))
        return self.feed_forward_layernorm(transformed + attended)

In [11]:
def layer_normalization(batch_of_sequences, epsilon=1e-5):
    """Normalize each vector along the last axis to zero mean, unit variance.

    Statistics are computed independently for every position of every
    sequence, so samples in a batch do not influence each other.

    Args:
        batch_of_sequences: Array whose last axis holds the features to
            normalize.
        epsilon: Small constant added to the variance before the square
            root so constant vectors do not cause a division by zero.

    Returns:
        Array of the same shape as the input.
    """
    # To compute mean and variance, we only pool data over the last
    # axis.
    mean = np.mean(batch_of_sequences, keepdims=True, axis=-1)
    variance = np.var(batch_of_sequences, keepdims=True, axis=-1)
    # Divide by the standard deviation (not the variance) so the output has
    # unit variance.
    return (batch_of_sequences - mean) / np.sqrt(variance + epsilon)

In [12]:
def batch_normalization(batch_of_images, epsilon=1e-5):
    """Normalize each channel to zero mean and unit variance across the batch.

    Args:
        batch_of_images: Array whose first three axes are pooled together
            and whose remaining (last) axis is normalized per entry
            (assumed layout: batch, height, width, channels).
        epsilon: Small constant added to the variance before the square
            root so constant channels do not cause a division by zero.

    Returns:
        Array of the same shape as the input.
    """
    # Pools data over axes 0, 1 and 2. Because axis 0 is the batch axis,
    # this creates interactions between samples in a batch.
    mean = np.mean(batch_of_images, keepdims=True, axis=(0, 1, 2))
    variance = np.var(batch_of_images, keepdims=True, axis=(0, 1, 2))
    # Divide by the standard deviation (not the variance) so the output has
    # unit variance.
    return (batch_of_images - mean) / np.sqrt(variance + epsilon)


In [13]:
#transformer decoder
class TransformerDecoder(layers.Layer):
    """Decoder block: causal self-attention, cross-attention, feedforward.

    Each of the three sub-blocks adds its input back to its output
    (residual connection) and then applies layer normalization.
    """

    def __init__(self, hidden_dim, intermediate_dim, num_heads):
        super().__init__()
        # Each head gets an equal share of hidden_dim.
        per_head_dim = hidden_dim // num_heads
        # Self-attention layers
        self.self_attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=per_head_dim
        )
        self.self_attention_layernorm = layers.LayerNormalization()
        # Cross-attention layers
        self.cross_attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=per_head_dim
        )
        self.cross_attention_layernorm = layers.LayerNormalization()
        # Feedforward layers: expand to intermediate_dim, project back.
        self.feed_forward_1 = layers.Dense(intermediate_dim, activation="relu")
        self.feed_forward_2 = layers.Dense(hidden_dim)
        self.feed_forward_layernorm = layers.LayerNormalization()

    def call(self, target, source, source_mask):
        """Decode `target` while attending over the encoded `source`."""
        # Self-attention over the target, with the causal mask enabled.
        self_attended = self.self_attention(
            query=target, key=target, value=target, use_causal_mask=True
        )
        self_attended = self.self_attention_layernorm(self_attended + target)
        # Cross-attention: target positions query the source sequence. The
        # mask gets an axis at position 1 so it broadcasts over queries.
        cross_mask = source_mask[:, None, :]
        cross_attended = self.cross_attention(
            query=self_attended, key=source, value=source, attention_mask=cross_mask
        )
        cross_attended = self.cross_attention_layernorm(
            cross_attended + self_attended
        )
        # Feedforward sub-block with residual connection.
        transformed = self.feed_forward_2(self.feed_forward_1(cross_attended))
        return self.feed_forward_layernorm(transformed + cross_attended)

In [14]:
class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding."""

    def __init__(self, sequence_length, input_dim, output_dim):
        super().__init__()
        # Lookup table indexed by token id.
        self.token_embeddings = layers.Embedding(input_dim, output_dim)
        # Lookup table indexed by position, with `sequence_length` rows.
        self.position_embeddings = layers.Embedding(sequence_length, output_dim)

    def call(self, inputs):
        """Embed `inputs` and add the embedding of each token's position."""
        # A running sum of ones along the last axis, minus one, gives the
        # positions [0, 1, 2...] for each sequence in the batch.
        position_ids = tf.cumsum(tf.ones_like(inputs), axis=-1) - 1
        token_vectors = self.token_embeddings(inputs)
        position_vectors = self.position_embeddings(position_ids)
        return token_vectors + position_vectors

In [15]:
# Model hyperparameters. Note that `num_heads` is reassigned here (same value
# as in the earlier multi-head attention cell).
hidden_dim = 256
intermediate_dim = 2048
num_heads = 8
vocab_size = 15000
 
# Encoder side: integer token ids named "english" -> embeddings -> encoder.
# Token id 0 is treated as padding when building the mask (`source != 0`).
# NOTE(review): only a plain token Embedding is used here; the
# PositionalEmbedding class defined earlier in this notebook is not used, so
# the model receives no position information — confirm this is intentional.
source = keras.Input(shape=(None,), dtype="int32", name="english")
x = layers.Embedding(vocab_size, hidden_dim)(source)
encoder_output = TransformerEncoder(hidden_dim, intermediate_dim, num_heads)(
    source=x,
    source_mask=source != 0,
)
 
# Decoder side: integer token ids named "spanish" -> embeddings -> decoder
# attending over the encoder output, masked with the same source padding mask.
target = keras.Input(shape=(None,), dtype="int32", name="spanish")
x = layers.Embedding(vocab_size, hidden_dim)(target)
x = TransformerDecoder(hidden_dim, intermediate_dim, num_heads)(
    target=x,
    source=encoder_output,
    source_mask=source != 0,
)
x = layers.Dropout(0.5)(x)
# Per-position probability distribution over the vocabulary.
target_predictions = layers.Dense(vocab_size, activation="softmax")(x)
transformer = keras.Model([source, target], target_predictions)




In [16]:
# Print the layer-by-layer summary of the assembled model.
transformer.summary()

In [19]:
import string
import re
# Length used for the tokenizers' fixed-size outputs below.
sequence_length = 20
# (English sentence, Spanish sentence) training pairs. Every Spanish sentence
# is wrapped in literal "[start]" and "[end]" markers.
train_pairs = [
    ("Who is in this room?", "[start] ¿Quién está en esta habitación? [end]"),
    ("Hello, how are you?", "[start] Hola, ¿cómo estás? [end]"),
    ("What is your name?", "[start] ¿Cuál es tu nombre? [end]"),
    ("Nice to meet you.", "[start] Mucho gusto. [end]"),
    ("Where is the bathroom?", "[start] ¿Dónde está el baño? [end]"),
    ("I am hungry.", "[start] Tengo hambre. [end]"),
    ("How much does it cost?", "[start] ¿Cuánto cuesta? [end]"),
    ("I don't understand.", "[start] No entiendo. [end]"),
    ("Can you help me?", "[start] ¿Puedes ayudarme? [end]"),
    ("Thank you very much.", "[start] Muchas gracias. [end]"),
    ("You're welcome.", "[start] De nada. [end]"),
    ("I love this place.", "[start] Me encanta este lugar. [end]"),
    ("What time is it?", "[start] ¿Qué hora es? [end]"),
    ("Where do you live?", "[start] ¿Dónde vives? [end]"),
    ("I am from the United States.", "[start] Soy de Estados Unidos. [end]"),
    ("Do you speak English?", "[start] ¿Hablas inglés? [end]"),
    ("I need a doctor.", "[start] Necesito un médico. [end]"),
    ("How old are you?", "[start] ¿Cuántos años tienes? [end]"),
    ("This is delicious.", "[start] Esto está delicioso. [end]"),
    ("See you tomorrow.", "[start] Hasta mañana. [end]"),
]
# Held-out pairs in the same format, used for validation.
val_pairs = [
    ("Goodbye.", "[start] Adiós. [end]"),
    ("Good morning.", "[start] Buenos días. [end]"),
    ("Have a nice day.", "[start] Que tengas un buen día. [end]"),
    ("I am learning Spanish.", "[start] Estoy aprendiendo español. [end]"),
]
# Characters to strip from Spanish text: ASCII punctuation plus "¿", except
# the square brackets, which must survive so "[start]" and "[end]" stay intact.
strip_chars = "".join(
    char for char in string.punctuation + "¿" if char not in ("[", "]")
)
def custom_standardization(input_string):
    """Lowercase `input_string` and delete every character in `strip_chars`."""
    # Character class matching any single character listed in strip_chars.
    strip_pattern = f"[{re.escape(strip_chars)}]"
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, strip_pattern, "")

# English tokenizer: integer token ids, output fixed to `sequence_length`
# tokens, using the layer's default standardization.
english_tokenizer = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
# Spanish tokenizer: one token longer than the English side, because
# format_dataset slices it into input (all but last) and label (all but first)
# sequences. Uses custom_standardization so "[" and "]" are kept.
spanish_tokenizer = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)

# Adapt tokenizers to the training data
eng_texts, spa_texts = zip(*train_pairs)
english_tokenizer.adapt(eng_texts)
spanish_tokenizer.adapt(spa_texts)
def format_dataset(eng, spa):
    """Tokenize a batch of sentence pairs into (features, labels, weights).

    The Spanish token ids are split into decoder inputs (every token except
    the last) and labels (every token except the first), so each label is
    the token that follows the corresponding input token. Sample weights are
    False wherever the label id is 0.
    """
    english_ids = english_tokenizer(eng)
    spanish_ids = spanish_tokenizer(spa)
    decoder_inputs = spanish_ids[:, :-1]
    labels = spanish_ids[:, 1:]
    return (
        {"english": english_ids, "spanish": decoder_inputs},
        labels,
        labels != 0,
    )

def make_dataset(pairs):
    """Build a batched, tokenized, cached and shuffled dataset from pairs.

    Args:
        pairs: Non-empty sequence of (english_text, spanish_text) tuples.

    Returns:
        A dataset yielding (features, labels, sample_weights) batches as
        produced by `format_dataset`, in batches of the module-level
        `batch_size`.
    """
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    # Batch before mapping so tokenization runs once per batch.
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    # Cache the tokenized batches first and shuffle afterwards. Caching after
    # shuffling would store the first shuffled order and replay it unchanged
    # on every epoch.
    return dataset.cache().shuffle(2048)

# `batch_size` is read as a global inside make_dataset, so it must be bound
# before the calls below.
batch_size = 64
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)