В формулировке заданий будет использоваться понятие **worker**. Это слово обозначает какую-то единицу параллельного выполнения, в случае питона это может быть **поток** или **процесс**, выбирайте то, что лучше будет подходить к конкретной задаче

В каждом задании нужно писать подробные аннотиции типов для:
1. Аргументов функций и классов
2. Возвращаемых значений
3. Классовых атрибутов (если такие есть)

В каждом задании нужно писать докстроки в определённом стиле (какой вам больше нравится) для всех функций, классов и методов

# Задание 1 (7 баллов)

В одном из заданий по ML от вас требовалось написать кастомную реализацию Random Forest. Её проблема состоит в том, что она работает медленно, так как использует всего один поток для работы. Добавление параллельного программирования в код позволит получить существенный прирост в скорости обучения и предсказаний.

В данном задании от вас требуется добавить возможность обучать случайный лес параллельно и использовать параллелизм для предсказаний. Для этого вам понадобится:
1. Добавить аргумент `n_jobs` в метод `fit`. `n_jobs` показывает количество worker'ов, используемых для распараллеливания
2. Добавить аргумент `n_jobs` в методы `predict` и `predict_proba`
3. Реализовать функционал по распараллеливанию в данных методах

В результате код `random_forest.fit(X, y, n_jobs=2)` и `random_forest.predict(X, y, n_jobs=2)` должен работать в ~1.5-2 раза быстрее, чем `random_forest.fit(X, y, n_jobs=1)` и `random_forest.predict(X, y, n_jobs=1)` соответственно

Если у вас по каким-то причинам нет кода случайного леса из ДЗ по ML, то вы можете написать его заново или попросить у однокурсника. *Детали* реализации ML части оцениваться не будут, НО, если вы поломаете логику работы алгоритма во время реализации параллелизма, то за это будут сниматься баллы

В задании можно использовать только модули из **стандартной библиотеки** питона, а также функции и классы из **sklearn** при помощи которых вы изначально писали лес

In [21]:
from sklearn.base import BaseEstimator
from sklearn.datasets import make_classification
import numpy as np

In [22]:
from sklearn.tree import DecisionTreeClassifier

In [23]:
from concurrent.futures import ProcessPoolExecutor

In [24]:
from typing import Optional, Callable


In [25]:
class RandomForestClassifierCustom(DecisionTreeClassifier):
    def __init__(self, n_estimators, max_depth, max_features, random_state):
        """
        Initialize a random forest regressor.

        Args:
        n_estimators: int
            The number of trees in the forest.
        max_depth: int
            The maximum depth of each tree.
        max_features: int
            The number of features to consider when looking for the best split.
        random_state: int
            The seed used by the random number generator.

        Attributes:
            trees: list of DecisionTreeRegressor
                The individual decision trees in the forest.
            feat_ids_by_tree: list of ndarray
            The indices of the features used by each tree.
        """
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.max_features = max_features
        self.random_state = random_state

        self.trees = []
        self.feat_ids_by_tree = []

    def learn_tree(self, data):
        """
        Trains a decision tree classifier on the given data.

        Args:
            data (tuple): A tuple containing the features (X), labels (y), and random seed to use.

        Returns:
            clf (DecisionTreeClassifier): A trained decision tree classifier.

        """
        X, y, seed = data
        clf = DecisionTreeClassifier(random_state=seed, max_depth=self.max_depth, max_features=self.max_features)
        clf.fit(X, y)
        return clf

    def fit(self, X, y, n_jobs):
        """
        Fits a random forest classifier on the given data.

        Args:
            X (ndarray): An array of shape (n_samples, n_features) representing the input features.
            y (ndarray): An array of shape (n_samples, ) representing the target labels.
            n_jobs (int): The number of CPU cores to use for parallel processing.

        Returns:
            self (RandomForestClassifier): An instance of the trained random forest classifier.
        """
        data = []
        for i in range(0, self.n_estimators):
            seed = np.random.seed(self.random_state + i)
            idx_features = np.random.choice(X.shape[1], self.max_features, replace=False)
            self.feat_ids_by_tree.append(idx_features)
            idx_obj = np.random.choice(X.shape[0], X.shape[0], replace=True)
            X_sample = X[idx_obj, :][:, idx_features]
            y_sample = y[idx_obj, ]
            data.append((X_sample, y_sample, seed))
        with ProcessPoolExecutor(n_jobs) as pool:
            self.trees = list(pool.map(self.learn_tree, data))
        return self
    
    
    def predict_proba_by_one(self, pair_data):
        """
        Predict the probability of the target class for a bootstrapped sample using the given algorithm.

        Args:
            pair_data (Tuple): A tuple containing the algorithm and the data point to be predicted.

        Returns:
            Numpy array: An array of shape (1, n_classes) containing the probability estimates for each class.
        """
        alg, data = pair_data
        return alg.predict_proba(data)
    

    def predict_proba(self, X, n_jobs):
        """
        Predict class probabilities for input data using a forest of decision trees.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        The input data.
        n_jobs : int, optional (default=1)
        The number of CPU cores to use for parallel processing.

        Returns
        -------
        probas : array of shape (n_samples, n_classes)
        The class probabilities for each input sample.
        """
        alg_per_data = []
        for i in range(len(self.trees)):
            alg_per_data.append((self.trees[i], X[:, self.feat_ids_by_tree[i]]))
    
        with ProcessPoolExecutor(n_jobs) as pool:
            all_prob = list(pool.map(self.predict_proba_by_one, alg_per_data))
        probas = np.concatenate(all_prob)
        return probas
    

    def predict(self, X, n_jobs):
        """
        Predicts the class labels for the given input features using the trained random forest classifier.

        Args:
            X (ndarray): An array of shape (n_samples, n_features) representing the input features.
            n_jobs (int): The number of CPU cores to use for parallel processing.

        Returns:
            predictions (ndarray): An array of shape (n_samples, ) representing the predicted class labels.
        """
        probas = self.predict_proba(X, n_jobs)
        predictions = np.argmax(probas, 1)       
        return predictions

In [26]:
X, y = make_classification(n_samples=100000)

In [27]:
random_forest = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [28]:
%%time

_ = random_forest.fit(X, y, n_jobs=1)

CPU times: user 110 ms, sys: 29.1 ms, total: 139 ms
Wall time: 4.52 s


In [29]:
%%time

_ = random_forest.fit(X, y, n_jobs=2)

CPU times: user 155 ms, sys: 143 ms, total: 299 ms
Wall time: 2.59 s


In [30]:
%%time

_ = random_forest.fit(X, y, n_jobs=5)

CPU times: user 153 ms, sys: 157 ms, total: 310 ms
Wall time: 1.5 s


In [31]:
%%time

preds_1_1 = random_forest.predict_proba(X, n_jobs=1)

CPU times: user 56.9 ms, sys: 142 ms, total: 199 ms
Wall time: 261 ms


In [32]:
%%time

preds_1_2 = random_forest.predict_proba(X, n_jobs=2)

CPU times: user 99.7 ms, sys: 124 ms, total: 223 ms
Wall time: 274 ms


In [33]:
%%time

preds_2_1 = random_forest.predict(X, n_jobs=1)

CPU times: user 51.7 ms, sys: 168 ms, total: 220 ms
Wall time: 299 ms


In [34]:
%%time


preds_2_2 = random_forest.predict(X, n_jobs=2)

CPU times: user 88.2 ms, sys: 173 ms, total: 261 ms
Wall time: 298 ms


In [35]:
(preds_2_1 == preds_2_2).all()   # Количество worker'ов не должно влиять на предсказания

True

#### Какие есть недостатки у вашей реализации параллельного Random Forest (если они есть)? Как это можно исправить? Опишите словами, можно без кода (+1 дополнительный балл)

#### Каждое дерево в рэндом форест можно параллелить, независимо обучая, я же перед тем как обучать собираю пары дерево-данные что занимает O(N) 

# Задание 2 (9 баллов)

Напишите декоратор `memory_limit`, который позволит ограничивать использование памяти декорируемой функцией.

Декоратор должен принимать следующие аргументы:
1. `soft_limit` - "мягкий" лимит использования памяти. При превышении функцией этого лимита должен будет отображён **warning**
2. `hard_limit` - "жёсткий" лимит использования памяти. При превышении функцией этого лимита должно будет брошено исключение, а функция должна немедленно завершить свою работу
3. `poll_interval` - интервал времени (в секундах) между проверками использования памяти

Требования:
1. Потребление функцией памяти должно отслеживаться **во время выполнения функции**, а не после её завершения
2. **warning** при превышении `soft_limit` должен отображаться один раз, даже если функция переходила через этот лимит несколько раз
3. Если задать `soft_limit` или `hard_limit` как `None`, то соответствующий лимит должен быть отключён
4. Лимиты должны передаваться и отображаться в формате `<number>X`, где `X` - символ, обозначающий порядок единицы измерения памяти ("B", "K", "M", "G", "T", ...)
5. В тексте warning'ов и исключений должен быть указан текщий объём используемой памяти и величина превышенного лимита

В задании можно использовать только модули из **стандартной библиотеки** питона, можно писать вспомогательные функции и/или классы

В коде ниже для вас предопределены некоторые полезные функции, вы можете ими пользоваться, а можете не пользоваться

In [36]:

import os
import psutil
import time
import warnings
import threading
import signal

In [37]:


def get_memory_usage() -> int:
    """
    Returns the current memory usage of the Python process in bytes.

    Uses the psutil library to obtain the memory information of the process
    associated with the current process ID (PID). Specifically, it retrieves the
    resident set size" (rss) value of the memory_info() method, which indicates
    the total memory usage of the process, including both code and data.

    Returns:
    An integer representing the memory usage of the process in bytes.
    """

    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return mem_info.rss


def bytes_to_human_readable(n_bytes: int) -> str:
    """
    Converts a number of bytes to a human-readable string representation.

    This function takes a number of bytes as an integer argument and returns a 
    human-readable string representation of that number, expressed in units 
    of kilobytes (K), megabytes (M), gigabytes (G), terabytes (T), petabytes (P), 
    exabytes (E), zettabytes (Z), or yottabytes (Y).

    Args:
        n_bytes (int): An integer representing the number of bytes to be converted.

    Returns:
        A string representing the human-readable size of the input number of bytes. 
        If the input is less than 1 kilobyte, the function returns the input as a string.
    """
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for idx, s in enumerate(symbols):
        prefix[s] = 1 << (idx + 1) * 10
    for s in reversed(symbols):
        if n_bytes >= prefix[s]:
            value = float(n_bytes) / prefix[s]
            return f"{value:.2f}{s}"
    return f"{n_bytes}"




In [38]:
def convert_to_bytes(size_str: str) -> int:
    """
    Converts a string representing a size of memory to an integer number of bytes.

    This function takes a string representing a size in megabytes, with a suffix of either 
    B (bytes), K (kilobytes), M (megabytes), G (gigabytes), T (terabytes), P (petabytes), 
    E (exabytes), Z (zettabytes), or Y (yottabytes), and converts it to an integer 
    representing the number of bytes.

    Args:
        size_str (str): A string representing a size in megabytes with a size suffix.

    Returns:
        An integer representing the size in bytes that corresponds to the input string.
    """

    size_suffixes = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4,  
                    'P': 1024**5,  'E': 1024**6,  'Z': 1024**7,  'T': 1024**8, }
    numeric_part, size_suffix = float(size_str[:-1]), size_str[-1].upper()
    size_bytes = numeric_part * size_suffixes[size_suffix]
    return int(size_bytes)

In [39]:
class Memory_Usage(threading.Thread):
    def __init__(self, get_memory_usage, bytes_to_human_readable, 
                 convert_megabytes_to_bytes, soft_limit, hard_limit, poll_interval):
        """
        A class to monitor memory usage and raise warnings or terminate the program if memory usage exceeds the specified limits.
        Args:
            get_memory_usage (function): A function that returns the current memory usage in bytes.
            bytes_to_human_readable (function): A function that converts a given number of bytes to a human-readable string format.
            convert_megabytes_to_bytes (function): A function that converts a given number of megabytes to bytes.
            soft_limit (int): The soft memory usage limit in megabytes. If the memory usage exceeds this limit, a warning is raised.
            hard_limit (int): The hard memory usage limit in megabytes. If the memory usage exceeds this limit, the program is terminated.
            poll_interval (float): The time interval (in seconds) between successive checks of memory usage.
        Methods:
        run(): The method that runs the memory usage monitoring process. It continuously checks the current memory usage 
                and raises warnings or terminates the program if the memory usage exceeds the specified limits.
        """
        super().__init__()    
        self.get_memory_usage = get_memory_usage
        self.bytes_to_human_readable = bytes_to_human_readable
        self.soft_limit = soft_limit
        self.hard_limit = hard_limit
        self.poll_interval = poll_interval
        self.convert_megabytes_to_bytes = convert_megabytes_to_bytes

    def run(self):
        while True:
            bytes = self.get_memory_usage()
            if self.hard_limit:
                bytes_hard_limit = self.convert_megabytes_to_bytes(self.hard_limit)
                if bytes > bytes_hard_limit:
                    pid = os.getpid()
                    print('Exceed hard limit')
                    os.kill(pid, signal.SIGKILL)
                    
                
            if self.soft_limit:
                bytes_soft_limit = self.convert_megabytes_to_bytes(self.soft_limit)
                if bytes > bytes_soft_limit:
                    diff = bytes - bytes_soft_limit
                    diff_memory = self.bytes_to_human_readable(diff)
                    cur_memory = self.bytes_to_human_readable(bytes)
                    report = f'Exceed memory on {diff_memory}, current memory usage:{cur_memory}'
                    return warnings.warn(report)
            
            time.sleep(self.poll_interval)
        

In [40]:
def memory_limit(soft_limit: Optional[str], hard_limit: Optional[str], poll_interval: int=10) -> Callable:
    def decorator(func):
        """Decorator function to measure memory usage of a function.

        Args:
            func (function): The function to measure memory usage of.

        Returns:
            function: A wrapper function that measures memory usage before and after
            calling the given function.
        """
        def wrapper(*args, **kwargs):
            memory_thread = Memory_Usage(get_memory_usage, bytes_to_human_readable, convert_megabytes_to_bytes, soft_limit, hard_limit, poll_interval)
            memory_thread.start()
            func(*args, **kwargs)
            return func(*args, **kwargs)
        return wrapper
    return decorator

In [41]:
@memory_limit(soft_limit='10M', hard_limit='10G')
def memory_increment() -> list:
    lst = []
    for i in range(1_000):
        if i % 10 == 0:
            time.sleep(0.1)
        lst.append(i)
    return lst

In [17]:
memory_increment()



[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99,
 100,
 101,
 102,
 103,
 104,
 105,
 106,
 107,
 108,
 109,
 110,
 111,
 112,
 113,
 114,
 115,
 116,
 117,
 118,
 119,
 120,
 121,
 122,
 123,
 124,
 125,
 126,
 127,
 128,
 129,
 130,
 131,
 132,
 133,
 134,
 135,
 136,
 137,
 138,
 139,
 140,
 141,
 142,
 143,
 144,
 145,
 146,
 147,
 148,
 149,
 150,
 151,
 152,
 153,
 154,
 155,
 156,
 157,
 158,
 159,
 160,
 161,
 162,
 163,
 164,
 165,
 166,
 167,
 168,
 169,
 170,
 171,
 172,
 173,
 174,
 175,
 176,
 177,
 178,
 179,
 180,
 181,
 182,
 183,
 184,


In [28]:
@memory_limit(soft_limit='100M', hard_limit='1G', poll_interval=10)
def memory_increment() -> list:
    lst = []
    for i in range(1_000_000_000):
        if i % 100000 == 0:
            time.sleep(0.1)
        lst.append(i)
    return lst

In [29]:
memory_increment()   



: 

: 

##### Видимо - это особенность ноутбука - лог при при превышении hard limit не принтуется.
##### Делал - аналогичный код в скрипте - все было хорошо. Лог -> заверешение процесса. 

#### Задание 3 - не делал

# Задание 3 (11 баллов)

Напишите функцию `parallel_map`. Это должна быть **универсальная** функция для распараллеливания, которая эффективно работает в любых условиях.

Функция должна принимать следующие аргументы:
1. `target_func` - целевая функция (обязательный аргумент)
2. `args_container` - контейнер с позиционными аргументами для `target_func` (по-умолчанию `None` - позиционные аргументы не передаются)
3. `kwargs_container` - контейнер с именованными аргументами для `target_func` (по-умолчанию `None` - именованные аргументы не передаются)
4. `n_jobs` - количество workers, которые будут использованы для выполнения (по-умолчанию `None` - количество логических ядер CPU в системе)

Функция должна работать аналогично `***PoolExecutor.map`, применяя функцию к переданному набору аргументов, но с некоторыми дополнениями и улучшениями
    
Поскольку мы пишем **универсальную** функцию, то нам нужно будет выполнить ряд требований, чтобы она могла логично и эффективно работать в большинстве ситуаций

1. `target_func` может принимать аргументы любого вида в любом количестве
2. Любые типы данных в `args_container`, кроме `tuple`, передаются в `target_func` как единственный позиционный аргумент. `tuple` распаковываются в несколько аргументов
3. Количество элементов в `args_container` должно совпадать с количеством элементов в `kwargs_container` и наоборот, также значение одного из них или обоих может быть равно `None`, в иных случаях должна кидаться ошибка (оба аргумента переданы, но размеры не совпадают)

4. Функция должна выполнять определённое количество параллельных вызовов `target_func`, это количество зависит от числа переданных аргументов и значения `n_jobs`. Сценарии могут быть следующие
    + `args_container=None`, `kwargs_container=None`, `n_jobs=None`. В таком случае функция `target_func` выполнится параллельно столько раз, сколько на вашем устройстве логических ядер CPU
    + `args_container=None`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **5** раз
    + `args_container=[1, 2, 3]`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **3** раза, несмотря на то, что `n_jobs=5` (так как есть всего 3 набора аргументов для которых нам нужно получить результат, а лишние worker'ы создавать не имеет смысла)
    + `args_container=None`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем именованные аргументы
    + `args_container=[1, 2, 3]`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем и позиционные, и именованные аргументы
    + `args_container=[1, 2, 3, 4]`, `kwargs_container=None`, `n_jobs=2`. В таком случае в каждый момент времени параллельно будет выполняться **не более 2** функций `target_func`, так как нам нужно выполнить её 4 раза, но у нас есть только 2 worker'а.
    + В подобных случаях (из примера выше) должно оптимизироваться время выполнения. Если эти 4 вызова выполняются за 5, 1, 2 и 1 секунды, то параллельное выполнение с `n_jobs=2` должно занять **5 секунд** (не 7 и тем более не 10)

5. `parallel_map` возвращает результаты выполнения `target_func` **в том же порядке**, в котором были переданы соответствующие аргументы
6. Работает с функциями, созданными внутри других функций

Для базового решения от вас не ожидается **сверххорошая** оптимизация по времени и памяти для всех возможных случаев. Однако за хорошо оптимизированную логику работы можно получить до **+3 дополнительных баллов**

Вы можете сделать класс вместо функции, если вам удобнее

В задании можно использовать только модули из **стандартной библиотеки** питона

Ниже приведены тестовые примеры по каждому из требований

In [None]:
def parallel_map(target_func,
                 args_container=None,
                 kwargs_container=None,
                 n_jobs=None):
    # Ваш код здесь

In [None]:
import time


# Это только один пример тестовой функции, ваша parallel_map должна уметь эффективно работать с ЛЮБЫМИ функциями
# Поэтому обязательно протестируйте код на чём-нибудбь ещё
def test_func(x=1, s=2, a=1, b=1, c=1):
    time.sleep(s)
    return a*x**2 + b*x + c

In [None]:
%%time

# Пример 2.1
# Отдельные значения в args_container передаются в качестве позиционных аргументов
parallel_map(test_func, args_container=[1, 2.0, 3j-1, 4])   # Здесь происходят параллельные вызовы: test_func(1) test_func(2.0) test_func(3j-1) test_func(4)

CPU times: user 395 µs, sys: 8.26 ms, total: 8.65 ms
Wall time: 2.01 s


[3, 7.0, (-8-3j), 21]

In [None]:
%%time

# Пример 2.2
# Элементы типа tuple в args_container распаковываются в качестве позиционных аргументов
parallel_map(test_func, [(1, 1), (2.0, 2), (3j-1, 3), 4])    # Здесь происходят параллельные вызовы: test_func(1, 1) test_func(2.0, 2) test_func(3j-1, 3) test_func(4)

CPU times: user 7.18 ms, sys: 7.73 ms, total: 14.9 ms
Wall time: 3.01 s


[3, 7.0, (-8-3j), 21]

In [None]:
%%time

# Пример 3.1
# Возможна одновременная передача args_container и kwargs_container, но количества элементов в них должны быть равны
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

# Здесь происходят параллельные вызовы: test_func(1, s=3) test_func(2, s=3) test_func(3, s=3) test_func(4, s=3)

CPU times: user 5.89 ms, sys: 8.84 ms, total: 14.7 ms
Wall time: 3.02 s


[3, 7, 13, 21]

In [None]:
%%time

# Пример 3.2
# args_container может быть None, а kwargs_container задан явно
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

CPU times: user 6.54 ms, sys: 6.06 ms, total: 12.6 ms
Wall time: 3.02 s


[3, 3, 3, 3]

In [None]:
%%time

# Пример 3.3
# kwargs_container может быть None, а args_container задан явно
parallel_map(test_func,
             args_container=[1, 2, 3, 4])

CPU times: user 4.11 ms, sys: 9.2 ms, total: 13.3 ms
Wall time: 2.01 s


[3, 7, 13, 21]

In [None]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
parallel_map(test_func)

CPU times: user 500 µs, sys: 43.3 ms, total: 43.8 ms
Wall time: 2.04 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [None]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
parallel_map(test_func)

CPU times: user 500 µs, sys: 43.3 ms, total: 43.8 ms
Wall time: 2.04 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [None]:
%%time

# Пример 3.5
# При несовпадении количеств позиционных и именованных аргументов кидается ошибка
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}])

ValueError: Numbers of positional arguments and keyword arguments do not match: 4 and 3

In [None]:
%%time

# Пример 4.1
# Если функция не имеет обязательных аргументов и аргумент n_jobs не был передан, то она выполняется параллельно столько раз, сколько ваш CPU имеет логических ядер
# В моём случае это 24, у вас может быть больше или меньше
parallel_map(test_func)

CPU times: user 9.3 ms, sys: 51.2 ms, total: 60.5 ms
Wall time: 2.06 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [None]:
%%time

# Пример 4.2
# Если функция не имеет обязательных аргументов и передан только аргумент n_jobs, то она выполняется параллельно n_jobs раз
parallel_map(test_func, n_jobs=2)

CPU times: user 2.06 ms, sys: 5.92 ms, total: 7.99 ms
Wall time: 2.01 s


[3, 3]

In [None]:
%%time

# Пример 4.3
# Если аргументов для target_func указано МЕНЬШЕ, чем n_jobs, то используется такое же количество worker'ов, сколько было передано аргументов
parallel_map(test_func,
             args_container=[1, 2, 3],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 314 µs, sys: 8.69 ms, total: 9 ms
Wall time: 2.01 s


[3, 7, 13]

In [None]:
%%time

# Пример 4.4
# Аналогичный предыдущему случай, но с именованными аргументами
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 1.26 ms, sys: 9.47 ms, total: 10.7 ms
Wall time: 3.01 s


[3, 3, 3]

In [None]:
%%time

# Пример 4.5
# Комбинация примеров 4.3 и 4.4 (переданы и позиционные и именованные аргументы)
parallel_map(test_func,
             args_container=[1, 2, 3],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 7.88 ms, sys: 0 ns, total: 7.88 ms
Wall time: 3.01 s


[3, 7, 13]

In [None]:
%%time

# Пример 4.6
# Если аргументов для target_func указано БОЛЬШЕ, чем n_jobs, то используется n_jobs worker'ов
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=None,
             n_jobs=2)   # Здесь используется 2 worker'a

CPU times: user 7.88 ms, sys: 0 ns, total: 7.88 ms
Wall time: 3.01 s


[3, 7, 13]

In [None]:
%%time

# Пример 4.7
# Время выполнения оптимизируется, данный код должен отрабатывать за 5 секунд
parallel_map(test_func,
             kwargs_container=[{"s": 5}, {"s": 1}, {"s": 2}, {"s": 1}],
             n_jobs=2)

CPU times: user 3.03 ms, sys: 11 ms, total: 14 ms
Wall time: 5.01 s


[3, 3, 3, 3]

In [None]:
def test_func2(string, sleep_time=1):
    time.sleep(sleep_time)
    return string

# Пример 5
# Результаты возвращаются в том же порядке, в котором были переданы соответствующие аргументы вне зависимости от того, когда завершился worker
arguments = ["first", "second", "third", "fourth", "fifth"]
parallel_map(test_func2,
             args_container=arguments,
             kwargs_container=[{"sleep_time": 5}, {"sleep_time": 4}, {"sleep_time": 3}, {"sleep_time": 2}, {"sleep_time": 1}])

['first', 'second', 'third', 'fourth', 'fifth']

In [None]:
%%time


def test_func3():
    def inner_test_func(sleep_time):
        time.sleep(sleep_time)
    return parallel_map(inner_test_func, args_container=[1, 2, 3])

# Пример 6
# Работает с функциями, созданными внутри других функций
test_func3()

[None, None, None]