# 12.1 启动与停止线程

In [1]:
import time

def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(2)

In [3]:
from threading import Thread
t = Thread(target=countdown,args=(10,))
t.start()

T-minus 10
T-minus 9
T-minus 8
T-minus 7
T-minus 6
T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


In [4]:
class CountdownTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(2)

c = CountdownTask()
t = Thread(target=c.run, args=(10,))
t.start()
c.terminate() # Signal termination
t.join()      # Wait for actual termination (if needed)

T-minus 10


# 12.2 判断线程是否已经启动

你已经启动了一个线程，但是你想知道它是不是真的已经开始运行了

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作，这时线程同步问题就会变得非常棘手。为了解决这些问题，我们需要使用 threading 库中的 Event 对象。 Event 对象包含一个可由线程设置的信号标志，它允许线程等待某些事件的发生。在初始情况下，event 对象中的信号标志被设置为假。如果有线程等待一个 event 对象，而这个 event 对象的标志为假，那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 event 对象的信号标志设置为真，它将唤醒所有等待这个 event 对象的线程。如果一个线程等待一个已经被设置为真的 event 对象，那么它将忽略这个事件，继续执行。 下边的代码展示了如何使用 Event 来协调线程的启动：

In [5]:
from threading import Thread, Event
import time

# Code to execute in an independent thread
def countdown(n, started_evt):
    print('countdown starting')
    started_evt.set()
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(1.5)

In [9]:
started_evt = Event()

print('Launching countdown')
t = Thread(target=countdown,args=(10,started_evt))
t.start()

started_evt.wait()
print('countdown is running')

Launching countdown
countdown starting
T-minus 10
countdown is running
T-minus 5
T-minus 9
T-minus 4
T-minus 8
T-minus 3
T-minus 7
T-minus 2
T-minus 6
T-minus 1
T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


event 对象最好单次使用，就是说，你创建一个 event 对象，让某个线程等待这个对象，一旦这个对象被设置为真，你就应该丢弃它。尽管可以通过 clear() 方法来重置 event 对象，但是很难确保安全地清理 event 对象并对它重新赋值。很可能会发生错过事件、死锁或者其他问题（特别是，你无法保证重置 event 对象的代码会在线程再次等待这个 event 对象之前执行）。如果一个线程需要不停地重复使用 event 对象，你最好使用 Condition 对象来代替。下面的代码使用 Condition 对象实现了一个周期定时器，每当定时器超时的时候，其他线程都可以监测到：

In [2]:
import threading
import time

class PeriodicTimer:
    def __init__(self,interval):
        self._interval = interval
        self._flag = 0
        self._cv = threading.Condition()
        
    def start(self):
        t = threading.Thread(target=self.run)
        t.daemon = True
        
        t.start()
        
    def run(self):
        while True:
            time.sleep(self._interval)
            with self._cv:
                self._flag ^= 1
                self._cv.notify_all()
                
    def wait_for_tick(self):
        with self._cv:
            last_flag = self._flag
            while last_flag == self._flag:
                self._cv.wait

ptimer = PeriodicTimer(2)
ptimer.start()

# Two threads that synchronize on the timer
def countdown(nticks):
    while nticks > 0:
        ptimer.wait_for_tick()
        print('T-minus', nticks)
        nticks -= 1

def countup(last):
    n = 0
    while n < last:
        ptimer.wait_for_tick()
        print('Counting', n)
        n += 1

threading.Thread(target=countdown, args=(10,)).start()
threading.Thread(target=countup, args=(5,)).start()

event对象的一个重要特点是当它被设置为真时会唤醒所有等待它的线程。如果你只想唤醒单个线程，最好是使用信号量或者 Condition 对象来替代。考虑一下这段使用信号量实现的代码：

In [3]:
def worker(n,sema):
    sema.acquire()
    
    print('Working',n)
    
sema = threading.Semaphore(0)
nworkers = 10
for n in range(nworkers):
    t = threading.Thread(target=worker,args=(n,sema,))
    t.start()

In [5]:
sema.release()

Working 0


In [6]:
sema.release()

Working 1


运行上边的代码将会启动一个线程池，但是并没有什么事情发生。这是因为所有的线程都在等待获取信号量。每次信号量被释放，只有一个线程会被唤醒并执行

# 12.3 线程间通信

你的程序中有多个线程，你需要在这些线程之间安全地交换信息或数据

从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个被多个线程共享的 Queue 对象，这些线程通过使用 put() 和 get() 操作来向队列中添加或者删除元素。

In [5]:
from queue import Queue
from threading import Thread

def producer(out_q):
    while True:
        data = 1
        out_q.put(data)
        
        
def consumer(in_q):
    while True:
        data = in_q.get()
        


<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Que

In [6]:
q = Queue()
t1 = Thread(target = consumer,args=(q,))
t2 = Thread(target = producer,args=(q,))
t1.start()
t2.start()

<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queu

<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queue object at 0x0000000004DA3898>
<queue.Queu

In [14]:
from queue import Queue
from threading import Thread

# Object that signals shutdown
_sentinel = object()

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)

    # Put the sentinel on the queue to indicate completion
    out_q.put(_sentinel)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()

        # Check for termination
        if data is _sentinel:
            in_q.put(_sentinel)
            break

        # Process the data
        ...

本例中有一个特殊的地方：消费者在读到这个特殊值之后立即又把它放回到队列中，将之传递下去。这样，所有监听这个队列的消费者线程就可以全部关闭了。 尽管队列是最常见的线程间通信机制，但是仍然可以自己通过创建自己的数据结构并添加所需的锁和同步机制来实现线程间通信。最常见的方法是使用 Condition 变量来包装你的数据结构。下边这个例子演示了如何创建一个线程安全的优先级队列，

In [2]:
import threading,time
import heapq

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()
        
    def put(self,item,priority):
        with self._cv:
            heapq.heappush(self._queue,(-priority,self._count,item))
            self._count += 1
            self._cv.notify
            
    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heap.heappop(self._queue)[-1]

使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下，你没有办法知道接收数据的线程是什么时候接收到的数据并开始工作的。不过队列对象提供一些基本完成的特性，比如下边这个例子中的 task_done() 和 join() ：

In [4]:
from queue import Queue
from threading import Thread

def produce(out_q):
    while runing:
        data = 1
        out_q.put(data)
        
def consumer(in_q):
    while  True:
        data = in_q.get()
        in_q.task_done()
        
q = Queue()
t1 = Thread(target=consumer,args=(q,))
t2 = Thread(target=consumer,args=(q,))
t1.start()
t2.start()

q.join()

如果一个线程需要在一个“消费者”线程处理完特定的数据项时立即得到通知，你可以把要发送的数据和一个 Event 放到一起使用，这样“生产者”就可以通过这个Event对象来监测处理的过程了。示例如下：

In [5]:
from queue import Queue
from threading import Thread, Event

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        # Make an (data, event) pair and hand it to the consumer
        evt = Event()
        out_q.put((data, evt))
        ...
        # Wait for the consumer to process the item
        evt.wait()

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data, evt = in_q.get()
        # Process the data
        ...
        # Indicate completion
        evt.set()

基于简单队列编写多线程程序在多数情况下是一个比较明智的选择。从线程安全队列的底层实现来看，你无需在你的代码中使用锁和其他底层的同步机制，这些只会把你的程序弄得乱七八糟。此外，使用队列这种基于消息的通信机制可以被扩展到更大的应用范畴，比如，你可以把你的程序放入多个进程甚至是分布式系统而无需改变底层的队列结构。 使用线程队列有一个要注意的问题是，向队列中添加数据项时并不会复制此数据项，线程间通信实际上是在线程间传递对象引用。如果你担心对象的共享状态，那你最好只传递不可修改的数据结构（如：整型、字符串或者元组）或者一个对象的深拷贝。

```
from queue import Queue
from threading import Thread
import copy

# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        out_q.put(copy.deepcopy(data))

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...
        ```

Queue 对象提供一些在当前上下文很有用的附加特性。比如在创建 Queue 对象时提供可选的 size 参数来限制可以添加到队列中的元素数量。对于“生产者”与“消费者”速度有差异的情况，为队列中的元素数量添加上限是有意义的。比如，一个“生产者”产生项目的速度比“消费者” “消费”的速度快，那么使用固定大小的队列就可以在队列已满的时候阻塞队列，以免未预期的连锁效应扩散整个程序造成死锁或者程序运行失常。在通信的线程之间进行“流量控制”是一个看起来容易实现起来困难的问题。如果你发现自己曾经试图通过摆弄队列大小来解决一个问题，这也许就标志着你的程序可能存在脆弱设计或者固有的可伸缩问题。 get() 和 put() 方法都支持非阻塞方式和设定超时，例如：

```
import queue
q = queue.Queue()

try:
    data = q.get(block=False)
except queue.Empty:
    ...

try:
    q.put(item, block=False)
except queue.Full:
    ...

try:
    data = q.get(timeout=5.0)
except queue.Empty:
    ...
    ```

这些操作都可以用来避免当执行某些特定队列操作时发生无限阻塞的情况，比如，一个非阻塞的 put() 方法和一个固定大小的队列一起使用，这样当队列已满时就可以执行不同的代码。比如输出一条日志信息并丢弃。

```
def producer(q):
    ...
    try:
        q.put(item, block=False)
    except queue.Full:
        log.warning('queued item %r discarded!', item)
        
        ```

如果你试图让消费者线程在执行像 q.get() 这样的操作时，超时自动终止以便检查终止标志，你应该使用 q.get() 的可选参数 timeout ，如下：

```
_running = True

def consumer(q):
    while _running:
        try:
            item = q.get(timeout=5.0)
            # Process item
            ...
        except queue.Empty:
            pass
            ```

最后，有 q.qsize() ， q.full() ， q.empty() 等实用方法可以获取一个队列的当前大小和状态。但要注意，这些方法都不是线程安全的。可能你对一个队列使用 empty() 判断出这个队列为空，但同时另外一个线程可能已经向这个队列中插入一个数据项。所以，你最好不要在你的代码中使用这些方法。

# 12.4 给关键部分加锁

要在多线程程序中安全使用可变对象，你需要使用 threading 库中的 Lock 对象，就像下边这个例子这样：

In [6]:
import threading

class SharedCounter:
    def __init__(self,initial_value=0):
        self._value = initial_value
        self._value_lock = threading.Lock()
        
    def incr(self,delta=1):
        with self._value_lock:
            self._value += delta
    
    def decr(self,delta=1):
        with self._value_lock:
            self._value -= delta

Lock 对象和 with 语句块一起使用可以保证互斥执行，就是每次只有一个线程可以执行 with 语句包含的代码块。with 语句会在这个代码块执行前自动获取锁，在执行结束后自动释放锁

线程调度本质上是不确定的，因此，在多线程程序中错误地使用锁机制可能会导致随机数据损坏或者其他的异常行为，我们称之为竞争条件。为了避免竞争条件，最好只在临界区（对临界资源进行操作的那部分代码）使用锁。 在一些“老的” Python 代码中，显式获取和释放锁是很常见的。下边是一个上一个例子的变种：

```
import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value = 0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        self._value_lock.acquire()
        self._value += delta
        self._value_lock.release()

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        self._value_lock.acquire()
        self._value -= delta
        self._value_lock.release()```

相比于这种显式调用的方法，with 语句更加优雅，也更不容易出错，特别是程序员可能会忘记调用 release() 方法或者程序在获得锁之后产生异常这两种情况（使用 with 语句可以保证在这两种情况下仍能正确释放锁）。 为了避免出现死锁的情况，使用锁机制的程序应该设定为每个线程一次只允许获取一个锁。如果不能这样做的话，你就需要更高级的死锁避免机制，我们将在12.5节介绍。 在 threading 库中还提供了其他的同步原语，比如 RLock 和 Semaphore 对象。但是根据以往经验，这些原语是用于一些特殊的情况，如果你只是需要简单地对可变对象进行锁定，那就不应该使用它们。一个 RLock （可重入锁）可以被同一个线程多次获取，主要用来实现基于监测对象模式的锁定和同步。在使用这种锁的情况下，当锁被持有时，只有一个线程可以使用完整的函数或者类中的方法。比如，你可以实现一个这样的 SharedCounter 类：

In [7]:
import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    _lock = threading.RLock()
    def __init__(self, initial_value = 0):
        self._value = initial_value

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        with SharedCounter._lock:
            self._value += delta

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        with SharedCounter._lock:
             self.incr(-delta)

在上边这个例子中，没有对每一个实例中的可变对象加锁，取而代之的是一个被所有实例共享的类级锁。这个锁用来同步类方法，具体来说就是，这个锁可以保证一次只有一个线程可以调用这个类方法。不过，与一个标准的锁不同的是，已经持有这个锁的方法在调用同样使用这个锁的方法时，无需再次获取锁。比如 decr 方法。 这种实现方式的一个特点是，无论这个类有多少个实例都只用一个锁。因此在需要大量使用计数器的情况下内存效率更高。不过这样做也有缺点，就是在程序中使用大量线程并频繁更新计数器时会有争用锁的问题。 信号量对象是一个建立在共享计数器基础上的同步原语。如果计数器不为0，with 语句将计数器减1，线程被允许执行。with 语句执行结束后，计数器加１。如果计数器为0，线程将被阻塞，直到其他线程结束将计数器加1。尽管你可以在程序中像标准锁一样使用信号量来做线程同步，但是这种方式并不被推荐，因为使用信号量为程序增加的复杂性会影响程序性能。相对于简单地作为锁使用，信号量更适用于那些需要在线程之间引入信号或者限制的程序。比如，你需要限制一段代码的并发访问量，你就可以像下面这样使用信号量完成：

In [8]:
from threading import Semaphore
import urllib.request

# At most, five threads allowed to run at once
_fetch_url_sema = Semaphore(5)

def fetch_url(url):
    with _fetch_url_sema:
        return urllib.request.urlopen(url)

# 12.5 防止死锁的加锁机制

你正在写一个多线程程序，其中线程需要一次获取多个锁，此时如何避免死锁问题。

在多线程程序中，死锁问题很大一部分是由于线程同时获取多个锁造成的。举个例子：一个线程获取了第一个锁，然后在获取第二个锁的 时候发生阻塞，那么这个线程就可能阻塞其他线程的执行，从而导致整个程序假死。 解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的id，然后只允许按照升序规则来使用多个锁，这个规则使用上下文管理器 是非常容易实现的

In [9]:
import threading
from contextlib import contextmanager

_local = threading.local() 
@contextmanager
def acquire(*locks):
    locks = sorted(locks,key=lambda x:id(x))
    acquired = getattr(_local,'acquired',[])
    if acquired and max(id(lock) for lock in acquired) >=id(locks[0]):
        raise RuntimeError('Lock Order Violation')
        
    acquired.extend(locks)
    _local.acquired = acquired
    
    try:
        for lock in locks:
            lock.acquired()
        yield
        
    finally:
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]