# Выполнение в несколько потоков

In [1]:
from multiprocessing import Pool, current_process

### Пример работы с Pool

In [2]:
import time

In [3]:
def process_step(step):
    print('Начинаю шаг {}, worker {}'.format(step, current_process()))
    time.sleep(1)
    print('Закончил шаг {}'.format(step))

In [4]:
%%time
for step in range(10):
    process_step(step)

Начинаю шаг 0, worker <_MainProcess(MainProcess, started)>
Закончил шаг 0
Начинаю шаг 1, worker <_MainProcess(MainProcess, started)>
Закончил шаг 1
Начинаю шаг 2, worker <_MainProcess(MainProcess, started)>
Закончил шаг 2
Начинаю шаг 3, worker <_MainProcess(MainProcess, started)>
Закончил шаг 3
Начинаю шаг 4, worker <_MainProcess(MainProcess, started)>
Закончил шаг 4
Начинаю шаг 5, worker <_MainProcess(MainProcess, started)>
Закончил шаг 5
Начинаю шаг 6, worker <_MainProcess(MainProcess, started)>
Закончил шаг 6
Начинаю шаг 7, worker <_MainProcess(MainProcess, started)>
Закончил шаг 7
Начинаю шаг 8, worker <_MainProcess(MainProcess, started)>
Закончил шаг 8
Начинаю шаг 9, worker <_MainProcess(MainProcess, started)>
Закончил шаг 9
CPU times: user 11.9 ms, sys: 4.55 ms, total: 16.4 ms
Wall time: 10 s


In [5]:
%%time
with Pool(processes=10) as p:
    p.map(process_step, range(10))

Начинаю шаг 1, worker <ForkProcess(ForkPoolWorker-2, started daemon)>
Начинаю шаг 3, worker <ForkProcess(ForkPoolWorker-4, started daemon)>
Начинаю шаг 4, worker <ForkProcess(ForkPoolWorker-5, started daemon)>
Начинаю шаг 0, worker <ForkProcess(ForkPoolWorker-1, started daemon)>
Начинаю шаг 2, worker <ForkProcess(ForkPoolWorker-3, started daemon)>
Начинаю шаг 5, worker <ForkProcess(ForkPoolWorker-6, started daemon)>
Начинаю шаг 6, worker <ForkProcess(ForkPoolWorker-7, started daemon)>
Начинаю шаг 7, worker <ForkProcess(ForkPoolWorker-8, started daemon)>
Начинаю шаг 9, worker <ForkProcess(ForkPoolWorker-10, started daemon)>
Начинаю шаг 8, worker <ForkProcess(ForkPoolWorker-9, started daemon)>
Закончил шаг 3
Закончил шаг 1
Закончил шаг 4
Закончил шаг 0
Закончил шаг 6
Закончил шаг 2
Закончил шаг 5
Закончил шаг 7
Закончил шаг 9
Закончил шаг 8
CPU times: user 28.5 ms, sys: 40.1 ms, total: 68.6 ms
Wall time: 1.21 s


### Стемминг через Pool

In [6]:
import time
from nltk.stem.snowball import SnowballStemmer

In [7]:
def process_line(line):
    keyword, shows = line.strip().split(',')
    return [stemmer.stem(word) for word in keyword.split(' ')]

In [8]:
%%time
stemmer = SnowballStemmer("russian")
result = []

with open('keywords.csv') as f:
    result = [process_line(line) for line in f]
        
result[:10]

CPU times: user 15.5 s, sys: 158 ms, total: 15.7 s
Wall time: 17.2 s


[['keyword'],
 ['вк'],
 ['одноклассник'],
 ['порн'],
 ['ютуб'],
 ['вконтакт'],
 ['одноклассник', 'мо', 'страниц'],
 ['майл'],
 ['авит'],
 ['переводчик']]

In [9]:
%%time
f = open('keywords.csv')
stemmer = SnowballStemmer("russian")

with Pool(5) as p:
    result = p.map(process_line, f)
    
result[:10]

CPU times: user 298 ms, sys: 125 ms, total: 423 ms
Wall time: 11.7 s


[['keyword'],
 ['вк'],
 ['одноклассник'],
 ['порн'],
 ['ютуб'],
 ['вконтакт'],
 ['одноклассник', 'мо', 'страниц'],
 ['майл'],
 ['авит'],
 ['переводчик']]

# Сложные объединения
Необходимо объединить таблицы left_table и right_table с учетом условий:
- если числовое значение из right_table укладывается в интервал из left_table, то добавляем столбец из right_table
- если значение из right_table вне интервала из left_table, но меньше 200, то подставить 0
- если значение из right_table больше 200 или нет соответствия, то исключить строчку из результата полностью
- таблица right_table влазит в RAM, левая - нет

In [10]:
left_table = [
    ['a', 10, 20],
    ['b', 30, 40],
    ['c', 50, 60],
]

In [11]:
right_table = [
    ['a', 15],
    ['b', 100],
    ['c', 300],
]

In [12]:
right_dict = {rec[0]: rec[1] for rec in right_table}
right_dict

{'a': 15, 'b': 100, 'c': 300}

In [13]:
def apply_rules(left_table_line):
    key, left_border, right_border = left_table_line
    
    # условие 3 (если есть соответствие)
    if key in right_dict:
        value_from_right_table = right_dict[key]
        
        # условие 3 (если значение < 200)
        if value_from_right_table < 200:
            if left_border < value_from_right_table < right_border:
                return value_from_right_table
            else:
                return 0

In [14]:
for line in left_table:
    right_value = apply_rules(line)
    if right_value is not None:
        line.append(right_value)
        print(line)

['a', 10, 20, 15]
['b', 30, 40, 0]
