The term **worker** is used throughout the tasks below. It denotes a unit of parallel execution; in Python this can be a **thread** or a **process**. Choose whichever is best suited to the specific task.
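As a rough illustration of the two kinds of worker, the sketch below picks an executor class by name. The helper `make_pool` is a hypothetical name introduced here, not part of the assignment.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def make_pool(kind: str, n_jobs: int):
    """Hypothetical helper: return an executor whose workers are threads or processes."""
    pools = {"thread": ThreadPoolExecutor, "process": ProcessPoolExecutor}
    return pools[kind](max_workers=n_jobs)


# Threads usually suit I/O-bound work or code that releases the GIL;
# processes usually suit CPU-bound pure-Python work.
with make_pool("thread", n_jobs=2) as pool:
    lengths = list(pool.map(len, ["a", "bb", "ccc"]))
```

Both executor classes share the same interface, so switching the worker type later only requires changing the `kind` argument.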

# Task 1

One of the ML assignments required you to write a custom implementation of Random Forest. That implementation is slow because it runs in a single thread. Parallelizing the code can significantly speed up both training and prediction.

This assignment requires you to add the ability to train a random forest in parallel and to use parallelism for predictions. To do this, you need to:
1. Add the `n_jobs` argument to the `fit` method. `n_jobs` sets the number of workers used for parallelization
2. Add the `n_jobs` argument to the `predict` and `predict_proba` methods
3. Implement parallelization functionality in these methods

As a result, `random_forest.fit(X, y, n_jobs=2)` and `random_forest.predict(X, n_jobs=2)` should run roughly 1.5-2 times faster than `random_forest.fit(X, y, n_jobs=1)` and `random_forest.predict(X, n_jobs=1)`, respectively

If for some reason you do not have the random forest code from the ML assignment, you can write it again or ask a classmate. The *details* of the ML implementation will not be assessed, but if you break the logic of the algorithm while implementing parallelism, points will be deducted
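One way to keep the algorithm's logic independent of `n_jobs` is to derive every tree's randomness from its own index rather than from shared global state. The sketch below illustrates the idea with the standard `random` module only; `draw_features` and `draw_all` are hypothetical names, and the default values are arbitrary assumptions.

```python
import random
from concurrent.futures import ThreadPoolExecutor


def draw_features(tree_index, base_seed=42, n_features=20, k=2):
    """Pick k feature ids for one tree from a generator seeded by the tree index."""
    rng = random.Random(base_seed + tree_index)  # private generator, no shared state
    return rng.sample(range(n_features), k)


def draw_all(n_trees, n_jobs):
    """Draw feature ids for all trees using n_jobs worker threads."""
    with ThreadPoolExecutor(max_workers=n_jobs) as pool:
        return list(pool.map(draw_features, range(n_trees)))


# The same trees get the same features no matter how many workers are used
same = draw_all(10, n_jobs=1) == draw_all(10, n_jobs=4)
```

Because each call owns its generator, the order in which workers run cannot change what any single tree draws.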

In the task, you can only use modules from the **standard Python library**, as well as functions and classes from **sklearn** with which you originally wrote the forest

In [1]:
import random
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import make_classification
from typing import Tuple, Callable 


class RandomForestClassifierCustom(BaseEstimator):
    '''A custom implementation of random forest classifier
    (most of the description is taken from the `sklearn` source code)
    
    Attributes:
        n_estimators (int): a number of decision tree classifiers in forest
        max_depth (int): the maximum depth of the tree
        max_features (int): the number of features to consider when looking for the best split
        random_state (int): controls both the randomness of pseudosampling and feature selection
        feat_ids_by_tree (list): selected features for each tree
        trees (list): fitted trees for forest
    '''
    
    
    def __init__(
        self, n_estimators: int = 10, max_depth: int = None, max_features: int = None, random_state: int = None
    ) -> None:
        '''Constructs all the necessary attributes for the RandomForestClassifierCustom object

        Parameters:
            n_estimators (int): a number of decision tree classifiers in forest
            max_depth (int): the maximum depth of the tree
            max_features (int): the number of features to consider when looking for the best split
            random_state (int): controls both the randomness of pseudosampling and feature selection
        
        '''
        
        
        self.n_estimators: int = n_estimators
        self.max_depth: int = max_depth
        self.max_features: int = max_features
        self.random_state: int = random_state

        self.trees: list = []
        self.feat_ids_by_tree: list = []
        
           
    def __check_jobs(self, n_jobs: int, n_estimators: int) -> int:
        '''A private method for checking n_jobs. If n_jobs is not defined, it is set to 1
        
        Parameters:
            n_jobs (int): the number of jobs to run in parallel
            n_estimators (int): a number of trees in forest
            
        Returns: number of jobs (int)
        '''
        
        
        if n_jobs:
            return min(n_jobs, n_estimators)
        return 1
        
        
    def __calc_estim_per_job(self, n_estimators: int, n_jobs: int) -> Tuple[np.ndarray, list]:
        '''A private method to split n_estimators for n_jobs. Also calculates "coordinates" to keep seed regardless of n_jobs
        
        Parameters:
            n_jobs (int): the number of jobs to run in parallel
            n_estimators (int): a number of trees in forest
            
        Returns:
            n_estimators_per_job (np.ndarray): array with the number of estimators assigned to each job
            seed_coords (list): list of (start, stop) tuples giving the range of tree indices (and seeds) for each job
        '''
        
        
        n_estimators_per_job = np.full(n_jobs, n_estimators // n_jobs, dtype=int)
        n_estimators_per_job[: n_estimators % n_jobs] += 1
        
        # some calculations for building the same trees regardless of n_jobs
        lst_cumsum = np.insert(np.cumsum(n_estimators_per_job), 0, 0, axis=0)
        # list for start and stop for calculating seeds
        seed_coords = []
        for i in range(1, len(lst_cumsum)):
            seed_coords.append((lst_cumsum[i-1], lst_cumsum[i]))
        return n_estimators_per_job, seed_coords
            
    
    def _build_trees(self, seed_coords: tuple, X: np.ndarray, y: np.ndarray) -> Tuple[list, list]:
        '''Private method used to build the trees whose indices fall in [start, stop)
        
        Parameters:
            seed_coords (tuple): start and stop coordinates for building trees and their seeds
            X (np.ndarray): array with features
            y (np.ndarray): target vector
            
        Returns: 
            feat_ids_by_tree (lst): list of arrays of features used for training of corresponding tree
            trees (lst): list of fitted trees
        '''
        
        
        start, stop = seed_coords
        trees = []
        feat_ids_by_tree = []
        for i in range(start, stop):
            # setting seed
            np.random.seed(self.random_state + i)

            # selecting n random features 
            feat_ids = np.random.choice(X.shape[1], size=self.max_features, replace=False)
            feat_ids_by_tree.append(feat_ids)

            # creating pseudosample using bootstrap
            # random indices for pseudosample
            pseudo_idx = np.random.choice(X.shape[0], size=X.shape[0], replace=True).reshape(X.shape[0], 1)
            pseudo_X = X[pseudo_idx, feat_ids]
            pseudo_y = y[pseudo_idx]

            # creating and fitting model
            dec_tree_class = DecisionTreeClassifier(max_depth=self.max_depth,
                                                    max_features=self.max_features, 
                                                    random_state=self.random_state)
            dec_tree_class.fit(pseudo_X, pseudo_y)
            trees.append(dec_tree_class)
        return feat_ids_by_tree, trees

    
    def __do_parallel(self, func: Callable, n_jobs: int, *args) -> list: # args may contain X or both X and y
        '''Private method for running fit or predict_proba methods in parallel
        
        Parameters:
            func (Callable): function for parallel execution
            n_jobs (int): the number of jobs to run in parallel
            *args may contain X or both X and y
            
        Returns: list with futures (list)            
        '''
        
        
        n_jobs = self.__check_jobs(n_jobs, self.n_estimators)
        n_est_per_job, seed_coords = self.__calc_estim_per_job(self.n_estimators, n_jobs)
        futures = []
        with ThreadPoolExecutor(max_workers=n_jobs) as pool:
            for i, _ in enumerate(n_est_per_job):
                futures.append(pool.submit(func, seed_coords[i], *args))
        return futures


    def fit(self, X: np.ndarray, y: np.ndarray, n_jobs: int = None) -> object:
        '''Build a forest of trees from the training set (X, y)
        
        Parameters:
            X (np.ndarray) of shape (n_samples, n_features): the training input samples
            y (np.ndarray) of shape (n_samples): target values labels
            n_jobs (int): the number of jobs to run in parallel
        Returns: self object (RandomForestClassifierCustom)
        '''
        
        
        self.classes_ = sorted(np.unique(y))
        futures = self.__do_parallel(self._build_trees, n_jobs, X, y)
        for future in futures:
            self.feat_ids_by_tree.extend(future.result()[0]) 
            self.trees.extend(future.result()[1])
        return self
            
        
    def _parallel_predict_proba(self, seed_coords: tuple, X: np.ndarray) -> list:
        '''Function for running predict_proba using n trees per job
        
        Parameters:
            seed_coords (tuple): start and stop coordinates for using n trees
            X (np.ndarray): array with features
            
        Returns:
            probs (list): the class probabilities of the input samples based on n trees
        '''
        
        
        start, stop = seed_coords
        probs = []
        for i in range(start, stop):
            proba = self.trees[i].predict_proba(X[:, self.feat_ids_by_tree[i]])
            probs.append(proba)
        return probs
        
    
    def predict_proba(self, X: np.ndarray, n_jobs: int = None) -> np.ndarray:
        '''Predicts class probabilities for X. The predicted class probabilities of an input sample are computed as
        the mean predicted class probabilities of the trees in the forest
        
        Parameters:
            X (np.ndarray) of shape (n_samples, n_features): the testing input samples
            n_jobs (int): the number of jobs to run in parallel
        '''
        
        
        futures = self.__do_parallel(self._parallel_predict_proba, n_jobs, X)
        result = []
        for future in futures:
            result.extend(future.result())
        return np.mean(result, axis=0)
    
    
    def predict(self, X: np.ndarray, n_jobs: int = None) -> np.ndarray:
        '''Predicts class for X. The predicted class of an input sample is a vote by the trees in the forest, 
        weighted by their probability estimates. That is, the predicted class is the one with highest mean probability
        estimate across the trees
        
        Parameters:
            X (np.ndarray) of shape (n_samples, n_features): the testing input samples
            n_jobs (int): the number of jobs to run in parallel
            
        Returns:
            predictions (np.ndarray) of shape (n_samples,): the predicted classes    
        '''
        
        
        probas = self.predict_proba(X, n_jobs)
        predictions = np.argmax(probas, axis=1)
        return predictions
    
    
X, y = make_classification(n_samples=100000)

In [2]:
random_forest1 = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [3]:
%%time

random_forest1.fit(X, y, n_jobs=1)

CPU times: total: 17.4 s
Wall time: 17.6 s


In [5]:
%%time

preds_1 = random_forest1.predict(X, n_jobs=1)

CPU times: total: 344 ms
Wall time: 329 ms


In [6]:
random_forest1.feat_ids_by_tree

[array([ 0, 17]),
 array([9, 6]),
 array([15, 14]),
 array([12, 16]),
 array([1, 3]),
 array([10, 14]),
 array([1, 9]),
 array([14, 19]),
 array([15,  8]),
 array([ 1, 12])]

In [7]:
random_forest2 = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [8]:
%%time

_ = random_forest2.fit(X, y, n_jobs=2)

CPU times: total: 20.8 s
Wall time: 10.9 s


In [9]:
%%time

preds_2 = random_forest2.predict(X, n_jobs=2)

CPU times: total: 391 ms
Wall time: 208 ms


In [10]:
random_forest2.feat_ids_by_tree

[array([ 0, 17]),
 array([9, 6]),
 array([15, 14]),
 array([12, 16]),
 array([1, 3]),
 array([10, 14]),
 array([1, 9]),
 array([14, 19]),
 array([15,  8]),
 array([ 1, 12])]

In [11]:
(preds_1 == preds_2).all()   # The number of workers should not affect the predictions

True

#### What are the shortcomings (if any) of your parallel Random Forest implementation? How can this be fixed? Describe in words, without code

In this task I used `concurrent.futures.ThreadPoolExecutor`, even though the task looked entirely CPU-bound. I first wrote `fit` on processes, and the trees really were built about twice as fast. But when I did the same for `predict_proba`, it stopped speeding up, so I switched to threads and checked that `fit` sped up just as it did on processes, even slightly more. Mysterious, to be honest. Perhaps this is a case where creating processes costs more than the computation itself. Overall, the biggest drawback is that my parallelization helper returns a list of futures rather than ready results.
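The drawback with futures could be addressed by collecting the results inside the helper itself. A minimal sketch, assuming a thread pool and a hypothetical helper named `run_chunks` (neither name nor signature comes from the class above):

```python
from concurrent.futures import ThreadPoolExecutor


def run_chunks(func, chunks, n_jobs, *args):
    """Hypothetical helper: apply func to each chunk in parallel and return the results."""
    with ThreadPoolExecutor(max_workers=n_jobs) as pool:
        futures = [pool.submit(func, chunk, *args) for chunk in chunks]
        # result() blocks until the call finishes and re-raises its exception, if any
        return [future.result() for future in futures]


def sum_range(bounds, scale):
    start, stop = bounds
    return scale * sum(range(start, stop))


totals = run_chunks(sum_range, [(0, 5), (5, 10)], 2, 1)
```

The caller then receives plain values in submission order and never has to touch a `Future` object.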

# Task 2

Write a `memory_limit` decorator that allows you to limit the memory usage of the function you are decorating.

The decorator must accept the following arguments:
1. `soft_limit` - “soft” memory usage limit. If the function exceeds this limit, a **warning** should be displayed
2. `hard_limit` - “hard” memory usage limit. If the function exceeds this limit, an exception should be raised and the function should exit immediately
3. `poll_interval` - time interval (in seconds) between memory usage checks

Requirements:
1. Memory consumption of a function should be monitored **during the execution of the function**, and not after it completes
2. The **warning** for an exceeded `soft_limit` should be displayed only once, even if the function crosses this limit several times
3. If you set `soft_limit` or `hard_limit` to `None`, then the corresponding limit should be disabled
4. Limits must be passed and displayed in the format `<number>X`, where `X` is a character indicating the memory unit ("B", "K", "M", "G", "T", ...)
5. The text of warnings and exceptions must indicate the current amount of memory used and the amount of the exceeded limit
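Requirements 2, 3 and 5 can be sketched independently of how memory is measured. The class below is a hypothetical illustration: the name `LimitChecker`, the use of `MemoryError`, and limits given directly in bytes are all assumptions made for brevity.

```python
import warnings


class LimitChecker:
    """Hypothetical helper: warn once above soft_limit, raise above hard_limit (bytes)."""

    def __init__(self, soft_limit=None, hard_limit=None):
        self.soft_limit = soft_limit  # None disables the soft limit
        self.hard_limit = hard_limit  # None disables the hard limit
        self.soft_warned = False

    def check(self, usage):
        if self.hard_limit is not None and usage > self.hard_limit:
            raise MemoryError(f"Hard limit {self.hard_limit}B exceeded: using {usage}B")
        if self.soft_limit is not None and usage > self.soft_limit and not self.soft_warned:
            self.soft_warned = True  # remember, so later crossings stay silent
            warnings.warn(f"Soft limit {self.soft_limit}B exceeded: using {usage}B")


checker = LimitChecker(soft_limit=100, hard_limit=1000)
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    for usage in (50, 150, 80, 200):  # crosses the soft limit twice
        checker.check(usage)
```

Here `caught` ends up holding a single warning even though the soft limit was crossed twice.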

In this task, you can only use modules from the Python **standard library**; you can write auxiliary functions and/or classes

The code below predefines some useful functions for you; using them is optional.

In [13]:
import os
import sys
import psutil
import time
import warnings
import threading


def get_memory_usage():    # Shows the current memory consumption of the process
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return mem_info.rss


def human_readble_to_bytes(limit):
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for idx, s in enumerate(symbols):
        prefix[s] = 1 << (idx + 1) * 10
    for s in symbols:
        if limit[-1] == s:
            value = int(float(limit[:-1])*prefix[s])
            return value
    return int(limit)


def bytes_to_human_readable(n_bytes):
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for idx, s in enumerate(symbols):
        prefix[s] = 1 << (idx + 1) * 10
    for s in reversed(symbols):
        if n_bytes >= prefix[s]:
            value = float(n_bytes) / prefix[s]
            return f"{value:.2f}{s}"
    return f"{n_bytes}B"

In [12]:
import os
import sys
import psutil
import time
import warnings
from threading import Thread
import _thread
from typing import Optional


class MemoryThread(Thread):
    '''A thread that monitors the memory usage of a process

    Attributes:
        soft_limit (str): the soft memory limit in a human-readable format (e.g., "512M")
        hard_limit (str): the hard memory limit in a human-readable format (e.g., "1.5G")
        poll_interval (float): the time interval (in seconds) for checking memory usage
        _bytes_soft_limit (int): the soft memory limit in bytes
        _bytes_hard_limit (int): the hard memory limit in bytes
        broken_soft (bool): True if the soft memory limit has been exceeded
        memory_usage (str): the current memory usage in a human-readable format (e.g., "100M")
    '''
    
    
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for idx, s in enumerate(symbols):
        prefix[s] = 1 << (idx + 1) * 10
    
    
    def __init__(self, soft_limit, hard_limit, poll_interval) -> None:
        '''
        Initializes a MemoryThread instance

        Parameters:
            soft_limit (str): the soft memory limit in a human-readable format (e.g., "512M").
            hard_limit (str): the hard memory limit in a human-readable format (e.g., "1.5G").
            poll_interval (float): the time interval (in seconds) for checking memory usage

        Returns: None
        '''
        
        
        super().__init__()
        self.soft_limit = soft_limit
        self.hard_limit = hard_limit
        self.poll_interval = poll_interval
        self.broken_soft = False
        if soft_limit:
            self._bytes_soft_limit = self.human_readble_to_bytes(self.soft_limit)
        if hard_limit:
            self._bytes_hard_limit = self.human_readble_to_bytes(self.hard_limit)
                   
        
    def get_memory_usage(self) -> int: # Shows the current memory consumption of the process
        '''Returns the current memory usage of the process in bytes'''
        
        
        process = psutil.Process(os.getpid())
        mem_info = process.memory_info()
        return mem_info.rss


    def human_readble_to_bytes(self, limit: str) -> int:
        '''Converts a human-readable memory limit (e.g., "512M") to an integer in bytes
        
        Parameters:
            limit (str): a human-readable memory limit to be converted
            
        Returns: memory limit in bytes (int)
        '''
        
        
        for s in MemoryThread.symbols:
            if limit[-1] == s:
                value = int(float(limit[:-1]) * MemoryThread.prefix[s])
                return value
        return int(limit)


    def bytes_to_human_readable(self, n_bytes: int) -> str:
        '''Converts a number of bytes to a human-readable memory size (e.g., "100M")

        Parameters:
            n_bytes (int): the number of bytes

        Returns: a string representing the memory size in a human-readable format (str)
        '''
        
        
        for s in reversed(MemoryThread.symbols):
            if n_bytes >= MemoryThread.prefix[s]:
                value = float(n_bytes) / MemoryThread.prefix[s]
                return f"{value:.2f}{s}"
        return f"{n_bytes}B"

        
    def run(self) -> None:
        '''The main run method of the thread'''
        
        
        while True:
            memory_usage = self.get_memory_usage()
            self.memory_usage = self.bytes_to_human_readable(memory_usage)
            if (not self.broken_soft) and self.soft_limit:
                self.check_soft(memory_usage)
            if self.hard_limit:
                self.check_hard(memory_usage)
            if hasattr(self, 'event'):
                return
            time.sleep(self.poll_interval)
        return
            
                      
    def check_soft(self, memory_usage: int) -> None:
        '''Check if the soft limit has been exceeded

        Parameters:
            memory_usage (int): the current memory usage in bytes

        Returns: None
        '''
        
        
        def custom_formatwarning(msg: str, *args, **kwargs) -> str:
            '''Formats UserWarning message output
            
            Parameters:
                msg (str): message to raise
                *args, **kwargs contains other parameters of UserWarning
                
            Returns: UserWarning message (str)    
            '''
            
            
            # ignore everything except the message
            return  f"Warning! {str(msg)}"
        
        if memory_usage >= self._bytes_soft_limit:
            self.broken_soft = True
            warnings.formatwarning = custom_formatwarning
            warnings.warn(
                f"Soft limit {self.soft_limit} is broken, current memory usage - {self.memory_usage}!", 
                UserWarning
            )
            
            
    def check_hard(self, memory_usage: int) -> None:
        '''Check if the hard limit has been exceeded

        Parameters:
            memory_usage (int): The current memory usage in bytes.

        Returns: None
        '''
        
        
        if memory_usage >= self._bytes_hard_limit:
            _thread.interrupt_main() # I tried to catch the KeyboardInterrupt error here too
            time.sleep(2)
            # I used it here to restart the kernel and free the memory
            # for some reason, gc.collect() could not cope with this task 
            os._exit(0)
            
            
class MemLimitException(BaseException):
    '''A custom exception class for raising when memory limit is exceeded'''
    
    
    def __init__(self, obj: MemoryThread) -> None:
        '''Initializes a MemLimitException instance

        Parameters:
            obj (MemoryThread): the instance of MemoryThread where the exception occurred

        Returns: None
        '''
        
        
        self.obj = obj
    
    
    def __str__(self) -> str:
        '''Returns a string representation of the MemLimitException'''
        
        
        return f"Hard limit {self.obj.hard_limit} is broken, current memory usage - {self.obj.memory_usage}!"

            
def memory_limit(
    soft_limit: Optional[str] = "512M",
    hard_limit: Optional[str] = "1.5G",
    poll_interval: float = 0.1):
    '''Decorator function to limit the memory usage of a function

    Parameters:
        soft_limit (str): the soft memory limit in a human-readable format (e.g., "512M")
        hard_limit (str): the hard memory limit in a human-readable format (e.g., "1.5G")
        poll_interval (float): the time interval (in seconds) for checking memory usage

    Returns: the decorated function
    '''
    
    
    def decor(func):
        def inner_func(*args, **kwargs):
            thread_started = False
            if soft_limit or hard_limit:
                thread_started = True
                mem_check = MemoryThread(soft_limit, hard_limit, poll_interval)
                mem_check.start()
            try:
                result = func(*args, **kwargs)
                if thread_started:
                    mem_check.event = True
            except KeyboardInterrupt as e: # the error is not caught
                raise MemLimitException(mem_check)
            return result
        return inner_func
    return decor


@memory_limit(soft_limit="512M", hard_limit="1.5G", poll_interval=0.1)
def memory_increment():
    """
    Test function
    
    Reaches 1.89G memory usage within seconds
    Memory consumption and accumulation speed can be varied by changing the code
    """
    lst = []
    for i in range(50000000):
        if i % 500000 == 0:
            time.sleep(0.1)
        lst.append(i)
    return lst

memory_increment()



MemLimitException: Hard limit 1.5G is broken, current memory usage - 1.50G!
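For comparison, a monitor thread can be stopped cleanly with `threading.Event` instead of a dynamically attached attribute. The sketch below is an illustration under stated assumptions: `MemoryMonitor` and `run_monitored` are hypothetical names, and memory is read with the standard-library `tracemalloc`, which counts only allocations made through Python's allocator and is therefore not equivalent to the RSS value reported by `psutil`.

```python
import threading
import tracemalloc


class MemoryMonitor(threading.Thread):
    """Hypothetical monitor: polls traced memory until asked to stop."""

    def __init__(self, poll_interval=0.01):
        super().__init__(daemon=True)
        self.poll_interval = poll_interval
        self.stop_event = threading.Event()
        self.peak_seen = 0

    def run(self):
        # Event.wait returns True as soon as stop() is called, so the loop
        # ends without sleeping through a full poll interval
        while not self.stop_event.wait(self.poll_interval):
            current, _peak = tracemalloc.get_traced_memory()
            self.peak_seen = max(self.peak_seen, current)

    def stop(self):
        self.stop_event.set()
        self.join()


def run_monitored(func, *args, **kwargs):
    """Run func while a monitor thread samples memory; return (result, monitor)."""
    tracemalloc.start()
    monitor = MemoryMonitor()
    monitor.start()
    try:
        return func(*args, **kwargs), monitor
    finally:
        # always stop the thread, even if func raises
        monitor.stop()
        tracemalloc.stop()
```

Placing the shutdown in `finally` means the monitor never outlives the decorated call, whether it returns normally or raises.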

# Task 3

Write a `parallel_map` function. This should be a **universal** parallelization function that works effectively in any conditions.

The function must accept the following arguments:
1. `target_func` - target function (required argument)
2. `args_container` - container with positional arguments for `target_func` (by default `None` - positional arguments are not passed)
3. `kwargs_container` - container with named arguments for `target_func` (by default `None` - named arguments are not passed)
4. `n_jobs` - the number of workers that will be used for execution (by default `None`, meaning the number of logical CPU cores in the system)

The function should work similarly to `***PoolExecutor.map`, applying the function to the passed set of arguments, but with some additions and improvements
    
Since we are writing a **universal** function, we will need to meet a number of requirements so that it can work logically and efficiently in most situations

1. `target_func` can accept arguments of any kind in any number
2. Any data types in `args_container`, except `tuple`, are passed to `target_func` as a single positional argument. `tuple` are unpacked into multiple arguments
3. The number of elements in `args_container` must match the number of elements in `kwargs_container`. Either of them, or both, may be `None`. If both are passed and their sizes do not match, an error should be raised

4. The function must perform a certain number of parallel calls to `target_func`, this number depends on the number of arguments passed and the value of `n_jobs`. Scenarios could be as follows
     + `args_container=None`, `kwargs_container=None`, `n_jobs=None`. In this case, the `target_func` function will be executed in parallel as many times as there are logical CPU cores on your device
     + `args_container=None`, `kwargs_container=None`, `n_jobs=5`. In this case, the `target_func` function will be executed in parallel **5** times
     + `args_container=[1, 2, 3]`, `kwargs_container=None`, `n_jobs=5`. In this case, the `target_func` function will be executed in parallel **3** times, despite the fact that `n_jobs=5` (since there are only 3 sets of arguments for which we need to get the result, and there is no point in creating extra workers)
     + `args_container=None`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. This case is similar to the previous one, but here we use named arguments
     + `args_container=[1, 2, 3]`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. This case is similar to the previous one, but here we use both positional and named arguments
     + `args_container=[1, 2, 3, 4]`, `kwargs_container=None`, `n_jobs=2`. In this case, at any given time, **no more than 2** `target_func` functions will be executed in parallel, since we need to execute it 4 times, but we only have 2 workers.
     + In such cases (from the example above), the execution time should be optimized. If these 4 calls are executed in 5, 1, 2 and 1 seconds, then parallel execution with `n_jobs=2` should take **5 seconds** (not 7 and certainly not 10)

5. `parallel_map` returns the results of `target_func` execution **in the same order** in which the corresponding arguments were passed
6. Works with functions created inside other functions
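Requirements 2-4 amount to turning the two containers into one list of calls. A minimal sketch, assuming a hypothetical helper named `normalize_calls` that returns one `(args, kwargs)` pair per call:

```python
import os


def normalize_calls(args_container=None, kwargs_container=None, n_jobs=None):
    """Hypothetical helper: return one (args, kwargs) pair per call of target_func."""
    if args_container is not None and kwargs_container is not None:
        if len(args_container) != len(kwargs_container):
            raise ValueError("args_container and kwargs_container differ in length")
    if args_container is not None:
        n_calls = len(args_container)
    elif kwargs_container is not None:
        n_calls = len(kwargs_container)
    else:
        # no arguments at all: one call per worker
        n_calls = n_jobs or os.cpu_count() or 1
    if args_container is None:
        args_list = [()] * n_calls
    else:
        # tuples are unpacked later via *args; anything else becomes a single argument
        args_list = [a if isinstance(a, tuple) else (a,) for a in args_container]
    if kwargs_container is None:
        kwargs_list = [{} for _ in range(n_calls)]
    else:
        kwargs_list = list(kwargs_container)
    return list(zip(args_list, kwargs_list))


calls = normalize_calls([1, (2.0, 2)], [{"a": 1}, {"b": 2}])
```

Each pair can then be applied as `target_func(*args, **kwargs)`, and the number of workers capped at `min(n_jobs, len(calls))`.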

For a basic solution, you are not expected to do **super-good** optimization in terms of time and memory for all possible cases. However, for well-optimized operating logic you can get up to **+3 extra points**
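The timing requirement from scenario 4 can be checked with a small experiment. The sketch below uses threads and scaled-down sleep durations as stand-ins for real work; `timed_map` and `sleep_and_return` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def timed_map(func, items, n_jobs):
    """Run func over items with n_jobs threads; return ordered results and wall time."""
    started = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_jobs) as pool:
        # tasks wait in one shared queue, so a free worker picks up the next item
        results = list(pool.map(func, items))
    return results, time.perf_counter() - started


def sleep_and_return(seconds):
    time.sleep(seconds)
    return seconds


# Durations from the text scaled down by 10: 5, 1, 2, 1 s -> 0.5, 0.1, 0.2, 0.1 s
results, elapsed = timed_map(sleep_and_return, [0.5, 0.1, 0.2, 0.1], n_jobs=2)
```

With a shared task queue, one worker spends the whole time on the longest call while the other handles the remaining three, so `elapsed` should be close to the longest single duration rather than to the sum.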

You can make a class instead of a function if you prefer

The task can only use modules from the **standard library** of Python

Below are test cases for each of the requirements

In [12]:
import multiprocessing
from typing import Dict, List, Tuple, Any


class ParallelMap:
    '''A class that parallelizes the map function using multiprocessing

    Attributes:
        target_func (function): the function to be applied to the data
        args_container (list): a list containing positional arguments for the target function
        kwargs_container (list): a list of dictionaries containing keyword arguments for the target function
        n_jobs (int): the number of CPUs used for multiprocessing
        l (int): the length of the argument containers
        _cont_mask (list): a list indicating the presence of argument containers
    '''
    
    
    
    def __init__(
        self,
        target_func: callable,
        args_container: List[Tuple] = None,
        kwargs_container: List[Dict] = None,
        n_jobs: int = None
    ) -> None:        
        '''Initializes the ParallelMap instance with the target function, positional and keyword arguments,
        and the number of CPUs to be used

        Parameters:
            target_func (function): the function to be applied to the data
            args_container (list): a list containing positional arguments for the target function
            kwargs_container (list): a list of dictionaries containing keyword arguments for the target function
            n_jobs (int): the number of CPUs used for multiprocessing

        Raises:
            ValueError: if the number of positional arguments does not match the number of keyword arguments

        Returns: None
        '''
        
        
        self.target_func = target_func
        self.args_container = args_container
        self.kwargs_container = kwargs_container
        if not n_jobs:
            self.n_jobs = multiprocessing.cpu_count()
        else:
            self.n_jobs = n_jobs
        self.__check_containers()
        self.n_jobs = min(self.n_jobs, self.l)
        
    
    def __check_containers(self) -> None:
        '''Private method that checks which argument containers were passed,
        sets the length of the containers, and creates empty containers of the required length as needed
        '''
        
        
        self._cont_mask = [bool(cont) for cont in [self.args_container, self.kwargs_container]]
        if self._cont_mask == [True, True]:
            arg_l, kwarg_l = len(self.args_container), len(self.kwargs_container)
            if arg_l != kwarg_l:
                raise ValueError(
                    f"Numbers of positional arguments and keyword arguments do not match: {arg_l} and {kwarg_l}"
                )
            self.l = arg_l
            self.args_container = self.__adj_args(self.args_container)
        elif self._cont_mask == [True, False]:
            self.l = len(self.args_container)
            self.args_container = self.__adj_args(self.args_container)
            self.kwargs_container = self.__adj_kwargs(self.l)
        elif self._cont_mask == [False, True]:
            self.l = len(self.kwargs_container)
            self.args_container = self.__adj_args(cont_l = self.l)
        else:
            self.l = self.n_jobs
            self.args_container = self.__adj_args(cont_l = self.l)
            self.kwargs_container = self.__adj_kwargs(cont_l = self.l)


    def __adj_args(self, cont: List[Any] = None, cont_l: int = None) -> List[Tuple]:
        '''Private method that transforms elements of the positional argument container to tuples,
        if the positional argument container is not passed, creates list of empty tuples with required length

        Parameters:
            cont (list): the positional argument container
            cont_l (int): the length of the positional argument container

        Returns: 
            cont (list):  a list of tuples with the correct format for positional arguments
        '''
        
        
        if not cont_l:
            cont = [tuple([i]) if (type(i) is not tuple) else i for i in cont]
        else:
            cont = [()]*cont_l
        return cont


    def __adj_kwargs(self, cont_l: int) -> List[Dict]:
        '''Private method that adjusts the list of empty dictionaries with required length

        Parameters:
            cont_l (int): the length of the keyword argument container

        Returns: a list of empty dictionaries with the correct format for keyword arguments
        '''
        
        
        return [{}]*cont_l

    
    def process_tasks(
        self,
        task_queue: multiprocessing.Queue,
        result_dict: dict
    ) -> None:
        '''Processes tasks from a multiprocessing queue

        Parameters:
            task_queue (multiprocessing.Queue): The queue of tasks to be processed
            result_dict (multiprocessing.Manager.dict): a shared dictionary to store the results

        Returns: None
        '''
        
        
        while not task_queue.empty():
            i, args, kwargs = task_queue.get()
            result = self.target_func(*args, **kwargs)
            result_dict[i] = result
        return 


    def parallel_map(self) -> list:
        '''Runs target_func on every queued set of arguments using n_jobs worker processes

        Returns: a list of results from the map function
        '''
        
        
        task_queue = multiprocessing.Queue()
        for i in range(self.l):
            task_queue.put((i, self.args_container[i], self.kwargs_container[i]))
        manager = multiprocessing.Manager()
        result_dict = manager.dict()
        processes = []
        for i in range(self.n_jobs):
            p = multiprocessing.Process(target=self.process_tasks,
                                        args=(task_queue, result_dict))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        return [result_dict[i] for i in sorted(result_dict)]
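Workers that pull from a shared queue need a reliable way to know when to stop. One common option is to push one sentinel value per worker after the real tasks. The sketch below shows only that loop shape; it uses threads and `queue.Queue` to stay self-contained, and the name `sentinel_map` is hypothetical, not part of the class above. The same pattern should carry over to `multiprocessing.Queue`.

```python
import queue
import threading


def sentinel_map(func, values, n_workers=2):
    """Apply func to each value using worker threads that stop on a sentinel."""
    tasks = queue.Queue()
    results = [None] * len(values)

    def worker():
        while True:
            item = tasks.get()   # blocks until a task or a sentinel arrives
            if item is None:     # sentinel: no more work for this worker
                break
            index, value = item
            results[index] = func(value)

    # Queue the real tasks first, tagged with their position
    for index, value in enumerate(values):
        tasks.put((index, value))
    # Then one sentinel per worker, so every worker is guaranteed to wake up and exit
    for _ in range(n_workers):
        tasks.put(None)

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Because every worker eventually receives its own sentinel, no worker is left blocked on an empty queue.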

In [10]:
import time


# This is only one example of a test function; your parallel_map must work efficiently with ANY function
# So be sure to test the code on something else as well
def test_func(x=1, s=2, a=1, b=1, c=1):
    time.sleep(s)
    return a*x**2 + b*x + c

In [None]:
%%time

# Example 2.1
# Individual values in args_container are passed as positional arguments
parallel_map(test_func, args_container=[1, 2.0, 3j-1, 4])   # Parallel calls made here: test_func(1) test_func(2.0) test_func(3j-1) test_func(4)

CPU times: user 395 µs, sys: 8.26 ms, total: 8.65 ms
Wall time: 2.01 s


[3, 7.0, (-8-3j), 21]

In [3]:
%%time

parl_map = ParallelMap(test_func, args_container=[1, 2.0, 3j-1, 4])
parl_map.parallel_map()

CPU times: user 22.3 ms, sys: 1.1 ms, total: 23.4 ms
Wall time: 2.06 s


[3, 7.0, (-8-3j), 21]

In [None]:
%%time

# Example 2.2
# Elements of type tuple in args_container are unpacked as positional arguments
parallel_map(test_func, [(1, 1), (2.0, 2), (3j-1, 3), 4])    # Parallel calls made here: test_func(1, 1) test_func(2.0, 2) test_func(3j-1, 3) test_func(4)

CPU times: user 7.18 ms, sys: 7.73 ms, total: 14.9 ms
Wall time: 3.01 s


[3, 7.0, (-8-3j), 21]

In [4]:
%%time

parl_map = ParallelMap(test_func, [(1, 1), (2.0, 2), (3j-1, 3), 4])
parl_map.parallel_map()

CPU times: user 0 ns, sys: 22.9 ms, total: 22.9 ms
Wall time: 3.03 s


[3, 7.0, (-8-3j), 21]

In [None]:
%%time

# Example 3.1
# args_container and kwargs_container can be passed together, but they must contain the same number of elements
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

# Parallel calls made here: test_func(1, s=3) test_func(2, s=3) test_func(3, s=3) test_func(4, s=3)

CPU times: user 5.89 ms, sys: 8.84 ms, total: 14.7 ms
Wall time: 3.02 s


[3, 7, 13, 21]

In [5]:
%%time

parl_map = ParallelMap(test_func,
                       args_container=[1, 2, 3, 4],
                       kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])
parl_map.parallel_map()

CPU times: user 10.5 ms, sys: 13.2 ms, total: 23.7 ms
Wall time: 3.04 s


[3, 7, 13, 21]

In [None]:
%%time

# Example 3.2
# args_container can be None while kwargs_container is given explicitly
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

CPU times: user 6.54 ms, sys: 6.06 ms, total: 12.6 ms
Wall time: 3.02 s


[3, 3, 3, 3]

In [6]:
%%time

parl_map = ParallelMap(test_func,
                       kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])
parl_map.parallel_map()

CPU times: user 1.62 ms, sys: 21.6 ms, total: 23.3 ms
Wall time: 3.03 s


[3, 3, 3, 3]

In [None]:
%%time

# Example 3.3
# kwargs_container can be None while args_container is given explicitly
parallel_map(test_func,
             args_container=[1, 2, 3, 4])

CPU times: user 4.11 ms, sys: 9.2 ms, total: 13.3 ms
Wall time: 2.01 s


[3, 7, 13, 21]

In [7]:
%%time

parl_map = ParallelMap(test_func,
                       args_container=[1, 2, 3, 4])
parl_map.parallel_map()

CPU times: user 0 ns, sys: 23.2 ms, total: 23.2 ms
Wall time: 2.03 s


[3, 7, 13, 21]

In [None]:
%%time

# Example 3.4
# Both kwargs_container and args_container can be omitted
parallel_map(test_func)

CPU times: user 500 µs, sys: 43.3 ms, total: 43.8 ms
Wall time: 2.04 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [8]:
%%time

parl_map = ParallelMap(test_func)
parl_map.parallel_map()

CPU times: user 0 ns, sys: 69 ms, total: 69 ms
Wall time: 2.08 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [None]:
%%time

# Example 3.5
# If the numbers of positional and keyword arguments do not match, an error is raised
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}])

ValueError: Numbers of positional arguments and keyword arguments do not match: 4 and 3

In [9]:
%%time

parl_map = ParallelMap(test_func,
                       args_container=[1, 2, 3, 4],
                       kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}])
parl_map.parallel_map()

ValueError: Numbers of positional arguments and keyword arguments do not match: 4 and 3
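The rules from Examples 2.1 to 3.5 can be collected in one normalization step. This is a minimal sketch under assumptions: the function name `normalize_containers` is hypothetical, and falling back to `os.cpu_count()` when nothing is passed is taken from the behavior the examples describe, not from the class code itself.

```python
import os


def normalize_containers(args_container=None, kwargs_container=None, n_jobs=None):
    """Return (args, kwargs) lists of equal length, ready for func(*a, **k)."""
    if args_container is not None and kwargs_container is not None:
        if len(args_container) != len(kwargs_container):
            raise ValueError(
                "Numbers of positional arguments and keyword arguments "
                f"do not match: {len(args_container)} and {len(kwargs_container)}"
            )

    # Decide how many calls will be made
    if args_container is not None:
        n_calls = len(args_container)
    elif kwargs_container is not None:
        n_calls = len(kwargs_container)
    else:
        # Assumption: with no arguments at all, call once per requested worker
        n_calls = n_jobs or os.cpu_count() or 1

    # Bare values become 1-tuples; tuples are kept for unpacking
    if args_container is None:
        args = [() for _ in range(n_calls)]
    else:
        args = [a if isinstance(a, tuple) else (a,) for a in args_container]

    if kwargs_container is None:
        kwargs = [{} for _ in range(n_calls)]
    else:
        kwargs = [dict(k) for k in kwargs_container]
    return args, kwargs
```

For instance, `normalize_containers([(1, 1), 4])` yields `([(1, 1), (4,)], [{}, {}])`, which corresponds to the calls `test_func(1, 1)` and `test_func(4)`.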

In [None]:
%%time

# Example 4.1
# If the function has no required arguments and n_jobs was not passed, it is executed in parallel as many times as your CPU has logical cores
# In my case that is 24; you may have more or fewer
parallel_map(test_func)

CPU times: user 9.3 ms, sys: 51.2 ms, total: 60.5 ms
Wall time: 2.06 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [10]:
multiprocessing.cpu_count()

16

In [11]:
%%time

parl_map = ParallelMap(test_func)
parl_map.parallel_map()

CPU times: user 21.5 ms, sys: 42.7 ms, total: 64.2 ms
Wall time: 2.08 s


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

In [47]:
%%time

# Example 4.2
# If the function has no required arguments and only n_jobs is passed, it is executed in parallel n_jobs times
parallel_map(test_func, n_jobs=2)

CPU times: user 2.06 ms, sys: 5.92 ms, total: 7.99 ms
Wall time: 2.01 s


[3, 3]

In [13]:
%%time

parl_map = ParallelMap(test_func, n_jobs=2)
parl_map.parallel_map()

CPU times: user 2.18 ms, sys: 11.9 ms, total: 14 ms
Wall time: 2.03 s


[3, 3]

In [48]:
%%time

# Example 4.3
# If FEWER arguments are given for target_func than n_jobs, the number of workers used equals the number of arguments passed
parallel_map(test_func,
             args_container=[1, 2, 3],
             n_jobs=5)   # 3 workers are used here

CPU times: user 314 µs, sys: 8.69 ms, total: 9 ms
Wall time: 2.01 s


[3, 7, 13]

In [14]:
%%time

parl_map = ParallelMap(test_func,
                       args_container=[1, 2, 3],
                       n_jobs=5)
parl_map.parallel_map()

CPU times: user 0 ns, sys: 17.8 ms, total: 17.8 ms
Wall time: 2.03 s


[3, 7, 13]

In [None]:
%%time

# Example 4.4
# Same as the previous case, but with keyword arguments
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # 3 workers are used here

CPU times: user 1.26 ms, sys: 9.47 ms, total: 10.7 ms
Wall time: 3.01 s


[3, 3, 3]

In [15]:
%%time

parl_map = ParallelMap(test_func,
                       kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
                       n_jobs=5)
parl_map.parallel_map()

CPU times: user 5.45 ms, sys: 13.9 ms, total: 19.4 ms
Wall time: 3.03 s


[3, 3, 3]

In [None]:
%%time

# Example 4.5
# A combination of Examples 4.3 and 4.4 (both positional and keyword arguments are passed)
parallel_map(test_func,
             args_container=[1, 2, 3],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # 3 workers are used here

CPU times: user 7.88 ms, sys: 0 ns, total: 7.88 ms
Wall time: 3.01 s


[3, 7, 13]

In [16]:
%%time

parl_map = ParallelMap(test_func,
                       args_container=[1, 2, 3],
                       kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
                       n_jobs=5)
parl_map.parallel_map()

CPU times: user 17.1 ms, sys: 1.5 ms, total: 18.6 ms
Wall time: 3.03 s


[3, 7, 13]

In [None]:
%%time

# Example 4.6
# If MORE arguments are given for target_func than n_jobs, then n_jobs workers are used
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=None,
             n_jobs=2)   # 2 workers are used here

CPU times: user 7.88 ms, sys: 0 ns, total: 7.88 ms
Wall time: 3.01 s


[3, 7, 13]

In [17]:
%%time

parl_map = ParallelMap(test_func,
                       args_container=[1, 2, 3, 4],
                       kwargs_container=None,
                       n_jobs=2)
parl_map.parallel_map()

CPU times: user 3.25 ms, sys: 10.5 ms, total: 13.8 ms
Wall time: 4.03 s


[3, 7, 13, 21]
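Examples 4.3 to 4.6 amount to capping the worker count by the number of calls. A minimal sketch of that rule follows; the helper name `effective_workers` is hypothetical, and the default of `os.cpu_count()` is an assumption based on Example 4.1.

```python
import os


def effective_workers(n_tasks, n_jobs=None):
    """Number of workers to start: never more than there are tasks."""
    if n_jobs is None:
        # Assumption: default to the number of logical cores
        n_jobs = os.cpu_count() or 1
    # At least one worker, at most one per task
    return max(1, min(n_jobs, n_tasks))
```

With three argument sets and `n_jobs=5` this gives 3 workers; with four argument sets and `n_jobs=2` it gives 2.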

In [None]:
%%time

# Example 4.7
# Execution time is optimized; this code should finish in 5 seconds
parallel_map(test_func,
             kwargs_container=[{"s": 5}, {"s": 1}, {"s": 2}, {"s": 1}],
             n_jobs=2)

CPU times: user 3.03 ms, sys: 11 ms, total: 14 ms
Wall time: 5.01 s


[3, 3, 3, 3]

In [18]:
%%time

parl_map = ParallelMap(test_func,
                       kwargs_container=[{"s": 5}, {"s": 1}, {"s": 2}, {"s": 1}],
                       n_jobs=2)
parl_map.parallel_map()

CPU times: user 15.6 ms, sys: 1.11 ms, total: 16.7 ms
Wall time: 5.03 s


[3, 3, 3, 3]
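The 5-second result in Example 4.7 is what one would expect when each free worker pulls the next task from a shared queue, rather than receiving a fixed slice of tasks up front. The sketch below only simulates the two strategies on task durations without sleeping; the function names are hypothetical, and the contiguous-chunk split is just one possible static strategy chosen for comparison.

```python
import heapq


def dynamic_makespan(durations, n_workers):
    """Total time when each free worker takes the next task from a shared queue."""
    # Each heap entry is the moment a worker becomes free
    free_at = [0.0] * n_workers
    heapq.heapify(free_at)
    for duration in durations:
        start = heapq.heappop(free_at)          # earliest available worker
        heapq.heappush(free_at, start + duration)
    return max(free_at)


def static_makespan(durations, n_workers):
    """Total time when tasks are pre-split into contiguous chunks, one per worker."""
    if not durations:
        return 0.0
    chunk = -(-len(durations) // n_workers)     # ceiling division
    return float(max(sum(durations[i:i + chunk])
                     for i in range(0, len(durations), chunk)))


print(dynamic_makespan([5, 1, 2, 1], 2))  # 5.0
print(static_makespan([5, 1, 2, 1], 2))   # 6.0
```

With sleep times `[5, 1, 2, 1]` and two workers, the queue-based schedule lets the second worker run the three short tasks while the first one handles the 5-second task.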

In [None]:
def test_func2(string, sleep_time=1):
    time.sleep(sleep_time)
    return string

# Example 5
# Results are returned in the same order in which the corresponding arguments were passed, regardless of when each worker finished
arguments = ["first", "second", "third", "fourth", "fifth"]
parallel_map(test_func2,
             args_container=arguments,
             kwargs_container=[{"sleep_time": 5}, {"sleep_time": 4}, {"sleep_time": 3}, {"sleep_time": 2}, {"sleep_time": 1}])

['first', 'second', 'third', 'fourth', 'fifth']

In [19]:
%%time

def test_func2(string, sleep_time=1):
    time.sleep(sleep_time)
    return string


arguments = ["first", "second", "third", "fourth", "fifth"]
parl_map = ParallelMap(test_func2,
                       args_container=arguments,
                       kwargs_container=[{"sleep_time": 5}, {"sleep_time": 4}, {"sleep_time": 3}, {"sleep_time": 2}, {"sleep_time": 1}])
print(parl_map.parallel_map())

['first', 'second', 'third', 'fourth', 'fifth']
CPU times: user 6.65 ms, sys: 23.7 ms, total: 30.3 ms
Wall time: 5.03 s
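Order can be preserved by tagging every task with its input position and writing each result into that slot, whatever the completion order. A small sketch of the idea with threads and short delays; `echo_after` and `ordered_results` are hypothetical names, and `ThreadPoolExecutor` is used here only for brevity.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def echo_after(text, delay):
    time.sleep(delay)
    return text


def ordered_results(items, delays):
    """Run echo_after concurrently; return results in input order plus completion order."""
    results = [None] * len(items)
    completion_order = []
    with ThreadPoolExecutor(max_workers=max(1, len(items))) as pool:
        # Remember which input position each future belongs to
        future_to_index = {
            pool.submit(echo_after, item, delay): index
            for index, (item, delay) in enumerate(zip(items, delays))
        }
        for future in as_completed(future_to_index):
            index = future_to_index[future]
            results[index] = future.result()    # slot is fixed by input position
            completion_order.append(items[index])
    return results, completion_order
```

The first returned list always matches the input order, while the second reflects which call happened to finish first.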


In [None]:
%%time


def test_func3():
    def inner_test_func(sleep_time):
        time.sleep(sleep_time)
    return parallel_map(inner_test_func, args_container=[1, 2, 3])

# Example 6
# Works with functions created inside other functions
test_func3()

[None, None, None]

In [20]:
%%time

def test_func3():
    def inner_test_func(sleep_time):
        time.sleep(sleep_time)
    parl_map = ParallelMap(inner_test_func, args_container=[1, 2, 3])
    return parl_map.parallel_map()


test_func3()

CPU times: user 1.89 ms, sys: 16.6 ms, total: 18.5 ms
Wall time: 3.03 s


[None, None, None]
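Nested functions are a special case for process-based workers because they generally cannot be pickled. With the `fork` start method the child process inherits the function object, which is probably why the cell above works; with the `spawn` start method the target has to be picklable. That explanation is an assumption about the environment, since the notebook does not state the platform. The helper below is a hypothetical check, not part of `ParallelMap`:

```python
import pickle


def make_inner():
    def inner(x):
        return x + 1
    return inner


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(can_pickle(len))           # True: built-ins are pickled by reference
print(can_pickle(make_inner()))  # False: local functions cannot be looked up by name
```

If portability across start methods matters, one option is to fall back to threads when `can_pickle(target_func)` is false.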

In [21]:
%%time

def calc_distance(x1=1, y1=1, x2=1, y2=1, s=1):
    distance = ((x1 - x2)**2 + (y1 - y2)**2)**0.5
    time.sleep(s)
    return distance


parl_map = ParallelMap(calc_distance, 
                       args_container=[(1, 5, 2, 3), (2, 2, 6, 8), (25, 11, 13, 16)],
                       kwargs_container=[{"s": 6}, {"s": 1}, {"s": 10}], 
                       n_jobs=1)
parl_map.parallel_map()

CPU times: user 9.08 ms, sys: 3.9 ms, total: 13 ms
Wall time: 17 s


[2.23606797749979, 7.211102550927978, 13.0]

In [22]:
%%time

parl_map = ParallelMap(calc_distance, 
                       args_container=[(1, 5, 2, 3), (2, 2, 6, 8), (25, 11, 13, 16)],
                       kwargs_container=[{"s": 6}, {"s": 1}, {"s": 10}], 
                       n_jobs=3)
parl_map.parallel_map()

CPU times: user 0 ns, sys: 19.2 ms, total: 19.2 ms
Wall time: 10 s


[2.23606797749979, 7.211102550927978, 13.0]