In [2]:
from keras.preprocessing.text import Tokenizer
from random import randint
from keras.optimizers import Adam
import numpy as np
import keras
import tensorflow as tf

2023-09-11 08:49:15.066604: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
import pickle

def unpickle(file_path):
    """Read a single pickled object from ``file_path`` and return it.

    NOTE: unpickling can execute arbitrary code, so only point this at
    files produced by a trusted source.
    """
    stream = open(file_path, 'rb')
    try:
        loaded = pickle.load(stream)
    finally:
        # Mirror the context-manager behaviour: always release the handle.
        stream.close()
    return loaded

# All three artifacts share one file-name prefix and live in the working directory.
name = 'len30_books5_'

sequences = unpickle(name + 'sequences.pkl')
targets = unpickle(name + 'targets.pkl')

file_path = name + 'tokenizer.pkl'
tokenizer = unpickle(file_path)

# Quick look at what was loaded: array shapes and vocabulary size.
sequences.shape, targets.shape, len(tokenizer.word_index)

FileNotFoundError: [Errno 2] No such file or directory: 'len30_books5_sequences.pkl'

In [None]:
def positional_encoding(length, depth):
    """Build a sinusoidal position table.

    The leading columns hold ``sin(position * rate)`` and the trailing columns
    hold ``cos(position * rate)``, with rates decaying geometrically from 1
    towards 1/10000.

    Args:
        length: Number of positions (rows) to generate.
        depth: Width of the encoding (columns).

    Returns:
        A ``tf.float32`` tensor of shape ``(length, depth)``.
    """
    depth = int(depth)
    # Use an integer number of frequencies. The earlier float ``depth / 2``
    # made ``np.arange`` round up for odd depths, producing ``depth + 1``
    # columns. Even depths give exactly the same values as before.
    half_depth = (depth + 1) // 2

    positions = np.arange(length)[:, np.newaxis]                 # (length, 1)
    depths = np.arange(half_depth)[np.newaxis, :] / half_depth   # (1, half_depth)

    angle_rates = 1 / (10000**depths)         # (1, half_depth)
    angle_rads = positions * angle_rates      # (length, half_depth)

    pos_encoding = np.concatenate(
        [np.sin(angle_rads), np.cos(angle_rads)],
        axis=-1)

    # For odd depths the concatenation is one column too wide; drop the
    # surplus cosine column so the result is exactly ``depth`` wide.
    pos_encoding = pos_encoding[:, :depth]

    return tf.cast(pos_encoding, dtype=tf.float32)


@keras.saving.register_keras_serializable(package="PositionalEmbedding")
class PositionalEmbedding(tf.keras.layers.Layer):
    """Token embedding plus a fixed sinusoidal position signal.

    Token ids are embedded (``mask_zero=True`` is passed to the embedding),
    scaled by ``sqrt(emb_dim)`` and summed with the first ``seq_len`` rows of
    a pre-computed positional table.

    Args:
        vocab_size: Number of rows in the embedding table.
        emb_dim: Embedding width; also the width of the positional table.
        max_length: Number of positions pre-computed in the table. Inputs
            longer than this cannot be encoded.
        **kwargs: Standard ``Layer`` arguments (``name``, ``dtype``, ...).
            Accepting them is required for ``from_config`` to work.
    """

    def __init__(self, vocab_size, emb_dim, max_length=2048, **kwargs):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.emb_dim = emb_dim
        self.max_length = max_length
        self.embedding = tf.keras.layers.Embedding(vocab_size, emb_dim, mask_zero=True)
        self.pos_encoding = positional_encoding(length=max_length, depth=emb_dim)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "vocab_size": self.vocab_size,
            "emb_dim": self.emb_dim,
            "max_length": self.max_length,
        })
        return config

    def compute_mask(self, *args, **kwargs):
        """Delegate mask computation to the wrapped embedding layer."""
        return self.embedding.compute_mask(*args, **kwargs)

    def call(self, x):
        """Embed token ids ``x`` and add the positional signal."""
        length = tf.shape(x)[1]
        x = self.embedding(x)
        # Scale the embeddings before adding the position table.
        x *= tf.math.sqrt(tf.cast(self.emb_dim, tf.float32))
        x = x + self.pos_encoding[tf.newaxis, :length, :]
        return x


@keras.saving.register_keras_serializable(package="BaseAttention")
class BaseAttention(tf.keras.layers.Layer):
    """Common sub-layers for the attention blocks.

    Creates a multi-head attention layer, a layer normalisation and an ``Add``
    layer. Subclasses supply ``call``.
    """

    def __init__(self, **kwargs):
        # Every keyword argument is forwarded to MultiHeadAttention; the base
        # Layer constructor is called without arguments.
        super(BaseAttention, self).__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
        self.layernorm = tf.keras.layers.LayerNormalization()
        self.add = tf.keras.layers.Add()


@keras.saving.register_keras_serializable(package="CrossAttention")
class CrossAttention(BaseAttention):
    """Attention block that also records its attention scores.

    Despite the name there is no separate context sequence: query, key and
    value are all ``x``, so this is self-attention followed by a residual
    connection and layer normalisation.
    """

    def call(self, x, use_causal_mask=True):
        """Apply attention over ``x`` and return the normalised residual sum.

        Args:
            x: Input sequence tensor used as query, key and value.
            use_causal_mask: When true (default), each position may only
                attend to itself and earlier positions. Without the mask the
                block can read later tokens.
                NOTE(review): assumes the training targets are the inputs
                shifted by one token, in which case unmasked attention leaks
                the answer — confirm against the data preparation.

        Returns:
            Tensor with the same shape as ``x``.
        """
        attn_output, attn_scores = self.mha(
            query=x,
            key=x,
            value=x,
            use_causal_mask=use_causal_mask,
            return_attention_scores=True)

        # Keep the most recent scores around for inspection.
        self.last_attn_scores = attn_scores
        x = self.add([x, attn_output])
        x = self.layernorm(x)
        return x
    

@keras.saving.register_keras_serializable(package="CausalSelfAttention")
class CausalSelfAttention(BaseAttention):
    """Causally masked self-attention with a residual connection and layer norm."""

    def call(self, x):
        """Attend over ``x`` with a causal mask and return the normalised residual sum."""
        attended = self.mha(query=x, value=x, key=x, use_causal_mask=True)
        residual = self.add([x, attended])
        return self.layernorm(residual)
    

@keras.saving.register_keras_serializable(package="FeedForward")
class FeedForward(tf.keras.layers.Layer):
    """Position-wise feed-forward block with a residual connection and layer norm.

    The inner stack is ``Dense(dff, relu) -> Dense(emb_dim) -> Dropout``.

    Args:
        emb_dim: Output width of the block (must match the input width so the
            residual sum is valid).
        dff: Width of the hidden dense layer.
        dropout_rate: Dropout rate applied after the second dense layer.
        **kwargs: Standard ``Layer`` arguments (``name``, ``dtype``, ...).
            Accepting them is required for ``from_config`` to work.
    """

    def __init__(self, emb_dim, dff, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.emb_dim = emb_dim
        self.dff = dff
        self.dropout_rate = dropout_rate
        self.seq = tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation='relu'),
            tf.keras.layers.Dense(emb_dim),
            tf.keras.layers.Dropout(dropout_rate)
        ])
        self.add = tf.keras.layers.Add()
        self.layer_norm = tf.keras.layers.LayerNormalization()

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "emb_dim": self.emb_dim,
            "dff": self.dff,
            "dropout_rate": self.dropout_rate,
        })
        return config

    def call(self, x):
        """Return ``layer_norm(x + seq(x))``."""
        x = self.add([x, self.seq(x)])
        x = self.layer_norm(x)
        return x

In [None]:
@keras.saving.register_keras_serializable(package="DecoderLayer")
class DecoderLayer(tf.keras.layers.Layer):
    """One decoder block: causal self-attention, a second attention block, then feed-forward.

    Args:
        emb_dim: Model width; also used as ``key_dim`` for both attention layers.
        num_heads: Number of attention heads.
        dff: Hidden width of the feed-forward block.
        dropout_rate: Dropout rate used by the attention layers and the
            feed-forward block.
        **kwargs: Standard ``Layer`` arguments (``name``, ``dtype``, ...).
            Accepting them is required for ``from_config`` to work.
    """

    def __init__(self,
                 *,
                 emb_dim,
                 num_heads,
                 dff,
                 dropout_rate=0.1,
                 **kwargs):
        super().__init__(**kwargs)
        self.emb_dim = emb_dim
        self.num_heads = num_heads
        self.dff = dff
        self.dropout_rate = dropout_rate

        self.causal_self_attention = CausalSelfAttention(
            num_heads=num_heads,
            key_dim=emb_dim,
            dropout=dropout_rate)

        self.cross_attention = CrossAttention(
            num_heads=num_heads,
            key_dim=emb_dim,
            dropout=dropout_rate)

        # Forward the configured rate; previously the feed-forward block
        # silently fell back to its own default of 0.1.
        self.ffn = FeedForward(emb_dim, dff, dropout_rate=dropout_rate)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "emb_dim": self.emb_dim,
            "num_heads": self.num_heads,
            "dff": self.dff,
            "dropout_rate": self.dropout_rate,
        })
        return config

    def call(self, x):
        """Run ``x`` through both attention blocks and the feed-forward block."""
        x = self.causal_self_attention(x=x)
        x = self.cross_attention(x=x)
        # Expose the scores of the second attention block for inspection.
        self.last_attn_scores = self.cross_attention.last_attn_scores
        x = self.ffn(x)
        return x

@keras.saving.register_keras_serializable(package="Decoder")
class Decoder(tf.keras.layers.Layer):
    """Embedding, dropout and a stack of ``DecoderLayer`` blocks.

    Args:
        num_layers: Number of decoder blocks.
        emb_dim: Model width.
        num_heads: Attention heads per block.
        dff: Hidden width of each feed-forward block.
        vocab_size: Size of the token embedding table.
        dropout_rate: Dropout rate applied after the embedding and passed to
            every decoder block.
        **kwargs: Standard ``Layer`` arguments (``name``, ``dtype``, ...).
            Accepting them is required for ``from_config`` to work.
    """

    def __init__(self, *, num_layers, emb_dim, num_heads, dff, vocab_size,
                 dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)

        self.emb_dim = emb_dim
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.dff = dff
        self.vocab_size = vocab_size
        self.dropout_rate = dropout_rate

        self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size,
                                                 emb_dim=emb_dim)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.dec_layers = [
            DecoderLayer(emb_dim=emb_dim, num_heads=num_heads,
                         dff=dff, dropout_rate=dropout_rate)
            for _ in range(num_layers)]

        self.last_attn_scores = None

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "num_layers": self.num_layers,
            "emb_dim": self.emb_dim,
            "num_heads": self.num_heads,
            "dff": self.dff,
            "vocab_size": self.vocab_size,
            "dropout_rate": self.dropout_rate,
        })
        return config

    def call(self, x):
        """Embed token ids ``x`` and pass them through every decoder block in order."""
        x = self.pos_embedding(x)
        x = self.dropout(x)
        for dec_layer in self.dec_layers:
            x = dec_layer(x)

        # With zero layers there are no scores to expose; keep the previous value.
        if self.dec_layers:
            self.last_attn_scores = self.dec_layers[-1].last_attn_scores
        return x

In [None]:
@keras.saving.register_keras_serializable(package="Transformer")
class Transformer(tf.keras.Model):
    """Decoder-only model: ``Decoder`` -> ``Dense(dff)`` -> softmax over the vocabulary.

    Args:
        num_layers: Number of decoder blocks.
        emb_dim: Model width.
        num_heads: Attention heads per block.
        dff: Hidden width of the feed-forward blocks and of the dense layer
            placed before the output layer.
        input_vocab_size: Not used by the model; kept so existing callers
            keep working.
        target_vocab_size: Vocabulary size used for both the embedding table
            and the output layer.
        dropout_rate: Dropout rate passed to the decoder.
        **kwargs: Standard ``Model`` arguments (``name``, ...). Accepting them
            is required for ``from_config`` to work.
    """

    def __init__(self, *, num_layers, emb_dim, num_heads, dff,
                 input_vocab_size, target_vocab_size, dropout_rate=0.1,
                 **kwargs):
        super().__init__(**kwargs)
        self.num_layers = num_layers
        self.emb_dim = emb_dim
        self.num_heads = num_heads
        self.dff = dff
        self.input_vocab_size = input_vocab_size
        self.target_vocab_size = target_vocab_size
        self.dropout_rate = dropout_rate

        self.decoder = Decoder(num_layers=num_layers, emb_dim=emb_dim,
                               num_heads=num_heads, dff=dff,
                               vocab_size=target_vocab_size,
                               dropout_rate=dropout_rate)

        self.dense = tf.keras.layers.Dense(dff)
        self.final_layer = tf.keras.layers.Dense(target_vocab_size, activation='softmax')

    def get_config(self):
        """Return the constructor arguments so a saved model can be revived."""
        config = super().get_config()
        config.update({
            "num_layers": self.num_layers,
            "emb_dim": self.emb_dim,
            "num_heads": self.num_heads,
            "dff": self.dff,
            "input_vocab_size": self.input_vocab_size,
            "target_vocab_size": self.target_vocab_size,
            "dropout_rate": self.dropout_rate,
        })
        return config

    def call(self, inputs):
        """Map token ids to per-position probabilities over the vocabulary."""
        x = self.decoder(inputs)
        x = self.dense(x)
        # The output layer applies softmax, so these are probabilities rather
        # than logits; pair with a loss that does not expect logits.
        probabilities = self.final_layer(x)
        return probabilities

In [None]:
# Scratch check: whitespace splitting leaves punctuation attached to the word.
a = "dsd fse aa,"
a.split(sep=None)

['dsd', 'fse', 'aa,']

In [None]:
# Generated sample per epoch, keyed by the zero-based epoch number as a string.
generatet_texts = {}


class PredictionCallback(tf.keras.callbacks.Callback):
    """Greedily generate a text sample with the model at the end of every epoch.

    The sample is printed and stored in the module-level ``generatet_texts``.
    Relies on the module-level ``tokenizer`` for the word/id mappings.

    Args:
        seed_text: Text the generation starts from.
        num_words_to_generate: Number of words appended to the seed.
    """

    def __init__(self,
                 seed_text='za górami za lasami żył sobie piękna dziewczynka, ',
                 num_words_to_generate=200):
        super().__init__()
        self.seed_text = seed_text
        self.num_words_to_generate = num_words_to_generate

    def _preprocess_input_text(self, text):
        """Convert ``text`` to a ``(1, n_tokens)`` tensor of token ids.

        The text is split on whitespace and handed to the tokenizer as a
        ready-made token list, so punctuation stays attached to the words.
        """
        token_ids = tokenizer.texts_to_sequences([text.split()])
        return tf.convert_to_tensor(token_ids)

    def on_epoch_end(self, epoch, logs=None):
        """Generate, print and record a sample for the epoch that just finished."""
        generated_text = self.seed_text
        for _ in range(self.num_words_to_generate):
            input_ids = self._preprocess_input_text(generated_text)
            predictions = self.model.predict(input_ids, verbose=0)[0]
            # Only the distribution at the last position predicts the next word.
            next_word_scores = predictions[-1]
            # Id 0 is skipped: it is not in ``tokenizer.index_word`` and
            # would raise KeyError in the middle of training.
            predicted_id = int(np.argmax(next_word_scores[1:])) + 1
            predicted_word = tokenizer.index_word[predicted_id]
            generated_text += " " + predicted_word

        print()
        print('Epoch:', (epoch + 1))
        print('Prediction:', generated_text)
        generatet_texts[str(epoch)] = generated_text

In [None]:
# Model hyper-parameters.
num_layers, emb_dim, dff, num_heads, dropout_rate = 2, 300, 512, 6, 0.2

# Both vocabulary sizes are the tokenizer vocabulary plus one extra id.
transformer = Transformer(
    num_layers=num_layers, emb_dim=emb_dim, num_heads=num_heads, dff=dff,
    input_vocab_size=1 + len(tokenizer.word_index),
    target_vocab_size=1 + len(tokenizer.word_index),
    dropout_rate=dropout_rate,
)

In [None]:
# Train with Adam; a text sample is generated after every epoch by the callback.
transformer.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=Adam(learning_rate=0.001),
)
transformer.fit(
    x=sequences,
    y=targets,
    batch_size=64,
    epochs=40,
    callbacks=[PredictionCallback()],
)

Epoch 1/40
Epoch: 1
Prediction: za górami za lasami żył sobie piękna dziewczynka,  i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i i
Epoch 2/40