# Распознование рукописных цифр (MNIST) с помощью RBM

### 1. Подключение библиотек

In [1]:
import os
import torch
import torchvision.datasets
import torchvision.models
import torchvision.transforms
import numpy as np
from sklearn.linear_model import LogisticRegression
import warnings
warnings.filterwarnings('ignore')

### 2. Загрузка набора данных MNIST

__Скачивание данных в текущую директорию:__

In [2]:
dir_name = os.getcwd()

__Чтение тренировочной и тестовой выборок набора данных MNIST :__

Данные представляются в виде пар __(tuple)__, где первый элемент — изображение в формате __PIL.Image.Image__, а второй — целочисленная метка класса. Параметр __transform__ обеспечивает преобразование изображений в формат __torch.Tensor__ для последующей работы.

In [3]:
train_dataset = torchvision.datasets.MNIST(root = dir_name, train = True, 
download = True, transform = torchvision.transforms.ToTensor())

test_dataset = torchvision.datasets.MNIST(root = dir_name, train = False, 
download = True, transform = torchvision.transforms.ToTensor())

__Зададим размер обрабатываемой пачки данных:__

In [4]:
batch_size = 64

__Создадим объекты для последовательной загрузки пачек данных из тренировочной и тестовой выборок:__

In [5]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size = batch_size, shuffle = True)

test_loader = torch.utils.data.DataLoader(test_dataset, batch_size = batch_size, shuffle = False)

### 3. Создание модели, соответствующей ограниченной машине Больцмана

__Зададим количество нейронов видимого слоя:__ 28 * 28 = 784, поскольку изображения имеют размер 28 на 28 пикселей

In [6]:
visible_units = 28 * 28

__Зададим количество нейронов скрытого слоя:__

In [7]:
hidden_units = 128

__Зададим скорость обучения модели:__

In [8]:
learning_rate = 0.001

__Зададим количество эпох обучения модели:__

In [9]:
num_epochs = 10

__Зададим параметр алгоритма контрастной дивергенции:__

In [10]:
CD_k = 2

__Зададим коэффициент затухания весов:__

In [11]:
weight_decay = 0.0001

__Зададим коэффициент метода импульса:__

In [12]:
momentum_coefficient = 0.5

__Создадим класс сети, соответствующей ограниченной машине Больцмана:__

In [13]:
class RBM():
    # Конструктор
    def __init__(self, num_visible, num_hidden, k, learning_rate, momentum_coefficient, weight_decay):
        # Задаем количество нейронов видимого и скрытого слоев, параметр алгоритма
        # контрастной дивергенции, скорость обучения модели, коэффициенты
        # затухания весов и метода импульса
        self.num_visible = num_visible
        self.num_hidden = num_hidden
        self.k = k
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.momentum_coefficient = momentum_coefficient

        # Задаем вектора смещений и матрицу связей (заполняем их случайными
        # числами из стандартного нормального распределения)
        self.weights = torch.randn(num_visible, num_hidden)
        self.visible_bias = torch.randn(num_visible)
        self.hidden_bias = torch.randn(num_hidden)
        
        # Задаем вектора смещений и матрицу связей для метода моментов
        # (заполняем их нулями)
        self.weights_momentum = torch.zeros(num_visible, num_hidden)
        self.visible_bias_momentum = torch.zeros(num_visible)
        self.hidden_bias_momentum = torch.zeros(num_hidden)

    # Будем считать, что W - матрица связей, a и b - вектора смещений 
    # для видимых и скрытых нейронов соответственно, v - вектор состояния
    # нейронов видимого слоя, h - вектор состояний нейронов скрытого слоя
    
    # Тогда, в соответствии с введенными обозначениями:
    
    # Функция принимает на вход вектор v и вычисляет вектор p(h|v) = sigmoid(v * W + b)
    def sample_hidden(self, visible_probabilities):
        hidden_activations = torch.matmul(visible_probabilities, self.weights) + self.hidden_bias
        hidden_probabilities = self._sigmoid(hidden_activations)
        return hidden_probabilities
    
    # Функция принимает на вход вектор h и вычисляет вектор p(v|h) = sigmoid(h * W^T + a)
    def sample_visible(self, hidden_probabilities):
        visible_activations = torch.matmul(hidden_probabilities, self.weights.t()) + self.visible_bias
        visible_probabilities = self._sigmoid(visible_activations)
        return visible_probabilities
    
    # Стоит отметить тот факт, что данные формулы для вычисления условных вероятностей
    # отличаются от стандартных: p(h|v) = sigmoid(W^T * v + b) и p(v|h) = sigmoid(W * h + a)
    # Это происходит из-за того, что в данной реализации считается, что вектора - это строки,
    # а не столбцы, и если данные выражения для условных вероятностей транспонировать, то
    # можно убедиться, что формулы согласуются с теорией
    
    def contrastive_divergence(self, input_data):
        # Вычисляем вектор p(h|v)
        positive_hidden_probabilities = self.sample_hidden(input_data)
        
        # Активируем нейроны следующим способом:
        # Генерируется вектор с той же длинны с случайными числами
        # из стандартного нормального распределения, после чего
        # вектор p(h|v), полученный ранее, покомпонентно с ним сравнивается,
        # в результате чего получаем вектор, состоящий из 0 и 1, где 1 соответствует
        # нейрону, который активировался
        positive_hidden_activations = (positive_hidden_probabilities >= self._random_probabilities(self.num_hidden)).float()
        
        # 
        positive_associations = torch.matmul(input_data.t(), positive_hidden_activations)

        # Negative phase
        hidden_activations = positive_hidden_activations

        for step in range(self.k):
            visible_probabilities = self.sample_visible(hidden_activations)
            hidden_probabilities = self.sample_hidden(visible_probabilities)
            hidden_activations = (hidden_probabilities >= self._random_probabilities(self.num_hidden)).float()

        negative_visible_probabilities = visible_probabilities
        negative_hidden_probabilities = hidden_probabilities

        negative_associations = torch.matmul(negative_visible_probabilities.t(), negative_hidden_probabilities)

        # Update parameters
        self.weights_momentum *= self.momentum_coefficient
        self.weights_momentum += (positive_associations - negative_associations)

        self.visible_bias_momentum *= self.momentum_coefficient
        self.visible_bias_momentum += torch.sum(input_data - negative_visible_probabilities, dim=0)

        self.hidden_bias_momentum *= self.momentum_coefficient
        self.hidden_bias_momentum += torch.sum(positive_hidden_probabilities - negative_hidden_probabilities, dim=0)

        batch_size = input_data.size(0)

        self.weights += self.weights_momentum * self.learning_rate / batch_size
        self.visible_bias += self.visible_bias_momentum * self.learning_rate / batch_size
        self.hidden_bias += self.hidden_bias_momentum * self.learning_rate / batch_size

        self.weights -= self.weights * self.weight_decay  # L2 weight decay

        # Вычисление квадратичной ошибки
        error = torch.sum((input_data - negative_visible_probabilities)**2)

        return error
    

    # Задаем сигмоидальную функцию: 1 / (1 + e^(-x))
    def _sigmoid(self, x):
        return 1 / (1 + torch.exp(-x))
    
    # Данная функция создает вектор длины num, заполняет его случайными 
    # числами из стандартного нормального распределения и возвращает его
    def _random_probabilities(self, num): 
        return torch.rand(num)
    
    # Одно нижнее подчеркивание фактически означает, что данные методы 
    # имеют спецификатор доступа protected (но это считается на уровне 
    # соглашения, данные методы можно вызвать вне класса)

__Создадим объект разработанного класса:__

In [14]:
rbm = RBM(visible_units, hidden_units, CD_k, learning_rate, momentum_coefficient, weight_decay)

### 3. Обучение построенной модели

__Обучим модель:__

In [15]:
%%time
for epoch in range(num_epochs): # проход по эпохам
    epoch_error = 0.0
    
    for batch, _ in train_loader:
        batch = batch.view(len(batch), visible_units)
        batch_error = rbm.contrastive_divergence(batch)
        epoch_error += batch_error

    print('Epoch error (epoch = %d): %.4f' % (epoch + 1, epoch_error))

Epoch error (epoch = 1): 8811007.0000
Epoch error (epoch = 2): 4942183.0000
Epoch error (epoch = 3): 3994426.5000
Epoch error (epoch = 4): 3441513.2500
Epoch error (epoch = 5): 3060120.0000
Epoch error (epoch = 6): 2779013.2500
Epoch error (epoch = 7): 2560873.0000
Epoch error (epoch = 8): 2390059.5000
Epoch error (epoch = 9): 2255230.2500
Epoch error (epoch = 10): 2151300.7500
Wall time: 1min 10s


In [16]:
train_features = np.zeros((len(train_dataset), hidden_units))
train_labels = np.zeros(len(train_dataset))
test_features = np.zeros((len(test_dataset), hidden_units))
test_labels = np.zeros(len(test_dataset))

for i, (batch, labels) in enumerate(train_loader):
    batch = batch.view(len(batch), visible_units)
    train_features[i*batch_size:i*batch_size+len(batch)] = rbm.sample_hidden(batch).numpy()
    train_labels[i*batch_size:i*batch_size+len(batch)] = labels.numpy()

for i, (batch, labels) in enumerate(test_loader):
    batch = batch.view(len(batch), visible_units)
    test_features[i*batch_size:i*batch_size+len(batch)] = rbm.sample_hidden(batch).numpy()
    test_labels[i*batch_size:i*batch_size+len(batch)] = labels.numpy()

In [17]:
clf = LogisticRegression(solver = 'lbfgs', max_iter = 1000)
clf.fit(train_features, train_labels)
predictions = clf.predict(test_features)

print('Result: %d/%d' % (sum(predictions == test_labels), test_labels.shape[0]))

Result: 9141/10000
