In [1]:
# простой Python процесс

import time
import os

pid = os.getpid()

while True:
    print(pid, time.time())
    time.sleep(2)

7684 1525627761.394312
7684 1525627763.3949795
7684 1525627765.3951387
7684 1525627767.3960238
7684 1525627769.396175
7684 1525627771.3970401
7684 1525627773.3980389
7684 1525627775.398619
7684 1525627777.398928
7684 1525627779.3996093
7684 1525627781.4005213
7684 1525627783.4013705
7684 1525627785.4022417


KeyboardInterrupt: 

In [2]:
# Создание процесса на Python

import time
import os

pid = os.fork()
if pid == 0:
    # дочерний процесс
    while True:
        print("child:", os.getpid())
        time.sleep(5)
else:
    # родительский процесс
    print("parent:", os.getpid())
    os.wait()

AttributeError: module 'os' has no attribute 'fork'

In [5]:
# Создание процесса, модуль multiprocessing

from multiprocessing import Process

def f(name):
    print("hello", name)

p = Process(target=f, args=("Bob",))
p.start()
p.join()

In [6]:
# Создание процесса, модуль multiprocessing

from multiprocessing import Process

class PrintProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print("hello", self.name)

p = PrintProcess("Mike")
p.start()
p.join()

In [7]:
# Создание потока

from threading import Thread

def f(name):
    print("hello", name)

th = Thread(target=f, args=("Bob",))
th.start()
th.join()

hello Bob


In [8]:
# Создание потока

from threading import Thread

class PrintThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print("hello", self.name)

th = PrintThread("Mike")
th.start()
th.join()

hello Mike


In [15]:
# Пул потоков, concurrent.futures.Future

from concurrent.futures import ThreadPoolExecutor, as_completed

def f(a):
    return a * a

# .shutdown() in exit
with ThreadPoolExecutor(max_workers=3) as pool:
    results = [pool.submit(f, i) for i in range(10)]

    for future in as_completed(results):
        print(future.result())

4
0
1
9
16
25
36
49
64
81


# Синхронизация потоков
Очереди
Блокировки
Условные переменные
В многопоточной программе доступ к объектам иногда нужно синхронизировать.
Часто для синхронизации потоков используют блокировки.
Любые блокировки замедляют выполнение программы.

Лучше избегать использование блокировок 
и отдавать предпочтение обмену данными через очереди.

Использование очередей для потоков выглядит как показано на слайде.
Создаем очередь с максимальным размером 5.
Используем методы put() для того чтобы поместить данные в очередь
и get() для того чтобы забрать данные из очереди

Использование очередей делает код выполняемой программы более простым.
И по возможности лучше разрабатывать код таким образом,
чтобы не было глобального разделяемого ресурса, или состояния.

In [16]:
# Очереди, модуль queue
from queue import Queue
from threading import Thread

def worker(q, n):
    while True:
        item = q.get()
        if item is None:
            break
        print("process data:", n, item)

q = Queue(5)
th1 = Thread(target=worker, args=(q, 1))
th2 = Thread(target=worker, args=(q, 2))
th1.start(); th2.start()

for i in range(50):
    q.put(i)

q.put(None); q.put(None)
th1.join(); th2.join()

process data:process data: 1 0
process data: 1 2
process data: 1 3
process data: 1 4
process data: 1 5
process data: 1 6
process data: 1 7
process data: 1 8
process data: 1 9
process data: 1 10
process data: 1 11
process data:  1 12
process data: 12 1
process data: 2 14
process data: 2 15
process data: 2 16
process data: 2 17
process data: 2 18
 13
process data: 1 19process data: 2 20
process data: 2 21
process data: 2 22

process data: 2 23
process data: 1 24
process data: 1 25
process data: 1 26
process data: 1 27
process data: 1 28
process data: 1 29
process data: 1 30
process data: 1 31
process data: 1 32
process data: 1 33
process data: 1 34process data: 2 35
process data: 2 36
process data: 
process data: 1 38
process data: 1 39
process data: 1 40
process data: 1 41
process data: 1 42
process data: 1 43
process data: 1 44
process data: 1 45
2 37
process data: 2 46
process data: 2 process data:47
process data: 2  1 48
49


In [18]:
# Синхронизация потоков, race condition

import threading

class Point(object):
    def __init__(self, x, y):
        self.set(x, y)

    def get(self):
        return (self.x, self.y)

    def set(self, x, y):
        self.x = x
        self.y = y

# use in threads
my_point = Point(10, 20)
my_point.set(15, 10)
my_point.get()

(15, 10)

Этот код гарантирует что если объект класса Point будет использоваться в разных потоках,
то изменение x и y будет всегда атомарным.

Работает все это так: - при вызове метода берем блокировку через with self._mutex
Весь код внутри with блока будет выполнятся только в одном потоке.

Другими словами, если два разных потока вызовут .get то пока первый поток не выйдет из блока 
второй будет его ждать - и только потом продолжит выполнение.

Зачем это все нужно? Координаты нужно менять одновременно - ведь точка это атомарный объект.
Если позволить одному потоку поменять x, а другой в это же время поправит y
логика алгоритма может сломаться.

In [19]:
# Синхронизация потоков, блокировки

import threading

class Point(object):
    def __init__(self, x, y):
        self.mutex = threading.RLock()
        self.set(x, y)

    def get(self):
        with self.mutex:
            return (self.x, self.y)

    def set(self, x, y):
        with self.mutex:
            self.x = x
            self.y = y

# use in threads
my_point = Point(10, 20)
my_point.set(15, 10)
my_point.get()

(15, 10)

In [20]:
# Синхронизация потоков, блокировки

import threading


a = threading.RLock()
b = threading.RLock()

def foo():
    try:
        a.acquire()
        b.acquire()
    finally:
        a.release()
        b.release()

In [21]:
# Синхронизация потоков, условные переменные

class Queue(object):
    def __init__(self, size=5):
        self._size = size
        self._queue = []
        self._mutex = threading.RLock()
        self._empty = threading.Condition(self._mutex)
        self._full = threading.Condition(self._mutex)
    
    def put(self, val):
        with self._full:
            while len(self._queue) >= self._size:
                self._full.wait()
            
            self._queue.append(val)
            self._empty.notify()

    def get(self):
        with self._empty:
            while len(self._queue) == 0:
                self._empty.wait()
            
            ret = self._queue.pop(0)
            self._full.notify()
            return ret

# asyncio

In [28]:
import asyncio
import aiohttp

async def simulate_io():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://python.org') as resp:
            await resp.text()

async def coro(name):
    await simulate_io()
    print(f'{name}-1')
    await simulate_io()
    print(f'{name}-2')

async def main():
    await asyncio.gather(coro('A'), coro('B'))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

A-1
B-1
B-2
A-2
