##### Импорт библиотек

In [42]:
from argparse import Namespace
from collections import Counter
import json
import os
import string

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm.notebook import tqdm

#### Data Vectorization classes

##### Словарь

In [43]:
class Vocabulary(object):
    """Класс для обработки текста и извлечения словаря для сопоставления"""

    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
        """
        Args:
            token_to_idx (dict): уже существующий словарь токенов с индексами
            add_unk (bool): флаг, указывающий, следует ли добавлять токен UNK
            unk_token (str): токен UNK для добавления в словарь
        """

        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx

        self._idx_to_token = {idx: token 
                              for token, idx in self._token_to_idx.items()}
        
        self._add_unk = add_unk
        self._unk_token = unk_token
        
        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token) 
        
        
    def to_serializable(self):
        """ возвращает словарь, который можно сериализовать """
        return {'token_to_idx': self._token_to_idx, 
                'add_unk': self._add_unk, 
                'unk_token': self._unk_token}

    @classmethod
    def from_serializable(cls, contents):
        """ создает экземпляр словаря из сериализованного словаря """
        return cls(**contents)

    def add_token(self, token):
        """Обновить правила сопоставления на основе токена.

        Args:
            token (str): элемент, который нужно добавить в словарь
        Returns:
            index (int): целое число, соответствующее токену
        """
        try:
            index = self._token_to_idx[token]
        except KeyError:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index
    
    def add_many(self, tokens):
        """Добавления списка токенов в словарь
        
        Args:
            tokens (list): список строковых токенов
        Returns:
            indices (list): список индексов, соответствующих токенам
        """
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token):
        """Получить индекс, связанный с токеном
           или индекс UNK, если токен отсутствует.
        
        Args:
            token (str): токен для поиска 
        Returns:
            index (int): индекс, соответствующий токену
        Notes:
            `unk_index` должен быть >=0 (добавлен в словарь)
                для функциональности UNK
        """
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        """Вернуть токен, связанный с индексом
        
        Args: 
            index (int): индекс для поиска
        Returns:
            token (str): токен, соответствующий индексу
        Raises:
            KeyError: if the index is not in the Vocabulary
        """
        if index not in self._idx_to_token:
            raise KeyError("the index (%d) is not in the Vocabulary" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)

##### Векторизация

In [44]:
class SurnameVectorizer(object):
    """ Векторизатор, который координирует словари и использует их """
    def __init__(self, surname_vocab, nationality_vocab):
        """
        Args:
            surname_vocab (Vocabulary): отображает символы в целые числа
            nationality_vocab (Vocabulary): отображает национальности в целые числа
        """
        self.surname_vocab = surname_vocab
        self.nationality_vocab = nationality_vocab

    def vectorize(self, surname):
        """
        Args:
            surname (str): фамилия

        Returns:
            one_hot (np.ndarray): свёрнутая one-hot кодировка 
        """
        vocab = self.surname_vocab
        one_hot = np.zeros(len(vocab), dtype=np.float32)
        for token in surname:
            one_hot[vocab.lookup_token(token)] = 1

        return one_hot

    @classmethod
    def from_dataframe(cls, surname_df):
        """Создание экземпляра векторизатора из датафрейма набора данных.
        
        Args:
            surname_df (pandas.DataFrame): датасет фамилий
        Returns:
            экземпляр SurnameVectorizer
        """
        surname_vocab = Vocabulary(unk_token="@")
        nationality_vocab = Vocabulary(add_unk=False)

        for index, row in surname_df.iterrows():
            for letter in row.surname:
                surname_vocab.add_token(letter)
            nationality_vocab.add_token(row.nationality)

        return cls(surname_vocab, nationality_vocab)

    @classmethod
    def from_serializable(cls, contents):
        surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
        nationality_vocab =  Vocabulary.from_serializable(contents['nationality_vocab'])
        return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)

    def to_serializable(self):
        return {'surname_vocab': self.surname_vocab.to_serializable(),
                'nationality_vocab': self.nationality_vocab.to_serializable()}

##### Датасет

In [45]:
class SurnameDataset(Dataset):
    def __init__(self, surname_df, vectorizer):
        """
        Args:
            surname_df (pandas.DataFrame): датасет
            vectorizer (SurnameVectorizer): векторизатор, созданный из датасета
        """
        self.surname_df = surname_df
        self._vectorizer = vectorizer

        self.train_df = self.surname_df[self.surname_df.split=='train']
        self.train_size = len(self.train_df)

        self.val_df = self.surname_df[self.surname_df.split=='val']
        self.validation_size = len(self.val_df)

        self.test_df = self.surname_df[self.surname_df.split=='test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.validation_size),
                             'test': (self.test_df, self.test_size)}

        self.set_split('train')
        
        # Class weights
        class_counts = surname_df.nationality.value_counts().to_dict()
        def sort_key(item):
            return self._vectorizer.nationality_vocab.lookup_token(item[0])
        sorted_counts = sorted(class_counts.items(), key=sort_key)
        frequencies = [count for _, count in sorted_counts]
        self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)

    @classmethod
    def load_dataset_and_make_vectorizer(cls, surname_csv):
        """Загружает датасет и создаёт новую векторизацию с нуля
        
        Args:
            surname_csv (str): расположение датасета
        Returns:
            экземпляр SurnameDataset
        """
        surname_df = pd.read_csv(surname_csv)
        train_surname_df = surname_df[surname_df.split=='train']
        return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))

    @classmethod
    def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
        """Загрузить набор данных и соответствующий векторизатор. 
        Используется в случае, если в векторизаторе было кэширование для повторного использования
        
        Args:
            surname_csv (str): расположение датасета
            vectorizer_filepath (str): расположение сохранённого векторизатора
        Returns:
            экземпляр SurnameDataset
        """
        surname_df = pd.read_csv(surname_csv)
        vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
        return cls(surname_df, vectorizer)

    @staticmethod
    def load_vectorizer_only(vectorizer_filepath):
        """статический метод загрузки векторизатора из файла
        
        Args:
            vectorizer_filepath (str): расположение сериализованного векторизатора
        Returns:
            экземпляр SurnameVectorizer
        """
        with open(vectorizer_filepath) as fp:
            return SurnameVectorizer.from_serializable(json.load(fp))

    def save_vectorizer(self, vectorizer_filepath):
        """сохраняет векторизатор на диск, используя json
        
        Args:
            vectorizer_filepath (str): место для сохранения векторизатора
        """
        with open(vectorizer_filepath, "w") as fp:
            json.dump(self._vectorizer.to_serializable(), fp)

    def get_vectorizer(self):
        """ возвращает векторизатор """
        return self._vectorizer

    def set_split(self, split="train"):
        """ выбирает разделения в наборе данных, используя столбец в кадре данных """
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        return self._target_size

    def __getitem__(self, index):
        """метод основной точки входа для наборов данных PyTorch
        
        Args:
            index (int): индекс точки данных 
        Returns:
            словарь, содержащий точки данных:
                функции (x_surname)
                метка (y_nationality)
        """
        row = self._target_df.iloc[index]

        surname_vector = \
            self._vectorizer.vectorize(row.surname)

        nationality_index = \
            self._vectorizer.nationality_vocab.lookup_token(row.nationality)

        return {'x_surname': surname_vector,
                'y_nationality': nationality_index}

    def get_num_batches(self, batch_size):
        """Учитывая размер пакета, возвращает количество пакетов в датасете
        
        Args:
            batch_size (int)
        Returns:
            количество пакетов в датасете
        """
        return len(self) // batch_size

    
def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"): 
    """
    Функция-генератор, которая оборачивает PyTorch DataLoader. 
    Это гарантирует, что каждый тензор находится в месте записи.
    """
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                            shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict

#### Модель: SurnameClassifier

In [46]:
class SurnameClassifier(nn.Module):
    """ Двухслойный многослойный персептрон для классификации фамилий """
    def __init__(self, input_dim, hidden_dim, output_dim):
        """
        Args:
            input_dim (int): размер входных векторов
            hidden_dim (int): выходной размер первого линейного слоя
            output_dim (int): выходной размер второго линейного слоя
        """
        super(SurnameClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x_in, apply_softmax=False):
        """Прямой проход классификатора
        
        Args:
            x_in (torch.Tensor): тензор входных данных. 
                x_in.shape должен быть (batch, input_dim)
            apply_softmax (bool): флаг активации softmax должен быть False, 
            если используется с функцией потерь в виде перекрестной энтропии
        Returns:
            возвращает tensor. tensor.shape должен быть (batch, output_dim)
        """
        intermediate_vector = F.relu(self.fc1(x_in))
        prediction_vector = self.fc2(intermediate_vector)

        if apply_softmax:
            prediction_vector = F.softmax(prediction_vector, dim=1)

        return prediction_vector

#### Тренировка

##### Вспомогательные функции

In [47]:
def make_train_state(args):
    return {'stop_early': False,
            'early_stopping_step': 0,
            'early_stopping_best_val': 1e8,
            'learning_rate': args.learning_rate,
            'epoch_index': 0,
            'train_loss': [],
            'train_acc': [],
            'val_loss': [],
            'val_acc': [],
            'test_loss': -1,
            'test_acc': -1,
            'model_filename': args.model_state_file}

def update_train_state(args, model, train_state):
    """Обработка обновлений состояния обучения.

    Компоненты:
     - Ранняя остановка: предотвращение переобучения.
     - Контрольная точка модели: модель сохраняется, если модель лучше

    :param args: основные аргументы
    :param model: модель для обучения
    :param train_state: словарь, представляющий значения состояния обучения
    :returns:
        новый train_state
    """

    # Сохраняет модель
    if train_state['epoch_index'] == 0:
        torch.save(model.state_dict(), train_state['model_filename'])
        train_state['stop_early'] = False

    # Сохраняет модель при улучшении производительности
    elif train_state['epoch_index'] >= 1:
        loss_tm1, loss_t = train_state['val_loss'][-2:]

        # Если функция потерь возрасла
        if loss_t >= train_state['early_stopping_best_val']:
            # Обновление шага
            train_state['early_stopping_step'] += 1
        # Если функция потерь уменьшилась
        else:
            # Сохранение лучшей модели
            if loss_t < train_state['early_stopping_best_val']:
                torch.save(model.state_dict(), train_state['model_filename'])

            # Сброс шага ранней остановки
            train_state['early_stopping_step'] = 0

        # Ранняя остановка ?
        train_state['stop_early'] = \
            train_state['early_stopping_step'] >= args.early_stopping_criteria

    return train_state

def compute_accuracy(y_pred, y_target):
    _, y_pred_indices = y_pred.max(dim=1)
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100

##### Главные функции

In [48]:
def set_seed_everywhere(seed, cuda):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)

def handle_dirs(dirpath):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)

#### Настройка и подготовительная работа

In [49]:
args = Namespace(
    # Пути к директориям
    surname_csv="data/surnames/surnames_with_splits.csv",
    vectorizer_file="vectorizer.json",
    model_state_file="model.pth",
    save_dir="model_storage/ch4/surname_mlp",
    # Гиперпараметры модели
    hidden_dim=300,
    # Гиперпараметры для тренировки
    seed=1337,
    num_epochs=100,
    early_stopping_criteria=5,
    learning_rate=0.001,
    batch_size=64,
    # Опции рантайма
    cuda=False,
    reload_from_files=False,
    expand_filepaths_to_save_dir=True,
)

if args.expand_filepaths_to_save_dir:
    args.vectorizer_file = os.path.join(args.save_dir,
                                        args.vectorizer_file)

    args.model_state_file = os.path.join(args.save_dir,
                                         args.model_state_file)
    
    print("Expanded filepaths: ")
    print("\t{}".format(args.vectorizer_file))
    print("\t{}".format(args.model_state_file))
    
# Проверка CUDA
if not torch.cuda.is_available():
    args.cuda = False

args.device = torch.device("cuda" if args.cuda else "cpu")

print("Using CUDA: {}".format(args.cuda))


# Установка seed
set_seed_everywhere(args.seed, args.cuda)

# Обработка директорий
handle_dirs(args.save_dir)

Expanded filepaths: 
	model_storage/ch4/surname_mlp/vectorizer.json
	model_storage/ch4/surname_mlp/model.pth
Using CUDA: False


#### Инициализация

In [None]:
if args.reload_from_files:
    # Обучение с сохранения
    print("Reloading!")
    dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
                                                              args.vectorizer_file)
else:
    # создание датасета и векторизатора
    print("Creating fresh!")
    dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
    dataset.save_vectorizer(args.vectorizer_file)
    
vectorizer = dataset.get_vectorizer()
classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab), 
                               hidden_dim=args.hidden_dim, 
                               output_dim=len(vectorizer.nationality_vocab))

Creating fresh!


#### Цикл тренировки

In [None]:
classifier = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)

    
loss_func = nn.CrossEntropyLoss(dataset.class_weights)
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
                                                 mode='min', factor=0.5,
                                                 patience=1)

train_state = make_train_state(args)

epoch_bar = tqdm(desc='training routine', 
                          total=args.num_epochs,
                          position=0)

dataset.set_split('train')
train_bar = tqdm(desc='split=train',
                          total=dataset.get_num_batches(args.batch_size), 
                          position=1, 
                          leave=True)
dataset.set_split('val')
val_bar = tqdm(desc='split=val',
                        total=dataset.get_num_batches(args.batch_size), 
                        position=1, 
                        leave=True)

try:
    for epoch_index in range(args.num_epochs):
        train_state['epoch_index'] = epoch_index

        # Перебор обучающего датасета

        # настройка: batch generator, установка loss и accuracy = 0, режим обучения

        dataset.set_split('train')
        batch_generator = generate_batches(dataset, 
                                           batch_size=args.batch_size, 
                                           device=args.device)
        running_loss = 0.0
        running_acc = 0.0
        classifier.train()

        for batch_index, batch_dict in enumerate(batch_generator):
            # тренировка состоит из 5 шагов:

            # --------------------------------------
            # шаг 1. обнуление градиентов
            optimizer.zero_grad()

            # шаг 2. вычислить результат
            y_pred = classifier(batch_dict['x_surname'])

            # шаг 3. вычислить функцию потерь
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)

            # шаг 4. используя функцию для вычисления градиентов
            loss.backward()

            # шаг 5. используя оптимизатор, делаем градиентный шаг
            optimizer.step()
            # -----------------------------------------
            # вычисление accuracy
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)

            # обновление прогресса обучения
            train_bar.set_postfix(loss=running_loss, acc=running_acc, 
                            epoch=epoch_index)
            train_bar.update()

        train_state['train_loss'].append(running_loss)
        train_state['train_acc'].append(running_acc)

        # Перебор датасета val

        # настройка: batch generator, установка loss и accuracy = 0, режим оценки

        dataset.set_split('val')
        batch_generator = generate_batches(dataset, 
                                           batch_size=args.batch_size, 
                                           device=args.device)
        running_loss = 0.
        running_acc = 0.
        classifier.eval()

        for batch_index, batch_dict in enumerate(batch_generator):

            # вычисление вывода
            y_pred =  classifier(batch_dict['x_surname'])

            # Вычисление функции потерь
            loss = loss_func(y_pred, batch_dict['y_nationality'])
            loss_t = loss.to("cpu").item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)

            # вычисление accuracy
            acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)
            val_bar.set_postfix(loss=running_loss, acc=running_acc, 
                            epoch=epoch_index)
            val_bar.update()

        train_state['val_loss'].append(running_loss)
        train_state['val_acc'].append(running_acc)

        train_state = update_train_state(args=args, model=classifier,
                                         train_state=train_state)

        scheduler.step(train_state['val_loss'][-1])

        if train_state['stop_early']:
            break

        train_bar.n = 0
        val_bar.n = 0
        epoch_bar.update()
except KeyboardInterrupt:
    print("Exiting loop")

training routine:   0%|          | 0/100 [00:00<?, ?it/s]

split=train:   0%|          | 0/120 [00:00<?, ?it/s]

split=val:   0%|          | 0/25 [00:00<?, ?it/s]

In [None]:
# вычисление функции потерь и точности на тестовом наборе, используя наилучшую доступную модель

classifier.load_state_dict(torch.load(train_state['model_filename']))

classifier = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)
loss_func = nn.CrossEntropyLoss(dataset.class_weights)

dataset.set_split('test')
batch_generator = generate_batches(dataset, 
                                   batch_size=args.batch_size, 
                                   device=args.device)
running_loss = 0.
running_acc = 0.
classifier.eval()

for batch_index, batch_dict in enumerate(batch_generator):
    # вычисление выхода
    y_pred =  classifier(batch_dict['x_surname'])
    
    # вычисление функции потерь
    loss = loss_func(y_pred, batch_dict['y_nationality'])
    loss_t = loss.item()
    running_loss += (loss_t - running_loss) / (batch_index + 1)

    # вычисление accuracy
    acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
    running_acc += (acc_t - running_acc) / (batch_index + 1)

train_state['test_loss'] = running_loss
train_state['test_acc'] = running_acc

In [None]:
print("Test loss: {};".format(train_state['test_loss']))
print("Test Accuracy: {}".format(train_state['test_acc']))

Test loss: 1.7831853818893435;
Test Accuracy: 46.31249999999999


#### Вывод

In [None]:
def predict_nationality(surname, classifier, vectorizer):
    """Предсказание национальности по новой фамилии
    
    Args:
        surname (str): фамилия классифицируемого
        classifier (SurnameClassifer): экземпляр классификатора
        vectorizer (SurnameVectorizer): соответствующий векторизатор
    Returns:
        словарь с наиболее вероятной национальностью и ее вероятностью
    """
    vectorized_surname = vectorizer.vectorize(surname)
    vectorized_surname = torch.tensor(vectorized_surname).view(1, -1)
    result = classifier(vectorized_surname, apply_softmax=True)

    probability_values, indices = result.max(dim=1)
    index = indices.item()

    predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
    probability_value = probability_values.item()

    return {'nationality': predicted_nationality, 'probability': probability_value}

In [None]:
new_surname = input("Enter a surname to classify: ")
classifier = classifier.to("cpu")
prediction = predict_nationality(new_surname, classifier, vectorizer)
print("{} -> {} (p={:0.2f})".format(new_surname,
                                    prediction['nationality'],
                                    prediction['probability']))

Yagilev -> Russian (p=0.89)


#### Вывод K наиболее вероятных предсказаний

In [None]:
vectorizer.nationality_vocab.lookup_index(0)

'Arabic'

In [52]:
def predict_topk_nationality(name, classifier, vectorizer, k=5):
    vectorized_name = vectorizer.vectorize(name)
    vectorized_name = torch.tensor(vectorized_name).view(1, -1)
    prediction_vector = classifier(vectorized_name, apply_softmax=True)
    probability_values, indices = torch.topk(prediction_vector, k=k)
    
    # возвращаемый размер: 1,k
    probability_values = probability_values.detach().numpy()[0]
    indices = indices.detach().numpy()[0]
    
    results = []
    for prob_value, index in zip(probability_values, indices):
        nationality = vectorizer.nationality_vocab.lookup_index(index)
        results.append({'nationality': nationality, 
                        'probability': prob_value})
    
    return results


new_surname = input("Enter a surname to classify: ")
classifier = classifier.to("cpu")

k = int(input("How many of the top predictions to see? "))
if k > len(vectorizer.nationality_vocab):
    print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
    k = len(vectorizer.nationality_vocab)
    
predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)

print("Top {} predictions:".format(k))
print("===================")
for prediction in predictions:
    print("{} -> {} (p={:0.2f})".format(new_surname,
                                        prediction['nationality'],
                                        prediction['probability']))

Top 5 predictions:
Jagilev -> Russian (p=0.76)
Jagilev -> Czech (p=0.12)
Jagilev -> English (p=0.04)
Jagilev -> Spanish (p=0.02)
Jagilev -> Irish (p=0.01)
