In [1]:
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every bare expression in a cell, not only the last one.
# This is why e.g. `list(executor.map(...))` inside the `with` blocks below has
# its result shown even though it is not the last statement of its cell.
InteractiveShell.ast_node_interactivity = "all"

# Python 3
## многопоточность

https://docs.python.org/3/library/concurrency.html


MIPT 2020

Igor Slobodskov


## Особенности питона

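* GIL (Global Interpreter Lock) — глобальная блокировка интерпретатора: в каждый момент времени байткод Python выполняет только один поток, поэтому исполнение практически "однопоточное" (потоки не ускоряют вычисления, нагружающие процессор)
* Можно запустить отдельный процесс с ещё одним интерпретатором Python — у него свой GIL, и вычисления действительно идут параллельно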

## Executor

https://docs.python.org/3/library/concurrent.futures.html

* submit(fn, \*args, \*\*kwargs) -> Future
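* map(func, \*iterables, timeout=None, chunksize=1) -> итератор результатов (в порядке аргументов; это не Future)
* shutdown(wait=True) — вызывается автоматически при выходе из блока `with`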

## Future

https://docs.python.org/3/library/concurrent.futures.html#future-objects
    
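методы:
* result(timeout=None) — ждёт завершения задачи и возвращает её результат (если задача выбросила исключение, result() выбросит его же; по истечении timeout — TimeoutError)
* статус:
    * running()
    * cancelled()
    * done()
* изменение статуса (нужны реализациям Executor и unit-тестам, а не обычному коду):
    * set_result
    * set_exception
* и прочие: cancel(), exception(), add_done_callback(fn)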
    

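* ThreadPoolExecutor — удобен для блокирующих операций ввода-вывода, например работы с сетью: пока поток ждёт, GIL отпущен и могут работать другие потоки
* ProcessPoolExecutor — настоящий параллелизм: задачи выполняются в отдельных процессах (у каждого свой интерпретатор и свой GIL), подходит для вычислений, нагружающих процессор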

# Example

## Executor.submit()

In [19]:
from concurrent.futures import ThreadPoolExecutor
import time


def func(n, delay=5):
    """Simulate a slow blocking task and return 42.

    Prints a "start" line, sleeps for `delay` seconds (5 by default, as in the
    original demo), prints a "finished" line and returns 42 so the caller can
    show that Future.result() yields the function's return value.
    """
    # The newline is part of the message and end='' is passed, so each line is
    # emitted by a single write; lines printed concurrently from several
    # threads are then less likely to be interleaved.
    print(f"func {n} start\n", end='')
    time.sleep(delay)
    print(f"func {n} finished\n", end='')
    return 42
    
# submit() schedules func(1) in a worker thread and immediately returns a
# Future (the recorded output shows it still running); result() then blocks
# until func has returned. Leaving the `with` block calls executor.shutdown(wait=True).
with ThreadPoolExecutor() as executor:
    future_result = executor.submit(func, 1)
    print(future_result)
    print(future_result.result())

<Future at 0x7f042821ada0 state=running>
func 1 start
func 1 finished
42


In [20]:
## several futures

In [21]:
from concurrent.futures import ThreadPoolExecutor
import time


def func(n, delay=5):
    """Simulate a slow blocking task and return a string naming its argument.

    Prints a "start" line, sleeps for `delay` seconds (5 by default, as in the
    original demo), prints a "finished" line and returns f"result for {n}".
    """
    # The newline is part of the message and end='' is passed, so each line is
    # emitted by a single write; lines printed concurrently from several
    # threads are then less likely to be interleaved.
    print(f"func {n} start\n", end='')
    time.sleep(delay)
    print(f"func {n} finished\n", end='')
    return f"result for {n}"
    
# Both tasks are submitted before any result is requested, so their sleeps
# overlap (both "start" lines appear before either "finished" line).
# Results are read in submission order: first.result() is awaited before
# second.result(), even if "second" happens to finish first.
with ThreadPoolExecutor() as executor:
    first = executor.submit(func, "first")
    second = executor.submit(func, "second")
    print(first, second)
    print(first.result())
    print(second.result())

func first start
func second start
<Future at 0x7f042821af60 state=running> <Future at 0x7f0428243860 state=running>
func second finished
func first finished
result for first
result for second


In [11]:
from concurrent.futures import ThreadPoolExecutor
import time


def func(n, delay=5):
    """Simulate a slow blocking task that returns nothing.

    Prints a "start" line, sleeps for `delay` seconds (5 by default, as in the
    original demo) and prints a "finished" line; returns None.
    """
    # The newline is part of the message and end='' is passed, so each line is
    # emitted by a single write; lines printed concurrently from several
    # threads are then less likely to be interleaved.
    print(f"func {n} start\n", end='')
    time.sleep(delay)
    print(f"func {n} finished\n", end='')
    
# Executor.map submits func for every element right away and returns a lazy
# iterator (a generator object), not a list and not a Future. list(result)
# waits for the calls and collects their return values in input order --
# all None here, because func returns nothing.
with ThreadPoolExecutor() as executor:
    result = executor.map(func, [1,2,3])
    print(f"result = {result}")
    print(f"list(result) = {list(result)}")

func 1 start
func 2 start
func 3 start
result = <generator object Executor.map.<locals>.result_iterator at 0x7f04281c8d58>
func 1 finished
func 3 finished
func 2 finished
list(result) = [None, None, None]


## max_workers

In [13]:
from concurrent.futures import ThreadPoolExecutor
import time

def func(n, delay=5):
    """Simulate a slow blocking task that returns nothing.

    Prints a "start" line, sleeps for `delay` seconds (5 by default, as in the
    original demo) and prints a "finished" line; returns None.
    """
    # The newline is part of the message and end='' is passed, so each line is
    # emitted by a single write; lines printed concurrently from several
    # threads are then less likely to be interleaved.
    print(f"func {n} start\n", end='')
    time.sleep(delay)
    print(f"func {n} finished\n", end='')
    
# With max_workers=2 at most two calls run at the same time: as the recorded
# output shows, func(3) starts only after the first two calls have finished
# and a worker has become free.
with ThreadPoolExecutor(max_workers=2) as executor:
    result = executor.map(func, [1,2,3])
    print(f"result = {result}")
    print(f"list(result) = {list(result)}")

func 1 start
func 2 start
result = <generator object Executor.map.<locals>.result_iterator at 0x7f04282c2bf8>
func 1 finished
func 2 finished
func 3 start
func 3 finished
list(result) = [None, None, None]


## func with a few args

In [17]:
from concurrent.futures import ThreadPoolExecutor
import time

def func(a, b, delay=5):
    """Simulate a slow blocking task that takes two arguments.

    Prints a "start" line, sleeps for `delay` seconds (5 by default, as in the
    original demo) and prints a "finished" line; returns None.
    """
    # The newline is part of the message and end='' is passed, so each line is
    # emitted by a single write; lines printed concurrently from several
    # threads are then less likely to be interleaved.
    print(f"func({a}, {b}) start\n", end='')
    time.sleep(delay)
    print(f"func({a}, {b}) finished\n", end='')
    
# Executor.map accepts several iterables and calls func(a, b) with one element
# taken from each of them (like the built-in map), so no wrapper lambda is
# needed. Unlike a lambda, this form also works with ProcessPoolExecutor,
# which has to pickle the callable it sends to worker processes.
with ThreadPoolExecutor() as executor:
    result = executor.map(func, [1, 2], ['f', 's'])
    print(f"result = {result}")
    print(f"list(result) = {list(result)}")

func(1, f) start
func(2, s) start
result = <generator object Executor.map.<locals>.result_iterator at 0x7f04281c8d58>
func(2, s) finished
func(1, f) finished
list(result) = [None, None]


## ThreadPoolExecutor vs ProcessPoolExecutor

In [22]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def fib(n):
    """Naive doubly recursive Fibonacci number: fib(1) == fib(2) == 1.

    Deliberately exponential so that it serves as a CPU-bound workload.
    Any n that is not greater than 2 yields 1.
    """
    if n > 2:
        return fib(n - 2) + fib(n - 1)
    return 1

# Time the same CPU-bound workload (four fib(21) calls) in a thread pool and
# in a process pool. time.perf_counter() is the clock meant for measuring
# intervals; time.time() follows the system wall clock, which may be adjusted.
t = time.perf_counter()
with ThreadPoolExecutor() as executor:
    # The threads share one GIL, so the fib() calls do not run in parallel.
    list(executor.map(fib, [21, 21, 21, 21]))
    print(time.perf_counter() - t)

# NOTE(review): the concurrent.futures docs require the __main__ module to be
# importable by worker processes. Passing a function defined in the notebook
# presumably works here because Linux starts workers with 'fork'; with the
# 'spawn' start method (the Windows and macOS default) expect it to fail.
# For a job this small, starting processes and pickling arguments/results can
# cost more than the computation itself -- compare the two timings.
t = time.perf_counter()
with ProcessPoolExecutor() as executor:
    list(executor.map(fib, [21, 21, 21, 21]))
    print(time.perf_counter() - t)

[10946, 10946, 10946, 10946]

0.01505422592163086


[10946, 10946, 10946, 10946]

0.033594608306884766


In [26]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def fib(n):
    """Naive doubly recursive Fibonacci number: fib(1) == fib(2) == 1.

    Deliberately exponential so that it serves as a CPU-bound workload.
    Any n that is not greater than 2 yields 1.
    """
    if n > 2:
        return fib(n - 2) + fib(n - 1)
    return 1

# Same comparison with a much heavier job (four fib(32) calls). time.perf_counter()
# is the clock meant for measuring intervals; time.time() follows the system
# wall clock, which may be adjusted.
t = time.perf_counter()
with ThreadPoolExecutor() as executor:
    # The threads share one GIL, so the fib() calls do not run in parallel.
    list(executor.map(fib, [32, 32, 32, 32]))
    print(time.perf_counter() - t)

# NOTE(review): the concurrent.futures docs require the __main__ module to be
# importable by worker processes. Passing a function defined in the notebook
# presumably works here because Linux starts workers with 'fork'; with the
# 'spawn' start method (the Windows and macOS default) expect it to fail.
# Here the computation outweighs the process start-up cost, so separate
# processes (each with its own GIL) can use several CPU cores.
t = time.perf_counter()
with ProcessPoolExecutor() as executor:
    list(executor.map(fib, [32, 32, 32, 32]))
    print(time.perf_counter() - t)

[2178309, 2178309, 2178309, 2178309]

2.2270658016204834


[2178309, 2178309, 2178309, 2178309]

1.2138030529022217


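## fork

`os.fork()` (только Unix) создаёт копию текущего процесса. В родительском процессе он возвращает PID потомка, в дочернем — 0, поэтому код после вызова выполняют оба процесса.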

In [29]:
import os

# os.fork() (Unix only) duplicates the current process. It returns the child's
# PID in the parent and 0 in the child, so both processes run the print below.
child_pid = os.fork()
print(f"child_pid = {child_pid}", flush=True)

if child_pid == 0:
    # Without this the child would keep running as a full copy of the
    # interpreter (in a notebook: a second copy of the kernel). os._exit()
    # ends it immediately, skipping cleanup handlers inherited from the
    # parent; the flush above makes sure its output has been written first.
    os._exit(0)

# Reap the finished child so it does not linger as a zombie process.
os.waitpid(child_pid, 0)

child_pid = 6207
child_pid = 0


In [30]:
import os
import time 

# fork() returns 0 in the child and the child's PID in the parent, which lets
# the two copies of the program take different branches.
child_pid = os.fork()

if child_pid == 0:
    print("child", flush=True)
    # End the child right away so it does not continue as a copy of the whole
    # interpreter; os._exit() skips cleanup handlers inherited from the parent,
    # hence the explicit flush above.
    os._exit(0)
else:
    print("parent")
    # Wait for the child to finish so it does not remain a zombie process.
    os.waitpid(child_pid, 0)

parent
child


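## Process, Thread

https://docs.python.org/3/library/threading.html#threading.Thread

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process

`multiprocessing.Process` повторяет интерфейс `threading.Thread` (`target`, `args`, `start()`, `join()`), но запускает функцию в отдельном процессе.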

In [35]:
from multiprocessing import Process
import time 

def func(n, delay=1):
    """Print a start marker, sleep `delay` seconds (1 by default), print a finish marker.

    Used below as the target of a multiprocessing.Process; returns None.
    """
    print(f"func({n}) started")
    time.sleep(delay)
    print(f"func({n}) finished")

# Run func("f") in a separate process. The printed markers show the ordering:
# start() returns immediately (the child runs concurrently with the parent),
# while join() blocks until the child process has finished.
# NOTE(review): as a standalone script under the 'spawn' start method
# (Windows, macOS default) this would need an `if __name__ == "__main__":`
# guard; in this notebook it presumably works because Linux uses 'fork'.
p = Process(target=func, args=("f",))
print("p created")
p.start()
print("p runned")
p.join()
print("joined to p")

p created
func(f) started
p runned
func(f) finished
joined to p


In [39]:
from multiprocessing import Queue

def producer(data, q: Queue, sentinel="end"):
    """Put every element of `data` into `q`, followed by `sentinel`.

    The sentinel tells the consumer that no more items will come. It defaults
    to the string "end"; pass a value that cannot occur in `data` (e.g. None)
    when `data` may itself contain "end", otherwise the consumer would stop
    early and leave the remaining items in the queue.
    """
    for elem in data:
        q.put(elem)
    q.put(sentinel)


def consumer(q: Queue, sentinel="end"):
    """Print items taken from `q` until `sentinel` arrives (it is printed too).

    q.get() blocks, so the consumer simply waits while the queue is empty.
    Must be given the same `sentinel` as the producer.
    """
    while True:
        elem = q.get()
        print(elem)
        # == rather than `is`: items sent through a multiprocessing queue are
        # pickled, so object identity does not survive the trip.
        if elem == sentinel:
            return

        
# Imported here as well so this cell does not depend on the previous cell
# having been run first.
from multiprocessing import Process

data = [1,2,3]
q = Queue()
p1 = Process(target=producer, args=(data, q))
p2 = Process(target=consumer, args=(q,))

# The consumer is started first and simply blocks in q.get() until the
# producer begins putting items into the shared queue.
p2.start()
p1.start()

# Wait for both processes. Joining only p2 would leave p1 un-waited-for (a
# zombie until something reaps it); joining p1 after p2 is safe because the
# consumer has already drained every item p1 put into the queue.
p2.join()
p1.join()
        

1
2
3
end
