В формулировке заданий будет использоваться понятие **worker**. Это слово обозначает какую-то единицу параллельного выполнения, в случае питона это может быть **поток** или **процесс**, выбирайте то, что лучше будет подходить к конкретной задаче

В каждом задании нужно писать подробные аннотиции типов для:
1. Аргументов функций и классов
2. Возвращаемых значений
3. Классовых атрибутов (если такие есть)

В каждом задании нужно писать докстроки в определённом стиле (какой вам больше нравится) для всех функций, классов и методов

# Задание 1 (7 баллов)

В одном из заданий по ML от вас требовалось написать кастомную реализацию Random Forest. Её проблема состоит в том, что она работает медленно, так как использует всего один поток для работы. Добавление параллельного программирования в код позволит получить существенный прирост в скорости обучения и предсказаний.

В данном задании от вас требуется добавить возможность обучать случайный лес параллельно и использовать параллелизм для предсказаний. Для этого вам понадобится:
1. Добавить аргумент `n_jobs` в метод `fit`. `n_jobs` показывает количество worker'ов, используемых для распараллеливания
2. Добавить аргумент `n_jobs` в методы `predict` и `predict_proba`
3. Реализовать функционал по распараллеливанию в данных методах

В результате код `random_forest.fit(X, y, n_jobs=2)` и `random_forest.predict(X, y, n_jobs=2)` должен работать в ~1.5-2 раза быстрее, чем `random_forest.fit(X, y, n_jobs=1)` и `random_forest.predict(X, y, n_jobs=1)` соответственно

Если у вас по каким-то причинам нет кода случайного леса из ДЗ по ML, то вы можете написать его заново или попросить у однокурсника. *Детали* реализации ML части оцениваться не будут, НО, если вы поломаете логику работы алгоритма во время реализации параллелизма, то за это будут сниматься баллы

В задании можно использовать только модули из **стандартной библиотеки** питона, а также функции и классы из **sklearn** при помощи которых вы изначально писали лес

In [1]:
from typing import Optional, List, Tuple

In [2]:
import numpy as np
import warnings
import random

from sklearn.tree import (DecisionTreeRegressor,
                          DecisionTreeClassifier)

# establish seed and silence warnings
warnings.filterwarnings("ignore")

SEED: int = 111
random.seed(SEED)
np.random.seed(SEED)

In [3]:
from sklearn.base import BaseEstimator
from sklearn.datasets import make_classification

from joblib import Parallel, delayed
from multiprocessing import Pool

class RandomForestClassifierCustom(BaseEstimator):
  """
  A custom implementation of RandomForestClassifier using scikit-learn's BaseEstimator
  """
  def __init__(self, 
               n_estimators: int = 10, 
               max_depth: Optional[int] = None,
               max_features: Optional[int] = None, 
               random_state: int = SEED) -> None:
    """
    Initialization method for RandomForestClassifierCustom

    :param n_estimators: int, optional (default=10)
        The number of decision trees in the forest.
    :param max_depth: int, optional (default=None)
        The maximum depth of each decision tree. If None, then nodes are expanded until all the leaves are pure.
    :param max_features: int, optional (default=None)
        The number of features to consider when looking for the best split. If None, then all features will be used.
    :param random_state: int, optional (default=SEED)
        Seed used by the random number generator.

    :return: None
    """
    self.classes_ = None
    self.n_estimators = n_estimators
    self.max_depth = max_depth
    self.max_features = max_features
    self.random_state = random_state

    self.trees = []
    self.feat_ids_by_tree = []

  def fit(self, X: np.ndarray, y: np.ndarray, 
          n_jobs: int = 1) -> "RandomForestClassifierCustom":

    """
    Build a forest of decision trees from the training set (X, y).

    :param X: array-like of shape (n_samples, n_features)
        The training input samples.
    :param y: array-like of shape (n_samples,)
        The target values (class labels) as integers or strings.
    :param n_jobs: int, optional (default=1)
        The number of jobs to run in parallel for both `feat_ids_by_tree` and `trees` formation.

    :return: RandomForestClassifierCustom
        Fitted estimator instance
    """
    self.classes_ = sorted(np.unique(y))

    def tree_step_maker(i: int) -> np.ndarray:
      np.random.seed(self.random_state + i)
      feat_ids_by_tree_step = np.random.choice(np.arange(X.shape[1]), 
                                                size=self.max_features, 
                                                replace=False)
      return feat_ids_by_tree_step

    self.feat_ids_by_tree = Parallel(n_jobs=n_jobs)(delayed(tree_step_maker)(i) for i in range(self.n_estimators))

    def trees_maker(i: int) -> DecisionTreeClassifier:
      np.random.seed(self.random_state + i)
      pseudo_slice = np.random.choice(np.arange(len(X)), 
                                      size=len(X), replace=True)
      
      clf = DecisionTreeClassifier(max_depth=self.max_depth,
                                  max_features=self.max_features,
                                  random_state=self.random_state+i)
      
      return clf.fit(X[pseudo_slice][:, self.feat_ids_by_tree[i]], y[pseudo_slice])

    self.trees = Parallel(n_jobs=n_jobs)(delayed(trees_maker)(i) for i in range(self.n_estimators))

    return self

  def _pred_maker(self, feats: np.ndarray, 
                  tree: DecisionTreeClassifier, 
                  X: np.ndarray) -> np.ndarray:
    return tree.predict_proba(X[:, feats])

  def predict_proba(self, X: np.ndarray, n_jobs: int = 2) -> np.ndarray:
    """
    Predict class probabilities of input data using a single decision tree.

    :param n_jobs: the number of parallel jobs to run
    :param X: array-like of shape (n_samples, n_features)
        The input data.

    :return: np.ndarray
        Array of predicted probabilities.
    """
    # y_pred_storage: List[np.ndarray] = []

    with Pool(n_jobs) as pool:
      y_pred_storage = pool.starmap(self._pred_maker, [(feats, tree, X) for feats, tree in zip(self.feat_ids_by_tree, self.trees)])

    return np.average(y_pred_storage, axis=0)
    
  def predict(self, X: np.ndarray, n_jobs: int = 1) -> np.ndarray:
    """
    Predict class probabilities of input data using a single decision tree.

    :param n_jobs: the number of parallel jobs to run
    :param X: array-like of shape (n_samples, n_features)
        The input data.

    :return: np.ndarray
        Array of the most likely outcome.
    """
    probas = self.predict_proba(X, n_jobs=n_jobs)
    predictions = np.argmax(probas, axis=1)
  
    return predictions


X, y = make_classification(n_samples=100000)

In [4]:
random_forest = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [5]:
%%time

_ = random_forest.fit(X, y, n_jobs=1)

CPU times: user 11.3 s, sys: 118 ms, total: 11.4 s
Wall time: 21.5 s


In [6]:
%%time

preds_1 = random_forest.predict(X, n_jobs=1)

CPU times: user 524 ms, sys: 372 ms, total: 896 ms
Wall time: 1.22 s


In [7]:
random_forest = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [8]:
%%time

_ = random_forest.fit(X, y, n_jobs=2)

CPU times: user 210 ms, sys: 99.2 ms, total: 309 ms
Wall time: 9.75 s


In [9]:
%%time

preds_2 = random_forest.predict(X, n_jobs=2)

CPU times: user 206 ms, sys: 235 ms, total: 442 ms
Wall time: 848 ms


In [10]:
(preds_1 == preds_2).all()   # Количество worker'ов не должно влиять на предсказания

True

#### Какие есть недостатки у вашей реализации параллельного Random Forest (если они есть)? Как это можно исправить? Опишите словами, можно без кода (+1 дополнительный балл)

**Ответ пишите тут**: 

*   для предикта нет заметного убыстрения
*   ещё любопытное замечание: то как предлагает чат гпт (через `from joblib import Parallel, delayed`) в 3 раза быстрее того, что Рома предлагал (`from multiprocessing import Pool`)
*   НО оно всё не сравниться с "диким" воплощением, которое на ПОРЯДОК БЫСТРЕЕ



**Why could implementation without parallel programming be way faster?**

Implementation without parallel programming could be faster in some cases because parallel programming adds overhead due to the need for communication and coordination between parallel processes. Additionally, some algorithms may not be well-suited for parallelization, either because they involve a lot of synchronization between parallel processes or because they have a lot of dependencies between individual computations, which can lead to a lot of waiting and idle time for some processes. Therefore, for small or simple tasks, the overhead of parallel programming may outweigh any potential speedup, resulting in slower overall performance. It's important to carefully consider the task at hand and the characteristics of the hardware before deciding whether to use parallel programming or not.

# Задание 2 (9 баллов)

Напишите декоратор `memory_limit`, который позволит ограничивать использование памяти декорируемой функцией.

Декоратор должен принимать следующие аргументы:
1. `soft_limit` - "мягкий" лимит использования памяти. При превышении функцией этого лимита должен будет отображён **warning**
2. `hard_limit` - "жёсткий" лимит использования памяти. При превышении функцией этого лимита должно будет брошено исключение, а функция должна немедленно завершить свою работу
3. `poll_interval` - интервал времени (в секундах) между проверками использования памяти

Требования:
1. Потребление функцией памяти должно отслеживаться **во время выполнения функции**, а не после её завершения
2. **warning** при превышении `soft_limit` должен отображаться один раз, даже если функция переходила через этот лимит несколько раз
3. Если задать `soft_limit` или `hard_limit` как `None`, то соответствующий лимит должен быть отключён
4. Лимиты должны передаваться и отображаться в формате `<number>X`, где `X` - символ, обозначающий порядок единицы измерения памяти ("B", "K", "M", "G", "T", ...)
5. В тексте warning'ов и исключений должен быть указан текщий объём используемой памяти и величина превышенного лимита

В задании можно использовать только модули из **стандартной библиотеки** питона, можно писать вспомогательные функции и/или классы

В коде ниже для вас предопределены некоторые полезные функции, вы можете ими пользоваться, а можете не пользоваться

In [11]:
import os
import psutil
import time
import warnings
import threading

from typing import Optional, Callable

def get_memory_usage() -> int:
    """
    Returns the current memory usage of the process in bytes.

    Returns:
        int: Current memory usage of the process in bytes.
    """
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return mem_info.rss

def bytes_to_human_readable(n_bytes: int) -> str:
    """
    Converts a byte count to a human-readable string.

    Args:
        n_bytes (int): Number of bytes to convert.

    Returns:
        str: A string representing the input bytes count in human-readable format.
    """
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for idx, s in enumerate(symbols):
        prefix[s] = 1 << (idx + 1) * 10
    for s in reversed(symbols):
        if n_bytes >= prefix[s]:
            value = float(n_bytes) / prefix[s]
            return f"{value:.2f}{s}"
    return f"{n_bytes}B"

def human_to_bytes_readable(human_size: str) -> float:
    """
    Converts a human-readable byte count to an integer.

    Args:
        human_size (str): A string representing the byte count in human-readable format.

    Returns:
        float: The equivalent byte count as a floating point number.
    """
    symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for idx, s in enumerate(symbols):
      prefix[s] = 1 << (idx) * 10

    return float(human_size[:-1]) * prefix[human_size[-1]]

In [12]:
def memory_limit(hard_limit: Optional[str] = None, 
                 soft_limit: Optional[str] = None,
                 poll_interval: Optional[float] = None) -> Callable:
  """
  A decorator that limits the memory usage of a function by crashing it if it exceeds the specified limits.

  Args:
      hard_limit: A string representing the maximum memory usage allowed by the function in human-readable format.
          If None, the hard limit will be twice the current memory usage.
      soft_limit: A string representing the memory usage threshold for a warning message in human-readable format.
          If None, no warning will be issued.
      poll_interval: The interval in seconds between memory usage checks.

  Returns:
      The decorated function.

  Raises:
      MemoryError: If the memory usage exceeds the hard limit.
  """ 
  def decorator(func: Callable):
    def wrapper(*args, **kwargs):
      def check_memory_usage(hard_limit: Optional[str] = hard_limit, 
                             soft_limit: Optional[str] =soft_limit, 
                             poll_interval: Optional[float] = poll_interval) -> None:
        """
        Check the memory usage of the current process periodically and raise an error when the hard memory limit is exceeded.
        
        :param hard_limit: A string representing the hard memory limit, e.g. "1G".
                          If not provided, it defaults to 2 times the current memory usage.
        :param soft_limit: A string representing the soft memory limit, e.g. "500M".
                          If provided, a warning is issued when the memory usage exceeds this limit.
        :param poll_interval: The time interval between memory usage checks, in seconds.
                              If not provided, it defaults to 1 second.
        """
        print('check starts \n')
        flag = True

        if hard_limit:
          hard_lim_set = human_to_bytes_readable(hard_limit)
        else: 
          hard_lim_set = 2 * get_memory_usage()

        while get_memory_usage() < hard_lim_set:
            time.sleep(poll_interval)

            if flag and soft_limit:
              if get_memory_usage() > human_to_bytes_readable(soft_limit):
                now = bytes_to_human_readable(get_memory_usage())
                warnings.warn(f'Warning: the soft memory limit of {soft_limit} is exceeded: the current memory usage is {now}')
                flag = False

            if not hard_limit:
              hard_lim_set = 2 * get_memory_usage()

        now = bytes_to_human_readable(get_memory_usage())
        print(f'\nThe hard memory limit of {hard_limit} is exceeded: the current memory usage is {now}')
        print('The session will be crashed in 0.3 seconds....')

        time.sleep(0.3)
        # raise MemoryError('Memory usage exceeded limit')
        os._exit(1)

      thread = threading.Thread(target=check_memory_usage)
      thread.start()

      result = func(*args, **kwargs)

      thread.join()
      print('the decorator is finished')

      return result
    return wrapper
  return decorator

In [None]:
@memory_limit(soft_limit="512M", hard_limit="1.2G", poll_interval=0.1)
def memory_increment():
    """
    Функция для тестирования
    
    В течение нескольких секунд достигает использования памяти 1.89G
    Потребление памяти и скорость накопления можно варьировать, изменяя код
    """
    print('hehe i have started\n')
    lst = []
    for i in range(40000000):
        if i % 400000 == 0:
            time.sleep(0.1)
        lst.append(i)

    print('hehe u sucks')
    return lst

a = memory_increment()
a = None

check starts 
hehe i have started



The hard memory limit of 1.2G is exceeded: the current memory usage is 1.20G
The session will be crashed in 0.3 seconds....


# Задание 3 (11 баллов)

Напишите функцию `parallel_map`. Это должна быть **универсальная** функция для распараллеливания, которая эффективно работает в любых условиях.

Функция должна принимать следующие аргументы:
1. `target_func` - целевая функция (обязательный аргумент)
2. `args_container` - контейнер с позиционными аргументами для `target_func` (по-умолчанию `None` - позиционные аргументы не передаются)
3. `kwargs_container` - контейнер с именованными аргументами для `target_func` (по-умолчанию `None` - именованные аргументы не передаются)
4. `n_jobs` - количество workers, которые будут использованы для выполнения (по-умолчанию `None` - количество логических ядер CPU в системе)

Функция должна работать аналогично `***PoolExecutor.map`, применяя функцию к переданному набору аргументов, но с некоторыми дополнениями и улучшениями
    
Поскольку мы пишем **универсальную** функцию, то нам нужно будет выполнить ряд требований, чтобы она могла логично и эффективно работать в большинстве ситуаций

1. `target_func` может принимать аргументы любого вида в любом количестве
2. Любые типы данных в `args_container`, кроме `tuple`, передаются в `target_func` как единственный позиционный аргумент. `tuple` распаковываются в несколько аргументов
3. Количество элементов в `args_container` должно совпадать с количеством элементов в `kwargs_container` и наоборот, также значение одного из них или обоих может быть равно `None`, в иных случаях должна кидаться ошибка (оба аргумента переданы, но размеры не совпадают)

4. Функция должна выполнять определённое количество параллельных вызовов `target_func`, это количество зависит от числа переданных аргументов и значения `n_jobs`. Сценарии могут быть следующие
    + `args_container=None`, `kwargs_container=None`, `n_jobs=None`. В таком случае функция `target_func` выполнится параллельно столько раз, сколько на вашем устройстве логических ядер CPU
    + `args_container=None`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **5** раз
    + `args_container=[1, 2, 3]`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **3** раза, несмотря на то, что `n_jobs=5` (так как есть всего 3 набора аргументов для которых нам нужно получить результат, а лишние worker'ы создавать не имеет смысла)
    + `args_container=None`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем именованные аргументы
    + `args_container=[1, 2, 3]`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем и позиционные, и именованные аргументы
    + `args_container=[1, 2, 3, 4]`, `kwargs_container=None`, `n_jobs=2`. В таком случае в каждый момент времени параллельно будет выполняться **не более 2** функций `target_func`, так как нам нужно выполнить её 4 раза, но у нас есть только 2 worker'а.
    + В подобных случаях (из примера выше) должно оптимизироваться время выполнения. Если эти 4 вызова выполняются за 5, 1, 2 и 1 секунды, то параллельное выполнение с `n_jobs=2` должно занять **5 секунд** (не 7 и тем более не 10)

5. `parallel_map` возвращает результаты выполнения `target_func` **в том же порядке**, в котором были переданы соответствующие аргументы
6. Работает с функциями, созданными внутри других функций

Для базового решения от вас не ожидается **сверххорошая** оптимизация по времени и памяти для всех возможных случаев. Однако за хорошо оптимизированную логику работы можно получить до **+3 дополнительных баллов**

Вы можете сделать класс вместо функции, если вам удобнее

В задании можно использовать только модули из **стандартной библиотеки** питона

Ниже приведены тестовые примеры по каждому из требований

In [1]:
import multiprocessing
from typing import Any, Callable, Dict, Iterable, List, Tuple


def parallel_map(target_func: Callable[..., Any],
                 args_container: List[Iterable] = None,
                 kwargs_container: List[Dict[str, Any]] = None,
                 n_jobs: int = None) -> Iterable[Tuple[Iterable, Dict[str, Any], Any]]:
    """
    Applies a function to a list of arguments in parallel using multiple processes.

    Args:
        target_func (Callable): The function to be applied in parallel.
        args_container (List[Iterable], optional): A list of argument iterables to be passed to the function.
            Defaults to None.
        kwargs_container (List[Dict[str, Any]], optional): A list of keyword argument dictionaries to be passed to
            the function. Defaults to None.
        n_jobs (int, optional): The number of parallel processes to use. Defaults to None, which means that
            the number of processes used will be equal to the number of available CPU cores.

    Returns:
        An iterable of tuples containing the input arguments, keyword arguments, and results of applying the function.

    Raises:
        ValueError: If the lengths of `args_container` and `kwargs_container` do not match.

    """
  
    n_jobs = (multiprocessing.cpu_count(), n_jobs)[int(bool(n_jobs))]

    if args_container or kwargs_container:
      if args_container is None:
          args_container = [()] * len(kwargs_container)
      if kwargs_container is None:
          kwargs_container = [{}] * len(args_container)

      if len(args_container) != len(kwargs_container):
          raise ValueError("Lengths of args_container and kwargs_container do not match")

    print('lengths checked')

    task_queue = multiprocessing.Queue()

    print('queue made')

    if args_container or kwargs_container:
      for args, kwargs in zip(args_container, kwargs_container):
        task_queue.put((target_func, args, kwargs))
    else:
      task_queue.put(target_func)

    print('args, kwargs sorted')

    def run_task(task: Tuple[Callable[..., Any], Iterable, 
                             Dict[str, Any]]) -> Tuple[Iterable, Dict[str, Any], Any]:
      target_func, args, kwargs = task
      return args, kwargs, target_func(*args, **kwargs)

    with multiprocessing.Pool(n_jobs) as pool:
        results = pool.imap_unordered(run_task, task_queue)

    print('multiprocessing done')

    return results

In [2]:
import time
from typing import Union

# Это только один пример тестовой функции, ваша parallel_map должна уметь эффективно работать с ЛЮБЫМИ функциями
# Поэтому обязательно протестируйте код на чём-нибудбь ещё
def test_func(x: Union[int, float, complex] = 1, 
              s: Union[int, float, complex] = 2, 
              a: Union[int, float, complex] = 1, 
              b: Union[int, float, complex] = 1, 
              c: Union[int, float, complex] = 1):
    time.sleep(s)
    return a*x**2 + b*x + c

In [3]:
%%time

test_func()

CPU times: user 11.6 ms, sys: 3.9 ms, total: 15.5 ms
Wall time: 2 s


3

In [4]:
%%time

# Пример 2.1
# Отдельные значения в args_container передаются в качестве позиционных аргументов
a = parallel_map(test_func, args_container=[1, 2.0, 3j-1, 4])   # Здесь происходят параллельные вызовы: test_func(1) test_func(2.0) test_func(3j-1) test_func(4)

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 22.3 ms, sys: 12.4 ms, total: 34.7 ms
Wall time: 91.8 ms


In [5]:
%%time

# Пример 2.2
# Элементы типа tuple в args_container распаковываются в качестве позиционных аргументов
parallel_map(test_func, [(1, 1), (2.0, 2), (3j-1, 3), 4])    # Здесь происходят параллельные вызовы: test_func(1, 1) test_func(2.0, 2) test_func(3j-1, 3) test_func(4)

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 12.5 ms, sys: 16.1 ms, total: 28.6 ms
Wall time: 92.4 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec507790>

In [6]:
%%time

# Пример 3.1
# Возможна одновременная передача args_container и kwargs_container, но количества элементов в них должны быть равны
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

# Здесь происходят параллельные вызовы: test_func(1, s=3) test_func(2, s=3) test_func(3, s=3) test_func(4, s=3)

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 12.3 ms, sys: 14.1 ms, total: 26.3 ms
Wall time: 120 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec534bb0>

In [7]:
%%time

# Пример 3.2
# args_container может быть None, а kwargs_container задан явно
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 11.9 ms, sys: 14.7 ms, total: 26.6 ms
Wall time: 116 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec535c30>

In [8]:
%%time

# Пример 3.3
# kwargs_container может быть None, а args_container задан явно
parallel_map(test_func,
             args_container=[1, 2, 3, 4])

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 6.58 ms, sys: 16 ms, total: 22.6 ms
Wall time: 53.6 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec536e60>

In [9]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
parallel_map(test_func)

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 13.1 ms, sys: 10.8 ms, total: 23.9 ms
Wall time: 47.2 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec537910>

In [10]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
parallel_map(test_func)

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 18.9 ms, sys: 15.7 ms, total: 34.6 ms
Wall time: 111 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec544670>

In [11]:
%%time

# Пример 3.5
# При несовпадении количеств позиционных и именованных аргументов кидается ошибка
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}])

ValueError: ignored

In [12]:
%%time

# Пример 4.1
# Если функция не имеет обязательных аргументов и аргумент n_jobs не был передан, то она выполняется параллельно столько раз, сколько ваш CPU имеет логических ядер
# В моём случае это 24, у вас может быть больше или меньше
parallel_map(test_func)

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 13.1 ms, sys: 13 ms, total: 26.2 ms
Wall time: 68 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec545f30>

In [13]:
%%time

# Пример 4.2
# Если функция не имеет обязательных аргументов и передан только аргумент n_jobs, то она выполняется параллельно n_jobs раз
parallel_map(test_func, n_jobs=2)

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 12.4 ms, sys: 18.8 ms, total: 31.2 ms
Wall time: 94.2 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec5474f0>

In [14]:
%%time

# Пример 4.3
# Если аргументов для target_func указано МЕНЬШЕ, чем n_jobs, то используется такое же количество worker'ов, сколько было передано аргументов
parallel_map(test_func,
             args_container=[1, 2, 3],
             n_jobs=5)   # Здесь используется 3 worker'a

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 20.2 ms, sys: 32.4 ms, total: 52.5 ms
Wall time: 164 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec5acc10>

In [15]:
%%time

# Пример 4.4
# Аналогичный предыдущему случай, но с именованными аргументами
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # Здесь используется 3 worker'a

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 15.9 ms, sys: 34.3 ms, total: 50.2 ms
Wall time: 133 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec5ae1d0>

In [16]:
%%time

# Пример 4.5
# Комбинация примеров 4.3 и 4.4 (переданы и позиционные и именованные аргументы)
parallel_map(test_func,
             args_container=[1, 2, 3],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # Здесь используется 3 worker'a

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 16.4 ms, sys: 32 ms, total: 48.5 ms
Wall time: 153 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec5af8e0>

In [17]:
%%time

# Пример 4.6
# Если аргументов для target_func указано БОЛЬШЕ, чем n_jobs, то используется n_jobs worker'ов
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=None,
             n_jobs=2)   # Здесь используется 2 worker'a

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 12.5 ms, sys: 13.3 ms, total: 25.7 ms
Wall time: 63.2 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec5bc9a0>

In [18]:
%%time

# Пример 4.7
# Время выполнения оптимизируется, данный код должен отрабатывать за 5 секунд
parallel_map(test_func,
             kwargs_container=[{"s": 5}, {"s": 1}, {"s": 2}, {"s": 1}],
             n_jobs=2)

lengths checked
queue made
args, kwargs sorted
multiprocessing done
CPU times: user 7.59 ms, sys: 17.6 ms, total: 25.2 ms
Wall time: 55.9 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec5bdc60>

In [19]:
def test_func2(string, sleep_time=1):
    time.sleep(sleep_time)
    return string

# Пример 5
# Результаты возвращаются в том же порядке, в котором были переданы соответствующие аргументы вне зависимости от того, когда завершился worker
arguments = ["first", "second", "third", "fourth", "fifth"]
parallel_map(test_func2,
             args_container=arguments,
             kwargs_container=[{"sleep_time": 5}, {"sleep_time": 4}, {"sleep_time": 3}, {"sleep_time": 2}, {"sleep_time": 1}])

lengths checked
queue made
args, kwargs sorted
multiprocessing done


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec5bf4c0>

In [20]:
%%time


def test_func3():
    def inner_test_func(sleep_time):
        time.sleep(sleep_time)
    return parallel_map(inner_test_func, args_container=[1, 2, 3])

# Пример 6
# Работает с функциями, созданными внутри других функций
test_func3()

lengths checked
queue made
args, kwargs sorted


Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'test_func3.<locals>.inner_test_func'
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'test_func3.<locals>.inner_test_func'
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'test_func3.<locals>.inner_

multiprocessing done
CPU times: user 24 ms, sys: 15.7 ms, total: 39.7 ms
Wall time: 81.4 ms


<multiprocessing.pool.IMapUnorderedIterator at 0x7f62ec5d8160>