In [40]:
# Importing Libraries
import tensorflow as tf, keras
from tensorflow.keras import layers, models, optimizers
import numpy as np
from tensorflow.keras.saving import register_keras_serializable

In [82]:
data_array = [
    "datasets_bambara/poemes/poeme_br_1.txt",
    "datasets_bambara/poemes/poeme_br_2.txt",
    "datasets_bambara/poemes/poeme_br_3.txt",
    "datasets_bambara/poemes/poeme_br_4.txt",
]

In [118]:
# hyperparameters
vocab_size = 20000
batch_size = 16  # how many independent sequences will we process in parallel?
block_size = 32  # what is the maximum context length for predictions?
max_iters = 5000
eval_interval = 100

Epochs = max_iters // eval_interval

learning_rate = 1e-9
eval_iters = 200
n_embd = 1064
n_head = 10
n_layer = 6
dropout = 0.0
# ------------

In [119]:
text = ""
# Read the text file
for dtext in data_array:
    with open(dtext, "r", encoding="utf-8") as f:
        text += f"{f.read()}"

In [120]:
len(text)

76129

In [121]:
import os
from tokenizers import Tokenizer, pre_tokenizers, decoders, trainers, processors
from tokenizers import models as t_model
# from data_list import DATA_LIST

tokenizer_path = "tokenizer.json"
tokenizer = Tokenizer(t_model.BPE())
tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(add_prefix_space=True)
tokenizer.decoder = decoders.ByteLevel()
tokenizer.post_processor = processors.ByteLevel(trim_offsets=True)
trainer = trainers.BpeTrainer(
    vocab_size=20000,
    min_frequency=2,
    initial_alphabet=pre_tokenizers.ByteLevel.alphabet()
)


class SuperTokenizer():

    # {files_path_array} doit être un tableau. EX: ["1.txt", "2.txt", "3.txt"]
    @staticmethod
    def fit(files_path_array):
        # print(files_path_array)
        tokenizer.train(files_path_array, trainer=trainer)
        tokenizer.save(tokenizer_path, pretty=True)
        return True
    
    @staticmethod
    def loader_tokenizer_from_json():
        if os.path.exists(tokenizer_path):
            loader_tokenizer = Tokenizer.from_file(tokenizer_path)
        else:
            SuperTokenizer.fit(filenames)
            loader_tokenizer = Tokenizer.from_file(tokenizer_path)
        return loader_tokenizer
    
    def encode(text):
        loader_tokenizer = SuperTokenizer.loader_tokenizer_from_json()
        encoded = loader_tokenizer.encode(text)
        return encoded.ids
    
    def decode(tokens):
        loader_tokenizer = SuperTokenizer.loader_tokenizer_from_json()
        decoded = loader_tokenizer.decode(tokens)
        if decoded[0] == 220 and decoded[-1] == 220:
            decoded = decoded[1:-1]
        return decoded

In [122]:
# Unique characters
# chars = sorted(list(set(text)))
# vocab_size = len(chars)
# # chars

In [123]:
# create a mapping from characters to integers
# stoi = {ch: i for i, ch in enumerate(chars)}
# itos = {i: ch for i, ch in enumerate(chars)}
# encode = lambda s: [
#     stoi[c] for c in s
# ]  # encoder: take a string, output a list of integers
# decode = lambda l: "".join(
#     [itos[i] for i in l]
# )  # decoder: take a list of integers, output a string


In [124]:
# Train and test splits
data = np.array(SuperTokenizer.encode(text), dtype=np.int64)
n = int(0.9 * len(data))  # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]
data

array([314, 485, 365, ..., 605,  19,  15], dtype=int64)

In [125]:
data

array([314, 485, 365, ..., 605,  19,  15], dtype=int64)

In [126]:
train_data

array([ 314,  485,  365, ...,   42,  372, 1375], dtype=int64)

In [127]:
val_data[:5]

array([331,   0, 974, 119, 198], dtype=int64)

In [128]:
# Data loading
def get_batch(split):
    # Generate a small batch of data of inputs x and targets y
    data = train_data if split == "train" else val_data
    ix = np.random.randint(0, len(data) - block_size, batch_size)
    x = np.stack([data[i : i + block_size] for i in ix])
    y = np.stack([data[i + 1 : i + block_size + 1] for i in ix])
    return x, y


In [129]:
# Prepare train/val dataset
def train_data_generator():
    while True:
        yield get_batch("train")


def val_data_generator():
    while True:
        yield get_batch("val")

In [130]:
train_data_generator = tf.data.Dataset.from_generator(
    train_data_generator,
    output_signature=(
        tf.TensorSpec(shape=(batch_size, block_size), dtype=tf.int64),
        tf.TensorSpec(shape=(batch_size, block_size), dtype=tf.int64),
    ),
)

val_data_generator = tf.data.Dataset.from_generator(
    val_data_generator,
    output_signature=(
        tf.TensorSpec(shape=(batch_size, block_size), dtype=tf.int64),
        tf.TensorSpec(shape=(batch_size, block_size), dtype=tf.int64),
    ),
)

In [131]:
class FeedForward(layers.Layer):
    """A simple linear layer followed by a non-linearity"""

    def __init__(self, n_embd, dropout=0.0, **kwargs):
        super().__init__(**kwargs)
        self.n_embd = n_embd
        self.dropout = dropout
        self.net = models.Sequential(
            [
                layers.Dense(4 * n_embd, activation="relu"),
                layers.Dense(n_embd),
                layers.Dropout(dropout),
            ]
        )

    def call(self, x):
        return self.net(x)

    def get_config(self):
        config = super().get_config()
        config.update({
            "n_embd": self.n_embd,
            "dropout": self.dropout,
        })
        return config

    @classmethod
    def from_config(cls, config):
        return cls(**config)


In [132]:
class Block(layers.Layer):
    """Transformer block: communication followed by computation"""

    def __init__(self, n_embd, n_head, dropout=0.0, **kwargs):
        super().__init__(**kwargs)
        self.n_embd = n_embd
        self.n_head = n_head
        self.dropout = dropout
        self.sa = layers.MultiHeadAttention(
            num_heads=n_head, key_dim=n_embd // n_head, dropout=dropout
        )
        self.ffwd = FeedForward(n_embd, dropout)
        self.ln1 = layers.LayerNormalization()
        self.ln2 = layers.LayerNormalization()

    def call(self, x):
        attn_output = self.sa(
            self.ln1(x), self.ln1(x), use_causal_mask=True
        )  # use causal mask to ensure each token can only see previous tokens
        x = x + attn_output
        x = x + self.ffwd(self.ln2(x))
        return x

    def get_config(self):
        config = super().get_config()
        config.update({
            "n_embd": self.n_embd,
            "n_head": self.n_head,
            "dropout": self.dropout,
        })
        return config

    @classmethod
    def from_config(cls, config):
        return cls(**config)


In [133]:
# Bigram Language Model
@register_keras_serializable()
class BigramLanguageModel(keras.Model):
    def __init__(self, vocab_size, n_embd, block_size, n_head, n_layer, **kwargs):
        super().__init__(**kwargs)
        self.token_embedding_table = layers.Embedding(vocab_size, n_embd)
        self.position_embedding_table = layers.Embedding(block_size, n_embd)
        self.blocks = [Block(n_embd, n_head) for _ in range(n_layer)]
        self.ln_f = layers.LayerNormalization()
        self.lm_head = layers.Dense(vocab_size)

    def call(self, idx, targets=None):
        B, T = idx.shape
        tok_emb = self.token_embedding_table(idx)  # (B,T,C)
        pos_emb = self.position_embedding_table(
            tf.range(T)[tf.newaxis, :]
        )  # initially (T,C) adding new axis and get # (1,T,C)
        x = tok_emb + pos_emb  # (B,T,C)
        for block in self.blocks:  # (B,T,C)
            x = block(x)
        x = self.ln_f(x)  # (B,T,C)
        logits = self.lm_head(x)  # (B,T,vocab_size)

        if targets is None:
            return logits, None

        logits_flat = tf.reshape(logits, [-1, logits.shape[-1]])
        targets_flat = tf.reshape(targets, [-1])
        loss = keras.losses.sparse_categorical_crossentropy(
            targets_flat, logits_flat, from_logits=True
        )
        return logits, tf.reduce_mean(loss)

    def train_step(self, data):
        x, y = data
        with tf.GradientTape() as tape:
            logits, loss = self(x, y)
        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        return {"loss": loss}

    def test_step(self, data):
        x, y = data
        logits, loss = self(x, y)
        return {"loss": loss}

    def generate(self, idx, max_new_tokens):
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -block_size:]
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :]
            idx_next = tf.random.categorical(logits, num_samples=1)
            idx = tf.concat([idx, idx_next], axis=1)
        return idx

    def get_config(self):
        config = super().get_config()
        config.update({
            "vocab_size": self.token_embedding_table.input_dim,
            "n_embd": self.token_embedding_table.output_dim,
            "block_size": self.position_embedding_table.input_dim,
            "n_head": self.blocks[0].sa.num_heads,
            "n_layer": len(self.blocks),
        })
        return config

    @classmethod
    def from_config(cls, config):
        return cls(**config)

In [134]:
# Initialize the model train and plotting loss curves
model = BigramLanguageModel(vocab_size=vocab_size, n_embd=n_embd, block_size=block_size, n_head=n_head, n_layer=n_layer)
# print the number of parameters in the model
model.build((batch_size, block_size))
print("Number of trainable parameters:", model.count_params())

# Compile the model
model.compile(optimizer=optimizers.Adam(learning_rate))

Number of trainable parameters: 0




In [135]:
model.summary()

In [136]:
# Train the model
model.fit(
    train_data_generator,
    # epochs=Epochs,
    epochs=1,
    steps_per_epoch=eval_interval,
    validation_data=val_data_generator,
    validation_steps=eval_iters,
)

[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1358s[0m 13s/step - loss: 10.5332 - val_loss: 0.0000e+00


In [158]:
# Generate text
# prompt = "waati sera, wuli ka bɔ"
prompt = "kuma don sera"
context = SuperTokenizer.encode(prompt)
context = np.array([context], dtype=np.int64)
# context = [context]
print(context)

[[616 361 383 355]]


In [159]:
generated = model.generate(context, max_new_tokens=30)
print(SuperTokenizer.decode(generated[0].numpy().tolist()))
# print()
# generated
# print(SuperTokenizer.decode([616, 361, 383, 355, 13]))
# print(SuperTokenizer.encode("kuma don sera."))

 kuma don seraunsigi


In [160]:
print(model)
model_name = "E:\\Alkaou\Python Projects\\models\\br_bigram_model.keras"
# Sauvegarder le modèle
keras.models.save_model(model, model_name)

<BigramLanguageModel name=bigram_language_model_6, built=True>


In [161]:
md_loaded = keras.models.load_model(model_name)
md_loaded.summary()

In [37]:
# Generate text
context = np.zeros((1, 1), dtype=np.int64)
print(context)

[[0]]


In [39]:
generated = md_loaded.generate(context, max_new_tokens=20)
print(SuperTokenizer.decode(generated[0].numpy().tolist()))

!
ɔndiya yeama ye awlenw, jukɔrɔ, f la haliɔ an ye siniɲɛsigi kad wula
