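Concurrency: "fake" simultaneity. Multiple tasks make progress within the same period of time; a single core is enough.

Parallelism: true simultaneity. Multiple tasks run at the same instant; this requires multiple cores.

What tools does Python offer for concurrency?

Mainstream operating systems provide:

* Threads
* Processes

Mainstream languages usually also provide user-space scheduling: coroutines.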

## Threads

In [1]:
import threading

In [2]:
def worker():
    print('work')

In [3]:
thread = threading.Thread(target=worker)  # create a Thread object; target is the function the thread will run

In [4]:
thread.start()  # start() launches the thread; it exits on its own when the target returns. Python provides no way to forcibly stop a thread from outside

work


So always make sure a thread's target function has a way to return; otherwise the thread never exits.
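Since a thread cannot be stopped from outside, one common pattern is to let the target function poll a stop flag. The sketch below is only an illustration: it uses `threading.Event` (covered later in these notes) as the flag, and the names `stop`, `iterations`, and `stoppable-worker` are made up for this example.

```python
import threading

stop = threading.Event()      # hypothetical stop flag shared with the worker
iterations = []

def worker():
    # wait() returns False on timeout and True once the flag is set,
    # so it doubles as an interruptible sleep.
    while not stop.wait(0.01):
        iterations.append(1)  # placeholder for one unit of real work

t = threading.Thread(target=worker, name='stoppable-worker')
t.start()
stop.set()          # ask the worker to finish
t.join(timeout=1)   # give it time to return from its loop
print(t.is_alive())
```

The worker decides when to exit; the caller only requests it.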

In [6]:
import time

def worker(num):
    time.sleep(1)
    print('worker-{}'.format(num))
    

In [7]:
for x in range(5):
    t = threading.Thread(target=worker, args=(x, ))
    t.start()

worker-0worker-1

worker-2
worker-3worker-4



How to identify a thread

In [10]:
threading.current_thread()  # returns the current thread

<_MainThread(MainThread, started 140664094971712)>

In [12]:
threading.Thread(target=lambda:print(threading.current_thread())).start()

<Thread(Thread-10, started 140663182055168)>


In [13]:
thread = threading.current_thread()

In [16]:
thread.is_alive()

True

### logging

In [33]:
import logging
import importlib

importlib.reload(logging)

<module 'logging' from '/home/comyn/.pyenv/versions/3.5.2/lib/python3.5/logging/__init__.py'>

In [36]:
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

In [37]:
logging.warning('haha')



In [38]:
def worker(num):
    logging.warning('worker-{}'.format(num))

In [39]:
for x in range(5):
    t = threading.Thread(target=worker, args=(x, ))
    t.start()



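logging is usually used instead of print: its output can include the thread name, and concurrent messages do not get interleaved the way the print output above did.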

### Arguments

In [40]:
def add(x, y):
    logging.info(x + y)

In [41]:
add(1, 2)

2017-03-11 09:43:05,490 INFO MainThread 3


In [42]:
add(x=1, y=2)

2017-03-11 09:43:15,870 INFO MainThread 3


In [44]:
threading.Thread(target=add, args=(1, 2)).start()

2017-03-11 09:43:51,371 INFO Thread-22 3


In [46]:
threading.Thread(target=add, kwargs={'x':1, 'y':2}).start()

2017-03-11 09:44:35,228 INFO Thread-23 3


In [47]:
threading.Thread(target=add, args=(1, ), kwargs={'y': 2}).start()

2017-03-11 09:45:04,394 INFO Thread-24 3


Positional arguments are passed through args, keyword arguments through kwargs.

### Controlling the thread name

In [48]:
threading.Thread(target=add, args=(1, 2), name='add').start()

2017-03-11 09:46:58,173 INFO add 3


The name parameter sets the thread name.

In [49]:
def worker():
    logging.info('starting')
    time.sleep(2)
    logging.info('completed')

In [50]:
t1 = threading.Thread(target=worker, name='worker')
t2 = threading.Thread(target=worker, name='worker')

In [51]:
t1.start()
t2.start()

2017-03-11 09:51:41,392 INFO worker starting
2017-03-11 09:51:41,394 INFO worker starting
2017-03-11 09:51:43,398 INFO worker completed
2017-03-11 09:51:43,401 INFO worker completed


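Threads may share a name; the name is not a unique identifier. Duplicate names should still be avoided, and the usual approach is to add a prefix.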
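A minimal sketch of the prefix convention: a shared prefix plus an index gives every thread a distinct, readable name. The helper `make_workers` and the prefix `downloader` are hypothetical names chosen for this example.

```python
import threading

def make_workers(prefix, count, target):
    """Build `count` threads named '<prefix>-0', '<prefix>-1', ..."""
    return [
        threading.Thread(target=target, name='{}-{}'.format(prefix, i))
        for i in range(count)
    ]

threads = make_workers('downloader', 3, lambda: None)
names = [t.name for t in threads]
print(names)
```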

In [53]:
t1 == t2

False

### daemon and non-daemon

In [62]:
t = threading.Thread(target=worker, daemon=True)

In [63]:
t.start()

2017-03-11 10:34:00,259 INFO Thread-27 starting
2017-03-11 10:34:02,267 INFO Thread-27 completed


In [64]:
t = threading.Thread(target=worker)

In [65]:
t.start()

2017-03-11 10:34:25,974 INFO Thread-28 starting
2017-03-11 10:34:27,987 INFO Thread-28 completed


When the main thread exits, daemon threads are terminated along with the program, while non-daemon threads keep running until they finish.

In [66]:
threading.current_thread().is_alive()

True

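At exit, the interpreter waits for all non-daemon threads to finish.

join blocks until the thread exits or the timeout expires. timeout is optional; without it, join waits until the thread exits.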
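A small sketch of both forms of join. `join()` always returns `None`, so after a timed join the way to tell "timed out" from "finished" is `is_alive()`. The function `slow` and the durations are arbitrary choices for illustration.

```python
import threading
import time

def slow():
    time.sleep(0.5)

t = threading.Thread(target=slow, name='slow-worker')
t.start()

t.join(timeout=0.05)           # gives up after about 0.05 s
still_running = t.is_alive()   # the thread is still sleeping at this point

t.join()                       # no timeout: block until the thread exits
finished = not t.is_alive()
print(still_running, finished)
```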

In [67]:
threading.enumerate()  # list all threads that are currently alive

[<_MainThread(MainThread, started 140664094971712)>,
 <Thread(Thread-2, started daemon 140663735711488)>,
 <HistorySavingThread(IPythonHistorySavingThread, started 140663702140672)>,
 <Heartbeat(Thread-3, started daemon 140663727318784)>,
 <ParentPollerUnix(Thread-1, started daemon 140663693223680)>]

In [68]:
for t in threading.enumerate():
    print(t.name)

MainThread
Thread-2
IPythonHistorySavingThread
Thread-3
Thread-1


Creating a thread by subclassing Thread

In [69]:
class MyThread(threading.Thread):
    def run(self):
        logging.info('run')

In [70]:
t = MyThread()

In [71]:
t.start()

2017-03-11 11:11:31,127 INFO Thread-29 run


This approach is not commonly used in Python.

In [72]:
t.run()

2017-03-11 11:12:23,761 INFO MainThread run


In [76]:
t = threading.Thread(target=worker)

In [78]:
t.run()

AttributeError: _target

In [77]:
t.start()

2017-03-11 11:13:54,060 INFO Thread-31 starting
2017-03-11 11:13:56,069 INFO Thread-31 completed


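If a thread is created with target rather than by subclassing, only one of run and start can be used, and only once: Thread.run drops its reference to the target after executing it, so a later run call fails with the AttributeError shown above. Also note that calling run directly executes the function in the calling thread (MainThread in the log above); only start creates a new thread.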
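To make the difference between run and start visible, the sketch below records which thread actually executes `run`. `Recorder` and `names` are hypothetical names; because this subclass overrides `run`, calling it twice does not hit the `_target` error.

```python
import threading

names = []

class Recorder(threading.Thread):
    def run(self):
        # Record the name of the thread that is executing this method.
        names.append(threading.current_thread().name)

r = Recorder(name='recorder')
r.run()     # ordinary method call: executes in the calling thread
r.start()   # executes run() in a new thread named 'recorder'
r.join()
print(names)
```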

### thread local

In [81]:
ctx = threading.local()

In [82]:
ctx.data = 5

In [83]:
ctx.data

5

In [86]:
data = 'abc'

In [88]:
def worker():
    logging.info(data)
    logging.info(ctx.data)

In [89]:
worker()

2017-03-11 11:18:51,450 INFO MainThread abc
2017-03-11 11:18:51,453 INFO MainThread 5


In [91]:
threading.Thread(target=worker).start()

2017-03-11 11:20:03,536 INFO Thread-33 abc
Exception in thread Thread-33:
Traceback (most recent call last):
  File "/home/comyn/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/home/comyn/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-88-c46dda5d9c3f>", line 3, in worker
    logging.info(ctx.data)
AttributeError: '_thread._local' object has no attribute 'data'



Attributes of a thread-local object are visible only in the thread that set them.
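A sketch of the usual way to work with this: each thread sets its own value on the shared `threading.local()` object before reading it. The `seen` dictionary and the thread names `t0`/`t1` are only there to make the result observable.

```python
import threading

ctx = threading.local()
ctx.data = 'main'            # set in the main thread only
seen = {}

def worker(name):
    # A new thread starts with an empty thread-local namespace.
    seen[name + '-before'] = getattr(ctx, 'data', None)
    ctx.data = name          # this thread's private value
    seen[name + '-after'] = ctx.data

threads = [threading.Thread(target=worker, args=('t{}'.format(i),)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(seen, ctx.data)
```

The main thread's value is untouched by what the workers assign.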

### Timers / delayed execution

In [95]:
def worker():
    logging.info('run')

In [96]:
t = threading.Timer(interval=5, function=worker)

In [97]:
t.start()

2017-03-11 11:34:40,438 INFO Thread-35 run


In [98]:
t.is_alive()

False

In [100]:
t = threading.Timer(interval=5, function=worker)

In [101]:
t.name = 'worker'

In [103]:
t.daemon = True

In [104]:
t.start()

2017-03-11 11:37:07,010 INFO worker run


In [115]:
t = threading.Timer(interval=300, function=worker)

In [116]:
t.name = 'worker'

In [117]:
t.start()

In [119]:
t.cancel()

In [120]:
for x in threading.enumerate():
    print(x.name)

MainThread
Thread-2
IPythonHistorySavingThread
Thread-3
Thread-1


In [121]:
t.is_alive()

False

In [122]:
def worker():
    logging.info('starting')
    time.sleep(30)
    logging.info('completed')

In [123]:
t = threading.Timer(interval=0, function=worker)

In [124]:
t.start()

2017-03-11 11:43:37,557 INFO Thread-41 starting


In [125]:
t.cancel()

In [126]:
t.is_alive()

True

2017-03-11 11:44:07,564 INFO Thread-41 completed


Once the function given by the function parameter has started executing, cancel has no effect.
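For contrast, a sketch of the case where cancel does work: it is called while the timer is still waiting out its interval, so the function never runs and the timer thread exits promptly. The list `fired` is a hypothetical stand-in for a real callback's side effect.

```python
import threading

fired = []

# The callback would append 'ran' after 5 seconds if left alone.
t = threading.Timer(5, fired.append, args=('ran',))
t.start()
t.cancel()          # still inside the interval, so the callback is skipped
t.join(timeout=1)   # the timer thread ends shortly after cancel()

print(fired, t.is_alive())
```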

## Synchronization

In [128]:
import random
import datetime

In [139]:
def worker(event: threading.Event):
    s = random.randint(1, 5)
    #time.sleep(s)
    event.wait(s)
    event.set()
    logging.info('sleep {}'.format(s))
    

def boss(event: threading.Event):
    start = datetime.datetime.now()
    event.wait()
    logging.info('worker exit {}'.format(datetime.datetime.now() - start))




In [130]:
event = threading.Event()

In [131]:
event.set()

In [132]:
event.wait()

True

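wait blocks the thread until set is called or the timeout expires. It returns True if the event was set and False if it timed out.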

In [141]:
def start():
    event = threading.Event()
    b = threading.Thread(target=boss, args=(event, ), name='boss')
    b.start()
    for x in range(5):
        threading.Thread(target=worker, args=(event, ), name='worker-{}'.format(x)).start()

In [142]:
start()

2017-03-11 14:26:08,193 INFO worker-2 sleep 1
2017-03-11 14:26:08,194 INFO boss worker exit 0:00:01.002193
2017-03-11 14:26:08,194 INFO worker-1 sleep 3
2017-03-11 14:26:08,194 INFO worker-3 sleep 4
2017-03-11 14:26:08,194 INFO worker-0 sleep 5
2017-03-11 14:26:08,195 INFO worker-4 sleep 3


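An event can be used to send signals between threads.

It is typically used when one thread has to wait until other threads have finished some action before it can proceed.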
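A minimal sketch of that "wait until something is ready" use case. The names `ready`, `config`, `initializer`, and `consumer` are invented for the example; the consumer is started first on purpose to show that it really waits.

```python
import threading

ready = threading.Event()
config = {}
results = []

def initializer():
    config['greeting'] = 'hello'   # the work that must happen first
    ready.set()                    # signal that the data is in place

def consumer():
    ready.wait()                   # block until the initializer signals
    results.append(config['greeting'])

c = threading.Thread(target=consumer, name='consumer')
i = threading.Thread(target=initializer, name='initializer')
c.start()
i.start()
c.join()
i.join()

print(results)
```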

In [143]:
event = threading.Event()

In [144]:
event.wait(1)

False

In [145]:
def worker(event: threading.Event):
    while not event.wait(3):
        logging.info('run run run')

In [146]:
event = threading.Event()

In [147]:
threading.Thread(target=worker, name='printer', args=(event, )).start()

2017-03-11 14:37:42,351 INFO printer run run run
2017-03-11 14:37:45,353 INFO printer run run run
2017-03-11 14:37:48,356 INFO printer run run run
2017-03-11 14:37:51,360 INFO printer run run run


In [148]:
event.set()

In [149]:
event.is_set()

True

In [150]:
event.clear()

In [151]:
event.is_set()

False

In [152]:
def worker(event: threading.Event):
    while not event.is_set():
        # business logic goes here
        pass

In [157]:
class Timer:
    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.event = threading.Event()
        self.thread = threading.Thread(target=self.__target)
    
    def __target(self):
        if not self.event.wait(self.interval):
            self.function(*self.args, **self.kwargs)
    
    def start(self):
        self.thread.start()
    
    def cancel(self):
        self.event.set()

In [154]:
def worker():
    logging.info('run')

In [158]:
t = Timer(interval=5, function=worker)

In [159]:
t.start()

2017-03-11 15:07:23,525 INFO Thread-43 run


An event is used to send signals between threads.

### lock

In [160]:
class Counter:
    def __init__(self):
        self.__val = 0
    
    @property
    def value(self):
        return self.__val
    
    def inc(self):
        self.__val += 1 # self.__val = self.__val + 1
    
    def dec(self):
        self.__val -= 1 # self.__val = self.__val - 1

In [161]:
counter = Counter()

In [168]:
def fn():
    if random.choice([-1, 1]) > 0:
        logging.info('inc')
        counter.inc()
    else:
        logging.info('dec')
        counter.dec()

In [169]:
for x in range(10):
    threading.Thread(target=fn).start()

2017-03-11 15:15:55,083 INFO Thread-54 dec
2017-03-11 15:15:55,100 INFO Thread-55 inc
2017-03-11 15:15:55,104 INFO Thread-56 inc
2017-03-11 15:15:55,105 INFO Thread-57 inc
2017-03-11 15:15:55,105 INFO Thread-58 dec
2017-03-11 15:15:55,106 INFO Thread-59 dec
2017-03-11 15:15:55,106 INFO Thread-60 inc
2017-03-11 15:15:55,107 INFO Thread-61 inc
2017-03-11 15:15:55,109 INFO Thread-62 dec
2017-03-11 15:15:55,110 INFO Thread-63 inc


In [171]:
counter.value  # without a lock, the result is not guaranteed

3

In [172]:
lock = threading.Lock()

In [173]:
lock.acquire()

True

In [174]:
lock.acquire()

KeyboardInterrupt: 

A Lock instance can be acquired only once; a second acquire blocks until release is called.

In [175]:
lock.release()

In [176]:
lock.acquire()

True

In [182]:
class Counter:
    def __init__(self):
        self.__val = 0
        self._lock = threading.Lock()
    
    @property
    def value(self):
        with self._lock:
            return self.__val
    
    def inc(self):
        with self._lock:
            self.__val += 1 # self.__val = self.__val + 1
    
    def dec(self):
        with self._lock:
            self.__val -= 1 # self.__val = self.__val - 1

In [183]:
counter = Counter()

In [184]:
def fn():
    if random.choice([-1, 1]) > 0:
        logging.info('inc')
        counter.inc()
    else:
        logging.info('dec')
        counter.dec()

In [185]:
for x in range(10):
    threading.Thread(target=fn).start()

2017-03-11 15:37:06,958 INFO Thread-64 inc
2017-03-11 15:37:06,959 INFO Thread-65 inc
2017-03-11 15:37:06,960 INFO Thread-66 dec
2017-03-11 15:37:06,960 INFO Thread-67 inc
2017-03-11 15:37:06,961 INFO Thread-68 dec
2017-03-11 15:37:06,969 INFO Thread-69 inc
2017-03-11 15:37:06,974 INFO Thread-70 inc
2017-03-11 15:37:06,976 INFO Thread-71 inc
2017-03-11 15:37:06,978 INFO Thread-72 dec
2017-03-11 15:37:06,982 INFO Thread-73 dec


In [186]:
counter.value

2

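When is a lock needed? Whenever several threads may read and modify the same shared state, as with the counter above.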
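As a rough illustration of shared state that needs protection, the sketch below has four threads increment one counter many times. `SafeCounter` and `hammer` are hypothetical names; the counter follows the same with-lock pattern as the `Counter` class above, which is what makes the final total predictable.

```python
import threading

class SafeCounter:
    def __init__(self):
        self._val = 0
        self._lock = threading.Lock()

    def inc(self):
        with self._lock:       # the read-modify-write happens as one unit
            self._val += 1

    @property
    def value(self):
        with self._lock:
            return self._val

def hammer(counter, n):
    for _ in range(n):
        counter.inc()

counter = SafeCounter()
threads = [threading.Thread(target=hammer, args=(counter, 10000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = counter.value
print(total)
```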

In [187]:
lock = threading.Lock()

In [188]:
lock.acquire()

True

In [191]:
lock.acquire(blocking=False)  # with blocking=False, acquiring a held lock does not block; it returns False immediately

False

In [193]:
lock.acquire(timeout=3)  # with blocking=True and timeout >= 0, blocks for at most timeout seconds and returns False if the lock was not acquired

False
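Because acquire can return False, the with statement is not an option when a timeout is involved; the usual shape is to check the return value and release in finally. A sketch under that assumption, with `try_update` and `shared` as hypothetical names:

```python
import threading

lock = threading.Lock()

def try_update(shared, key, value, timeout=0.1):
    """Return True if the update happened, False if the lock was not obtained in time."""
    if not lock.acquire(timeout=timeout):
        return False
    try:
        shared[key] = value
        return True
    finally:
        lock.release()         # release only what was actually acquired

shared = {}
first = try_update(shared, 'a', 1)                  # lock is free

lock.acquire()                                      # simulate another holder
second = try_update(shared, 'b', 2, timeout=0.05)   # gives up after the timeout
lock.release()

print(first, second, shared)
```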

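Start several threads in advance to process a list of tasks (5 workers and 10 tasks in the example below); while one thread is working on a task, the other threads can move on to other tasks.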

In [199]:
def worker(tasks):
    for task in tasks:
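        if task.lock.acquire(False):  # non-blocking: skip tasks another worker has already taken
            # The lock is never released here, so it doubles as a "taken" marker
            # and each task is logged by exactly one worker.
            logging.info(task.name)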

In [195]:
class Task:
    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()

In [196]:
tasks = [Task(x) for x in range(10)]

In [200]:
for x in range(5):
    threading.Thread(target=worker, args=(tasks, ), name='worker-{}'.format(x)).start()

2017-03-11 16:03:59,289 INFO worker-0 0
2017-03-11 16:03:59,290 INFO worker-1 1
2017-03-11 16:03:59,292 INFO worker-2 2
2017-03-11 16:03:59,315 INFO worker-2 7
2017-03-11 16:03:59,295 INFO worker-4 4
2017-03-11 16:03:59,298 INFO worker-0 5
2017-03-11 16:03:59,302 INFO worker-1 6
2017-03-11 16:03:59,294 INFO worker-3 3
2017-03-11 16:03:59,317 INFO worker-2 8
2017-03-11 16:03:59,319 INFO worker-4 9


A Lock is not reentrant.

#### Reentrant lock

In [201]:
rlock = threading.RLock()

In [202]:
rlock.acquire()

True

In [203]:
rlock.acquire(False)

True

In [204]:
rlock.release()

In [205]:
rlock.release()

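Within a single thread, a reentrant lock can be acquired successfully multiple times, but only one thread can hold it at a time. It must be released as many times as it was acquired.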
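Reentrancy matters mostly when one locked method calls another locked method on the same object. The sketch below uses a hypothetical `Account` class; with a plain `Lock` in place of the `RLock`, the call to `deposit_many` would block forever on the nested acquire.

```python
import threading

class Account:
    def __init__(self, balance):
        self._lock = threading.RLock()
        self.balance = balance

    def deposit(self, amount):
        with self._lock:
            self.balance += amount

    def deposit_many(self, amounts):
        with self._lock:                 # first acquire
            for amount in amounts:
                self.deposit(amount)     # nested acquire by the same thread

account = Account(100)
account.deposit_many([10, 20])
print(account.balance)
```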

### Condition

In [2]:
import threading
import logging
import importlib
importlib.reload(logging)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

In [76]:
import random


class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.cond = threading.Condition()
    
    def consumer(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()
                logging.info(self.data)
    
    def producer(self):
        for _ in range(10):
            data = random.randint(0, 100)
            logging.info(data)
            self.data = data
            with self.cond:
                self.cond.notify()
            self.event.wait(1)
        self.event.set()

In [77]:
d = Dispatcher()

In [78]:
p = threading.Thread(target=d.producer, name='producer')

In [79]:
for x in range(4):
    threading.Thread(target=d.consumer, name='consumer-{}'.format(x)).start()

In [80]:
p.start()

2017-03-11 17:19:32,366 INFO [producer] 95
2017-03-11 17:19:32,371 INFO [consumer-0] 95
2017-03-11 17:19:33,372 INFO [producer] 35
2017-03-11 17:19:33,380 INFO [consumer-1] 35
2017-03-11 17:19:34,384 INFO [producer] 49
2017-03-11 17:19:34,386 INFO [consumer-2] 49
2017-03-11 17:19:35,387 INFO [producer] 99
2017-03-11 17:19:35,391 INFO [consumer-3] 99
2017-03-11 17:19:36,391 INFO [producer] 64
2017-03-11 17:19:36,394 INFO [consumer-0] 64
2017-03-11 17:19:37,395 INFO [producer] 39
2017-03-11 17:19:37,398 INFO [consumer-1] 39
2017-03-11 17:19:38,398 INFO [producer] 18
2017-03-11 17:19:38,401 INFO [consumer-2] 18
2017-03-11 17:19:39,401 INFO [producer] 18
2017-03-11 17:19:39,403 INFO [consumer-3] 18
2017-03-11 17:19:40,404 INFO [producer] 77
2017-03-11 17:19:40,413 INFO [consumer-0] 77
2017-03-11 17:19:41,413 INFO [producer] 33
2017-03-11 17:19:41,415 INFO [consumer-1] 33


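Condition is typically used for the producer-consumer pattern: after producing a message, the producer calls notify or notify_all to tell consumers to consume it.

Consumers call wait to block until the producer notifies them.

notify wakes up a given number of waiting threads (one by default); notify_all wakes up all waiting threads.

Both notify/notify_all and wait require the condition to be acquired first and released afterwards; the with statement is the usual way to ensure this.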
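In the Dispatcher above, a consumer that is blocked in wait when the producer finishes is never woken up, and a notify sent while no consumer is waiting is lost. One common way to avoid both is to wait on a predicate with `wait_for` and to use `notify_all` for shutdown. The sketch below is a variation, not the original design: `Channel` is a hypothetical class, it queues items instead of holding a single value, and it uses `None` as the "closed" marker, so `None` cannot be sent as data.

```python
import threading
from collections import deque

class Channel:
    def __init__(self):
        self._items = deque()
        self._closed = False
        self._cond = threading.Condition()

    def put(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify()          # wake one waiting consumer

    def close(self):
        with self._cond:
            self._closed = True
            self._cond.notify_all()      # wake every consumer so all can exit

    def get(self):
        """Return the next item, or None once the channel is closed and empty."""
        with self._cond:
            # wait_for re-checks the predicate on every wakeup.
            self._cond.wait_for(lambda: self._items or self._closed)
            if self._items:
                return self._items.popleft()
            return None

received = []
received_lock = threading.Lock()

def consumer(ch):
    while True:
        item = ch.get()
        if item is None:
            break
        with received_lock:
            received.append(item)

ch = Channel()
consumers = [threading.Thread(target=consumer, args=(ch,), name='consumer-{}'.format(i))
             for i in range(3)]
for t in consumers:
    t.start()
for n in range(1, 11):
    ch.put(n)
ch.close()
for t in consumers:
    t.join()

print(sorted(received))
```

Every item is consumed exactly once and all consumers terminate, regardless of how the threads are scheduled.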