### Parallel GloVe
https://github.com/stanfordnlp/GloVe

In [1]:
%env MKL_NUM_THREADS=1
%env NUMEXPR_NUM_THREADS=1
%env OMP_NUM_THREADS=1
# set numpy to single-threaded mode for benchmarking

!pip install --upgrade nltk datasets tqdm
!wget https://raw.githubusercontent.com/mryab/efficient-dl-systems/main/week02_distributed/utils.py -O utils.py

import time, random
import multiprocessing as mp
import numpy as np
from tqdm import tqdm, trange
from IPython.display import clear_output
import matplotlib.pyplot as plt
%matplotlib inline
clear_output()

### Multiprocessing basics

In [24]:
def foo(i):
    """ Imagine particularly computation-heavy function... """
    print(end=f"Began foo({i})...\n")
    result = np.sin(i)
    time.sleep(abs(result))
    # time.sleep(5)
    print(end=f"Finished foo({i}) = {result:.3f}.\n")
    return result

In [3]:
%%time
results_naive = [foo(i) for i in range(10)]
clear_output()

CPU times: user 47.3 ms, sys: 7.04 ms, total: 54.3 ms
Wall time: 5.96 s


Создание процесса

In [9]:
# Правда мы можем создать процесс
proc = mp.Process(target = foo, args  = [5])

In [10]:
proc.start()

Began foo(5)...


In [11]:
proc.join() # Ждем пока процесс выполнится

Создание класса для работы с процессами

Главное реализовать метод **run**

In [12]:
# Когда логика немного сложнее, чем одна функция, можно воспользоваться созданием класса
class DoerOfSomething(mp.Process):
  def __init__(self, ):
    super().__init__()

  def run(self):
    i = 5
    print(end=f"Began foo({i})...\n")
    result = np.sin(i)
    time.sleep(abs(result))
    time.sleep(5)
    print(end=f"Finished foo({i}) = {result:.3f}.\n")
    return result

In [19]:
proc = DoerOfSomething()

In [20]:
proc.start()
proc.join()

Began foo(5)...
Finished foo(5) = -0.959.


Как мы могли наблюдать простая реализация функции foo в среднем занимает 5 секунд

Давайте ускорим это с помощью параллелизма

In [26]:
%%time 
processes = []
# создали наши процессы
for i in range(10):
  proc = mp.Process(target = foo, args = [i])
  processes.append(proc)
print(f'Мы создали {len(processes)} процессов')

# Стартуем вычислять их параллельно

for i in range(10):
  processes[i].start()
for i in range(10):
  processes[i].join()
clear_output()

# 5 секунд превратились в 1....

CPU times: user 48.1 ms, sys: 97.7 ms, total: 146 ms
Wall time: 1.14 s


Процесс мы ускорили, теперь задача вытаскивать файл из самого процесса. Для этого нам поможет pipe.


Немного про pipe.

* pipe.send(data) - отправка данных в трубу
* data = pipe.recv() - чтение данных из трубы

Правила: 
* Каждая сторона должна контролироваться одним процессом
* Данные должны быть serializable
* `duplex=True` - процесс может коммуницировать с двух сторон

In [28]:
side_a, side_b = mp.Pipe()

side_a.send(123)
side_a.send({'olo': np.random.randn(3)})

print('side_b.recv() -> ', side_b.recv())
print('side_b.recv() -> ', side_b.recv())

side_b.recv() ->  123
side_b.recv() ->  {'olo': array([-0.77744094,  0.51581657,  0.27863038])}


Чтобы отдавать во время выполнении функции данные, можно подать аргументом сам pipe

In [35]:
def compute_and_send(i, output_pipe):
  print(end = f'Начал вычисления ({i})...\n')
  result = np.sin(i)
  time.sleep(abs(result))
  print(end = f'Закончил вычисления и отправил({i}) = {result:.3f}.\n')
  output_pipe.send(result)


In [36]:
%%time 

result_pipes = []

for i in range(10):
  side_A, side_B = mp.Pipe(duplex = False)
  # Side_B - может только отправлять 
  # Side_A - только читать
  result_pipes.append(side_A)
  proc = mp.Process(target = compute_and_send, args = [i, side_B])
  proc.start()

print('Главный процесс: ждет результаты...')
for pipe in result_pipes:
  print(f'Главный процесс получил: received {pipe.recv()}')
print('Главный процесс готов!')

Начал вычисления (0)...
Закончил вычисления и отправил(0) = 0.000.
Начал вычисления (1)...
Начал вычисления (2)...
Начал вычисления (3)...
Начал вычисления (4)...
Начал вычисления (5)...
Начал вычисления (6)...
Начал вычисления (7)...
Начал вычисления (8)...
Начал вычисления (9)...
Закончил вычисления и отправил(3) = 0.141.
Главный процесс: ждет результаты...
Главный процесс получил: received 0.0
Закончил вычисления и отправил(6) = -0.279.
Закончил вычисления и отправил(9) = 0.412.
Закончил вычисления и отправил(7) = 0.657.
Закончил вычисления и отправил(4) = -0.757.
Закончил вычисления и отправил(1) = 0.841.
Закончил вычисления и отправил(2) = 0.909.
Закончил вычисления и отправил(5) = -0.959.
Главный процесс получил: received 0.8414709848078965
Главный процесс получил: received 0.9092974268256817
Главный процесс получил: received 0.1411200080598672
Главный процесс получил: received -0.7568024953079282
Главный процесс получил: received -0.9589242746631385
Главный процесс получил: rece

In [31]:
side_A, side_B = mp.Pipe(duplex = False)
side_A

<multiprocessing.connection.Connection at 0x7f8351aa4760>

Также можно использовать специальные шаблоны: Queue

Queue - позволяет создать объект, к которому обращаются несколько процессов параеллельно.

* queue.put() - добавить значение в очередь, которое доступно для всех других процессов

* queue.get() - возвращение ранее добавленного значения и удаление оного из очереди.


In [5]:
queue = mp.Queue()

def func_A(queue):
  print('A: Ждет очередь...')
  print('A: возвращает значение из очереди', queue.get())
  print('A: Ждет очередь...')
  print('A: возвращает значение из очереди', queue.get())
  print('A: завершилось!')

def func_B(i, queue):
  np.random.seed()
  value = np.random.rand()
  time.sleep(value)
  print(f'proc_B{i}: передаю значение в очередь')
  queue.put(value)

proc_A = mp.Process(target = func_A, args = [queue])
proc_A.start()

proc_B1 = mp.Process(target = func_B, args = [1, queue])
proc_B2 = mp.Process(target = func_B, args = [2, queue])

proc_B1.start()
proc_B2.start()

proc_A.join()

A: Ждет очередь...
proc_B2: передаю значение в очередь
A: возвращает значение из очереди 0.5404923506004935
A: Ждет очередь...
proc_B1: передаю значение в очередь
A: возвращает значение из очереди 0.6578486185522561
A: завершилось!


Главные отличия `pipe` от `queue`

* Pipe быстрее для коммуникации 1v1
* Queue поддерживают произвольное количество процессов
* Queue может осуществляться вместе pipes

### Glove preprocessing 

Создаим  co-occurence

In [8]:
import datasets
data = datasets.load_dataset('wikitext', 'wikitext-103-raw-v1')
clear_output()
print("Example:", data['train']['text'][5])

Example:  It met with positive sales in Japan , and was praised by both Japanese and western critics . After release , it received downloadable content , along with an expanded edition in November of that year . It was also adapted into manga and an original video animation series . Due to low sales of Valkyria Chronicles II , Valkyria Chronicles III was not localized , but a fan translation compatible with the game 's expanded edition was released in 2014 . Media.Vision would return to the franchise with the development of Valkyria : Azure Revolution for the PlayStation 4 . 



In [9]:
from collections import Counter
from nltk.tokenize import NLTKWordTokenizer
tokenizer = NLTKWordTokenizer()

def count_tokens(lines, top_k=None):
    """ Tokenize lines and return top_k most frequent tokens and their counts """
    sent_tokens = tokenizer.tokenize_sents(map(str.lower, lines))
    token_counts = Counter([token for sent in sent_tokens for token in sent])
    return Counter(dict(token_counts.most_common(top_k)))

count_tokens(data['train']['text'][:100], top_k=10)

Counter({'the': 459,
         ',': 349,
         '.': 225,
         'of': 193,
         'to': 150,
         'and': 147,
         'in': 104,
         '@': 100,
         'a': 93,
         'was': 83})

In [15]:
%%time
# sequential algorithm 
texts = data['train']['text'][:100_000]
vocabulary_size = 32_000
batch_size = 10000

token_counts = Counter()

for batch_start in trange(0, len(texts), batch_size):
  batch_texts = texts[batch_start: batch_start + batch_size]
  batch_counts = count_tokens(batch_texts, top_k = vocabulary_size)
  token_counts += batch_counts
clear_output()
token_counts_reference = Counter(token_counts)

100%|██████████| 10/10 [00:27<00:00,  2.79s/it]

CPU times: user 29.5 s, sys: 341 ms, total: 29.9 s
Wall time: 30.8 s





Давайте попорубем распараллелить нашу задачу

Создадим 2 очереди: в одной будут лежать номера текстов, в другой будут складывать обработанные счетчики слов

Почему номера а не сами текстов: копирование самих текстов дорого по памяти

In [26]:
texts = data['train']['text'][: 100_000]
vocabulary_size = 32_000
batch_size = 100

input_queue = mp.Queue()
output_queue = mp.Queue()

for batch_start in range(0, len(texts), batch_size):
  input_queue.put((batch_start, batch_start + batch_size))

def worker_function():
  # дает процессу неотработанный текст (индекс текста)
  batch_start, batch_end = input_queue.get()
  print(f'Я взял кусочек с {batch_start} до {batch_end}\n')
  # обработать их 
  word_counter = count_tokens(
      texts[batch_start: batch_end],
      top_k = vocabulary_size)
  # передача их в очередь
  output_queue.put(word_counter)

In [27]:
for i in range(4):
  proc = mp.Process(target = worker_function, args = [])
  proc.start()

proc.join()

Я взял кусочек с 0 до 100

Я взял кусочек с 100 до 200

Я взял кусочек с 200 до 300

Я взял кусочек с 300 до 400



In [19]:
%%time
side_A, side_B = mp.Pipe(duplex=False)
side_C, side_D = mp.Pipe(duplex=False)
def foo_1(pipe):
  # sequential algorithm 
  texts = data['train']['text'][:50_000]
  vocabulary_size = 32_000
  batch_size = 10000
  token_counts = Counter()
  for batch_start in range(0, len(texts), batch_size):
    batch_texts = texts[batch_start: batch_start + batch_size]
    batch_counts = count_tokens(batch_texts, top_k = vocabulary_size)
    token_counts += batch_counts
  pipe.send(token_counts)
def foo_2(pipe):
  # sequential algorithm 
  texts = data['train']['text'][50_000:100_000]
  vocabulary_size = 32_000
  batch_size = 10000
  token_counts = Counter()
  for batch_start in range(0, len(texts), batch_size):
    batch_texts = texts[batch_start: batch_start + batch_size]
    batch_counts = count_tokens(batch_texts, top_k = vocabulary_size)
    token_counts += batch_counts
  pipe.send(token_counts)

proc_A = mp.Process(target = foo_1, args = [side_B])
proc_B= mp.Process(target = foo_2, args = [side_D])

proc_A.start()
proc_B.start()

proc_A.join()
#proc_B.join()

Process Process-14:
Process Process-13:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "<timed exec>", line 15, in foo_1
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "<timed exec>", line 26, in foo_2
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 405, in _send_bytes
    self._send(buf)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3

KeyboardInterrupt: ignored