<center>
    <img src="https://upload.wikimedia.org/wikipedia/commons/a/a8/%D0%9B%D0%9E%D0%93%D0%9E_%D0%A8%D0%90%D0%94.png" width=500px/>
    <font>Python 2022</font><br/>
    <br/>
    <br/>
    <b style="font-size: 2em">Subprocess, threading, multiprocessing</b><br/>
    <br/>
    <font>Вадим Мазаев</font><br/>
</center>

# subprocess

Иногда появляется необходимость **запустить некоторую программу из вашего питон-кода**

*Например*: наша тестирующая система запускает тесты для вашего кода. И нужно убедиться, что тесты отработали корректно

<center>
<img src="https://blag.felixhummel.de/_images/process_stdin_stdout_stderr_return-code.png" alt="process" width=800 />
</center>

In [1]:
import subprocess

In [None]:
# Simplified code
print('\033[93mRunning tests...\033[0m')
try:
    subprocess.run(
        ['pytest', '-p', 'no:cacheprovider', '--tb=no'],
        cwd=str(build_dir),
        timeout=60,
        check=True,
    )
except subprocess.CalledProcessError as exc:
    # Catch if any error occured
except subprocess.TimeoutExpired as exc:
    # or process timed out

In [None]:
# recommended to use whenever it's possible
subprocess.run

# for more complex cases
subprocess.Popen

In [None]:
class Popen:    
    def __init__(
        self,
        args,           # string or sequence to execute
        stdin=None, stdout=None, stderr=None,
        cwd=None,       # set current working directory
        env=None,       # set environment variables
        encoding=None,  # decode bytes from std*
        errors=None,    # how to handle encoding errors
        text=False,     # use default encoding and errors
        shell=False,    # handle args as shell command
        ...             # and many other arguments
    ): ...

Документация: https://docs.python.org/3/library/subprocess.html

In [2]:
process = subprocess.Popen(['ls', '-l'])  # in practice, use os.listdir
process

total 44
-rw-rw-r-- 1 vmazaev vmazaev   672 Oct  6 21:30 rise.css
-rw-rw-r-- 1 vmazaev vmazaev 38147 Nov 22 20:38 SubprocessThreadingMultiprocessing.ipynb


<Popen: returncode: None args: ['ls', '-l']>

In [3]:
# wait and get exit code
process.poll()

0

In [4]:
# wait with timeout
process.wait(timeout=1)  # timeout in secs

0

In [5]:
stdout, stderr = process.communicate()
stdout  # no stdout :(

In [6]:
process = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE, text=True)
stdout, _ = process.communicate()

In [7]:
print(''.join(stdout))

total 44
-rw-rw-r-- 1 vmazaev vmazaev   672 Oct  6 21:30 rise.css
-rw-rw-r-- 1 vmazaev vmazaev 38445 Nov 22 20:42 SubprocessThreadingMultiprocessing.ipynb



In [37]:
# communicate(input='...') with Popen(..., stdin=PIPE) to pass smth to stdin
# communicate(timeout=1) for setting timeout in seconds

In [None]:
def run(
    *popenargs,
    check=False,           # check process exit code and raise on failure
    capture_output=False,  # set stdout=PIPE and stderr=PIPE
    timeout=None,          # passed to Popen.communicate(), raise on timeout
    input=None,            # passed to Popen.communicate() as stdin
    **popenkwargs
) -> subprocess.CompletedProcess: ...

In [8]:
subprocess.run(['bc'], input=b'2 * 3\n', capture_output=True)

CompletedProcess(args=['bc'], returncode=0, stdout=b'6\n', stderr=b'')

### subprocess
This module intends to replace several older modules and functions:
```
os.system
os.spawn*
``` 
(c) python.org

see also: https://www.python.org/dev/peps/pep-0324/

На текущий момент `subprocess` - наиболее правильный способ запускать другие процессы на питоне

# threading

<center>
<img alt="threads" src="https://www.backblaze.com/blog/wp-content/uploads/2017/08/diagram-thread-process-1.png" width="800px" />
</center>

**Поток** (тред; от англ. thread — нить) — поток выполнения команд внутри процесса

<center>
<img src="http://www.openrtos.net/implementation/TaskExecution.gif" alt="concurrency" width="800px" />
</center>

В один момент времени **одно** ядро процессора исполняет **ровно один** поток

Несколько ядер могут выполнять несколько потоков буквально **одновременно**

<center>
<img src="https://www.backblaze.com/blog/wp-content/uploads/2017/08/diagram-thread-concurrency.png" alt="parallelism" width=800 />
</center>

Однако **не имеет смысла** создавать больше потоков, чем у вас есть ядер процессора (если целью является увеличение производительности)

Потоки в рамках одного процесса делят **общую** память

<div align="center"><img src="https://i.imgur.com/nlhI00n.png?1" width="650px"/></div>

In [9]:
import threading

In [10]:
def greeter(num: int) -> None:
    print(f'Hello {num}', flush=True)

In [13]:
def run_threads(count: int) -> None:
    threads = [
        threading.Thread(target=greeter, args=(i,))
        for i in range(count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

In [7]:
run_threads(4)

Hello 0Hello 1

Hello 2
Hello 3


Следующий пример взят из доклада Реймонда Хэттингера (Python core-developer):
https://www.youtube.com/watch?v=Bv25Dwe84g0

In [8]:
count = 0

def counter() -> None:
    global count
    old_count = count
    count = old_count + 1
    print(f'{count} ', end='', flush=True)

In [9]:
threads = [threading.Thread(target=counter) for _ in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

1 2 3 4 5 6 7 8 9 10 

В коде на слайде кроется коварная ошибка

<center>
<img src="https://s3.amazonaws.com/s3-blogs.mentor.com/colinwalls/files/2018/05/RTC-520x118.png" alt="context-switching" />
</center>
Напоминание: потоки постоянно переключаются ОС

Давайте увеличим время жизни потока с помощью `time.sleep`

In [10]:
import time
import random

In [11]:
count = 0

def counter() -> None:
    global count
    old_count = count
    time.sleep(random.randint(0, 1))
    count = old_count + 1
    print(f'{count} ', end='', flush=True)

In [12]:
count = 0
threads = [threading.Thread(target=counter) for _ in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

1 2 3 4 5 1 2 5 6 6 

Это извечная проблема многопоточного кода — **гонка** (от англ. **race**/**race condition**)

### Решение

In [26]:
count = 0
lock = threading.Lock()

def counter() -> None:
    global count
    with lock:
        old_count = count
        time.sleep(random.randint(0, 1))
        count = old_count + 1
        print(f'{count} ', end='', flush=True)

In [27]:
threads = [threading.Thread(target=counter) for _ in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

1 2 3 4 5 6 7 8 9 10 

### Вариант с `queue.Queue`

<center>
<img src="https://pengphy.files.wordpress.com/2010/09/image5b95d.png?w=1400" alt="queue" width=500 />
</center>

### Задача
Посчитать сумму последовательности чисел в N потоков (<= кол-во ядер ЦПУ)

In [28]:
import queue
from collections.abc import Iterable

In [29]:
def adder(array: Iterable[int], part_id: int, thread_count: int, queue_out: queue.Queue) -> None:
    queue_out.put(sum(array[i] for i in range(part_id, len(array), thread_count)))

In [30]:
def sum_using_threads(array: Iterable[int], thread_count: int) -> list[int]:
    queue_out = queue.Queue()
    threads = [
        threading.Thread(target=lambda i=i: adder(array, i, thread_count, queue_out))
        for i in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    results = []
    for thread in threads:
        results.append(queue_out.get())
        thread.join()
    return sum(results)

In [37]:
array = [1 for _ in range(10_000_000)]

In [41]:
%%timeit
sum(array[i] for i in range(len(array)))  # sum(array)

399 ms ± 11.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [42]:
%%timeit
sum_using_threads(array, 8)

522 ms ± 134 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Причина столько "значительного" ускорения: **GIL (Global Interpreter Lock)**

GIL делает питонячие потоки бесполезными для распараллеливания вычислений

In [21]:
import requests

In [22]:
urls = [
    'https://ya.ru', 'https://www.google.com',
    'https://www.python.org', 'https://isocpp.org',
    'https://habr.com', 'https://news.ycombinator.com'
]

In [23]:
def read_url(url: str) -> str:
    return requests.get(url).text

In [24]:
%%timeit
for url in urls:
    read_url(url)

4.19 s ± 230 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [25]:
%%timeit
readers = [
    threading.Thread(target=lambda url=url: read_url(url))
    for url in urls
]
for reader in readers:
    reader.start()
for reader in readers:
    reader.join()

870 ms ± 59.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Для специальных типов задач: **IO-bound**, питон-треды оказываются эффективны

<center>
<img alt="IO-bound" src="https://risingstack-blog.s3.amazonaws.com/2016/Jun/non_async_blocking_operations_example_in_node_hero_1459856858194-1466683867567.png" width=700 />
</center>

**NB**: Кроме того, иногда бывает полезным использовать треды для разделения потоков исполнения.

Хотя это и не даст выигрыша, организовывать сложный код таким образом может быть удобнее.

Например, отрисовка анимации, фоновые активности и прочие невысоконагруженные задачи.

# multiprocessing

GIL запрещает параллельное исполнение нескольких потоков

<center>
<img src="https://s3.amazonaws.com/media-p.slid.es/uploads/299675/images/1413349/Screen_Shot_2015-05-23_at_15.58.31.png" alt="GIL" width=600 />
</center>

Но GIL никак не может повлиять на разные процессы, поэтому в теории можно использовать разные процессы для распараллеливания вычислений!

In [43]:
import multiprocessing

In [44]:
accumulator = []

def worker() -> None:
    accumulator.append('item')

In [45]:
processes = [multiprocessing.Process(target=worker) for _ in range(5)]
for p in processes:
    p.start()
for p in processes:
    p.join()
    
accumulator

[]

Процессы не разделяют память, поэтому нужен механизм обмена данными между ними

Таким готовым механизмом обладает `multiprocessing.Pool`

In [46]:
def multiplier(x: int) -> int:
    return x * 2

In [47]:
with multiprocessing.Pool() as pool:
    result = pool.map(multiplier, range(10))
result

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Возвращаемся к задаче о суммировании

In [49]:
size = 10_000_000
array = [1 for _ in range(size)]

In [54]:
%%timeit
sum(array)

38.9 ms ± 451 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [55]:
process_count = multiprocessing.cpu_count()  # 8
part_size = size // process_count
array_parts = [
    array[i * part_size: (i + 1) * part_size]
    for i in range(process_count)
]

In [56]:
with multiprocessing.Pool(process_count) as pool:
    %timeit pool.map(sum, array_parts)

120 ms ± 3.91 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [57]:
one_part = array[0 * part_size: (0 + 1) * part_size]
%timeit sum(one_part)

4.96 ms ± 56.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


Много времени тратится на коммуникацию между процессами :(

Попробуем не пересылать много данных между процессами

In [58]:
def sum_n(n: int) -> int:
    return sum(1 for _ in range(n))

In [59]:
%%timeit
sum_n(size)

271 ms ± 1.49 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [60]:
with multiprocessing.Pool(process_count) as pool:
    %timeit pool.map(sum_n, (part_size for _ in range(process_count)))

94.9 ms ± 2.51 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


Есть заметный прирост в скорости, но множитель меньше, чем кол-во ядер ЦПУ :(

### Резюме

Создавать процессы — это дорого

Передавать данные между процессами — тоже дорого

Поэтому иногда меньше процессов — лучше

Если данных для обмена много, и задача не слишком тяжелая, лучше обойтись без multiprocessing'а

В противном случае используйте multiprocessing — это просто

# Спасибо за внимание!