<a href="https://colab.research.google.com/github/Vadim-Kolesnikov/U-net-Autoencoder/blob/main/translator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##Импорт необходимых библитек

In [1]:
import numpy as np
import os
import time
from random import randint
import unicodedata
import re
import pandas as pd
import tensorflow as tf
from keras.models import Model, load_model
from keras.layers import Dense, Embedding, GRU, Input, Dense, Embedding, Bidirectional, Concatenate, Dropout, Dot
from keras.optimizers import RMSprop, Adadelta, Adam
from keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from keras import utils
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.utils import plot_model
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.ticker as ticker
%matplotlib inline

import warnings
warnings.filterwarnings('ignore')

##Загрузка данных

In [2]:
!wget https://storage.yandexcloud.net/academy.ai/rus-eng.zip
!unzip -o rus-eng.zip

--2024-10-02 10:52:28--  https://storage.yandexcloud.net/academy.ai/rus-eng.zip
Resolving storage.yandexcloud.net (storage.yandexcloud.net)... 213.180.193.243, 2a02:6b8::1d9
Connecting to storage.yandexcloud.net (storage.yandexcloud.net)|213.180.193.243|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 16305013 (16M) [application/x-zip-compressed]
Saving to: ‘rus-eng.zip’


2024-10-02 10:52:31 (9.83 MB/s) - ‘rus-eng.zip’ saved [16305013/16305013]

Archive:  rus-eng.zip
  inflating: rus.txt                 
  inflating: _about.txt              


##Основные переменные

In [3]:
BATCH_SIZE = 128
EPOCHS = 1
UNITS  = 1024
EMBED_DIM = 256
NUM_SAMPLES = 50000
ATTENTION_UNITS = 10
FILE_NAME = "rus.txt"
SOS = '<start>'
EOS = '<end>'

##Считывание данных из файла и их преварительная обработка

In [4]:
def preprocess_sentence(w):
    w = re.sub(r"([?.!,;:¿])", r" \1 ", w)
    w = re.sub(r'[" "]+', " ", w)

    w = re.sub(r"[^a-zA-Zа-яёА-ЯЁ?'`.!,;:¿]+", " ", w)
    w = w.rstrip().strip()

    w = SOS + ' ' + w + ' ' + EOS
    return w


en_texts = []
ru_texts = []

with open(FILE_NAME, "r", encoding="utf-8") as f:
    lines = f.read().split("\n")

for line in lines[: min(NUM_SAMPLES, len(lines) - 1)]:

    en_text, ru_text, _ = line.split("	")

    en_texts.append(preprocess_sentence(en_text))
    ru_texts.append(preprocess_sentence(ru_text))

print("Число используемых примеров:", len(en_texts))
print('Общее число примеров:', len(lines))
random_index = randint(0, len(en_texts)-1)
print(f'Пример текста на английском: {en_texts[random_index]}')
print(f'Пример текста на русском: {ru_texts[random_index]}')

Число используемых примеров: 50000
Общее число примеров: 496060
Пример текста на английском: <start> Fire is dangerous . <end>
Пример текста на русском: <start> Огонь опасен . <end>


##Токенизация данных и формирование выборок

In [5]:
def tokenize(text):
    tokenizer = Tokenizer(filters='')
    tokenizer.fit_on_texts(text)
    seq = tokenizer.texts_to_sequences(text)
    pseq = pad_sequences(seq, padding='post')
    return pseq, tokenizer

padded_en_texts, tokenizer_en = tokenize(en_texts)
padded_ru_texts, tokenizer_ru = tokenize(ru_texts)

max_length_en = padded_en_texts.shape[1]
max_length_ru = padded_ru_texts.shape[1]

en_train, en_test, ru_train, ru_test = train_test_split(padded_en_texts, padded_ru_texts, test_size=0.1)


BUFFER_SIZE = len(en_train)
STEPS_PER_EPOCH = len(en_train)//BATCH_SIZE
VOCAB_EN_SIZE = len(tokenizer_en.word_index) + 1
VOCAB_RU_SIZE = len(tokenizer_ru.word_index) + 1


eng2rus_dataset = tf.data.Dataset.from_tensor_slices((en_train, ru_train)).shuffle(BUFFER_SIZE)
eng2rus_dataset = eng2rus_dataset.batch(BATCH_SIZE, drop_remainder=True)

rus2eng_dataset = tf.data.Dataset.from_tensor_slices((ru_train, en_train)).shuffle(BUFFER_SIZE)
rus2eng_dataset = rus2eng_dataset.batch(BATCH_SIZE, drop_remainder=True)

##Класс кодировщика

In [6]:
class Encoder(tf.keras.Model):

  def __init__(self,
               vocab_size,
               embedding_dim,
               enc_units,
               batch_sz):

    super(Encoder, self).__init__()
    self.batch_sz = batch_sz
    self.enc_units = enc_units
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(self.enc_units, return_sequences=True, return_state=True, recurrent_initializer='glorot_uniform')

  def call(self,
           x,
           hidden):

    x = self.embedding(x)
    output, state = self.gru(x, initial_state = hidden)
    return output, state

  def initialize_hidden_state(self):
    return tf.zeros((self.batch_sz, self.enc_units))

##Реализация механизма внимания Богданова

In [7]:
class Attention(Model):

  def __init__(self,
               units):

    super(Attention, self).__init__()
    self.W1 = Dense(units)
    self.W2 = Dense(units)
    self.V =  Dense(1)

  def call(self,
           hidden_state,
           values):

    hidden_with_time_axis = tf.expand_dims(hidden_state, 1)
    score = self.V(tf.nn.tanh(self.W1(values) + self.W2(hidden_with_time_axis)))
    attention_weights = tf.nn.softmax(score, axis=1)
    context_vector = attention_weights * values
    context_vector = tf.reduce_sum(context_vector, axis=1)

    return context_vector, attention_weights

##Класс декодировщика

In [8]:
class Decoder(Model):

  def __init__(self,
               vocab_size,
               embedding_dim,
               dec_units,
               batch_sz):
    super(Decoder, self).__init__()
    self.batch_sz = batch_sz
    self.dec_units = dec_units
    self.embedding = Embedding(vocab_size, embedding_dim)
    self.gru = GRU(self.dec_units, return_sequences=True, return_state=True, recurrent_initializer='glorot_uniform')
    self.fc = Dense(vocab_size)
    self.attention = Attention(ATTENTION_UNITS)

  def call(self,
           x,
           hidden,
           enc_output
          ):
    context_vector, attention_weights = self.attention(hidden, enc_output)

    x = self.embedding(x)
    x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
    output, state = self.gru(x, initial_state = hidden)

    output = tf.reshape(output, (-1, output.shape[2]))
    x = self.fc(output)
    return x, state, attention_weights

##Класс автокодировщика с механизмом внимания

In [9]:
class MainModel():
  def __init__(self, translate_type, vocab_inp_size, vocab_ans_size):

    self.optimizer = tf.keras.optimizers.Adam()
    self.encoder = Encoder(vocab_inp_size, EMBED_DIM, UNITS, BATCH_SIZE)
    self.decoder = Decoder(vocab_ans_size, EMBED_DIM, UNITS, BATCH_SIZE)
    self.checkpoint_dir = './training_checkpoints'
    self.checkpoint_prefix = os.path.join(self.checkpoint_dir, "ckpt")
    self.checkpoint = tf.train.Checkpoint(optimizer=self.optimizer,
                                          encoder=self.encoder,
                                          decoder=self.decoder)
    self.translate_type = translate_type

    if self.translate_type == 'eng2rus':

      self.tokenizer_inp = tokenizer_en
      self.tokenizer_out = tokenizer_ru

      self.max_length_inp = max_length_en
      self.max_length_out = max_length_ru

    elif self.translate_type == 'rus2eng':
      self.tokenizer_inp = tokenizer_ru
      self.tokenizer_out = tokenizer_en

      self.max_length_inp = max_length_ru
      self.max_length_out = max_length_en

  def loss_function(self, real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')(real, pred)
    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask

    return tf.reduce_mean(loss)

  @tf.function
  def train_step(self, inp, targ, enc_hidden):
    loss = 0

    with tf.GradientTape() as tape:

      enc_output, enc_hidden = self.encoder(inp, enc_hidden)

      dec_hidden = enc_hidden

      dec_input = tf.expand_dims([self.tokenizer_inp.word_index[SOS]] * BATCH_SIZE, 1)
      for t in range(1, targ.shape[1]):
        predictions, dec_hidden, _ = self.decoder(dec_input, dec_hidden, enc_output)
        loss += self.loss_function(targ[:, t], predictions)
        dec_input = tf.expand_dims(targ[:, t], 1)


    batch_loss = (loss / int(targ.shape[1]))
    variables = self.encoder.trainable_variables + self.decoder.trainable_variables

    gradients = tape.gradient(loss, variables)

    self.optimizer.apply_gradients(zip(gradients, variables))

    return batch_loss

  def train(self, train_data, epochs=EPOCHS):
    for epoch in range(epochs):
      start = time.time()

      enc_hidden = self.encoder.initialize_hidden_state()
      total_loss = 0

      for (batch, (inp, targ)) in enumerate(train_data.take(STEPS_PER_EPOCH)):
        batch_loss = self.train_step(inp, targ, enc_hidden)
        total_loss += batch_loss

      if (epoch + 1) % 5 == 0:
        self.checkpoint.save(file_prefix = self.checkpoint_prefix)

      print(f'Эпоха: {epoch + 1}  |  Потеря:  {(total_loss / STEPS_PER_EPOCH):.4f}  |  Время на эпоху: {time.time() - start} сек \n' )

  def evaluate(self, sentence):
    sentence = preprocess_sentence(sentence)

    words = sentence.split(' ')
    inputs = list()

    for word in words:

        try:
            inputs.append(self.tokenizer_inp.word_index[word])
        except:
            pass

    inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs],
                                                          maxlen=self.max_length_inp,
                                                          padding='post')
    inputs = tf.convert_to_tensor(inputs)
    result = ''

    hidden = [tf.zeros((1, UNITS))]
    enc_out, enc_hidden = self.encoder(inputs, hidden)

    dec_hidden = enc_hidden
    dec_input = tf.expand_dims([self.tokenizer_out.word_index[SOS]], 0)

    for t in range(self.max_length_out):
      predictions, dec_hidden, attention_weights = self.decoder(dec_input,
                                                          dec_hidden,
                                                          enc_out)
      attention_weights = tf.reshape(attention_weights, (-1, ))
      predicted_id = tf.argmax(predictions[0]).numpy()
      result += self.tokenizer_out.index_word[predicted_id] + ' '

      if self.tokenizer_out.index_word[predicted_id] == EOS:
        return result, sentence

      dec_input = tf.expand_dims([predicted_id], 0)

    return result, sentence

##Класс переводчика с английского на русский и обратно

In [10]:
class Translator():

  def __init__(self):
    self.eng2rus = MainModel('eng2rus', VOCAB_EN_SIZE, VOCAB_RU_SIZE)
    self.rus2eng = MainModel('rus2eng', VOCAB_RU_SIZE, VOCAB_EN_SIZE)
    self.is_trained = False

  def train(self, eng2rus_dataset, rus2eng_dataset, epochs=EPOCHS):
    self.eng2rus.train(eng2rus_dataset, epochs)
    self.rus2eng.train(rus2eng_dataset, epochs)
    self.is_trained = True

  def translate(self, sentence, translate_type):
    if self.is_trained:
      if translate_type == 'eng2rus':
        return self.eng2rus.evaluate(sentence)[0]
      else:
        return self.rus2eng.evaluate(sentence)[0]
    else:
      return 'Для начала модель необходимо обучить'


##Обучение модели

In [11]:
translator = Translator()
translator.train(eng2rus_dataset, rus2eng_dataset, EPOCHS)

Эпоха: 1  |  Потеря:  1.8831  |  Время на эпоху: 207.05141639709473 сек 

Эпоха: 1  |  Потеря:  1.8606  |  Время на эпоху: 133.57495522499084 сек 



##Демонстрация работы модели

In [12]:
def detokinize(vect, lng='eng'):
  result = ''
  for i in vect:
    try:
      if lng == 'eng':
        result += tokenizer_en.index_word[i] + ' '
      else:
        result += tokenizer_ru.index_word[i] + ' '
    except:
      pass
  return result

In [13]:
n_samples = 10
dct = {'en': [], 'ru': [], 'en -> ru': [], 'ru -> en': []}
for i in range(n_samples):

  en = detokinize(en_test[i], lng='eng')
  ru = detokinize(ru_test[i], lng='rus')
  pred_en2ru = translator.translate(en, 'eng2rus')
  pred_ru2en = translator.translate(ru, 'rus2eng')

  dct['en'].append(en)
  dct['ru'].append(ru)
  dct['en -> ru'].append(pred_en2ru)
  dct['ru -> en'].append(pred_ru2en)


df = pd.DataFrame(dct)
df

Unnamed: 0,en,ru,en -> ru,ru -> en
0,<start> i fixed it . <end>,<start> я её починил . <end>,я меня есть . <end>,i don't be . <end>
1,"<start> i'm young , too . <end>",<start> я тоже молодой . <end>,"я меня , что что что что что что что что что",i was a bit shy . <end>
2,<start> what a sight ! <end>,<start> какое зрелище ! <end>,ты ты это ? <end>,it's a bit . <end>
3,<start> just take me home . <end>,<start> просто отвези меня домой . <end>,"мы меня , что что что что что что что что что","don't go , go . <end>"
4,<start> you need it . <end>,<start> тебе это нужно . <end>,они не меня . <end>,you must go . <end>
5,<start> tom wasn't rich . <end>,<start> том не был богат . <end>,ты не не не не что что что что пожалуйста . <e...,tom was very shy . <end>
6,<start> that's not yours . <end>,<start> это не твоё . <end>,это не не это . <end>,it's not a liar . <end>
7,<start> i like walking . <end>,<start> мне нравится гулять . <end>,я меня есть что что что что что что что что что,i love you . <end>
8,<start> be prepared . <end>,<start> будьте готовы ! <end>,они это . <end>,open the door . <end>
9,<start> it should be fun . <end>,<start> должно быть весело . <end>,ты не не не что что что пожалуйста . <end>,it's a bit good . <end>
