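## Synchronization

Summary:

   * Event is typically used for event notification between threads
   * Lock is typically used to protect shared resources
   * Condition is typically used for the producer-consumer model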


In [9]:
import time 
import logging
import threading
import importlib

importlib.reload(logging)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s  [%(threadName)s] %(message)s')

In [10]:
logging.info('xixi')

2017-04-18 07:20:05,869 INFO  [MainThread] xixi


In [12]:
import random
import datetime

In [13]:
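## Start with an example:
## Workers do some work; when a worker finishes, it notifies the boss, and the boss handles the follow-up.
## Assume 5 workers and 1 boss. When one of the workers exits, the boss computes how long it slept.
## This requires a communication mechanism between threads.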

def worker():
    time.sleep(random.randint(1,5))

def boss():
    start = datetime.datetime.now()
    logging.info('worker exit {}'.format(datetime.datetime.now()-start))

In [14]:
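event = threading.Event()        ## the simplest inter-thread communication mechanism is an event
                                ## event is an instance of threading.Event

In [15]:
event.set()        ## the two key methods are set and wait

In [16]:
event.wait()      ## Note: wait returns False only on timeout; once set has been called it returns True

True

Usage: wait blocks the calling thread until set is called or the timeout expires. That is the key to understanding Event.

**Blocking and the return value are separate matters: wait returns True whenever the event has been set, and False only on timeout.**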
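The return-value rule can be checked directly. The following is a minimal sketch; the names `flag`, `timed_out` and `after_set` are made up for illustration, and the 0.1 second timeout is arbitrary.

```python
import threading

flag = threading.Event()

# Nobody has called set(): wait blocks for about 0.1 s, then returns False.
timed_out = flag.wait(0.1)

flag.set()

# The flag is already set: wait returns True immediately, timeout or not.
after_set = flag.wait(0.1)

print(timed_out, after_set)
```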

In [67]:
def worker(event: threading.Event):   ## the parameter uses a type annotation (covered with function definitions);
    time.sleep(random.randint(2,5))   ## an Event object is passed in here
    event.set()         ## call set

def boss(event: threading.Event):
    start = datetime.datetime.now()
    event.wait()
    logging.info('worker exit {}'.format(datetime.datetime.now() - start))

Type annotations in function definitions: http://blog.csdn.net/qbw2010/article/details/45390449

In [70]:
def start():
    event = threading.Event()   ## event is an instance of threading.Event
    b = threading.Thread(target=boss, args=(event, ),name='boss')
    b.start()    ## boss blocks right after starting, because it calls event.wait()
    for x in range(5):  ## 5 workers, 1 boss; when the first worker exits,
                         ## boss logs how long it waited
        ## 5 random sleeps: whichever worker finishes first calls set, which ends the boss's wait
        threading.Thread(target=worker, args=(event, ),name='worker').start()

In [80]:
start()           ## logged after a little more than 3 seconds

2017-04-18 08:33:52,412 INFO  [boss] worker exit 0:00:03.005121


In [82]:
start()      ## another run: logged after about 4 seconds

2017-04-18 08:34:20,834 INFO  [boss] worker exit 0:00:04.043009


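wait blocks the current thread until the event's set method is called.

There is a problem with the code above, though: each of the 5 worker threads sleeps for its full duration no matter what (because it uses sleep). Is there a way to have all of them finish as soon as any one of them finishes?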

In [84]:
## First, verify the behavior: all 5 threads run to completion, but the boss exits as soon as the first worker finishes

def worker(event: threading.Event):
    s = random.randint(2,5)
    time.sleep(s)
    event.set()     
    logging.info('sleep {}'.format(s))

def boss(event: threading.Event):
    start = datetime.datetime.now()
    event.wait()
    logging.info('worker exit {}'.format(datetime.datetime.now() - start))

In [85]:
def start():
    event = threading.Event()
    b = threading.Thread(target=boss, args=(event, ),name='boss')
    b.start()
    for x in range(5):  
        threading.Thread(target=worker, args=(event, ),name='worker-{}'.format(x)).start()

In [86]:
start()      ## boss exits after about 3 seconds, but the other 4 workers still run to the end

2017-04-18 09:02:46,480 INFO  [worker-1] sleep 3
2017-04-18 09:02:46,481 INFO  [boss] worker exit 0:00:03.005539
2017-04-18 09:02:46,482 INFO  [worker-4] sleep 3
2017-04-18 09:02:46,482 INFO  [worker-3] sleep 3
2017-04-18 09:02:46,482 INFO  [worker-2] sleep 3
2017-04-18 09:02:48,482 INFO  [worker-0] sleep 5


In [88]:
## Observe once more
start()

2017-04-18 09:04:29,999 INFO  [worker-0] sleep 2
2017-04-18 09:04:29,999 INFO  [boss] worker exit 0:00:02.003690
2017-04-18 09:04:30,000 INFO  [worker-3] sleep 2
2017-04-18 09:04:30,000 INFO  [worker-1] sleep 2
2017-04-18 09:04:33,001 INFO  [worker-2] sleep 5
2017-04-18 09:04:33,004 INFO  [worker-4] sleep 5


In [89]:
start()        ## the whole run takes about 5 seconds

2017-04-18 09:04:40,590 INFO  [worker-1] sleep 2
2017-04-18 09:04:40,592 INFO  [worker-4] sleep 2
2017-04-18 09:04:40,596 INFO  [boss] worker exit 0:00:02.009136
2017-04-18 09:04:40,596 INFO  [worker-3] sleep 2
2017-04-18 09:04:43,590 INFO  [worker-2] sleep 5
2017-04-18 09:04:43,591 INFO  [worker-0] sleep 5


Can this be made faster?

In [90]:
def worker(event: threading.Event):
    s = random.randint(2,5)
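    ## time.sleep(s)         ## changed: time.sleep is replaced by event.wait()
    event.wait(s)           ## wait with a timeout returns when the event is set or after s seconds, whichever comes first
    event.set()                                 ## runs either way, whether or not the event was already set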
    logging.info('sleep {}'.format(s))

def boss(event: threading.Event):
    start = datetime.datetime.now()
    event.wait()
    logging.info('worker exit {}'.format(datetime.datetime.now() - start))

In [91]:
def start():
    event = threading.Event()
    b = threading.Thread(target=boss, args=(event, ),name='boss')
    b.start()
    for x in range(5):  
        threading.Thread(target=worker, args=(event, ),name='worker-{}'.format(x)).start()

In [92]:
start()          ## now all threads exit at almost the same moment

2017-04-18 09:08:02,315 INFO  [worker-0] sleep 3
2017-04-18 09:08:02,315 INFO  [worker-3] sleep 4
2017-04-18 09:08:02,316 INFO  [worker-4] sleep 3
2017-04-18 09:08:02,316 INFO  [worker-2] sleep 5
2017-04-18 09:08:02,316 INFO  [boss] worker exit 0:00:03.001883
2017-04-18 09:08:02,317 INFO  [worker-1] sleep 3


In [93]:
start()

2017-04-18 09:08:13,674 INFO  [worker-1] sleep 2
2017-04-18 09:08:13,675 INFO  [boss] worker exit 0:00:02.001894
2017-04-18 09:08:13,675 INFO  [worker-3] sleep 3
2017-04-18 09:08:13,676 INFO  [worker-4] sleep 4
2017-04-18 09:08:13,676 INFO  [worker-0] sleep 4
2017-04-18 09:08:13,676 INFO  [worker-2] sleep 2


In [94]:
start()   ## again, all threads exit at almost the same moment

2017-04-18 09:08:31,443 INFO  [worker-1] sleep 2
2017-04-18 09:08:31,443 INFO  [worker-2] sleep 2
2017-04-18 09:08:31,444 INFO  [boss] worker exit 0:00:02.002042
2017-04-18 09:08:31,444 INFO  [worker-3] sleep 3
2017-04-18 09:08:31,444 INFO  [worker-0] sleep 3
2017-04-18 09:08:31,444 INFO  [worker-4] sleep 2


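wait blocks the calling thread until set is called or the timeout expires, and wait and set can be called from **different threads**.

That one sentence explains what Event is for.

Several threads can wait on the same event; when any thread calls set, all of those waits return.

An event can send signals between threads.

It is typically used when a thread must wait for other threads to finish some action before it can proceed.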
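To illustrate that a single set releases every waiter, here is a small sketch. The names `ready`, `released` and `waiter` are invented for this example; the list is guarded by its own lock so that the appends do not interfere with each other.

```python
import threading

ready = threading.Event()
released = []
released_lock = threading.Lock()

def waiter(name):
    ready.wait()                 # every waiter blocks on the same event
    with released_lock:
        released.append(name)

threads = [threading.Thread(target=waiter, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

ready.set()                      # one set() releases all three waiters

for t in threads:
    t.join()

print(sorted(released))
```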

In [95]:
event = threading.Event()   ## define a fresh event

In [96]:
event.wait(1)    ## returns False here (timed out after 1 second); earlier, when set was called first, it returned True
                                                      

False

This gives us another technique: running something periodically.

In [3]:
## Note: not False evaluates to True

In [4]:
not False       ## this is roughly what the while loop below relies on

True

In [97]:
def worker(event: threading.Event):
    while not event.wait(3):  # wait returns False on timeout: a trick for telling a timeout apart from a set
    ## while True:          ## the two commented-out lines behave like while not event.wait(3), but give no way to stop the loop
        logging.info('run run run')
        ## time.sleep(3)

In [98]:
event = threading.Event()

In [99]:
threading.Thread(target=worker, name='printer',args=(event,)).start()

2017-04-18 14:11:16,902 INFO  [printer] run run run
2017-04-18 14:11:19,905 INFO  [printer] run run run
2017-04-18 14:11:22,909 INFO  [printer] run run run
2017-04-18 14:11:25,911 INFO  [printer] run run run
2017-04-18 14:11:28,914 INFO  [printer] run run run
2017-04-18 14:11:31,918 INFO  [printer] run run run
2017-04-18 14:11:34,921 INFO  [printer] run run run
2017-04-18 14:11:37,925 INFO  [printer] run run run
2017-04-18 14:11:40,927 INFO  [printer] run run run


In [101]:
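event.set()    ## the loop above logs every 3 seconds; after set, wait(3) returns True, not True is False, and the loop exits
               ## as long as set is not called, the loop keeps running

This pattern is a handy way to run a task at a fixed interval.

There is one more technique: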

In [102]:
event.is_set()       ## check whether the event is set; here it is

True

In [103]:
event.clear()         ## clear resets the flag

In [104]:
event.is_set()        ## this can serve as a thread's exit condition

False

In [105]:
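## Note: this is used a lot: a mechanism for controlling thread exit

def worker(event: threading.Event):
    while not event.is_set():
        ## business logic
        pass                  ## the usual way to control when a thread exits:
                               ## calling set from another thread ends the loop

Note: Event is thread-safe; as long as it is the same object, calling set from any thread ends every wait on it.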
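The periodic loop and the exit flag can be combined into one stoppable worker. This is a sketch under assumptions: the names `periodic`, `stop` and `ticks` are hypothetical, and the short intervals are chosen only so the example finishes quickly.

```python
import threading
import time

def periodic(stop: threading.Event, interval: float, ticks: list) -> None:
    # wait() returns False on timeout (do one more tick)
    # and True once stop has been set (leave the loop).
    while not stop.wait(interval):
        ticks.append(time.monotonic())

stop = threading.Event()
ticks = []
runner = threading.Thread(target=periodic, args=(stop, 0.05, ticks), name='periodic')
runner.start()

time.sleep(0.3)          # let the worker tick a few times
stop.set()               # request shutdown; the pending wait() returns at once
runner.join(timeout=1)   # wait for the thread to finish

print(len(ticks), runner.is_alive())
```

Because the loop condition is the wait itself, the worker reacts to `stop.set()` without first finishing the current interval.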

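Q: What is the difference between event.wait(3) and sleep(3)?

Both block the calling thread and let other threads run. The difference is that event.wait(3) returns as soon as another thread calls set, whereas time.sleep(3) cannot be cut short by another thread. That is why multithreaded code usually prefers wait over sleep.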
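The practical difference shows up when a wait is cut short. In the sketch below (the names `stop`, `elapsed` and `waiter` are illustrative), a thread asks to wait 5 seconds but returns shortly after the main thread calls set; a `time.sleep(5)` in the same place would have run for the full 5 seconds.

```python
import threading
import time

stop = threading.Event()
elapsed = {}

def waiter():
    start = time.monotonic()
    stop.wait(5)                       # asks for up to 5 s, but set() ends it early
    elapsed['wait'] = time.monotonic() - start

t = threading.Thread(target=waiter)
t.start()

time.sleep(0.1)
stop.set()                             # wakes the waiter long before the 5 s timeout
t.join()

print(elapsed['wait'] < 5)
```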

In [19]:
help(event.wait)

Help on method wait in module threading:

wait(timeout=None) method of threading.Event instance
    Block until the internal flag is true.
    
    If the internal flag is true on entry, return immediately. Otherwise,
    block until another thread calls set() to set the flag to true, or until
    the optional timeout occurs.
    
    When the timeout argument is present and not None, it should be a
    floating point number specifying a timeout for the operation in seconds
    (or fractions thereof).
    
    This method returns the internal flag on exit, so it will always return
    True except if a timeout is given and the operation times out.



Exercise: implement a Timer

In [1]:
import time 
import logging
import threading
import importlib

importlib.reload(logging)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s  [%(threadName)s] %(message)s')

In [2]:
class Timer:
    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.event = threading.Event()
        self.thread = threading.Thread(target=self.__target)
        
    def __target(self):
        if not self.event.wait(self.interval):      ## this is the key point
            self.function(*self.args, **self.kwargs) ## skipped if cancel was called during the wait
            
    def start(self):
        self.thread.start()
    
    def cancel(self):
        self.event.set()

In [3]:
def worker():
    logging.info('run')             

## The Timer code above shows a neat use of Event

In [7]:
t = Timer(interval=5, function=worker)

In [8]:
t.start()   ## the output appears after 5 seconds

2017-04-19 08:09:17,142 INFO  [Thread-5] run


In [9]:
t2 = Timer(interval=10, function=worker)

In [10]:
t2.start()

In [11]:
t2.cancel()     ## cancelling within the 10 seconds calls event.set(), so worker never runs

In [12]:
t3 =  Timer(interval=10, function=worker)

In [13]:
t3.start()    ## the output appears after 10 seconds

2017-04-19 08:24:12,443 INFO  [Thread-7] run


Summary: Event is used to send signals between threads.
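For comparison, the standard library already ships a class with the same shape as this exercise, `threading.Timer`, which also has `start` and `cancel`. The sketch below uses it with short intervals; the list `fired` and the labels `'t1'` and `'t2'` are made up for the example.

```python
import threading

fired = []

t1 = threading.Timer(0.1, fired.append, args=('t1',))
t2 = threading.Timer(0.1, fired.append, args=('t2',))
t1.start()
t2.start()

t2.cancel()      # cancelled before its interval elapses, so it never fires

t1.join()        # Timer is a Thread subclass, so join() works
t2.join()

print(fired)
```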

### lock

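The hard part of concurrency is locking. Python offers comparatively few techniques here.

Lock-free programming also exists and can improve performance.

Get locking right and you have largely got concurrency right. A lock turns concurrent execution into serial execution:

operations on a shared resource are serialized across threads, while the rest of the logic still runs concurrently.

Now for locks.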

In [14]:
## Consider a counter class

In [15]:
class Counter:
    def __init__(self):
        self.__val = 0
        
    @property
    def value(self):
        return self.__val
    
    def inc(self):
        self.__val += 1  ## equivalent to self.__val = self.__val + 1
        
    def dec(self):
        self.__val -= 1  ## equivalent to self.__val = self.__val - 1

In [16]:
counter = Counter()

In [17]:
counter.inc()

In [18]:
counter.inc()

In [19]:
counter.dec()

In [20]:
counter.value

1

In [None]:
## The above is fine in a single-threaded setting.

In [2]:
import random

In [22]:
def fn():
    if random.choice([-1,1]) > 0:
        logging.info('inc')
        counter.inc()
    else:
        logging.info('dec')
        counter.dec()

In [26]:
for x in range(10):               ## start 10 threads
    threading.Thread(target=fn).start()

2017-04-19 08:48:12,654 INFO  [Thread-28] dec
2017-04-19 08:48:12,655 INFO  [Thread-29] inc
2017-04-19 08:48:12,656 INFO  [Thread-30] inc
2017-04-19 08:48:12,657 INFO  [Thread-31] dec
2017-04-19 08:48:12,657 INFO  [Thread-32] inc
2017-04-19 08:48:12,658 INFO  [Thread-33] dec
2017-04-19 08:48:12,660 INFO  [Thread-34] inc
2017-04-19 08:48:12,661 INFO  [Thread-35] inc
2017-04-19 08:48:12,661 INFO  [Thread-36] inc
2017-04-19 08:48:12,662 INFO  [Thread-37] inc


In [27]:
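counter.value     ## the result after running the loop several times
                  ## in general the result cannot be predicted, because of concurrency
                  ## with this few threads, whether a wrong result shows up is largely a matter of chance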

-3

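How do we make the result deterministic?

By protecting the shared resource.

Here the counter object is a shared resource, because every thread operates on it; more precisely, counter's __val is the shared resource, so it needs protection.

How? With a lock.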

In [28]:
lock = threading.Lock()

In [29]:
lock.acquire()       ## acquire the lock

True

In [30]:
lock.acquire()    ## a second acquire blocks; it has to be interrupted manually

KeyboardInterrupt: 

A Lock that is already held cannot be acquired again: a second acquire blocks until release is called.

In [31]:
lock.release()    ## release the lock

In [32]:
lock.acquire()

True

Using this behavior, rework Counter

In [35]:
class Counter:
    def __init__(self):
        self.__val = 0
        self._lock = threading.Lock()     ## create a lock object
        
    @property
    def value(self):
        return self.__val
    
    def inc(self):
        self._lock.acquire()      ## acquire before try, so release only runs if the lock is actually held
        try:
            self.__val += 1 
        finally:
            self._lock.release()  ## try/finally guarantees the lock is always released
        
    def dec(self):
        self._lock.acquire()
        try:
            self.__val -= 1  ## equivalent to self.__val = self.__val - 1
        finally:
            self._lock.release()

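This protects the shared resource: at any moment only one thread can modify __val.

Wherever a lock is used, it must be released in a finally block; otherwise the lock may never be released and other threads will block forever.

In fact, Lock supports the context manager protocol.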

In [37]:
## Rewritten with the with statement (context manager)

class Counter:
    def __init__(self):
        self.__val = 0
        self._lock = threading.Lock()     ## create a lock object
        
    @property
    def value(self):
        return self.__val
    
    def inc(self):
        with self._lock:         ## __enter__ calls acquire
            self.__val += 1      ## __exit__ calls release
        
    def dec(self):
        with self._lock:
            self.__val -= 1 
            
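## this way the lock is acquired and released automatically

One more question: inc and dec take the lock, but does value (the read) need it too?
In general, yes. A read that does not take the lock can observe stale or partially updated state (dirty reads and phantom reads are both possible).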

In [49]:

class Counter:
    def __init__(self):
        self.__val = 0
        self._lock = threading.Lock()     ## create a lock object
        
    @property
    def value(self):
        with self._lock:            ## lock here as well
            return self.__val
    
    def inc(self):
        with self._lock:
            self.__val += 1 
        
    def dec(self):
        with self._lock:
            self.__val -= 1 

Now Counter is thread-safe.

In [50]:
counter = Counter()

In [51]:
def fn():
    if random.choice([-1,1]) > 0:
        logging.info('inc')
        counter.inc()
    else:
        logging.info('dec')
        counter.dec()

Now it can be used with confidence.

In [52]:
for x in range(10):               ## start 10 threads
    threading.Thread(target=fn).start()

2017-04-19 13:15:52,873 INFO  [Thread-78] inc
2017-04-19 13:15:52,880 INFO  [Thread-79] inc
2017-04-19 13:15:52,882 INFO  [Thread-80] inc
2017-04-19 13:15:52,889 INFO  [Thread-81] dec
2017-04-19 13:15:52,895 INFO  [Thread-82] inc
2017-04-19 13:15:52,897 INFO  [Thread-83] inc
2017-04-19 13:15:52,900 INFO  [Thread-84] dec
2017-04-19 13:15:52,902 INFO  [Thread-85] inc
2017-04-19 13:15:52,904 INFO  [Thread-86] inc
2017-04-19 13:15:52,906 INFO  [Thread-87] dec


In [53]:
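counter.value      ## the result is now well defined: 7 inc and 3 dec give 4
## Note: in a script or IDE, where everything runs in one go, wait for the threads to finish
## (join them; time.sleep(1) is a crude substitute) before reading counter.value

4

That covers locks.

**Note: when the code above runs as a script or in an IDE, the threads must have finished before counter.value is read. Join each thread (or, crudely, call time.sleep(1)); otherwise the value may be read too early.**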
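A more reliable way to wait than a fixed sleep is to keep the Thread objects and join them. The sketch below is self-contained: it repeats a cut-down Counter (only `inc`, and `_val` in place of `__val`), and the thread and iteration counts are arbitrary.

```python
import threading

class Counter:
    def __init__(self):
        self._val = 0
        self._lock = threading.Lock()

    @property
    def value(self):
        with self._lock:
            return self._val

    def inc(self):
        with self._lock:
            self._val += 1

counter = Counter()

def work():
    for _ in range(1000):
        counter.inc()

threads = [threading.Thread(target=work) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()            # returns only after that thread has finished

print(counter.value)    # 10 threads x 1000 increments
```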

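Thread safety means that when multiple threads call the code at the same time, the result is always consistent.

When is a lock needed?

Whenever a resource is shared between threads and at least one of them modifies it.

Moving on: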

In [54]:
lock = threading.Lock()

In [55]:
lock.acquire()

True

In [56]:
help(lock.acquire)

Help on built-in function acquire:

acquire(...) method of _thread.lock instance
    acquire(blocking=True, timeout=-1) -> bool
    (acquire_lock() is an obsolete synonym)
    
    Lock the lock.  Without argument, this blocks if the lock is already
    locked (even by the same thread), waiting for another thread to release
    the lock, and return True once the lock is acquired.
    With an argument, this will only block if the argument is true,
    and the return value reflects whether the lock is acquired.
    The blocking operation is interruptible.



In [57]:
lock.acquire(blocking=False)  ## does not block: if the lock is already held and blocking is False, acquire returns False immediately

False

In [58]:
lock.acquire(timeout=3)  ## with blocking=True and timeout >= 0, acquire blocks until the timeout and then returns False

False

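These two parameters are not needed often, but occasionally they are very useful.

Example use: start a group of threads in advance to process some tasks; while one thread is working on a task, the other threads can pick up other tasks.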

In [29]:
## Example

def worker(tasks):
    for task in tasks:
        ## acquire(False) passes blocking=False positionally (see the help above:
        ## acquire(blocking=True, timeout=-1) -> bool). False only means "do not block";
        ## the call still returns True if the lock was acquired and False if not.
        if task.lock.acquire(False):
            logging.info(task.name)
            ## The lock is deliberately never released: a try/finally with
            ## task.lock.release() would let later workers process the same task again.

In [30]:
class Task:
    def __init__(self,name):
        self.name = name 
        self.lock = threading.Lock()   ##  the lock that worker acquires through task.lock

In [31]:
tasks = [Task(x) for x in range(10)] ## list comprehension; assume 10 tasks

In [15]:
## [Task(x) for x in range(10)]       evaluates to the following

[<__main__.Task at 0x7ff69c40f5f8>,
 <__main__.Task at 0x7ff69c40f668>,
 <__main__.Task at 0x7ff69c40f5c0>,
 <__main__.Task at 0x7ff69c40f4e0>,
 <__main__.Task at 0x7ff69c40f588>,
 <__main__.Task at 0x7ff69c40f518>,
 <__main__.Task at 0x7ff69c40f9b0>,
 <__main__.Task at 0x7ff69c40f6a0>,
 <__main__.Task at 0x7ff69c40fc88>,
 <__main__.Task at 0x7ff69c40f7b8>]

In [32]:
for x in range(5):    ## use 5 threads to process the 10 tasks
    threading.Thread(target=worker, args=(tasks, ),name='worker-{}'.format(x)).start() 

2017-04-19 20:24:48,620 INFO  [worker-0] 0
2017-04-19 20:24:48,644 INFO  [worker-0] 1
2017-04-19 20:24:48,647 INFO  [worker-0] 2
2017-04-19 20:24:48,650 INFO  [worker-0] 3
2017-04-19 20:24:48,653 INFO  [worker-0] 4
2017-04-19 20:24:48,656 INFO  [worker-0] 5
2017-04-19 20:24:48,659 INFO  [worker-0] 6
2017-04-19 20:24:48,672 INFO  [worker-0] 8
2017-04-19 20:24:48,675 INFO  [worker-0] 9
2017-04-19 20:24:48,665 INFO  [worker-1] 7


Note: the order is not guaranteed; if you need a guaranteed order, you cannot use concurrency.
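The claim-a-task pattern can be checked by recording which tasks were processed. This is a sketch, not the notebook's exact code: the `done` list and `done_lock` are added here purely to collect results, and the threads are joined so the result can be inspected.

```python
import threading

class Task:
    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()

def worker(tasks, done, done_lock):
    for task in tasks:
        # Non-blocking acquire: True means this worker claimed the task.
        # The lock is never released, so no other worker can claim it again.
        if task.lock.acquire(blocking=False):
            with done_lock:
                done.append(task.name)

tasks = [Task(i) for i in range(10)]
done = []
done_lock = threading.Lock()

threads = [threading.Thread(target=worker, args=(tasks, done, done_lock))
           for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(done))
```

Each task appears exactly once in `done`, although which worker handled it and in what order varies from run to run.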

Let's continue.

A Lock is not reentrant.

#### Reentrant lock (provided by the standard library)

In [33]:
rlock = threading.RLock()

In [34]:
rlock.acquire()

True

In [35]:
rlock.acquire(False)   ## still True: the lock was acquired again

True

In [36]:
rlock.release()

In [37]:
rlock.release()

A reentrant lock can be acquired multiple times within the same thread, but only one thread can hold it at a time; it must be released as many times as it was acquired.
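One situation where reentrancy matters is a locked method that calls another locked method on the same object. The `Account` class below is a hypothetical example: with a plain Lock, `deposit_twice` would block forever on the inner acquire, while with RLock it completes.

```python
import threading

class Account:
    def __init__(self):
        self._lock = threading.RLock()
        self.balance = 0

    def deposit(self, amount):
        with self._lock:              # second acquire by the same thread succeeds
            self.balance += amount

    def deposit_twice(self, amount):
        with self._lock:              # first acquire
            self.deposit(amount)
            self.deposit(amount)

acct = Account()
acct.deposit_twice(5)
print(acct.balance)
```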

It is enough to be aware of reentrant locks; in most cases a plain Lock is used.

In [None]:
## Continue

### Condition

As the name suggests, this is a condition.

In [1]:
import time 
import logging
import threading
import importlib

importlib.reload(logging)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s  [%(threadName)s] %(message)s')

In [2]:
import random

In [20]:
class Dispatcher:         ## dispatches data from the producer to the consumer
    def __init__(self):
        self.data = None
        self.event = threading.Event()
    
    def consumer(self):             ## consumer
        while not self.event.wait(1):
            logging.info(self.data)
        
    def producer(self):             ## producer
        for _ in range(10):
            data = random.randint(0,100)  ## pick a random integer from 0 to 100
            logging.info(data)
            self.data = data
            self.event.wait(1)
        self.event.set()

In [21]:
d = Dispatcher()

In [22]:
p = threading.Thread(target=d.producer, name='producer')

In [23]:
c = threading.Thread(target=d.consumer, name='consumer')

In [24]:
c.start()
p.start()

2017-04-19 21:59:16,778 INFO  [producer] 38
2017-04-19 21:59:17,778 INFO  [consumer] 38
2017-04-19 21:59:17,782 INFO  [producer] 84
2017-04-19 21:59:18,781 INFO  [consumer] 84
2017-04-19 21:59:18,810 INFO  [producer] 100
2017-04-19 21:59:19,784 INFO  [consumer] 100
2017-04-19 21:59:19,812 INFO  [producer] 50
2017-04-19 21:59:20,789 INFO  [consumer] 50
2017-04-19 21:59:20,813 INFO  [producer] 27
2017-04-19 21:59:21,792 INFO  [consumer] 27
2017-04-19 21:59:21,823 INFO  [producer] 54
2017-04-19 21:59:22,795 INFO  [consumer] 54
2017-04-19 21:59:22,836 INFO  [producer] 22
2017-04-19 21:59:23,799 INFO  [consumer] 22
2017-04-19 21:59:23,838 INFO  [producer] 68
2017-04-19 21:59:24,802 INFO  [consumer] 68
2017-04-19 21:59:24,840 INFO  [producer] 51
2017-04-19 21:59:25,805 INFO  [consumer] 51
2017-04-19 21:59:25,843 INFO  [producer] 96
2017-04-19 21:59:26,808 INFO  [consumer] 96


In [28]:
d = Dispatcher()

In [29]:
p = threading.Thread(target=d.producer, name='producer')

In [30]:
c = threading.Thread(target=d.consumer, name='consumer')

In [31]:
c.start()   ## this time start c first: it logs once per second

2017-04-19 22:02:26,755 INFO  [consumer] None
2017-04-19 22:02:27,758 INFO  [consumer] None
2017-04-19 22:02:28,761 INFO  [consumer] None
2017-04-19 22:02:29,764 INFO  [consumer] None
2017-04-19 22:02:30,769 INFO  [consumer] None
2017-04-19 22:02:31,771 INFO  [consumer] None
2017-04-19 22:02:32,773 INFO  [consumer] None
2017-04-19 22:02:33,775 INFO  [consumer] None
2017-04-19 22:02:34,778 INFO  [consumer] None
2017-04-19 22:02:35,791 INFO  [consumer] None
2017-04-19 22:02:36,793 INFO  [consumer] None
2017-04-19 22:02:37,795 INFO  [consumer] None


In [32]:
p.start()   ## then start p

2017-04-19 22:02:38,257 INFO  [producer] 86
2017-04-19 22:02:38,798 INFO  [consumer] 86
2017-04-19 22:02:39,259 INFO  [producer] 44
2017-04-19 22:02:39,801 INFO  [consumer] 44
2017-04-19 22:02:40,261 INFO  [producer] 87
2017-04-19 22:02:40,809 INFO  [consumer] 87
2017-04-19 22:02:41,272 INFO  [producer] 19
2017-04-19 22:02:41,811 INFO  [consumer] 19
2017-04-19 22:02:42,274 INFO  [producer] 49
2017-04-19 22:02:42,814 INFO  [consumer] 49
2017-04-19 22:02:43,276 INFO  [producer] 23
2017-04-19 22:02:43,817 INFO  [consumer] 23
2017-04-19 22:02:44,278 INFO  [producer] 89
2017-04-19 22:02:44,823 INFO  [consumer] 89
2017-04-19 22:02:45,283 INFO  [producer] 65
2017-04-19 22:02:45,826 INFO  [consumer] 65
2017-04-19 22:02:46,286 INFO  [producer] 27
2017-04-19 22:02:46,829 INFO  [consumer] 27
2017-04-19 22:02:47,288 INFO  [producer] 95
2017-04-19 22:02:47,831 INFO  [consumer] 95


In [21]:
class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.cond =  threading.Condition()    ## add a Condition
    
    def consumer(self):             ## consumer
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()    ## block here until notified
                logging.info(self.data)  ## wait returns when the producer thread calls notify
        
    def producer(self):             ## producer
        for _ in range(10):
            data = random.randint(0,100)  ## pick a random integer
            logging.info(data)
            self.data = data
            with self.cond:
                self.cond.notify_all()    ## notify_all wakes every waiting thread
            self.event.wait(1)
        self.event.set()

In [4]:
d = Dispatcher()

In [5]:
p = threading.Thread(target=d.producer, name='producer')
c = threading.Thread(target=d.consumer, name='consumer')

In [6]:
c.start()   ## no output after starting

In [7]:
p.start()   ## the consumer now logs only when the producer has produced data, so the two alternate

2017-04-20 07:09:39,797 INFO  [producer] 16
2017-04-20 07:09:39,800 INFO  [consumer] 16
2017-04-20 07:09:40,800 INFO  [producer] 86
2017-04-20 07:09:40,803 INFO  [consumer] 86
2017-04-20 07:09:41,803 INFO  [producer] 48
2017-04-20 07:09:41,811 INFO  [consumer] 48
2017-04-20 07:09:42,814 INFO  [producer] 35
2017-04-20 07:09:42,826 INFO  [consumer] 35
2017-04-20 07:09:43,826 INFO  [producer] 62
2017-04-20 07:09:43,828 INFO  [consumer] 62
2017-04-20 07:09:44,828 INFO  [producer] 81
2017-04-20 07:09:44,830 INFO  [consumer] 81
2017-04-20 07:09:45,831 INFO  [producer] 15
2017-04-20 07:09:45,869 INFO  [consumer] 15
2017-04-20 07:09:46,870 INFO  [producer] 93
2017-04-20 07:09:46,871 INFO  [consumer] 93
2017-04-20 07:09:47,872 INFO  [producer] 30
2017-04-20 07:09:47,878 INFO  [consumer] 30
2017-04-20 07:09:48,878 INFO  [producer] 91
2017-04-20 07:09:48,883 INFO  [consumer] 91


Now start 4 consumers

In [14]:
d = Dispatcher()

In [15]:
p = threading.Thread(target=d.producer, name='producer')

In [16]:
for x in range(4):
    threading.Thread(target=d.consumer, name='consumer-{}'.format(x)).start()

In [17]:
p.start()     ## every item produced is consumed by all 4 consumers: this is the producer-consumer pattern

2017-04-20 07:20:33,697 INFO  [producer] 93
2017-04-20 07:20:33,699 INFO  [consumer-0] 93
2017-04-20 07:20:33,700 INFO  [consumer-2] 93
2017-04-20 07:20:33,701 INFO  [consumer-3] 93
2017-04-20 07:20:33,702 INFO  [consumer-1] 93
2017-04-20 07:20:34,700 INFO  [producer] 31
2017-04-20 07:20:34,702 INFO  [consumer-0] 31
2017-04-20 07:20:34,707 INFO  [consumer-3] 31
2017-04-20 07:20:34,710 INFO  [consumer-1] 31
2017-04-20 07:20:34,714 INFO  [consumer-2] 31
2017-04-20 07:20:35,703 INFO  [producer] 87
2017-04-20 07:20:35,705 INFO  [consumer-1] 87
2017-04-20 07:20:35,710 INFO  [consumer-3] 87
2017-04-20 07:20:35,712 INFO  [consumer-2] 87
2017-04-20 07:20:35,717 INFO  [consumer-0] 87
2017-04-20 07:20:36,706 INFO  [producer] 46
2017-04-20 07:20:36,707 INFO  [consumer-2] 46
2017-04-20 07:20:36,708 INFO  [consumer-3] 46
2017-04-20 07:20:36,710 INFO  [consumer-1] 46
2017-04-20 07:20:36,711 INFO  [consumer-0] 46
2017-04-20 07:20:37,708 INFO  [producer] 49
2017-04-20 07:20:37,712 INFO  [consumer-3] 4

In [18]:
cond = threading.Condition()

In [19]:
help(cond.notify_all)   ## wakes all waiting threads so they can continue

Help on method notify_all in module threading:

notify_all() method of threading.Condition instance
    Wake up all threads waiting on this condition.
    
    If the calling thread has not acquired the lock when this method
    is called, a RuntimeError is raised.



In [20]:
help(cond.notify)     ## wakes at most n threads

Help on method notify in module threading:

notify(n=1) method of threading.Condition instance
    Wake up one or more threads waiting on this condition, if any.
    
    If the calling thread has not acquired the lock when this method is
    called, a RuntimeError is raised.
    
    This method wakes up at most n of the threads waiting for the condition
    variable; it is a no-op if no threads are waiting.



In [22]:
class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.cond =  threading.Condition()
    
    def consumer(self):             ## consumer
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()    ## block here until notified
                logging.info(self.data)  ## wait returns when the producer thread calls notify
        
    def producer(self):             ## producer
        for _ in range(10):
            data = random.randint(0,100)  ## pick a random integer
            logging.info(data)
            self.data = data
            with self.cond:
                self.cond.notify(2)    ## wake at most 2 waiting consumers
            self.event.wait(1)
        self.event.set()

In [23]:
d = Dispatcher()

In [24]:
p = threading.Thread(target=d.producer, name='producer')

In [25]:
for x in range(4):
    threading.Thread(target=d.consumer, name='consumer-{}'.format(x)).start()

In [26]:
p.start()    ## only 2 consumers consume each item; which ones is not deterministic

2017-04-20 07:56:40,214 INFO  [producer] 43
2017-04-20 07:56:40,219 INFO  [consumer-0] 43
2017-04-20 07:56:40,223 INFO  [consumer-1] 43
2017-04-20 07:56:41,220 INFO  [producer] 14
2017-04-20 07:56:41,225 INFO  [consumer-2] 14
2017-04-20 07:56:41,226 INFO  [consumer-3] 14
2017-04-20 07:56:42,225 INFO  [producer] 17
2017-04-20 07:56:42,227 INFO  [consumer-0] 17
2017-04-20 07:56:42,229 INFO  [consumer-1] 17
2017-04-20 07:56:43,228 INFO  [producer] 39
2017-04-20 07:56:43,230 INFO  [consumer-2] 39
2017-04-20 07:56:43,233 INFO  [consumer-3] 39
2017-04-20 07:56:44,231 INFO  [producer] 69
2017-04-20 07:56:44,233 INFO  [consumer-1] 69
2017-04-20 07:56:44,235 INFO  [consumer-0] 69
2017-04-20 07:56:45,234 INFO  [producer] 75
2017-04-20 07:56:45,235 INFO  [consumer-3] 75
2017-04-20 07:56:45,247 INFO  [consumer-2] 75
2017-04-20 07:56:46,236 INFO  [producer] 84
2017-04-20 07:56:46,241 INFO  [consumer-0] 84
2017-04-20 07:56:46,242 INFO  [consumer-1] 84
2017-04-20 07:56:47,242 INFO  [producer] 84
2017

In [27]:
class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.cond =  threading.Condition()
    
    def consumer(self):             ## consumer
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()    ## block here until notified
                logging.info(self.data)  ## wait returns when the producer thread calls notify
        
    def producer(self):             ## producer
        for _ in range(10):
            data = random.randint(0,100)  ## pick a random integer
            logging.info(data)
            self.data = data
            with self.cond:
                self.cond.notify()    ##  n defaults to 1: wake a single consumer
            self.event.wait(1)     ## produces one message per second, acting like sleep; it does not wait for the message to be consumed
        self.event.set()           ## meant to let the consumers exit, but a consumer already blocked in cond.wait() will not see it

In [28]:
d = Dispatcher()
p = threading.Thread(target=d.producer, name='producer')
for x in range(4):
    threading.Thread(target=d.consumer, name='consumer-{}'.format(x)).start()

In [29]:
p.start()   ##  which consumer gets each item is not deterministic

2017-04-20 08:00:05,065 INFO  [producer] 97
2017-04-20 08:00:05,067 INFO  [consumer-0] 97
2017-04-20 08:00:06,067 INFO  [producer] 85
2017-04-20 08:00:06,070 INFO  [consumer-1] 85
2017-04-20 08:00:07,070 INFO  [producer] 26
2017-04-20 08:00:07,072 INFO  [consumer-2] 26
2017-04-20 08:00:08,073 INFO  [producer] 0
2017-04-20 08:00:08,076 INFO  [consumer-3] 0
2017-04-20 08:00:09,077 INFO  [producer] 36
2017-04-20 08:00:09,080 INFO  [consumer-0] 36
2017-04-20 08:00:10,080 INFO  [producer] 47
2017-04-20 08:00:10,082 INFO  [consumer-1] 47
2017-04-20 08:00:11,083 INFO  [producer] 9
2017-04-20 08:00:11,088 INFO  [consumer-2] 9
2017-04-20 08:00:12,086 INFO  [producer] 71
2017-04-20 08:00:12,092 INFO  [consumer-3] 71
2017-04-20 08:00:13,092 INFO  [producer] 84
2017-04-20 08:00:13,095 INFO  [consumer-0] 84
2017-04-20 08:00:14,095 INFO  [producer] 29
2017-04-20 08:00:14,098 INFO  [consumer-1] 29


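#### That covers Condition

Condition is typically used for the producer-consumer pattern (notify and wait): after producing a message, the producer calls notify or notify_all to tell the consumers to consume it.

A consumer calls wait to block until the producer notifies it.

notify wakes the given number of waiting threads; notify_all wakes all waiting threads.

notify, notify_all, and wait all require the condition's lock to be acquired first and released afterwards; the with statement is the usual way to do this.

The producer-consumer model is very useful in practice.

notify and wait calls on the same Condition are mutually exclusive, because both must hold its lock (wait releases the lock while it is blocked and reacquires it before returning).

Any data that is processed as a stream fits the producer-consumer model.

MQ systems, Kafka, and the like work across processes and hosts; what we have here works inside a single process.

A message queue usually delivers each message to a single consumer (unicast), whereas ours can broadcast with notify_all.

Anything that crosses a process boundary needs serialization; communication within a process does not.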
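One caveat with the Dispatcher versions above: a consumer blocked in `cond.wait()` is not woken by `event.set()`, so the consumer threads never exit. A possible variation is sketched below, and it is not the notebook's design. It swaps the single `data` slot for a `collections.deque`, replaces the Event with a `done` flag guarded by the same Condition, and uses `wait_for` with a predicate. Each item goes to exactly one consumer (unicast, not broadcast). The class name `QueueDispatcher` and the `consumed` list are hypothetical.

```python
import collections
import threading

class QueueDispatcher:
    def __init__(self):
        self.items = collections.deque()
        self.done = False
        self.cond = threading.Condition()
        self.consumed = []

    def consumer(self):
        while True:
            with self.cond:
                # Sleep until there is an item to take or the producer has finished.
                self.cond.wait_for(lambda: self.items or self.done)
                if not self.items:        # finished and nothing left: exit
                    return
                self.consumed.append(self.items.popleft())

    def producer(self, count):
        for i in range(count):
            with self.cond:
                self.items.append(i)
                self.cond.notify()        # one new item: wake one consumer
        with self.cond:
            self.done = True
            self.cond.notify_all()        # wake everyone so they can exit

d = QueueDispatcher()
consumers = [threading.Thread(target=d.consumer, name='consumer-{}'.format(x))
             for x in range(3)]
p = threading.Thread(target=d.producer, args=(20,), name='producer')
for c in consumers:
    c.start()
p.start()
p.join()
for c in consumers:
    c.join()

print(sorted(d.consumed))
```

Because the predicate is checked under the lock before sleeping, a notification sent before a consumer starts waiting is not lost, and the final `notify_all` lets every consumer observe `done` and return.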

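Summary:
* Event is typically used for event notification between threads
* Lock is typically used to protect shared resources
* Condition is typically used for the producer-consumer model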