In [1]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import Dense, Embedding, LayerNormalization, Dropout, Input
from tensorflow.keras.models import Model

In [2]:
# Przykładowe parametry modelu
d_model = 128  # Wymiar modelu
num_heads = 4  # Liczba głów uwagi
dff = 512      # Rozmiar warstw feed-forward
input_vocab_size = 5000  # Rozmiar słownika wejściowego
target_vocab_size = 5000  # Rozmiar słownika wyjściowego
max_seq_len = 40          # Maksymalna długość sekwencji

In [3]:
# Tworzenie maski pozycyjnej dla uwagi
def create_padding_mask(seq):
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    return seq[:, tf.newaxis, tf.newaxis, :]

In [5]:
# Definicja warstwy wielogłowego mechanizmu uwagi (Multi-Head Attention)
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % num_heads == 0

        self.depth = d_model // num_heads

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)
        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        """Rozdziela ostatni wymiar na (num_heads, depth)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)  # (batch_size, seq_len, d_model)
        k = self.wk(k)  # (batch_size, seq_len, d_model)
        v = self.wv(v)  # (batch_size, seq_len, d_model)

        q = self.split_heads(q, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        k = self.split_heads(k, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        v = self.split_heads(v, batch_size)  # (batch_size, num_heads, seq_len_v, depth)

        scaled_attention, _ = scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))
        output = self.dense(concat_attention)

        return output

In [6]:
# Funkcja do obliczania uwagi opartej na skali
def scaled_dot_product_attention(q, k, v, mask):
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    output = tf.matmul(attention_weights, v)
    return output, attention_weights

In [7]:
# Definicja bloku enkodera
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = tf.keras.Sequential([
            Dense(dff, activation='relu'),
            Dense(d_model)
        ])

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, x, training, mask):
        attn_output = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)

        return out2

In [19]:
# Implementacja bloku enkodera
class SimpleEncoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, max_seq_len, rate=0.1):
        super(SimpleEncoder, self).__init__()
        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(max_seq_len, d_model)
        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = Dropout(rate)

    # def call(self, x, training, mask):
    #     seq_len = tf.shape(x)[1]
    #     x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
    #     x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
    #     x += self.pos_encoding[:, :seq_len, :]

    #     x = self.dropout(x, training=training)

    #     for i in range(self.num_layers):
    #         x = self.enc_layers[i](x, training, mask)

    #     return x
    def call(self, x, training):
        mask = self.create_padding_mask(x)  # Tworzenie maski paddingu

        seq_len = tf.shape(x)[1]
        x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x

In [20]:
# Funkcja do tworzenia kodowania pozycyjnego
def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[np.newaxis, ...]
    return tf.cast(pos_encoding, dtype=tf.float32)

In [21]:
def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

In [22]:
# Tworzenie modelu
class Transformer(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, max_seq_len, rate=0.1):
        super(Transformer, self).__init__()
        self.encoder = SimpleEncoder(num_layers, d_model, num_heads, dff, input_vocab_size, max_seq_len, rate)
        self.final_layer = Dense(target_vocab_size)

    def call(self, inp, training, enc_padding_mask):
        enc_output = self.encoder(inp, training, enc_padding_mask)
        final_output = self.final_layer(enc_output)  # (batch_size, inp_seq_len, target_vocab_size)
        return final_output


In [23]:
# Inicjalizacja modelu
num_layers = 2  # Liczba warstw enkodera
transformer = Transformer(num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, max_seq_len)


In [24]:
# Przykładowe dane
input_seq = np.random.randint(0, input_vocab_size, (64, max_seq_len))
enc_padding_mask = create_padding_mask(input_seq)


In [16]:
enc_padding_mask

<tf.Tensor: shape=(64, 1, 1, 40), dtype=float32, numpy=
array([[[[0., 0., 0., ..., 0., 0., 0.]]],


       [[[0., 0., 0., ..., 0., 0., 0.]]],


       [[[0., 0., 0., ..., 0., 0., 0.]]],


       ...,


       [[[0., 0., 0., ..., 0., 0., 0.]]],


       [[[0., 0., 0., ..., 0., 0., 0.]]],


       [[[0., 0., 0., ..., 0., 0., 0.]]]], dtype=float32)>

In [25]:
# Kompilacja i trenowanie modelu
transformer.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])
history = transformer.fit(input_seq, input_seq, epochs=5, batch_size=64)

Epoch 1/5


TypeError: missing a required argument: 'enc_padding_mask'