В формулировке заданий будет использоваться понятие **worker**. Это слово обозначает какую-то единицу параллельного выполнения, в случае питона это может быть **поток** или **процесс**, выбирайте то, что лучше будет подходить к конкретной задаче

# Задание 1 (7 баллов)

В одном из заданий по ML от вас требовалось написать кастомную реализацию Random Forest. Её проблема состоит в том, что она работает медленно, так как использует всего один поток для работы. Добавление параллельного программирования в код позволит получить существенный прирост в скорости обучения и предсказаний.

В данном задании от вас требуется добавить возможность обучать случайный лес параллельно и использовать параллелизм для предсказаний. Для этого вам понадобится:
1. Добавить аргумент `n_jobs` в метод `fit`. `n_jobs` показывает количество worker'ов, используемых для распараллеливания
2. Добавить аргумент `n_jobs` в методы `predict` и `predict_proba`
3. Реализовать функционал по распараллеливанию в данных методах

В результате код `random_forest.fit(X, y, n_jobs=2)` и `random_forest.predict(X, y, n_jobs=2)` должен работать в ~1.5-2 раза быстрее, чем `random_forest.fit(X, y, n_jobs=1)` и `random_forest.predict(X, y, n_jobs=1)` соответственно

Если у вас по каким-то причинам нет кода случайного леса из ДЗ по ML, то вы можете написать его заново или попросить у однокурсника. *Детали* реализации ML части оцениваться не будут, НО, если вы поломаете логику работы алгоритма во время реализации параллелизма, то за это будут сниматься баллы

В задании можно использовать только модули из **стандартной библиотеки** питона, а также функции и классы из **sklearn** при помощи которых вы изначально писали лес

In [None]:
from sklearn.base import BaseEstimator
from sklearn.datasets import make_classification
from sklearn.tree import DecisionTreeClassifier
import numpy as np

from concurrent.futures import ProcessPoolExecutor
from typing import Tuple, Any

In [None]:
class RandomForestClassifierCustom(BaseEstimator):
    '''
    Random forest classifier based on DecisionTreeClassifier (scikit-learn).
    '''

    def __init__(
            self, n_estimators: int = 10, max_depth: int = None, max_features: int = None, random_state: int = None
    ) -> None:
        '''
        :param int n_estimators: number of trees in random forest (by default 10)
        :param int or None max_depth: max depht for every tree (by default None)
        :param int or None max_features: max features (by default None)
        :param int or None random_state: random state (by default None)
        '''
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.max_features = max_features
        self.random_state = random_state
        self.trees = []
        self.feat_ids_by_tree = []

    def _fitting_model(self, idx: int) -> Tuple[DecisionTreeClassifier, Any]:
        '''
        Fitting tree. Helper method for fit().

        :param int idx: index of tree
        :return: fitted tree and featured indices
        :rtype: np.ndarray, np.ndarray
        '''
        np.random.seed(self.random_state + idx)
        features_ids = np.random.choice(self.X.shape[1], size=self.max_features, replace=False)
        pseudosampling_ids = np.random.choice(self.X.shape[0], size=self.X.shape[0], replace=True)
        pseudo_X = self.X[pseudosampling_ids][:, features_ids]
        pseudo_y = self.y[pseudosampling_ids]

        tree_class = DecisionTreeClassifier(max_depth=self.max_depth,
                                            max_features=self.max_features,
                                            random_state=self.random_state,
                                            )
        fitted_tree = tree_class.fit(pseudo_X, pseudo_y)
        return fitted_tree, features_ids

    def fit(self, X: np.ndarray, y: np.ndarray, n_jobs: int = 1) -> object:
        '''
        Basic method for fitting trees. Can work in multiprocessor mode.

        :param np.ndarray X: array of features
        :param np.ndarray y: array of targets
        :param int n_jobs: number of processes (by default 1)
        :return: self object
        :rtype: object of RandomForestClassifierCustom
        '''
        self.X = X
        self.y = y
        self.classes_ = sorted(np.unique(self.y))
        with ProcessPoolExecutor(n_jobs) as pool:
            processes = list(pool.map(self._fitting_model, list(range(self.n_estimators))))
            for proc in processes:
                tree, feat_ids = proc
                self.trees.append(tree)
                self.feat_ids_by_tree.append(feat_ids)
            return self

    def _prediction_calc(self, tree_and_feat_ids: int) -> np.ndarray:
        '''
        Calculate y_pred. Helper method for predict_proba().

        :param int tree_and_feat_ids: fitted tree and featured indices
        :return: prediction by tree
        :rtype: np.ndarray
        '''
        tree, feat_ids = tree_and_feat_ids
        pred = tree.predict_proba(self.X[:, feat_ids])
        return pred

    def predict_proba(self, X: np.ndarray, n_jobs: int = 1) -> np.ndarray:
        '''
        Predicting classes by X.

        :param np.ndarray X: array of features
        :param int n_jobs: number of processes (by default 1)
        :return: predicted y (target)
        :rtype: np.ndarray
        '''
        self.X = X
        y_pred = np.zeros((self.X.shape[0], len(self.classes_)))
        with ProcessPoolExecutor(n_jobs) as pool:
            processes = list(pool.map(self._prediction_calc, zip(self.trees, self.feat_ids_by_tree)))
            for proc in processes:
                y_pred += proc
        return y_pred

    def predict(self, X: np.ndarray, n_jobs: int = 1) -> np.ndarray:
        '''
        Maximizes received predictions

        :param np.ndarray X: array of features
        :param int n_jobs: number of processes (by default 1)
        :return: predicted labels
        :rtype: np.ndarray
        '''
        probas = self.predict_proba(X, n_jobs)
        predictions = np.argmax(probas, axis=1)
        return predictions


X, y = make_classification(n_samples=100000)

In [None]:
random_forest = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [None]:
%%time
if __name__ == '__main__':
    _ = random_forest.fit(X, y, n_jobs=1)

In [None]:
%%time

preds_1 = random_forest.predict(X, n_jobs=1)

In [None]:
random_forest = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [None]:
%%time

_ = random_forest.fit(X, y, n_jobs=2)

In [None]:
%%time

preds_2 = random_forest.predict(X, n_jobs=2)

In [None]:
(preds_1 == preds_2).all()   # Количество worker'ов не должно влиять на предсказания

#### Какие есть недостатки у вашей реализации параллельного Random Forest (если они есть)? Как это можно исправить? Опишите словами, можно без кода (+1 дополнительный балл)

Ответ пишите тут

Смотреть больно, но работает. Как исправить - не знаю (решила не трогать).

# Задание 2 (9 баллов)

Напишите декоратор `memory_limit`, который позволит ограничивать использование памяти декорируемой функцией.

Декоратор должен принимать следующие аргументы:
1. `soft_limit` - "мягкий" лимит использования памяти. При превышении функцией этого лимита должен будет отображён **warning**
2. `hard_limit` - "жёсткий" лимит использования памяти. При превышении функцией этого лимита должно будет брошено исключение, а функция должна немедленно завершить свою работу
3. `poll_interval` - интервал времени (в секундах) между проверками использования памяти

Требования:
1. Потребление функцией памяти должно отслеживаться **во время выполнения функции**, а не после её завершения
2. **warning** при превышении `soft_limit` должен отображаться один раз, даже если функция переходила через этот лимит несколько раз
3. Если задать `soft_limit` или `hard_limit` как `None`, то соответствующий лимит должен быть отключён
4. Лимиты должны передаваться и отображаться в формате `<number>X`, где `X` - символ, обозначающий порядок единицы измерения памяти ("B", "K", "M", "G", "T", ...)
5. В тексте warning'ов и исключений должен быть указан текщий объём используемой памяти и величина превышенного лимита

В задании можно использовать только модули из **стандартной библиотеки** питона, можно писать вспомогательные функции и/или классы

В коде ниже для вас предопределены некоторые полезные функции, вы можете ими пользоваться, а можете не пользоваться

In [None]:
import os
import psutil
import time
import warnings
import threading
import sys
from typing import Union, Optional, Callable

In [None]:
class ControlsMemoryUsage(threading.Thread):
    '''
    Takes into account the consumption of RAM when the program is running.
    '''
    def __init__(self, soft_limit: Optional[Union[str, int]] = None, hard_limit: Optional[Union[str, int]] = None, poll_interval: Union[int, float] = 1) -> None:
        '''
        :param str soft_limit: soft limit of memory usage. If the function exceeds this limit, warning is displayed (by default None)
        :param str hard_limit: hard memory usage limit. If the function exceeds this limit, return exception and the function ends (by default None)
        :param int or float poll_interval: time interval (in seconds) between memory usage checks (by default 1 second)
        '''
        super().__init__()
        self.soft_limit = self.human_readable_to_bytes(soft_limit)
        self.hard_limit = self.human_readable_to_bytes(hard_limit)
        self.poll_interval = poll_interval

    @staticmethod
    def get_memory_usage() -> int:
        '''
        Shows the current memory consumption of the process.

        :return: usage memory in Bytes
        :rtype: int
        '''
        process = psutil.Process(os.getpid())
        mem_info = process.memory_info()
        return mem_info.rss

    @staticmethod
    def bytes_to_human_readable(n_bytes: int) -> str:
        '''
        Converts bytes to a human-readable record.

        :param int n_bytes: number of bytes
        :return: format human-readable record
        :rtype: str
        '''
        symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
        prefix = {}
        for idx, s in enumerate(symbols):
            prefix[s] = 1 << (idx + 1) * 10
        for s in reversed(symbols):
            if n_bytes >= prefix[s]:
                value = float(n_bytes) / prefix[s]
                return f"{value:.2f}{s}"
        return f"{n_bytes}B"

    @staticmethod
    def human_readable_to_bytes(memory_as_str: str) -> Optional[int]:
        '''
        Converts human-readable used memory to bytes.

        :param str memory_as_str: human-readable record of memory
        :return: format bytes
        :rtype: int or None
        '''
        if isinstance(memory_as_str, type(None)):
            return None
        elif isinstance(memory_as_str, str):
            symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
            prefix = {}
            for idx, s in enumerate(symbols):
                prefix[s] = 1 << (idx + 1) * 10
            symb = memory_as_str[-1]
            n_bytes = prefix[symb] * float(memory_as_str[:-1])
            return int(n_bytes)

    def run(self) -> None:
        '''
        Run
        Starts the job of tracking RAM consumption.

        :return: None
        :rtype: NoneType
        '''
        has_over_soft_limit = False
        while True:
            cur_mem_info = self.get_memory_usage()
            if self.soft_limit != None and cur_mem_info > self.soft_limit and has_over_soft_limit is False:
                readable_soft_limit = self.bytes_to_human_readable(self.soft_limit)
                readable_used_mem = self.bytes_to_human_readable(cur_mem_info)
                warnings.warn(
                        f'Uses lots of memory. Soft limit is {readable_soft_limit}; used - {readable_used_mem}.')
                has_over_soft_limit = True
            elif self.hard_limit != None and cur_mem_info > self.hard_limit:
                print('MemoryError: RAM limit exceeded', file=sys.stderr)
                os._exit(1)
            time.sleep(self.poll_interval)

def memory_limit(soft_limit: str, hard_limit: str, poll_interval: Union[int, float]) -> Callable:
    '''
    Decorator for tracking RAM consumption of function.

    :param str soft_limit: soft limit of memory usage. If the function exceeds this limit, warning is displayed (by default None)
    :param str hard_limit: hard memory usage limit. If the function exceeds this limit, return exception and the function ends (by default None)
    :param int or float poll_interval: time interval (in seconds) between memory usage checks
    :return: wrapped function with RAM usage
    :rtype: Callable
    '''
    def decorator(func: Callable) -> Callable:
        def inner_func():
            control_memory = ControlsMemoryUsage(soft_limit, hard_limit, poll_interval)
            control_memory.start()
            while True:
                func()
        return inner_func
    return decorator

In [None]:
@memory_limit(soft_limit='512M', hard_limit='1.5G', poll_interval=0.1)
def memory_increment() -> list:
    """
    Function for testing.

    :return: list of numbers
    :rtype: list
    """
    lst = []
    for i in range(50000000):
        if i % 500000 == 0:
            time.sleep(0.1)
        lst.append(i)
    return lst

# Задание 3 (11 баллов)

Напишите функцию `parallel_map`. Это должна быть **универсальная** функция для распараллеливания, которая эффективно работает в любых условиях.

Функция должна принимать следующие аргументы:
1. `target_func` - целевая функция (обязательный аргумент)
2. `args_container` - контейнер с позиционными аргументами для `target_func` (по-умолчанию `None` - позиционные аргументы не передаются)
3. `kwargs_container` - контейнер с именованными аргументами для `target_func` (по-умолчанию `None` - именованные аргументы не передаются)
4. `n_jobs` - количество workers, которые будут использованы для выполнения (по-умолчанию `None` - количество логических ядер CPU в системе)

Функция должна работать аналогично `***PoolExecutor.map`, применяя функцию к переданному набору аргументов, но с некоторыми дополнениями и улучшениями
    
Поскольку мы пишем **универсальную** функцию, то нам нужно будет выполнить ряд требований, чтобы она могла логично и эффективно работать в большинстве ситуаций

1. `target_func` может принимать аргументы любого вида в любом количестве
2. Любые типы данных в `args_container`, кроме `tuple`, передаются в `target_func` как единственный позиционный аргумент. `tuple` распаковываются в несколько аргументов
3. Количество элементов в `args_container` должно совпадать с количеством элементов в `kwargs_container` и наоборот, также значение одного из них или обоих может быть равно `None`, в иных случаях должна кидаться ошибка (оба аргумента переданы, но размеры не совпадают)

4. Функция должна выполнять определённое количество параллельных вызовов `target_func`, это количество зависит от числа переданных аргументов и значения `n_jobs`. Сценарии могут быть следующие
    + `args_container=None`, `kwargs_container=None`, `n_jobs=None`. В таком случае функция `target_func` выполнится параллельно столько раз, сколько на вашем устройстве логических ядер CPU
    + `args_container=None`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **5** раз
    + `args_container=[1, 2, 3]`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **3** раза, несмотря на то, что `n_jobs=5` (так как есть всего 3 набора аргументов для которых нам нужно получить результат, а лишние worker'ы создавать не имеет смысла)
    + `args_container=None`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем именованные аргументы
    + `args_container=[1, 2, 3]`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем и позиционные, и именованные аргументы
    + `args_container=[1, 2, 3, 4]`, `kwargs_container=None`, `n_jobs=2`. В таком случае в каждый момент времени параллельно будет выполняться **не более 2** функций `target_func`, так как нам нужно выполнить её 4 раза, но у нас есть только 2 worker'а.
    + В подобных случаях (из примера выше) должно оптимизироваться время выполнения. Если эти 4 вызова выполняются за 5, 1, 2 и 1 секунды, то параллельное выполнение с `n_jobs=2` должно занять **5 секунд** (не 7 и тем более не 10)

5. `parallel_map` возвращает результаты выполнения `target_func` **в том же порядке**, в котором были переданы соответствующие аргументы
6. Работает с функциями, созданными внутри других функций

Для базового решения от вас не ожидается **сверххорошая** оптимизация по времени и памяти для всех возможных случаев. Однако за хорошо оптимизированную логику работы можно получить до **+3 дополнительных баллов**

Вы можете сделать класс вместо функции, если вам удобнее

В задании можно использовать только модули из **стандартной библиотеки** питона

Ниже приведены тестовые примеры по каждому из требований

In [None]:
import multiprocessing
from typing import Union, Optional, Callable, List, Tuple, Dict, Any
import time

In [None]:
class ParallelRunning:
    '''
    Universal class for parallelization across processes.
    '''
    def __init__(self,
                 target_func: Union[Callable, List[Callable]],
                 args_container: Optional[Union[List[Tuple], Tuple]] = None,
                 kwargs_container: Optional[List[Dict[str, Any]]] = None,
                 n_jobs: int = None) -> None:
        '''
        :param function or list target_func: objective function or functions
        :param list or None args_container: container with positional arguments numbers or tuples (by default None)
        :param list kwargs_container: container with named arguments
        :param int n_jobs: number of processes
        '''
        self._target_funcs = target_func
        self._args_container = args_container
        self._kwargs_container = kwargs_container
        self._n_jobs = n_jobs
        self._counter = 0

    def _compare_containers_len(self):
        '''
        Compares the number of named and positional arguments.
        '''
        if self._args_container is not None and self._kwargs_container is not None and len(
                self._args_container) != len(self._kwargs_container):
            raise ValueError(
                'the number of positional and named arguments differs')

    def _check_n_funcs(self):
        '''
        Checks the number of target functions. If there is only one function, then convert it to a list of length 1.
        '''
        if isinstance(self._target_funcs, list):
            self._target_funcs = self._target_funcs
        else:
            self._target_funcs = [self._target_funcs]

    def _fill_empty_args_kwargs(self):
        '''
        Containers are filled with empty values, if positional and/or named containers is None.
        '''
        if self._args_container is None and self._kwargs_container is not None:
            self._args_container = [tuple() for _ in range(len(self._kwargs_container))]
        elif self._kwargs_container is None and self._args_container is not None:
            self._kwargs_container = [dict() for _ in range(len(self._args_container))]
        elif self._args_container is None and self._kwargs_container is None:
            self._args_container = [tuple() for _ in range(self._n_jobs)]
            self._kwargs_container = [dict() for _ in range(self._n_jobs)]

    def _check_n_jobs(self):
        '''
        Adapts the number of processes to use according to the number of input arguments.
        '''
        if self._n_jobs is None:
            self._n_jobs = multiprocessing.cpu_count()
        self._fill_empty_args_kwargs()
        self._n_jobs = min(self._n_jobs, len(self._args_container))

    def _add_tasks_in_queue(self, queue: multiprocessing.Queue) -> multiprocessing.Queue:
        '''
        Adds tasks to the queue.

        :param multiprocessing.Queue queue: object multiprocessing.Queue
        :return: filled queue
        :rtype: multiprocessing.Queue
        '''
        counter = 0
        for arg, kwarg in zip(self._args_container, self._kwargs_container):
            queue.put((counter, arg, kwarg))
            counter += 1
        return queue

    def _process_tasks(self, func: Callable, queue: multiprocessing.Queue, manager_dict: multiprocessing.Manager) -> None:
        '''
        Handles tasks from the queue.

        :param function func: input target function
        :param multiprocessing.Queue queue: filled queue
        :param dict manager_dict: multiprocessing.Manager object
        :return: None
        :rtype: NoneType
        '''
        while not queue.empty():
            counter, args, kwargs = queue.get()
            if not isinstance(args, tuple):
                args = (args,)
            call_func = func(*args, **kwargs)
            manager_dict[counter] = call_func
        return None

    def parallel_map(self) -> List[list]:
        '''
        Multiprocessing run of target function/functions.

        :return: list of strings results
        :rtype: List[list]
        '''
        self._compare_containers_len()
        self._check_n_funcs()
        self._check_n_jobs()
        processes = []
        results = []
        for func in self._target_funcs:
            queue = self._add_tasks_in_queue(multiprocessing.Queue())
            manager = multiprocessing.Manager()
            manager_dict = manager.dict()  # need to remember what the process is first, second, etc
            for job in range(self._n_jobs):
                process = multiprocessing.Process(target=self._process_tasks, args=(func, queue, manager_dict))
                processes.append(process)
                process.start()
            for process in processes:
                process.join()

            manager_dict = dict(sorted(manager_dict.items()))
            interm_res = []
            for key in manager_dict:
                interm_res.append(manager_dict[key])
            results.append(interm_res)
            interm_res = []
        return results

In [None]:

# Это только один пример тестовой функции, ваша parallel_map должна уметь эффективно работать с ЛЮБЫМИ функциями
# Поэтому обязательно протестируйте код на чём-нибудбь ещё
def test_func(x=1, s=2, a=1, b=1, c=1):
    time.sleep(s)
    return a*x**2 + b*x + c

In [None]:
run_parallel = ParallelRunning(test_func, args_container=[1, 2.0, 3j-1, 4])
result = run_parallel.parallel_map()
print(result)

In [None]:
%%time

# Пример 2.1
# Отдельные значения в args_container передаются в качестве позиционных аргументов
run_parallel = ParallelRunning(test_func, args_container=[1, 2.0, 3j-1, 4])
result = run_parallel.parallel_map() # Здесь происходят параллельные вызовы: test_func(1) test_func(2.0) test_func(3j-1) test_func(4)
print(result)

In [None]:
%%time

# Пример 2.2
# Элементы типа tuple в args_container распаковываются в качестве позиционных аргументов
run_parallel = ParallelRunning(test_func, [(1, 1), (2.0, 2), (3j-1, 3), 4])
result = run_parallel.parallel_map()  # Здесь происходят параллельные вызовы: test_func(1, 1) test_func(2.0, 2) test_func(3j-1, 3) test_func(4)
print(result)

In [None]:
%%time

# Пример 3.1
# Возможна одновременная передача args_container и kwargs_container, но количества элементов в них должны быть равны
run_parallel = ParallelRunning(test_func, 
                               args_container=[1, 2, 3, 4],
                               kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])
result = run_parallel.parallel_map()
print(result)
# Здесь происходят параллельные вызовы: test_func(1, s=3) test_func(2, s=3) test_func(3, s=3) test_func(4, s=3)

In [None]:
%%time

# Пример 3.2
# args_container может быть None, а kwargs_container задан явно
run_parallel = ParallelRunning(test_func, kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])
result = run_parallel.parallel_map()
print(result)

In [None]:
%%time

# Пример 3.3
# kwargs_container может быть None, а args_container задан явно
run_parallel = ParallelRunning(test_func, args_container=[1, 2, 3, 4])
result = run_parallel.parallel_map()
print(result)

In [None]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
run_parallel = ParallelRunning(test_func)
result = run_parallel.parallel_map()
print(result)

In [None]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
parallel_map(test_func)
run_parallel = ParallelRunning(test_func)
result = run_parallel.parallel_map()
print(result)

In [None]:
%%time

# Пример 3.5
# При несовпадении количеств позиционных и именованных аргументов кидается ошибка
run_parallel = ParallelRunning(test_func,
                               args_container=[1, 2, 3, 4],
                               kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}])
result = run_parallel.parallel_map()
print(result)

In [None]:
%%time

# Пример 4.1
# Если функция не имеет обязательных аргументов и аргумент n_jobs не был передан, то она выполняется параллельно столько раз, сколько ваш CPU имеет логических ядер
# В моём случае это 24, у вас может быть больше или меньше
run_parallel = ParallelRunning(test_func)
result = run_parallel.parallel_map()
print(result)


In [None]:
%%time

# Пример 4.2
# Если функция не имеет обязательных аргументов и передан только аргумент n_jobs, то она выполняется параллельно n_jobs раз
run_parallel = ParallelRunning(test_func, n_jobs=2)
result = run_parallel.parallel_map()
print(result)

In [None]:
%%time

# Пример 4.3
# Если аргументов для target_func указано МЕНЬШЕ, чем n_jobs, то используется такое же количество worker'ов, сколько было передано аргументов
parallel_map(test_func,
             args_container=[1, 2, 3],
             n_jobs=5)   # Здесь используется 3 worker'a
run_parallel = ParallelRunning(test_func,
                               args_container=[1, 2, 3],
                               n_jobs=5)
result = run_parallel.parallel_map()
print(result)

In [None]:
%%time

# Пример 4.4
# Аналогичный предыдущему случай, но с именованными аргументами
run_parallel = ParallelRunning(test_func,
                               kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
                               n_jobs=5)  # Здесь используется 3 worker'a
result = run_parallel.parallel_map()
print(result)

In [None]:
%%time

# Пример 4.5
# Комбинация примеров 4.3 и 4.4 (переданы и позиционные и именованные аргументы)
run_parallel = ParallelRunning(test_func,
                               args_container=[1, 2, 3],
                               kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
                               n_jobs=5)  # Здесь используется 3 worker'a
result = run_parallel.parallel_map()
print(result)

In [None]:
%%time

# Пример 4.6
# Если аргументов для target_func указано БОЛЬШЕ, чем n_jobs, то используется n_jobs worker'ов
run_parallel = ParallelRunning(test_func,
                               args_container=[1, 2, 3, 4],
                               kwargs_container=None,
                               n_jobs=2)  # Здесь используется 2 worker'a
result = run_parallel.parallel_map()
print(result)

In [None]:
%%time

# Пример 4.7
# Время выполнения оптимизируется, данный код должен отрабатывать за 5 секунд
run_parallel = ParallelRunning(test_func,
                               kwargs_container=[{"s": 5}, {"s": 1}, {"s": 2}, {"s": 1}],
                               n_jobs=2)
result = run_parallel.parallel_map()
print(result)

In [None]:
def test_func2(string, sleep_time=1):
    time.sleep(sleep_time)
    return string

# Пример 5
# Результаты возвращаются в том же порядке, в котором были переданы соответствующие аргументы вне зависимости от того, когда завершился worker
arguments = ["first", "second", "third", "fourth", "fifth"]
run_parallel = ParallelRunning(test_func2,
             args_container=arguments,
             kwargs_container=[{"sleep_time": 5}, {"sleep_time": 4}, {"sleep_time": 3}, {"sleep_time": 2}, {"sleep_time": 1}])
result = run_parallel.parallel_map()
print(result)

In [None]:
%%time


def test_func3():
    def inner_test_func(sleep_time):
        time.sleep(sleep_time)
    return ParallelRunning(inner_test_func, args_container=[1, 2, 3]).parallel_map()

# Пример 6
# Работает с функциями, созданными внутри других функций
test_func3()