This part is mostly based on the TensorFlow tutorial "Text generation with an RNN" (https://www.tensorflow.org/text/tutorials/text_generation). Most changes follow from the project requirements: TFLite compatibility and no need for multiple training approaches.

Import TensorFlow and other libraries.

In [1]:
import tensorflow as tf

import numpy as np
import pandas as pd
import os
import time

Read the prepared text dataset and inspect it.

In [2]:
path_to_file = os.path.abspath("data/scp6999.txt")

# Read the file as bytes and decode it as UTF-8 (line endings are kept as-is).
with open(path_to_file, 'rb') as f:
    text = f.read().decode(encoding='utf-8')
print(f'Length of text: {len(text)} characters')

Length of text: 5600694 characters


In [3]:
print(text[:250])

☺SCP-002
☺"The "Living" Room"
☺Object Class: Euclid
☺Special Containment Procedures: SCP-002 is to remain connected to a suitable power supply at all times, to keep it in what appears to be a recharging mode. In case of electrical outage, the emer


In [4]:
vocab = sorted(set(text))
print(f'{len(vocab)} unique characters')

176 unique characters


The text must be vectorized before it can be used. StringLookup provides string => int and int => string conversions based on the vocabulary.

In [5]:
ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=list(vocab), name='charToId', mask_token=None)
    
chars_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(), name='idToChar', invert=True, mask_token=None)

However, the inverse lookup returns a tensor of separate characters, which can be joined back into text with reduce_join.

In [6]:
def text_from_ids(ids):
  return tf.strings.reduce_join(chars_from_ids(ids), axis=-1)
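The two lookups and text_from_ids can be mimicked in plain Python to see what they compute. This is only a sketch, not the StringLookup implementation: the helper names (build_lookups, ids_from_text, text_from_id_list) are made up for illustration, and it assumes index 0 is reserved for the unknown-character token, which is consistent with the 177-entry vocabulary reported later for 176 characters.

```python
UNK = '[UNK]'

def build_lookups(vocab):
    """Return (char_to_id, id_to_char) with index 0 reserved for unknown characters."""
    id_to_char = [UNK] + list(vocab)
    char_to_id = {ch: i for i, ch in enumerate(id_to_char)}
    return char_to_id, id_to_char

def ids_from_text(text, char_to_id):
    # Characters missing from the vocabulary fall back to index 0.
    return [char_to_id.get(ch, 0) for ch in text]

def text_from_id_list(ids, id_to_char):
    # Join the per-character strings into one string (the role of reduce_join).
    return ''.join(id_to_char[i] for i in ids)

demo_vocab = sorted(set('SCP-002'))
char_to_id, id_to_char = build_lookups(demo_vocab)
demo_ids = ids_from_text('SCP-002', char_to_id)
print(demo_ids)                                 # [6, 4, 5, 1, 2, 2, 3]
print(text_from_id_list(demo_ids, id_to_char))  # SCP-002
```

Ids follow the sorted order of the characters, shifted by one because of the reserved slot.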

Now convert the text to character ids and then to a dataset so it can be used for training.

In [7]:
all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
all_ids

<tf.Tensor: shape=(5600694,), dtype=int64, numpy=array([176,  54,  38, ...,   1,   2,   1], dtype=int64)>

In [8]:
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)

In [9]:
for ids in ids_dataset.take(10):
    print(chars_from_ids(ids).numpy().decode('utf-8'))

☺
S
C
P
-
0
0
2





The prediction task is to find the character that best continues the current sequence. Training therefore feeds the sequence without its last character as input and uses the same sequence without its first character as the desired outputs.

In [10]:
seq_length = 100

The dataset can be split into sequences with the batch method.

In [11]:
sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)

for seq in sequences.take(1):
  print(chars_from_ids(seq))

tf.Tensor(
[b'\xe2\x98\xba' b'S' b'C' b'P' b'-' b'0' b'0' b'2' b'\r' b'\n'
 b'\xe2\x98\xba' b'"' b'T' b'h' b'e' b' ' b'"' b'L' b'i' b'v' b'i' b'n'
 b'g' b'"' b' ' b'R' b'o' b'o' b'm' b'"' b'\r' b'\n' b'\xe2\x98\xba' b'O'
 b'b' b'j' b'e' b'c' b't' b' ' b'C' b'l' b'a' b's' b's' b':' b' ' b'E'
 b'u' b'c' b'l' b'i' b'd' b'\r' b'\n' b'\xe2\x98\xba' b'S' b'p' b'e' b'c'
 b'i' b'a' b'l' b' ' b'C' b'o' b'n' b't' b'a' b'i' b'n' b'm' b'e' b'n'
 b't' b' ' b'P' b'r' b'o' b'c' b'e' b'd' b'u' b'r' b'e' b's' b':' b' '
 b'S' b'C' b'P' b'-' b'0' b'0' b'2' b' ' b'i' b's' b' ' b't' b'o'], shape=(101,), dtype=string)


In [12]:
for seq in sequences.take(5):
  print(text_from_ids(seq).numpy())

b'\xe2\x98\xbaSCP-002\r\n\xe2\x98\xba"The "Living" Room"\r\n\xe2\x98\xbaObject Class: Euclid\r\n\xe2\x98\xbaSpecial Containment Procedures: SCP-002 is to'
b' remain connected to a suitable power supply at all times, to keep it in what appears to be a recharg'
b'ing mode. In case of electrical outage, the emergency barrier between the object and the facility is '
b'to be closed and the immediate area evacuated. Once facility power is re-established, alternating bur'
b'sts of X-ray and ultraviolet light must strobe the area until SCP-002 is re-affixed to the power supp'


In [13]:
def split_input_target(sequence):
    input_text = sequence[:-1]
    target_text = sequence[1:]
    return input_text, target_text

In [14]:
dataset = sequences.map(split_input_target)
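The chunking and the input/target split can be followed on a tiny example. The sketch below uses plain lists instead of tf.data; chunk and demo_seq_length are illustrative names, and a sequence length of 4 is assumed only to keep the output short.

```python
def chunk(ids, size):
    """Split ids into consecutive chunks of `size`, dropping a short final chunk."""
    full = len(ids) - len(ids) % size
    return [ids[i:i + size] for i in range(0, full, size)]

def split_input_target(sequence):
    # Input drops the last element and target drops the first,
    # so target[i] is the character that follows input[i].
    return sequence[:-1], sequence[1:]

demo_seq_length = 4
chunks = chunk(list('Hello world'), demo_seq_length + 1)
pairs = [split_input_target(c) for c in chunks]
for inp, tgt in pairs:
    print(repr(''.join(inp)), '->', repr(''.join(tgt)))
# 'Hell' -> 'ello'
# ' wor' -> 'worl'
```

The trailing 'd' is dropped because it does not fill a whole chunk, which is what drop_remainder=True does in the cell above.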

Now the dataset can be shuffled and grouped into batches.

In [15]:
# Batch size
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000

dataset = (
    dataset
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE))

dataset

<PrefetchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int64, name=None), TensorSpec(shape=(64, 100), dtype=tf.int64, name=None))>
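The buffer-based shuffling described in the comment can be sketched in plain Python. This is an approximation of the idea, not tf.data's actual code; buffered_shuffle is a hypothetical helper.

```python
import random

def buffered_shuffle(items, buffer_size, seed=None):
    """Yield items in a locally shuffled order using a fixed-size buffer."""
    rng = random.Random(seed)
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            # Still filling the buffer: nothing is emitted yet.
            buffer.append(item)
            continue
        # Buffer is full: emit a random buffered element and reuse its slot.
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = item
    # Input exhausted: drain the remaining elements in random order.
    rng.shuffle(buffer)
    yield from buffer

shuffled = list(buffered_shuffle(range(20), buffer_size=5, seed=0))
print(shuffled)
```

Output position k can only hold one of the first k + buffer_size input elements, so a small buffer shuffles only locally. With roughly 55,000 sequences of 101 characters in this dataset, a buffer of 10000 mixes a sizeable part of the data but not all of it.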

The model definition starts here.

In [16]:
# Length of the vocabulary in StringLookup Layer
vocab_size = len(ids_from_chars.get_vocabulary())

# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 1024

Model uses three layers: 
- embedding => trainable lookup to map character-id to vector 
- gru => RNN based on GRU units
- dense => output layer with logits for each character in vocabulary

In [17]:
class MyModel(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim, rnn_units):
    super().__init__()
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(rnn_units,
                                   return_sequences=True,
                                   return_state=True)
    self.dense = tf.keras.layers.Dense(vocab_size)

  def call(self, inputs, states=None, return_state=False, training=False):
    x = inputs
    x = self.embedding(x, training=training)
    if states is None:
      states = self.gru.get_initial_state(x)
    x, states = self.gru(x, initial_state=states, training=training)
    x = self.dense(x, training=training)

    if return_state:
      return x, states
    else:
      return x

In [18]:
model = MyModel(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

Check that the model works as intended.

In [19]:
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

(64, 100, 177) # (batch_size, sequence_length, vocab_size)


In [20]:
model.summary()

Model: "my_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       multiple                  45312     
                                                                 
 gru (GRU)                   multiple                  3938304   
                                                                 
 dense (Dense)               multiple                  181425    
                                                                 
Total params: 4,165,041
Trainable params: 4,165,041
Non-trainable params: 0
_________________________________________________________________
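The parameter counts in the summary can be reproduced by hand. The arithmetic below assumes the Keras GRU default reset_after=True, which keeps separate input and recurrent bias vectors; the variable names are chosen for this sketch.

```python
vocab_size = 177      # 176 characters plus the [UNK] slot
embedding_dim = 256
rnn_units = 1024

# One embedding vector per vocabulary entry.
embedding_params = vocab_size * embedding_dim

# Three weight blocks (update gate, reset gate, candidate state), each with
# an input kernel, a recurrent kernel and two bias vectors (reset_after=True assumed).
gru_params = 3 * (embedding_dim * rnn_units + rnn_units * rnn_units + 2 * rnn_units)

# Dense layer: weight matrix plus one bias per output logit.
dense_params = rnn_units * vocab_size + vocab_size

total_params = embedding_params + gru_params + dense_params
print(embedding_params, gru_params, dense_params, total_params)
# 45312 3938304 181425 4165041
```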


To check the results, the outputs must be sampled from the predicted distribution; taking the argmax instead tends to make the model get stuck in a loop.

In [21]:
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()
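The difference between sampling and argmax can be seen on a single made-up logits vector. This plain-Python sketch mirrors what tf.random.categorical does conceptually (a draw from softmax(logits)); softmax and sample_categorical are illustrative helpers, not TensorFlow functions.

```python
import math
import random

def softmax(logits):
    # Subtract the maximum for numerical stability.
    top = max(logits)
    exps = [math.exp(x - top) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def sample_categorical(logits, rng):
    """Draw one index with probability proportional to softmax(logits)."""
    return rng.choices(range(len(logits)), weights=softmax(logits), k=1)[0]

rng = random.Random(0)
demo_logits = [2.0, 1.5, 0.1]

# Greedy choice: always the index of the largest logit.
greedy = [max(range(len(demo_logits)), key=demo_logits.__getitem__) for _ in range(10)]
# Sampled choice: indices drawn at random according to their probabilities.
sampled = [sample_categorical(demo_logits, rng) for _ in range(10)]
print(greedy)
print(sampled)
```

A greedy choice is deterministic, so once the generated context repeats, the continuation repeats as well.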

In [22]:
print("Input:\n", text_from_ids(input_example_batch[0]).numpy())
print()
print("Next Char Predictions:\n", text_from_ids(sampled_indices).numpy())

Input:
 b'o (2) weeks. SCP-306 appears to mainly infect humans; however, testing is ongoing to determine any a'

Next Char Predictions:
 b'"\xe2\x88\x86%s\xc6\x9ff\xe2\x89\xa5, lj\xc3\xb3h\xce\xbf d>\xce\xb5\n\xe2\x80\x91\xc3\x86#VB\xe2\x84\xa2l\xc3\xb01K6\xe2\x89\xa1J\xc3\x93\xe2\x98\xbaa\xc3\x9f\'\xc2\xa0/\xc6\x94Hl\xca\x8a\nCq\xcf\x89&(\xe2\x80\x997\xe2\x80\x99\xe2\x89\xa1g: m\xc3\xb0\xc2\xa3h\xe2\x89\xa1e\xca\x8a\xc6\x94\xe2\x89\xa4\xcf\x88"\xc3\x97\xc2\xa0\xe2\x85\xa1\xe2\x8a\x83\xcf\x88\xe2\x80\x91K\xc3\x97\xe2\x80\x98\xc3\xb0\xc3\xa1\xe2\x89\x88\xc2\xa0D\xc2\xae\xc2\xb3?\xc3\x97\xe2\x80\x936K\xc3\xb7\n\xc5\xba7\n=\xe2\x82\xac\xc2\xb1\xc3\x89%1\xc2\xb2'


Now loss can be defined.

In [23]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

In [24]:
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("Mean loss:        ", example_batch_mean_loss)

Prediction shape:  (64, 100, 177)  # (batch_size, sequence_length, vocab_size)
Mean loss:         tf.Tensor(5.1757903, shape=(), dtype=float32)


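For a freshly initialized model the predictions are close to uniform, so the exponential of the mean loss should be close to the vocabulary size (177 here).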

In [25]:
tf.exp(example_batch_mean_loss).numpy()

176.93639
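The expected value follows from the cross-entropy of a uniform guess, which is ln(V) for a vocabulary of V entries. A short check, using a hand-written cross-entropy helper assumed for this sketch:

```python
import math

def cross_entropy_from_logits(logits, target):
    """Negative log-probability of `target` under softmax(logits)."""
    top = max(logits)
    log_sum_exp = top + math.log(sum(math.exp(x - top) for x in logits))
    return log_sum_exp - logits[target]

vocab_size = 177
uniform_logits = [0.0] * vocab_size        # every character equally likely
uniform_loss = cross_entropy_from_logits(uniform_logits, target=5)

print(uniform_loss)               # equals ln(177)
print(math.exp(uniform_loss))     # recovers the vocabulary size
```

The measured mean loss of about 5.1758 is within a fraction of a percent of this value, which suggests the untrained model starts from near-uniform predictions.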

Then the optimizer can be added and the model compiled.

In [26]:
model.compile(optimizer='adam', loss=loss)

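A custom training step will be used. Note that the checkpoint callback defined below is not passed to fit in this notebook; the weights are saved manually once training finishes.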

In [27]:
# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

In [28]:
class CustomTraining(MyModel):
  @tf.function
  def train_step(self, inputs):
      inputs, labels = inputs
      with tf.GradientTape() as tape:
          predictions = self(inputs, training=True)
          loss = self.loss(labels, predictions)
      # Use this instance's variables rather than the global `model`.
      grads = tape.gradient(loss, self.trainable_variables)
      self.optimizer.apply_gradients(zip(grads, self.trainable_variables))

      return {'loss': loss}

In [29]:
model = CustomTraining(
    vocab_size=len(ids_from_chars.get_vocabulary()),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

In [31]:
model.compile(optimizer = tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))

In [32]:
model.fit(dataset, epochs=20)
model.save_weights(checkpoint_prefix.format(epoch=20))



For text generation, a OneStep class is created; it provides easy single-step prediction.

In [34]:
class OneStep(tf.keras.Model):
  def __init__(self, model, ids_from_chars, temperature=1.0):
    super().__init__()
    self.temperature = temperature
    self.model = model
    self.ids_from_chars = ids_from_chars

    # Create a mask to prevent "[UNK]" from being generated.
    skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
    sparse_mask = tf.SparseTensor(
        # Put a -inf at each bad index.
        values=[-float('inf')]*len(skip_ids),
        indices=skip_ids,
        # Match the shape to the vocabulary
        dense_shape=[len(ids_from_chars.get_vocabulary())])
    self.prediction_mask = tf.sparse.to_dense(sparse_mask)

  @tf.function(input_signature=[tf.TensorSpec([1, None], tf.int64, name='chars'), tf.TensorSpec([1, 1024], tf.float32, name='states')])
  def generate_one_step(self, input_ids, states=None):
    # Run the model.
    # predicted_logits.shape is [batch, char, next_char_logits]
    predicted_logits, states = self.model(inputs=input_ids, states=states,
                                          return_state=True)
    # Only use the last prediction.
    predicted_logits = predicted_logits[:, -1, :]
    predicted_logits = predicted_logits/self.temperature
    # Apply the prediction mask: prevent "[UNK]" from being generated.
    predicted_logits = predicted_logits + self.prediction_mask

    # Sample the output logits to generate token IDs.
    predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
    predicted_ids = tf.squeeze(predicted_ids, axis=-1)

    # Return the characters and model state.
    return predicted_ids, states
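The effect of the temperature and of the "[UNK]" mask on the sampling distribution can be checked on a few made-up logits. The sketch below is plain Python with a hypothetical helper, next_char_probs; it computes the probabilities that the sampling step draws from.

```python
import math

def next_char_probs(logits, temperature, mask):
    """Softmax over logits / temperature + mask."""
    scaled = [x / temperature + m for x, m in zip(logits, mask)]
    top = max(scaled)
    exps = [math.exp(x - top) for x in scaled]   # exp(-inf) is 0.0
    total = sum(exps)
    return [e / total for e in exps]

demo_logits = [1.0, 2.0, 1.5, 0.5]         # index 0 plays the role of "[UNK]"
demo_mask = [-math.inf, 0.0, 0.0, 0.0]     # -inf removes index 0 from sampling

warm = next_char_probs(demo_logits, 1.0, demo_mask)
cold = next_char_probs(demo_logits, 0.3, demo_mask)
print([round(p, 3) for p in warm])
print([round(p, 3) for p in cold])
```

The masked index gets probability zero at any temperature, and a temperature below 1 concentrates probability on the highest logit, which makes the generated text more conservative and more repetitive.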

Create the single-step model and save it.

In [35]:
one_step_model = OneStep(model, ids_from_chars, temperature=0.3)

In [33]:
tf.saved_model.save(one_step_model, 'one_step')



INFO:tensorflow:Assets written to: one_step\assets



To generate text, predictions run in a loop until a termination condition is met, here a fixed number of characters. The model from the original tutorial performs the string lookups internally to simplify the process, but that will not work with TFLite, which does not support the string ops and string dtype it relies on, so the char-id conversion is handled outside the model.

In [36]:
def prepare_input(s):
    input_chars = tf.strings.unicode_split(s, 'UTF-8')
    return ids_from_chars(input_chars).to_tensor()

In [37]:
def generate_text(n, step_model):
  start = time.time()
  states = tf.zeros([1, 1024])

  next_char = tf.constant(['SCP-6969 \n"'])
  result = [next_char]

  for _ in range(n):
    next_char, states = step_model.generate_one_step(prepare_input(next_char), states=states)
    next_char = chars_from_ids(next_char)
    result.append(next_char)

  result = tf.strings.join(result)
  end = time.time()
  print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)
  print('\nRun time:', end - start)
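The structure of this loop, with string handling kept outside the model, can be shown with a toy stand-in. In the sketch below toy_step is a hypothetical replacement for generate_one_step that only sees integers and simply cycles through the ids; it is not a trained model.

```python
def toy_step(input_ids, state, vocab_size):
    """Hypothetical stand-in for generate_one_step: ids in, (next id, new state) out."""
    next_id = input_ids[-1] % (vocab_size - 1) + 1   # cycle through ids 1..vocab_size-1
    return next_id, state + 1

def generate(seed, n, id_to_char):
    char_to_id = {ch: i for i, ch in enumerate(id_to_char)}
    state = 0
    current = seed
    pieces = [seed]
    for _ in range(n):
        ids = [char_to_id.get(ch, 0) for ch in current]          # string -> ids, outside the model
        next_id, state = toy_step(ids, state, len(id_to_char))   # the model handles integers only
        current = id_to_char[next_id]                            # id -> string, outside the model
        pieces.append(current)
    return ''.join(pieces)

generated = generate('ab', 4, ['[UNK]', 'a', 'b', 'c'])
print(generated)  # abcabc
```

As in generate_text, the whole seed is fed on the first step, and afterwards only the newest character is fed together with the carried state.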

In [39]:
generate_text(500, one_step_model)

SCP-6969 
"
☺Object Class: Keter
☺Special Containment Procedures: SCP-6604 is to be kept in a standard containment chamber is to be contained in a standard humanoid containment chamber in a standard humanoid containment process of a standard humanoid containment cell. As a day of a standard human male at all times are to be housed in a standard humanoid containment chamber is to be contained at Site-19. A facility of surveillance of SCP-6920 is to be kept in a standard containment cell at Site-19. Any in 

________________________________________________________________________________

Run time: 3.5163733959198
