#### ПАРАЛЛЕЛЬНЫЕ ВЫЧИСЛЕНИЯ

При большом количестве вычислений, которые не зависят друг от друга, можно использовать техники распараллеливания вычислений. Если запустить, например, python скрипт, выполняющий последовательно набор операций и посмотреть загрузку ЦП, то вы увидите, что есть некоторый процесс, запущенный ОС для выполнения скрипта, и он потребляет определенное кол-во процессорного времени только на одном логическом ядре. Распараллеленный алгоритм позволяет выполнять действия сразу на заданном количестве потоков, что отобразится в виде загрузки сразу нескольких логических ядер.

Параллельные вычисления реализуют 3 техниками:
1. в многопроцессорном режиме (создаются независимые процессы с собственной памятью);
2. в многопоточеном режиме (вычисления происходят в рамках одного процесса с общей памятью);
3. в асинхронном режиме (это псевдопараллелизация т.к. вычисления происходят последовательно, но каждый блок периодически отпускает управление для передачи другому блоку, и это происходит достаточно быстро, чтобы создать иллюзию, что все блоки обрабатываются параллельно)

#### MULTIPROCESSING

In [1]:
'''
    multiprocessing -- это пакет, который позволяет работать с процессами
'''
import multiprocessing
print('\n'.join(list(filter(lambda x: not x.startswith('_') and x != 'abc', multiprocessing.__dict__.keys()))))

sys
process
reduction
context
Array
AuthenticationError
Barrier
BoundedSemaphore
BufferTooShort
Condition
Event
JoinableQueue
Lock
Manager
Pipe
Pool
Process
ProcessError
Queue
RLock
RawArray
RawValue
Semaphore
SimpleQueue
TimeoutError
Value
active_children
allow_connection_pickling
cpu_count
current_process
freeze_support
get_all_start_methods
get_context
get_logger
get_start_method
log_to_stderr
parent_process
reducer
set_executable
set_forkserver_preload
set_start_method
SUBDEBUG


In [None]:
# Process
import os
from multiprocessing import Process

from typing import List

def function_to_separated_process():
    print('Hi! I am a process with PID', pid:=os.getpid())

processes: List[Process] = [
    Process(target=function_to_separated_process, args=())
    for _ in range(5)
]
print('Здесь упорядоченно')
for p in processes:
    p.start()   # сначала мы запускам процесс
    p.join()    # затем ждем его завершения прежде чем начать следующий

print('\nЗдесь хаотично')

processes: List[Process] = [
    Process(target=function_to_separated_process, args=())
    for _ in range(5)
]
# run processes
_ = list(map(lambda p: p.start(), processes))   # сначала запускаем все процессы (так что выполняться они начинают параллельно)
# wait till the end, the order is chaotic
_ = list(map(lambda p: p.join(), processes))    # и ждем когда они завершатся

Здесь упорядоченно
Hi! I am a process with PID 111967
Hi! I am a process with PID 111976
Hi! I am a process with PID 111985
Hi! I am a process with PID 111994
Hi! I am a process with PID 112003

Здесь хаотично
Hi! I am a process with PID 112012
Hi! I am a process with PID 112015
Hi! I am a process with PID Hi! I am a process with PID112026
 112033
Hi! I am a process with PID 112040


In [None]:
'''
    В зависимости от платформы, multiprocessing поддерживает 3 способа создания процесса:
    1. "spawn" -- родительский процесс создает новый процесс с интерпретатором python. 
        Дочерний процесс наследует только необходимые ресурсы, так что эта процедура
        более щадаще расходует ресурсы системы, но требует больше времени для своей инициализации.
        Это дефолтный метод в Windows, macOS
    2. "fork" -- родительский процесс вызывает os.fork() в интерпретаторе. 
        Созданный дочерний процесс копия родительского, и используются все аналогичные ресурсы.
        Этот метод быстрее, но более дорогостоящий по ресурсам, чем spawn. 
        Дефолтный в POSIX (UNIX) системах
    3. "forkserver" -- спавнится процесс-сервер. Каждый раз, когда нужен новый процесс,
        запрос отправляется процессу-серверу, чтобы он создал новый процесс.

    Для выбора метода нужно вызвать multiprocessing.set_start_method(*method*).
    Этот метод вызывается в начале программы, до того, как что-то будет распараллелено
    (то есть до создания новых процессов)

    Вероятно, код ниже выдаст ошибку в интерпретаторе. Рекомендуется запускать .py скрипт
'''
# Process
import os
import multiprocessing
from multiprocessing import Process

def function_to_separated_process():
    print('Hi! I am a process with PID', pid:=os.getpid())

# давайте спавнить
multiprocessing.set_start_method('spawn')   # вызывается единожды в программе до начала всего!

if __name__ == '__main__':
    processes: List[Process] = [
        Process(target=function_to_separated_process, args=())
        for _ in range(5)
    ]
    print('Здесь упорядоченно')
    for p in processes:
        p.start()   # сначала мы запускам процесс
        p.join()    # затем ждем его завершения прежде чем начать следующий

In [None]:
'''
    Альтернативно можно не устанавливать метод создания процессов,
    а вытягивать контекст с помощью метода get-context
'''

# Process
import os
import multiprocessing
from multiprocessing import Process

def function_to_separated_process():
    print('Hi! I am a process with PID', pid:=os.getpid())

_context: multiprocessing.context = multiprocessing.get_context('fork') # spawn 
if __name__ == '__main__':
    p = _context.Process(target=function_to_separated_process, args=())
    p.start()
    p.join()

Hi! I am a process with PID 132569


In [None]:
'''
    Обмен данными между процессами (вам действительно это нужно?)
    Обмен данными между процессами осуществияется с помощью очередей Queue и
    каналов Pipe.
'''
# Process
import os
import multiprocessing
from multiprocessing import Process, Queue, Pipe

print('Queue')

def use_queue_to_send_pid(q: Queue):
    q.put(os.getpid())
    return None

q = Queue()
processes: List[Process] = [
    Process(target=use_queue_to_send_pid, args=(q,))
    for _ in range(5)
]

# run processes
_ = list(map(lambda p: p.start(), processes))   # сначала запускаем все процессы (так что выполняться они начинают параллельно)
# wait till the end, the order is chaotic
_ = list(map(lambda p: p.join(), processes))    # и ждем когда они завершатся

print('Process ID')
for p in processes:
    print(msg:=q.get())

print('\nPipe')

def use_pipe_to_send_pid(pipe):
    pipe.send(os.getgid())
    pipe.close()
    return None

main_pipe, child_pipe = Pipe()
p = Process(target=use_pipe_to_send_pid, args=(child_pipe, ))
p.start()
print('msg from pipe:', main_pipe.recv())
p.join()


Queue


Process ID
29004
29006
29008
29010
29012

Pipe
msg from pipe: 1000


In [None]:
'''
    Есть механика блокировки управления, реализуемая через Lock
'''

from multiprocessing import Lock

def function_to_separated_process(lock: Lock):
    lock.acquire()
    print('Hi! I am a process with PID', pid:=os.getpid())
    lock.release()

lock = Lock()
processes: List[Process] = [
    Process(target=function_to_separated_process, args=(lock,))
    for _ in range(5)
]
print('Здесь упорядоченно')
for p in processes:
    p.start()   # сначала мы запускам процесс
    p.join()    # затем ждем его завершения прежде чем начать следующий

print('\nЗдесь хаотично (нет, не хаотично, процесс блокирует остальные и выполняет свой блок кода)')

lock = Lock()
processes: List[Process] = [
    Process(target=function_to_separated_process, args=(lock,))
    for _ in range(5)
]
# run processes
_ = list(map(lambda p: p.start(), processes))   # сначала запускаем все процессы (так что выполняться они начинают параллельно)
# wait till the end, the order is chaotic
_ = list(map(lambda p: p.join(), processes))    # и ждем когда они завершатся

Здесь упорядоченно
Hi! I am a process with PID 29114
Hi! I am a process with PID 29123
Hi! I am a process with PID 29132
Hi! I am a process with PID 29141
Hi! I am a process with PID 29150

Здесь хаотично (нет, не хаотично, процесс блокирует остальные и выполняет свой блок кода)
Hi! I am a process with PID 29159
Hi! I am a process with PID 29167
Hi! I am a process with PID 29162
Hi! I am a process with PID 29172
Hi! I am a process with PID 29177


In [None]:
'''
    Shared Memory -- это память, разделяемая между процессами. 
        Если большой объем данных используется во всех (многих) процессах без изменения содержимого,
        имеет смысл поместить блок данных в общую память.
'''
from multiprocessing import Value, Array

def put_pid(shared_array: Array, i: int) -> None:
    pid = os.getpid()
    shared_array[i] = pid
    print('process', pid)
    return None

n_processes: int = 10
shared_array: Array = Array(typecode_or_type='d', size_or_initializer=n_processes)
processes = [Process(target=put_pid, args=(shared_array, i)) for i in range(n_processes)]

# run processes
_ = list(map(lambda p: p.start(), processes))   # сначала запускаем все процессы (так что выполняться они начинают параллельно)
# wait till the end, the order is chaotic
_ = list(map(lambda p: p.join(), processes))    # и ждем когда они завершатся

print('\nArray', list(shared_array))

process 176666


process process176669 
176678process
 176683process
 176694
process 176703process
 176714process
 176721process
 176730process
 176739

Array [176666.0, 176669.0, 176678.0, 176683.0, 176694.0, 176703.0, 176714.0, 176721.0, 176730.0, 176739.0]


In [None]:
'''
    Часто вместо процессов удобно использовать пул процессов (Pool)
'''

from multiprocessing import Pool, Lock
from typing import Any

def function_to_separated_process(_):
    print('\nHi! I am a process with PID', pid:=os.getpid())

# используя контекстный менеджер, создаем пул, передавая количество процессов
with Pool(processes=10) as pool:
    res: List[Any] = pool.map(function_to_separated_process, range(10))



Hi! I am a process with PID
Hi! I am a process with PID
Hi! I am a process with PID
Hi! I am a process with PID
Hi! I am a process with PID
Hi! I am a process with PID
Hi! I am a process with PID
Hi! I am a process with PID
Hi! I am a process with PID 
Hi! I am a process with PID       202749  202754202751202757202750202752202748
202755202756202753










In [None]:
# TASKS:
'''
    Имплементируйте в многопроцессорном режиме генерацию траекторий для разных
    стохастических процессов.
'''

#### MULTITHREADING

In [1]:
'''
    Далее изучим многопоточные операции. Основные проблемы, возникающая с многопоточностью:
        race condition и deadlock
    1е - состояние гонки, когда 2 потока претендуют за один ресурс (ячейку памяти),
        что приводит к
    2е - блокировка ресурса (ячейки памяти), когда один поток взял в работу данные, и заблокировал их,
        чтобы другие потоки не могли с ними ничего делать. Параллельно этот поток ждет данные от, скажем, другого потока,
        который в свою очередь для завершения своих операций требует заблокированный 1м потоком ресурс.
        Получается следующее: 
        1й поток не может завершиться, 
            потому что 2й поток не может передать ему данные,
                потому что 1й поток не может завершиться,
                    потому что 2й поток не может завершиться ...
                        и это называется deadlock.
        Ему предшествует состояние гонки, которое абсолютно нежелательно допускать никогда.
    К счастью, интерпретатор python оснащен механизмом GIL (Global Interpreter Lock), который
        гарантирует, что в каждый момент времени исполняется только 1 поток - потрясающе!,
        это почти сводит на нет пользу многопоточного исполнения - потоки все равно будут исполняться последовательно.
        Тем не менее, ускорения добиться можно.
    В python 3.13, однако, допускается выключить GIL - https://www.infoworld.com/article/3552750/get-started-with-the-free-threaded-build-of-python-3-13.html
'''


'\n    Далее изучим многопоточные операции. Основные проблемы, возникающая с многопоточностью:\n        race condition и deadlock\n    1е - состояние гонки, когда 2 потока претендуют за один ресурс (ячейку памяти),\n        что приводит к\n    2е - блокировка ресурса (ячейки памяти), когда один поток взял в работу данные, и заблокировал их,\n        чтобы другие потоки не могли с ними ничего делать. Параллельно этот поток ждет данные от, скажем, другого потока,\n        который в свою очередь для завершения своих операций требует заблокированный 1м потоком ресурс.\n        Получается следующее: \n        1й поток не может завершиться, \n            потому что 2й поток не может передать ему данные,\n                потому что 1й поток не может завершиться,\n                    потому что 2й поток не может завершиться ...\n                        и это называется deadlock.\n        Ему предшествует состояние гонки, которое абсолютно нежелательно допускать никогда.\n    К счастью, интерпр

In [None]:
import os
os.environ['PYTHON_GIL'] = '0'  # в теории если поставить эту переменную окружения == 0, то GIL отключится
                                # если нет, то в интернете написано, как выключить

def if_prime(x: int):
    if x <= 1:
        return 0
    elif x <= 3:
        return 1
    elif x % 2 == 0 or x % 3 == 0:
        return 0
    i = 5
    while i**2 <= x:
        if x % i == 0 or x % (i + 2) == 0:
            return 0
        i += 6
    return 1


In [None]:
def check_performance():
    ans: int = 0
    for i in range(1_000_000):
        ans += if_prime(i)

In [3]:
%timeit check_performance()

858 ms ± 7.21 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
import concurrent
from concurrent.futures import ThreadPoolExecutor

# будет работать медленно
def check_performance_with_pool():
    with ThreadPoolExecutor(max_workers=128) as pool:
        ans: List[Any] = pool.map(if_prime, list(range(1_000_000)))

In [None]:
# будет работать медленно
def check_performance_with_pool():
    with ThreadPoolExecutor(max_workers=12) as pool:
        futures: List[concurrent.futures.Future] = [pool.submit(if_prime, i) for i in range(1_000_000)]
        ans = sum(future.result() for future in futures)

In [None]:
'''
    В действительности иногда сложно найти ситуации, где лучше использовать потоки.
    Немного более дорогой вариант в плане ресурсов, но более надежный -- использовать процессы.

    В заключение представим список библиотек для распараллеливания вычислений в python:
        joblib - https://joblib.readthedocs.io/en/stable/
        parsl - https://parsl-project.org/
        ipyparallel - https://ipyparallel.readthedocs.io/en/latest/     
'''

#### ASYNCHRONOUS CALCULATING

In [None]:
'''
    Начнем с знакомства с корутинами (coroutines) -- подпрограммами, которые можно приостановить для выполнения
    других команд с возобновлением их работы с последнего состояния.

    AWAITABLES
    Объект назывется ожидаемым (awaitable), если к нему можно применить выражение "await".
    Множество API веб-ресурсов написаны с поддержкой awaitables объектов.
    Есть 3 основных объекта : coroutines, Tasks и Futures(не путать с фьючерсами из курса деривативов)
    docs: https://docs.python.org/3/library/asyncio.html
'''

# COROUTINES
async def my_coroutine():
    print('Hello, coroutine\n')

print(type(my_coroutine()))
print(my_coroutine())        # <coroutine object my_coroutine at 0x7c6eac5efe80>
await my_coroutine()         # выполнить корутину

# TASKS
'''
    Задачи (Tasks) используются для выполнения блоков кода concurrently (одновременно, параллельно)
    Когда задачу оборачивают функциями наподобие asyncio.create_task(), корутина автоматически начинает исполняться в фоне,
    без требования ожидания возвращения управления в основной блок кода
'''

import time
import asyncio
from asyncio.tasks import Task

# для работы в ноутбуках
import nest_asyncio
nest_asyncio.apply()

print()

asyncio.create_task( my_coroutine() )  # начнет выполняться сразу после вызова

async def sleep_and_print(time: float) -> None:
    await asyncio.sleep(time)
    print('Awaken')

# запуск корутин consequently (последовательно)
start = time.time()
for _ in range(1, 3):
    sleep_time = _ #random.random()
    await sleep_and_print(sleep_time)
end = time.time()
print('total time =', round(end-start, 3), '\n') # 3 секунды на выполнение

# запуск корутин concurrently (параллельно)
start: float = time.time()
task_1: Task = asyncio.create_task(sleep_and_print(1))
task_2: Task = asyncio.create_task(sleep_and_print(2))
await task_1
await task_2
end: float = time.time()
print('asynchronously performed time =', round(end-start, 3)) # уже 2 секунды на выполнение

# более современный вариант с использованием TaskGroup и контекстным менеджером
async with asyncio.TaskGroup() as tg:
    start: float = time.time()
    task_1: Task = tg.create_task(
        sleep_and_print(1)
    )
    task_2: Task = tg.create_task(
        sleep_and_print(2)
    )
end: float = time.time()
print('with TaskGroup time elapse =', round(end-start, 3)) # те же 2 секунды на выполнение

  print(type(my_coroutine()))
  print(my_coroutine())        # <coroutine object my_coroutine at 0x7c6eac5efe80>


<class 'coroutine'>
<coroutine object my_coroutine at 0x7c6eac464700>
Hello, coroutine


Hello, coroutine

Awaken
Awaken
total time = 3.005 

Awaken
Awaken
asynchronously performed time = 2.002


In [None]:
# FUTURES
'''
    Фьючерсы это специальный низко-уровневый ожидаемый объект, который хранит конечный результат корутины.
    Когда код ожидает фьючерс ( await future() ), это означает, что корутина в ожидаении пока фьючерс будет 
    выполнен и возвращен в некотором месте кода. Фьючерсы в основном нужны для реализации callback'ов в коде,
    написанном в асинхронном стиле. Обычно фьючерсы используются редко.
'''

# для демонстрации используем дополнительную библиотеку для работы с фьючерсами
from concurrent.futures import ThreadPoolExecutor
import random

def to_exec(_: int):
    time.sleep( random.random() )
    print('Awaken', _)
    return _

loop = asyncio.get_running_loop()
with ThreadPoolExecutor(max_workers=2) as executor:
    res: List[Any] = await loop.run_in_executor(
        executor, to_exec, 1
    )

Awaken 1


In [None]:
'''
    СОЗДАНИЕ И ОБРАБОТКА ЗАДАЧ
    asyncio.create_task(coro, *, name=None, context=None) - создание задачи
    asyncio.get_running_loop() - возвращает объект цикла, в котором хранятся задачи
    task.cancel() - отменить задачу
'''

import asyncio
from asyncio.tasks import Task

import time
from typing import Dict, Any

# пусть корутина возвращает значение, нужно получить к нему доступ
# создадим словарь и используем его для сбора ответов
_global_coro_ans: Dict[str, Any] = dict()

async def sleep_and_print(time_: float) -> None:
    global _global_coro_ans
    await asyncio.sleep(time_)
    print('awaken after', time_)
    # переведем аргумент в строку, потому что ключ должен быть неизменяемым объектом
    _global_coro_ans[str(time_)] = time_
    return time_

my_loop = asyncio.get_event_loop()
for i in range(10):
    _ = my_loop.create_task(
        sleep_and_print(i)
    )

awaken after 0
awaken after 1
awaken after 2
awaken after 3
awaken after 4
awaken after 5
awaken after 6
awaken after 7
awaken after 8
awaken after 9


In [None]:
from typing import Set

await asyncio.sleep(3) # подождем 3 секунды, некоторые задачи успеют выполниться

# # проверим какие задачи еще не выполнены и отменим их
pending: Set[Task] = asyncio.all_tasks() # all_tasks возвращает множество (set) выполняющихся задач
'''
    ВАЖНО: asyncio.all_tasks() возвращает список задач, которые в процессе выполнения
    И этот список автоматически обновляется, поэтому выполненные задачи в нем не отображаются.
'''
num_of_canceled_tasks: int = 0
for task in pending:
    print(task_name := task.get_coro().__name__)  # выключим только целевые задачи (т.е. 'sleep_and_print')
    if task_name == 'sleep_and_print':
        task.cancel()
        num_of_canceled_tasks += 1
print('Кол-во отмененных задач:', num_of_canceled_tasks)
# ожидаемо мы отменили 6 задач, потому что они ожидали до выполнения больше 3 секунд
# которые мы выделили им в качестве форы до отмены

dispatch_queue
sleep_and_print
run_cell_async
sleep_and_print
sleep_and_print
sleep_and_print
sleep_and_print
sleep_and_print
Кол-во отмененных задач: 6


In [61]:
# проверим наличие ответов корутин в глобальном словаре
print(_global_coro_ans)

{'0': 0, '1': 1, '2': 2, '3': 3}


In [None]:
'''
    Возвращаясь к asyncio.all_tasks()
    Чтобы хранить ссылки на задачи, которые были выполнены,
    можно создавать множества с ссылками на эти задачи:
'''
my_loop = asyncio.get_event_loop()
tasks: Set[Task] = set()
for i in range(10):
    task = my_loop.create_task(
        sleep_and_print(i)
    )
    tasks.add(task)

awaken after 0
awaken after 1
awaken after 2
awaken after 3
awaken after 4
awaken after 5
awaken after 6
awaken after 7
awaken after 8
awaken after 9


In [None]:
# callback функция, ниже объяснено зачем
def print_task_argument(_: Any) -> None:
    print('object type', type(_))
    print('object itself', _)
    return None

await asyncio.sleep(3)

'''
    Данный подход позволяет не использовать костыли наподобие глобальных переменных
'''
for task in tasks:
    print(task_name := task.get_coro().__name__)
    if task_name == 'sleep_and_print':
        print(done:=task.done())
        if done:
            print('result of the coro', task_name, ':', task.result())

        # мы добавляем в задачу callback, который триггерится при завершении задачи
        # в данном примере мы просим распечатать аргумент коллбека
        # а потом удалить задачу из списка задач
        task.add_done_callback(print_task_argument)
        task.add_done_callback(tasks.discard)
'''
    User cases? 
        Допустим, вы работаете над исследовательским проектом в ipynb 
        и хотите запустить долго выполняющийся алгоритм. 
        При этом вы не хотите ждать пока он отработает, а продолжить
        выполнять другие задачи в рамках ноутбука. Выполняя блок кода
        синхронно, вам придется ждать высвобождения им управления.
        Альтернативно, если вы обернете его в асинхронную задачу и отправите
        выполняться в фоновом отдельном процессе, то сможете и дальше использовать
        ресурсы (переменные, память, процессорное время) ноутбука дальше,
        таким образом эффективно распараллелив ваш рабочий процесс.
        Как видите, это очень удобный и мощный инструмент!
'''

sleep_and_print
True
result of the coro sleep_and_print : 0
sleep_and_print
True
result of the coro sleep_and_print : 3
sleep_and_print
True
result of the coro sleep_and_print : 2
sleep_and_print
False
sleep_and_print
False
sleep_and_print
False
sleep_and_print
False
sleep_and_print
False
sleep_and_print
False
sleep_and_print
True
result of the coro sleep_and_print : 1


'\n    User cases? \n        Допустим, вы работаете над исследовательским проектом в ipynb \n        и хотите запустить долго выполняющийся алгоритм. \n        При этом вы не хотите ждать пока он отработает, а продолжить\n        выполнять другие задачи в рамках ноутбука. Выполняя блок кода\n        синхронно, вам придется ждать высвобождения им управления.\n        Альтернативно, если вы обернете его в асинхронную задачу и отправите\n        выполняться в фоновом отдельном процессе, то сможете и дальше использовать\n        ресурсы (переменные, память, процессорное время) ноутбука дальше,\n        таким образом эффективно распараллелив ваш рабочий процесс.\n'

object type <class 'asyncio.tasks.Task'>
object itself <Task finished name='Task-79' coro=<sleep_and_print() done, defined at /tmp/ipykernel_6005/1206983598.py:18> result=0>
object type <class 'asyncio.tasks.Task'>
object itself <Task finished name='Task-82' coro=<sleep_and_print() done, defined at /tmp/ipykernel_6005/1206983598.py:18> result=3>
object type <class 'asyncio.tasks.Task'>
object itself <Task finished name='Task-81' coro=<sleep_and_print() done, defined at /tmp/ipykernel_6005/1206983598.py:18> result=2>
object type <class 'asyncio.tasks.Task'>
object itself <Task finished name='Task-80' coro=<sleep_and_print() done, defined at /tmp/ipykernel_6005/1206983598.py:18> result=1>


object type <class 'asyncio.tasks.Task'>
object itself <Task finished name='Task-83' coro=<sleep_and_print() done, defined at /tmp/ipykernel_6005/1206983598.py:18> result=4>
object type <class 'asyncio.tasks.Task'>
object itself <Task finished name='Task-84' coro=<sleep_and_print() done, defined at /tmp/ipykernel_6005/1206983598.py:18> result=5>
object type <class 'asyncio.tasks.Task'>
object itself <Task finished name='Task-85' coro=<sleep_and_print() done, defined at /tmp/ipykernel_6005/1206983598.py:18> result=6>
object type <class 'asyncio.tasks.Task'>
object itself <Task finished name='Task-86' coro=<sleep_and_print() done, defined at /tmp/ipykernel_6005/1206983598.py:18> result=7>
object type <class 'asyncio.tasks.Task'>
object itself <Task finished name='Task-87' coro=<sleep_and_print() done, defined at /tmp/ipykernel_6005/1206983598.py:18> result=8>
object type <class 'asyncio.tasks.Task'>
object itself <Task finished name='Task-88' coro=<sleep_and_print() done, defined at /tmp

In [None]:
'''
    При работе с группой связанных (или не связанных) корутин,
    используется TaskGroup, который имплементирует реализацию контекстного менеджера,
    внутри которого гарантируется, что все задачи "в ожидании", т.е. 
    выход из контекстного менеджера означает, что все задачи в группе завершены (успешно, неуспешно или неактивны)
'''


In [None]:

import asyncio
from asyncio.tasks import Task
from asyncio import TaskGroup

import time
from typing import Set, Dict, Any

async def sleep_and_print(time_: float) -> None:
    global _global_coro_ans
    await asyncio.sleep(time_)
    print('awaken after', time_)
    return time_

tasks: Set[Task] = set()
async with TaskGroup() as tg:
    for tnum in range(5):
        tasks.add(
            tg.create_task(sleep_and_print(tnum))
        )
print('Все задачи завершены')
for t in tasks:
    print(t.done(), t.result())

In [None]:
'''
    В случае, если внутри группы задач возникает ошибка, то менеджер завершает свою работу,
    завершив все остальные задачи.
    Пример из https://docs.python.org/3/library/asyncio-task.html#coroutine
'''

class TerminateTaskGroup(Exception):
    """Exception raised to terminate a task group."""

async def force_terminate_task_group():
    """Used to force termination of a task group."""
    raise TerminateTaskGroup()

async def job(task_id, sleep_time):
    print(f'Task {task_id}: start')
    await asyncio.sleep(sleep_time)
    print(f'Task {task_id}: done')

try:
    async with TaskGroup() as group:
        # spawn some tasks
        group.create_task(job(1, 0.5))
        group.create_task(job(2, 1.5))
        # sleep for 1 second
        await asyncio.sleep(1)
        # add an exception-raising task to force the group to terminate
        group.create_task(force_terminate_task_group())
except* TerminateTaskGroup:
    pass

##### РАБОТА С API

In [1]:
'''
    Асинхронные вычисления часто используются при работе с веб-ресурсами,
    поскольку часто приходится просто ждать ответ на запрос с сервера.
    Есть много задач, где приходится иметь дело с подключением к API 
    какого-нибудь интернет ресурса. 
    Здесь мы будем подключаться к крипто-биржам и
    скачивать исторические данные и данные в режиме реального времени
'''



'\n    Асинхронные вычисления часто используются при работе с веб-ресурсами,\n    поскольку часто приходится просто ждать ответ на запрос с сервера.\n    Есть много задач, где приходится иметь дело с подключением к API \n    какого-нибудь интернет ресурса. \n    Здесь мы будем подключаться к крипто-биржам и\n    скачивать исторические данные и данные в режиме реального времени\n'

In [None]:
'''
    DERIBIT API
    https://docs.deribit.com/?python
    На сайте с документацией приведены примеры запросов на python и описание ответов на эти запросы.
    Для работы потребуется установить библиотеку websockets
'''

import json
import websockets

import asyncio
# для работы в ноутбуках
import nest_asyncio
nest_asyncio.apply()

from typing import Dict, Any


# формируем запрос в виде словаря
msg = \
{
  "method" : "public/get_instruments",
  "params" : {
    "currency" : "BTC",
    "kind" : "future"
  },
  "jsonrpc" : "2.0",
  "id" : 1
}

# асинхронная функция для отправки запроса по апи
async def call_api(msg: str) -> Dict[str, Any]:
   # подключаем вебсокет коннект
   async with websockets.connect('wss://test.deribit.com/ws/api/v2') as websocket:
      # отправляем запрос
      await websocket.send(msg)
      # ожидаем ответ
      response: str = await websocket.recv() 
      # возвращаем ответ
      return json.loads(response)

request = json.dumps(msg) # дампит словарь в строку
ans: Dict[str, Any] = asyncio.get_event_loop().run_until_complete(call_api(request))

# в полученном ответе мы получаем список торгуемых инструментов
print(len(ans['result']))

instrument = ans['result'][0]['instrument_name']

# получим LOB по инструменту
msg = \
{
  "jsonrpc" : "2.0",
  "id" : 8772,
  "method" : "public/get_order_book",
  "params" : {
    "instrument_name" : instrument,
    "depth" : 50
  }
}
request = json.dumps(msg) # дампит словарь в строку
ans: Dict[str, Any] = asyncio.get_event_loop().run_until_complete(call_api(request))

# парсим полученный снапшот
bids = ans['result']['bids']
asks = ans['result']['asks']

9


Задача.

Написать скрипт, который в режиме реального времени с шагом в 1 секунду выгружает снапшоты книги лимитных приказов
с биржи deribit, а также список последних сделок по следующим инструментам:
BTC-PERPETUAL, ETH-PERPETUAL
Создайте несколько задач asyncio.task, которые будут:
а) обращаться к бирже и выгружать данные
б) собирать их в единый форматированный датафрейм
в) сохранять данные на диск

все операции должны быть реализованы в асинхронном режиме

'BTC-2MAY25'