https://python-academy.org/ru/guide/intro-concurrency-async

https://senjun.ru/courses/python/chapters/python_chapter_0310/



Многозадачность (Конкурентность (Concurrency))
* Параллелизм (Parallelism)
* Асинхронность (Asynchronicity) - Многопоточность

Механизмы для достижения конкурентности:
* Процесс (Process)
* Поток (Thread)

Типы задач, обрабатываемые конкуррентно:
* I/O-bound задачи (задачи, ограниченные вводом-выводом)
    * Чтение/запись файлов на диск.
    * Сетевые запросы (к базам данных, API, веб-страницам).
    * Ожидание пользовательского ввода
* CPU-bound задачи (задачи, ограниченные производительностью процессора)

Многопоточность по планировщику различается на
* Вытесняющая многозадачность (preemptive multitasking) - в ОС
    * планировщик задач принудительно приостанавливает выполнение текущей задачи и отдает управление другой задаче 
* Кооперативная многозадачность (cooperative multitasking) - в ЯП (python asyncio)
    * Задачи самостоятельно отдают управление планировщику. А он решает, какая задача будет выполняться следующей.

In [1]:
import threading
import time

def worker(number, sleep_time):
    """Функция, которую будет выполнять наш поток."""
    print(f"Поток {number}: начинаю работу, буду спать {sleep_time} сек.")
    time.sleep(sleep_time)
    print(f"Поток {number}: завершаю работу.")

# Создаем потоки
thread1 = threading.Thread(target=worker, args=(1, 2)) # args передаются как кортеж
thread2 = threading.Thread(target=worker, args=(2, 1))

# Запускаем потоки
thread1.start()
thread2.start()

print("Все потоки запущены из основного потока.")

# Ожидаем завершения потоков (опционально, но часто нужно)
thread1.join() # Основной поток будет ждать, пока thread1 не завершится
thread2.join() # Основной поток будет ждать, пока thread2 не завершится

print("Все потоки завершили свою работу.")


Поток 1: начинаю работу, буду спать 2 сек.
Поток 2: начинаю работу, буду спать 1 сек.
Все потоки запущены из основного потока.
Поток 2: завершаю работу.
Поток 1: завершаю работу.
Все потоки завершили свою работу.


In [3]:
import threading
import time

shared_resource = 0
lock = threading.Lock()

def increment_shared_resource():
    global shared_resource
    for _ in range(100000):
        lock.acquire() # Захватываем блокировку
        try:
            shared_resource += 1
        finally:
            lock.release() # Освобождаем блокировку в любом случае

threads = []
for i in range(5):
    thread = threading.Thread(target=increment_shared_resource())
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Итоговое значение shared_resource: {shared_resource}") # Ожидается 500000

Итоговое значение shared_resource: 500000


In [4]:
import threading
import time

event = threading.Event()

def waiter():
    print("Ожидающий поток: жду события...")
    event.wait() # Блокируется, пока событие не будет установлено
    print("Ожидающий поток: событие получено, продолжаю работу!")

def setter():
    print("Устанавливающий поток: немного подожду...")
    time.sleep(2)
    print("Устанавливающий поток: устанавливаю событие!")
    event.set()

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)

t1.start()
t2.start()

t1.join()
t2.join()


Ожидающий поток: жду события...
Устанавливающий поток: немного подожду...
Устанавливающий поток: устанавливаю событие!
Ожидающий поток: событие получено, продолжаю работу!


In [5]:
import threading
import time
import random

MAX_CONNECTIONS = 3
semaphore = threading.BoundedSemaphore(MAX_CONNECTIONS)

def use_resource(thread_id):
    print(f"Поток {thread_id}: пытаюсь захватить ресурс...")
    with semaphore: # Контекстный менеджер автоматически вызывает acquire/release
        print(f"Поток {thread_id}: ресурс захвачен.")
        sleep_time = random.randint(1, 3)
        time.sleep(sleep_time)
        print(f"Поток {thread_id}: освобождаю ресурс после {sleep_time} сек.")

threads = []
for i in range(10):
    thread = threading.Thread(target=use_resource, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

Поток 0: пытаюсь захватить ресурс...
Поток 0: ресурс захвачен.
Поток 1: пытаюсь захватить ресурс...
Поток 1: ресурс захвачен.
Поток 2: пытаюсь захватить ресурс...
Поток 2: ресурс захвачен.
Поток 3: пытаюсь захватить ресурс...
Поток 4: пытаюсь захватить ресурс...
Поток 5: пытаюсь захватить ресурс...
Поток 6: пытаюсь захватить ресурс...
Поток 7: пытаюсь захватить ресурс...
Поток 8: пытаюсь захватить ресурс...
Поток 9: пытаюсь захватить ресурс...
Поток 2: освобождаю ресурс после 1 сек.
Поток 3: ресурс захвачен.
Поток 0: освобождаю ресурс после 3 сек.
Поток 4: ресурс захвачен.
Поток 1: освобождаю ресурс после 3 сек.
Поток 5: ресурс захвачен.
Поток 3: освобождаю ресурс после 2 сек.
Поток 6: ресурс захвачен.
Поток 4: освобождаю ресурс после 1 сек.
Поток 7: ресурс захвачен.
Поток 5: освобождаю ресурс после 3 сек.Поток 7: освобождаю ресурс после 2 сек.
Поток 6: освобождаю ресурс после 3 сек.

Поток 8: ресурс захвачен.
Поток 9: ресурс захвачен.
Поток 9: освобождаю ресурс после 1 сек.
Поток 8: о

In [6]:
import threading
import time

condition = threading.Condition()
items = []

def producer():
    for i in range(5):
        time.sleep(random.uniform(0.1, 0.5))
        with condition: # Захватываем блокировку
            item = f"Элемент-{i}"
            items.append(item)
            print(f"Производитель: добавил {item}, всего {len(items)} элементов. Оповещаю.")
            condition.notify() # Оповещаем одного потребителя

def consumer(name):
    while True:
        with condition:
            print(f"{name}: ожидаю элемент...")
            condition.wait() # Ожидаем, пока производитель что-то добавит и оповестит
            if not items:
                print(f"{name}: список пуст, но меня разбудили. Продолжаю ждать.")
                continue # Может быть ложное пробуждение или другой потребитель забрал
            item = items.pop(0)
            print(f"{name}: забрал {item}, осталось {len(items)} элементов.")
            if not items and threading.active_count() <= 3: # Пример условия выхода
                break
        time.sleep(random.uniform(0.2, 0.6)) # Имитация обработки

# Запускаем одного производителя и двух потребителей
threading.Thread(target=producer).start()
threading.Thread(target=consumer, args=("Потребитель-A",)).start()
threading.Thread(target=consumer, args=("Потребитель-B",)).start()

Потребитель-A: ожидаю элемент...
Потребитель-B: ожидаю элемент...
Производитель: добавил Элемент-0, всего 1 элементов. Оповещаю.
Потребитель-A: забрал Элемент-0, осталось 0 элементов.
Потребитель-A: ожидаю элемент...
Производитель: добавил Элемент-1, всего 1 элементов. Оповещаю.
Потребитель-B: забрал Элемент-1, осталось 0 элементов.
Производитель: добавил Элемент-2, всего 1 элементов. Оповещаю.
Потребитель-A: забрал Элемент-2, осталось 0 элементов.
Потребитель-B: ожидаю элемент...
Потребитель-A: ожидаю элемент...
Производитель: добавил Элемент-3, всего 1 элементов. Оповещаю.
Потребитель-B: забрал Элемент-3, осталось 0 элементов.
Производитель: добавил Элемент-4, всего 1 элементов. Оповещаю.
Потребитель-A: забрал Элемент-4, осталось 0 элементов.
Потребитель-B: ожидаю элемент...
Потребитель-A: ожидаю элемент...


In [7]:
import threading
import queue
import time
import random

q = queue.Queue()

def producer_q():
    for i in range(10):
        item = f"Элемент-{i}"
        time.sleep(random.uniform(0.1, 0.3))
        q.put(item)
        print(f"Производитель: добавил {item} в очередь (размер: {q.qsize()})")
    q.put(None) # Сигнал окончания для одного потребителя
    q.put(None) # Сигнал окончания для второго потребителя

def consumer_q(name):
    while True:
        item = q.get() # Блокируется, если очередь пуста
        if item is None: # Сигнал окончания
            q.task_done() # Сообщаем, что задача "None" обработана
            print(f"{name}: получил сигнал None, завершаю работу.")
            break
        print(f"{name}: обработал {item}")
        time.sleep(random.uniform(0.2, 0.5))
        q.task_done() # Сообщаем, что задача обработана

# Запускаем производителя и двух потребителей
threading.Thread(target=producer_q).start()
threading.Thread(target=consumer_q, args=("Потребитель-1",)).start()
threading.Thread(target=consumer_q, args=("Потребитель-2",)).start()

# q.join() # Можно использовать, если producer не шлет None и мы хотим дождаться обработки всех задач

Производитель: добавил Элемент-0 в очередь (размер: 1)
Потребитель-1: обработал Элемент-0
Производитель: добавил Элемент-1 в очередь (размер: 1)
Потребитель-2: обработал Элемент-1
Производитель: добавил Элемент-2 в очередь (размер: 1)Потребитель-1: обработал Элемент-2

Производитель: добавил Элемент-3 в очередь (размер: 1)Потребитель-2: обработал Элемент-3

Производитель: добавил Элемент-4 в очередь (размер: 1)
Потребитель-1: обработал Элемент-4
Производитель: добавил Элемент-5 в очередь (размер: 1)Потребитель-2: обработал Элемент-5

Производитель: добавил Элемент-6 в очередь (размер: 1)Потребитель-1: обработал Элемент-6

Производитель: добавил Элемент-7 в очередь (размер: 1)Потребитель-2: обработал Элемент-7

Производитель: добавил Элемент-8 в очередь (размер: 1)
Потребитель-1: обработал Элемент-8
Производитель: добавил Элемент-9 в очередь (размер: 1)Потребитель-2: обработал Элемент-9

Потребитель-1: получил сигнал None, завершаю работу.
Потребитель-2: получил сигнал None, завершаю ра

In [8]:
import multiprocessing
import os
import time

def worker_process(name):
    """Функция, выполняемая в отдельном процессе."""
    print(f"Процесс {name} (PID: {os.getpid()}): начинаю работу.")
    # time.sleep(2) # Имитация задержки (можно раскомментировать для наглядности)
    print(f"Процесс {name} (PID: {os.getpid()}): завершаю работу.")

if __name__ == "__main__":  # Важно для multiprocessing на некоторых платформах
    print(f"Основной процесс (PID: {os.getpid()})")

    # Создаем процессы
    process1 = multiprocessing.Process(target=worker_process, args=("Worker-A",))
    process2 = multiprocessing.Process(target=worker_process, args=("Worker-B",))

    # Запускаем процессы
    process1.start()
    process2.start()

    print("Все дочерние процессы запущены.")

    # Ожидаем завершения процессов
    process1.join()
    process2.join()

    print("Все дочерние процессы завершили свою работу.")

Основной процесс (PID: 23088)
Все дочерние процессы запущены.
Все дочерние процессы завершили свою работу.


In [9]:
import multiprocessing
import time

def sender(conn):
    print("Отправитель: отправляю данные.")
    conn.send("Привет от отправителя") # Одно сообщение
    conn.close()
    print("Отправитель: данные отправлены и канал закрыт.")

def receiver(conn):
    print("Получатель: ожидаю данные...")
    msg = conn.recv()
    print(f"Получатель: получил \"{msg}\"")
    conn.close()
    print("Получатель: канал закрыт.")

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()

    p_sender = multiprocessing.Process(target=sender, args=(parent_conn,))
    p_receiver = multiprocessing.Process(target=receiver, args=(child_conn,))

    p_sender.start()
    p_receiver.start()

    p_sender.join()
    p_receiver.join()
    print("Обмен через Pipe завершен.")

Обмен через Pipe завершен.


In [10]:
import multiprocessing
import time
import random

def producer_proc(q):
    for i in range(3):
        item = f"Элемент-{i}"
        time.sleep(random.uniform(0.1, 0.2)) # Небольшая случайная задержка
        q.put(item)
        print(f"Производитель: добавил {item} в очередь.")
    q.put(None) # Сигнал окончания для потребителя

def consumer_proc(name, q):
    while True:
        item = q.get()
        if item is None:
            print(f"{name}: получил сигнал None, завершаю.")
            break
        print(f"{name}: обработал {item}")
        time.sleep(random.uniform(0.1, 0.3))

if __name__ == "__main__":
    q = multiprocessing.Queue()

    p_prod = multiprocessing.Process(target=producer_proc, args=(q,))
    p_cons1 = multiprocessing.Process(target=consumer_proc, args=("Потребитель-A", q))

    p_prod.start()
    p_cons1.start()

    p_prod.join()
    p_cons1.join()
    print("Обмен через Queue завершен.")


Обмен через Queue завершен.


In [11]:
import multiprocessing

def worker_value(num, lock):
    for _ in range(500):
        with lock:
            num.value += 1

def worker_array(arr, index, lock):
    with lock:
        arr[index] -= index * 0.5 # Пример операции над элементом массива

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    shared_num = multiprocessing.Value('i', 0) # 'i' - тип integer
    shared_array = multiprocessing.Array('d', [10.0, 20.0, 30.0]) # 'd' - тип double

    processes = []
    # Процессы для Value
    for _ in range(2):
        p = multiprocessing.Process(target=worker_value, args=(shared_num, lock))
        processes.append(p)
        p.start()

    # Процессы для Array
    for i in range(len(shared_array)):
        p = multiprocessing.Process(target=worker_array, args=(shared_array, i, lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Shared number: {shared_num.value}") # Ожидается 1000
    print(f"Shared array: {list(shared_array)}")


Shared number: 0
Shared array: [10.0, 20.0, 30.0]


In [12]:
import multiprocessing

def worker_manager_dict(shared_dict, key, value):
    shared_dict[key] = value
    print(f"Процесс {key}: установил {key}={value}")

def worker_manager_list(shared_list, value):
    shared_list.append(value)
    print(f"Процесс добавил {value} в список")

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()

        processes = []
        # Демонстрация для словаря
        for i in range(2):
            p = multiprocessing.Process(target=worker_manager_dict, args=(shared_dict, f"key{i}", i*10))
            processes.append(p)
            p.start()

        # Демонстрация для списка
        for i in range(2):
            p = multiprocessing.Process(target=worker_manager_list, args=(shared_list, f"item_{i}"))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print(f"Разделяемый словарь: {dict(shared_dict)}")
        print(f"Разделяемый список: {list(shared_list)}")


Разделяемый словарь: {}
Разделяемый список: []


In [None]:
import multiprocessing
import time

def square(x):
    time.sleep(0.1) # Имитация вычислений
    return x * x

if __name__ == "__main__":
    # Используем контекстный менеджер для автоматического close() и join()
    with multiprocessing.Pool(processes=2) as pool:
        numbers = [1,2,3,4,5]

        # Пример с map
        print("Используем pool.map():")
        results_map = pool.map(square, numbers)
        print(f"Результаты (map): {results_map}")

        # Пример с apply_async
        print("\nИспользуем pool.apply_async():")
        async_results = [pool.apply_async(square, (num,)) for num in numbers]
        results_apply_async = [res.get(timeout=1) for res in async_results]
        print(f"Результаты (apply_async): {results_apply_async}")

    print("Работа с пулом завершена.")


Используем pool.map():


In [1]:
import asyncio

async def greet(name):
    print(f"Привет, {name}!")
    await asyncio.sleep(1) # Имитация I/O операции (неблокирующий сон)
    print(f"Пока, {name}!")

# Вызов greet() возвращает объект корутины, но не выполняет ее код
coro_obj = greet("Алиса")
print(type(coro_obj)) # <class 'coroutine'>

# Чтобы запустить корутину, нужен цикл событий
# Пример запуска см. ниже с использованием asyncio.run()


<class 'coroutine'>


In [5]:
import asyncio

async def hello_world_async():
    print("Асинхронный Hello...")
    await asyncio.sleep(0.1) # Имитируем небольшую асинхронную операцию
    print("...World!")

if __name__ == "__main__":
    asyncio.run(hello_world_async())

RuntimeError: asyncio.run() cannot be called from a running event loop