Изучить механизмы многопроцессного (multiprocessing) и асинхронного (asyncio) программирования в Python. Понять, когда и зачем применяется каждый из подходов, научиться использовать их на практике.
Все алгоритмы можно условно разделить на два типа в зависимости от того, что ограничивает скорость их выполнения:
- CPU-bound (ограничены процессором) — вычислительные задачи, которые постоянно нагружают ЦП: математические расчёты, обработка данных, перемножение матриц.
- IO-bound (ограничены вводом-выводом) — задачи, которые большую часть времени ожидают внешних событий: сетевые запросы, чтение файлов, ввод пользователя.
От типа задачи зависит выбор инструмента для ускорения:
flowchart TD
Task["Задача"] --> Question{"Тип задачи?"}
Question -->|"CPU-bound (вычисления)"| MP["multiprocessing"]
Question -->|"IO-bound (сеть, файлы)"| AS["asyncio"]
MP --> Parallel["Параллельное выполнение\nна нескольких ядрах CPU"]
AS --> EventLoop["Кооперативное переключение\nзадач в event loop"]
Модуль multiprocessing создаёт настоящие процессы операционной системы. Каждый процесс получает собственную копию интерпретатора Python и всех данных. Это единственный способ в Python обойти GIL и задействовать несколько ядер процессора для CPU-bound задач.
flowchart LR
Main["Главный процесс\n(main)"] -->|"fork"| P1["Процесс 1\nPython + копия данных"]
Main -->|"fork"| P2["Процесс 2\nPython + копия данных"]
Main -->|"fork"| P3["Процесс 3\nPython + копия данных"]
P1 -->|"результат"| Main
P2 -->|"результат"| Main
P3 -->|"результат"| Main
Пример создания процесса (файл 01_basic_process.py):
from multiprocessing import Process, current_process
import os, time
def worker(task_name, duration):
"""Функция, выполняемая в отдельном процессе."""
print(f"[{current_process().name}] Начало задачи '{task_name}' "
f"(PID={os.getpid()}, родитель={os.getppid()})")
time.sleep(duration)
print(f"[{current_process().name}] Задача '{task_name}' завершена")
if __name__ == '__main__':
print(f"Главный процесс: PID={os.getpid()}")
tasks = [("Загрузка", 2), ("Обработка", 3), ("Сохранение", 1)]
processes = []
for name, dur in tasks:
p = Process(target=worker, args=(name, dur)) # создаём процесс
processes.append(p)
for p in processes:
p.start() # запускаем все процессы
for p in processes:
p.join() # ждём завершения всехProcess(target=..., args=...)— создаёт новый процесс. Функцияworkerбудет выполнена в отдельном процессе ОС.p.start()— запускает процесс (ОС создаёт новый PID).p.join()— главный процесс ожидает завершения дочернего.os.getpid()— PID текущего процесса,os.getppid()— PID родительского.- Три задачи с
time.sleep(2, 3, 1)последовательно заняли бы 6 сек, но параллельно — ~3 сек (время самой длинной).
Передача данных между процессами (файл 02_matrix_multiply.py):
Каждый процесс работает с копией данных. Обычные переменные не передают результат обратно в главный процесс. Для этого используется Queue:
from multiprocessing import Process, Queue
def element_to_queue(index, A, B, q):
i, j = index
res = sum(A[i][k] * B[k][j] for k in range(len(A[0])))
q.put((index, res)) # отправляем результат в очередь
q = Queue()
p = Process(target=element_to_queue, args=((0, 0), A, B, q))
p.start()
p.join()
(i, j), value = q.get() # получаем результат из очередиМежпроцессное взаимодействие (IPC)
Так как каждый процесс изолирован в своём адресном пространстве, для обмена данными между ними используются специальные механизмы IPC (Inter-Process Communication):
| Механизм | Описание | Когда использовать |
|---|---|---|
Queue |
Потокобезопасная очередь FIFO | Много процессов → один сборщик |
Pipe |
Двусторонний канал между двумя процессами | Прямая связь «родитель ↔ потомок» |
Pool.map/starmap |
Возвращаемые значения через пул | Распределение однотипных задач |
Pipe() создаёт пару связанных объектов-соединений. Данные, отправленные в один конец, появляются в другом:
from multiprocessing import Process, Pipe
def child(conn):
conn.send("Привет от дочернего процесса!") # отправляем
conn.close()
parent_conn, child_conn = Pipe() # два конца канала
p = Process(target=child, args=(child_conn,))
p.start()
msg = parent_conn.recv() # получаем
print(msg) # "Привет от дочернего процесса!"
p.join()Pipe()— возвращает два конца:(conn1, conn2). Что отправлено черезconn1.send(), получается черезconn2.recv(), и наоборот.Queueудобнее, когда много процессов пишут в одну очередь.Pipeбыстрее для связи двух процессов.
Пул процессов (файл 03_pool_matrix.py):
Создавать отдельный процесс на каждую маленькую задачу неэффективно. Pool создаёт фиксированное число процессов и распределяет задачи между ними:
from multiprocessing import Pool
def element(i, j, A, B):
res = sum(A[i][k] * B[k][j] for k in range(len(A[0])))
return (i, j, res)
args = [(i, j, A, B) for i in range(rows) for j in range(cols)]
with Pool(processes=4) as pool:
results = pool.starmap(element, args) # 4 процесса делят все задачиМодуль asyncio реализует кооперативную многозадачность: программист сам определяет точки переключения между задачами с помощью await. Все корутины выполняются в одном потоке и одном процессоре, но за счёт переключений во время ожидания IO достигается значительное ускорение IO-bound задач.
sequenceDiagram
participant EL as Event Loop
participant C1 as Корутина 1
participant C2 as Корутина 2
EL->>C1: запуск
C1->>EL: await — ожидание IO
EL->>C2: запуск
C2->>EL: await — ожидание IO
EL->>C1: IO завершён, возобновление
C1->>EL: завершение
EL->>C2: IO завершён, возобновление
C2->>EL: завершение
Пример: синхронный vs асинхронный подход (файл 01_sync_vs_async.py):
Синхронно — три запроса выполняются последовательно (2 + 3 + 1 = 6 сек):
import time
def fetch_data_sync(source, delay):
print(f" Запрос к '{source}'...")
time.sleep(delay) # блокирует весь поток на delay секунд
return f"данные из {source}"
results = []
results.append(fetch_data_sync("API сервер", 2))
results.append(fetch_data_sync("База данных", 3))
results.append(fetch_data_sync("Файловое хранилище", 1))
# Общее время: ~6 секАсинхронно — три запроса выполняются «одновременно» (max(2, 3, 1) = 3 сек):
import asyncio
async def fetch_data_async(source, delay):
print(f" Запрос к '{source}'...")
await asyncio.sleep(delay) # отпускает управление event loop-у
return f"данные из {source}"
async def main_async():
results = await asyncio.gather(
fetch_data_async("API сервер", 2),
fetch_data_async("База данных", 3),
fetch_data_async("Файловое хранилище", 1),
)
return results
asyncio.run(main_async())
# Общее время: ~3 секasync def— определяет корутину (асинхронную функцию).await asyncio.sleep(delay)— приостанавливает текущую корутину и передаёт управление event loop, который может выполнить другие корутины. В отличие отtime.sleep(), не блокирует весь поток.asyncio.gather(...)— запускает несколько корутин параллельно и ждёт завершения всех.asyncio.run(main())— создаёт event loop и запускает корутину (Python 3.7+).
Асинхронный TCP-сервер (файл 02_echo_server.py):
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(1024) # неблокирующее чтение
addr = writer.get_extra_info('peername')
print(f"Подключение от {addr}: '{data.decode()}'")
writer.write(data) # отправка обратно
await writer.drain() # ожидание отправки
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handle_echo, '127.0.0.1', 9095)
async with server:
await server.serve_forever()
asyncio.run(main())asyncio.start_server(callback, host, port)— создаёт TCP-сервер. При каждом подключении вызывается корутинаhandle_echo.reader/writer— асинхронные потоки для чтения и записи данных.- Сервер обслуживает много клиентов в одном потоке — переключение происходит в точках
await.
| Характеристика | multiprocessing | asyncio |
|---|---|---|
| Тип задач | CPU-bound | IO-bound |
| Механизм | Отдельные процессы ОС | Корутины в event loop |
| Многозадачность | Вытесняющая | Кооперативная |
| Использование ядер CPU | Да (несколько) | Нет (одно) |
| Масштабируемость | Низкая (единицы–десятки) | Высокая (тысячи) |
| Обмен данными | Queue, Pipe, Pool | Общая память (один поток) |
| Блокирующие операции | Стандартные | Только асинхронные |
- Откройте репозиторий преподавателя: github.com/Mohanad0101/lab14_part2
- Нажмите кнопку "Fork" в правом верхнем углу.
- GitHub создаст копию репозитория в вашем аккаунте:
https://github.com/<ВАШ_ЛОГИН>/lab14_part2
cd ~
rm -rf lab14_part2
git clone https://github.com/<ВАШ_ЛОГИН>/lab14_part2.git lab14_part2
cd lab14_part2Замените
<ВАШ_ЛОГИН>на ваш логин GitHub. Командаrm -rf lab14_part2удаляет старую папку, если она существует.
python3 --versionТребуется Python 3.8 или выше.
lab14_part2/
├── README.md
├── RESULTS.md # Шаблон для результатов (заполнить)
├── multiprocessing_examples/
│ ├── 01_basic_process.py # Справочный пример (готовый)
│ ├── 02_matrix_multiply.py # Перемножение матриц — TODO
│ ├── 03_pool_matrix.py # Пул процессов — TODO
│ ├── 04_mp_echo_server.py # Многопроцессный сервер — TODO
│ └── 05_mp_echo_client.py # Клиент для сервера (готовый)
└── asyncio_examples/
├── 01_sync_vs_async.py # Сравнение sync/async — TODO
├── 02_echo_server.py # Эхо-сервер — TODO
└── 03_echo_client.py # Эхо-клиент — TODO
Для редактирования файлов используйте Sublime Text. Откройте файл командой subl:
subl multiprocessing_examples/02_matrix_multiply.pyИли откройте весь проект сразу:
subl ~/lab14_part2Если команда
sublне найдена, используйтеnanoилиvim:nano multiprocessing_examples/02_matrix_multiply.py
Совет: в Sublime Text нажмите Ctrl+G, чтобы перейти к нужной строке, или Ctrl+F и введите TODO, чтобы найти все места для заполнения.
В файлах с заданиями вы найдёте:
Справка (СПРАВКА) — в файлах, основанных на курсовых репозиториях (3_Parallelism, 4_asyncio_server, 2_threaded_server), в начале docstring приведён оригинальный код из этих репозиториев. Изучите его, чтобы понять, как работает исходный пример и что изменилось в нашей версии.
TODO — помеченные места вида:
# TODO 1: Описание задания
# Подсказка: ...Вам нужно дописать код в этих местах. Не удаляйте комментарии — они помогут преподавателю проверить работу.
- Откройте файл в Sublime Text:
subl multiprocessing_examples/02_matrix_multiply.py-
Найдите
TODO(Ctrl+F→TODO), допишите код, сохраните (Ctrl+S). -
Переключитесь в терминал и запустите:
python3 multiprocessing_examples/02_matrix_multiply.py- Если есть ошибка — вернитесь в Sublime Text, исправьте, сохраните, запустите снова.
Файл: multiprocessing_examples/01_basic_process.py
Запустите и изучите пример:
cd ~/lab14_part2
python3 multiprocessing_examples/01_basic_process.pyОбратите внимание на:
- Как создаётся и запускается процесс.
- Что выводят
os.getpid()иos.getppid(). - Как
join()заставляет главный процесс ждать дочерний.
Файл: multiprocessing_examples/02_matrix_multiply.py
Этот файл содержит функцию вычисления одного элемента произведения матриц (из репозитория 3_Parallelism). Ваша задача — распараллелить вычисление всех элементов по процессам.
Что нужно сделать:
- TODO 1: Создать процесс для каждого элемента результирующей матрицы и передать результат через
Queue. - TODO 2: Замерить время последовательного и параллельного вычисления, вывести результат.
subl multiprocessing_examples/02_matrix_multiply.py
python3 multiprocessing_examples/02_matrix_multiply.pyФайл: multiprocessing_examples/03_pool_matrix.py
Используйте Pool для более эффективного распределения задач между фиксированным числом процессов.
Что нужно сделать:
- TODO 3: Использовать
Pool.starmap()для параллельного вычисления элементов матрицы. - TODO 4: Запустить программу с разным числом процессов в пуле (1, 2, 4) и сравнить время.
subl multiprocessing_examples/03_pool_matrix.py
python3 multiprocessing_examples/03_pool_matrix.pyФайл: asyncio_examples/01_sync_vs_async.py
Сравните время выполнения одинаковой задачи в синхронном и асинхронном режимах.
Что нужно сделать:
- TODO 5: Допишите асинхронную версию функции
main_async()с использованиемasyncio.gather().
subl asyncio_examples/01_sync_vs_async.py
python3 asyncio_examples/01_sync_vs_async.pyФайл: asyncio_examples/02_echo_server.py
Реализуйте асинхронный TCP эхо-сервер на базе asyncio (по мотивам 4_asyncio_server).
Что нужно сделать:
- TODO 6: Реализовать тело корутины
handle_echo— прочитать данные, залогировать адрес клиента и сообщение, отправить данные обратно, закрыть соединение.
subl asyncio_examples/02_echo_server.py
python3 asyncio_examples/02_echo_server.pyФайл: asyncio_examples/03_echo_client.py
Реализуйте клиент, который подключается к серверу из задания B2.
Что нужно сделать:
- TODO 7: Дописать отправку сообщения и получение ответа от сервера.
- TODO 8: Запустить несколько клиентов одновременно через
asyncio.gather()и проанализировать порядок вывода.
Запуск (в отдельном терминале, пока работает сервер):
subl asyncio_examples/03_echo_client.py
python3 asyncio_examples/03_echo_client.pyВ лабораторной работе 2 вы создавали многопоточный сервер с помощью threading.Thread (2_threaded_server). Теперь реализуем аналогичный сервер, но с использованием multiprocessing.Process — каждый клиент обслуживается в отдельном процессе ОС.
Файл: multiprocessing_examples/04_mp_echo_server.py
Цикл приёма подключений и создание процессов уже реализованы. Ваша задача — дописать функцию обработки клиента.
Что нужно сделать:
- TODO 9: Реализовать тело
handle_client— принять данные, залогировать PID и сообщение, отправить обратно, закрыть соединение.
subl multiprocessing_examples/04_mp_echo_server.py
python3 multiprocessing_examples/04_mp_echo_server.py- Запустите сервер в одном терминале.
- Откройте 2–3 дополнительных терминала и в каждом запустите клиент:
python3 multiprocessing_examples/05_mp_echo_client.py- Обратите внимание на вывод сервера — PID каждого обработчика будет разным, в отличие от многопоточного сервера из lab 2, где PID одинаковый.
После выполнения всех заданий ответьте на следующие вопросы (впишите ответы в файл RESULTS.md):
- Во сколько раз параллельное перемножение матриц быстрее последовательного? Совпадает ли ускорение с количеством ядер CPU? Почему?
- Как изменяется время выполнения при увеличении числа процессов в
Pool? Есть ли предел, после которого увеличение числа процессов не даёт ускорения? - Почему для передачи результатов из процессов нельзя использовать обычные глобальные переменные?
- Почему асинхронная версия в
01_sync_vs_async.pyвыполняется быстрее синхронной, хотя использует только одно ядро? - В каком порядке выводятся сообщения при запуске нескольких клиентов через
asyncio.gather()? Является ли этот порядок детерминированным? - Что произойдёт, если в асинхронном сервере использовать
time.sleep()вместоawait asyncio.sleep()? Почему?
- Сравните вывод PID в многопроцессном сервере (
04_mp_echo_server.py) с многопоточным сервером из лабораторной 2. Почему в multiprocessing PID разные, а в threading — одинаковые?
После выполнения всех заданий пройдите тест из 20 вопросов по материалам лабораторной работы:
Вы уже сделали Fork и Clone в разделе «Подготовка». Ваш форк — это ваш личный репозиторий, куда вы можете отправлять (push) свою работу.
Укажите ваше имя и email — они будут записываться в каждый коммит:
git config --global user.name "Ваше Имя"
git config --global user.email "your.email@example.com"GitHub не принимает обычный пароль для push. Настройте один из двух способов: Personal Access Token (проще) или SSH-ключ (удобнее для постоянной работы).
A.1. Создайте токен на GitHub:
- Перейдите: GitHub → Settings → Developer settings → Personal access tokens → Tokens (classic). Прямая ссылка: github.com/settings/tokens
- Нажмите "Generate new token" → "Generate new token (classic)".
- Заполните:
- Note:
lab14-vm(произвольное описание) - Expiration: 30 days (достаточно для сдачи)
- Scopes: поставьте галочку
repo(полный доступ к репозиториям)
- Note:
- Нажмите "Generate token".
- Скопируйте токен сейчас — он показывается только один раз! Он выглядит примерно так:
ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
A.2. Сохраните токен на время работы:
git config --global credential.helper 'cache --timeout=3600'При первом git push Git спросит логин и пароль. Вместо пароля вставьте токен. Он сохранится в памяти на 1 час (3600 секунд) и повторно запрашиваться не будет.
Безопасность: не используйте
credential.helper store— он сохраняет токен в открытом виде в файле~/.git-credentials. Вариантcacheхранит токен только в оперативной памяти и автоматически удаляет его после истечения таймаута.
B.1. Проверьте, есть ли уже SSH-ключ:
ls -la ~/.ssh/Если вы видите файлы id_ed25519 и id_ed25519.pub (или id_rsa / id_rsa.pub) — ключ уже есть, переходите к шагу B.3.
B.2. Сгенерируйте новый SSH-ключ:
ssh-keygen -t ed25519 -C "your.email@example.com"На все вопросы нажимайте Enter (путь по умолчанию, без пароля):
Enter file in which to save the key (/home/user/.ssh/id_ed25519): [Enter]
Enter passphrase (empty for no passphrase): [Enter]
Enter same passphrase again: [Enter]
B.3. Скопируйте публичный ключ:
cat ~/.ssh/id_ed25519.pubСкопируйте весь вывод (он начинается с ssh-ed25519 ...).
B.4. Добавьте ключ на GitHub:
- Перейдите: GitHub → Settings → SSH and GPG keys. Прямая ссылка: github.com/settings/keys
- Нажмите "New SSH key".
- Заполните:
- Title:
Lab VM(произвольное описание) - Key: вставьте скопированный публичный ключ
- Title:
- Нажмите "Add SSH key".
B.5. Проверьте подключение:
ssh -T git@github.comОжидаемый ответ:
Hi <ВАШ_ЛОГИН>! You've successfully authenticated, but GitHub does not provide shell access.
B.6. (Только для SSH) Смените remote URL на SSH:
cd ~/lab14_part2
git remote set-url origin git@github.com:<ВАШ_ЛОГИН>/lab14_part2.gitЕсли вы используете HTTPS + токен, этот шаг не нужен — remote URL уже настроен при клонировании.
Убедитесь, что все TODO выполнены и код запускается без ошибок. Заполните файл RESULTS.md — вставьте вывод каждой программы, заполните сравнительную таблицу и впишите ответы на вопросы. Затем проверьте, какие файлы были изменены:
cd ~/lab14_part2
git statusПример вывода:
On branch main
Changes not staged for commit:
modified: RESULTS.md
modified: multiprocessing_examples/02_matrix_multiply.py
modified: multiprocessing_examples/03_pool_matrix.py
modified: multiprocessing_examples/04_mp_echo_server.py
modified: asyncio_examples/01_sync_vs_async.py
modified: asyncio_examples/02_echo_server.py
modified: asyncio_examples/03_echo_client.py
Добавьте все изменённые файлы:
git add .Или добавьте конкретные файлы по одному:
git add multiprocessing_examples/02_matrix_multiply.py
git add asyncio_examples/02_echo_server.pyПроверьте, что файлы добавлены:
git statusТеперь файлы должны быть в секции Changes to be committed (зелёным цветом).
git commit -m "lab14 part2: выполнены TODO — multiprocessing и asyncio"Коммит — это «снимок» вашей работы. Сообщение после -m кратко описывает, что было сделано.
Проверьте, что коммит создан:
git log --oneline -3Вы клонировали свой форк — вы единственный, кто в него пишет. Поэтому push должен пройти без проблем:
git push -u origin mainФлаг -u привязывает локальную ветку main к удалённой. В дальнейшем можно просто писать git push.
Если push отклонён (ошибка
rejected — non-fast-forward), значит в вашем форке на GitHub есть изменения, которых нет локально (например, вы редактировали файл через веб-интерфейс). В этом случае сначала синхронизируйте:git pull origin main --rebaseЭта команда скачает изменения с GitHub и поставит ваши коммиты поверх них. Если возникнет конфликт — Git покажет, в каких файлах проблема. Исправьте их, затем:
git add . git rebase --continueПосле этого повторите
git push origin main.
Если ветка называется master (а не main):
git branchЕсли текущая ветка — master, используйте git push -u origin master.
Если при push возникает ошибка аутентификации:
- Для HTTPS: убедитесь, что ввели токен (не пароль) и у токена есть scope
repo. - Для SSH: проверьте
ssh -T git@github.comи убедитесь, что публичный ключ добавлен на GitHub.
- Откройте в браузере:
https://github.com/<ВАШ_ЛОГИН>/lab14_part2 - Убедитесь, что все файлы загружены и содержат ваш код.
- Проверьте, что
RESULTS.mdзаполнен: вывод программ, таблица сравнения и ответы на вопросы. - Продемонстрируйте работающий код и результаты преподавателю.
flowchart LR
A["git add ."] --> B["git commit -m '...'"]
B --> C["git push origin main"]
C --> D["GitHub\n(ваш форк)"]