<a href="https://colab.research.google.com/github/eisbetterthanpi/RNN/blob/main/text_generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title gen text eg
<pre>
QUEENE:
I had thought thou hadst a Roman; for the oracle,
Thus by All bids the man against the word,
Which are so weak of care, by old care done;
Your children were in your holy love,
And the precipitation through the bleeding throne.

BISHOP OF ELY:
Marry, and will, my lord, to weep in such a one were prettiest;
Yet now I was adopted heir
Of the world's lamentable day,
To watch the next way with his father with his face?

ESCALUS:
The cause why then we are all resolved more sons.

VOLUMNIA:
O, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, it is no sin it should be dead,
And love and pale as any will to that word.

QUEEN ELIZABETH:
But how long have I heard the soul for this world,
And show his hands of life be proved to stand.

PETRUCHIO:
I say he look'd on, if I must be content
To stay him from the fatal of our country's bliss.
His lordship pluck'd from this sentence then for prey,
And then let us twain, being the moon,
were she such a case as fills m
</pre>

In [1]:
# @title tf data
# https://www.tensorflow.org/text/tutorials/text_generation

import tensorflow as tf
import numpy as np
import os
import time

path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')
text = open(path_to_file, 'rb').read().decode(encoding='utf-8')
vocab = sorted(set(text))
ids_from_chars = tf.keras.layers.StringLookup(vocabulary=list(vocab), mask_token=None)
# Build the inverse lookup from ids_from_chars.get_vocabulary() so that the [UNK] token is placed the same way in both layers.
chars_from_ids = tf.keras.layers.StringLookup(vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None)

def text_from_ids(ids):
    return tf.strings.reduce_join(chars_from_ids(ids), axis=-1)

all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
print(all_ids)
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)
# for ids in ids_dataset.take(10):
#     print(chars_from_ids(ids).numpy().decode('utf-8'))
seq_length = 100

sequences = ids_dataset.batch(seq_length+1, drop_remainder=True) # convert individual chars to seqs of the desired len

# Each input sequence will contain seq_length characters from the text.
# For each input sequence, the corresponding targets contain the same length of text, except shifted one character to the right.
# So break the text into chunks of seq_length+1
def split_input_target(sequence):
    return sequence[:-1], sequence[1:] # input_text, target_text

dataset = sequences.map(split_input_target)
# for input_example, target_example in dataset.take(1):

BATCH_SIZE = 64
# Buffer size used to shuffle the dataset.
# (tf.data is designed to work with possibly infinite sequences, so it does not try to shuffle the entire sequence in memory.
# Instead, it maintains a buffer in which it shuffles elements.)
BUFFER_SIZE = 10000
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True).prefetch(tf.data.AUTOTUNE)



Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt
tf.Tensor([19 48 57 ... 46  9  1], shape=(1115394,), dtype=int64)
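
The chunk-and-shift step used above can be illustrated without TensorFlow. This is a minimal sketch on plain Python lists; the helper name `chunk` and the tiny `seq_length=4` are assumptions chosen for illustration and are not part of the notebook.

```python
def chunk(ids, seq_length):
    """Cut ids into non-overlapping pieces of seq_length + 1, dropping the remainder."""
    step = seq_length + 1
    n_chunks = len(ids) // step
    return [ids[i * step:(i + 1) * step] for i in range(n_chunks)]

def split_input_target(sequence):
    # Same idea as the notebook's function: the target is the input shifted by one character.
    return sequence[:-1], sequence[1:]

sample = list("First Citizen")            # 13 characters
chunks = chunk(sample, seq_length=4)      # two chunks of 5 characters, 3 characters dropped
pairs = [split_input_target(c) for c in chunks]

for inp, tgt in pairs:
    print(repr("".join(inp)), "->", repr("".join(tgt)))
```

Each chunk holds one extra character so that input and target both end up with exactly `seq_length` characters.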


In [None]:
# @title tf model
class MyModel(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, rnn_units):
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(rnn_units, return_sequences=True, return_state=True)
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        x = inputs
        x = self.embedding(x, training=training)
        if states is None:
            states = self.gru.get_initial_state(x)
        x, states = self.gru(x, initial_state=states, training=training)
        x = self.dense(x, training=training)
        if return_state: return x, states
        else: return x

vocab_size = len(ids_from_chars.get_vocabulary())
embedding_dim = 256
rnn_units = 1024
model = MyModel(vocab_size=vocab_size, embedding_dim=embedding_dim, rnn_units=rnn_units)
# model.summary()



In [None]:

for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

# sample from the output distribution, to get actual character indices
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()
print(sampled_indices)
print("Input:\n", text_from_ids(input_example_batch[0]).numpy())
print("Next Char Predictions:\n", text_from_ids(sampled_indices).numpy())
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("Mean loss:        ", example_batch_mean_loss)

# A freshly initialized model should predict a near-uniform distribution,
# so check that the exponential of the mean loss is approximately equal to the vocabulary size.
tf.exp(example_batch_mean_loss).numpy()

# Configure the training procedure
model.compile(optimizer='adam', loss=loss)
EPOCHS = 20
# checkpoint_callback is defined in the "save" cell further down; run that cell before this line.
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])



(64, 100, 66) # (batch_size, sequence_length, vocab_size)
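
Why exp(mean loss) should be close to the vocabulary size for an untrained model: if every logit is equal, the softmax is uniform and the cross-entropy is ln(vocab_size). The sketch below checks this in plain Python; `cross_entropy_from_logits` is a hypothetical helper written for this illustration, not a TensorFlow function.

```python
import math

def cross_entropy_from_logits(logits, target):
    """Negative log-softmax probability of `target`, computed stably."""
    peak = max(logits)
    log_z = peak + math.log(sum(math.exp(l - peak) for l in logits))
    return log_z - logits[target]

vocab_size = 66                       # matches the last dimension printed above
uniform_logits = [0.0] * vocab_size   # an idealized "knows nothing" model
loss = cross_entropy_from_logits(uniform_logits, target=3)

print(loss, math.exp(loss))           # loss is ln(66); its exponential recovers the vocabulary size
```

A real untrained network is only approximately uniform, so the value from the notebook will be near 66 rather than equal to it.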


In [None]:
# @title tf generate

class OneStep(tf.keras.Model):
  def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
    super().__init__()
    self.temperature = temperature
    self.model = model
    self.chars_from_ids = chars_from_ids
    self.ids_from_chars = ids_from_chars

    # Create a mask to prevent "[UNK]" from being generated.
    skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
    sparse_mask = tf.SparseTensor(
        values=[-float('inf')]*len(skip_ids), # Put a -inf at each bad index.
        indices=skip_ids,
        # Match the shape to the vocabulary
        dense_shape=[len(ids_from_chars.get_vocabulary())])
    self.prediction_mask = tf.sparse.to_dense(sparse_mask)

  @tf.function
  def generate_one_step(self, inputs, states=None):
    input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
    input_ids = self.ids_from_chars(input_chars).to_tensor()

    # predicted_logits.shape is [batch, char, next_char_logits]
    predicted_logits, states = self.model(inputs=input_ids, states=states, return_state=True)
    # Only use the last prediction.
    predicted_logits = predicted_logits[:, -1, :]
    predicted_logits = predicted_logits/self.temperature
    predicted_logits = predicted_logits + self.prediction_mask # Apply the prediction mask: prevent "[UNK]" from being generated.

    predicted_ids = tf.random.categorical(predicted_logits, num_samples=1) # Sample output logits
    predicted_ids = tf.squeeze(predicted_ids, axis=-1)

    predicted_chars = self.chars_from_ids(predicted_ids)
    return predicted_chars, states

one_step_model = OneStep(model, chars_from_ids, ids_from_chars)
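
The two adjustments made to the logits in `generate_one_step` (dividing by the temperature, then adding a mask of -inf at forbidden ids) can be sketched in plain Python. The function `sample_next_id` and the example logits are assumptions made for illustration only; the notebook itself uses `tf.random.categorical`.

```python
import math
import random

def sample_next_id(logits, temperature=1.0, banned_ids=(), rng=random):
    """Sample an index from softmax(logits / temperature), never returning a banned id.

    Assumes at least one id is not banned.
    """
    scaled = [l / temperature for l in logits]
    for i in banned_ids:
        scaled[i] = -math.inf                        # same role as the prediction mask
    peak = max(scaled)
    weights = [math.exp(s - peak) for s in scaled]   # exp(-inf) is 0.0, so banned ids get zero weight
    return rng.choices(range(len(scaled)), weights=weights, k=1)[0]

rng = random.Random(0)
logits = [2.0, 1.0, 0.5, 0.1]

# With id 0 banned, it is never sampled even though it has the largest logit.
masked = [sample_next_id(logits, banned_ids=[0], rng=rng) for _ in range(500)]

# A very low temperature makes sampling behave like argmax.
sharp = [sample_next_id(logits, temperature=0.01, rng=rng) for _ in range(50)]

print(sorted(set(masked)), sorted(set(sharp)))
```

Lower temperatures give more predictable text, higher temperatures give more surprising text.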


In [None]:
# @title inference
states = None
next_char = tf.constant(['ROMEO:'])
# next_char = tf.constant(['ROMEO:', 'ROMEO:', 'ROMEO:', 'ROMEO:', 'ROMEO:'])
result = [next_char]

start = time.time()
for n in range(100):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)
end = time.time()

print(tf.strings.join(result)[0].numpy().decode("utf-8"))
print('\nRun time:', end - start)



In [None]:
# @title save
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix, save_weights_only=True)

tf.saved_model.save(one_step_model, 'one_step')
one_step_reloaded = tf.saved_model.load('one_step')



In [None]:
# @title CustomTraining
# The training procedure above is simple but gives little control.
# It uses teacher forcing: at every step the ground-truth character is fed in, not the model's own prediction,
# so bad predictions are never fed back and the model never learns to recover from its own mistakes.
# https://www.tensorflow.org/guide/eager
class CustomTraining(MyModel):
    @tf.function
    def train_step(self, inputs):
        inputs, labels = inputs
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = self.loss(labels, predictions)
        # Use self, not the global `model`, so the step always updates the instance being trained.
        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        return {'loss': loss}

model = CustomTraining(vocab_size=len(ids_from_chars.get_vocabulary()), embedding_dim=embedding_dim, rnn_units=rnn_units)
model.compile(optimizer = tf.keras.optimizers.Adam(), loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))
model.fit(dataset, epochs=1)
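
The difference between teacher forcing and feeding predictions back can be seen on a toy example. In this sketch the "model" is just a hypothetical lookup table from a character to its predicted next character, with one deliberate mistake ("b" predicts "x" where the target has "c"); none of these names come from the notebook.

```python
def teacher_forced(model, target):
    """Predict each next character from the ground-truth previous character."""
    return [model[ch] for ch in target[:-1]]

def free_running(model, first_char, steps):
    """Predict each next character from the model's own previous prediction."""
    out, ch = [], first_char
    for _ in range(steps):
        ch = model[ch]
        out.append(ch)
    return out

toy_model = {"a": "b", "b": "x", "c": "d", "x": "y"}
target = "abcd"

tf_preds = teacher_forced(toy_model, target)
fr_preds = free_running(toy_model, target[0], steps=3)

print(tf_preds)  # the single mistake stays isolated
print(fr_preds)  # the mistake is fed back and the output never returns to the target
```

During training with teacher forcing the model only ever sees the first situation, while generation is always the second.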



In [None]:
# @title custom train

# complete custom training loop
EPOCHS = 10
mean = tf.metrics.Mean()

for epoch in range(EPOCHS):
    start = time.time()
    mean.reset_state()  # reset_states() is the older, deprecated spelling
    for (batch_n, (inp, target)) in enumerate(dataset):
        logs = model.train_step([inp, target])
        mean.update_state(logs['loss'])
        if batch_n % 50 == 0:
            template = f"Epoch {epoch+1} Batch {batch_n} Loss {logs['loss']:.4f}"
            print(template)

    # saving (checkpoint) the model every 5 epochs
    if (epoch + 1) % 5 == 0:
        model.save_weights(checkpoint_prefix.format(epoch=epoch))

    print(f'Epoch {epoch+1} Loss: {mean.result().numpy():.4f}')
    print(f'Time taken for 1 epoch {time.time() - start:.2f} sec')
    print("_"*80)

model.save_weights(checkpoint_prefix.format(epoch=epoch))


In [44]:
# @title data
# https://github.com/Sam-Armstrong/tinyGPT/blob/main/Training.py
# https://colab.research.google.com/github/karpathy/minGPT/blob/master/play_char.ipynb
# https://github.com/karpathy/nanoGPT

import torch
import torch.nn as nn
from torch.utils.data import Dataset

class CharDataset(Dataset): # https://github.com/karpathy/minGPT
    def __init__(self, raw_data, seq_len):
        data = ''.join(raw_data)
        chars = sorted(list(set(data)))
        self.vocab_size = len(chars) # number of distinct characters (65 for this Shakespeare corpus)
        self.stoi = {ch:i for i,ch in enumerate(chars)}
        self.itos = {i:ch for i,ch in enumerate(chars)}
        self.data = self.data_process(data) # list of int
        self.seq_len = seq_len

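    def data_process(self, data): # data: the whole corpus as one str
        # Map every character to its integer id. stoi was built from this same text, so no lookup can miss;
        # for unseen text use stoi.get(c, UNK_IDX) with an UNK_IDX of your choosing.
        return torch.tensor([self.stoi.get(c) for c in data])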

    def __len__(self):
        # return len(self.data) - self.seq_len
        return len(self.data)//(self.seq_len+1)

    def __getitem__(self, idx):
        # dix = self.data[idx:idx + self.seq_len + 1]
        dix = self.data[idx*(self.seq_len+1) : (idx+1)*(self.seq_len+1)]
        x, y = dix[:-1], dix[1:]
        return x, y

import requests
url='https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt'

out=requests.get(url)
with open("shakespeare.txt", "wb") as f:
    f.write(out.content)
text=out.content.decode(encoding='utf-8')

# vocab = sorted(set(text))

# data = list(open('input.txt', 'r').read()) # for using a text corpus contained within a .txt file
# from torchtext.datasets import WikiText2
# train_iter, val_iter, test_iter = WikiText2() # line by line of wiki  = Valkyria Chronicles III =
seq_len = 128
train_dataset = CharDataset(text, seq_len) # one line of poem is roughly 50 characters
# test_dataset = CharDataset(test_iter, seq_len) # one line of poem is roughly 50 characters
from torch.utils.data.dataloader import DataLoader
batch_size = 512 #512
train_loader = DataLoader(train_dataset, shuffle = True, pin_memory = True, batch_size = batch_size, num_workers = 2) # num_workers = 4
# test_loader = DataLoader(test_dataset, shuffle = True, pin_memory = True, batch_size = batch_size, num_workers = 0)

def encode(context): return torch.tensor([train_dataset.stoi.get(c) for c in context], device=device)
def decode(x): return ''.join([train_dataset.itos[int(i)] for i in x])
# for x,y in train_loader:
#     break
# n=2
# print(decode(x[n]))
# print(decode(y[n]))
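
The dataset size implied by `__len__` can be checked with plain arithmetic. This sketch assumes the corpus length is the 1115394 characters shown in the tensor shape printed earlier; the variable names are chosen for illustration.

```python
import math

n_chars = 1_115_394    # corpus length, taken from the tensor shape printed earlier
seq_len = 128
batch_size = 512

n_chunks = n_chars // (seq_len + 1)     # non-overlapping chunks, as in __len__
n_windows = n_chars - seq_len           # the commented-out sliding-window alternative
batches_per_epoch = math.ceil(n_chunks / batch_size)  # DataLoader keeps the last partial batch by default

print(n_chunks, n_windows, batches_per_epoch)  # 8646 1115266 17
```

The 8646 matches the dataset size shown in the training log, and with only 17 batches per epoch a progress line printed every 100 batches appears once per epoch.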


In [50]:
# @title torch model

import torch.nn as nn
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class MyModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, rnn_units):
        super(MyModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # self.gru = nn.GRU(embedding_dim, rnn_units, batch_first=True, return_sequences=True)
        self.gru = nn.GRU(embedding_dim, rnn_units, batch_first=True)
        self.dense = nn.Linear(rnn_units, vocab_size)

    def forward(self, x, hidden=None):
        x = self.embedding(x)
        if hidden is None:  # start from a zero state of shape (num_layers, batch, hidden_size)
            hidden = torch.zeros(self.gru.num_layers, x.size(0), self.gru.hidden_size, device=x.device)
        output, hidden = self.gru(x, hidden)
        output = self.dense(output)
        return output, hidden  # always return the state so callers can carry it across calls

vocab_size = train_dataset.vocab_size
# vocab_size = len(ids_from_chars.get_vocabulary())
embedding_dim = 256
rnn_units = 1024
model = MyModel(vocab_size=vocab_size, embedding_dim=embedding_dim, rnn_units=rnn_units).to(device)
# model.summary()


In [4]:
# @title train, gen

def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        pred,_ = model(X)
        loss = loss_fn(pred.reshape(-1,pred.shape[-1]), y.reshape(-1))
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

import torch
from torch.nn import functional as F
def generate(model, context, max_steps = 64, temperature=1):
    # x = torch.tensor([train_dataset.stoi.get(c) for c in context], device=device).unsqueeze(0)
    x=ix = torch.tensor([train_dataset.stoi.get(c) for c in context], device=device).unsqueeze(0)
    model.eval()
    hidden=None
    with torch.no_grad():
        for n in range(max_steps):
            # output, hidden = model(x, hidden)
            output, hidden = model(ix, hidden)
            # hidden already has shape (num_layers, batch, rnn_units); pass it to the next step unchanged
            output = output[:, -1, :] # get logit for last character
            output = output/temperature
            output = F.softmax(output, dim = -1) # vocab_size to char
            ix = torch.multinomial(output, num_samples = 1) # rand sample by output distribution
            x = torch.cat((x, ix),1)
        completion = ''.join([train_dataset.itos[int(i)] for i in x.flatten()])
        return completion

# out=generate(model, "A wi")
# print(out)


In [51]:
# train
import time
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, betas=(0.9, 0.999), eps=1e-08)
# optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)

epochs = 20
for t in range(epochs):
    start = time.time()
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_loader, model, loss_fn, optimizer)
    # test(test_loader, model, loss_fn)
    print(time.time()-start)
    print(generate(model, "A "))
print("Done!")
torch.save(model.state_dict(), "model.pth") # save model weights to 'model.pth'
# model = MyModel(vocab_size, embedding_dim, rnn_units).to(device) # create new model
# model.load_state_dict(torch.load("model.pth")) # load model weights from 'model.pth'



Epoch 1
-------------------------------
loss: 4.184251  [    0/ 8646]
8.548993349075317
A pist.


OEN-Fcb?z
WRD;Bhin ne.
B Inen'that I ent cour; athave, h
Epoch 2
-------------------------------
loss: 2.494669  [    0/ 8646]
8.65460467338562
A Rad is thowon yt g'sengadt brewse,
Se prenercet
in kortun,
We, a
Epoch 3
-------------------------------
loss: 2.215699  [    0/ 8646]
8.724737882614136
A GFud pide.
Bung'd tould thenteis forr.

LUORDENRTD:
Yole'tion:ot
Epoch 4
-------------------------------
loss: 2.047531  [    0/ 8646]
8.689782857894897
A Endiked:
Undis, and it.
Whis huse ow of cousion aperion th-
Vir,
Epoch 5
-------------------------------
loss: 1.904373  [    0/ 8646]
8.576466798782349
A nonou your all surde his evey be with by frieds, my did it years
Epoch 6
-------------------------------
loss: 1.801122  [    0/ 8646]
8.503104209899902
A Lender:

Firsill Cipevins, I cell musinage man and his wive
Byce
Epoch 7
-------------------------------
loss: 1.716901  [    0/ 8646]
8

In [52]:

print(generate(model, "A ",500))


A here most sleep
The strangers upon myself kiss our guess
Dight upon small meeps their counterately.

BENVOLIO:
Papelous! Why, as it brought is tender prepared,
Intone themselves that must ne'er whils yare?
What have I may have none, ye not how it.

PAULINA:
How! courdering there; indeed, I comes?
Why not hear the most vaults are blood!

GLOUCIO:
Sirrah, the Prettians are sovereign.
God find good, my sovereign'st give me keeps about!
The huttiffer broaches, and given us be as stone
And orinables 
