В формулировке заданий будет использоваться понятие **worker**. Это слово обозначает какую-то единицу параллельного выполнения, в случае питона это может быть **поток** или **процесс**, выбирайте то, что лучше будет подходить к конкретной задаче

# Задание 1 (7 баллов)

В одном из заданий по ML от вас требовалось написать кастомную реализацию Random Forest. Её проблема состоит в том, что она работает медленно, так как использует всего один поток для работы. Добавление параллельного программирования в код позволит получить существенный прирост в скорости обучения и предсказаний.

В данном задании от вас требуется добавить возможность обучать случайный лес параллельно и использовать параллелизм для предсказаний. Для этого вам понадобится:
1. Добавить аргумент `n_jobs` в метод `fit`. `n_jobs` показывает количество worker'ов, используемых для распараллеливания
2. Добавить аргумент `n_jobs` в методы `predict` и `predict_proba`
3. Реализовать функционал по распараллеливанию в данных методах

В результате код `random_forest.fit(X, y, n_jobs=2)` и `random_forest.predict(X, y, n_jobs=2)` должен работать в ~1.5-2 раза быстрее, чем `random_forest.fit(X, y, n_jobs=1)` и `random_forest.predict(X, y, n_jobs=1)` соответственно

Если у вас по каким-то причинам нет кода случайного леса из ДЗ по ML, то вы можете написать его заново или попросить у однокурсника. *Детали* реализации ML части оцениваться не будут, НО, если вы поломаете логику работы алгоритма во время реализации параллелизма, то за это будут сниматься баллы

В задании можно использовать только модули из **стандартной библиотеки** питона, а также функции и классы из **sklearn** при помощи которых вы изначально писали лес

In [123]:
from joblib import Parallel, delayed
from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import make_classification
import numpy as np

class RandomForestClassifierCustom:
    def __init__(self, n_estimators: int = 100, max_depth: int = None, max_features: int = 'auto',
                 random_state: int = None):
        self.n_estimators: int = n_estimators
        self.max_depth: int = max_depth
        self.max_features: int = max_features
        self.random_state: int = random_state
        self.trees: list = []
        self.feat_ids_by_tree: list = []
        self.classes_: np.ndarray = None
        self.X: np.ndarray = None
        self.y: np.ndarray = None

    def _fitting_model(self, idx: int) -> tuple:
        rng: np.random.Generator = np.random.default_rng(self.random_state + idx)
        features_ids: np.ndarray = rng.choice(self.X.shape[1], size=self.max_features, replace=False)
        pseudosampling_ids: np.ndarray = rng.choice(self.X.shape[0], size=self.X.shape[0], replace=True)
        pseudo_X: np.ndarray = self.X[pseudosampling_ids][:, features_ids]
        pseudo_y: np.ndarray = self.y[pseudosampling_ids]

        tree_class: DecisionTreeClassifier = DecisionTreeClassifier(max_depth=self.max_depth,
                                            max_features=self.max_features,
                                            random_state=self.random_state,
                                            )
        fitted_tree: DecisionTreeClassifier = tree_class.fit(pseudo_X, pseudo_y)
        return fitted_tree, features_ids

    def fit(self, X: np.ndarray, y: np.ndarray, n_jobs: int = 1) -> object:
        self.X: np.ndarray = X
        self.y: np.ndarray = y
        self.classes_: np.ndarray = sorted(np.unique(self.y))
        processes = Parallel(n_jobs=n_jobs)(delayed(self._fitting_model)(idx) for idx in range(self.n_estimators))
        self.trees, self.feat_ids_by_tree = zip(*processes)
        return self

    def _prediction_calc(self, tree_and_feat_ids: tuple) -> np.ndarray:
        tree, feat_ids = tree_and_feat_ids
        pred: np.ndarray = tree.predict_proba(self.X[:, feat_ids])
        return pred

    def predict_proba(self, X: np.ndarray, n_jobs: int = 1) -> np.ndarray:
        self.X: np.ndarray = X
        y_pred: np.ndarray = np.zeros((self.X.shape[0], len(self.classes_)))
        processes = Parallel(n_jobs=n_jobs)(delayed(self._prediction_calc)(tf) for tf in zip(self.trees, self.feat_ids_by_tree))
        y_pred: np.ndarray = sum(processes)
        return y_pred

    def predict(self, X: np.ndarray, n_jobs: int = 1) -> np.ndarray:
        probas: np.ndarray = self.predict_proba(X, n_jobs)
        predictions: np.ndarray = np.argmax(probas, axis=1)
        return predictions


X, y = make_classification(n_samples=100000)

In [124]:
random_forest = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [125]:
%%time

_ = random_forest.fit(X, y, n_jobs=1)

CPU times: user 5.29 s, sys: 52.6 ms, total: 5.34 s
Wall time: 5.34 s


In [126]:
%%time

preds_1 = random_forest.predict(X, n_jobs=1)

CPU times: user 171 ms, sys: 6.41 ms, total: 177 ms
Wall time: 176 ms


In [127]:
random_forest = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [128]:
%%time

_ = random_forest.fit(X, y, n_jobs=2)

CPU times: user 31.4 ms, sys: 41.3 ms, total: 72.7 ms
Wall time: 3.44 s


In [129]:
%%time

preds_2 = random_forest.predict(X, n_jobs=2)

CPU times: user 47 ms, sys: 58.4 ms, total: 105 ms
Wall time: 372 ms


In [132]:
(preds_1 == preds_2).all()   # Количество worker'ов не должно влиять на предсказания

True

#### Какие есть недостатки у вашей реализации параллельного Random Forest (если они есть)? Как это можно исправить? Опишите словами, можно без кода (+1 дополнительный балл)

Ответ пишите тут

# Задание 2 (9 баллов)

Напишите декоратор `memory_limit`, который позволит ограничивать использование памяти декорируемой функцией.

Декоратор должен принимать следующие аргументы:
1. `soft_limit` - "мягкий" лимит использования памяти. При превышении функцией этого лимита должен будет отображён **warning**
2. `hard_limit` - "жёсткий" лимит использования памяти. При превышении функцией этого лимита должно будет брошено исключение, а функция должна немедленно завершить свою работу
3. `poll_interval` - интервал времени (в секундах) между проверками использования памяти

Требования:
1. Потребление функцией памяти должно отслеживаться **во время выполнения функции**, а не после её завершения
2. **warning** при превышении `soft_limit` должен отображаться один раз, даже если функция переходила через этот лимит несколько раз
3. Если задать `soft_limit` или `hard_limit` как `None`, то соответствующий лимит должен быть отключён
4. Лимиты должны передаваться и отображаться в формате `<number>X`, где `X` - символ, обозначающий порядок единицы измерения памяти ("B", "K", "M", "G", "T", ...)
5. В тексте warning'ов и исключений должен быть указан текщий объём используемой памяти и величина превышенного лимита

В задании можно использовать только модули из **стандартной библиотеки** питона, можно писать вспомогательные функции и/или классы

В коде ниже для вас предопределены некоторые полезные функции, вы можете ими пользоваться, а можете не пользоваться

In [1]:
import os
import psutil
import time
import warnings

def get_memory_usage() -> int:    # Показывает текущее потребление памяти процессом
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return mem_info.rss


def bytes_to_human_readable(n_bytes:int) -> str:
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for idx, s in enumerate(symbols):
        prefix[s] = 1 << (idx + 1) * 10
    for s in reversed(symbols):
        if n_bytes >= prefix[s]:
            value = float(n_bytes) / prefix[s]
            return f"{value:.2f}{s}"
    return f"{n_bytes}B"

def memory_limit(soft_limit: Optional[str] = None, hard_limit: Optional[str] = None, poll_interval: float = 1) -> Callable:
    def decorator(func: Callable) -> Callable:
        def wrapper(*args, **kwargs):
            def print_warning() -> int:
                mem = get_memory_usage()
                if mem > soft_limit_bytes and not warning_printed[0]:
                    warnings.warn(f"Memory usage ({bytes_to_human_readable(mem)}) is above the soft limit ({bytes_to_human_readable(soft_limit_bytes)})")
                    warning_printed[0] = True
                return mem
            
            def check_hard_limit():
                mem = get_memory_usage()
                if mem > hard_limit_bytes:
                    raise MemoryError(f"Memory usage ({bytes_to_human_readable(mem)}) is above the hard limit ({bytes_to_human_readable(hard_limit_bytes)})")

            warning_printed = [False]   # Флаг, показывающий, было ли уже напечатано warning при превышении soft_limit
            soft_limit_bytes = None if soft_limit is None else int(soft_limit[:-1]) * (1024 ** "BKMGT".index(soft_limit[-1]))
            hard_limit_bytes = None if hard_limit is None else int(float(hard_limit[:-1]) * (1024 ** "BKMGT".index(hard_limit[-1])))
            
            t = time.time()
            max_mem = 0
            while True:
                try:
                    mem = print_warning()
                    check_hard_limit()
                    max_mem = max(max_mem, mem)
                    result = func(*args, **kwargs)
                    return result
                finally:
                    time.sleep(poll_interval)
                    if time.time() - t > 60:  # Если функция выполняется более 60 секунд, перестаём отслеживать память
                        break
            raise Exception(f"Memory usage ({bytes_to_human_readable(max_mem)}) exceeded the soft limit ({bytes_to_human_readable(soft_limit_bytes)})")
        return wrapper
    return decorator

In [4]:
@memory_limit(soft_limit="512M", hard_limit="1.5G", poll_interval=0.1)
def memory_increment():
    """
    Функция для тестирования
    
    В течение нескольких секунд достигает использования памяти 1.89G
    Потребление памяти и скорость накопления можно варьировать, изменяя код
    """
    lst = []
    for i in range(50000000):
        if i % 500000 == 0:
            time.sleep(0.1)
        lst.append(i)
    return lst


In [5]:
result = memory_increment()
print(memory_limit(result))

<function memory_limit.<locals>.decorator at 0x1055689d0>


# Задание 3 (11 баллов)

Напишите функцию `parallel_map`. Это должна быть **универсальная** функция для распараллеливания, которая эффективно работает в любых условиях.

Функция должна принимать следующие аргументы:
1. `target_func` - целевая функция (обязательный аргумент)
2. `args_container` - контейнер с позиционными аргументами для `target_func` (по-умолчанию `None` - позиционные аргументы не передаются)
3. `kwargs_container` - контейнер с именованными аргументами для `target_func` (по-умолчанию `None` - именованные аргументы не передаются)
4. `n_jobs` - количество workers, которые будут использованы для выполнения (по-умолчанию `None` - количество логических ядер CPU в системе)

Функция должна работать аналогично `***PoolExecutor.map`, применяя функцию к переданному набору аргументов, но с некоторыми дополнениями и улучшениями
    
Поскольку мы пишем **универсальную** функцию, то нам нужно будет выполнить ряд требований, чтобы она могла логично и эффективно работать в большинстве ситуаций

1. `target_func` может принимать аргументы любого вида в любом количестве
2. Любые типы данных в `args_container`, кроме `tuple`, передаются в `target_func` как единственный позиционный аргумент. `tuple` распаковываются в несколько аргументов
3. Количество элементов в `args_container` должно совпадать с количеством элементов в `kwargs_container` и наоборот, также значение одного из них или обоих может быть равно `None`, в иных случаях должна кидаться ошибка (оба аргумента переданы, но размеры не совпадают)

4. Функция должна выполнять определённое количество параллельных вызовов `target_func`, это количество зависит от числа переданных аргументов и значения `n_jobs`. Сценарии могут быть следующие
    + `args_container=None`, `kwargs_container=None`, `n_jobs=None`. В таком случае функция `target_func` выполнится параллельно столько раз, сколько на вашем устройстве логических ядер CPU
    + `args_container=None`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **5** раз
    + `args_container=[1, 2, 3]`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **3** раза, несмотря на то, что `n_jobs=5` (так как есть всего 3 набора аргументов для которых нам нужно получить результат, а лишние worker'ы создавать не имеет смысла)
    + `args_container=None`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем именованные аргументы
    + `args_container=[1, 2, 3]`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем и позиционные, и именованные аргументы
    + `args_container=[1, 2, 3, 4]`, `kwargs_container=None`, `n_jobs=2`. В таком случае в каждый момент времени параллельно будет выполняться **не более 2** функций `target_func`, так как нам нужно выполнить её 4 раза, но у нас есть только 2 worker'а.
    + В подобных случаях (из примера выше) должно оптимизироваться время выполнения. Если эти 4 вызова выполняются за 5, 1, 2 и 1 секунды, то параллельное выполнение с `n_jobs=2` должно занять **5 секунд** (не 7 и тем более не 10)

5. `parallel_map` возвращает результаты выполнения `target_func` **в том же порядке**, в котором были переданы соответствующие аргументы
6. Работает с функциями, созданными внутри других функций

Для базового решения от вас не ожидается **сверххорошая** оптимизация по времени и памяти для всех возможных случаев. Однако за хорошо оптимизированную логику работы можно получить до **+3 дополнительных баллов**

Вы можете сделать класс вместо функции, если вам удобнее

В задании можно использовать только модули из **стандартной библиотеки** питона

Ниже приведены тестовые примеры по каждому из требований

In [22]:
def parallel_map(target_func,
                 args_container=None,
                 kwargs_container=None,
                 n_jobs=None):
    # Ваш код здесь

In [164]:
from typing import Union, List, Tuple, Dict, Any, Callable, Optional
import multiprocessing



class ParallelRunner:
    '''
    Universal class for parallelization across processes.
    '''
    def __init__(self,
                 target_funcs: Union[Callable, List[Callable]],
                 args_container: Optional[Union[List[Tuple], Tuple]] = None,
                 kwargs_container: Optional[List[Dict[str, Any]]] = None,
                 n_jobs: int = None) -> None:
        '''
        :param target_funcs: objective function or functions
        :param args_list: container with positional arguments numbers or tuples (by default None)
        :param kwargs_list: container with named arguments
        :param n_jobs: number of processes
        '''
        self.target_funcs = target_funcs
        self.args_container = args_container
        self.kwargs_container = kwargs_container
        self.n_jobs = n_jobs
        self.counter = 0

    def _compare_args_kwargs_len(self):
        if self.args_container is not None and self.kwargs_container is not None and len(
                self.args_container) != len(self.kwargs_container):
            raise ValueError('the number of positional and named arguments differs')

    def _fill_empty_args_kwargs(self):
        if self.args_container is None and self.kwargs_container is not None:
            self.args_container = [tuple() for _ in range(len(self.kwargs_container))]
        elif self.kwargs_container is None and self.args_container is not None:
            self.kwargs_container = [dict() for _ in range(len(self.args_container))]
        elif self.args_container is None and self.kwargs_container is None:
            self.args_container = [tuple() for _ in range(self.n_jobs)]
            self.kwargs_container = [dict() for _ in range(self.n_jobs)]

    def _check_n_jobs(self):
        if self.n_jobs is None:
            self.n_jobs = multiprocessing.cpu_count()
        self._fill_empty_args_kwargs()
        self.n_jobs = min(self.n_jobs, len(self.args_container))

    def _add_tasks_in_queue(self, queue: multiprocessing.Queue) -> multiprocessing.Queue:
        counter = 0
        for args, kwargs in zip(self.args_container, self.kwargs_container):
            queue.put((counter, args, kwargs))
            counter += 1
        return queue

    def _process_tasks(self, target_func: Callable, queue: multiprocessing.Queue, results_dict: multiprocessing.Manager) -> None:
        while not queue.empty():
            counter, args, kwargs = queue.get()
            if not isinstance(args, tuple):
                args = (args,)
            result = target_func(*args, **kwargs)
            results_dict[counter] = result
        return None
    def _check_n_funcs(self):
        if isinstance(self.target_funcs, list):
            self.target_funcs = self.target_funcs
        else:
            self.target_funcs = [self.target_funcs]

    def parallel_map(self) -> List[list]:
        self._compare_args_kwargs_len()
        self._check_n_funcs()
        self._check_n_jobs()
        processes = []
        results = []
        for func in self.target_funcs:
            queue = self._add_tasks_in_queue(multiprocessing.Queue())
            manager = multiprocessing.Manager()
            manager_dict = manager.dict()  # need to remember what the process is first, second, etc
            for job in range(self.n_jobs):
                process = multiprocessing.Process(target=self._process_tasks, args=(func, queue, manager_dict))
                processes.append(process)
                process.start()
            for process in processes:
                process.join()

            manager_dict = dict(sorted(manager_dict.items()))
            interm_res = []
            for key in manager_dict:
                interm_res.append(manager_dict[key])
            results.append(interm_res)
            interm_res = []
        return results


In [165]:
import time


# Это только один пример тестовой функции, ваша parallel_map должна уметь эффективно работать с ЛЮБЫМИ функциями
# Поэтому обязательно протестируйте код на чём-нибудбь ещё
def test_func(x=1, s=2, a=1, b=1, c=1):
    time.sleep(s)
    return a*x**2 + b*x + c

In [166]:
%%time

# Пример 2.1
# Отдельные значения в args_container передаются в качестве позиционных аргументов
run = ParallelRunner(test_func, args_container=[1, 2.0, 3j-1, 4])  # Здесь происходят параллельные вызовы: test_func(1) test_func(2.0) test_func(3j-1) test_func(4)
print(run.parallel_map())

[[]]
CPU times: user 19.6 ms, sys: 25.7 ms, total: 45.3 ms
Wall time: 142 ms


Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'ParallelRunner' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self 

In [140]:
%%time

# Пример 2.2
# Элементы типа tuple в args_container распаковываются в качестве позиционных аргументов
parallel_map(test_func, [(1, 1), (2.0, 2), (3j-1, 3), 4])    # Здесь происходят параллельные вызовы: test_func(1, 1) test_func(2.0, 2) test_func(3j-1, 3) test_func(4)

ValueError: args_container and kwargs_container must have the same length

In [101]:
%%time

# Пример 3.1
# Возможна одновременная передача args_container и kwargs_container, но количества элементов в них должны быть равны
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

# Здесь происходят параллельные вызовы: test_func(1, s=3) test_func(2, s=3) test_func(3, s=3) test_func(4, s=3)

TypeError: __main__.test_func() argument after * must be an iterable, not int

In [102]:
%%time

# Пример 3.2
# args_container может быть None, а kwargs_container задан явно
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

CPU times: user 1.86 ms, sys: 1.59 ms, total: 3.46 ms
Wall time: 3.01 s


[3, 3, 3, 3]

In [43]:
%%time

# Пример 3.3
# kwargs_container может быть None, а args_container задан явно
parallel_map(test_func,
             args_container=[1, 2, 3, 4])

CPU times: user 4.11 ms, sys: 9.2 ms, total: 13.3 ms
Wall time: 2.01 s


[3, 7, 13, 21]

In [44]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
parallel_map(test_func)

CPU times: user 500 µs, sys: 43.3 ms, total: 43.8 ms
Wall time: 2.04 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [44]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
parallel_map(test_func)

CPU times: user 500 µs, sys: 43.3 ms, total: 43.8 ms
Wall time: 2.04 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [32]:
%%time

# Пример 3.5
# При несовпадении количеств позиционных и именованных аргументов кидается ошибка
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}])

ValueError: Numbers of positional arguments and keyword arguments do not match: 4 and 3

In [45]:
%%time

# Пример 4.1
# Если функция не имеет обязательных аргументов и аргумент n_jobs не был передан, то она выполняется параллельно столько раз, сколько ваш CPU имеет логических ядер
# В моём случае это 24, у вас может быть больше или меньше
parallel_map(test_func)

CPU times: user 9.3 ms, sys: 51.2 ms, total: 60.5 ms
Wall time: 2.06 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [47]:
%%time

# Пример 4.2
# Если функция не имеет обязательных аргументов и передан только аргумент n_jobs, то она выполняется параллельно n_jobs раз
parallel_map(test_func, n_jobs=2)

CPU times: user 2.06 ms, sys: 5.92 ms, total: 7.99 ms
Wall time: 2.01 s


[3, 3]

In [48]:
%%time

# Пример 4.3
# Если аргументов для target_func указано МЕНЬШЕ, чем n_jobs, то используется такое же количество worker'ов, сколько было передано аргументов
parallel_map(test_func,
             args_container=[1, 2, 3],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 314 µs, sys: 8.69 ms, total: 9 ms
Wall time: 2.01 s


[3, 7, 13]

In [49]:
%%time

# Пример 4.4
# Аналогичный предыдущему случай, но с именованными аргументами
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 1.26 ms, sys: 9.47 ms, total: 10.7 ms
Wall time: 3.01 s


[3, 3, 3]

In [50]:
%%time

# Пример 4.5
# Комбинация примеров 4.3 и 4.4 (переданы и позиционные и именованные аргументы)
parallel_map(test_func,
             args_container=[1, 2, 3],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 7.88 ms, sys: 0 ns, total: 7.88 ms
Wall time: 3.01 s


[3, 7, 13]

In [50]:
%%time

# Пример 4.6
# Если аргументов для target_func указано БОЛЬШЕ, чем n_jobs, то используется n_jobs worker'ов
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=None,
             n_jobs=2)   # Здесь используется 2 worker'a

CPU times: user 7.88 ms, sys: 0 ns, total: 7.88 ms
Wall time: 3.01 s


[3, 7, 13]

In [51]:
%%time

# Пример 4.7
# Время выполнения оптимизируется, данный код должен отрабатывать за 5 секунд
parallel_map(test_func,
             kwargs_container=[{"s": 5}, {"s": 1}, {"s": 2}, {"s": 1}],
             n_jobs=2)

CPU times: user 3.03 ms, sys: 11 ms, total: 14 ms
Wall time: 5.01 s


[3, 3, 3, 3]

In [57]:
def test_func2(string, sleep_time=1):
    time.sleep(sleep_time)
    return string

# Пример 5
# Результаты возвращаются в том же порядке, в котором были переданы соответствующие аргументы вне зависимости от того, когда завершился worker
arguments = ["first", "second", "third", "fourth", "fifth"]
parallel_map(test_func2,
             args_container=arguments,
             kwargs_container=[{"sleep_time": 5}, {"sleep_time": 4}, {"sleep_time": 3}, {"sleep_time": 2}, {"sleep_time": 1}])

['first', 'second', 'third', 'fourth', 'fifth']

In [58]:
%%time


def test_func3():
    def inner_test_func(sleep_time):
        time.sleep(sleep_time)
    return parallel_map(inner_test_func, args_container=[1, 2, 3])

# Пример 6
# Работает с функциями, созданными внутри других функций
test_func3()

[None, None, None]