В формулировке заданий будет использоваться понятие **worker**. Это слово обозначает какую-то единицу параллельного выполнения, в случае питона это может быть **поток** или **процесс**, выбирайте то, что лучше будет подходить к конкретной задаче

# Задание 1 (7 баллов)

В одном из заданий по ML от вас требовалось написать кастомную реализацию Random Forest. Её проблема состоит в том, что она работает медленно, так как использует всего один поток для работы. Добавление параллельного программирования в код позволит получить существенный прирост в скорости обучения и предсказаний.

В данном задании от вас требуется добавить возможность обучать случайный лес параллельно и использовать параллелизм для предсказаний. Для этого вам понадобится:
1. Добавить аргумент `n_jobs` в метод `fit`. `n_jobs` показывает количество worker'ов, используемых для распараллеливания
2. Добавить аргумент `n_jobs` в методы `predict` и `predict_proba`
3. Реализовать функционал по распараллеливанию в данных методах

В результате код `random_forest.fit(X, y, n_jobs=2)` и `random_forest.predict(X, y, n_jobs=2)` должен работать в ~1.5-2 раза быстрее, чем `random_forest.fit(X, y, n_jobs=1)` и `random_forest.predict(X, y, n_jobs=1)` соответственно

Если у вас по каким-то причинам нет кода случайного леса из ДЗ по ML, то вы можете написать его заново или попросить у однокурсника. *Детали* реализации ML части оцениваться не будут, НО, если вы поломаете логику работы алгоритма во время реализации параллелизма, то за это будут сниматься баллы

В задании можно использовать только модули из **стандартной библиотеки** питона, а также функции и классы из **sklearn** при помощи которых вы изначально писали лес

In [1]:
import random
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import make_classification
from typing import Tuple, Callable 


class RandomForestClassifierCustom(BaseEstimator):
    '''A custom implementation of random forest classifier
    (the most of the description is taken from `sklearn` source code)
    
    Attributes:
        n_estimators (int): a number of decision tree classifiers in forest
        max_depth (int): the maximum depth of the tree
        max_features (int): the number of features to consider when looking for the best split
        random_state (int): controls both the randomness of pseudosampling and feature selection
        feat_ids_by_tree (list): selected features for each tree
        trees (list): fitted trees for forest
    '''
    
    
    def __init__(
        self, n_estimators: int = 10, max_depth: int = None, max_features: int = None, random_state: int = None
    ) -> None:
        '''Constructs all the necessary attributes for the RandomForestClassifierCustom object

        Parameters:
            n_estimators (int): a number of decision tree classifiers in forest
            max_depth (int): the maximum depth of the tree
            max_features (int): the number of features to consider when looking for the best split
            random_state (int): controls both the randomness of pseudosampling and feature selection
            feat_ids_by_tree (list): selected features for each tree
            trees (list): fitted trees for forest       
        
        '''
        
        
        self.n_estimators: int = n_estimators
        self.max_depth: int = max_depth
        self.max_features: int = max_features
        self.random_state: int = random_state

        self.trees: list = []
        self.feat_ids_by_tree: list = []
        
           
    def __check_jobs(self, n_jobs: int, n_estimators: int) -> int:
        '''A private method for checking n_jobs. If n_jobs not defined, it is setted to 1
        
        Parameters:
            n_jobs (int): the number of jobs to run in parallel
            n_estimators (int): a number of trees in forest
            
        Returns: number of jobs (int)
        '''
        
        
        if n_jobs:
            return min(n_jobs, n_estimators)
        elif not n_jobs:
            return 1
        
        
    def __calc_estim_per_job(self, n_estimators: int, n_jobs: int) -> Tuple[np.ndarray, list]:
        '''A private method to split n_estimators for n_jobs. Also calculates "coordinates" to keep seed regardless of n_jobs
        
        Parameters:
            n_jobs (int): the number of jobs to run in parallel
            n_estimators (int): a number of trees in forest
            
        Returns:
            n_estimators_per_job (np.ndarray): array with splitted number of estimators per each job
            seed_coords (list): list with tuples contained start and stop coordinate for building trees and their seeds    
        '''
        
        
        n_estimators_per_job = np.full(n_jobs, n_estimators // n_jobs, dtype=int)
        n_estimators_per_job[: n_estimators % n_jobs] += 1
        
        # some calculations for building the same trees regardless n_jobs
        lst_cumsum = np.insert(np.cumsum(n_estimators_per_job), 0, 0, axis=0)
        # list for start and stop for calculating seeds
        seed_coords = []
        for i in range(1, len(lst_cumsum)):
            seed_coords.append((lst_cumsum[i-1], lst_cumsum[i]))
        return n_estimators_per_job, seed_coords
            
    
    def _build_trees(self, seed_coords: tuple, X: np.ndarray, y: np.ndarray) -> Tuple[list, list]:
        '''Private function used to build a single tree for forest
        
        Parameters:
            seed_coords (tuple): start and stop coordinates for building trees and their seeds
            X (np.ndarray): array with features
            y (np.ndarray): target vector
            
        Returns: None
        '''
        
        
        start, stop = seed_coords
        trees = []
        feat_ids_by_tree = []
        for i in range(start, stop):
            # setting seed
            np.random.seed(self.random_state + i)

            # selecting n random features 
            feat_ids = np.random.choice(X.shape[1], size=self.max_features, replace=False)
            feat_ids_by_tree.append(feat_ids)

            # creating pseudosample using bootstrap
            # random indices for pseudosample
            pseudo_idx = np.random.choice(X.shape[0], size=X.shape[0], replace=True).reshape(X.shape[0], 1)
            pseudo_X = X[pseudo_idx, feat_ids]
            pseudo_y = y[pseudo_idx]

            # creating and fitting model
            dec_tree_class = DecisionTreeClassifier(max_depth=self.max_depth,
                                                    max_features=self.max_features, 
                                                    random_state=self.random_state)
            dec_tree_class.fit(pseudo_X, pseudo_y)
            trees.append(dec_tree_class)
        return feat_ids_by_tree, trees

    
    def __do_parallel(self, func: Callable, n_jobs: int, *args) -> list: # args may contain X or both X and y
        '''Private method for running fit or predict_proba methods in parallel
        
        Parameters:
            func (Callable): function for parallel execution
            n_jobs (int): the number of jobs to run in parallel
            *args may contain X or both X and y
            
        Returns: list with futures (list)            
        '''
        
        
        n_jobs = self.__check_jobs(n_jobs, self.n_estimators)
        n_est_per_job, seed_coords = self.__calc_estim_per_job(self.n_estimators, n_jobs)
        futures = []
        with ThreadPoolExecutor() as pool:
            for i, _ in enumerate(n_est_per_job):
                futures.append(pool.submit(func, seed_coords[i], *args))
        return futures


    def fit(self, X: np.ndarray, y: np.ndarray, n_jobs: int = None) -> object:
        '''Build a forest of trees from the training set (X, y)
        
        Parameters:
            X (np.ndarray) of shape (n_samples, n_features): the training input samples
            y (np.ndarray) of shape (n_samples): target values labels
            n_jobs (int): the number of jobs to run in parallel
        Returns: self object (RandomForestClassifierCustom)
        '''
        
        
        self.classes_ = sorted(np.unique(y))
        futures = self.__do_parallel(self._build_trees, n_jobs, X, y)
        for future in futures:
            self.feat_ids_by_tree.extend(future.result()[0]) 
            self.trees.extend(future.result()[1])
        return self
            
        
    def _parallel_predict_proba(self, seed_coords: tuple, X: np.array) -> list:
        '''Function for running predict_proba using n trees per job
        
        Parameters:
            seed_coords (tuple): start and stop coordinates for using n trees
            X (np.ndarray): array with features
            
        Returns:
            probs (list): the class probabilities of the input samples based on n trees
        '''
        
        
        start, stop = seed_coords
        probs = []
        for i in range(start, stop):
            proba = self.trees[i].predict_proba(X[:, self.feat_ids_by_tree[i]])
            probs.append(proba)
        return probs
        
    
    def predict_proba(self, X: np.ndarray, n_jobs: int = None) -> list:
        '''Predicts class probabilities for X. The predicted class probabilities of an input sample are computed as
        the mean predicted class probabilities of the trees in the forest
        
        Parameters:
            X (np.ndarray) of shape (n_samples, n_features): the testing input samples
            n_jobs (int): the number of jobs to run in parallel
        '''
        
        
        futures = self.__do_parallel(self._parallel_predict_proba, n_jobs, X)
        result = []
        for future in futures:
            result.extend(future.result())
        return np.mean(result, axis=0)
    
    
    def predict(self, X: np.ndarray, n_jobs: int = None) -> np.ndarray:
        '''Predicts class for X. The predicted class of an input sample is a vote by the trees in the forest, 
        weighted by their probability estimates. That is, the predicted class is the one with highest mean probability
        estimate across the trees
        
        Parameters:
            X (np.ndarray) of shape (n_samples, n_features): the testing input samples
            n_jobs (int): the number of jobs to run in parallel
            
        Returns:
            predictions (np.ndarray) of shape (n_samples,): the predicted classes    
        '''
        
        
        probas = self.predict_proba(X, n_jobs)
        predictions = np.argmax(probas, axis=1)
        return predictions
    
    
X, y = make_classification(n_samples=100000)

In [2]:
random_forest1 = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [3]:
%%time

_ = random_forest1.fit(X, y, n_jobs=1)

CPU times: user 5.14 s, sys: 1.35 ms, total: 5.15 s
Wall time: 5.15 s


In [4]:
%%time

preds_1 = random_forest1.predict(X, n_jobs=1)

CPU times: user 130 ms, sys: 9.63 ms, total: 140 ms
Wall time: 142 ms


In [5]:
random_forest1.feat_ids_by_tree

[array([ 0, 17]),
 array([9, 6]),
 array([15, 14]),
 array([12, 16]),
 array([1, 3]),
 array([10, 14]),
 array([1, 9]),
 array([14, 19]),
 array([15,  8]),
 array([ 1, 12])]

In [6]:
random_forest2 = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [7]:
%%time

_ = random_forest2.fit(X, y, n_jobs=2)

CPU times: user 5.25 s, sys: 30.1 ms, total: 5.28 s
Wall time: 2.81 s


In [8]:
%%time

preds_2 = random_forest2.predict(X, n_jobs=2)

CPU times: user 121 ms, sys: 9.9 ms, total: 131 ms
Wall time: 78.1 ms


In [9]:
random_forest2.feat_ids_by_tree

[array([ 0, 17]),
 array([9, 6]),
 array([15, 14]),
 array([12, 16]),
 array([1, 3]),
 array([10, 14]),
 array([1, 9]),
 array([14, 19]),
 array([15,  8]),
 array([ 1, 12])]

In [10]:
(preds_1 == preds_2).all()   # Количество worker'ов не должно влиять на предсказания

True

#### Какие есть недостатки у вашей реализации параллельного Random Forest (если они есть)? Как это можно исправить? Опишите словами, можно без кода (+1 дополнительный балл)

В этом задании использовала `multiprocessing.ThreadPoolExecutor()`, хоть и задача выглядить абсолютно CPU-bound. Сначала написала для `fit` код на процессах, деревья начали строиться действительно в два раза быстрее. Но когда аналогично написала для `predict_proba`, всё это дело перестало ускоряться, поэтому перешла на потоки, проверила, что `fit` ускоряется так же, как и на процессах, даже чуть-чуть быстрее. Загадочно, честно говоря. Возможно, это тот самый случай, когда процессы стало создавать дороже, чем исполнять рассчёты. В целом, самый большой недостаток в том, что я возвращаю список futures как результат распараллеливания функций, не знаю, можно ли так, конечно, можно доработать, но эти деревья выпили из меня жизнь. Я постаралась, зафиксировать одинаковый seed для каждого дерева вне зависимости от количества workers, а сами деревья сделать разными со своим фиксированным seed. В ячейках выше видно, что деревья создавались по одинаковому набору фичей вне зависимости от количества worker (вывела аттрибут `feat_ids_by_tree`).

# Задание 2 (9 баллов)

Напишите декоратор `memory_limit`, который позволит ограничивать использование памяти декорируемой функцией.

Декоратор должен принимать следующие аргументы:
1. `soft_limit` - "мягкий" лимит использования памяти. При превышении функцией этого лимита должен будет отображён **warning**
2. `hard_limit` - "жёсткий" лимит использования памяти. При превышении функцией этого лимита должно будет брошено исключение, а функция должна немедленно завершить свою работу
3. `poll_interval` - интервал времени (в секундах) между проверками использования памяти

Требования:
1. Потребление функцией памяти должно отслеживаться **во время выполнения функции**, а не после её завершения
2. **warning** при превышении `soft_limit` должен отображаться один раз, даже если функция переходила через этот лимит несколько раз
3. Если задать `soft_limit` или `hard_limit` как `None`, то соответствующий лимит должен быть отключён
4. Лимиты должны передаваться и отображаться в формате `<number>X`, где `X` - символ, обозначающий порядок единицы измерения памяти ("B", "K", "M", "G", "T", ...)
5. В тексте warning'ов и исключений должен быть указан текщий объём используемой памяти и величина превышенного лимита

В задании можно использовать только модули из **стандартной библиотеки** питона, можно писать вспомогательные функции и/или классы

В коде ниже для вас предопределены некоторые полезные функции, вы можете ими пользоваться, а можете не пользоваться

In [11]:
import os
import sys
import psutil
import time
import warnings
import threading


def get_memory_usage():    # Показывает текущее потребление памяти процессом
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return mem_info.rss


def human_readble_to_bytes(limit):
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for idx, s in enumerate(symbols):
        prefix[s] = 1 << (idx + 1) * 10
    for s in symbols:
        if limit[-1] == s:
            value = int(float(limit[:-1])*prefix[s])
            return value
    return int(limit)


def bytes_to_human_readable(n_bytes):
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for idx, s in enumerate(symbols):
        prefix[s] = 1 << (idx + 1) * 10
    for s in reversed(symbols):
        if n_bytes >= prefix[s]:
            value = float(n_bytes) / prefix[s]
            return f"{value:.2f}{s}"
    return f"{n_bytes}B"


def memory_limit(softcap=None, hardcap=None, poll_interval=1):
    # Ваш код здесь
    pass

In [12]:
import os
import sys
import psutil
import time
import warnings
from threading import Thread
import _thread
from typing import Optional


class MemoryThread(Thread):
    '''A thread that monitors the memory usage of a process

    Attributes:
        soft_limit (str): the soft memory limit in a human-readable format (e.g., "512M")
        hard_limit (str): the hard memory limit in a human-readable format (e.g., "1.5G")
        poll_interval (float): the time interval (in seconds) for checking memory usage
        _bytes_soft_limit (int): the soft memory limit in bytes
        _bytes_hard_limit (int): the hard memory limit in bytes
        broken_soft (bool): True if the soft memory limit has been exceeded
        memory_usage (str): the current memory usage in a human-readable format (e.g., "100M")
    '''
    
    
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for idx, s in enumerate(symbols):
        prefix[s] = 1 << (idx + 1) * 10
    
    
    def __init__(self, soft_limit, hard_limit, poll_interval) -> None:
        '''
        Initializes a MemoryThread instance

        Parameters:
            soft_limit (str): the soft memory limit in a human-readable format (e.g., "512M").
            hard_limit (str): the hard memory limit in a human-readable format (e.g., "1.5G").
            poll_interval (float): the time interval (in seconds) for checking memory usage

        Returns: None
        '''
        
        
        super().__init__()
        self.soft_limit = soft_limit
        self.hard_limit = hard_limit
        self.poll_interval = poll_interval
        self.broken_soft = False
        if soft_limit:
            self._bytes_soft_limit = self.human_readble_to_bytes(self.soft_limit)
        if hard_limit:
            self._bytes_hard_limit = self.human_readble_to_bytes(self.hard_limit)
                   
        
    def get_memory_usage(self) -> int: # Показывает текущее потребление памяти процессом
        '''Returns the current memory usage of the process in bytes'''
        
        
        process = psutil.Process(os.getpid())
        mem_info = process.memory_info()
        return mem_info.rss


    def human_readble_to_bytes(self, limit: str) -> int:
        '''Converts a human-readable memory limit (e.g., "512M") to an integer in bytes
        
        Parameters:
            limit (str): a human-readable memory limit to be converted
            
        Returns: memory limit in bytes (int)
        '''
        
        
        for s in MemoryThread.symbols:
            if limit[-1] == s:
                value = int(float(limit[:-1]) * MemoryThread.prefix[s])
                return value
        return int(limit)


    def bytes_to_human_readable(self, n_bytes: int) -> str:
        '''Converts a number of bytes to a human-readable memory size (e.g., "100M")

        Parameters:
            n_bytes (int): the number of bytes

        Returns: a string representing the memory size in a human-readable format (str)
        '''
        
        
        for s in reversed(MemoryThread.symbols):
            if n_bytes >= MemoryThread.prefix[s]:
                value = float(n_bytes) / MemoryThread.prefix[s]
                return f"{value:.2f}{s}"
        return f"{n_bytes}B"

        
    def run(self) -> None:
        '''The main run method of the thread'''
        
        
        while True:
            memory_usage = self.get_memory_usage()
            self.memory_usage = self.bytes_to_human_readable(memory_usage)
            if (not self.broken_soft) and self.soft_limit:
                self.check_soft(memory_usage)
            if self.hard_limit:
                self.check_hard(memory_usage)
            if hasattr(self, 'event'):
                return
            time.sleep(self.poll_interval)
        return
            
                      
    def check_soft(self, memory_usage: int) -> None:
        '''Check if the soft limit has been exceeded

        Parameters:
            memory_usage (int): the current memory usage in bytes

        Returns: None
        '''
        
        
        def custom_formatwarning(msg: str, *args, **kwargs) -> str:
            '''Formats UserWarning message output
            
            Parameters:
                msg (str): message to raise
                *args, **kwargs contains other parameters of UserWarning
                
            Returns: UserWarning message (str)    
            '''
            
            
            # ignore everything except the message
            return  f"Warning! {str(msg)}"
        
        if memory_usage >= self._bytes_soft_limit:
            self.broken_soft = True
            warnings.formatwarning = custom_formatwarning
            warnings.warn(
                f"Soft limit {self.soft_limit} is broken, current memory usage - {self.memory_usage}!", 
                UserWarning
            )
            
            
    def check_hard(self, memory_usage: int) -> None:
        '''Check if the hard limit has been exceeded

        Parameters:
            memory_usage (int): The current memory usage in bytes.

        Returns: None
        '''
        
        
        if memory_usage >= self._bytes_hard_limit:
            _thread.interrupt_main() # I tried to catch KeybordInterrupt error here too
            time.sleep(2)
            # I used it here to restart kernel for cleaning memory
            # for some reason, gc.collect() could not cope with this task 
            os._exit(0)
            
            
class MemLimitException(BaseException):
    '''A custom exception class for raising when memory limit is exceeded'''
    
    
    def __init__(self, obj: MemoryThread) -> None:
        '''Initializes a MemLimitException instance

        Parameters:
            obj (MemoryThread): the instance of MemoryThread where the exception occurred

        Returns: None
        '''
        
        
        self.obj = obj
    
    
    def __str__(self) -> str:
        '''Returns a string representation of the MemLimitException'''
        
        
        return f"Hard limit {self.obj.hard_limit} is broken, current memory usage - {self.obj.memory_usage}!"

            
def memory_limit(
    soft_limit: Optional[str] = "512M",
    hard_limit: Optional[str] = "1.5G",
    poll_interval: float = 0.1
) -> None:
    '''Decorator function to limit the memory usage of a function

    Parameters:
        soft_limit (str): the soft memory limit in a human-readable format (e.g., "512M")
        hard_limit (str): the hard memory limit in a human-readable format (e.g., "1.5G")
        poll_interval (float): the time interval (in seconds) for checking memory usage

    Returns: the decorated function
    '''
    
    
    def decor(func):
        def inner_func(*args, **kwargs):
            thread_started = False
            if soft_limit or hard_limit:
                thread_started = True
                mem_check = MemoryThread(soft_limit, hard_limit, poll_interval)
                mem_check.start()
            try:
                result = func(*args, **kwargs)
                if thread_started:
                    mem_check.event = True
            except KeyboardInterrupt as e: # error is not catched
                raise MemLimitException(mem_check)
            return result
        return inner_func
    return decor


@memory_limit(soft_limit="512M", hard_limit="1.5G", poll_interval=0.1)
def memory_increment():
    """
    Функция для тестирования
    
    В течение нескольких секунд достигает использования памяти 1.89G
    Потребление памяти и скорость накопления можно варьировать, изменяя код
    """
    lst = []
    for i in range(50000000):
        if i % 500000 == 0:
            time.sleep(0.1)
        lst.append(i)
    return lst

memory_increment()



MemLimitException: Hard limit 1.5G is broken, current memory usage - 1.50G!

# Задание 3 (11 баллов)

Напишите функцию `parallel_map`. Это должна быть **универсальная** функция для распараллеливания, которая эффективно работает в любых условиях.

Функция должна принимать следующие аргументы:
1. `target_func` - целевая функция (обязательный аргумент)
2. `args_container` - контейнер с позиционными аргументами для `target_func` (по-умолчанию `None` - позиционные аргументы не передаются)
3. `kwargs_container` - контейнер с именованными аргументами для `target_func` (по-умолчанию `None` - именованные аргументы не передаются)
4. `n_jobs` - количество workers, которые будут использованы для выполнения (по-умолчанию `None` - количество логических ядер CPU в системе)

Функция должна работать аналогично `***PoolExecutor.map`, применяя функцию к переданному набору аргументов, но с некоторыми дополнениями и улучшениями
    
Поскольку мы пишем **универсальную** функцию, то нам нужно будет выполнить ряд требований, чтобы она могла логично и эффективно работать в большинстве ситуаций

1. `target_func` может принимать аргументы любого вида в любом количестве
2. Любые типы данных в `args_container`, кроме `tuple`, передаются в `target_func` как единственный позиционный аргумент. `tuple` распаковываются в несколько аргументов
3. Количество элементов в `args_container` должно совпадать с количеством элементов в `kwargs_container` и наоборот, также значение одного из них или обоих может быть равно `None`, в иных случаях должна кидаться ошибка (оба аргумента переданы, но размеры не совпадают)

4. Функция должна выполнять определённое количество параллельных вызовов `target_func`, это количество зависит от числа переданных аргументов и значения `n_jobs`. Сценарии могут быть следующие
    + `args_container=None`, `kwargs_container=None`, `n_jobs=None`. В таком случае функция `target_func` выполнится параллельно столько раз, сколько на вашем устройстве логических ядер CPU
    + `args_container=None`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **5** раз
    + `args_container=[1, 2, 3]`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **3** раза, несмотря на то, что `n_jobs=5` (так как есть всего 3 набора аргументов для которых нам нужно получить результат, а лишние worker'ы создавать не имеет смысла)
    + `args_container=None`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем именованные аргументы
    + `args_container=[1, 2, 3]`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем и позиционные, и именованные аргументы
    + `args_container=[1, 2, 3, 4]`, `kwargs_container=None`, `n_jobs=2`. В таком случае в каждый момент времени параллельно будет выполняться **не более 2** функций `target_func`, так как нам нужно выполнить её 4 раза, но у нас есть только 2 worker'а.
    + В подобных случаях (из примера выше) должно оптимизироваться время выполнения. Если эти 4 вызова выполняются за 5, 1, 2 и 1 секунды, то параллельное выполнение с `n_jobs=2` должно занять **5 секунд** (не 7 и тем более не 10)

5. `parallel_map` возвращает результаты выполнения `target_func` **в том же порядке**, в котором были переданы соответствующие аргументы
6. Работает с функциями, созданными внутри других функций

Для базового решения от вас не ожидается **сверххорошая** оптимизация по времени и памяти для всех возможных случаев. Однако за хорошо оптимизированную логику работы можно получить до **+3 дополнительных баллов**

Вы можете сделать класс вместо функции, если вам удобнее

В задании можно использовать только модули из **стандартной библиотеки** питона

Ниже приведены тестовые примеры по каждому из требований

In [1]:
import multiprocessing
from typing import Dict, List, Tuple, Any


class ParallelMap:
    '''A class that parallelizes the map function using multiprocessing

    Attributes:
        target_func (function): the function to be applied to the data
        args_container (list): a list contained positional arguments for the target function
        kwargs_container (list): a list of dictionaries containing keyword arguments for the target function
        n_jobs (int): the number of CPUs used for multiprocessing
        l (int): the length of the argument containers
        _cont_mask (list): a list indicating the presence of argument containers
    '''
    
    
    
    def __init__(
        self,
        target_func: callable,
        args_container: List[Tuple] = None,
        kwargs_container: List[Dict] = None,
        n_jobs: int = None
    ) -> None:        
        '''Initializes the ParallelMap instance with the target function, positional and keyword arguments,
        and the number of CPUs to be used

        Parameters:
            target_func (function): the function to be applied to the data
            args_container (list): a list contained positional arguments for the target function
            kwargs_container (list): a list of dictionaries containing keyword arguments for the target function
            n_jobs (int): the number of CPUs used for multiprocessing

        Raises:
            ValueError: if the number of positional arguments does not match the number of keyword arguments

        Returns: None
        '''
        
        
        self.target_func = target_func
        self.args_container = args_container
        self.kwargs_container = kwargs_container
        if not n_jobs:
            self.n_jobs = multiprocessing.cpu_count()
        else:
            self.n_jobs = n_jobs
        self.__check_containers()
        self.n_jobs = min(self.n_jobs, self.l)
        
    
    def __check_containers(self) -> None:
        '''Private method that verifies that the argument containers are passed,
        sets the length of the containers, and adjusts the empy containers of required length as needed
        '''
        
        
        self._cont_mask = [bool(cont) for cont in [self.args_container, self.kwargs_container]]
        if self._cont_mask == [True, True]:
            arg_l, kwarg_l = len(self.args_container), len(self.kwargs_container)
            if arg_l != kwarg_l:
                raise ValueError(
                    f"Numbers of positional arguments and keyword arguments do not match: {arg_l} and {kwarg_l}"
                )
            self.l = arg_l
            self.args_container = self.__adj_args(self.args_container)
        elif self._cont_mask == [True, False]:
            self.l = len(self.args_container)
            self.args_container = self.__adj_args(self.args_container)
            self.kwargs_container = self.__adj_kwargs(self.l)
        elif self._cont_mask == [False, True]:
            self.l = len(self.kwargs_container)
            self.args_container = self.__adj_args(cont_l = self.l)
        else:
            self.l = self.n_jobs
            self.args_container = self.__adj_args(cont_l = self.l)
            self.kwargs_container = self.__adj_kwargs(cont_l = self.l)


    def __adj_args(self, cont: List[Any] = None, cont_l: int = None) -> List[Tuple]:
        '''Private method that transforms elements of the positional argument container to tuples,
        if the positional argument container is not passed, creates list of empty tuples with required length

        Parameters:
            cont (list): the positional argument container
            cont_l (int): the length of the positional argument container

        Returns: 
            cont (list):  a list of tuples with the correct format for positional arguments
        '''
        
        
        if not cont_l:
            cont = [tuple([i]) if (type(i) is not tuple) else i for i in cont]
        else:
            cont = [()]*cont_l
        return cont


    def __adj_kwargs(self, cont_l: int) -> List[Dict]:
        '''Private method that adjusts the list of empty dictionaries with required length

        Parameters:
            cont_l (int): the length of the keyword argument container

        Returns: a list of empty dictionaries with the correct format for keyword arguments
        '''
        
        
        return [{}]*cont_l

    
    def process_tasks(
        self,
        task_queue: multiprocessing.Queue,
        result_dict: dict
    ) -> None:
        '''Processes tasks from a multiprocessing queue

        Parameters:
            task_queue (multiprocessing.Queue): The queue of tasks to be processed
            result_dict (multiprocessing.Manager.dict): a shared dictionary to store the results

        Returns: None
        '''
        
        
        while not task_queue.empty():
            i, args, kwargs = task_queue.get()
            result = self.target_func(*args, **kwargs)
            result_dict[i] = result
        return 


    def parallel_map(self) -> list:
        '''Runs the parallel map function using multiple CPUs

        Returns: a list of results from the map function
        '''
        
        
        task_queue = multiprocessing.Queue()
        for i in range(self.l):
            task_queue.put((i, self.args_container[i], self.kwargs_container[i]))
        manager = multiprocessing.Manager()
        result_dict = manager.dict()
        processes = []
        for i in range(self.n_jobs):
            p = multiprocessing.Process(target=self.process_tasks,
                                        args=(task_queue, result_dict))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        return [result_dict[i] for i in sorted(result_dict)]

In [2]:
import time


# Это только один пример тестовой функции, ваша parallel_map должна уметь эффективно работать с ЛЮБЫМИ функциями
# Поэтому обязательно протестируйте код на чём-нибудбь ещё
def test_func(x=1, s=2, a=1, b=1, c=1):
    time.sleep(s)
    return a*x**2 + b*x + c

In [None]:
%%time

# Пример 2.1
# Отдельные значения в args_container передаются в качестве позиционных аргументов
parallel_map(test_func, args_container=[1, 2.0, 3j-1, 4])   # Здесь происходят параллельные вызовы: test_func(1) test_func(2.0) test_func(3j-1) test_func(4)

CPU times: user 395 µs, sys: 8.26 ms, total: 8.65 ms
Wall time: 2.01 s


[3, 7.0, (-8-3j), 21]

In [3]:
%%time

parl_map = ParallelMap(test_func, args_container=[1, 2.0, 3j-1, 4])
parl_map.parallel_map()

CPU times: user 22.3 ms, sys: 1.1 ms, total: 23.4 ms
Wall time: 2.06 s


[3, 7.0, (-8-3j), 21]

In [None]:
%%time

# Пример 2.2
# Элементы типа tuple в args_container распаковываются в качестве позиционных аргументов
parallel_map(test_func, [(1, 1), (2.0, 2), (3j-1, 3), 4])    # Здесь происходят параллельные вызовы: test_func(1, 1) test_func(2.0, 2) test_func(3j-1, 3) test_func(4)

CPU times: user 7.18 ms, sys: 7.73 ms, total: 14.9 ms
Wall time: 3.01 s


[3, 7.0, (-8-3j), 21]

In [4]:
%%time

parl_map = ParallelMap(test_func, [(1, 1), (2.0, 2), (3j-1, 3), 4])
parl_map.parallel_map()

CPU times: user 0 ns, sys: 22.9 ms, total: 22.9 ms
Wall time: 3.03 s


[3, 7.0, (-8-3j), 21]

In [None]:
%%time

# Пример 3.1
# Возможна одновременная передача args_container и kwargs_container, но количества элементов в них должны быть равны
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

# Здесь происходят параллельные вызовы: test_func(1, s=3) test_func(2, s=3) test_func(3, s=3) test_func(4, s=3)

CPU times: user 5.89 ms, sys: 8.84 ms, total: 14.7 ms
Wall time: 3.02 s


[3, 7, 13, 21]

In [5]:
%%time

parl_map = ParallelMap(test_func,
                       args_container=[1, 2, 3, 4],
                       kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])
parl_map.parallel_map()

CPU times: user 10.5 ms, sys: 13.2 ms, total: 23.7 ms
Wall time: 3.04 s


[3, 7, 13, 21]

In [None]:
%%time

# Пример 3.2
# args_container может быть None, а kwargs_container задан явно
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

CPU times: user 6.54 ms, sys: 6.06 ms, total: 12.6 ms
Wall time: 3.02 s


[3, 3, 3, 3]

In [6]:
%%time

parl_map = ParallelMap(test_func,
                       kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])
parl_map.parallel_map()

CPU times: user 1.62 ms, sys: 21.6 ms, total: 23.3 ms
Wall time: 3.03 s


[3, 3, 3, 3]

In [None]:
%%time

# Пример 3.3
# kwargs_container может быть None, а args_container задан явно
parallel_map(test_func,
             args_container=[1, 2, 3, 4])

CPU times: user 4.11 ms, sys: 9.2 ms, total: 13.3 ms
Wall time: 2.01 s


[3, 7, 13, 21]

In [7]:
%%time

parl_map = ParallelMap(test_func,
                       args_container=[1, 2, 3, 4])
parl_map.parallel_map()

CPU times: user 0 ns, sys: 23.2 ms, total: 23.2 ms
Wall time: 2.03 s


[3, 7, 13, 21]

In [None]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
parallel_map(test_func)

CPU times: user 500 µs, sys: 43.3 ms, total: 43.8 ms
Wall time: 2.04 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [8]:
%%time

parl_map = ParallelMap(test_func)
parl_map.parallel_map()

CPU times: user 0 ns, sys: 69 ms, total: 69 ms
Wall time: 2.08 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [None]:
%%time

# Пример 3.5
# При несовпадении количеств позиционных и именованных аргументов кидается ошибка
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}])

ValueError: Numbers of positional arguments and keyword arguments do not match: 4 and 3

In [9]:
%%time

parl_map = ParallelMap(test_func,
                       args_container=[1, 2, 3, 4],
                       kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}])
parl_map.parallel_map()

ValueError: Numbers of positional arguments and keyword arguments do not match: 4 and 3

In [None]:
%%time

# Пример 4.1
# Если функция не имеет обязательных аргументов и аргумент n_jobs не был передан, то она выполняется параллельно столько раз, сколько ваш CPU имеет логических ядер
# В моём случае это 24, у вас может быть больше или меньше
parallel_map(test_func)

CPU times: user 9.3 ms, sys: 51.2 ms, total: 60.5 ms
Wall time: 2.06 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [10]:
multiprocessing.cpu_count()

16

In [11]:
%%time

parl_map = ParallelMap(test_func)
parl_map.parallel_map()

CPU times: user 21.5 ms, sys: 42.7 ms, total: 64.2 ms
Wall time: 2.08 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [47]:
%%time

# Пример 4.2
# Если функция не имеет обязательных аргументов и передан только аргумент n_jobs, то она выполняется параллельно n_jobs раз
parallel_map(test_func, n_jobs=2)

CPU times: user 2.06 ms, sys: 5.92 ms, total: 7.99 ms
Wall time: 2.01 s


[3, 3]

In [13]:
%%time

parl_map = ParallelMap(test_func, n_jobs=2)
parl_map.parallel_map()

CPU times: user 2.18 ms, sys: 11.9 ms, total: 14 ms
Wall time: 2.03 s


[3, 3]

In [48]:
%%time

# Пример 4.3
# Если аргументов для target_func указано МЕНЬШЕ, чем n_jobs, то используется такое же количество worker'ов, сколько было передано аргументов
parallel_map(test_func,
             args_container=[1, 2, 3],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 314 µs, sys: 8.69 ms, total: 9 ms
Wall time: 2.01 s


[3, 7, 13]

In [14]:
%%time

parl_map = ParallelMap(test_func,
                       args_container=[1, 2, 3],
                       n_jobs=5)
parl_map.parallel_map()

CPU times: user 0 ns, sys: 17.8 ms, total: 17.8 ms
Wall time: 2.03 s


[3, 7, 13]

In [None]:
%%time

# Пример 4.4
# Аналогичный предыдущему случай, но с именованными аргументами
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 1.26 ms, sys: 9.47 ms, total: 10.7 ms
Wall time: 3.01 s


[3, 3, 3]

In [15]:
%%time

parl_map = ParallelMap(test_func,
                       kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
                       n_jobs=5)
parl_map.parallel_map()

CPU times: user 5.45 ms, sys: 13.9 ms, total: 19.4 ms
Wall time: 3.03 s


[3, 3, 3]

In [None]:
%%time

# Пример 4.5
# Комбинация примеров 4.3 и 4.4 (переданы и позиционные и именованные аргументы)
parallel_map(test_func,
             args_container=[1, 2, 3],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 7.88 ms, sys: 0 ns, total: 7.88 ms
Wall time: 3.01 s


[3, 7, 13]

In [16]:
%%time

parl_map = ParallelMap(test_func,
                       args_container=[1, 2, 3],
                       kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
                       n_jobs=5)
parl_map.parallel_map()

CPU times: user 17.1 ms, sys: 1.5 ms, total: 18.6 ms
Wall time: 3.03 s


[3, 7, 13]

In [None]:
%%time

# Пример 4.6
# Если аргументов для target_func указано БОЛЬШЕ, чем n_jobs, то используется n_jobs worker'ов
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=None,
             n_jobs=2)   # Здесь используется 2 worker'a

CPU times: user 7.88 ms, sys: 0 ns, total: 7.88 ms
Wall time: 3.01 s


[3, 7, 13]

In [17]:
%%time

parl_map = ParallelMap(test_func,
                       args_container=[1, 2, 3, 4],
                       kwargs_container=None,
                       n_jobs=2)
parl_map.parallel_map()

CPU times: user 3.25 ms, sys: 10.5 ms, total: 13.8 ms
Wall time: 4.03 s


[3, 7, 13, 21]

In [None]:
%%time

# Пример 4.7
# Время выполнения оптимизируется, данный код должен отрабатывать за 5 секунд
parallel_map(test_func,
             kwargs_container=[{"s": 5}, {"s": 1}, {"s": 2}, {"s": 1}],
             n_jobs=2)

CPU times: user 3.03 ms, sys: 11 ms, total: 14 ms
Wall time: 5.01 s


[3, 3, 3, 3]

In [18]:
%%time

parl_map = ParallelMap(test_func,
                       kwargs_container=[{"s": 5}, {"s": 1}, {"s": 2}, {"s": 1}],
                       n_jobs=2)
parl_map.parallel_map()

CPU times: user 15.6 ms, sys: 1.11 ms, total: 16.7 ms
Wall time: 5.03 s


[3, 3, 3, 3]

In [None]:
def test_func2(string, sleep_time=1):
    time.sleep(sleep_time)
    return string

# Пример 5
# Результаты возвращаются в том же порядке, в котором были переданы соответствующие аргументы вне зависимости от того, когда завершился worker
arguments = ["first", "second", "third", "fourth", "fifth"]
parallel_map(test_func2,
             args_container=arguments,
             kwargs_container=[{"sleep_time": 5}, {"sleep_time": 4}, {"sleep_time": 3}, {"sleep_time": 2}, {"sleep_time": 1}])

['first', 'second', 'third', 'fourth', 'fifth']

In [19]:
%%time

def test_func2(string, sleep_time=1):
    time.sleep(sleep_time)
    return string


arguments = ["first", "second", "third", "fourth", "fifth"]
parl_map = ParallelMap(test_func2,
                       args_container=arguments,
                       kwargs_container=[{"sleep_time": 5}, {"sleep_time": 4}, {"sleep_time": 3}, {"sleep_time": 2}, {"sleep_time": 1}])
print(parl_map.parallel_map())

['first', 'second', 'third', 'fourth', 'fifth']
CPU times: user 6.65 ms, sys: 23.7 ms, total: 30.3 ms
Wall time: 5.03 s


In [None]:
%%time


def test_func3():
    def inner_test_func(sleep_time):
        time.sleep(sleep_time)
    return parallel_map(inner_test_func, args_container=[1, 2, 3])

# Пример 6
# Работает с функциями, созданными внутри других функций
test_func3()

[None, None, None]

In [20]:
%%time

def test_func3():
    def inner_test_func(sleep_time):
        time.sleep(sleep_time)
    parl_map = ParallelMap(inner_test_func, args_container=[1, 2, 3])
    return parl_map.parallel_map()


test_func3()

CPU times: user 1.89 ms, sys: 16.6 ms, total: 18.5 ms
Wall time: 3.03 s


[None, None, None]

In [21]:
%%time

def calc_distance(x1=1, y1=1, x2=1, y2=1, s=1):
    distance = ((x1 - x2)**2 + (y1 - y2)**2)**0.5
    time.sleep(s)
    return distance


parl_map = ParallelMap(calc_distance, 
                       args_container=[(1, 5, 2, 3), (2, 2, 6, 8), (25, 11, 13, 16)],
                       kwargs_container=[{"s": 6}, {"s": 1}, {"s": 10}], 
                       n_jobs=1)
parl_map.parallel_map()

CPU times: user 9.08 ms, sys: 3.9 ms, total: 13 ms
Wall time: 17 s


[2.23606797749979, 7.211102550927978, 13.0]

In [22]:
%%time

parl_map = ParallelMap(calc_distance, 
                       args_container=[(1, 5, 2, 3), (2, 2, 6, 8), (25, 11, 13, 16)],
                       kwargs_container=[{"s": 6}, {"s": 1}, {"s": 10}], 
                       n_jobs=3)
parl_map.parallel_map()

CPU times: user 0 ns, sys: 19.2 ms, total: 19.2 ms
Wall time: 10 s


[2.23606797749979, 7.211102550927978, 13.0]