### Лекция 11 - Многопоточность и асинхронность в Python

- Модуль threading
- Глобальная блокировка интерпретатора (GIL)
- Механизмы синхронизации
- Очереди, Producer/Consumer
- Асинхронность в Python
- Модуль asyncio
- Ключевые слова async/await

In [1]:
import threading
import time

In [2]:
def prime_numbers(lower, upper):
    """Print every prime in [lower, upper] using naive trial division.

    Deliberately inefficient: each candidate is tested against every
    divisor from 2 to candidate - 1, so the function is CPU-heavy work
    for the threading examples.

    Each prime is printed as '- up to <upper> : pn=<prime>'.
    A lower bound below 2 is raised to 2.
    """
    start = max(lower, 2)
    for candidate in range(start, upper + 1):
        has_divisor = any(candidate % d == 0 for d in range(2, candidate))
        if not has_divisor:
            print('- up to {} : pn={}'.format(upper, candidate))
                

# Run prime_numbers in a separate thread: start() returns immediately and
# the primes are printed from the new thread.
t = threading.Thread(
    target=prime_numbers,
    kwargs={'lower': 2 ** 20, 'upper': 2 ** 20 + 256},
)
t.start()

- up to 1048832 : pn=1048583
- up to 1048832 : pn=1048589
- up to 1048832 : pn=1048601
- up to 1048832 : pn=1048609
- up to 1048832 : pn=1048613
- up to 1048832 : pn=1048627
- up to 1048832 : pn=1048633
- up to 1048832 : pn=1048661
- up to 1048832 : pn=1048681
- up to 1048832 : pn=1048703
- up to 1048832 : pn=1048709
- up to 1048832 : pn=1048717
- up to 1048832 : pn=1048721
- up to 1048832 : pn=1048759
- up to 1048832 : pn=1048783
- up to 1048832 : pn=1048793
- up to 1048832 : pn=1048799
- up to 1048832 : pn=1048807
- up to 1048832 : pn=1048829


In [3]:
class PrimeNumberThread(threading.Thread):
    """Thread that prints the primes in [lower, upper] when started.

    Uses deliberately naive trial division: every candidate num is tested
    against all divisors from 2 to num - 1.

    Args:
        lower: first candidate; values below 2 are raised to 2, because
            0, 1 and negative numbers are not prime.
        upper: last candidate (inclusive); also shown in every output line.
    """

    def __init__(self, lower, upper):
        # A Thread subclass that overrides the constructor must call the
        # base constructor before doing anything else to the thread.
        super().__init__()
        # Without this clamp, range(2, num) is empty for num < 2, so the
        # for/else below would report 0, 1 and negatives as primes.
        self.__lower = max(lower, 2)
        self.__upper = upper

    def run(self):
        """Print each prime in the range; executed in the new thread by start()."""
        for num in range(self.__lower, self.__upper + 1):
            for i in range(2, num):
                if num % i == 0:
                    break
            else:
                # no divisor was found, so num is prime
                print('- up to {} : pn={}'.format(self.__upper, num))


# Launch a PrimeNumberThread; its run() method executes in the new thread.
t = PrimeNumberThread(lower=150, upper=200)
t.start()

- up to 200 : pn=151
- up to 200 : pn=157
- up to 200 : pn=163
- up to 200 : pn=167
- up to 200 : pn=173
- up to 200 : pn=179
- up to 200 : pn=181
- up to 200 : pn=191
- up to 200 : pn=193
- up to 200 : pn=197
- up to 200 : pn=199


In [4]:
# Two threads running at the same time: their output lines interleave
# (and can even end up glued together), because both write to stdout.
t1 = PrimeNumberThread(lower=2 ** 20, upper=2 ** 20 + 256)
t2 = PrimeNumberThread(lower=2 ** 21, upper=2 ** 21 + 256)
t1.start()
t2.start()

- up to 1048832 : pn=1048583
- up to 2097408 : pn=2097169
- up to 1048832 : pn=1048589
- up to 1048832 : pn=1048601
- up to 2097408 : pn=2097211- up to 1048832 : pn=1048609

- up to 1048832 : pn=1048613
- up to 2097408 : pn=2097223
- up to 1048832 : pn=1048627
- up to 1048832 : pn=1048633
- up to 2097408 : pn=2097229
- up to 1048832 : pn=1048661
- up to 1048832 : pn=1048681
- up to 2097408 : pn=2097257
- up to 1048832 : pn=1048703
- up to 1048832 : pn=1048709- up to 2097408 : pn=2097259

- up to 1048832 : pn=1048717
- up to 1048832 : pn=1048721
- up to 2097408 : pn=2097287- up to 1048832 : pn=1048759

- up to 1048832 : pn=1048783
- up to 2097408 : pn=2097289
- up to 1048832 : pn=1048793
- up to 1048832 : pn=1048799
- up to 2097408 : pn=2097311
- up to 1048832 : pn=1048807
- up to 1048832 : pn=1048829
- up to 2097408 : pn=2097317
- up to 2097408 : pn=2097349
- up to 2097408 : pn=2097373
- up to 2097408 : pn=2097383
- up to 2097408 : pn=2097397
- up to 2097408 : pn=2097401


### Пример окна с индикацией прогресса (wxPython)

![wxPythonDemo](images/gauge_demo.png)

In [None]:
import wx


class DownloadThread(threading.Thread):
    """Daemon thread that simulates a download by advancing a percentage.

    Every 0.03 s the percentage grows by one, from 1 up to 100, and each
    new value is passed to the update function. After 100 is reached the
    completion callback is called with the thread's name.

    Args:
        thread_name: name assigned to the thread and passed to callback.
        thread_func: called as thread_func(percentage) after every step.
        callback: called as callback(thread_name) once progress hits 100.

    Both functions are invoked from run(), i.e. from this worker thread.
    """

    def __init__(self, thread_name, thread_func, callback):
        super().__init__(daemon=True)
        self.name = thread_name
        self.__percentage = 0          # current progress, 0..100
        self.__update = thread_func    # progress reporter
        self.__callback = callback     # completion notifier

    def run(self):
        for step in range(1, 101):
            time.sleep(0.03)
            self.__percentage = step
            self.__update(step)
        self.__callback(self.name)


class MainFrame(wx.Frame):
    """Window with two progress gauges, each driven by its own DownloadThread."""

    def __init__(self):
        wx.Frame.__init__(self, None, wx.ID_ANY, 'Threads!', size=(400, 300))

        panel = wx.Panel(self, -1)
        self.gauge1 = wx.Gauge(panel, -1, pos=(20, 20), size=(360, 40))
        self.gauge2 = wx.Gauge(panel, -1, pos=(20, 80), size=(360, 40))

        button_start1 = wx.Button(panel, -1, 'start thread #1', pos=(20, 140),
                                  size=(170, 50))
        button_start2 = wx.Button(panel, -1, 'start thread #2', pos=(210, 140),
                                  size=(170, 50))
        self.Bind(wx.EVT_BUTTON, self.on_start_thread1, button_start1)
        self.Bind(wx.EVT_BUTTON, self.on_start_thread2, button_start2)

        self.info_text = wx.StaticText(panel, -1, pos=(20, 230), size=(360, 20))
        self.info_text.SetLabel('Idle...')

    def on_start_thread1(self, event):
        """Button handler: start a worker thread that drives gauge #1."""
        self.dthread1 = DownloadThread('1', self.update_gauge1, self.download_finished)
        self.dthread1.start()

    def on_start_thread2(self, event):
        """Button handler: start a worker thread that drives gauge #2."""
        self.dthread2 = DownloadThread('2', self.update_gauge2, self.download_finished)
        self.dthread2.start()

    # The three methods below are passed to DownloadThread, which calls them
    # from its run(), i.e. from a worker thread. wx widgets must only be
    # touched from the GUI thread, so the widget calls are posted to it with
    # wx.CallAfter instead of being made directly.
    def update_gauge1(self, percentage):
        wx.CallAfter(self.gauge1.SetValue, percentage)

    def update_gauge2(self, percentage):
        wx.CallAfter(self.gauge2.SetValue, percentage)

    def download_finished(self, thread_no):
        wx.CallAfter(self.info_text.SetLabel,
                     'Thread {} has finished'.format(thread_no))


if __name__ == '__main__':
    # Create the application object, show the main window and run the GUI
    # event loop.
    application = wx.App()
    main_window = MainFrame()
    main_window.Show()
    application.MainLoop()

### GIL (Global Interpreter Lock)

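Если свести к одному предложению: в CPython из-за GIL в каждый момент времени байткод Python исполняет только один поток, поэтому CPU-bound задачи не ускоряются от разбиения на потоки.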

In [5]:
# Compute the sum of all numbers in the range [1, 20000000):
# single-threaded version.
#
# NOTE: the loop below deliberately runs at module (cell) level, so `total`
# and `i` are global variables and each access to them goes through the
# module's globals dict. The multi-threaded comparison that follows depends
# on this; do not move the loop into a function.
# `lower` and `upper` are reused by the following cells.
lower = 1
upper = 20000000
 
# wall-clock timestamps around the measured loop
tm1 = time.time()
 
total = 0
for i in range(lower, upper):
    total += i

tm2 = time.time()
 
print('Single-threaded version: {} sec'.format(tm2 - tm1))
print('Result: {}'.format(total))

Single-threaded version: 2.298603057861328 sec
Result: 199999990000000


In [6]:
# multi-threaded version

# split point: one thread sums [lower, middle), the other [middle, upper)
middle = 10000000


def count_sum(start, end, res):
    """Add up the integers in [start, end) and store the sum in res[0].

    The result goes into the mutable container `res` instead of being
    returned, so that the function can serve as a threading.Thread target
    (the thread does not keep its target's return value).
    The explicit Python loop is intentional: it is the workload being timed.
    """
    acc = 0
    for value in range(start, end):
        acc += value
    res[0] = acc


# One-element lists serve as result slots that count_sum fills in from the
# worker threads.
sum1 = [0] 
sum2 = [0]
 
tm1 = time.time()
 
# Split [lower, upper) at `middle` and give each half to its own thread.
t1 = threading.Thread(target=count_sum, args=(lower, middle, sum1)) 
t2 = threading.Thread(target=count_sum, args=(middle, upper, sum2)) 
# Start both threads before joining either, so they run concurrently.
t1.start()
t2.start()
# Wait for both threads; sum1[0] and sum2[0] are final only after join().
t1.join()
t2.join()
 
total = sum1[0] + sum2[0]
 
tm2 = time.time()
 
# NOTE: under the GIL the two threads do not run Python bytecode in
# parallel; any speed-up over the module-level loop comes from count_sum
# working with local variables.
print('Multi-threaded version: {} sec'.format(tm2 - tm1))
print('Result: {}'.format(total))

Multi-threaded version: 1.2420885562896729 sec
Result: 199999990000000


Из-за GIL многопоточная версия не должна быть быстрее однопоточной, а у нас результат оказался противоположным.

Причина хитрая и с потоками не связана. В однопоточной версии цикл выполняется на уровне модуля, поэтому `total` и `i` — глобальные переменные: они хранятся в словаре (хеш-таблице) модуля, и каждое обращение к ним — поиск по ключу.
В многопоточной версии тот же цикл выполняется внутри функции `count_sum`, а локальные переменные функции хранятся в массиве фрейма и адресуются по индексу. Доступ по индексу в массиве быстрее поиска в хеш-таблице, за счёт этого и выигрывает вторая версия, а вовсе не за счёт параллельного выполнения.

Если же написать однопоточную версию с помощью той же функции `count_sum`, получим ожидаемый результат: время примерно такое же, как у многопоточной версии.

In [7]:
# Single-threaded version that calls the same count_sum() function, so the
# summing loop works on local variables just like in the threaded version.
lower = 1
upper = 20000000
 
tm1 = time.time()
 
# one-element list used as count_sum's result slot
total = [0]
count_sum(lower, upper, total)
 
tm2 = time.time()
 
print('Single-threaded version: {} sec'.format(tm2 - tm1))
print('Result: {}'.format(total[0]))

Single-threaded version: 1.2139441967010498 sec
Result: 199999990000000


Время выполнения часто удобнее замерять с помощью `%timeit` — магической команды IPython, основанной на модуле `timeit`:

In [8]:
# IPython line magic (not plain Python): times repeated runs of count_sum().
%timeit count_sum(lower, upper, total)

1 loop, best of 3: 1.14 s per loop


In [9]:
from random import randint


def writer(x, event_for_wait, event_for_set):
    """Print x five times, taking turns with a partner thread via two Events.

    Each round waits until event_for_wait is set and clears it so the next
    round blocks again. It then prints x, pauses for a random 0-2 seconds,
    and finally sets event_for_set to hand the turn to the partner.
    """
    rounds_left = 5
    while rounds_left:
        event_for_wait.wait()        # block until it is our turn
        event_for_wait.clear()       # re-arm for the next round
        print(x)
        pause = randint(1, 3) % 3    # 1, 2 or 0 seconds
        time.sleep(pause)
        event_for_set.set()          # pass the turn to the other thread
        rounds_left -= 1

# Two events implement strict alternation: each writer waits on one event
# and sets the other one after printing.
e1 = threading.Event()
e2 = threading.Event()

# writer 0 waits on e1 and signals e2; writer 1 does the opposite.
t1 = threading.Thread(
    target=writer,
    kwargs={'x': 0, 'event_for_wait': e1, 'event_for_set': e2},
)
t2 = threading.Thread(
    target=writer,
    kwargs={'x': 1, 'event_for_wait': e2, 'event_for_set': e1},
)

t1.start()
t2.start()

# Both writers are blocked until e1 is set; this gives writer 0 the first turn.
e1.set()

# Wait until both writers have finished all their rounds.
t1.join()
t2.join()

0
1
0
1
0
1
0
1
0
1


In [10]:
# Shared lock: only one count_numbers() loop may run at a time.
lock = threading.Lock()


def count_numbers(n, delay):
    """Print 0..n-1, sleeping `delay` seconds after each number.

    The whole loop runs while holding the module-level `lock`, so threads
    calling this function print their sequences one after another, never
    interleaved.
    """
    with lock:
        for number in range(n):
            print(number)
            time.sleep(delay)


# t1 prints 0..9 (1 s apart) and t2 prints 0..4 (2 s apart). count_numbers
# holds the lock for its whole loop, so whichever thread acquires it first
# prints its entire sequence before the other one starts.
t1 = threading.Thread(target=count_numbers, kwargs={'n': 10, 'delay': 1})
t2 = threading.Thread(target=count_numbers, kwargs={'n': 5, 'delay': 2})

t1.start()
t2.start()

t1.join()
t2.join()

0
1
2
3
4
5
6
7
8
9
0
1
2
3
4


In [11]:
# Работа с потоками через очередь Queue
# This module provides a thread-safe queue implementation. 
# It provides a convenient way of moving Python objects between different threads.

from queue import Queue


# Serializes print() calls coming from different worker threads.
print_lock = threading.Lock()


def exampleJob(worker):
    """Simulate 0.5 s of work, then print the current thread's name and `worker`."""
    time.sleep(.5)  # stand-in for real work
    thread_name = threading.current_thread().name
    with print_lock:
        print(thread_name, worker)


# Worker loop: repeatedly take the next job from the queue and process it.
def threader():
    """Process jobs from the module-level queue `q`; never returns.

    Intended to run in a daemon worker thread.
    """
    while True:
        # Blocks until a job is available.
        worker = q.get()
        try:
            exampleJob(worker)
        finally:
            # Always mark the job as done, even if exampleJob raised.
            # Otherwise q.join() in the main thread would wait forever.
            q.task_done()


# Create the shared job queue.
q = Queue()


# Start a pool of 10 worker threads. They are daemon threads, so they are
# killed automatically when the main program exits.
for x in range(10):
    t = threading.Thread(target=threader, daemon=True)
    t.start()


start = time.time()

# Enqueue 20 jobs; idle worker threads pick them up right away.
# (Despite its name, `worker` is the job number, not a worker thread.)
for worker in range(20):
    q.put(worker)

# Block until every queued job has been marked done via q.task_done();
# the worker threads themselves keep running.
q.join()

# With 10 workers and 20 tasks of 0.5 s each, the whole job takes about
# 1 second using threads; done one after another it would take 10 seconds.

print('Entire job took:',time.time() - start)

Thread-19 5
Thread-20 6
Thread-21 7
Thread-18 4
Thread-17 3Thread-16
 Thread-14 20

Thread-15 1
Thread-22 8
Thread-23 9
Thread-19 10
Thread-20 11
Thread-21 12
Thread-18 13
Thread-17 14
Thread-16 15
Thread-14 16
Thread-15 17
Thread-22 18
Thread-23 19
Entire job took: 1.025883436203003


### Асинхронность в Python

In [12]:
"""
Two coroutines chained together.

The compute() coroutine is chained to the print_sum() coroutine. The
print_sum() coroutine waits until compute() is completed before it returns a
result.
"""
import asyncio


# Native coroutine defined with `async def`. The old generator-based
# @asyncio.coroutine decorator was removed in Python 3.11.
async def compute(x, y, delay=1.0):
    """Return x + y after a non-blocking pause of `delay` seconds.

    Args:
        x, y: values to add.
        delay: simulated I/O time in seconds (default 1.0).
    """
    print("Compute %s + %s ..." % (x, y))
    # Suspend this coroutine while asyncio.sleep() runs. This simulates the
    # time taken by a non-blocking I/O call; meanwhile the event loop is
    # free to run other tasks.
    await asyncio.sleep(delay)
    # Actually return a result!
    return x + y


async def print_sum(x, y):
    """Await compute(x, y) and print the equation together with its result."""
    # Suspend until the compute() coroutine has produced its result.
    result = await compute(x, y)
    # This print() only runs once the result is available.
    print("%s + %s = %s" % (x, y, result))


# Create an event loop explicitly and make it the current one. Relying on
# asyncio.get_event_loop() to create a loop implicitly is deprecated in
# recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Run the event loop until print_sum() is complete.
loop.run_until_complete(print_sum(10, 12))
# The loop is intentionally left open, so later code that calls
# asyncio.get_event_loop() can keep using it.

Compute 10 + 12 ...
10 + 12 = 22


In [13]:
"""
Three tasks running the same factorial coroutine in parallel.
"""
import asyncio


@asyncio.coroutine
def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        yield from asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))


# Create and register an event loop explicitly instead of relying on one
# being created implicitly (deprecated in recent Python versions).
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Creating tasks doesn't run the coroutines yet. It merely schedules them
# on the loop; they start executing once the loop runs.
tasks = [
    loop.create_task(factorial("A", 2)),
    loop.create_task(factorial("B", 3)),
    loop.create_task(factorial("C", 4)),
]


# Run the event loop until all the tasks are done.
loop.run_until_complete(asyncio.wait(tasks))

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24


({<Task finished coro=<factorial() done, defined at <ipython-input-13-c799d60bbb68>:7> result=None>,
  <Task finished coro=<factorial() done, defined at <ipython-input-13-c799d60bbb68>:7> result=None>,
  <Task finished coro=<factorial() done, defined at <ipython-input-13-c799d60bbb68>:7> result=None>},
 set())