<a href="https://colab.research.google.com/github/IvanovMaxim2000/AI_Course_work/blob/main/MPI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MPI


## Краткое описание

Message Passing Interface (MPI, интерфейс передачи сообщений) — программный интерфейс (API) для передачи информации, который позволяет обмениваться сообщениями между процессами, выполняющими одну задачу. Разработан Уильямом Гроуппом, Эвином Ласком (англ.) и другими.

MPI является наиболее распространённым стандартом интерфейса обмена данными в параллельном программировании, существуют его реализации для большого числа компьютерных платформ. Используется при разработке программ для кластеров и суперкомпьютеров. Основным средством коммуникации между процессами в MPI является передача сообщений друг другу.

Стандартизацией MPI занимается MPI Forum. В стандарте MPI описан интерфейс передачи сообщений, который должен поддерживаться как на платформе, так и в приложениях пользователя. В настоящее время существует большое количество бесплатных и коммерческих реализаций MPI. Существуют реализации для языков Фортран 77/90, Java, Си и C++.

В первую очередь MPI ориентирован на системы с распределенной памятью, то есть когда затраты на передачу данных велики, в то время как OpenMP ориентирован на системы с общей памятью (многоядерные с общим кэшем). Обе технологии могут использоваться совместно, чтобы оптимально использовать в кластере многоядерные системы.

## Постановка задачи:
Необходимо провести ансамблевое голосование на управляющем ядре. Для этого отправляется отправляется модель и часть датасета на обучение всем процессам. После чего результаты предсказаний отправляются на нулевую ноду, и проводится голосование за более встречающийся результат.



## Установка mpi4py

In [2]:
! pip install mpi4py

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mpi4py
  Downloading mpi4py-3.1.4.tar.gz (2.5 MB)
[K     |████████████████████████████████| 2.5 MB 8.2 MB/s 
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
    Preparing wheel metadata ... [?25l[?25hdone
Building wheels for collected packages: mpi4py
  Building wheel for mpi4py (PEP 517) ... [?25l[?25hdone
  Created wheel for mpi4py: filename=mpi4py-3.1.4-cp38-cp38-linux_x86_64.whl size=4438509 sha256=80f0f2c37f57b3de99c993577f0cb43eb3868674644306905dea0b59802f0890
  Stored in directory: /root/.cache/pip/wheels/f3/35/48/0b9a7076995eea5ea64a7e4bc3f0f342f453080795276264e7
Successfully built mpi4py
Installing collected packages: mpi4py
Successfully installed mpi4py-3.1.4


## Импорт библиотек

In [104]:
%%writefile main.py

import torch
import torchtext
import sklearn
import numpy as np
import mpi4py
import gc

Writing main.py


In [105]:
%%writefile -a main.py

from mpi4py import MPI

from torch import nn
from torch.optim import Adam
from torch.nn import functional as F
from torch.utils.data import DataLoader
import keras
import keras.utils
from keras import utils as np_utils
from torchtext.data import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchtext.data.functional import to_map_style_dataset
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import tensorflow.keras as tk
import operator
import torch.utils.data as data_utils
from torch.utils.data.dataset import random_split


Appending to main.py


In [106]:
%%writefile -a main.py
from torchvision import datasets
from torchvision.transforms import ToTensor
train_data = datasets.MNIST(
    root = 'data',
    train = True,                         
    transform = ToTensor(), 
    download = True,            
)
test_data = datasets.MNIST(
    root = 'data', 
    train = False, 
    transform = ToTensor()
)

Appending to main.py


## Визуализация экземпляра данных

In [107]:
%%writefile -a main.py
import matplotlib.pyplot as plt
plt.imshow(train_data.data[0], cmap='gray')
plt.title('%i' % train_data.targets[0])
plt.show()

Appending to main.py


## Подготовка данных для обучения с помощью DataLoaders

In [108]:
%%writefile -a main.py
from torch.utils.data import DataLoader
loaders = {
    'train' : torch.utils.data.DataLoader(train_data, 
                                          batch_size=100, 
                                          shuffle=True, 
                                          num_workers=1),
    
    'test'  : torch.utils.data.DataLoader(test_data, 
                                          batch_size=100, 
                                          shuffle=True, 
                                          num_workers=1),
}
loaders

Appending to main.py


## Задаем модель сверточной нейронной сети

in_channels (int) — Количество каналов во входном изображении

out_channels (int) — Количество каналов, полученных сверткой

kernel_size (int or tuple) — Размер свертывающегося ядра

stride (int or tuple, optional) — Шаг свертки. Default: 1

padding (int or tuple, optional) — Нулевое заполнение добавлено к обеим сторонам ввода. Default: 0

padding_mode (string, optional) — ‘zeros’, ‘reflect’, ‘replicate’ or ‘circular’. Default: ‘zeros’

dilation (int or tuple, optional) — Расстояние между элементами ядра. Default: 1

groups (int, optional) —Количество заблокированных соединений от входных каналов к выходным каналам. Default: 1

bias (bool, optional) — Если True, к выходным данным добавляется обучаемое смещение.. Default: True

In [109]:
%%writefile -a main.py
import torch.nn as nn
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Sequential(         
            nn.Conv2d(
                in_channels=1,              
                out_channels=16,            
                kernel_size=5,              
                stride=1,                   
                padding=2,                  
            ),                              
            nn.ReLU(),                      
            nn.MaxPool2d(kernel_size=2),    
        )
        self.conv2 = nn.Sequential(         
            nn.Conv2d(16, 32, 5, 1, 2),     
            nn.ReLU(),                      
            nn.MaxPool2d(2),                
        )
        # fully connected layer, output 10 classes
        self.out = nn.Linear(32 * 7 * 7, 10)
    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        # flatten the output of conv2 to (batch_size, 32 * 7 * 7)
        x = x.view(x.size(0), -1)       
        output = self.out(x)
        return output, x    # return x for visualization

Appending to main.py


In [110]:
%%writefile -a main.py
cnn = CNN()

Appending to main.py


## Определяем функции потерь

In [111]:
%%writefile -a main.py
loss_func = nn.CrossEntropyLoss()   

Appending to main.py


## Определяем функцию оптимизации

In [112]:
%%writefile -a main.py
from torch import optim
optimizer = optim.Adam(cnn.parameters(), lr = 0.01)   

Appending to main.py


## Обучение модели

In [113]:
%%writefile -a main.py
from torch.autograd import Variable
num_epochs = 10
def train(train_num, num_epochs, cnn, loaders_trains):
  loss_func = nn.CrossEntropyLoss()
  optimizer = optim.Adam(cnn.parameters(), lr = 0.01)
  cnn.train()

  total_step = len(loaders_trains)

  for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(loaders_trains):
      batch_x = Variable(images)
      batch_y = Variable(labels)

      output = cnn(batch_x)[0]
      loss = loss_func(output, batch_y)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      if (i + 1) % 100 == 0:
        print('Train num {}, Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(train_num, 
                                                                              epoch + 1, 
                                                                              num_epochs, 
                                                                              i + 1,
                                                                              total_step,
                                                                              loss.item()))
        pass
      pass
    pass

Appending to main.py


## Оценка модели на тестовых данных

In [114]:
%%writefile -a main.py
def evaluate(cnn_for_test):
  cnn_for_test.eval()

  with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in loaders['test']:
      test_output, last_layer = cnn_for_test(images)
      predicted = torch.max(test_output, 1)[1].data.squeeze()
      accuracy = (predicted == labels).sum().item() / float(labels.size(0))
      pass
    print('Test accuracy : %.2f' % accuracy)
    pass

Appending to main.py


## Подготовка к использованию MPI

In [115]:
%%writefile -a main.py
comm = MPI.COMM_WORLD 
numprocs = comm.Get_size()
rank = comm.Get_rank() 

Appending to main.py


## Установка датасета

In [116]:
%%writefile -a main.py
train_data = datasets.MNIST(root = 'data', train = True, 
                            transform = ToTensor(), download = True)

test_data = datasets.MNIST(root = 'data', train = False, 
                          transform = ToTensor())

train_part = int(len(train_data) / 3)


image_indices = torch.arange(6000)
cut_train = data_utils.Subset(train_data, image_indices)
trains = random_split(cut_train, [2000, 2000, 2000])


loaders = {'train1' : DataLoader(trains[0],
                              batch_size = 100,
                              shuffle = True,
                              num_workers = 1),
          'train2' : DataLoader(trains[1],
                              batch_size = 100,
                              shuffle = True,
                              num_workers = 1),
          'train3' : DataLoader(trains[2],
                              batch_size = 100,
                              shuffle = True,
                              num_workers = 1),
          'test' : DataLoader(test_data,
                              batch_size = 100,
                              shuffle = True,
                              num_workers = 1)}

Appending to main.py


## Применение MPI

In [117]:
%%writefile -a main.py
printing_numbers = 25

if rank != 0:

  'Получаем объекты от нулевого процесса: модель, и части объема данных'
  cnn = comm.recv(source = 0)
  subset = comm.recv(source = 0)
  train_num = comm.recv(source = 0)

  num_epochs = 5

  train(train_num, num_epochs, cnn, subset)
  evaluate(cnn)

  'Получаем изображение и делаем предсказание'
  images = comm.recv(source = 0) 

  test_output, last_layer = cnn(images[:printing_numbers])
  predicted = torch.max(test_output, 1)[1].data.numpy().squeeze()
  preds_proc = comm.send(predicted, dest = 0)

else:
  'Отправляем другим процессам модель и части датасета'
  for k in range(1, numprocs):
    cnn = CNN()
    train_num = 'train' + str(k)
    comm.send(cnn, dest = k) 
    comm.send(loaders[train_num], dest = k) 
    comm.send(train_num, dest = k) 

  sample = next(iter(loaders['test']))
  images, labels = sample
  actual_number = labels[:printing_numbers].numpy()

  'Отправляем изображение, которое хотим предугадать'
  for k in range(1, numprocs):
    comm.send(images, dest = k)

  print(f'Actual number: {actual_number}')

  'Получаем и записываем полученные результаты предсказания с других процессов'
  predicted_numbers = []
  for k in range(1, numprocs):
    predicted = comm.recv(source = k)
    predicted_numbers.append(predicted)

  for k in range(1, numprocs):
    print(f'[{k}] Prediction number: {predicted_numbers[k - 1]}')

  'Голосавание ансамбля'
  predicted_result = []  
  for num in range(len(predicted_numbers[0])):
    check_dict = {}
    for k in range(1, numprocs):
      current_number = predicted_numbers[k - 1][num] 
      num_count = check_dict.get(current_number, 0)
      num_count = num_count + 1
      check_dict[current_number] = num_count

    max_vote_num = max(check_dict.items(), key = operator.itemgetter(1))[0]
    predicted_result.append(max_vote_num)

  print(f'Voting results: {predicted_result}'.replace(',', ''))
  
MPI.Finalize()

Appending to main.py


In [118]:
!mpirun -np 4 --allow-run-as-root python main.py

Figure(640x480)
Figure(640x480)
Figure(640x480)
Figure(640x480)
Test accuracy : 0.90
Test accuracy : 0.93
Test accuracy : 0.94
Actual number: [2 6 7 9 6 7 5 8 7 7 4 6 7 7 3 7 2 9 4 8 6 1 5 5 8]
[1] Prediction number: [2 6 7 9 6 7 5 8 7 7 4 6 7 7 3 7 2 9 4 8 6 1 5 5 8]
[2] Prediction number: [2 6 7 9 6 7 5 8 7 7 4 6 7 7 3 7 2 9 4 8 6 1 5 5 8]
[3] Prediction number: [2 6 7 9 6 7 5 8 7 7 4 4 7 7 3 9 2 9 4 8 6 1 5 5 2]
Voting results: [2 6 7 9 6 7 5 8 7 7 4 6 7 7 3 7 2 9 4 8 6 1 5 5 8]
