# Text Generation using a Transformer for Sherlock Holmes

First, import libraries:


In [None]:
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"  # avoid parallelism warning

# to do: try to use tensorflow-metal instead of base tf
import wandb
import tensorflow as tf
import matplotlib.pyplot as plt
import platform
from collections import Counter
import re

print('Python version:', platform.python_version())
print('Tensorflow version:', tf.__version__)
print('Keras version:', tf.keras.__version__)


In [None]:
from wandb.integration.keras import WandbMetricsLogger

wandb.login()


Import data + directory for saving checkpoints


In [None]:
# Read the whole corpus into memory. The context manager guarantees the file
# handle is closed, and the explicit encoding stops the result depending on the
# platform default (assumes the corpus is UTF-8 -- TODO confirm).
with open(dataset_file_path, mode='r', encoding='utf-8') as corpus_file:
    text = corpus_file.read()
print(text[:250])


In [None]:
from tokenizers import ByteLevelBPETokenizer
from transformers import GPT2Tokenizer


Here we train a byte-level BPE tokenizer on the file (or reuse a previously saved one). The requested vocabulary size is 30,000.


In [None]:
save_dir = "./tokenizer"
tokenizer = ByteLevelBPETokenizer()

# Only train when there is no saved tokenizer yet (directory missing or empty).
needs_training = not os.path.exists(save_dir) or not os.listdir(save_dir)
if needs_training:
    tokenizer.train(
        files=[dataset_file_path],
        vocab_size=30_000,
        min_frequency=2,
    )
    os.makedirs(save_dir, exist_ok=True)
    tokenizer.save_model(save_dir)

# Re-open the saved vocabulary/merges files through the GPT-2 tokenizer class.
custom_tokenizer = GPT2Tokenizer.from_pretrained(save_dir)


Tokenize the text file


In [None]:
input_ids = custom_tokenizer.encode(text)
print(f"Total tokens in text: {len(input_ids)}")


Prepping data


In [None]:
tokens_dataset = tf.data.Dataset.from_tensor_slices(input_ids)


In [None]:
sequence_length = 100
examples_per_epoch = len(input_ids) // (sequence_length + 1)

print(f'Examples per epoch: {examples_per_epoch}')


In [None]:
# Generate batched sequences out of the token dataset
sequences = tokens_dataset.batch(sequence_length + 1, drop_remainder=True)


In [None]:
# Split sequences into input and target
def split_input_target(chunk):
    """Return ``(inputs, targets)`` for next-token prediction.

    ``inputs`` is ``chunk`` without its last element and ``targets`` is
    ``chunk`` without its first, so ``targets[i]`` is the element that
    follows ``inputs[i]``.
    """
    return chunk[:-1], chunk[1:]

dataset = sequences.map(split_input_target)


In [None]:
# Print the first (up to) five input/target token pairs of one sequence
for input_example, target_example in dataset.take(1):
    leading_pairs = zip(input_example.numpy()[:5], target_example.numpy()[:5])
    for i, (input_token, target_token) in enumerate(leading_pairs):
        decoded_input = custom_tokenizer.decode([input_token])
        decoded_target = custom_tokenizer.decode([target_token])
        print(f'Step {i:2d}')
        print(f'  input token: {input_token} ({decoded_input})')
        print(f'  expected output token: {target_token} ({decoded_target})')


In [None]:
# Batch size.
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
BUFFER_SIZE = 10000

dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)

dataset


In [None]:
# Ask tf.data for the number of batches instead of pulling every (shuffled)
# batch into a Python list just to count them.
print('Batched dataset size: {}'.format(dataset.cardinality().numpy()))


In [None]:
# Get vocabulary size from tokenizer
vocab_size = custom_tokenizer.vocab_size + 1  # +1 for the padding token if added

# Model hyperparams
EMBED_DIM = 256
MODEL_DIM = 256
NUM_HEADS = 8
FF_DIM = 1024
NUM_LAYERS = 4
DROPOUT = 0.1


In [None]:
import tensorflow as tf

def positional_encoding(length: int, depth: int) -> tf.Tensor:
    """Build a sinusoidal positional-encoding table.

    Args:
        length: Number of positions to encode.
        depth: Size of the encoding vector for each position.

    Returns:
        A float32 tensor of shape ``(1, length, depth)``. Along the last axis
        the sine components come first, followed by the cosine components
        (they are concatenated, not interleaved).
    """
    positions = tf.range(length, dtype=tf.float32)[:, tf.newaxis]
    channels = tf.range(depth, dtype=tf.float32)[tf.newaxis, :]

    # Channels 2k and 2k+1 share the same frequency exponent 2k / depth.
    exponents = (2 * (channels // 2)) / tf.cast(depth, tf.float32)
    angles = positions * (1 / tf.pow(10000.0, exponents))

    sin_part = tf.math.sin(angles[:, 0::2])
    cos_part = tf.math.cos(angles[:, 1::2])
    table = tf.concat([sin_part, cos_part], axis=-1)
    return table[tf.newaxis, ...]


def transformer_block(x: tf.Tensor, model_dim: int, ff_dim: int, num_heads: int, dropout: float) -> tf.Tensor:
    """Apply one post-norm Transformer decoder block to ``x``.

    The block is causal self-attention followed by a two-layer feed-forward
    network; each sub-layer has dropout, a residual connection and layer
    normalisation.

    Args:
        x: Input activations whose last dimension is ``model_dim``.
        model_dim: Width of the residual stream (and output of the block).
        ff_dim: Hidden width of the feed-forward sub-layer.
        num_heads: Number of attention heads.
        dropout: Dropout rate used inside attention and after each sub-layer.

    Returns:
        A tensor with the same shape as ``x``.
    """
    # NOTE: key_dim is the per-head size, so every head is model_dim wide here
    # (not model_dim // num_heads). Left unchanged so weight shapes stay
    # compatible with existing checkpoints.
    attn_out = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=model_dim, dropout=dropout
    )(
        x,
        x,
        # The model is trained to predict token t+1 at position t, so a position
        # must not attend to later positions; without this mask the targets are
        # visible to the model during training.
        use_causal_mask=True,
    )
    attn_out = tf.keras.layers.Dropout(dropout)(attn_out)
    x = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x + attn_out)

    ff = tf.keras.Sequential([
        tf.keras.layers.Dense(ff_dim, activation="relu"),
        tf.keras.layers.Dense(model_dim),
    ])
    ff_out = ff(x)
    ff_out = tf.keras.layers.Dropout(dropout)(ff_out)
    x = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x + ff_out)
    return x


def build_model(vocab_size, embed_dim, model_dim, num_heads, ff_dim, num_layers, dropout):
    """Assemble the token-level Transformer as a Keras functional model.

    Args:
        vocab_size: Number of token ids; used for the embedding table and the
            size of the output logits.
        embed_dim: Width of the token embedding.
        model_dim: Width the embeddings are projected to and that every
            transformer block operates on.
        num_heads: Attention heads per block.
        ff_dim: Hidden width of each block's feed-forward sub-layer.
        num_layers: Number of stacked transformer blocks.
        dropout: Dropout rate forwarded to every block.

    Returns:
        A ``tf.keras.Model`` named ``"token_transformer"`` that maps int32
        token ids of shape ``(batch, time)`` to un-normalised logits over the
        vocabulary for every position.
    """
    inputs = tf.keras.layers.Input(shape=(None,), dtype=tf.int32)
    emb = tf.keras.layers.Embedding(vocab_size, embed_dim)(inputs)
    # Linear projection from the embedding width to the model width.
    proj = tf.keras.layers.Dense(model_dim)(emb)
    # The table is precomputed for the module-level `sequence_length` positions only.
    pos = positional_encoding(length=sequence_length, depth=model_dim)

    def add_positional(x):
        # Slice the table to the actual input length before adding it.
        # NOTE(review): `pos` has only `sequence_length` rows, so inputs longer
        # than that presumably fail to broadcast here -- confirm and keep
        # inputs within `sequence_length`.
        seq_len = tf.shape(x)[1]
        return x + pos[:, :seq_len, :]

    x = tf.keras.layers.Lambda(add_positional)(proj)

    for _ in range(num_layers):
        x = transformer_block(x, model_dim, ff_dim, num_heads, dropout)

    # No activation: the loss is expected to consume raw logits.
    logits = tf.keras.layers.Dense(vocab_size)(x)
    model = tf.keras.Model(inputs, logits, name="token_transformer")
    return model

model = build_model(vocab_size, EMBED_DIM, MODEL_DIM, NUM_HEADS, FF_DIM, NUM_LAYERS, DROPOUT)


In [None]:
model.summary()


In [None]:
tf.keras.utils.plot_model(
    model,
    show_shapes=True,
    show_layer_names=True,
)


In [None]:
# Loss function
def loss(labels, logits):
    """Sparse categorical cross-entropy between integer labels and raw logits."""
    crossentropy = tf.keras.losses.sparse_categorical_crossentropy
    return crossentropy(labels, logits, from_logits=True)

# Compile the model with Adam and the logits-based loss defined above
adam_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(
    optimizer=adam_optimizer,
    loss=loss
)

# Directory for checkpoints (requires `cache_dir` to be defined already)
checkpoint_dir = cache_dir + '/checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

# Checkpoint filename; `{epoch:02d}` is filled in by the checkpoint callback
checkpoint_prefix = os.path.join(checkpoint_dir, 'model_epoch_{epoch:02d}.weights.h5')

# Save weights only, and only for epochs where the training loss improves
# (there is no validation set, so the training loss is monitored).
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_best_only=True,
    save_weights_only=True,
    monitor='loss',
    mode='min',
    verbose=1
)

# Project-local callback; see resource_monitor for what it records.
from resource_monitor import ResourceMonitorCB
monitor_cb = ResourceMonitorCB(monitor_interval=2.0)

# Stop after 3 epochs without improvement in training loss and roll back to the best weights
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='loss', patience=3, restore_best_weights=True
)


In [None]:
run = wandb.init(project='my-first-project', group='transformer-experiment')

EPOCHS = 2  # TESTING
try:
    history = model.fit(
        x=dataset,
        epochs=EPOCHS,
        callbacks=[
            checkpoint_cb,
            early_stopping,
            WandbMetricsLogger(),
            monitor_cb
        ]
    )
finally:
    # Close the W&B run even when training fails or is interrupted, so a
    # half-finished run is not left open for the rest of the session.
    run.finish()


In [None]:
def render_training_history(training_history):
    """Plot the per-epoch training loss stored in ``training_history.history``."""
    epoch_losses = training_history.history['loss']

    # Axis decoration
    plt.title('Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')

    # Curve, legend and grid
    plt.plot(epoch_losses, label='Training set')
    plt.legend()
    plt.grid(linestyle='--', linewidth=1, alpha=0.5)

    plt.show()

render_training_history(history)


Load best model


In [None]:
import glob

# Collect every checkpoint written to the checkpoint directory
weights_pattern = os.path.join(checkpoint_dir, '*.weights.h5')
weight_files = glob.glob(weights_pattern)

if not weight_files:
    print("No weights file found. Using randomly initialized model.")
else:
    # Pick the most recently created checkpoint file
    latest = max(weight_files, key=os.path.getctime)
    model.load_weights(latest)
    print(f"Loaded weights from: {latest}")


Generate text


In [None]:
def generate_text(model, start_string, num_generate=300, temperature=1.0, max_context=None):
    """Sample a continuation of ``start_string`` from ``model``.

    Args:
        model: Callable mapping int32 token ids of shape ``(1, time)`` to
            logits of shape ``(1, time, vocab)``.
        start_string: Prompt text; it is tokenised with ``custom_tokenizer``.
        num_generate: Number of tokens to sample.
        temperature: Divides the logits before sampling; lower values make the
            output more predictable. Must be positive.
        max_context: Maximum number of most-recent tokens fed to the model at
            each step. Defaults to the module-level ``sequence_length``, which
            is the length of the model's positional-encoding table.

    Returns:
        ``start_string`` followed by the decoded sampled tokens.

    Raises:
        ValueError: If ``temperature`` or ``max_context`` is not positive, or
            if tokens are requested but the prompt encodes to no tokens.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    if max_context is None:
        max_context = sequence_length
    if max_context < 1:
        raise ValueError("max_context must be at least 1")

    token_ids = list(custom_tokenizer.encode(start_string))
    if num_generate > 0 and not token_ids:
        raise ValueError("start_string must encode to at least one token")

    generated_tokens = []

    for _ in range(num_generate):
        # Rebuild the context every step from a plain Python list: this keeps a
        # single dtype (the sampler returns int64 while the prompt is int32)
        # and crops the window so it never exceeds the positional table.
        context = tf.constant([token_ids[-max_context:]], dtype=tf.int32)
        logits = model(context, training=False)
        logits = logits[:, -1, :] / temperature
        predicted_id = int(tf.random.categorical(logits, num_samples=1)[0, 0].numpy())
        generated_tokens.append(predicted_id)
        token_ids.append(predicted_id)

    return start_string + custom_tokenizer.decode(generated_tokens)

print(generate_text(model, start_string="Sherlock Holmes looked at the ", num_generate=300, temperature=1.0))


Experiments on grammar


In [None]:
import language_tool_python

def evaluate_grammar_quality(generated_text):
    """Run LanguageTool (en-GB) over ``generated_text`` and summarise the findings.

    Args:
        generated_text: The text to check.

    Returns:
        A dict with the keys ``'total_errors'`` (number of matches),
        ``'error_categories'`` (match count per category),
        ``'errors_per_100_words'`` and ``'word_count'``.
    """
    # No install-on-failure fallback: the module is already imported at this
    # point, so a failure here is an environment problem that reinstalling the
    # package would not fix. Let the original error surface.
    tool = language_tool_python.LanguageTool('en-GB')
    try:
        # Find grammar and spelling errors
        matches = tool.check(generated_text)
    finally:
        # Shut the checker down so repeated calls do not leave instances behind.
        tool.close()

    # Count errors by category
    error_categories = Counter(match.category for match in matches)

    # Words are maximal runs of word characters
    word_count = len(re.findall(r'\b\w+\b', generated_text))
    total_errors = len(matches)

    return {
        'total_errors': total_errors,
        'error_categories': dict(error_categories),
        # max(1, ...) avoids dividing by zero for text without any words
        'errors_per_100_words': (total_errors / max(1, word_count)) * 100,
        'word_count': word_count,
    }

def merge_error_categories(error_cat_list):
    """Average per-category error counts over a list of runs.

    Each item of ``error_cat_list`` maps a category to its count in one run.
    The result maps every category seen in any run to its total count divided
    by the number of runs (a run that lacks a category contributes zero).
    """
    totals = Counter()
    for per_run_counts in error_cat_list:
        totals.update(per_run_counts)

    run_count = len(error_cat_list)
    return {category: total / run_count for category, total in totals.items()}


def run_evaluations(model, start_string, num_runs=5, num_tokens=300, temperatures=(0.7, 1.0)):
    """Generate text several times per temperature and average the grammar metrics.

    Args:
        model: Model passed through to ``generate_text``.
        start_string: Prompt used for every generation.
        num_runs: Number of generations per temperature; must be at least 1.
        num_tokens: Number of tokens to generate per run.
        temperatures: Iterable of sampling temperatures to evaluate.

    Returns:
        A dict keyed by temperature. Each value holds the averaged
        ``'total_errors'``, ``'errors_per_100_words'``, ``'word_count'`` and
        ``'error_categories'``, plus the raw per-run results under
        ``'individual_runs'``.

    Raises:
        ValueError: If ``num_runs`` is less than 1.
    """
    if num_runs < 1:
        # Averages below divide by num_runs.
        raise ValueError("num_runs must be at least 1")

    all_results = {}

    for temp in temperatures:
        print(f"\nEvaluating temperature: {temp}")
        run_results = []

        for run_index in range(1, num_runs + 1):
            print(f"  Run {run_index}/{num_runs}...")
            sample = generate_text(model, start_string, num_tokens, temp)
            run_results.append(evaluate_grammar_quality(sample))

        # Calculate averages across runs
        avg_results = {
            metric: sum(r[metric] for r in run_results) / num_runs
            for metric in ('total_errors', 'errors_per_100_words', 'word_count')
        }
        avg_results['error_categories'] = merge_error_categories(
            [r['error_categories'] for r in run_results]
        )

        # Store individual runs for reference
        avg_results['individual_runs'] = run_results
        all_results[temp] = avg_results

    return all_results


In [None]:
# Temperatures and prompt for the evaluation
start_string = "Sherlock Holmes looked at the"
temperatures = [1.0]

# Five generations per temperature, averaged
results = run_evaluations(
    model,
    start_string,
    num_runs=5,
    num_tokens=300,
    temperatures=temperatures,
)

print("\n===== AVERAGED RESULTS =====")
for temp, result in results.items():
    print(f"\nTemperature: {temp}")
    for label, key in (
        ("Average total errors", 'total_errors'),
        ("Average errors per 100 words", 'errors_per_100_words'),
        ("Average word count", 'word_count'),
    ):
        print(f"{label}: {result[key]:.2f}")
    print("Average error categories:")

    # Most frequent categories first
    sorted_categories = sorted(
        result['error_categories'].items(),
        key=lambda item: item[1],
        reverse=True,
    )

    for category, avg_count in sorted_categories:
        print(f"  - {category}: {avg_count:.2f}")


In [None]:
# Paths used by the rest of the notebook. `dataset_file_path` is read by the
# data-loading and tokenizer cells and `cache_dir` by the checkpoint cell, so
# this cell has to be executed before them even though it sits at the bottom.
cache_dir = './tmp-transformer'
dataset_file_name = 'sherlockholmes.txt'

# The dataset is referenced by its bare file name, i.e. relative to the
# current working directory.
dataset_file_path = dataset_file_name

print(dataset_file_path)
