In [32]:
# We download our dataset for the training

!pip3 install gutenbergpy
import datasource
datasource.download_dataset()

You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m
Cache already exists
0 / 29
1 / 29
2 / 29
3 / 29
4 / 29
5 / 29
6 / 29
7 / 29
8 / 29
9 / 29
10 / 29
11 / 29
12 / 29
13 / 29
14 / 29
15 / 29
16 / 29
17 / 29
18 / 29
19 / 29
20 / 29
21 / 29
22 / 29
23 / 29
24 / 29
25 / 29
26 / 29
27 / 29
28 / 29


In [36]:
!pip3 install keras_nlp
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import TextVectorization
import tensorflow_text as tf_text
import keras_nlp
import numpy as np
import pickle
import random
from pathlib import Path

You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [34]:
APP_DIR = Path.home() / ".lafontaine"
DATASET = APP_DIR / "dataset.txt"
VECTORIZER = APP_DIR / "vectorizer.pkl"

In [72]:
with DATASET.open('r') as stream:
    verses = stream.read().splitlines()
    
# We force it at 50 or it won't fit in GPU memory
seq_length = len(max(verses, key=lambda v: len(v.split())))
# seq_length = 50


def nw_split(text_input):
    splitted = tf_text.regex_split(input=text_input,
            delim_regex_pattern="[^\[a-zA-ZÀ-ÿ-.]+|\s+", # We split on french words and punctuations
            keep_delim_regex_pattern="\S+" # We keep everything that isn't a sequence of whitespaces
            )
    return splitted

vectorizer = TextVectorization(
    split=nw_split,
    standardize='lower',
    output_mode="int",
    output_sequence_length=seq_length + 1,
)

vectorizer.adapt(verses)
vocab = vectorizer.get_vocabulary()
vocab_size = len(vocab)
index_lookup = dict(zip(range(len(vocab)), vocab))

In [78]:
random.shuffle(verses)
length = len(verses)
text_train = verses[:int(0.7*length)]
text_test = verses[int(0.7*length):int(0.85*length)]
text_valid = verses[int(0.85*length):]

batch_size = 32

train_dataset = tf.data.Dataset.from_tensor_slices(text_train)
train_dataset = train_dataset.shuffle(buffer_size=256)
train_dataset = train_dataset.batch(batch_size)

test_dataset = tf.data.Dataset.from_tensor_slices(text_test)
test_dataset = test_dataset.shuffle(buffer_size=256)
test_dataset = test_dataset.batch(batch_size)

valid_dataset = tf.data.Dataset.from_tensor_slices(text_valid)
valid_dataset = valid_dataset.shuffle(buffer_size=256)
valid_dataset = valid_dataset.batch(batch_size)

def preprocess_text(text):
    text = tf.expand_dims(text, -1)
    tokenized_sentences = vectorizer(text)
    x = tokenized_sentences[:, :-1]
    y = tokenized_sentences[:, 1:]
    return x, y

train_dataset = train_dataset.map(preprocess_text)
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)

test_dataset = test_dataset.map(preprocess_text)
test_dataset = test_dataset.prefetch(tf.data.AUTOTUNE)

valid_dataset = valid_dataset.map(preprocess_text)
valid_dataset = valid_dataset.prefetch(tf.data.AUTOTUNE)

In [79]:
embed_dim = 128
num_heads = 4

def create_model():
    inputs = keras.layers.Input(shape=(seq_length,), dtype=tf.int32)
    embedding_layer = keras_nlp.layers.TokenAndPositionEmbedding(vocab_size, seq_length, embed_dim)(inputs)
    decoder = keras_nlp.layers.TransformerDecoder(intermediate_dim=embed_dim, 
                                                            num_heads=num_heads, 
                                                            dropout=0.5)(embedding_layer)
    
    outputs = keras.layers.Dense(vocab_size, activation='softmax')(decoder)
    
    model = keras.Model(inputs=inputs, outputs=outputs)
    
    model.compile(
        optimizer="adam", 
        loss='sparse_categorical_crossentropy',
        metrics=[keras_nlp.metrics.Perplexity(), 'accuracy']
    )
    return model

model = create_model()
model.summary()

Model: "model_13"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_14 (InputLayer)       [(None, 67)]              0         
                                                                 
 token_and_position_embeddin  (None, 67, 128)          5159040   
 g_13 (TokenAndPositionEmbed                                     
 ding)                                                           
                                                                 
 transformer_decoder_13 (Tra  (None, 67, 128)          99584     
 nsformerDecoder)                                                
                                                                 
 dense_13 (Dense)            (None, 67, 40238)         5190702   
                                                                 
Total params: 10,449,326
Trainable params: 10,449,326
Non-trainable params: 0
______________________________________________

In [80]:
class TextSampler(keras.callbacks.Callback):
    def __init__(self, start_prompt, max_tokens):
        self.start_prompt = start_prompt
        self.max_tokens = max_tokens
        
    # Helper method to choose a word from the top K probable words with respect to their probabilities
    # in a sequence
    def sample_token(self, logits):
        logits, indices = tf.math.top_k(logits, k=5, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def on_epoch_end(self, epoch, logs=None):
        decoded_sample = self.start_prompt
        
        for i in range(self.max_tokens-1):
            tokenized_prompt = vectorizer([decoded_sample])[:, :-1]
            predictions = self.model.predict([tokenized_prompt], verbose=0)
            # To find the index of the next word in the prediction array.
            # The tokenized prompt is already shorter than the original decoded sample
            # by one, len(decoded_sample.split()) is two words ahead - so we remove 1 to get
            # the next word in the sequence
            sample_index = len(decoded_sample.strip().split())-1
            
            sampled_token = self.sample_token(predictions[0][sample_index])
            sampled_token = index_lookup[sampled_token]
            decoded_sample += " " + sampled_token
            
        print(f"\nSample text:\n{decoded_sample}...\n")

# First 5 words of a random sentence to be used as a seed
random_sentence = ' '.join(random.choice(text_valid).replace('\n', ' ').split(' ')[:4])

# Sampler from seed
sampler = TextSampler(random_sentence, 30)

# Reduce learning rate when a metric has stopped improving.
reducelr = keras.callbacks.ReduceLROnPlateau(patience=10, monitor='val_loss')

In [None]:
model = create_model()
history = model.fit(train_dataset, 
                    validation_data=valid_dataset,
                    epochs=30, 
                    callbacks=[sampler, reducelr])
# Save model
model.save("sentence_model")

Epoch 1/30
 713/4091 [====>.........................] - ETA: 8:02 - loss: 1.1296 - perplexity: 3.0943 - accuracy: 0.8938

In [12]:
def sample_token(logits):
        logits, indices = tf.math.top_k(logits, k=5, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

def generate_text(prompt, response_length=20):
    decoded_sample = prompt
    for i in range(response_length-1):
        tokenized_prompt = vectorizer([decoded_sample])[:, :-1]
        predictions = model.predict([tokenized_prompt], verbose=0)
        sample_index = len(decoded_sample.strip().split())-1

        sampled_token = sample_token(predictions[0][sample_index])
        sampled_token = index_lookup[sampled_token]
        decoded_sample += " " + sampled_token
    return decoded_sample

In [31]:
generate_text("Bonjour")

"Bonjour ! çà! et puis - il  n ' ont fait plus  en ce que gazotte  "