В формулировке заданий будет использоваться понятие **worker**. Это слово обозначает какую-то единицу параллельного выполнения, в случае питона это может быть **поток** или **процесс**, выбирайте то, что лучше будет подходить к конкретной задаче

В каждом задании нужно писать подробные аннотиции типов для:
1. Аргументов функций и классов
2. Возвращаемых значений
3. Классовых атрибутов (если такие есть)

В каждом задании нужно писать докстроки в определённом стиле (какой вам больше нравится) для всех функций, классов и методов

# Задание 1 (7 баллов)

В одном из заданий по ML от вас требовалось написать кастомную реализацию Random Forest. Её проблема состоит в том, что она работает медленно, так как использует всего один поток для работы. Добавление параллельного программирования в код позволит получить существенный прирост в скорости обучения и предсказаний.

В данном задании от вас требуется добавить возможность обучать случайный лес параллельно и использовать параллелизм для предсказаний. Для этого вам понадобится:
1. Добавить аргумент `n_jobs` в метод `fit`. `n_jobs` показывает количество worker'ов, используемых для распараллеливания
2. Добавить аргумент `n_jobs` в методы `predict` и `predict_proba`
3. Реализовать функционал по распараллеливанию в данных методах

В результате код `random_forest.fit(X, y, n_jobs=2)` и `random_forest.predict(X, y, n_jobs=2)` должен работать в ~1.5-2 раза быстрее, чем `random_forest.fit(X, y, n_jobs=1)` и `random_forest.predict(X, y, n_jobs=1)` соответственно

Если у вас по каким-то причинам нет кода случайного леса из ДЗ по ML, то вы можете написать его заново или попросить у однокурсника. *Детали* реализации ML части оцениваться не будут, НО, если вы поломаете логику работы алгоритма во время реализации параллелизма, то за это будут сниматься баллы

В задании можно использовать только модули из **стандартной библиотеки** питона, а также функции и классы из **sklearn** при помощи которых вы изначально писали лес

In [85]:
from typing import List, Union
from sklearn.base import BaseEstimator
from sklearn.datasets import make_classification
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.tree import DecisionTreeClassifier
from concurrent.futures import ThreadPoolExecutor

class RandomForestClassifierCustom(BaseEstimator):
    def __init__(self, n_estimators: int = 100, max_depth: int = None, 
                 max_features: Union[int, float, str] = "auto", 
                 random_state: int = None) -> None:
        """
        Custom Random Forest Classifier.

        Parameters
        ----------
        n_estimators : int, optional (default=100)
            The number of trees in the forest.

        max_depth : int, optional (default=None)
            The maximum depth of the tree. If None, then nodes are expanded until
            all leaves are pure or until all leaves contain less than
            min_samples_split samples.

        max_features : int, float or str, optional (default="auto")
            The number of features to consider when looking for the best split.
            - If int, then consider `max_features` features at each split.
            - If float, then `max_features` is a fraction and
              `int(max_features * n_features)` features are considered at each split.
            - If "auto", then `max_features=sqrt(n_features)`.

        random_state : int, optional (default=None)
            If int, random_state is the seed used by the random number generator.

        Attributes
        ----------
        estimators_ : list
            The collection of fitted trees.
        """
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.max_features = max_features
        self.random_state = random_state
        self.estimators_ = []

    def _fit_one_tree(self, X: np.ndarray, y: np.ndarray) -> DecisionTreeClassifier:
        tree = DecisionTreeClassifier(max_depth=self.max_depth, max_features=self.max_features, random_state=self.random_state)
        tree.fit(X, y)
        return tree

    def fit(self, X: np.ndarray, y: np.ndarray, n_jobs: int = 1) -> None:
        if n_jobs == 1:
            for _ in range(self.n_estimators):
                tree = self._fit_one_tree(X, y)
                self.estimators_.append(tree)
        else:
            with ThreadPoolExecutor(max_workers=n_jobs) as executor:
                self.estimators_ = list(executor.map(self._fit_one_tree, [X] * self.n_estimators, [y] * self.n_estimators))

    def predict(self, X: np.ndarray, n_jobs: int = 1) -> np.ndarray:
        if n_jobs == 1:
            preds = np.array([tree.predict(X) for tree in self.estimators_])
        else:
            with ThreadPoolExecutor(max_workers=n_jobs) as executor:
                preds = np.array(list(executor.map(lambda tree: tree.predict(X), self.estimators_)))

        return np.apply_along_axis(lambda x: np.argmax(np.bincount(x)), axis=0, arr=preds)

    def predict_proba(self, X: np.ndarray, n_jobs: int = 1) -> np.ndarray:
        if n_jobs == 1:
            preds = np.array([tree.predict_proba(X) for tree in self.estimators_])
        else:
            with ThreadPoolExecutor(max_workers=n_jobs) as executor:
                preds = np.array(list(executor.map(lambda tree: tree.predict_proba(X), self.estimators_)))

        return np.mean(preds, axis=0)
    
X, y = make_classification(n_samples=100000)

In [86]:
random_forest = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [87]:
%%time

_ = random_forest.fit(X, y, n_jobs=1)

CPU times: user 7.37 s, sys: 37.3 ms, total: 7.41 s
Wall time: 8.05 s


In [6]:
%%time

preds_1 = random_forest.predict(X, n_jobs=1)

CPU times: user 1.43 s, sys: 0 ns, total: 1.43 s
Wall time: 1.44 s


In [7]:
random_forest = RandomForestClassifierCustom(max_depth=30, n_estimators=10, max_features=2, random_state=42)

In [8]:
%%time

_ = random_forest.fit(X, y, n_jobs=2)

CPU times: user 10.9 s, sys: 122 ms, total: 11 s
Wall time: 5.56 s


In [9]:
%%time

preds_2 = random_forest.predict(X, n_jobs=2)

CPU times: user 1.44 s, sys: 0 ns, total: 1.44 s
Wall time: 1.27 s


In [10]:
(preds_1 == preds_2).all()   # Количество worker'ов не должно влиять на предсказания

True

#### Какие есть недостатки у вашей реализации параллельного Random Forest (если они есть)? Как это можно исправить? Опишите словами, можно без кода (+1 дополнительный балл)

Поскольку завтра (сегодня) ещё контрольная по статистике и финальное занятие по молекулярной эволюции, всё задание сделано при помощи двух нейронов в моем мозгу, выживших после трёх дней непрерывного выполнения задания по интернету, ChatGPT, чафиря и матери пресвятого Иезикииля Бермудского. Таким образом, главный недостаток реализации заключается в том, что на данный момент в мире не существует ни одного человека или компьютера, который полностью понимает, как это всё работает. Проблема ~~не лечится~~ лечится переписыванием кода на свежую голову. 

# Задание 2 (9 баллов)

Напишите декоратор `memory_limit`, который позволит ограничивать использование памяти декорируемой функцией.

Декоратор должен принимать следующие аргументы:
1. `soft_limit` - "мягкий" лимит использования памяти. При превышении функцией этого лимита должен будет отображён **warning**
2. `hard_limit` - "жёсткий" лимит использования памяти. При превышении функцией этого лимита должно будет брошено исключение, а функция должна немедленно завершить свою работу
3. `poll_interval` - интервал времени (в секундах) между проверками использования памяти

Требования:
1. Потребление функцией памяти должно отслеживаться **во время выполнения функции**, а не после её завершения
2. **warning** при превышении `soft_limit` должен отображаться один раз, даже если функция переходила через этот лимит несколько раз
3. Если задать `soft_limit` или `hard_limit` как `None`, то соответствующий лимит должен быть отключён
4. Лимиты должны передаваться и отображаться в формате `<number>X`, где `X` - символ, обозначающий порядок единицы измерения памяти ("B", "K", "M", "G", "T", ...)
5. В тексте warning'ов и исключений должен быть указан текщий объём используемой памяти и величина превышенного лимита

В задании можно использовать только модули из **стандартной библиотеки** питона, можно писать вспомогательные функции и/или классы

В коде ниже для вас предопределены некоторые полезные функции, вы можете ими пользоваться, а можете не пользоваться

In [20]:
import os
import psutil
import time
import warnings
from threading import Thread
from functools import wraps
from typing import Callable, Any, Optional


def get_memory_usage() -> int:
    """
    Get the current memory usage of the process.
    
    Returns
    -------
    int
        The current memory usage in bytes.
    """
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return mem_info.rss


def bytes_to_human_readable(n_bytes: int) -> str:
    """
    Convert bytes to human-readable format.
    
    Parameters
    ----------
    n_bytes : int
        Number of bytes to convert.
    
    Returns
    -------
    str
        A human-readable representation of the number of bytes.
    """
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for idx, s in enumerate(symbols):
        prefix[s] = 1 << (idx + 1) * 10
    for s in reversed(symbols):
        if n_bytes >= prefix[s]:
            value = float(n_bytes) / prefix[s]
            return f"{value:.2f}{s}"
    return f"{n_bytes}B"


def memory_limit(soft_limit: Optional[str] = None, 
                 hard_limit: Optional[str] = None, 
                 poll_interval: int = 1) -> Callable:
    """
    Decorator for setting memory limits on a function.
    
    Parameters
    ----------
    soft_limit : Optional[str], optional
        Soft memory limit (e.g., '1G', '512M'). If exceeded, a warning is issued.
        None by default.
    
    hard_limit : Optional[str], optional
        Hard memory limit (e.g., '2G', '1G'). If exceeded, a MemoryError is raised
        and the process is terminated. None by default.
    
    poll_interval : int, optional
        Memory usage polling interval in seconds. Default is 1.

    Returns
    -------
    Callable
        The decorated function.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            def memory_checker() -> None:
                nonlocal soft_limit, hard_limit
                soft_limit_warned = False
                if soft_limit:
                    soft_limit_bytes = int(soft_limit[:-1]) * (1024 ** (symbols.index(soft_limit[-1]) + 1))
                if hard_limit:
                    hard_limit_bytes = int(float(hard_limit[:-1]) * (1024 ** (symbols.index(hard_limit[-1]) + 1)))

                while True:
                    mem_usage = get_memory_usage()
                    if soft_limit and not soft_limit_warned and mem_usage > soft_limit_bytes:
                        soft_limit_warned = True
                        warnings.warn(
                            f"Memory usage exceeded soft limit ({bytes_to_human_readable(mem_usage)} > {soft_limit})"
                        )
                    if hard_limit and mem_usage > hard_limit_bytes:
                        os._exit(1)
                        raise MemoryError(
                            f"Memory usage exceeded hard limit ({bytes_to_human_readable(mem_usage)} > {hard_limit})"
                        )
                    time.sleep(poll_interval)

            checker_thread = Thread(target=memory_checker, daemon=True)
            checker_thread.start()
            result = func(*args, **kwargs)
            checker_thread.join()
            return result

        return wrapper

    return decorator

In [21]:
@memory_limit(soft_limit="512M", hard_limit="1.5G", poll_interval=0.1)
def memory_increment():
    """
    Функция для тестирования
    
    В течение нескольких секунд достигает использования памяти 1.89G
    Потребление памяти и скорость накопления можно варьировать, изменяя код
    """
    lst = []
    for i in range(50000000):
        if i % 500000 == 0:
            time.sleep(0.1)
        lst.append(i)
    return lst

# Задание 3 (11 баллов)

Напишите функцию `parallel_map`. Это должна быть **универсальная** функция для распараллеливания, которая эффективно работает в любых условиях.

Функция должна принимать следующие аргументы:
1. `target_func` - целевая функция (обязательный аргумент)
2. `args_container` - контейнер с позиционными аргументами для `target_func` (по-умолчанию `None` - позиционные аргументы не передаются)
3. `kwargs_container` - контейнер с именованными аргументами для `target_func` (по-умолчанию `None` - именованные аргументы не передаются)
4. `n_jobs` - количество workers, которые будут использованы для выполнения (по-умолчанию `None` - количество логических ядер CPU в системе)

Функция должна работать аналогично `***PoolExecutor.map`, применяя функцию к переданному набору аргументов, но с некоторыми дополнениями и улучшениями
    
Поскольку мы пишем **универсальную** функцию, то нам нужно будет выполнить ряд требований, чтобы она могла логично и эффективно работать в большинстве ситуаций

1. `target_func` может принимать аргументы любого вида в любом количестве
2. Любые типы данных в `args_container`, кроме `tuple`, передаются в `target_func` как единственный позиционный аргумент. `tuple` распаковываются в несколько аргументов
3. Количество элементов в `args_container` должно совпадать с количеством элементов в `kwargs_container` и наоборот, также значение одного из них или обоих может быть равно `None`, в иных случаях должна кидаться ошибка (оба аргумента переданы, но размеры не совпадают)

4. Функция должна выполнять определённое количество параллельных вызовов `target_func`, это количество зависит от числа переданных аргументов и значения `n_jobs`. Сценарии могут быть следующие
    + `args_container=None`, `kwargs_container=None`, `n_jobs=None`. В таком случае функция `target_func` выполнится параллельно столько раз, сколько на вашем устройстве логических ядер CPU
    + `args_container=None`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **5** раз
    + `args_container=[1, 2, 3]`, `kwargs_container=None`, `n_jobs=5`. В таком случае функция `target_func` выполнится параллельно **3** раза, несмотря на то, что `n_jobs=5` (так как есть всего 3 набора аргументов для которых нам нужно получить результат, а лишние worker'ы создавать не имеет смысла)
    + `args_container=None`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем именованные аргументы
    + `args_container=[1, 2, 3]`, `kwargs_container=[{"s": 1}, {"s": 2}, {"s": 3}]`, `n_jobs=5`. Данный случай аналогичен предыдущему, но здесь мы используем и позиционные, и именованные аргументы
    + `args_container=[1, 2, 3, 4]`, `kwargs_container=None`, `n_jobs=2`. В таком случае в каждый момент времени параллельно будет выполняться **не более 2** функций `target_func`, так как нам нужно выполнить её 4 раза, но у нас есть только 2 worker'а.
    + В подобных случаях (из примера выше) должно оптимизироваться время выполнения. Если эти 4 вызова выполняются за 5, 1, 2 и 1 секунды, то параллельное выполнение с `n_jobs=2` должно занять **5 секунд** (не 7 и тем более не 10)

5. `parallel_map` возвращает результаты выполнения `target_func` **в том же порядке**, в котором были переданы соответствующие аргументы
6. Работает с функциями, созданными внутри других функций

Для базового решения от вас не ожидается **сверххорошая** оптимизация по времени и памяти для всех возможных случаев. Однако за хорошо оптимизированную логику работы можно получить до **+3 дополнительных баллов**

Вы можете сделать класс вместо функции, если вам удобнее

В задании можно использовать только модули из **стандартной библиотеки** питона

Ниже приведены тестовые примеры по каждому из требований

In [66]:
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Dict, Union
import multiprocessing

def parallel_map(target_func: Callable, args_container: List = None, 
                 kwargs_container: List[Dict] = None,
                 n_jobs: int = None) -> List:

    if args_container is None and kwargs_container is None:
        args_container = [tuple()]
        kwargs_container = [{}]
    elif args_container is None:
        args_container = [tuple()] * len(kwargs_container)
    elif kwargs_container is None:
        kwargs_container = [{}] * len(args_container)
    
    if len(args_container) != len(kwargs_container):
        raise ValueError("args_container and kwargs_container must have the same length.")
    
    # Ensure that args_container elements are tuples
    args_container = [arg if isinstance(arg, tuple) else (arg,) for arg in args_container]
    
    if n_jobs is None:
        n_jobs = min(multiprocessing.cpu_count(), len(args_container))
    
    with ThreadPoolExecutor(max_workers=n_jobs) as executor:
        combined_args = [(a, k) for a, k in zip(args_container, kwargs_container)]
        results = list(executor.map(lambda x: target_func(*(x[0]), **x[1]), combined_args))
    return results

In [67]:
import time


# Это только один пример тестовой функции, ваша parallel_map должна уметь эффективно работать с ЛЮБЫМИ функциями
# Поэтому обязательно протестируйте код на чём-нибудбь ещё
def test_func(x=1, s=2, a=1, b=1, c=1):
    time.sleep(s)
    return a*x**2 + b*x + c

In [68]:
%%time

# Пример 2.1
# Отдельные значения в args_container передаются в качестве позиционных аргументов
parallel_map(test_func, args_container=[1, 2.0, 3j-1, 4])   # Здесь происходят параллельные вызовы: test_func(1) test_func(2.0) test_func(3j-1) test_func(4)

CPU times: user 2.54 ms, sys: 4.15 ms, total: 6.68 ms
Wall time: 2.01 s


[3, 7.0, (-8-3j), 21]

In [69]:
%%time

# Пример 2.2
# Элементы типа tuple в args_container распаковываются в качестве позиционных аргументов
parallel_map(test_func, [(1, 1), (2.0, 2), (3j-1, 3), 4])    # Здесь происходят параллельные вызовы: test_func(1, 1) test_func(2.0, 2) test_func(3j-1, 3) test_func(4)

CPU times: user 5.36 ms, sys: 268 µs, total: 5.63 ms
Wall time: 3.01 s


[3, 7.0, (-8-3j), 21]

In [70]:
%%time

# Пример 3.1
# Возможна одновременная передача args_container и kwargs_container, но количества элементов в них должны быть равны
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

# Здесь происходят параллельные вызовы: test_func(1, s=3) test_func(2, s=3) test_func(3, s=3) test_func(4, s=3)

CPU times: user 3.27 ms, sys: 4.19 ms, total: 7.45 ms
Wall time: 3.01 s


[3, 7, 13, 21]

In [71]:
%%time

# Пример 3.2
# args_container может быть None, а kwargs_container задан явно
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}, {"s": 3}])

CPU times: user 2.9 ms, sys: 3.96 ms, total: 6.86 ms
Wall time: 3.01 s


[3, 3, 3, 3]

In [72]:
%%time

# Пример 3.3
# kwargs_container может быть None, а args_container задан явно
parallel_map(test_func,
             args_container=[1, 2, 3, 4])

CPU times: user 6.03 ms, sys: 101 µs, total: 6.13 ms
Wall time: 2 s


[3, 7, 13, 21]

In [73]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
parallel_map(test_func)

CPU times: user 5.46 ms, sys: 276 µs, total: 5.74 ms
Wall time: 2 s


[3]

In [74]:
%%time

# Пример 3.4
# И kwargs_container, и args_container могут быть не заданы
parallel_map(test_func)

CPU times: user 5.71 ms, sys: 3.7 ms, total: 9.41 ms
Wall time: 2 s


[3]

In [75]:
%%time

# Пример 3.5
# При несовпадении количеств позиционных и именованных аргументов кидается ошибка
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}])

ValueError: args_container and kwargs_container must have the same length.

In [76]:
%%time

# Пример 4.1
# Если функция не имеет обязательных аргументов и аргумент n_jobs не был передан, то она выполняется параллельно столько раз, сколько ваш CPU имеет логических ядер
# В моём случае это 24, у вас может быть больше или меньше
parallel_map(test_func)

CPU times: user 1.57 ms, sys: 3.75 ms, total: 5.32 ms
Wall time: 2 s


[3]

In [77]:
%%time

# Пример 4.2
# Если функция не имеет обязательных аргументов и передан только аргумент n_jobs, то она выполняется параллельно n_jobs раз
parallel_map(test_func, n_jobs=2)

CPU times: user 4.62 ms, sys: 31 µs, total: 4.65 ms
Wall time: 2.01 s


[3]

In [78]:
%%time

# Пример 4.3
# Если аргументов для target_func указано МЕНЬШЕ, чем n_jobs, то используется такое же количество worker'ов, сколько было передано аргументов
parallel_map(test_func,
             args_container=[1, 2, 3],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 2.29 ms, sys: 3.94 ms, total: 6.23 ms
Wall time: 2.01 s


[3, 7, 13]

In [79]:
%%time

# Пример 4.4
# Аналогичный предыдущему случай, но с именованными аргументами
parallel_map(test_func,
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 3.62 ms, sys: 3.88 ms, total: 7.5 ms
Wall time: 3.01 s


[3, 3, 3]

In [80]:
%%time

# Пример 4.5
# Комбинация примеров 4.3 и 4.4 (переданы и позиционные и именованные аргументы)
parallel_map(test_func,
             args_container=[1, 2, 3],
             kwargs_container=[{"s": 3}, {"s": 3}, {"s": 3}],
             n_jobs=5)   # Здесь используется 3 worker'a

CPU times: user 4.53 ms, sys: 0 ns, total: 4.53 ms
Wall time: 3 s


[3, 7, 13]

In [81]:
%%time

# Пример 4.6
# Если аргументов для target_func указано БОЛЬШЕ, чем n_jobs, то используется n_jobs worker'ов
parallel_map(test_func,
             args_container=[1, 2, 3, 4],
             kwargs_container=None,
             n_jobs=2)   # Здесь используется 2 worker'a

CPU times: user 4.08 ms, sys: 3.81 ms, total: 7.89 ms
Wall time: 4.01 s


[3, 7, 13, 21]

In [82]:
%%time

# Пример 4.7
# Время выполнения оптимизируется, данный код должен отрабатывать за 5 секунд
parallel_map(test_func,
             kwargs_container=[{"s": 5}, {"s": 1}, {"s": 2}, {"s": 1}],
             n_jobs=2)

CPU times: user 1.67 ms, sys: 3.9 ms, total: 5.56 ms
Wall time: 5.01 s


[3, 3, 3, 3]

In [83]:
def test_func2(string, sleep_time=1):
    time.sleep(sleep_time)
    return string

# Пример 5
# Результаты возвращаются в том же порядке, в котором были переданы соответствующие аргументы вне зависимости от того, когда завершился worker
arguments = ["first", "second", "third", "fourth", "fifth"]
parallel_map(test_func2,
             args_container=arguments,
             kwargs_container=[{"sleep_time": 5}, {"sleep_time": 4}, {"sleep_time": 3}, {"sleep_time": 2}, {"sleep_time": 1}])

['first', 'second', 'third', 'fourth', 'fifth']

In [84]:
%%time


def test_func3():
    def inner_test_func(sleep_time):
        time.sleep(sleep_time)
    return parallel_map(inner_test_func, args_container=[1, 2, 3])

# Пример 6
# Работает с функциями, созданными внутри других функций
test_func3()

CPU times: user 6.88 ms, sys: 149 µs, total: 7.03 ms
Wall time: 3.01 s


[None, None, None]