In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [None]:
# Path of the training corpus, relative to the current working directory.
file_path = 'drive/MyDrive/data/anna.txt'

# Decode explicitly as UTF-8 so the loaded text does not depend on the
# platform's default locale encoding.
# NOTE(review): assumes the corpus file is UTF-8 (or plain ASCII) - confirm.
with open(file_path, 'r', encoding='utf-8') as f:
    text = f.read()

In [None]:
import re

# Sentences extracted from the corpus, in order of appearance.
sentences = []

# Titles whose trailing period must not end a sentence. Written as a raw string
# (so `\.` is not an invalid escape) and matched case-insensitively below,
# because the text is lowercased before the scan.
addressings_regexp = r" (Mr|St|Mrs|Ms|Dr)\.$"
quotes = "\"'"
end_of_sentence = ".!?"

text = text.lower()
sentence = ""
for symbol in text:
  # Quote characters are dropped entirely.
  if symbol in quotes:
    continue

  # A colon is treated as a sentence terminator.
  if symbol == ':':
    symbol = '.'

  # Collapse every run of whitespace into a single space.
  if symbol.isspace():
    if sentence.endswith(' '):
      continue
    symbol = ' '

  # Any other punctuation becomes a comma: the space before it is removed and
  # consecutive commas are merged into one.
  if not (symbol.isalpha() or symbol.isdigit() or symbol.isspace() or symbol in end_of_sentence):
    if sentence.endswith(' '):
      sentence = sentence[:-1]
    if sentence.endswith(','):
      continue
    symbol = ','

  sentence += symbol

  if symbol not in end_of_sentence:
    continue

  # The period of " mr.", " dr." etc. does not close the sentence. The longest
  # pattern (" mrs.") is five characters, so the last five are enough to check.
  if re.search(addressings_regexp, sentence[-5:], flags=re.IGNORECASE):
    continue

  # Keep accumulating until the fragment contains at least one letter.
  first_letter = next((pos for pos, ch in enumerate(sentence) if ch.isalpha()), None)
  if first_letter is None:
    continue

  # Store the sentence starting at its first letter and begin a new one.
  sentences.append(sentence[first_letter:])
  sentence = ""

# Sentences longer than `limit` characters are cut at the first whitespace at
# or after position `limit` (or left whole if there is none).
limit = 70

for idx, current in enumerate(sentences):
  if len(current) <= limit:
    continue
  cut = next(
    (pos for pos in range(limit, len(current)) if current[pos].isspace()),
    len(current),
  )
  shortened = current[:cut]
  # A truncated sentence still has to end with a terminator.
  if shortened[-1] not in end_of_sentence:
    shortened += '.'
  sentences[idx] = shortened

# Length of the longest sentence after truncation (0 when there are none).
max_sentence_length = max((len(s) for s in sentences), default=0)

# Right-pad every sentence with '#' so that all of them share that length.
sentences[:] = [s + '#' * (max_sentence_length - len(s)) for s in sentences]

In [None]:
# Character vocabulary. It is sorted so that the character <-> id mapping is
# identical on every run; iterating a bare set of strings can yield a different
# order in each Python process, which would change the ids between sessions.
chars = sorted(set(" ".join(sentences)))
print(chars)
int2char = dict(enumerate(chars))
print(int2char)
char2int = {ch: ii for ii, ch in int2char.items()}
print(char2int)

batch_size = 128

# Every sentence was padded to the same length, so this is a rectangular array.
encoded_sentences = np.array([[char2int[ch] for ch in sentence] for sentence in sentences])

# Next-character prediction: the target is the input shifted left by one.
src = [sentence[:-1] for sentence in encoded_sentences]
target = [sentence[1:] for sentence in encoded_sentences]

dataset = tf.data.Dataset.from_tensor_slices((src, target))
dataset = dataset.shuffle(100).batch(batch_size)

['n', '0', 'j', '4', 'p', 'y', 'k', 'l', 'q', '#', 'm', 'f', 'r', 'c', 'a', '1', 'e', '!', 'z', 'x', 'b', ',', 'g', '6', '8', 'h', 'i', 'w', '2', 'o', 'u', 't', ' ', 's', '3', 'v', '9', '7', '5', '?', '.', 'd']
{0: 'n', 1: '0', 2: 'j', 3: '4', 4: 'p', 5: 'y', 6: 'k', 7: 'l', 8: 'q', 9: '#', 10: 'm', 11: 'f', 12: 'r', 13: 'c', 14: 'a', 15: '1', 16: 'e', 17: '!', 18: 'z', 19: 'x', 20: 'b', 21: ',', 22: 'g', 23: '6', 24: '8', 25: 'h', 26: 'i', 27: 'w', 28: '2', 29: 'o', 30: 'u', 31: 't', 32: ' ', 33: 's', 34: '3', 35: 'v', 36: '9', 37: '7', 38: '5', 39: '?', 40: '.', 41: 'd'}
{'n': 0, '0': 1, 'j': 2, '4': 3, 'p': 4, 'y': 5, 'k': 6, 'l': 7, 'q': 8, '#': 9, 'm': 10, 'f': 11, 'r': 12, 'c': 13, 'a': 14, '1': 15, 'e': 16, '!': 17, 'z': 18, 'x': 19, 'b': 20, ',': 21, 'g': 22, '6': 23, '8': 24, 'h': 25, 'i': 26, 'w': 27, '2': 28, 'o': 29, 'u': 30, 't': 31, ' ': 32, 's': 33, '3': 34, 'v': 35, '9': 36, '7': 37, '5': 38, '?': 39, '.': 40, 'd': 41}


In [None]:
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    """Build a batched causal attention mask.

    Entry ``[b, i, j]`` of the result is the truth value of
    ``i >= j - n_src + n_dest`` cast to ``dtype``; when ``n_dest == n_src`` this
    is a lower-triangular matrix (diagonal included).

    Args:
        batch_size: number of copies of the mask along the first axis; passed
            to ``tf.expand_dims``, so a scalar tensor is expected.
        n_dest: number of rows of the mask.
        n_src: number of columns of the mask.
        dtype: dtype the boolean mask is cast to.

    Returns:
        A tensor of shape ``[batch_size, n_dest, n_src]``.
    """
    dest_positions = tf.range(n_dest)[:, None]
    src_positions = tf.range(n_src)
    allowed = dest_positions >= src_positions - n_src + n_dest
    single_mask = tf.reshape(tf.cast(allowed, dtype), [1, n_dest, n_src])
    # Tile multiples [batch_size, 1, 1]: repeat only along the batch axis.
    multiples = tf.concat(
        [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
    )
    return tf.tile(single_mask, multiples)

In [None]:
class TransformerBlock(layers.Layer):
    """Causally masked self-attention followed by a two-layer feed-forward net.

    Both sub-layers are wrapped as ``LayerNorm(input + Dropout(sublayer(input)))``.

    Args:
        embed_dim: width of the block's output; also passed to
            ``MultiHeadAttention`` as its second positional argument.
        num_heads: number of attention heads.
        ff_dim: width of the hidden (ReLU) feed-forward layer.
        rate: dropout rate used after attention and after the feed-forward net.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads, embed_dim)
        self.ffn = keras.Sequential(
            [
                layers.Dense(ff_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs):
        """Apply the block to ``inputs`` (axis 0 is the batch, axis 1 the sequence)."""
        shape = tf.shape(inputs)
        # Square boolean mask over the sequence axis, one copy per batch entry.
        mask = causal_attention_mask(shape[0], shape[1], shape[1], tf.bool)
        attended = self.dropout1(self.att(inputs, inputs, attention_mask=mask))
        residual = self.layernorm1(inputs + attended)
        transformed = self.dropout2(self.ffn(residual))
        return self.layernorm2(residual + transformed)

In [None]:
class TokenAndPositionEmbedding(layers.Layer):
    """One-hot token encoding plus a learned position embedding.

    Args:
        maxlen: number of distinct positions the position embedding can encode.
        vocab_size: depth of the one-hot token encoding and width of the
            position embedding (both must match so they can be added).
    """

    def __init__(self, maxlen, vocab_size):
        super(TokenAndPositionEmbedding, self).__init__()
        # Keep the size on the instance so `call` does not depend on a
        # notebook-level global of the same name.
        self.vocab_size = vocab_size
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=vocab_size)

    def call(self, x):
        """Return ``one_hot(x) + position_embedding`` for integer token ids ``x``."""
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = tf.one_hot(x, self.vocab_size)
        # `positions` has no batch axis; the addition broadcasts it over `x`.
        return x + positions

In [None]:
# Model input width: the padded sentence length minus one, because inputs and
# targets are the sentences shifted by one character. Recomputed from the
# (equal-length, padded) sentences rather than decremented in place, so
# re-running this cell does not shrink the value again.
max_sentence_length = max(map(len, sentences), default=0) - 1

In [None]:
# Model hyperparameters.
vocab_size = len(chars)
num_heads = 2
feed_forward_dim = 256


def create_model():
    """Build and compile the character-level Transformer.

    The network is the token/position embedding, three stacked
    ``TransformerBlock`` layers and a ``Dense`` projection to ``vocab_size``
    logits.

    Returns:
        A compiled ``keras.Model`` with two outputs: the logits, trained with
        sparse categorical cross-entropy, and the last block's output, which
        has no loss attached.
    """
    inputs = layers.Input(shape=(max_sentence_length,), dtype=tf.int32)
    x = TokenAndPositionEmbedding(max_sentence_length, vocab_size)(inputs)
    for _ in range(3):
        x = TransformerBlock(vocab_size, num_heads, feed_forward_dim)(x)
    logits = layers.Dense(vocab_size)(x)
    model = keras.Model(inputs=inputs, outputs=[logits, x])
    model.compile(
        "adam",
        loss=[tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), None],
    )
    return model

In [None]:
class TextGenerator(keras.callbacks.Callback):
    """Keras callback that samples text from the model being trained.

    Besides the constructor arguments, generation reads two notebook-level
    names: ``max_sentence_length`` (the width of the model input) and
    ``end_of_sentence`` (characters that stop generation).

    Args:
        max_tokens: generation stops once more than this many tokens have
            been sampled (so at most ``max_tokens + 1`` are produced).
        start_tokens: token ids of the prompt used at the end of an epoch.
        int2char: mapping from token id to character.
        char2int: mapping from character to token id.
        top_k: number of highest-scoring candidates to sample from.
        print_every: print a sample every this many epochs.
    """

    def __init__(self, max_tokens, start_tokens, int2char, char2int, top_k=10, print_every=1):
        # Let the base Callback set up its own attributes.
        super().__init__()
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.int2char = int2char
        self.char2int = char2int
        self.print_every = print_every
        self.k = top_k

    def sample_from(self, logits):
        """Sample one token id from the ``top_k`` largest logits of one position."""
        logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        # Softmax over the retained logits only, so they form a distribution.
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        """Return the character for token id ``number``."""
        return self.int2char[number]

    def tokenize(self, symbol):
        """Return the token id for character ``symbol`` (KeyError if unknown)."""
        return self.char2int[symbol]

    def predict(self, beginning):
        """Extend ``beginning`` token by token and return the decoded text.

        Args:
            beginning: non-empty list of token ids; sampled tokens are
                appended to this list in place.

        Returns:
            The prompt plus all generated characters as one string.

        Raises:
            ValueError: if ``beginning`` is empty (there is no position to
                predict from).
        """
        if not beginning:
            raise ValueError("predict() needs at least one start token")

        num_tokens_generated = 0
        # Any non-terminator works as the initial value; it only lets the loop start.
        sample_token = self.tokenize('#')
        while num_tokens_generated <= self.max_tokens and self.detokenize(sample_token) not in end_of_sentence:
            pad_len = max_sentence_length - len(beginning)
            if pad_len < 0:
                # Context is longer than the model input: feed the most recent
                # tokens so the prediction is conditioned on what was just
                # generated, and read the output of the last position.
                x = beginning[-max_sentence_length:]
                sample_index = max_sentence_length - 1
            else:
                # Right-pad with id 0 up to the model input width; the
                # prediction is read at the last real token.
                x = beginning + [0] * pad_len
                sample_index = len(beginning) - 1
            y, _ = self.model.predict(np.array([x]), verbose=0)
            sample_token = self.sample_from(y[0][sample_index])
            beginning.append(sample_token)
            num_tokens_generated += 1
        return "".join(self.detokenize(token) for token in beginning)

    def on_epoch_end(self, epoch, logs=None):
        """Print a sample for the stored prompt every ``print_every`` epochs."""
        if (epoch + 1) % self.print_every != 0:
            return
        # Copy the prompt: predict() appends to the list it is given.
        txt = self.predict(list(self.start_tokens))
        print(txt)

    def generate(self, beginning):
        """Generate text that continues the string ``beginning``."""
        start_tokens = [self.tokenize(s) for s in beginning]
        return self.predict(start_tokens)

# Prompt used for the sample printed by the callback during training.
start_prompt = "the "
# Index the mapping directly so a prompt character that is not in the
# vocabulary raises KeyError here instead of inserting None into the tokens.
start_tokens = [char2int[ch] for ch in start_prompt]
print(start_tokens)
text_gen_callback = TextGenerator(max_sentence_length, start_tokens, int2char, char2int)


[31, 25, 16, 32]


In [None]:
# Build a freshly initialised model and train it on the batched dataset; the
# text-generation callback is passed to fit() alongside it.
model = create_model()

model.fit(
    dataset,
    epochs=25,
    verbose=2,
    callbacks=[text_gen_callback],
)

Epoch 1/25
175/175 - 7s - loss: 1.5310 - dense_6_loss: 1.5310
the oratorit conellle whenorthe ond som t tr oro croowher oul staridrofe.
Epoch 2/25
175/175 - 4s - loss: 1.3265 - dense_6_loss: 1.3265
the arevinghon tate that ame aid cofese coonct trorsthat oman the watite.
Epoch 3/25
175/175 - 4s - loss: 1.2144 - dense_6_loss: 1.2144
the wofit ing ite olf him teer in has omougr the pitsainil butats, mit.
Epoch 4/25
175/175 - 4s - loss: 1.1329 - dense_6_loss: 1.1329
the but that was o clocened turre arkad oneveasad addice, thy.
Epoch 5/25
175/175 - 4s - loss: 1.0751 - dense_6_loss: 1.0751
the wont ones they tartys in marme wheres tim thing pros the cutall wouldictioove,.
Epoch 6/25
175/175 - 4s - loss: 1.0312 - dense_6_loss: 1.0312
the was alway a the went, doese the the went the sepriter was by bsulle.
Epoch 7/25
175/175 - 4s - loss: 0.9992 - dense_6_loss: 0.9992
the same was of so ceart think theing arted to to the prerseders that ouselopp,.
Epoch 8/25
175/175 - 4s - loss: 0.9737 - dens

<tensorflow.python.keras.callbacks.History at 0x7f52a004ef10>

In [None]:
# Sample a sentence from the trained model for the prompt "the  "
# (note that this prompt ends with two spaces).
text_gen_callback.generate("the  ")

'the  twull bost had him at her each, said see, that on only and what the.'

In [None]:
# Sample a sentence from the trained model for the prompt "good ".
text_gen_callback.generate("good ")

'good his should open it,she he recallecty was stay, in contator them, doctort.'

In [None]:
# Sample a sentence from the trained model for the prompt "transformer ".
text_gen_callback.generate("transformer ")

'transformer as seented over for his brrousarable with a foot hourgly been.'