In [3]:
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing
import numpy as np
import math
import os
import time

In [4]:
# get all train texts
dir = "../"
train_texts = ""
files = os.listdir(dir + "shahname/train")
for path in files:
	train_texts += open(dir + "shahname/train/" + path, 'rb').read().decode(encoding='utf-8')

In [None]:
file_text = open(dir + "allShahnameWords.txt", 'rb').read().decode(encoding = "utf-8")
vocab = sorted(set(file_text))
vocab.append(" ")

ids_from_chars = preprocessing.StringLookup(
    vocabulary=list(vocab), mask_token=None)

In [None]:
# turn ids into human readable text

chars_from_ids = tf.keras.layers.experimental.preprocessing.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None)

def text_from_ids(ids):
  return tf.strings.reduce_join(chars_from_ids(ids), axis=-1)


In [None]:
all_ids = ids_from_chars(tf.strings.unicode_split(train_texts, 'UTF-8'))
all_ids

<tf.Tensor: shape=(2426146,), dtype=int64, numpy=array([41, 38, 47, ..., 36, 20,  1])>

In [None]:
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)

for ids in ids_dataset.take(10):
    print(chars_from_ids(ids).numpy().decode('utf-8'))

چ
و
 
د
ش
ت
 
ا
ز
 


In [None]:
seq_length = 100
examples_per_epoch = len(train_texts)//(seq_length+1)

In [None]:
sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)

for seq in sequences.take(1):
  tmp = chars_from_ids(seq).numpy()
  print(''.join([str(s, encoding='UTF-8') for s in tmp]))
  # print(chars_from_ids(seq))

چو دشت از گیا گشت چون پرنیان[UNK]ببستند گردان توران میان
سپاهی بیامد ز ترکان و چین[UNK]هم از گرزداران خاور زم


In [None]:
for seq in sequences.take(5):
  print(text_from_ids(seq).numpy().decode("utf-8"))

In [None]:
def split_input_target(sequence):
    input_text = sequence[:-1]
    target_text = sequence[1:]
    return input_text, target_text


dataset = sequences.map(split_input_target)

In [None]:
for input_example, target_example in dataset.take(1):
    print("Input :", text_from_ids(input_example).numpy().decode("utf-8"))
    print("Target:", text_from_ids(target_example).numpy().decode("utf-8"))

Input : چو دشت از گیا گشت چون پرنیان[UNK]ببستند گردان توران میان
سپاهی بیامد ز ترکان و چین[UNK]هم از گرزداران خاور ز
Target: و دشت از گیا گشت چون پرنیان[UNK]ببستند گردان توران میان
سپاهی بیامد ز ترکان و چین[UNK]هم از گرزداران خاور زم


In [None]:
# Batch size
BATCH_SIZE = 64

BUFFER_SIZE = 10000

dataset = (
    dataset
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE))

dataset

<PrefetchDataset shapes: ((64, 100), (64, 100)), types: (tf.int64, tf.int64)>

In [None]:
# Model
# Length of the vocabulary in chars
vocab_size = len(vocab)

# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 1024

class MyModel(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim, rnn_units):
    super().__init__(self)
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.LSTM(rnn_units,
                                    #stateful=True 
                                   return_sequences=True,
                                   return_state=True
    )
    self.dense = tf.keras.layers.Dense(vocab_size)

  def call(self, inputs, states=None, return_state=False, training=False):
    x = inputs
    x = self.embedding(x, training=training)
    if states is None:
      h_state, c_state = self.gru.get_initial_state(x)
    else:
      h_state, c_state = states
    #x, states = self.gru(x, initial_state=states, training=training)
    x, h_state,c_state = self.gru(x, initial_state=[h_state,c_state], training=training)
    x = self.dense(x, training=training)

    if return_state:
      return x, h_state, c_state
    else:
      return x

In [None]:
vocab_size

In [None]:
model = MyModel(
    # Be sure the vocabulary size matches the `StringLookup` layers.
    vocab_size=len(ids_from_chars.get_vocabulary()),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

In [None]:
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

(64, 100, 48) # (batch_size, sequence_length, vocab_size)


In [None]:
model.summary()

Model: "my_model_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding_1 (Embedding)      multiple                  12288     
_________________________________________________________________
lstm_1 (LSTM)                multiple                  5246976   
_________________________________________________________________
dense_1 (Dense)              multiple                  49200     
Total params: 5,308,464
Trainable params: 5,308,464
Non-trainable params: 0
_________________________________________________________________


In [None]:
# Befor training
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
example_batch_loss = loss(target_example_batch, example_batch_predictions)
mean_loss = example_batch_loss.numpy().mean()
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("Mean loss:        ", mean_loss)

Prediction shape:  (64, 100, 48)  # (batch_size, sequence_length, vocab_size)
Mean loss:         3.8723047


In [None]:
tf.exp(mean_loss).numpy()

48.053005

In [None]:
model.compile(optimizer='adam', loss=loss)

## Training

In [None]:
EPOCHS = 1
history = model.fit(dataset, epochs=EPOCHS)



In [None]:
# Save model
save_dir = "model-char-v3.h5"
# model.save_weights(save_dir)  # to store

# Load model
model.load_weights(save_dir)  # to load

In [None]:
class OneStep(tf.keras.Model):
  def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
    super().__init__()
    self.temperature = temperature
    self.model = model
    self.chars_from_ids = chars_from_ids
    self.ids_from_chars = ids_from_chars

    # Create a mask to prevent "[UNK]" from being generated.
    skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
    sparse_mask = tf.SparseTensor(
        # Put a -inf at each bad index.
        values=[-float('inf')]*len(skip_ids),
        indices=skip_ids,
        # Match the shape to the vocabulary
        dense_shape=[len(ids_from_chars.get_vocabulary())])
    self.prediction_mask = tf.sparse.to_dense(sparse_mask)

  @tf.function
  def generate_one_step(self, inputs, states=None):
    # h_state, c_state = states
    # Convert strings to token IDs.
    input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
    input_ids = self.ids_from_chars(input_chars).to_tensor()

    # Run the model.
    # predicted_logits.shape is [batch, char, next_char_logits]
    predicted_logits, h_state, c_state = self.model(inputs=input_ids, states=states,
                                          return_state=True)
    # Only use the last prediction.
    predicted_logits = predicted_logits[:, -1, :]
    predicted_logits = predicted_logits/self.temperature
    # Apply the prediction mask: prevent "[UNK]" from being generated.
    predicted_logits = predicted_logits + self.prediction_mask

    # Sample the output logits to generate token IDs.
    predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
    predicted_ids = tf.squeeze(predicted_ids, axis=-1)

    # Convert from token ids to characters
    predicted_chars = self.chars_from_ids(predicted_ids)

    # Return the characters and model state.
    return predicted_chars, h_state, c_state

In [None]:
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()

print("Input:\n", text_from_ids(input_example_batch[0]).numpy().decode("utf-8"))
print()
print("Next Char Predictions:\n", text_from_ids(sampled_indices).numpy().decode("utf-8"))

Input:
 ار باشد مرا نیک‌بخت
هرانگه که آید گمانتان که من[UNK]رسیدم بدان پاک‌رای انجمن
غو دیده باید که از دیدگاه[UNK]ک

Next Char Predictions:
 ءچؤزدضظشکی غءزشواشحغثامیشذذذ»ءٔیژؤ[UNK])تئؤ،رصکبکضنج؟ءذش«عپشخژأگءدؤ[UNK]عظط«ئحس( ‌ أ
ضدأا »نقضغپ)ء(نء»کؤ)چ؟ز


In [None]:
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

## Generate fake texts

In [None]:
start = time.time()
next_char = tf.constant(['هم‌اندر زمان لشکر آنجا رسید'])
states = None
zero_out = tf.keras.layers.Lambda(lambda x: tf.zeros_like(x))(next_char)
result = [next_char]

for n in range(120):
  next_char, h_state, c_state = one_step_model.generate_one_step(next_char, states=states)
  states = [h_state, c_state]
  result.append(next_char)

result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)
print('\nRun time:', end - start)

هم‌اندر زمان لشکر آنجا رسید
هشیوار دستور پستان کنید
دلیران هوا باز داده بدوخت
بشد با کبان درد و فرشیدوردبارد اندر هواست
چو آمد بنزدیک آن برکنند
تو  

________________________________________________________________________________

Run time: 1.1943893432617188


## Generated texts

> بگرییم چندی بران رزمگاه

> بدو گفت موبد که ای شهریار


> توایم چرا با چنین یک زمانه همیوست ماه

> شمار سپاه اندر آید ز جای