In [16]:
import multiprocessing
import os
import queue
import random
import subprocess as sp
import sys
import time
import threading
import urllib.request

# subprocess
# threading
# multiprocessing

# subprocess

### The problem
Sometimes you want to **run other programs from your Python program**.

*Example*: testing system runs compiler/interpreter.

*Requirements*: ability to check stdout/stderr

In [None]:
import subprocess as sp

# Preferred high-level API: runs a command, waits for it and returns a CompletedProcess.
# Use it whenever it's possible.
sp.run

# Lower-level API: use when sp.run doesn't satisfy your needs
# (e.g. you must interact with the process while it is still running).
sp.Popen

# Legacy helpers: only needed if Python <= 3.4, where sp.run does not exist yet.
sp.call
sp.check_call
sp.check_output

In [None]:
class Popen(object):    
    """Abridged sketch of the subprocess.Popen constructor signature.

    Only the most commonly used arguments are shown. This stub is for
    illustration only and is NOT the real subprocess.Popen.
    """
    def __init__(
        self, args, *, stdin=None, stdout=None, stderr=None,
        cwd=None, env=None,  # many other arguments
    ):
        ...

In [None]:
# Signature sketch of subprocess.run (body omitted; not the real implementation):
# - input: data written to the child's stdin
# - timeout: how long to wait for the child
# - check: if True, a non-zero exit code raises an exception
# - **kwargs: forwarded to Popen (stdout, stderr, cwd, env, ...)
def run(*popenargs, input=None, timeout=None, check=False, **kwargs):
    """Run command with arguments and return a CompletedProcess instance."""

Documentation  
https://docs.python.org/3/library/subprocess.html

In [47]:
# Start `ls -l` without waiting for it. stdout is not redirected, so the child
# inherits the kernel's stdout and the listing typically does not appear in the notebook.
p = sp.Popen(['ls', '-l'])  # in practice, use os.listdir
p

<subprocess.Popen at 0x105bd1390>

In [48]:
# error code
# error code: the returncode if the process has finished, None if it is still running
p.poll()

0

In [49]:
# no stdout :(
# no stdout :( -- stdout was not set to sp.PIPE, so this attribute is None (nothing is displayed)
p.stdout

In [50]:
# stdout=sp.PIPE connects the child's stdout to a pipe that Python can read
p = sp.Popen(['ls', '-l'], stdout=sp.PIPE)

# it's almost the same as opened file (a binary reader, so lines are bytes)
p.stdout

<_io.BufferedReader name=75>

In [51]:
# Iterating over the pipe yields byte lines until the child closes its stdout.
# NOTE(review): p is never waited on and p.stdout is never closed here; in real code
# prefer sp.run(..., stdout=sp.PIPE) or `with sp.Popen(...) as p:`.
list(p.stdout)[:3]

[b'total 120\n',
 b'-rw-r--r--  1 ygorishniy  LD\\Domain Users    240 Oct 30 17:20 bad_sum.cpp\n',
 b'drwxr-xr-x  6 ygorishniy  LD\\Domain Users    192 Oct 30 17:37 images\n']

In [None]:
# The same subprocess.run signature sketch as shown earlier, repeated for reference
# before the compile-and-run example. Redefining `run` only rebinds the same stub.
def run(*popenargs, input=None, timeout=None, check=False, **kwargs):
    """Run command with arguments and return a CompletedProcess instance."""

In [None]:
### sum.cpp ###
// Reads two integers from stdin and prints their sum followed by a newline.

#include <iostream>

// Returns first + second (no overflow check).
int Sum(int first, int second) {
    return first + second;
}

int main() {
    int a = 0;
    int b = 0;
    std::cin >> a >> b;
    std::cout << Sum(a, b) << '\n';
}

In [52]:
# Compile sum.cpp into the executable sum.out; returncode=0 means the compilation succeeded
sp.run(['clang++', '-std=c++14', 'sum.cpp', '-o', 'sum.out'])

CompletedProcess(args=['clang++', '-std=c++14', 'sum.cpp', '-o', 'sum.out'], returncode=0)

In [55]:
# Run the binary: `input` is written to its stdin, and stdout=sp.PIPE captures the answer as bytes
sp.run(['./sum.out'], stdout=sp.PIPE, input=b'2 3')

CompletedProcess(args=['./sum.out'], returncode=0, stdout=b'5\n')

In [None]:
### bad_sum.cpp ("#include <iostream>" is missing) ###
// Same program as sum.cpp, but without the include, so `std` is undeclared.
// NOTE(review): the recorded compiler output below also mentions
// std::ios_base::sync_with_stdio / std::cin.tie lines that are not in this
// listing -- the file on disk presumably differs slightly; confirm.

int Sum(int first, int second) {
    return first + second;
}

int main() {
    int a = 0;
    int b = 0;
    std::cin >> a >> b;
    std::cout << Sum(a, b) << '\n';
}

In [56]:
# use stderr to access error messages
sp.run(['clang++', '-std=c++14', 'bad_sum.cpp', '-o', 'bad_sum.out'], stderr=sp.PIPE)

CompletedProcess(args=['clang++', '-std=c++14', 'bad_sum.cpp', '-o', 'bad_sum.out'], returncode=1, stderr=b"bad_sum.cpp:7:5: error: use of undeclared identifier 'std'\n    std::ios_base::sync_with_stdio(false);\n    ^\nbad_sum.cpp:8:5: error: use of undeclared identifier 'std'\n    std::cin.tie(nullptr);\n    ^\nbad_sum.cpp:12:5: error: use of undeclared identifier 'std'\n    std::cin >> a >> b;\n    ^\nbad_sum.cpp:13:5: error: use of undeclared identifier 'std'\n    std::cout << Sum(a, b) << '\\n';\n    ^\n4 errors generated.\n")

### subprocess
This module intends to replace several older modules and functions:  
`os.system, os.spawn*`  
(c) python.org  

In [58]:
# os.system returns the command's exit status, NOT its output:
# `pwd` prints elsewhere, and the variable only receives 0 (success).
working_directory = os.system('pwd')
print(f'working_directory: {working_directory}')

working_directory: 0


In [59]:
# A nonexistent command: os.system returns an encoded wait status
# (32512 == 127 << 8; 127 is presumably the shell's "command not found" code),
# and no error message reaches the notebook.
working_directory = os.system('print_working_directory')
print(f'working_directory: {working_directory}')

working_directory: 32512


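So, `os.system` returns only an exit status (`0` on success, a non-zero value such as `32512` for a nonexistent command) and gives no access to the command's output. That's why `subprocess` is the only correct way to run other programs from Python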

# VERY IMPORTANT
### `subprocess` is an ideal replacement for Bash
Any program written in Bash can be rewritten in Python via subprocess.

### Summary
- `subprocess` is the only correct way to run other programs from Python
- `subprocess` can be used to run console tools, C++-binaries, etc.
- `subprocess` lets you replace Bash

# threading

### Summary
- each process has its own memory
- *thread* is the activity of executing commands within a process

- at one moment *ONE* cpu-core executes exactly *ONE* thread

- multiple cores can execute multiple threads literally SIMULTANEOUSLY (that's why multiple-core machines can operate faster)

- for CPU-bound work, there is no sense in creating more threads than you have CPU cores (for I/O-bound work more threads can still help, as shown below)

- multiple threads can execute within one process
- in such case, they work with the same memory

<img src="files/images/threads.png">

Source: https://sites.google.com/site/sureshdevang/thread-vs-process

One more clarifying illustration

<img src="files/images/thread_vs_process.gif">

Source: https://www.perl.com/pub/2002/09/04/threads.html

In [1]:
import threading
import sys

def thread_job(number):
    """Print a greeting tagged with `number`, then flush so the line shows up immediately."""
    greeting = 'Privet {}'.format(number)
    print(greeting)
    sys.stdout.flush()

def run_threads(count):
    """Greet from the main thread (0), then from `count` worker threads numbered 1..count."""
    thread_job(0)
    workers = []
    for worker_id in range(1, count + 1):
        # the id goes through `args`, so each thread receives its own value
        workers.append(threading.Thread(target=thread_job, args=(worker_id,)))
    for worker in workers:
        worker.start()
    # ALL THREADS MUST BE JOINED
    for worker in workers:
        worker.join()
run_threads(4)

Privet 0
Privet 1
Privet 2
Privet 3
Privet 4


In [2]:
import threading
import sys

def thread_job(number):
    """Print a greeting tagged with `number`, then flush so the line shows up immediately."""
    print('Privet {}'.format(number))
    sys.stdout.flush()

def run_threads(count):
    """Deliberately BROKEN variant that shows the late-binding pitfall of lambdas.

    Each lambda closes over the comprehension variable `i` itself, not over its
    value at creation time. The comprehension finishes before any thread is
    started, so every thread reads the final value `count`.
    """
    thread_job(0)
    threads = [
        # BUG (intentional, for demonstration): `i` is looked up only when the thread runs
        threading.Thread(target=lambda: thread_job(i))
        for i in range(1, count + 1)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
run_threads(4)

Privet 0
Privet 4
Privet 4
Privet 4
Privet 4


### Summary
- multithreading programming is something new!
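- **NEVER USE LAMBDAS AS THE TARGET**: a lambda created in a loop captures the variable `i` itself, not its current value, so every thread sees the final value; pass arguments via `args=` instead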

The next example is a ~~stolen~~ reworked part of a super cool talk by Raymond Hettinger (Python core developer):  
https://www.youtube.com/watch?v=Bv25Dwe84g0

### Summary
- respect Raymond Hettinger and especially respect the talk mentioned above

In [3]:
counter = 0

def thread_job():
    """Increment the global counter via a NON-atomic read-modify-write and print it."""
    global counter
    # Reading and writing are separate steps: another thread may run in between,
    # and then one of the increments is lost (a race condition).
    old_counter = counter
    counter = old_counter + 1
    print('{} '.format(counter), end='')

threads = [threading.Thread(target=thread_job) for _ in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Happens to be 10 here, but nothing guarantees it.
counter

1 2 3 4 5 6 7 8 9 10 

10

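WOW, SUCH A BIG ERROR RIGHT ON THE SLIDE!!! The output above happens to look correct, but reading `counter` and writing it back are two separate steps, so concurrent threads can overwrite each other's updates.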

Reminder: threads are constantly interrupted by the OS.  
Let's model this phenomenon via `time.sleep`

In [65]:
import random
import time

counter = 0
def thread_job():
    """Same racy increment, with a random 0- or 1-second pause between the read and the write."""
    global counter
    old_counter = counter
    # The sleep models the OS preempting the thread: meanwhile, other threads read
    # the same stale value, so their increments overwrite each other.
    # NOTE: no random seed is set, so the final value differs between runs.
    time.sleep(random.randint(0, 1))
    counter = old_counter + 1
    print('{} '.format(counter), end='')

threads = [threading.Thread(target=thread_job) for _ in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
counter

1 2 3 4 5 6 1 7 6 5 

5

There are different names for such situations: *data race*, *race condition*, ...

### Straightforward solution

In [None]:
counter = 0
def thread_job(lock):
    """Increment the global counter and print it while holding `lock`.

    Uses explicit acquire()/release(). The try/finally guarantees the lock is
    released even if the body raises; otherwise every other thread would block
    forever in acquire().
    """
    global counter
    lock.acquire()
    try:
        counter += 1
        print('{} '.format(counter), end='')
        sys.stdout.flush()
    finally:
        lock.release()

lock = threading.Lock()
threads = [
    threading.Thread(target=thread_job, args=(lock,))
    for _ in range(10)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
counter

### Better solution

In [None]:
counter = 0

def thread_job(lock):
    """Increment the global counter and print it; `with lock` guards the whole update."""
    global counter
    with lock:
        counter += 1
        print('{} '.format(counter), end='')
        sys.stdout.flush()

lock = threading.Lock()
threads = [threading.Thread(target=thread_job, args=(lock,)) for _ in range(10)]
for worker in threads:
    worker.start()
for worker in threads:
    worker.join()

counter

### The best solution
https://github.com/StrausMG/teaching/blob/master/2018_ysda_python_seminars/subprocess_threading_multiprocessing/queue_example.py

### Summary
- multithreaded programming is difficult!
- `queue.Queue` significantly simplifies it!
- that's why you should ALWAYS use `queue.Queue` to pass data between threads

### The problem
Calculate a sum of a vector of numbers using N threads (N <= core_count)

### C++ solution
- the exact code doesn't matter (you'll learn it in C++-2)
- what matters: it reduces the calculation time by a factor of roughly N

### Python solution

In [7]:
import queue

def thread_job(arr, part_id, thread_count, results_queue):
    """Sum every `thread_count`-th element of `arr`, starting at index `part_id`.

    The partial sum is put on `results_queue`. If the summation raises, the
    exception object is put there instead, so the consumer is never left
    waiting forever for a result that will not arrive.
    """
    try:
        partial_sum = sum(arr[i] for i in range(part_id, len(arr), thread_count))
    except Exception as exc:
        results_queue.put(exc)
    else:
        results_queue.put(partial_sum)

def sum_using_threads(arr, thread_count):
    """Sum `arr` by splitting it into `thread_count` strided parts, one per thread.

    Returns the total (0 if `arr` is empty or `thread_count` <= 0).
    Re-raises the first exception raised by a worker.
    Because of the GIL, this gives no speedup for CPU-bound work (see the timings below).
    """
    results_queue = queue.Queue()
    threads = [
        threading.Thread(target=thread_job, args=(arr, i, thread_count, results_queue))
        for i in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # Every worker puts exactly one item, so after join() all results are queued.
    results = [results_queue.get() for _ in threads]
    for result in results:
        if isinstance(result, BaseException):
            raise result
    return sum(results)

In [8]:
arr = [1] * (10 * 1000 * 1000)  # ten million ones

In [11]:
%%timeit
# Baseline: single-threaded sum through a generator that indexes arr element by element
# (the same access pattern the threaded version uses, so the comparison is fair).
sum(arr[i] for i in range(len(arr)))

853 ms ± 10.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [12]:
%%timeit
# The same work split across 4 threads; the GIL lets only one of them run Python code at a time.
sum_using_threads(arr, 4)

803 ms ± 9.24 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Summary
- the Global Interpreter Lock (GIL) makes Python threads useless for CPU-bound calculations: only one thread executes Python code at a time

In [72]:
import urllib.request

# Sites downloaded in the sequential vs threaded benchmark below.
urls = [
    'https://www.yandex.ru', 'https://www.google.com',
    'https://www.python.org', 'https://isocpp.org',
    'https://habrahabr.ru', 'https://news.ycombinator.com',
    'https://www.twilio.com', 'http://www.celeryproject.org'
]

def read_url(url):
    """Open `url`, read the whole response body and return it as bytes."""
    response = urllib.request.urlopen(url)
    with response:
        body = response.read()
    return body

In [73]:
%%timeit
# Sequential baseline: download the pages one after another with the same helper
# the threaded version uses, so both timings measure identical work.
for url in urls:
    read_url(url)

4.8 s ± 241 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [74]:
%%timeit
# One thread per URL. While a thread waits on the network it does not block the
# others, so the downloads overlap. Results are discarded; only the time matters.
readers = [
    threading.Thread(target=read_url, args=(url,))
    for url in urls
]
for reader in readers:
    reader.start()
for reader in readers:
    reader.join()

1.23 s ± 74.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Summary
- Python threads are cool when there are blocking actions (input-output, networking)

### P.S.
Although Python threads are somewhat limited, in real-world apps you may still want to use them, because it's often more convenient to run different complex activities (graphics, animation, etc.) in different threads.

# multiprocessing

Reminder:
- the Global Interpreter Lock prevents Python threads from executing simultaneously within the same process

However:
- different Python processes (not threads!) can be executed simultaneously!
- in theory, this means that processes can be used for parallel calculations

In [75]:
import threading


LIST = []


def worker():
    """Append a greeting to the module-level LIST, which every thread shares."""
    LIST.append('Privet!')

threads = []
for _ in range(5):
    threads.append(threading.Thread(target=worker))
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Threads share the process's memory, so all 5 appends are visible here.
LIST

['Privet!', 'Privet!', 'Privet!', 'Privet!', 'Privet!']

In [76]:
import multiprocessing


LIST = []


def worker():
    """Append an item to LIST -- inside the child process, i.e. to the child's own copy."""
    LIST.append('item')
    
# NOTE(review): with the "spawn" start method (e.g. Windows, macOS on newer Pythons),
# functions defined interactively in a notebook may not be found by the children -- confirm.
processes = [
    multiprocessing.Process(target=worker)
    for _ in range(5)
]
for p in processes:
    p.start()
for p in processes:
    p.join()
    
# Each process has its own memory: the children's appends never reach the parent,
# so the parent's LIST is still empty.
LIST

[]

Back to the summation problem

In [26]:
size = 10 * 1000 * 1000
arr = [1] * size  # `size` ones

In [27]:
%%timeit
# Built-in sum directly over the list: far faster than the index-based generator above.
sum(arr)

48.4 ms ± 931 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


### ULTRA FAST TUTORIAL ON `multiprocessing.Pool`

In [28]:
# Pool(4) starts 4 worker processes; map() hands each item to some worker and
# waits for all results. The print order depends on scheduling.
with multiprocessing.Pool(4) as p:
    p.map(print, range(7))

0
2
1
5
3
4
6


In [36]:
process_count = 4
part_size = size // process_count
# Split arr into process_count contiguous parts. The last part runs to the end of
# the list, so no elements are dropped when size is not divisible by process_count.
# (part_size is still used by the cells below.)
array_parts = [
    arr[i * part_size: (i + 1) * part_size if i + 1 < process_count else None]
    for i in range(process_count)
]

In [37]:
%%timeit
with multiprocessing.Pool(process_count) as p:
    # every part is sent to separate processes
    # (each part is copied to a worker; this transfer, not the summing, dominates the time)
    p.map(sum, array_parts)

356 ms ± 20.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [38]:
# Time summing a single part on its own, for comparison with the Pool timing above.
one_part = arr[0 * part_size: (0 + 1) * part_size]
%timeit sum(one_part)

11.8 ms ± 94.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


### Summary
- it's time-consuming to share data between processes

Let's model the same workload as above, but with almost no data shared between processes

In [39]:
def do_n_actions(n):
    """CPU-bound dummy workload: count to `n` in a Python loop and return the count (0 if n <= 0)."""
    total = 0
    for _ in range(n):
        total += 1
    return total

In [40]:
%%timeit
# Single-process baseline for the CPU-bound workload.
do_n_actions(size)

610 ms ± 30.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [41]:
%%timeit
with multiprocessing.Pool(process_count) as p:
    # Only one integer is sent to each process, so almost no data is transferred;
    # the remaining overhead is mostly starting the worker processes.
    p.map(do_n_actions, (part_size for _ in range(process_count)))

228 ms ± 1.64 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Finally, we managed to boost the calculation!  
But by a factor smaller than the process count :(

### Summary
- it's time-consuming to create processes
- sometimes fewer processes can lead to better performance

### Summary
- it's time-consuming to share data between processes
- if the task is light and the data is big, then DO NOT use multiprocessing
- in the opposite situation, USE multiprocessing (it's easy)
- the libraries `threading` and `multiprocessing` are very similar!
- `pip install joblib` for super-easy multiprocessing programming