In [None]:
import enum
import copy
import pickle
import random
import requests
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.auto import tqdm, trange

import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

from transformers import T5ForConditionalGeneration, AutoTokenizer

# Dataset generation | Генерация датасета


In [None]:
# Build the homoglyph dictionary from the Unicode "intentional" confusables
# list: maps a character to the list of characters listed as its look-alikes.

intentionals = dict()

# The context manager releases the streamed connection; the timeout and the
# status check turn a hung or failed download into an immediate, clear error
# instead of a parse failure further down.
with requests.get("https://www.unicode.org/Public/security/latest/intentional.txt",
                  stream=True, timeout=30) as int_resp:
  int_resp.raise_for_status()
  for line in int_resp.iter_lines():
    if not len(line):
      continue
    line = line.decode('utf-8-sig')  # 'utf-8-sig' drops a leading BOM if present
    # Skip comment lines and anything without a trailing "# ( a ~ b ) ..." part.
    if line[0] == '#' or '#' not in line:
      continue
    line = line.replace("#*", "#")
    _, line = line.split("#", maxsplit=1)
    # The remaining text is read positionally: index 3 is the source
    # character and index 7 its homoglyph (layout " ( a ~ b ) ...").
    if len(line) <= 7:
      continue
    intentionals.setdefault(line[3], []).append(line[7])

In [None]:
!pip install fairseq
!pip install textdistance
!pip install pyarrow
!pip install sacremoses
!pip install fastBPE
!pip install subword_nmt

In [None]:
from abc import ABC
from typing import List, Tuple, Callable, Dict
from fairseq.hub_utils import GeneratorHubInterface
from scipy.optimize import NonlinearConstraint, differential_evolution
from textdistance import levenshtein
import pyarrow
import sacremoses
import fastBPE
import subword_nmt

In [None]:
# Run on the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = "cuda"
else:
  device = "cpu"

# Load the WMT14 English->French transformer through torch.hub (Moses
# tokenizer, subword-nmt BPE) and move it to the selected device.
en2fr = torch.hub.load(
    'pytorch/fairseq',
    'transformer.wmt14.en-fr',
    tokenizer='moses',
    bpe='subword_nmt',
)
en2fr = en2fr.to(device)

In [None]:
class Objective(ABC):
  """Base class for attack objectives optimised with scipy's differential evolution.

  Subclasses describe the search space through ``bounds`` and turn a point of
  that space into a perturbed string through ``candidate``. The score of a
  candidate is the negated distance between the translation of the original
  input and the translation of the candidate, so minimising the score
  maximises the distance.
  """

  def __init__(self, model: GeneratorHubInterface, input: str, max_perturbs: int, distance: Callable[[str,str],int]):
    """Store the attack configuration and translate the unperturbed input once.

    Raises:
      ValueError: if ``model`` or ``input`` is falsy.
    """
    if not model:
      raise ValueError("Must supply model.")
    if not input:
      raise ValueError("Must supply input.")

    self.model: GeneratorHubInterface = model
    self.input: str = input
    self.max_perturbs: int = max_perturbs
    self.distance: Callable[[str,str],int] = distance
    # Reference translation every candidate is compared against.
    self.output = self.model.translate(self.input)

  def objective(self) -> Callable[[List[float]], float]:
    """Return the function handed to the optimiser (lower is better)."""
    def score(perturbations: List[float]) -> float:
      perturbed: str = self.candidate(perturbations)
      translated: str = self.model.translate(perturbed)
      # Negated because the optimiser minimises.
      return -self.distance(self.output, translated)
    return score

  def differential_evolution(self, print_result=True, verbose=True, maxiter=60, popsize=32, polish=False) -> str:
    """Run scipy's differential evolution and return the best candidate string.

    Args:
      print_result: print a summary of the best candidate when true.
      verbose: forwarded to scipy as ``disp``.
      maxiter, popsize, polish: forwarded to scipy unchanged.
    """
    outcome = differential_evolution(
        self.objective(),
        self.bounds(),
        disp=verbose,
        maxiter=maxiter,
        popsize=popsize,
        polish=polish,
    )
    best = self.candidate(outcome.x)
    if print_result:
      print(f"Result: {best}")
      print(f"Result Distance: {outcome.fun}")
      print(f"Perturbation Encoding: {outcome.x}")
      print(f"Input Translation: {self.output}")
      print(f"Result Translation: {self.model.translate(best)}")
    return best

  def bounds(self) -> List[Tuple[float, float]]:
    """Search-space bounds, one ``(low, high)`` pair per dimension; subclasses must override."""
    raise NotImplementedError()

  def candidate(self, perturbations: List[float]) -> str:
    """Decode an optimiser vector into a perturbed string; subclasses must override."""
    raise NotImplementedError()


def natural(x: float) -> int:
    """Round ``x`` to the nearest integer and clamp negative results to 0.

    Rounding follows Python's built-in ``round`` (ties go to the even integer).
    """
    nearest = round(float(x))
    return nearest if nearest > 0 else 0

In [None]:
class HomoglyphObjective(Objective):
  """Objective whose perturbations replace characters with homoglyphs.

  ``glyph_map`` lists every possible substitution as ``(position, replacement)``.
  Each dimension of the search space selects one entry of that list by index,
  or -1 to leave the text untouched for that slot.
  """

  def __init__(self, model: GeneratorHubInterface, input: str, max_perturbs=None, distance: Callable[[str,str],int] = levenshtein.distance, homoglyphs: Dict[str,List[str]] = intentionals, **kwargs):
    """Build the substitution table for ``input``.

    Args:
      model: translation model, passed to ``Objective``.
      input: text to perturb.
      max_perturbs: number of substitution slots; a falsy value means one
        slot per input character.
      distance: string distance used to score candidates.
      homoglyphs: mapping from a character to its look-alike characters.
      **kwargs: accepted and ignored.
    """
    super().__init__(model, input, max_perturbs, distance)
    if not self.max_perturbs:
      self.max_perturbs = len(self.input)
    self.homoglyphs = homoglyphs
    # One (position, replacement) entry per homoglyph of each input character.
    self.glyph_map = [
        (position, replacement)
        for position, char in enumerate(self.input)
        for replacement in self.homoglyphs.get(char, ())
    ]

  def bounds(self) -> List[Tuple[float, float]]:
    """One ``(-1, last glyph_map index)`` interval per perturbation slot."""
    return [(-1, len(self.glyph_map)-1)] * self.max_perturbs

  def candidate(self, perturbations: List[float]) -> str:
    """Apply the substitutions selected by ``perturbations`` to the input.

    Each value is rounded to the nearest integer. Values that round to a
    valid ``glyph_map`` index apply that substitution; anything else (in
    particular -1, the lower bound) is a no-op. The rounding deliberately
    does not clamp negatives to 0, otherwise the -1 "skip" value would be
    unreachable and an empty ``glyph_map`` would raise ``IndexError``.
    """
    chars = list(self.input)
    for value in perturbations:
      slot = round(float(value))
      if 0 <= slot < len(self.glyph_map):
        position, replacement = self.glyph_map[slot]
        chars[position] = replacement
    return ''.join(chars)

In [None]:
# Read the JFLEG sentences and keep a short prefix of each one as an attack
# source. The context manager closes the file; the encoding is explicit so
# the result does not depend on the platform default.
with open('jfleg.txt', 'r', encoding='utf-8') as f:
  text = f.read().split('\n')

data = []
for s in text:
  # First four space-separated tokens, minus one ',' and one "'s" if present.
  new_s = s.split(' ')[:4]
  for token in (',', "'s"):
    if token in new_s:
      new_s.remove(token)
  sentence = " ".join(new_s)
  # Blank lines (e.g. the one after a trailing newline) would otherwise add
  # empty strings, which Objective rejects with ValueError("Must supply input.").
  if sentence.strip():
    data.append(sentence)

In [None]:
# Add single words from the unigram frequency table to the attack sources.
text = pd.read_csv('unigram_freq.csv')
words = text['word'].tolist()
# Start 80% of the way into the list to skip the most frequent words such as
# prepositions and conjunctions, then take a window of 600 words.
start = int(len(words)*0.8)
words = words[start:start+600]
# Only words longer than four characters are kept.
data.extend(w for w in words if len(w) > 4)
data = np.array(data)

In [None]:
# Seed NumPy's global RNG so the in-place shuffle below gives the same order
# on every run.
np.random.seed(42)
np.random.shuffle(data)
# Cell output: number of source texts.
len(data)

In [None]:
def experiment(model, objective, source, min_perturb, max_perturb, maxiter, popsize):
  """Generate (perturbed, original) pairs for every text and perturbation budget.

  Args:
    model: translation model handed to each objective instance.
    objective: objective class, called as ``objective(model, text, max_perturbs=k)``.
    source: iterable of texts to attack.
    min_perturb: first perturbation budget (inclusive).
    max_perturb: end of the budget range (exclusive, as in ``range``).
    maxiter: generations per differential-evolution run.
    popsize: population size per differential-evolution run.

  Returns:
    List of ``(perturbed_text, original_text)`` tuples, ordered by budget and
    then by position in ``source``.
  """
  train = []
  for i in trange(min_perturb, max_perturb, desc="Perturbations"):
    for sentence in tqdm(source, leave=False, desc="Sentences"):
      # Use the model passed by the caller rather than a notebook global.
      attack = objective(model, sentence, max_perturbs=i)
      changed = attack.differential_evolution(print_result=False, verbose=False, maxiter=maxiter, popsize=popsize)
      train.append((changed, sentence))
  return train

In [None]:
# Settings for the dataset-generation run.
min_perturb = 1 # minimum number of perturbations
max_perturb = 5 # upper limit on perturbations (experiment passes it to trange as the exclusive end)
maxiter = 3 # number of generations
popsize = 16 # population size
# One differential-evolution run per (perturbation budget, source text) pair.
ds = experiment(en2fr, HomoglyphObjective, data, min_perturb, max_perturb, maxiter, popsize)

# Persist the generated (perturbed, original) pairs so later cells can reload them.
with open('homoglyph.pkl', 'wb') as f:
  pickle.dump(ds, f)

# Preparation for training | Подготовка к обучению

In [None]:
class AvailableCorrectors(enum.Enum):
    """Names of the pretrained corrector checkpoints this notebook can start from.

    Each member's value is the identifier string passed to ``from_pretrained``.
    """

    # "sage" checkpoints
    sage_fredt5_large = "ai-forever/sage-fredt5-large"
    sage_fredt5_distilled_95m = "ai-forever/sage-fredt5-distilled-95m"
    sage_m2m100_1B = "ai-forever/sage-m2m100-1.2B"
    sage_mt5_large = "ai-forever/sage-mt5-large"

    # remaining checkpoints
    m2m100_1B = "ai-forever/RuM2M100-1.2B"
    m2m100_418M = "ai-forever/RuM2M100-418M"
    fred_large = "ai-forever/FRED-T5-large-spell"
    ent5_large = "ai-forever/T5-large-spell"

In [None]:
# Checkpoint to fine-tune; pick another AvailableCorrectors member to change it.
path_to_model = AvailableCorrectors.sage_fredt5_distilled_95m.value

# Load the model weights and the matching tokenizer from the same identifier.
# NOTE(review): T5ForConditionalGeneration is used for every choice; presumably
# the m2m100 members need a different model class — confirm before selecting them.
model = T5ForConditionalGeneration.from_pretrained(path_to_model)
tokenizer = AutoTokenizer.from_pretrained(path_to_model)

In [None]:
# Every homoglyph replacement character known to the dictionary ...
tokens = {glyph for glyphs in intentionals.values() for glyph in glyphs}
# ... minus the ones the tokenizer already has in its vocabulary.
tokens = tokens - set(tokenizer.get_vocab())
# Add them in sorted order: set iteration order for strings changes between
# interpreter runs, which would assign different ids to the new tokens each
# time and break reuse of saved embeddings.
tokenizer.add_tokens(sorted(tokens))
# Grow the embedding matrix to cover the newly added tokens.
model.resize_token_embeddings(len(tokenizer))

In [None]:
# Если данные загружать из ранее сгенерированного файла
with open('homoglyph.pkl', 'rb') as f:
  ds = pickle.load(f)

In [None]:
# Rows are (perturbed text, original text) pairs: column 0 is the model
# input, column 1 the target.
ds = np.array(ds)
# First 80% of the rows for training, the remainder for validation (no shuffling).
train_size = int(0.8*ds.shape[0])
train_rows, val_rows = ds[:train_size], ds[train_size:]
x_train, y_train = train_rows[:, 0], train_rows[:, 1]
x_train_val, y_train_val = val_rows[:, 0], val_rows[:, 1]
x_train.shape, x_train_val.shape

# Training | Обучение

In [None]:
class HomoglyphDataset(Dataset):
    """Map-style dataset pairing each input ``X[i]`` with its target ``y[i]``."""

    def __init__(self, X, y):
        """Store inputs and targets as NumPy arrays."""
        self.X, self.y = np.array(X), np.array(y)

    def __len__(self):
        """Number of samples, taken from the first axis of ``X``."""
        n_samples = self.X.shape[0]
        return n_samples

    def __getitem__(self, index):
        """Return the ``(input, target)`` pair stored at ``index``."""
        sample = self.X[index]
        label = self.y[index]
        return (sample, label)

In [None]:
class M2T():
  """Small fine-tuning loop for a seq2seq text corrector.

  Tokenizes (source, target) string pairs, trains the model, records the mean
  train and validation loss per epoch, keeps a deep copy of the model with the
  lowest validation loss and stops early when that loss stops improving.

  Args:
    device: device the model is moved to at the start of ``fit``.
    dataset: callable ``(X, y)`` returning a dataset of ``(source, target)`` pairs.
    model: model called with ``input_ids``, ``attention_mask``, ``labels`` and
      ``decoder_attention_mask`` whose output exposes ``.loss``.
    tokenizer: tokenizer matching ``model``.
    learning_rate: learning rate passed to the optimizer.
    epochs: maximum number of epochs.
    batch_size: batch size for both data loaders.
    optimizer: optimizer class, built as ``optimizer(parameters, lr=learning_rate)``.
    early_stopping_patience: number of consecutive epochs without a new best
      validation loss after which training stops.
    prefix: text prepended to every source string (training, validation and
      prediction). With an empty prefix ``predict`` forces the "ru" language
      token obtained from ``tokenizer.get_lang_id``.
  """

  def __init__(self, device, dataset,
               model, tokenizer,
               learning_rate, epochs,
               batch_size, optimizer,
               early_stopping_patience=10,
               prefix=""):

    self.device = device
    self.dataset = dataset
    self.model = model
    self.tokenizer = tokenizer
    self.learning_rate = learning_rate
    self.epochs = epochs
    self.batch_size = batch_size
    self.optimizer = optimizer
    self.early_stopping_patience = early_stopping_patience
    self.prefix = prefix

    self.best_model = copy.deepcopy(model)
    self.train_loss = []
    self.val_loss = []
    self.best_val_loss = float('inf')
    self.best_epoch = 0

  def _with_prefix(self, texts):
    """Return ``texts`` (one string or a sequence) as a list with the prefix prepended."""
    if isinstance(texts, str):
      texts = [texts]
    return [self.prefix + str(text) for text in texts]

  def _batch_loss(self, sources, targets):
    """Tokenize one batch and return the model loss for it."""
    x = self.tokenizer(self._with_prefix(sources), return_tensors='pt', padding=True).to(self.model.device)
    y = self.tokenizer([str(text) for text in targets], return_tensors='pt', padding=True).to(self.model.device)
    # Padding positions are replaced by -100 in the labels. The pad id comes
    # from the tokenizer, falling back to 0 when none is defined.
    pad_id = getattr(self.tokenizer, 'pad_token_id', None)
    if pad_id is None:
      pad_id = 0
    labels = y.input_ids.masked_fill(y.input_ids == pad_id, -100)
    return self.model(
        input_ids=x.input_ids,
        attention_mask=x.attention_mask,
        labels=labels,
        decoder_attention_mask=y.attention_mask,
        return_dict=True
    ).loss

  @staticmethod
  def _mean(values):
    """Arithmetic mean of ``values``; NaN for an empty list instead of dividing by zero."""
    return sum(values) / len(values) if values else float('nan')

  def predict(self, x):
    """Generate corrections for ``x`` (a string or a sequence of strings).

    Returns:
      List of decoded strings, one per input.
    """
    # Same prefix as in training; padding allows inputs of different lengths.
    encodings = self.tokenizer(self._with_prefix(x), return_tensors="pt", padding=True).to(self.model.device)
    if self.prefix == "":
      generated_tokens = self.model.generate(
          **encodings, forced_bos_token_id=self.tokenizer.get_lang_id("ru"))
    else:
      generated_tokens = self.model.generate(**encodings)
    answer = self.tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
    return answer

  def plot_losses(self):
    """Plot train/validation loss per epoch, save it to 'losses.png' and show it."""
    c_iter = range(len(self.train_loss))

    fig, ax = plt.subplots()
    ax.set_xticks(c_iter)
    ax.plot(c_iter, self.train_loss, color='orange', label='train_loss')
    ax.plot(c_iter, self.val_loss, color='blue', label='val_loss')

    ax.legend(loc='upper right')
    ax.set_title('Losses')
    # Draw the grid before saving so the file matches what is shown.
    ax.grid()
    fig.savefig('losses.png')
    plt.show()

  def fit(self, x_train, y_train, x_train_val, y_train_val):
    """Train on the training pairs and evaluate on the validation pairs each epoch.

    Appends one value per epoch to ``train_loss`` and ``val_loss`` and updates
    ``best_model``, ``best_val_loss`` and ``best_epoch`` on every improvement.
    """
    self.model.to(self.device)
    optimizer = self.optimizer(self.model.parameters(), lr=self.learning_rate)

    train = DataLoader(self.dataset(x_train, y_train), batch_size=self.batch_size, shuffle=True)
    val = DataLoader(self.dataset(x_train_val, y_train_val), batch_size=self.batch_size, shuffle=False)

    for epoch in range(self.epochs):
      print(f'Epoch: {epoch}', end=' ---------------------- ')

      # Training pass.
      self.model.train()
      batch_losses = []
      for batch, target in train:
        loss = self._batch_loss(batch, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        batch_losses.append(float(loss))

      mean_loss = self._mean(batch_losses)
      self.train_loss.append(mean_loss)
      print(f'Loss_train: {round(mean_loss, 3)}', end=' ; ')

      # Validation pass: same prefix and label handling as training.
      self.model.eval()
      batch_losses = []
      with torch.no_grad():
        for batch, target in val:
          batch_losses.append(float(self._batch_loss(batch, target)))

      mean_loss = self._mean(batch_losses)
      self.val_loss.append(mean_loss)
      print(f'Loss_val: {round(mean_loss, 3)}')
      if mean_loss < self.best_val_loss:
        self.best_epoch = epoch
        self.best_val_loss = mean_loss
        self.best_model = copy.deepcopy(self.model)
        print('New best model.')
      elif epoch - self.best_epoch >= self.early_stopping_patience:
        # `>=` so that training stops after exactly `patience` epochs without improvement.
        print(f'Model has not improved in the last {self.early_stopping_patience} epochs. Break.')
        break

In [None]:
# Training hyper-parameters.
batch_size = 32
epochs = 100      # upper limit; early stopping may end training sooner
early_stop = 10   # passed to M2T as early_stopping_patience
lr = 1e-3
prefix = "fix homoglyphs | "  # prepended to every training input by M2T.fit

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device: {device}")

# Keyword arguments for the M2T constructor.
params = {
    'device': device,
    'dataset': HomoglyphDataset,
    'model': model,
    'tokenizer': tokenizer,
    'epochs': epochs,
    'learning_rate': lr,
    'batch_size': batch_size,
    'optimizer': torch.optim.Adam,  # the class itself; M2T.fit instantiates it
    'early_stopping_patience': early_stop,
    'prefix': prefix,
}

In [None]:
# Build the trainer from the parameter dict and run training with validation.
mtt = M2T(**params)
mtt.fit(x_train, y_train, x_train_val, y_train_val)

In [None]:
# Plot the per-epoch train/validation losses recorded by fit (also written to 'losses.png').
mtt.plot_losses()