In [1]:
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import ops
from tensorflow.keras.layers import TextVectorization
import numpy as np
import os
import string
import random
import tensorflow
import tensorflow.data as tf_data
import tensorflow.strings as tf_strings

In [2]:
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    """
    Mask the upper half of the dot product matrix in self attention.
    This prevents flow of information from future tokens to current token.
    1's in the lower triangle, counting from the lower right corner.
    """
    i = ops.arange(n_dest)[:, None]
    j = ops.arange(n_src)
    m = i >= j - n_src + n_dest
    mask = ops.cast(m, dtype)
    mask = ops.reshape(mask, [1, n_dest, n_src])
    mult = ops.concatenate(
        [ops.expand_dims(batch_size, -1), ops.convert_to_tensor([1, 1])], 0
    )
    return ops.tile(mask, mult)


class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads, embed_dim)
        self.ffn = keras.Sequential(
            [
                layers.Dense(ff_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs):
        input_shape = ops.shape(inputs)
        batch_size = input_shape[0]
        seq_len = input_shape[1]
        causal_mask = causal_attention_mask(batch_size, seq_len, seq_len, "bool")
        attention_output = self.att(inputs, inputs, attention_mask=causal_mask)
        attention_output = self.dropout1(attention_output)
        out1 = self.layernorm1(inputs + attention_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output)
        return self.layernorm2(out1 + ffn_output)

In [3]:
class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        maxlen = ops.shape(x)[-1]
        positions = ops.arange(0, maxlen, 1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

In [11]:
vocab_size = 20000  # Only consider the top 20k words
maxlen = 80  # Max sequence size
embed_dim = 256  # Embedding size for each token
num_heads = 4  # Number of attention heads
feed_forward_dim = 256  # Hidden layer size in feed forward network inside transformer


def create_model():
    inputs = layers.Input(shape=(maxlen,), dtype="int32")
    embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
    x = embedding_layer(inputs)
    transformer_block = TransformerBlock(embed_dim, num_heads, feed_forward_dim)
    x = transformer_block(x)
    outputs = layers.Dense(vocab_size)(x)
    model = keras.Model(inputs=inputs, outputs=[outputs, x])
    loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(
        "adam",
        loss=[loss_fn, None],
    )  # No loss and optimization based on word embeddings from transformer block
    return model

In [12]:
batch_size = 128

# The dataset contains each review in a separate text file
# The text files are present in four different folders
# Create a list all files
filenames = [
    "datasets/data_1.txt",
    "datasets/data_2.txt",
    "datasets/data_3.txt",
    "datasets/data_4.txt",
]
# directories = [
#     "aclImdb/train/pos",
#     "aclImdb/train/neg",
#     "aclImdb/test/pos",
#     "aclImdb/test/neg",
# ]
# for dir in directories:
#     for f in os.listdir(dir):
#         filenames.append(os.path.join(dir, f))

print(f"{len(filenames)} files")

# Create a dataset from text files
random.shuffle(filenames)
text_ds = tf_data.TextLineDataset(filenames)
text_ds = text_ds.shuffle(buffer_size=256)
text_ds = text_ds.batch(batch_size)
# print(dir(text_ds))

4 files


In [13]:
def custom_standardization(input_string):
    """Remove html line-break tags and handle punctuation"""
    lowercased = tf_strings.lower(input_string)
    stripped_html = tf_strings.regex_replace(lowercased, "<br />", " ")
    return tf_strings.regex_replace(stripped_html, f"([{string.punctuation}])", r" \1")


# Create a vectorization layer and adapt it to the text
vectorize_layer = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size - 1,
    output_mode="int",
    output_sequence_length=maxlen + 1,
)
vectorize_layer.adapt(text_ds)
vocab = vectorize_layer.get_vocabulary()  # To get words back from token indices


def prepare_lm_inputs_labels(text):
    """
    Shift word sequences by 1 position so that the target for position (i) is
    word at position (i+1). The model will use all words up till position (i)
    to predict the next word.
    """
    text = tensorflow.expand_dims(text, -1)
    tokenized_sentences = vectorize_layer(text)
    x = tokenized_sentences[:, :-1]
    y = tokenized_sentences[:, 1:]
    print(tokenized_sentences)
    print(y)
    return x, y


text_ds = text_ds.map(prepare_lm_inputs_labels, num_parallel_calls=tf_data.AUTOTUNE)
text_ds = text_ds.prefetch(tf_data.AUTOTUNE)

Tensor("text_vectorization_1_1/Pad:0", shape=(None, None), dtype=int64)
Tensor("strided_slice_1:0", shape=(None, None), dtype=int64)


In [18]:
class TextGenerator(keras.callbacks.Callback):
    """A callback to generate text from a trained model.
    1. Feed some starting prompt to the model
    2. Predict probabilities for the next token
    3. Sample the next token and add it to the next input

    Arguments:
        max_tokens: Integer, the number of tokens to be generated after prompt.
        start_tokens: List of integers, the token indices for the starting prompt.
        index_to_word: List of strings, obtained from the TextVectorization layer.
        top_k: Integer, sample from the `top_k` token predictions.
        print_every: Integer, print after this many epochs.
    """

    def __init__(
        self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1
    ):
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k

    def sample_from(self, logits):
        logits, indices = ops.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(ops.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        return self.index_to_word[number]

    def on_epoch_end(self, epoch, logs=None):
        start_tokens = [_ for _ in self.start_tokens]
        if (epoch + 1) % self.print_every != 0:
            return
        num_tokens_generated = 0
        tokens_generated = []
        while num_tokens_generated <= self.max_tokens:
            pad_len = maxlen - len(start_tokens)
            sample_index = len(start_tokens) - 1
            if pad_len < 0:
                x = start_tokens[:maxlen]
                sample_index = maxlen - 1
            elif pad_len > 0:
                x = start_tokens + [0] * pad_len
            else:
                x = start_tokens
            x = np.array([x])
            y, _ = self.model.predict(x, verbose=0)
            sample_token = self.sample_from(y[0][sample_index])
            tokens_generated.append(sample_token)
            start_tokens.append(sample_token)
            num_tokens_generated = len(tokens_generated)
        txt = " ".join(
            [self.detokenize(_) for _ in self.start_tokens + tokens_generated]
        )
        print(f"generated text:\n{txt}\n")


# Tokenize starting prompt
word_to_index = {}
for index, word in enumerate(vocab):
    word_to_index[word] = index

start_prompt = "qui est justin bieber"
start_tokens = [word_to_index.get(_, 1) for _ in start_prompt.split()]
num_tokens_generated = 40
text_gen_callback = TextGenerator(num_tokens_generated, start_tokens, vocab)

In [15]:
fit_time = 20
max_ecpoch = 5
model = create_model()
md_name = "br_model.keras"

for ft in range(fit_time):
    model.fit(text_ds, verbose=2, epochs=max_ecpoch, callbacks=[text_gen_callback])
    model.save(md_name)
    print(f"étape {ft+1 } fini sur {fit_time}")

In [19]:
model.save(md_name)

In [21]:
# Tokenize starting prompt
word_to_index = {}
for index, word in enumerate(vocab):
    word_to_index[word] = index

start_prompt = "qui es-tu ?"
start_tokens = [word_to_index.get(_, 1) for _ in start_prompt.split()]
num_tokens_generated = 200
print(num_tokens_generated)
print(start_tokens)

# print(word_to_index)
# print(vocab)
# text_gen_callback = TextGenerator(num_tokens_generated, start_tokens, vocab)

200
[1, 1, 20]


In [22]:
max_tokens = num_tokens_generated
# index_to_word = word_to_index
print_every = 1
k = 10

def find_value_with_key(key_v):
    keys = []
    for key, value in word_to_index.items():
        if value == key_v:
            keys.append(key)
    return keys

def sample_from(logits):
    logits, indices = ops.top_k(logits, k=k, sorted=True)
    indices = np.asarray(indices).astype("int32")
    preds = keras.activations.softmax(ops.expand_dims(logits, 0))[0]
    preds = np.asarray(preds).astype("float32")
    # print(np.random.choice(indices, p=preds))
    return np.random.choice(indices, p=preds)

def detokenize(number):
    # return index_to_word[number]
    # print(number)
    # print()
    word = find_value_with_key(number)[0]
    return word


In [24]:
start_tokens = [_ for _ in start_tokens]
num_tokens_generated = 0
tokens_generated = []
while num_tokens_generated <= max_tokens:
    pad_len = maxlen - len(start_tokens)
    sample_index = len(start_tokens) - 1
    if pad_len < 0:
        x = start_tokens[:maxlen]
        sample_index = maxlen - 1
    elif pad_len > 0:
        x = start_tokens + [0] * pad_len
    else:
        x = start_tokens
    x = np.array([x])
    y, _ = model.predict(x, verbose=0)
    sample_token = sample_from(y[0][sample_index])
    tokens_generated.append(sample_token)
    start_tokens.append(sample_token)
    num_tokens_generated = len(tokens_generated)
txt = " ".join(
    [detokenize(_) for _ in start_tokens + tokens_generated]
)
    
print(f"generated text:\n{txt}\n")
# 

In [26]:
custom_objects = {"causal_attention_mask": causal_attention_mask, "TransformerBlock": TransformerBlock, "TokenAndPositionEmbedding":TokenAndPositionEmbedding}

In [27]:
# from tensorflow.keras.utils import custom_object_scope
# from tensorflow.keras.utils import get_custom_objects
from tensorflow.keras import models
from tensorflow import keras

br_model = models.load_model("E:\\Alkaou\Python Projects\\models\\br_model.keras", custom_objects=custom_objects)

br_model.summary()

TypeError: <class 'keras.src.models.functional.Functional'> could not be deserialized properly. Please ensure that components that are Python object instances (layers, models, etc.) returned by `get_config()` are explicitly deserialized in the model's `from_config()` method.

config={'module': 'keras.src.models.functional', 'class_name': 'Functional', 'config': {'name': 'functional_2', 'trainable': True, 'layers': [{'module': 'keras.layers', 'class_name': 'InputLayer', 'config': {'batch_shape': [None, 80], 'dtype': 'int32', 'sparse': False, 'name': 'input_layer'}, 'registered_name': None, 'name': 'input_layer', 'inbound_nodes': []}, {'module': None, 'class_name': 'TokenAndPositionEmbedding', 'config': {'maxlen': 80, 'vocab_size': 20000, 'embed_dim': 256, 'trainable': True, 'dtype': 'float32'}, 'registered_name': 'TokenAndPositionEmbedding', 'build_config': {'input_shape': [None, 80]}, 'name': 'token_and_position_embedding', 'inbound_nodes': [{'args': [{'class_name': '__keras_tensor__', 'config': {'shape': [None, 80], 'dtype': 'int32', 'keras_history': ['input_layer', 0, 0]}}], 'kwargs': {}}]}, {'module': None, 'class_name': 'TransformerBlock', 'config': {'embed_dim': 256, 'num_heads': 4, 'ff_dim': 256, 'trainable': True, 'dtype': 'float32'}, 'registered_name': 'TransformerBlock', 'build_config': {'input_shape': [None, 80, 256]}, 'name': 'transformer_block', 'inbound_nodes': [{'args': [{'class_name': '__keras_tensor__', 'config': {'shape': [None, 80, 256], 'dtype': 'float32', 'keras_history': ['token_and_position_embedding', 0, 0]}}], 'kwargs': {}}]}, {'module': 'keras.layers', 'class_name': 'Dense', 'config': {'name': 'dense_2', 'trainable': True, 'dtype': 'float32', 'units': 20000, 'activation': 'linear', 'use_bias': True, 'kernel_initializer': {'module': 'keras.initializers', 'class_name': 'GlorotUniform', 'config': {'seed': None}, 'registered_name': None}, 'bias_initializer': {'module': 'keras.initializers', 'class_name': 'Zeros', 'config': {}, 'registered_name': None}, 'kernel_regularizer': None, 'bias_regularizer': None, 'kernel_constraint': None, 'bias_constraint': None}, 'registered_name': None, 'build_config': {'input_shape': [None, 80, 256]}, 'name': 'dense_2', 'inbound_nodes': [{'args': [{'class_name': '__keras_tensor__', 'config': {'shape': [None, 80, 256], 'dtype': 'float32', 'keras_history': ['transformer_block', 0, 0]}}], 'kwargs': {}}]}], 'input_layers': [['input_layer', 0, 0]], 'output_layers': [['dense_2', 0, 0], ['transformer_block', 0, 0]]}, 'registered_name': 'Functional', 'build_config': {'input_shape': None}, 'compile_config': {'optimizer': 'Adam', 'loss': [{'module': 'keras.losses', 'class_name': 'SparseCategoricalCrossentropy', 'config': {'name': 'sparse_categorical_crossentropy', 'reduction': 'sum_over_batch_size', 'from_logits': True, 'ignore_class': None}, 'registered_name': None}, None], 'loss_weights': None, 'metrics': None, 'weighted_metrics': None, 'run_eagerly': False, 'steps_per_execution': 1, 'jit_compile': False}}.

Exception encountered: <class '__main__.TokenAndPositionEmbedding'> could not be deserialized properly. Please ensure that components that are Python object instances (layers, models, etc.) returned by `get_config()` are explicitly deserialized in the model's `from_config()` method.

config={'module': None, 'class_name': 'TokenAndPositionEmbedding', 'config': {'maxlen': 80, 'vocab_size': 20000, 'embed_dim': 256, 'trainable': True, 'dtype': 'float32'}, 'registered_name': 'TokenAndPositionEmbedding', 'build_config': {'input_shape': [None, 80]}, 'name': 'token_and_position_embedding', 'inbound_nodes': [{'args': [{'class_name': '__keras_tensor__', 'config': {'shape': [None, 80], 'dtype': 'int32', 'keras_history': ['input_layer', 0, 0]}}], 'kwargs': {}}]}.

Exception encountered: Error when deserializing class 'TokenAndPositionEmbedding' using config={'maxlen': 80, 'vocab_size': 20000, 'embed_dim': 256, 'trainable': True, 'dtype': 'float32'}.

Exception encountered: TokenAndPositionEmbedding.__init__() got an unexpected keyword argument 'trainable'