# Урок 9. Языковое моделирование
# Задание
Разобраться с моделью генерации текста; собрать датасет самостоятельно или взять датасет с вебинара и обучить генератор текстов.

In [1]:
import tensorflow as tf
tf.enable_eager_execution()
import matplotlib.pyplot as plt
import numpy as np
import os
import time

import warnings
warnings.filterwarnings('ignore')

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])





In [2]:
# Read the corpus as raw bytes and decode it explicitly as UTF-8; the context
# manager guarantees the file handle is closed once the data has been read.
with open('evgenyi_onegin.txt', 'rb') as corpus_file:
    text = corpus_file.read().decode(encoding='utf-8')
print('Length of text: {} characters'.format(len(text)))

Length of text: 286984 characters


In [3]:
# Append the corpus to itself so that it is twice as long.
text *= 2

In [4]:
# Every distinct character of the corpus, in sorted order.
vocab = list(set(text))
vocab.sort()
print(len(vocab), 'unique characters')

131 unique characters


In [5]:
# Character -> id lookup; the id of a character is its position in `vocab`.
char2idx = dict(zip(vocab, range(len(vocab))))
# Id -> character lookup, as an array so it can be indexed with id arrays.
idx2char = np.array(vocab)

# The whole corpus encoded as an array of character ids.
text_as_int = np.array(list(map(char2idx.__getitem__, text)))

In [6]:
# Number of characters in one model input sequence.
seq_length = 100
# Each example consumes seq_length + 1 characters of the corpus.
examples_per_epoch = len(text) // (seq_length + 1)

# Dataset that yields the corpus one character id at a time.
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)

# Print the first five characters as a quick sanity check.
for char_id in char_dataset.take(5):
    print(idx2char[char_id.numpy()])

А
л
е
к
с


In [7]:
# Group the character stream into chunks of seq_length + 1 ids; a shorter
# trailing chunk is dropped.
sequences = char_dataset.batch(seq_length + 1, drop_remainder=True)

# Decode and show the first five chunks.
for chunk in sequences.take(5):
    decoded_chunk = ''.join(idx2char[chunk.numpy()])
    print(repr(decoded_chunk))

'Александр Сергеевич Пушкин\n\n                                Евгений Онегин\n                          '
'      Роман в стихах\n\n                        Не мысля гордый свет забавить,\n                        '
'Вниманье дружбы возлюбя,\n                        Хотел бы я тебе представить\n                        '
'Залог достойнее тебя,\n                        Достойнее души прекрасной,\n                        Свят'
'ой исполненной мечты,\n                        Поэзии живой и ясной,\n                        Высоких д'


In [8]:
def split_input_target(chunk):
    """Split a chunk into an (input, target) pair shifted by one position.

    The input is the chunk without its last element and the target is the
    chunk without its first element.
    """
    return chunk[:-1], chunk[1:]

# Convert every chunk into an (input, target) pair.
dataset = sequences.map(split_input_target)

In [9]:
# Decode and display the first (input, target) pair of the dataset.
for input_example, target_example in dataset.take(1):
    input_chars = idx2char[input_example.numpy()]
    target_chars = idx2char[target_example.numpy()]
    print('Input data: ', repr(''.join(input_chars)))
    print('Target data:', repr(''.join(target_chars)))
     

Input data:  'Александр Сергеевич Пушкин\n\n                                Евгений Онегин\n                         '
Target data: 'лександр Сергеевич Пушкин\n\n                                Евгений Онегин\n                          '


In [10]:
# Number of (input, target) pairs per training batch.
BATCH_SIZE = 64
# Size of the buffer the dataset shuffles examples in.
BUFFER_SIZE = 10000

# Shuffle first, then form batches; an incomplete final batch is dropped.
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)

In [11]:
# Number of distinct characters; used as the embedding input size and the
# width of the output layer.
vocab_size = len(vocab)
# Dimensionality of the character embedding vectors.
embedding_dim = 128
# Number of units in each GRU layer.
rnn_units = 1024

In [12]:
class RNNgenerator(tf.keras.Model):
    """Character model: embedding, three stacked GRU layers, dense output.

    The recurrent layers are not stateful and return the full output
    sequence, so the model produces one vocabulary-sized vector per input
    position.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        super().__init__()

        def make_gru():
            # All three recurrent layers share the same configuration.
            return tf.keras.layers.GRU(
                rnn_units,
                return_sequences=True,
                stateful=False,
                recurrent_initializer='glorot_uniform',
            )

        # Layers are created in the same order as they are applied in call().
        self.emb = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru1 = make_gru()
        self.gru2 = make_gru()
        self.gru3 = make_gru()
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, x):
        """Map a batch of character-id sequences to per-position outputs."""
        hidden = self.emb(x)
        for recurrent_layer in (self.gru1, self.gru2, self.gru3):
            hidden = recurrent_layer(hidden)
        return self.dense(hidden)


model = RNNgenerator(vocab_size, embedding_dim, rnn_units=rnn_units)

In [13]:
def loss(labels, logits):
    """Sparse categorical cross-entropy of integer labels against raw logits."""
    crossentropy = tf.keras.losses.sparse_categorical_crossentropy
    return crossentropy(labels, logits, from_logits=True)

In [14]:
# `metrics` is documented as a list of metrics; a bare string is rejected by
# older Keras versions, so the metric name is wrapped in a list.
model.compile(optimizer = 'adam', loss = loss, metrics = ['accuracy'])

In [15]:
# Directory the training checkpoints are written to.
checkpoint_dir = 'RNN_checkpoints'
# Keras substitutes the epoch number for {epoch} in the checkpoint file name.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Save the weights once per epoch.  An integer save_freq is not a number of
# epochs (it counts batches, or samples in older TensorFlow releases), so
# save_freq=5 rewrote the current epoch's file every few batches.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_freq='epoch',
    save_weights_only=True,
)

In [None]:
# Number of training epochs.
EPOCHS = 200
# Train the model; the checkpoint callback is passed to Keras so that it runs
# during training.
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

In [None]:
# Plot the loss values recorded in the training history.
plt.plot(history.history['loss'])
plt.show()

In [None]:
# Display the path of the most recent checkpoint in checkpoint_dir.
tf.train.latest_checkpoint(checkpoint_dir)

In [None]:
# Create a fresh network and restore the most recently saved weights into it.
model = RNNgenerator(vocab_size, embedding_dim, rnn_units)
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
# Build the model for a batch of one sequence of unspecified length.
model.build(tf.TensorShape([1, None]))

In [None]:
# Number of characters generate_text samples after the seed string.
num_generate = 500
# generate_text divides the model outputs by this value before sampling.
temperature = 1

In [None]:
def generate_text(model, start_string):
    """Generate text by sampling one character at a time after ``start_string``.

    Reads the module-level ``num_generate`` (number of characters to sample)
    and ``temperature`` (divisor applied to the logits before sampling).

    Args:
        model: Callable mapping a ``(1, length)`` batch of character ids to
            per-position logits over the vocabulary.  It is expected to keep
            no recurrent state between calls (as ``RNNgenerator`` does).
        start_string: Non-empty seed text; every character must be a key of
            ``char2idx``.

    Returns:
        ``start_string`` followed by ``num_generate`` sampled characters.

    Raises:
        ValueError: If ``start_string`` is empty.
    """
    if not start_string:
        raise ValueError('start_string must contain at least one character')

    # Ids of the seed plus everything sampled so far.  The model carries no
    # state from one call to the next, so the full context is fed in again on
    # every step; feeding only the last sampled character would condition
    # each prediction on a single character.
    context_ids = [char2idx[s] for s in start_string]
    text_generated = []

    for _ in range(num_generate):
        input_eval = tf.expand_dims(context_ids, 0)
        predictions = model(input_eval)

        # Only the logits at the last position predict the next character.
        last_logits = predictions[:, -1, :] / temperature
        sampled = tf.random.categorical(last_logits, num_samples=1)
        predicted_id = int(sampled[0, 0].numpy())

        context_ids.append(predicted_id)
        text_generated.append(idx2char[predicted_id])

    return start_string + ''.join(text_generated)

In [None]:
# Generate a continuation of the seed phrase and display it.
text_ = generate_text(model, u"И вот идет уже ")
print(text_)

In [None]:
# Create a fresh network and restore the weights from the checkpoint named
# ckpt_200 explicitly, rather than whichever checkpoint is the latest.
model = RNNgenerator(vocab_size, embedding_dim, rnn_units) 
model.load_weights('RNN_checkpoints/ckpt_200')
# Build the model for a batch of one sequence of unspecified length.
model.build(tf.TensorShape([1, None]))

In [None]:
# Generate a continuation of the seed phrase with the reloaded model.
text_ = generate_text(model, u"И вот идет уже ")
print(text_)

In [None]:
class RNNgenerator_1(tf.keras.Model):
    """Embedding, three GRU layers and a dense output, with an explicit state.

    Unlike a plain stacked model, ``call`` accepts an initial recurrent state
    and can return the final one, so a caller can feed text piece by piece.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        # The base initializer takes no model argument; passing `self` again
        # handed it a stray positional parameter.
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru1 = tf.keras.layers.GRU(rnn_units,
                                        return_sequences=True,
                                        return_state=True,
                                        recurrent_initializer='glorot_uniform')
        self.gru2 = tf.keras.layers.GRU(rnn_units,
                                        return_sequences=True,
                                        return_state=True,
                                        recurrent_initializer='glorot_uniform')
        self.gru3 = tf.keras.layers.GRU(rnn_units,
                                        return_sequences=True,
                                        return_state=True,
                                        recurrent_initializer='glorot_uniform')
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, x, states=None, return_state=False):
        """Run the network on a batch of character-id sequences.

        Args:
            x: Batch of character ids.
            states: Initial state for the first GRU layer; when ``None`` the
                layer's own initial state is used.
            return_state: When true, also return the final state of the last
                GRU layer.

        Returns:
            The dense-layer output, or ``(output, states)`` when
            ``return_state`` is true.
        """
        x = self.embedding(x)

        if states is None:
            states = self.gru1.get_initial_state(x)

        # NOTE(review): the final state of each layer is handed to the next
        # layer as its initial state, and only gru3's final state is returned
        # to the caller.  Left unchanged to keep the (output, states)
        # interface; confirm whether per-layer states were intended.
        x, states = self.gru1(x, initial_state=states)
        x, states = self.gru2(x, initial_state=states)
        x, states = self.gru3(x, initial_state=states)

        x = self.dense(x)

        if return_state:
            return x, states
        return x

In [None]:
# Directory the checkpoints are written to.  The name was previously
# misspelled, so this cell silently depended on an earlier definition of
# checkpoint_dir.
checkpoint_dir = 'RNN_checkpoints'

# Name of the checkpoint files; Keras substitutes the epoch number for {epoch}.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Save the weights once per epoch.  An integer save_freq does not count
# epochs, so save_freq=1 rewrote the checkpoint after every batch.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_freq='epoch',
    save_weights_only=True,
)

In [None]:
# Instantiate the state-passing variant with the shared hyperparameters.
model_1 = RNNgenerator_1(vocab_size, embedding_dim, rnn_units)

In [None]:
# `metrics` is documented as a list of metrics; a bare string is rejected by
# older Keras versions, so the metric name is wrapped in a list.
model_1.compile(optimizer = 'adam', loss = loss, metrics = ['accuracy'])

In [None]:
# Restore the weights stored in the checkpoint named ckpt_200.
# NOTE(review): presumably the checkpoint written while training `model`;
# confirm that its layers line up with RNNgenerator_1.
model_1.load_weights('./RNN_checkpoints/ckpt_200')

In [None]:
# Generate text

def generate_text_1(model, start_string, states, temperature):
    """Sample text from a model that takes and returns a recurrent state.

    The seed is fed in first; afterwards each sampled character is fed back
    together with the state returned by the previous call.  The number of
    sampled characters is read from the module-level ``num_generate``.

    Args:
        model: Callable accepting ``states=`` and ``return_state=True`` and
            returning ``(predictions, states)``.
        start_string: Seed text; every character must be a key of ``char2idx``.
        states: State passed to the model on the first step.
        temperature: Divisor applied to the predictions before sampling.

    Returns:
        ``start_string`` followed by the sampled characters.
    """
    seed_ids = [char2idx[ch] for ch in start_string]
    input_eval = tf.expand_dims(seed_ids, 0)

    sampled_chars = []

    model.reset_states()
    for _ in range(num_generate):
        logits, states = model(input_eval, states=states, return_state=True)
        # Drop the batch axis and apply the temperature.
        scaled_logits = tf.squeeze(logits, 0) / temperature

        # One sample is drawn per position; the one for the last position is
        # the next character.
        samples = tf.random.categorical(scaled_logits, num_samples=1)
        predicted_id = samples[-1, 0].numpy()

        # The sampled character is the only input of the next step.
        input_eval = tf.expand_dims([predicted_id], 0)
        sampled_chars.append(idx2char[predicted_id])

    return start_string + ''.join(sampled_chars)

In [None]:
# Number of characters to sample after the seed string.
num_generate = 500
# Divisor applied to the model predictions before sampling.
temperature = 0.6
# No recurrent state is supplied for the first generation step.
states = None
# Seed string handed to the generator.
next_char = u"И вот идет уже "
# NOTE(review): generate_text_1 builds its own local list of the same name,
# so this module-level list does not appear to be read by it.
text_generated = []

In [None]:
# Keep the result under its own name: assigning it to `text` would overwrite
# the training corpus, breaking any earlier cell that is run again.
text_ = generate_text_1(model_1, next_char, states, temperature)
print(text_)

In [None]:
class RNNgenerator_2(tf.keras.Model):
    """Embedding, three GRU layers and a dense output, one state per layer.

    ``call`` accepts a separate initial state for each GRU layer and can
    return the three final states, so a caller can feed text piece by piece.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        # The base initializer takes no model argument; passing `self` again
        # handed it a stray positional parameter.
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru1 = tf.keras.layers.GRU(rnn_units,
                                        return_sequences=True,
                                        return_state=True,
                                        recurrent_initializer='glorot_uniform')
        self.gru2 = tf.keras.layers.GRU(rnn_units,
                                        return_sequences=True,
                                        return_state=True,
                                        recurrent_initializer='glorot_uniform')
        self.gru3 = tf.keras.layers.GRU(rnn_units,
                                        return_sequences=True,
                                        return_state=True,
                                        recurrent_initializer='glorot_uniform')
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, x, states_1=None, states_2=None, states_3=None, return_state=False):
        """Run the network on a batch of character-id sequences.

        Args:
            x: Batch of character ids.
            states_1: Initial state of the first GRU layer, or ``None``.
            states_2: Initial state of the second GRU layer, or ``None``.
            states_3: Initial state of the third GRU layer, or ``None``.
            return_state: When true, also return the three final states.

        Returns:
            The dense-layer output, or ``(output, states_1, states_2,
            states_3)`` when ``return_state`` is true.
        """
        x = self.embedding(x)

        # Each missing state is replaced by that layer's own initial state.
        # The previous check tested a name `states` that is not a parameter
        # of this method: it resolved to a notebook global and discarded the
        # states supplied by the caller on every call.
        if states_1 is None:
            states_1 = self.gru1.get_initial_state(x)
        if states_2 is None:
            states_2 = self.gru2.get_initial_state(x)
        if states_3 is None:
            states_3 = self.gru3.get_initial_state(x)

        x, states_1 = self.gru1(x, initial_state=states_1)
        x, states_2 = self.gru2(x, initial_state=states_2)
        x, states_3 = self.gru3(x, initial_state=states_3)

        x = self.dense(x)

        if return_state:
            return x, states_1, states_2, states_3
        return x

In [None]:
# Directory the checkpoints are written to.  The earlier assignment of
# 'RNN_2_checkpoints' was immediately overwritten and has been removed; this
# is the value that was in effect.
checkpoint_dir = 'RNN_checkpoints'
# Keras substitutes the epoch number for {epoch} in the checkpoint file name.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Save the weights once per epoch.  An integer save_freq does not count
# epochs, so save_freq=1 rewrote the checkpoint after every batch.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_freq='epoch',
    save_weights_only=True,
)

In [None]:
# Instantiate the per-layer-state variant with the shared hyperparameters.
model_2 = RNNgenerator_2(vocab_size, embedding_dim, rnn_units)

In [None]:
# `metrics` is documented as a list of metrics; a bare string is rejected by
# older Keras versions, so the metric name is wrapped in a list.
model_2.compile(optimizer = 'adam', loss = loss, metrics = ['accuracy'])

In [None]:
# Restore the weights stored in the checkpoint named ckpt_200.
# NOTE(review): presumably the checkpoint written while training `model`;
# confirm that its layers line up with RNNgenerator_2.
model_2.load_weights('./RNN_checkpoints/ckpt_200')

In [None]:
def generate_text_2(model, start_string, states_1, states_2, states_3, temperature):
    """Sample text from a model that takes and returns three recurrent states.

    The seed is fed in first; afterwards each sampled character is fed back
    together with the states returned by the previous call.  The number of
    sampled characters is read from the module-level ``num_generate``.

    Args:
        model: Callable accepting ``states_1=``, ``states_2=``, ``states_3=``
            and ``return_state=True`` and returning the predictions followed
            by the three states.
        start_string: Seed text; every character must be a key of ``char2idx``.
        states_1: State passed as ``states_1`` on the first step.
        states_2: State passed as ``states_2`` on the first step.
        states_3: State passed as ``states_3`` on the first step.
        temperature: Divisor applied to the predictions before sampling.

    Returns:
        ``start_string`` followed by the sampled characters.
    """
    # Vectorize the seed and add a batch axis of size one.
    seed_ids = [char2idx[ch] for ch in start_string]
    input_eval = tf.expand_dims(seed_ids, 0)

    sampled_chars = []

    model.reset_states()
    for _ in range(num_generate):
        logits, states_1, states_2, states_3 = model(
            input_eval,
            states_1=states_1,
            states_2=states_2,
            states_3=states_3,
            return_state=True,
        )
        # Drop the batch axis and apply the temperature.
        scaled_logits = tf.squeeze(logits, 0) / temperature

        # One sample is drawn per position; the one for the last position is
        # the next character.
        samples = tf.random.categorical(scaled_logits, num_samples=1)
        predicted_id = samples[-1, 0].numpy()

        # The sampled character, together with the returned states, is the
        # input of the next step.
        input_eval = tf.expand_dims([predicted_id], 0)
        sampled_chars.append(idx2char[predicted_id])

    return start_string + ''.join(sampled_chars)

In [None]:
# Number of characters to sample after the seed string.
num_generate = 500
# Divisor applied to the model predictions before sampling.
temperature = .5
# No recurrent state is supplied to any of the three layers for the first step.
states_1 = None
states_2 = None
states_3 = None
# Seed string handed to the generator.
next_char = u"И вот идет уже "
# NOTE(review): generate_text_2 builds its own local list of the same name,
# so this module-level list does not appear to be read by it.
text_generated = []

In [None]:
# Keep the result under its own name: assigning it to `text` would overwrite
# the training corpus, breaking any earlier cell that is run again.
text_ = generate_text_2(model_2, next_char, states_1, states_2, states_3, temperature)
print(text_)