In [None]:
import codecs
import numpy as np
import tensorflow as tf

In [None]:
data_fpath = tf.keras.utils.get_file(
    'shakespeare.txt', 
    'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')

text = codecs.open(data_fpath, 'r', encoding='utf8').read()
print(text[:250])

In [None]:
# создание словаря символов
vocab = sorted(set(text)) # отбор и соритровка уникалных слов в тексте
VOCAB_SIZE = len(vocab) # размер словаря

print('Vocab: {}'.format(vocab))
print('{} unique characters'.format(VOCAB_SIZE))

char2idx = {u:i for i, u in enumerate(vocab)} # создание словаря индекс:символ
idx2char = np.array(vocab) # массив словаря

text_as_int = np.array([char2idx[c] for c in text]) # посимвольное превращение текста в последовательность целых чисел

print('Example of the encoded text: {}'.format(text_as_int[:13]))

In [None]:
SEQ_LEN = 1000 # длина последовательности
BATCH_SIZE = 64 # размер батча

input_seqs = [] # входящая последовательность
target_seqs = [] # целевая последовательность

num_seqs = len(text_as_int) // (SEQ_LEN+1)

for i in range(num_seqs):
    seq = text_as_int[i:i+SEQ_LEN+1]
    input_seqs.append(np.array(seq[:-1])) # входящая последовательность без последнего символа <START
    target_seqs.append(np.array(seq[1:])) # целевая последовательность без первого символа START>

input_seqs = np.array(input_seqs)
target_seqs = np.array(target_seqs)

input_seqs = input_seqs[:(len(input_seqs)//BATCH_SIZE)*BATCH_SIZE]
target_seqs = target_seqs[:(len(input_seqs)//BATCH_SIZE)*BATCH_SIZE]

print('Input: {} ...'.format([idx2char[i] for i in input_seqs[0][:15]]))
print('Target: {} ...'.format([idx2char[i] for i in target_seqs[0][:15]]))

In [None]:
# функция для создания модели генерации текста

def build_model(batch_size):
    model = tf.keras.Sequential([
        tf.keras.layers.Embedding(VOCAB_SIZE, 256, batch_input_shape=(batch_size, None)), # превращение входящих батчей последовательностей в эмбеддинги
        tf.keras.layers.GRU(256, return_sequences=True, stateful=True), # реккуррентный слой возвращабющий всю последовательность и состоряния для каждого элемента
        tf.keras.layers.Dense(VOCAB_SIZE), # слой генерации текста, возвращает логиты
    ])
    model.build()
    return model

model = build_model(BATCH_SIZE)

In [None]:
# компиляция и обучение модели
EPOCHS = 100

loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam', loss=loss)

history = model.fit(input_seqs, 
                    target_seqs, 
                    epochs=EPOCHS,
                    batch_size=BATCH_SIZE)

In [None]:
# модель для инференса с входом батча 1
model_inf = build_model(1)

# копирование весов из каждого слоя обученной модели в каждый слой модели инференса
for i in range(len(model_inf.layers)):
    for j in range(len(model_inf.layers[i].weights)):
        model_inf.layers[i].weights[j].assign(model.layers[i].weights[j])

In [None]:
# для генерациии текста с начальным зерном, конечной длинной текста и температурой
def generate_text(model, seed, out_len, temperature:float):

    text_generated = []

    # Обнуляем состояние модели
    model.reset_states()
    
    # Конвертируем входную цепочку в индексы
    inp = np.array([char2idx[s] for s in seed])

    for i in range(out_len):

        # Получаем предсказания для входной цепочки inp
        # pred - матрица размерности (длина цепочки, распределение по классам)
        # На первой итерации цикла длина цепочки равна длине seed, а затем длина равна 1
        pred = model(inp[None, ...])[0]

        # Для получения символа сэмплируем из распределения
        # БОльшая температура соответствует более случайному предсказанию символа
        pred = pred / temperature
        pred_c = tf.random.categorical(pred, num_samples=1)[-1][0].numpy()
        
        text_generated.append(idx2char[pred_c])
        
        # Новый вход -- только что сгенерированный символ
        inp = np.array([pred_c])

    return (seed + ''.join(text_generated))

In [None]:
print(generate_text(model_inf, seed=u"MONTAGUE:", out_len=500, temperature=1))

In [None]:
print(generate_text(model_inf, seed=u"MONTAGUE:", out_len=500, temperature=0.5))

In [None]:
print(generate_text(model_inf, seed=u"MONTAGUE:", out_len=500, temperature=1.5))

In [None]:
def build_model(batch_size):
    model = tf.keras.Sequential([
        tf.keras.layers.Embedding(VOCAB_SIZE, 256, batch_input_shape=(batch_size, None)),
        tf.keras.layers.GRU(256, return_sequences=True, stateful=True, dropout=.5),
        tf.keras.layers.GRU(256, return_sequences=True, stateful=True, dropout=.5),
        tf.keras.layers.Dense(VOCAB_SIZE),
    ])
    model.build()
    return model

model = build_model(BATCH_SIZE)

In [None]:
EPOCHS = 50

loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam', loss=loss)

history = model.fit(input_seqs, 
    target_seqs,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE)

In [None]:
model_inf = build_model(1)

for i in range(len(model_inf.layers)):
    for j in range(len(model_inf.layers[i].weights)):
        model_inf.layers[i].weights[j].assign(model.layers[i].weights[j])

In [None]:
print(generate_text(model_inf, seed=u"MONTAGUE:", out_len=500, temperature=1))

In [None]:
VOCAB_SIZE_IMDB = 20000 # Количество слов в словаре

(train_data, train_labels), (test_data, test_labels) = \
    tf.keras.datasets.imdb.load_data(num_words=VOCAB_SIZE_IMDB)

In [None]:
data = np.row_stack([train_data, test_data])[0]

In [None]:
# словарь всех слов в корпусе типа ключ:значение
word_index = tf.keras.datasets.imdb.get_word_index()

# добавление спец.токенов в словарь
word_index = {k:(v+3) for k,v in word_index.items()} 
word_index["<PAD>"] = 0
word_index["<START>"] = 1
word_index["<UNKNOWN>"] = 2

# перевернутый словарь значение:ключ
reverse_word_index = {v: k for k, v in word_index.items()}

# последовательность индексов в текст
def decode_review(text):
    return ' '.join([reverse_word_index.get(i, ' ') for i in text])

# текст в последовательность индексов
def encode_review(text):
    words = text.lower().split()
    words = ['<START>'] + words
    idxs = [word_index.get(word, word_index['<UNKNOWN>']) for word in words]
    return idxs

print('Example of a decoded review: \n{}'.format(decode_review(train_data[0])))

In [None]:
# пример декодирования текста
d = []
for i in range(len(data)):
  d.append(decode_review(data[i]))
texts = ' '.join(d)

In [None]:
# создание словаря с помощью Tokenizer
from keras_preprocessing.text import Tokenizer

target_regex = '!"#$%&()*+,-./:;=?@[\]^_`{|}~\t\n\'0123456789'
# Tokenizer позволяет векторизовать наш корпус, превращая каждое предложение
# в последовательность целых чисел, где каждое целое число является индексом
# токена во внутреннем словаре
tokenizer = Tokenizer(filters=target_regex) # создание объекта Tokenizer с фильтром
tokenizer.fit_on_texts(df_m_1 + df_f_1) # анализ корпусов и составление словаря
VOCAB_SIZE = len(tokenizer.word_index) + 1 # размер полученного словаря

tokenizer.word_index.get('<PAD>') # словарь типа значение:ключ .get() выдает ключ по слову
tokenizer.index_word.get(0) # словарь типа ключ:значение .get() выдает слово по ключу

print('Vocabulary size : {}'.format(VOCAB_SIZE))

In [None]:
MAX_SEQ_LEN = 256 # Финальная длина последовательности

train_data = tf.keras.preprocessing.sequence.pad_sequences(
    train_data,
    value=word_index["<PAD>"],
    padding='post',
    maxlen=MAX_SEQ_LEN)

test_data = tf.keras.preprocessing.sequence.pad_sequences(
    test_data,
    value=word_index["<PAD>"],
    padding='post',
    maxlen=MAX_SEQ_LEN)

print("Length examples: {}".format([len(train_data[0]), len(train_data[1])]))
print('=====================================')
print("Entry example: {}".format(train_data[0]))

In [None]:
x_val = train_data[:10000]
partial_x_train = train_data[10000:]

y_val = train_labels[:10000]
partial_y_train = train_labels[10000:]

In [None]:
EMB_SIZE = 16 # Размер векторного представления (эмбеддинга)
    
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(VOCAB_SIZE, 16),
    tf.keras.layers.GlobalAveragePooling1D(),
    tf.keras.layers.Dense(16, activation=tf.nn.relu),
    tf.keras.layers.Dense(1, activation=tf.nn.sigmoid),
])

model.summary()

In [None]:
model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['acc'])

In [None]:
BATCH_SIZE = 512
NUM_EPOCHS = 20

history = model.fit(partial_x_train,
                    partial_y_train,
                    epochs=NUM_EPOCHS,
                    batch_size=BATCH_SIZE,
                    validation_data=(x_val, y_val),
                    verbose=1)

In [None]:
results = model.evaluate(test_data, test_labels)

print('Test loss: {:.4f}'.format(results[0]))
print('Test accuracy: {:.2f} %'.format(results[1]*100))

In [None]:
epochs = range(1, len(history.history['acc']) + 1)

plt.figure()
plt.plot(epochs, history.history['loss'], 'bo', label='Training loss')
plt.plot(epochs, history.history['val_loss'], 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid()

plt.figure()
plt.plot(epochs, history.history['acc'], 'bo', label='Training acc')
plt.plot(epochs, history.history['val_acc'], 'b', label='Validation acc')
plt.title('Training and validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.grid()

In [None]:
text = 'It was a really good movie it was so fantastic'

encoded = encode_review(text)
prediction = model.predict(np.array(encoded)[None, :])[0,0]

print(prediction)
print('Positive' if prediction > 0.5 else 'Negative')

Encoder-Decoder

In [None]:
# русско-французкий разговорник
data_fpath = '/content/rus.txt'
max_sentences = 10000

input_texts = []
target_texts = []
lines = codecs.open(data_fpath, 'r', encoding='utf8').readlines()[:max_sentences]
for line in lines:
    input_text, target_text, = line.split('\t')[:2] # разделение на русские и французкие слова
    input_texts.append(input_text)
    target_texts.append(target_text)

In [None]:
# создание словаря символов
def prepare_vocab(texts):
    vocab = sorted(set(''.join(texts)))
    # добавление токена начала и конца последовательности
    vocab.append('<START>') 
    vocab.append('<END>')
    vocab_size = len(vocab)
    char2idx = {u:i for i, u in enumerate(vocab)} # словарь ключ:значение
    idx2char = np.array(vocab) # словарь значение:ключ
    return vocab_size, char2idx, idx2char 

INPUT_VOCAB_SIZE, input_char2idx, input_idx2char = prepare_vocab(input_texts)
TARGET_VOCAB_SIZE, target_char2idx, target_idx2char = prepare_vocab(target_texts)

In [None]:
# текст в последовательность чисел
input_texts_as_int = [[input_char2idx[c] for c in text] for text in input_texts]
target_texts_as_int = [[target_char2idx[c] for c in text] for text in target_texts]

encoder_input_seqs = [np.array(text) for text in input_texts_as_int] # последовательность для энкодера
decoder_input_seqs = [] # входящая последовательность для декодера
decoder_target_seqs = [] # целевая последовательность для декодера

# добавление спец.токенов начала и конца во входящую и целевую последовательность для декодера
for target_text in target_texts_as_int:
    decoder_input_seqs.append(np.array([target_char2idx['<START>']] + target_text))
    decoder_target_seqs.append(np.array(target_text + [target_char2idx['<END>']]))

In [None]:
# приведение длины всех последовательностей к максимальной
max_enc_seq_length = max([len(seq) for seq in encoder_input_seqs])
max_dec_seq_length = max([len(seq) for seq in decoder_input_seqs])

encoder_input_seqs = tf.keras.preprocessing.sequence.pad_sequences(
    encoder_input_seqs,
    value=input_char2idx[' '],
    padding='post',
    maxlen=max_enc_seq_length)

decoder_input_seqs = tf.keras.preprocessing.sequence.pad_sequences(
    decoder_input_seqs,
    value=target_char2idx[' '],
    padding='post',
    maxlen=max_dec_seq_length)

decoder_target_seqs = tf.keras.preprocessing.sequence.pad_sequences(
    decoder_target_seqs,
    value=target_char2idx[' '],
    padding='post',
    maxlen=max_dec_seq_length)

In [None]:
# Encoder с одним LSTM слоем
class Encoder(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.embed = tf.keras.layers.Embedding(INPUT_VOCAB_SIZE, EMB_SIZE)
        self.lstm = tf.keras.layers.LSTM(H_SIZE, return_sequences=False, return_state=True)
        
    def call(self, x):
        out = self.embed(x)
        _, h, c = self.lstm(out)
        state = (h, c)
        return state

# Decoder с одним LSTM слоем
class Decoder(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.embed = tf.keras.layers.Embedding(TARGET_VOCAB_SIZE, EMB_SIZE)
        self.lstm = tf.keras.layers.LSTM(H_SIZE, return_sequences=True, return_state=True)
        self.fc = tf.keras.layers.Dense(TARGET_VOCAB_SIZE, activation='softmax')
        
    def call(self, x, init_state):
        out = self.embed(x)
        out, h, c = self.lstm(out, initial_state=init_state)
        out = self.fc(out)
        state = (h, c)
        return out, state

In [None]:
H_SIZE = 256 # Размерность скрытого состояния LSTM
EMB_SIZE = 256 # размерность эмбеддингов (и для входных и для выходных цепочек)

encoder_model = Encoder()
decoder_model = Decoder()

encoder_inputs = tf.keras.layers.Input(shape=(None,))
decoder_inputs = tf.keras.layers.Input(shape=(None,))

enc_state = encoder_model(encoder_inputs)
decoder_outputs, _ = decoder_model(decoder_inputs, enc_state)

seq2seq = tf.keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [None]:
BATCH_SIZE = 64 # размер входящего батча
EPOCHS = 50

loss = tf.losses.SparseCategoricalCrossentropy()
seq2seq.compile(optimizer='rmsprop', loss=loss, metrics=['accuracy'])

seq2seq.fit([encoder_input_seqs, decoder_input_seqs], decoder_target_seqs,
          batch_size=BATCH_SIZE,
          epochs=EPOCHS)

In [None]:
# инференс для последовательности. При вызове возвращает перевод на другом языке
def seq2seq_inference(input_seq):
    state = encoder_model(input_seq) # получение скрытого состояия и памяти от энкодера

    target_seq = np.array([[target_char2idx['<START>']]]) # целевая последовательность из спец.токена для декодера

    decoded_sentence = ''

    while True:
        output_tokens, state = decoder_model(target_seq, state) # передача скрытого состояния и памяти в декодер

        sampled_token_index = np.argmax(output_tokens[0, -1, :]) # 
        sampled_char = target_idx2char[sampled_token_index] # из числа в слово
        decoded_sentence += sampled_char

        # прекращение генерации при получении спец токена либо достижения максимальной последовательности
        if (sampled_char == '<END>' or
           len(decoded_sentence) > max_dec_seq_length):
            break

        target_seq = np.array([[sampled_token_index]]) #

    return decoded_sentence

In [None]:
for seq_index in range(0, 20):
    input_seq = encoder_input_seqs[seq_index: seq_index + 1]
    decoded_sentence = seq2seq_inference(input_seq)
    print('-')
    print('Input sentence:', input_texts[seq_index])
    print('Result sentence:', decoded_sentence)
    print('Target sentence:', target_texts[seq_index])

In [None]:
# Encoder с двумя LSTM слоями
class Encoder(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.embed = tf.keras.layers.Embedding(INPUT_VOCAB_SIZE, EMB_SIZE)
        self.lstm_1 = tf.keras.layers.LSTM(H_SIZE, return_sequences=False, return_state=True)
        self.lstm_2 = tf.keras.layers.LSTM(H_SIZE, return_sequences=False, return_state=True)
        
    def call(self, x):
        out = self.embed(x)
        _, h_1, c_1 = self.lstm_1(out)
        _, h_2, c_2 = self.lstm_2(out)
        state = ((h_1, c_1), (h_2, c_2))
        return state

# Decoder с двумя LSTM слоями
class Decoder(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.embed = tf.keras.layers.Embedding(TARGET_VOCAB_SIZE, EMB_SIZE)
        self.lstm_1 = tf.keras.layers.LSTM(H_SIZE, return_sequences=True, return_state=True)
        self.lstm_2 = tf.keras.layers.LSTM(H_SIZE, return_sequences=True, return_state=True)
        self.fc = tf.keras.layers.Dense(TARGET_VOCAB_SIZE, activation='softmax')
        
    def call(self, x, init_state):
        out = self.embed(x)
        init_state_1, init_state_2 = init_state
        out_1, h_1, c_1 = self.lstm_1(out, initial_state=init_state_1)
        out_2, h_2, c_2 = self.lstm_2(out_1, initial_state=init_state_2)
        out_fc = self.fc(out_2)
        state = ((h_1, c_1), (h_2, c_2))
        return out_fc, state

encoder_model = Encoder()
decoder_model = Decoder()

encoder_inputs = tf.keras.layers.Input(shape=(None,))
decoder_inputs = tf.keras.layers.Input(shape=(None,))

enc_state = encoder_model(encoder_inputs)
decoder_outputs, _ = decoder_model(decoder_inputs, enc_state)

seq2seq = tf.keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [None]:
BATCH_SIZE = 64
EPOCHS = 50

loss = tf.losses.SparseCategoricalCrossentropy()
seq2seq.compile(optimizer='rmsprop', loss=loss, metrics=['accuracy'])
seq2seq.fit([encoder_input_seqs, decoder_input_seqs],
            decoder_target_seqs,
            batch_size=BATCH_SIZE,
            epochs=EPOCHS)

In [None]:
for seq_index in range(0, 20):
    input_seq = encoder_input_seqs[seq_index: seq_index + 1]
    decoded_sentence = seq2seq_inference(input_seq)
    print('-')
    print('Input sentence:', input_texts[seq_index])
    print('Result sentence:', decoded_sentence)
    print('Target sentence:', target_texts[seq_index])

In [None]:
# Encoder с двунаправленым LSMT слоем
class Encoder(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.embed = tf.keras.layers.Embedding(INPUT_VOCAB_SIZE, EMB_SIZE)
        self.lstm_bi = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(H_SIZE, return_sequences=False, return_state=True))
        
    def call(self, x):
        out = self.embed(x)
        _, h1, c1, h2, c2 = self.lstm_bi(out)
        state = ((h1, c1), (h2, c2))
        return state

encoder_model = Encoder()
decoder_model = Decoder()

encoder_inputs = tf.keras.layers.Input(shape=(None,))
decoder_inputs = tf.keras.layers.Input(shape=(None,))

enc_state = encoder_model(encoder_inputs)
decoder_outputs, _ = decoder_model(decoder_inputs, enc_state)

seq2seq = tf.keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [None]:
BATCH_SIZE = 64
EPOCHS = 50

loss = tf.losses.SparseCategoricalCrossentropy()
seq2seq.compile(optimizer='rmsprop', loss=loss, metrics=['accuracy'])
seq2seq.fit([encoder_input_seqs, decoder_input_seqs], decoder_target_seqs,
          batch_size=BATCH_SIZE,
          epochs=EPOCHS)

In [None]:
for seq_index in range(0, 20):
    input_seq = encoder_input_seqs[seq_index: seq_index + 1]
    decoded_sentence = seq2seq_inference(input_seq)
    print('-')
    print('Input sentence:', input_texts[seq_index])
    print('Result sentence:', decoded_sentence)
    print('Target sentence:', target_texts[seq_index])