# [threading.Thread]启动与停止线程

`threading` 库可以在单独的线程中执行任何的在 `Python` 中可以调用的对象。你可以创建一个 `Thread` 对象并将你要执行的对象以 `target` 参数的形式提供给该对象

In [1]:
# Code to execute in an independent thread
import time
def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create and launch a thread
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()

T-minus 10


当你创建好一个线程对象后，该对象并不会立即执行，除非你调用它的 `start()` 方法（当你调用 `start()` 方法时，它会调用你传递进来的函数，并把你传递进来的参数传递给该函数）。

`Python` 中的线程会在一个单独的系统级线程中执行（比如说一个 `POSIX` 线程或者一个 `Windows` 线程），这些线程将由操作系统来全权管理。

线程一旦启动，将独立执行直到目标函数返回。你可以查询一个线程对象的状态，看它是否还在执行：

```py
if t.is_alive():
    print('Still running')
else:
    print('Completed')

```

你也可以将一个线程加入到当前线程，并等待它终止：

```py
t.join()

```


后台线程无法等待，不过，这些线程会在主线程终止时自动销毁。 

除了如上所示的两个操作，并没有太多可以对线程做的事情。

你无法结束一个线程，无法给它发送信号，无法调整它的调度，也无法执行其他高级操作。

如果需要这些特性，你需要自己添加。

比如说，如果你需要终止线程，那么这个线程必须通过编程在某个特定点轮询来退出。

你可以像下边这样把线程放入一个类中：

In [23]:
%%file ThreadTest.py
import time
class CountdownTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(5)

c = CountdownTask()

from threading import Thread
t = Thread(target=c.run, args=(10,))
t.start()
c.terminate() # Signal termination 信號終止
t.join()      # Wait for actual termination (if needed) 實際停止

Overwriting ThreadTest.py


In [22]:
# ============ 開新 CONSOLE ============
# ------------ Run the server ------------

import os
import subprocess
# os.path.abspath {本黨位置}: D:\Google 雲端硬碟\learn\線程調用\TestOS.py
# os.path.dirname {目錄} : D:\Google 雲端硬碟\learn\線程調用
BASE_DIR = os.path.dirname(os.path.abspath('__file__'))
# 透過 cmd 呼叫
DIR = os.path.join(BASE_DIR, 'ThreadTest.py')
cmd = "python " + f'"{DIR}"'
print(cmd,BASE_DIR,sep='\n')
#  CONSOLE混雜
#os.system(cmd)
#subprocess.call(cmd)

#  NEW 一個 CONSOLE
subprocess.Popen(cmd, creationflags=subprocess.CREATE_NEW_CONSOLE)

python "D:\CODE\GitHub\py\資料結構\py3-cookbook\并发编程\ThreadTest.py"
D:\CODE\GitHub\py\資料結構\py3-cookbook\并发编程


<subprocess.Popen at 0x179f62f3d68>

如果线程执行一些像 `I/O` 这样的阻塞操作，那么通过轮询来终止线程将使得线程之间的协调变得非常棘手。

比如，如果一个线程一直阻塞在一个 `I/O` 操作上，它就永远无法返回，也就无法检查自己是否已经被结束了。

要正确处理这些问题，你需要利用超时循环来小心操作线程。 例子如下：

```py
class IOTask:
    def terminate(self):
        self._running = False

    def run(self, sock):
        # sock is a socket
        sock.settimeout(5)        # Set timeout period
        while self._running:
            # Perform a blocking I/O operation w/ timeout
            try:
                data = sock.recv(8192)
                break
            except socket.timeout:
                continue
            # Continued processing
            ...
        # Terminated
        return
```



## [multiprocessing]讨论

由于全局解释锁 `（GIL）` 的原因， `Python` 的线程被限制到同一时刻只允许一个线程执行这样一个执行模型。

所以， `Python` 的线程更适用于处理 `I/O` 和其他需要并发执行的阻塞操作（比如等待 `I/O` 、等待从数据库获取数据等等），而**不是需要多处理器并行的计算密集型任务**

你可以通过 `multiprocessing` 模块在一个单独的进程中执行你的代码：

In [18]:
import multiprocessing

class CountdownTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(1)

c = CountdownTask()
p = multiprocessing.Process(
    target=c.run(n=5)
)
p.start()

T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


# [threading.Event()]判断线程是否已经启动

线程的一个关键特性是每个线程都是独立运行且状态不可预测。

如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作，这时线程同步问题就会变得非常棘手。

为了解决这些问题，我们需要使用 `threading` 库中的 `Event` 对象。 

`Event` 对象包含一个可由线程设置的信号标志，它允许线程等待某些事件的发生。

在初始情况下， `event` 对象中的信号标志被设置为假。

如果有线程等待一个 `event` 对象，而这个 `event` 对象的标志为假，那么这个线程将会被一直阻塞直至该标志为真。

一个线程如果将一个 `event` 对象的信号标志设置为真，它将唤醒所有等待这个 `event` 对象的线程。

如果一个线程等待一个已经被设置为真的 `event` 对象，那么它将忽略这个事件，继续执行。 

下边的代码展示了如何使用 `Event` 来协调线程的启动：

In [19]:
from threading import Thread, Event
import time

# Code to execute in an independent thread
def countdown(n, started_evt):
    print('countdown starting')
    # Event().set()
    started_evt.set()
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create the event object that will be used to signal startup
started_evt = Event()

# Launch the thread and pass the startup event
print('Launching countdown')
t = Thread(target=countdown, args=(10,started_evt))
t.start()

# Wait for the thread to start
started_evt.wait()
print('countdown is running')

Launching countdown
countdown starting
T-minus 10


True

countdown is running


## [threading.Condition()]讨论

`event` 对象最好单次使用，就是说，你创建一个 `event` 对象，让某个线程等待这个对象，一旦这个对象被设置为真，你就应该丢弃它。

尽管可以通过 `clear()` 方法来重置 `event` 对象，但是很难确保安全地清理 `event` 对象并对它重新赋值。

很可能会发生错过事件、死锁或者其他问题（特别是，你无法保证重置 `event` 对象的代码会在线程再次等待这个 `event` 对象之前执行）。

如果一个线程需要不停地重复使用 `event` 对象，你最好使用 `Condition` 对象来代替。

下面的代码使用 `Condition` 对象实现了一个周期定时器，每当定时器超时的时候，其他线程都可以监测到：

In [25]:
%%file MultiThreadTest.py
import threading
import time

# ======= Timer =======

class PeriodicTimer:
    def __init__(self, interval):
        self._interval = interval
        self._flag = 0
        self._cv = threading.Condition()

    def start(self):
        t = threading.Thread(target=self.run)
        t.daemon = True

        t.start()

    def run(self):
        '''
        Run the timer and notify waiting threads after each interval
        '''
        while True:
            time.sleep(self._interval)
            with self._cv:
                # 二進制 異位(XOR)
                self._flag ^= 1
                self._cv.notify_all()

    def wait_for_tick(self):
        '''
        Wait for the next tick of the timer
        '''
        with self._cv:
            last_flag = self._flag
            while last_flag == self._flag:
                self._cv.wait()

# Example use of the timer
ptimer = PeriodicTimer(1)
ptimer.start()

# Two threads that synchronize on the timer
def countdown(nticks):
    while nticks > 0:
        ptimer.wait_for_tick()
        print('T-minus', nticks)
        nticks -= 1

def countup(last):
    n = 0
    while n < last:
        ptimer.wait_for_tick()
        print('Counting', n)
        n += 1

threading.Thread(target=countdown, args=(10,)).start()
threading.Thread(target=countup, args=(5,)).start()

Overwriting MultiThreadTest.py


In [26]:
# ============ 開新 CONSOLE ============
# ------------ Run the server ------------

import os
import subprocess
# os.path.abspath {本黨位置}: D:\Google 雲端硬碟\learn\線程調用\TestOS.py
# os.path.dirname {目錄} : D:\Google 雲端硬碟\learn\線程調用
BASE_DIR = os.path.dirname(os.path.abspath('__file__'))
# 透過 cmd 呼叫
DIR = os.path.join(BASE_DIR, 'MultiThreadTest.py')
cmd = "python " + f'"{DIR}"'
print(cmd,BASE_DIR,sep='\n')
#  CONSOLE混雜
#os.system(cmd)
#subprocess.call(cmd)

#  NEW 一個 CONSOLE
subprocess.Popen(cmd, creationflags=subprocess.CREATE_NEW_CONSOLE)

python "D:\CODE\GitHub\py\資料結構\py3-cookbook\并发编程\MultiThreadTest.py"
D:\CODE\GitHub\py\資料結構\py3-cookbook\并发编程


<subprocess.Popen at 0x179f62f3c50>

`event` 对象的一个重要特点是当它被设置为真时会唤醒所有等待它的线程。

如果你只想唤醒单个线程，最好是使用 **信号量** 或者 `Condition` 对象来替代。考虑一下这段使用信号量实现的代码：

In [42]:
import threading
# Worker thread
def worker(n, sema):
    # Wait to be signaled
    sema.acquire()

    # Do some work
    print('Working', n)

# Create some threads
sema = threading.Semaphore(0)
nworkers = 10
for n in range(nworkers):
    t = threading.Thread(target=worker, args=(n, sema,))
    t.start()

每次信号量被释放，只有一个线程会被唤醒并执行

In [43]:
sema.release()

sema.release()

Working 0
Working 1


# {情境及應用不瞭解}线程间通信

从一个线程向另一个线程发送数据最安全的方式可能就是使用 `queue` 库中的队列了。

创建一个被多个线程共享的 `Queue` 对象，这些线程通过使用 `put()` 和 `get()` 操作来向队列中添加或者删除元素。 例如：

In [44]:
from queue import Queue
from threading import Thread

# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        out_q.put(data)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

`Queue` 对象已经包含了必要的锁，所以你可以通过它在多个线程间多安全地共享数据。 

当使用队列时，协调生产者和消费者的关闭问题可能会有一些麻烦。

一个通用的解决方法是在队列中放置一个特殊的值，当消费者读到这个值的时候，终止执行。例如：

```py
from queue import Queue
from threading import Thread

# Object that signals shutdown
_sentinel = object()

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)

    # Put the sentinel on the queue to indicate completion
    out_q.put(_sentinel)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()

        # Check for termination
        if data is _sentinel:
            in_q.put(_sentinel)
            break

        # Process the data
        ...
```



本例中有一个特殊的地方：消费者在读到这个特殊值之后立即又把它放回到队列中，将之传递下去。

这样，所有监听这个队列的消费者线程就可以全部关闭了。 

尽管队列是最常见的线程间通信机制，但是仍然可以自己通过创建自己的数据结构并添加所需的锁和同步机制来实现线程间通信。

最常见的方法是使用 `Condition` 变量来包装你的数据结构。

下边这个例子演示了如何创建一个线程安全的优先级队列

```py
import heapq
import threading

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()
        
    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()

    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heapq.heappop(self._queue)[-1]
```


使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下，你没有办法知道接收数据的线程是什么时候接收到的数据并开始工作的。

不过队列对象提供一些基本完成的特性，比如下边这个例子中的 `task_done()` 和 `join()` ：

```py
from queue import Queue
from threading import Thread

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()

        # Process the data
        ...
        # Indicate completion
        in_q.task_done()

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

# Wait for all produced items to be consumed
q.join()
```


如果一个线程需要在一个 **“消费者”** 线程处理完特定的数据项时立即得到通知，你可以把要发送的数据和一个 `Event` 放到一起使用，这样 **“生产者”** 就可以通过这个 `Event` 对象来监测处理的过程了。示例如下：

```py
from queue import Queue
from threading import Thread, Event

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        # Make an (data, event) pair and hand it to the consumer
        evt = Event()
        out_q.put((data, evt))
        ...
        # Wait for the consumer to process the item
        evt.wait()

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data, evt = in_q.get()
        # Process the data
        ...
        # Indicate completion
        evt.set()
```


## [列隊判斷與線程]讨论

使用队列这种基于消息的通信机制可以被扩展到更大的应用范畴，比如，你可以把你的程序放入 **多个进程** 甚至是 **分布式系统** 而无需改变底层的队列结构。 

使用线程队列有一个要注意的问题是，向队列中 **添加数据项** 时并不会复制此数据项，线程间通信实际上是在线程间传递对象引用。

如果你担心对象的 **共享状态** ，那你最好只传递不可修改的数据结构（如：整型、字符串或者元组）或者一个对象的 **深拷贝** 。

```py
from queue import Queue
from threading import Thread
import copy

# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        out_q.put(copy.deepcopy(data))

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...
```


`Queue` 对象提供一些在当前上下文很有用的附加特性。

比如在创建 `Queue` 对象时提供可选的 `size` 参数来限制可以添加到队列中的元素数量。

对于 **“生产者”** 与 **“消费者”** 速度有差异的情况，为队列中的元素数量添加上限是有意义的。

比如，一个 **“生产者”** 产生项目的速度比 **“消费者”** **“消费”** 的速度快，那么使用固定大小的队列就可以在队列已满的时候阻塞队列，以免未预期的连锁效应扩散整个程序造成死锁或者程序运行失常。

在通信的线程之间进行“流量控制”是一个看起来容易实现起来困难的问题。

如果你发现自己曾经试图通过摆弄队列大小来解决一个问题，这也许就标志着你的程序可能存在脆弱设计或者固有的可伸缩问题。 **get()** 和 **put()** 方法都 **支持非阻塞方式和设定超时** ，例如：

```py
import queue
q = queue.Queue()

try:
    data = q.get(block=False)
except queue.Empty:
    ...

try:
    q.put(item, block=False)
except queue.Full:
    ...

try:
    data = q.get(timeout=5.0)
except queue.Empty:
    ...
```

这些操作都可以用来避免当执行某些特定队列操作时发生无限阻塞的情况，比如，一个非阻塞的 `put()` 方法和 **一个固定大小的队列** 一起使用，这样当队列已满时就可以执行不同的代码。

比如输出一条日志信息并丢弃。

```py
def producer(q):
    ...
    try:
        q.put(item, block=False)
    except queue.Full:
        log.warning('queued item %r discarded!', item)
```

如果你试图让消费者线程在执行像 `q.get()` 这样的操作时，超时自动终止以便检查终止标志，你应该使用 `q.get()` 的可选参数 `timeout` ，如下：

```py
_running = True

def consumer(q):
    while _running:
        try:
            item = q.get(timeout=5.0)
            # Process item
            ...
        except queue.Empty:
            pass
```

最后，有 `q.qsize()` ， `q.full()` ， `q.empty()` 等实用方法可以获取一个队列的当前大小和状态。但要注意，这些方法都不是线程安全的。

可能你对一个队列使用 `empty()` 判断出这个队列为空，但同时另外一个线程可能已经向这个队列中插入一个数据项。

所以，你最好不要在你的代码中使用这些方法。


# [threading; with]给关键部分加锁

要在多线程程序中安全使用可变对象，你需要使用 `threading` 库中的 `Lock` 对象，就像下边这个例子这样：

```py
import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value = 0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        with self._value_lock:
             self._value += delta

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        with self._value_lock:
             self._value -= delta
```

`Lock` 对象和 `with` 语句块一起使用可以保证互斥执行，就是每次只有一个线程可以执行 `with` 语句包含的代码块。

`with` 语句会在这个代码块执行前自动获取锁，在执行结束后自动释放锁。


## [無明確應用，難以理解，未完]讨论

线程调度本质上是不确定的，因此，在多线程程序中错误地使用锁机制可能会导致随机数据损坏或者其他的异常行为，我们称之为竞争条件。

为了避免竞争条件，最好只在临界区（对临界资源进行操作的那部分代码）使用锁。

显式获取和释放锁是很常见的。下边是一个上一个例子的变种：


```py
import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value = 0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        # 上 锁
        self._value_lock.acquire()
        self._value += delta
        # 释放锁
        self._value_lock.release()

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        # 上 锁
        self._value_lock.acquire()
        self._value -= delta
        # 释放锁
        self._value_lock.release()
```




相比于这种显式调用的方法， `with` 语句更加优雅，也更不容易出错，

特别是程序员可能会忘记调用 `release()` 方法或者程序在获得锁之后产生异常这两种情况（使用 `with` 语句可以保证在这两种情况下仍能正确释放锁）。 

为了避免出现死锁的情况，使用锁机制的程序应该设定为每个线程一次只允许获取一个锁

**未完**

# 防止死锁的加锁机制

在多线程程序中，死锁问题很大一部分是由于线程同时获取多个锁造成的。

举个例子：一个线程获取了第一个锁，然后在获取第二个锁的 时候发生阻塞，那么这个线程就可能阻塞其他线程的执行，从而导致整个程序假死。 

解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的`id`，然后只允许按照升序规则来使用多个锁，这个规则使用上下文管理器 是非常容易实现的，示例如下：

In [47]:
import threading
from contextlib import contextmanager

# Thread-local state to stored information on locks already acquired
_local = threading.local()

@contextmanager
def acquire(*locks):
    # Sort locks by object identifier
    locks = sorted(locks, key=lambda x: id(x))

    # Make sure lock order of previously acquired locks is not violated
    acquired = getattr(_local,'acquired',[])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')

    # Acquire all of the locks
    acquired.extend(locks)
    _local.acquired = acquired

    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        # Release locks in reverse order of acquisition
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]

# ======= 使用 ========
# 如何使用这个上下文管理器
# 不论是单个锁还是多个锁中都使用 acquire() 函数来申请锁        
import threading
x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
    while True:
        with acquire(x_lock, y_lock):
            print('Thread-1')

def thread_2():
    while True:
        with acquire(y_lock, x_lock):
            print('Thread-2')

t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()

Thread-1
Thread-2
Thread-1
Thread-2
Thread-1
Thread-2
Thread-1
Thread-2
Thread-1
Thread-2
Thread-1
Thread-2
Thread-1
Thread-2
Thread-1
Thread-2
Thread-1
Thread-2
Thread-1

如果你执行这段代码，你会发现它即使在不同的函数中以不同的顺序获取锁也没有发生死锁。 

其关键在于，在第一段代码中，我们对这些锁进行了排序。

通过排序，使得不管用户以什么样的顺序来请求锁，这些锁都会按照固定的顺序被获取。 

如果有多个 `acquire()` 操作被**嵌套**调用，可以通过线程本地存储 `（TLS）` 来检测潜在的死锁问题。 

假设你的代码是这样写的: 
```py
import threading
x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():

    while True:
        with acquire(x_lock):
            with acquire(y_lock):
                print('Thread-1')

def thread_2():
    while True:
        with acquire(y_lock):
            with acquire(x_lock):
                print('Thread-2')

t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()
```

必定会有一个线程发生崩溃

```py
Exception in thread Thread-45:
Traceback (most recent call last):
  File "D:\anaconda\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "D:\anaconda\lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-49-bb2fb04bd3bb>", line 15, in thread_2
    with acquire(x_lock):
  File "D:\anaconda\lib\contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "<ipython-input-47-57757de4db3d>", line 15, in acquire
    raise RuntimeError('Lock Order Violation')
RuntimeError: Lock Order Violation
```

发生崩溃的原因在于，每个线程都记录着自己已经获取到的锁。

`acquire()` 函数会检查之前已经获取的锁列表， 由于锁是按照升序排列获取的，所以函数会认为之前已获取的锁的 `id` 必定小于新申请到的锁，这时就会触发异常。

## 讨论

下面以一个关于线程死锁的经典问题：“哲学家就餐问题”，作为本节最后一个例子。

题目是这样的：

五位哲学家围坐在一张桌子前，每个人 面前有一碗饭和一只筷子。

在这里每个哲学家可以看做是一个独立的线程，而每只筷子可以看做是一个锁。

每个哲学家可以处在静坐、 思考、吃饭三种状态中的一个。

需要注意的是，每个哲学家吃饭是需要两只筷子的，这样问题就来了：如果每个哲学家都拿起自己左边的筷子， 那么他们五个都只能拿着一只筷子坐在那儿，直到饿死。

此时他们就进入了死锁状态。 

下面是一个简单的使用死锁避免机制解决“哲学家就餐问题”的实现：


```py
import threading

# The philosopher thread
def philosopher(left, right):
    while True:
        with acquire(left,right):
             print(threading.currentThread(), 'eating')

# The chopsticks (represented by locks)
NSTICKS = 5
chopsticks = [threading.Lock() for n in range(NSTICKS)]

# Create all of the philosophers
for n in range(NSTICKS):
    t = threading.Thread(
        target=philosopher,
        args=(chopsticks[n],chopsticks[(n+1) % NSTICKS])
    )
    t.start()
```
    
最后，要特别注意到，为了避免死锁，所有的加锁操作必须使用 `acquire()` 函数。如果代码中的某部分绕过 `acquire` 函数直接申请锁，那么整个死锁避免机制就不起作用了

# 保存线程的状态信息

有时在多线程编程中，你需要只保存当前运行线程的状态。 

要这么做，可使用 `thread.local()` 创建一个本地线程存储对象。 

对这个对象的属性的保存和读取操作都只会对执行线程可见，而其他线程并不可见。

作为使用本地存储的一个有趣的实际例子， 考虑在 `8.3小节` 定义过的 `LazyConnection` 上下文管理器类。 

下面我们对它进行一些小的修改使得它可以适用于多线程：

In [2]:
from socket import socket, AF_INET, SOCK_STREAM
import threading

class LazyConnection:
    def __init__(self, address, family=AF_INET, type=SOCK_STREAM):
        self.address = address
        self.family = AF_INET
        self.type = SOCK_STREAM
        self.local = threading.local()

    def __enter__(self):
        if hasattr(self.local, 'sock'):
            raise RuntimeError('Already connected')
        self.local.sock = socket(self.family, self.type)
        self.local.sock.connect(self.address)
        return self.local.sock

    def __exit__(self, exc_ty, exc_val, tb):
        self.local.sock.close()
        del self.local.sock

代码中，自己观察对于 `self.local` 属性的使用。

它被初始化为一个 `threading.local()` 实例。 

其他方法操作被存储为 `self.local.sock` 的套接字对象。 

有了这些就可以在多线程中安全的使用 `LazyConnection` 实例了。

例如：

In [3]:
from functools import partial
def test(conn):
    with conn as s:
        s.send(b'GET /index.html HTTP/1.0\r\n')
        s.send(b'Host: www.python.org\r\n')

        s.send(b'\r\n')
        resp = b''.join(iter(partial(s.recv, 8192), b''))

    print('Got {} bytes'.format(len(resp)))

if __name__ == '__main__':
    conn = LazyConnection(('www.python.org', 80))

    t1 = threading.Thread(target=test, args=(conn,))
    t2 = threading.Thread(target=test, args=(conn,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

Got 392 bytes
Got 392 bytes


它之所以行得通的原因是每个线程会创建一个自己专属的套接字连接（存储为 `self.local.sock` ）。 

因此，当不同的线程执行套接字操作时，由于操作的是不同的套接字，因此它们不会相互影响。

## 讨论

当出了问题的时候，通常是因为某个对象被多个线程使用到，用来操作一些专用的系统资源， 比如一个套接字或文件。

你不能让所有线程共享一个单独对象， 因为多个线程同时读和写的时候会产生混乱。

本地线程存储通过让这些资源只能在被使用的线程中可见来解决这个问题。

本节中，使用 `thread.local()` 可以让 `LazyConnection` 类支持一个线程一个连接， 而不是对于所有的进程都只有一个连接。

其原理是，每个 `threading.local()` 实例为每个线程维护着一个单独的实例字典。

所有普通实例操作比如获取、修改和删除值仅仅操作这个字典。 

每个线程使用一个独立的字典就可以保证数据的隔离了。

# 创建一个线程池

`concurrent.futures` 函数库有一个 `ThreadPoolExecutor` 类可以被用来完成这个任务。 下面是一个简单的 `TCP` 服务器，使用了一个线程池来响应客户端：

In [None]:
from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor

def echo_client(sock, client_addr):
    '''
    Handle a client connection
    '''
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')
    sock.close()

def echo_server(addr):
    pool = ThreadPoolExecutor(128)
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        pool.submit(echo_client, client_sock, client_addr)

echo_server(('',15000))

如果你想手动创建你自己的线程池， 通常可以使用一个 `Queue` 来轻松实现。

下面是一个稍微不同但是手动实现的例子：

In [None]:
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
    '''
    Handle a client connection
    '''
    sock, client_addr = q.get()
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')

    sock.close()

def echo_server(addr, nworkers):
    # Launch the client workers
    q = Queue()
    for n in range(nworkers):
        t = Thread(target=echo_client, args=(q,))
        t.daemon = True
        t.start()

    # Run the server
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        q.put((client_sock, client_addr))

echo_server(('',15000), 128)

# [資料，例子沒有執行][concurrent.futures.ProcessPoolExecutor]简单的并行编程

`concurrent.futures` 库提供了一个 `ProcessPoolExecutor` 类， 可被用来在一个单独的 `Python` 解释器中执行计算密集型函数。

不过，要使用它，你首先要有一些计算密集型的任务。 

我们通过一个简单而实际的例子来演示它。假定你有个 `Apache web` 服务器日志目录的 `gzip` 压缩包：

```
logs/
   20120701.log.gz
   20120702.log.gz
   20120703.log.gz
   20120704.log.gz
   20120705.log.gz
   20120706.log.gz
   ...
```

进一步假设每个日志文件内容类似下面这样：

```
124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...
```

下面是一个脚本，在这些日志文件中查找出所有访问过 `robots.txt` 文件的主机：
  
```py
# findrobots.py

import gzip
import io
import glob

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f,encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across and entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    for robots in map(find_robots, files):
        all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)
```

前面的程序使用了通常的 `map-reduce` 风格来编写。 

函数 `find_robots()` 在一个文件名集合上做map操作，并将结果汇总为一个单独的结果， 也就是 `find_all_robots()` 函数中的 `all_robots` 集合。

现在，假设你想要修改这个程序让它使用多核 `CPU` 。 很简单——只需要将 `map()` 操作替换为一个 `concurrent.futures` 库中生成的类似操作即可。 

下面是一个简单修改版本：
  
```py
# findrobots.py

import gzip
import io
import glob
from concurrent import futures

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file

    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f,encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across and entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    with futures.ProcessPoolExecutor() as pool:
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
```

## 讨论

`ProcessPoolExecutor` 的典型用法如下：

```py
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as pool:
    ...
    do work in parallel using pool
    ...
```    
    
其原理是，一个 `ProcessPoolExecutor` 创建 `N` 个独立的 `Python` 解释器， `N` 是系统上面可用 `CPU` 的个数。你可以通过提供可选参数给 `ProcessPoolExecutor(N)` 来修改 处理器数量。

这个处理池会一直运行到 `with` 块中最后一个语句执行完成， 然后处理池被关闭。

不过，程序会一直等待直到所有提交的工作被处理完成。

---

被提交到池中的工作必须被定义为一个函数。有两种方法去提交。 如果你想让一个列表推导或一个 `map()` 操作并行执行的话，可使用 `pool.map()` :

```py
# A function that performs a lot of work
def work(x):
    ...
    return result

# Nonparallel code
results = map(work, data)

# Parallel implementation
with ProcessPoolExecutor() as pool:
    results = pool.map(work, data)
```    

---
    
另外，你可以使用 `pool.submit()` 来手动的提交单个任务：

```py
# Some function
def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    ...
    # Example of submitting work to the pool
    future_result = pool.submit(work, arg)

    # Obtaining the result (blocks until done)
    r = future_result.result()
    ...
```

如果你手动提交一个任务，结果是一个 `Future` 实例。 

要获取最终结果，你需要调用它的 `result()` 方法。 

它会阻塞进程直到结果被返回来。

如果不想阻塞，你还可以使用一个回调函数，例如：

```py
def when_done(r):
    print('Got:', r.result())

with ProcessPoolExecutor() as pool:
     future_result = pool.submit(work, arg)
     future_result.add_done_callback(when_done)
```

回调函数接受一个 `Future` 实例，被用来获取最终的结果（比如通过调用它的 `result()` 方法）。 

尽管处理池很容易使用，在设计大程序的时候还是有很多需要注意的地方，如下几点：

* 这种并行处理技术只适用于那些可以被分解为互相独立部分的问题。

* 被提交的任务必须是简单函数形式。对于方法、闭包和其他类型的并行执行还不支持。

* 函数参数和返回值必须兼容 `pickle`，因为要使用到进程间的通信，所有解释器之间的交换数据必须被序列化

* 被提交的任务函数不应保留状态或有副作用。除了打印日志之类简单的事情，一旦启动你不能控制子进程的任何行为，因此最好保持简单和纯洁——函数不要去修改环境。

在`Unix`上进程池通过调用 `fork()` 系统调用被创建，

它会克隆 `Python` 解释器，包括 `fork` 时的所有程序状态。 

而在 `Windows` 上，克隆解释器时不会克隆状态。 实际的 `fork` 操作会在第一次调用 `pool.map()` 或 `pool.submit()` 后发生。

当你混合使用进程池和多线程的时候要特别小心。

你应该在创建任何线程之前先创建并激活进程池（比如在程序启动的 `main` 线程中创建进程池）。




# Python的全局锁问题

在讨论普通的 `GIL` 之前，有一点要强调的是 `GIL` 只会影响到那些严重依赖 `CPU` 的程序（比如计算型的）。

如果你的程序大部分只会涉及到 `I/O` ，比如网络交互，那么使用多线程就很合适， 因为它们大部分时间都在等待。

实际上，你完全可以放心的创建几千个 `Python` 线程， 现代操作系统运行这么多线程没有任何压力，没啥可担心的。

而对于依赖 `CPU` 的程序，你需要弄清楚执行的计算的特点。 

例如，优化底层算法要比使用多线程运行快得多。 

类似的，由于 `Python` 是解释执行的，如果你将那些性能瓶颈代码移到一个 `C` 语言扩展模块中， 速度也会提升的很快。

如果你要操作数组，那么使用 `NumPy` 这样的扩展会非常的高效。

---

还有一点要注意的是，线程不是专门用来优化性能的。

一个 `CPU` 依赖型程序可能会使用线程来管理一个图形用户界面、一个网络连接或其他服务。 这时候， `GIL` 会产生一些问题，因为如果一个线程长期持有 `GIL` 的话会导致其他非 `CPU` 型线程一直等待。 

事实上，一个写的不好的 `C` 语言扩展会导致这个问题更加严重， 尽管代码的计算部分会比之前运行的更快些。

---

**multiprocessing**

现在想说的是我们有两种策略来解决 `GIL`  的缺点。 首先，如果你完全工作于 `Python` 环境中，你可以使用 `multiprocessing` 模块来创建一个进程池， 并像协同处理器一样的使用它。

例如，假如你有如下的线程代码：

```py
# Performs a large calculation (CPU bound)
def some_work(args):
    ...
    return result

# A thread that calls the above function
def some_thread():
    while True:
        ...
        r = some_work(args)
    ...
```

修改代码，使用进程池：

```py
# Processing pool (see below for initiazation)
pool = None

# Performs a large calculation (CPU bound)
def some_work(args):
    ...
    return result

# A thread that calls the above function
def some_thread():
    while True:
        ...
        r = pool.apply(some_work, (args))
        ...

# Initiaze the pool
if __name__ == '__main__':
    import multiprocessing
    pool = multiprocessing.Pool()
```

这个通过使用一个技巧利用进程池解决了 `GIL` 的问题。

当一个线程想要执行 `CPU` 密集型工作时，会将任务发给进程池。 

然后进程池会在另外一个进程中启动一个单独的 `Python` 解释器来工作。 

当线程等待结果的时候会释放 `GIL`。 

并且，由于计算任务在单独解释器中执行，那么就不会受限于 `GIL` 了。

如果你准备使用一个处理器池，注意的是这样做涉及到数据序列化和在不同 `Python` 解释器通信。 

被执行的操作需要放在一个通过 `def` 语句定义的 `Python` 函数中，不能是 `lambda` 、闭包可调用实例等， 并且函数参数和返回值必须要兼容 `pickle` 。 

同样， **要执行的任务量必须足够大以弥补额外的通信开销。**

另外一个难点是当混合使用线程和进程池的时候会让你很头疼。 

如果你要同时使用两者，最好在程序启动时， **创建任何线程之前先创建一个单例的进程池** 。 然后线程使用同样的进程池来进行它们的计算密集型工作。


---

**C扩展**

另外一个解决 `GIL` 的策略是使用 `C` 扩展编程技术。 主要思想是将计算密集型任务转移给 `C` ，跟 `Python` 独立，在工作的时候在 `C` 代码中释放 `GIL` 。 

这可以通过在 `C` 代码中插入下面这样的特殊宏来完成：

```py
#include "Python.h"
...

PyObject *pyfunc(PyObject *self, PyObject *args) {
   ...
   Py_BEGIN_ALLOW_THREADS
   // Threaded C code
   ...
   Py_END_ALLOW_THREADS
   ...
}
```

 `C` 扩展最重要的特征是它们和 `Python` 解释器是保持独立的。 
 
也就是说，如果你准备将 `Python` 中的任务分配到 `C` 中去执行， 你需要确保 `C` 代码的操作跟 `Python` 保持独立， 这就意味着不要使用 `Python数据结构` 以及不要调用 `Python的C API` 。 

另外一个就是你要确保 `C` 扩展所做的工作是足够的，值得你这样做。 也就是说 `C` 扩展担负起了大量的计算任务，而不是少数几个计算。


---

**Cython**

如果你使用其他工具访问 `C` 语言，比如对于 `Cython` 的 `ctypes` 库，你不需要做任何事。 例如， `ctypes` 在调用 `C` 时会自动释放 `GIL` 。

---

这些解决 `GIL` 的方案并不能适用于所有问题。 例如，某些类型的应用程序如果被分解为多个进程处理的话并不能很好的工作， 也不能将它的部分代码改成C语言执行。

# 定义一个Actor任务

`actor` 模式是一种最古老的也是最简单的并行和分布式计算解决方案。

简单来讲，一个 `actor` 就是一个并发执行的任务，只是简单的执行发送给它的消息任务。 

响应这些消息时，它可能还会给其他 `actor` 发送更进一步的消息。 

`actor` 之间的通信是单向和异步的。

因此，消息发送者不知道消息是什么时候被发送， 也不会接收到一个消息已被处理的回应或通知。

结合使用一个线程和一个队列可以很容易的定义 `actor` ，例如：

In [1]:
from queue import Queue
from threading import Thread, Event

# Sentinel used for shutdown
class ActorExit(Exception):
    pass

class Actor:
    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        '''
        Send a message to the actor
        '''
        self._mailbox.put(msg)

    def recv(self):
        '''
        Receive an incoming message
        '''
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def close(self):
        '''
        Close the actor, thus shutting it down
        '''
        self.send(ActorExit)

    def start(self):
        '''
        Start concurrent execution
        '''
        self._terminated = Event()
        t = Thread(target=self._bootstrap)

        t.daemon = True
        t.start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        self._terminated.wait()

    def run(self):
        '''
        Run method to be implemented by the user
        '''
        while True:
            msg = self.recv()

# Sample ActorTask
class PrintActor(Actor):
    def run(self):
        while True:
            msg = self.recv()
            print('Got:', msg)

# Sample use
p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()

Got: Hello
Got: World


用户可以通过继承 `Actor` 并定义实现自己处理逻辑 `run()` 方法来定义新的 `actor` 。 

`ActorExit` 异常的使用就是用户自定义代码可以在需要的时候来捕获终止请求 （异常被 `get()` 方法抛出并传播出去）。

如果你放宽对于同步和异步消息发送的要求， 类 `actor` 对象还可以通过生成器来简化定义。

例如：

In [2]:
def print_actor():
    while True:
        try:
            # Get a message
            msg = yield      
            print('Got:', msg)
        except GeneratorExit:
            print('Actor terminating')

# Sample use
p = print_actor()

# Advance to the yield (ready to receive)
next(p)     

p.send('Hello')
p.send('World')
p.close()

Got: Hello
Got: World
Actor terminating


RuntimeError: generator ignored GeneratorExit

## 讨论

`actor` 模式的魅力就在于它的简单性。 

实际上，这里仅仅只有一个核心操作 `send()` . 甚至，对于在基于 `actor` 系统中的“消息”的泛化概念可以已多种方式被扩展。

例如，你可以以元组形式传递标签消息，让 `actor` 执行不同的操作，如下：

In [2]:
from queue import Queue
from threading import Thread, Event

# Sentinel used for shutdown
class ActorExit(Exception):
    pass

class Actor:
    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        '''
        Send a message to the actor
        '''
        self._mailbox.put(msg)

    def recv(self):
        '''
        Receive an incoming message
        '''
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def close(self):
        '''
        Close the actor, thus shutting it down
        '''
        self.send(ActorExit)

    def start(self):
        '''
        Start concurrent execution
        '''
        self._terminated = Event()
        t = Thread(target=self._bootstrap)

        t.daemon = True
        t.start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        self._terminated.wait()

    def run(self):
        '''
        Run method to be implemented by the user
        '''
        while True:
            msg = self.recv()


class TaggedActor(Actor):
    def run(self):
        while True:
            tag, *payload = self.recv()
            getattr(self,'do_'+tag)(*payload)

    # Methods correponding to different message tags
    def do_A(self, x):
        print('Running A', x)

    def do_B(self, x, y):
        print('Running B', x, y)

# Example
a = TaggedActor()
a.start()
a.send(('A', 1))      # Invokes do_A(1)
a.send(('B', 2, 3))   # Invokes do_B(2,3)
a.close()


Running A 1
Running B 2 3


作为另外一个例子，下面的 `actor` 允许在一个工作者中运行任意的函数， 并且通过一个特殊的 `Result` 对象返回结果：

In [None]:
from threading import Event
class Result:
    def __init__(self):
        self._evt = Event()
        self._result = None

    def set_result(self, value):
        self._result = value

        self._evt.set()

    def result(self):
        self._evt.wait()
        return self._result

class Worker(Actor):
    def submit(self, func, *args, **kwargs):
        r = Result()
        self.send((func, args, kwargs, r))
        return r

    def run(self):
        while True:
            func, args, kwargs, r = self.recv()
            r.set_result(func(*args, **kwargs))

# Example use
worker = Worker()
worker.start()
r = worker.submit(pow, 2, 3)
print(r.result())

最后，“发送”一个任务消息的概念可以被扩展到多进程甚至是大型分布式系统中去。

例如，一个类 `actor` 对象的 `send()` 方法可以被编程让它能在一个套接字连接上传输数据 或通过某些消息中间件（比如 `AMQP` 、 `ZMQ` 等）来发送。

# 实现消息发布/订阅模型

要实现发布/订阅的消息通信模式， 你通常要引入一个单独的“交换机”或“网关”对象作为所有消息的中介。 

也就是说，不直接将消息从一个任务发送到另一个，而是将其发送给交换机， 然后由交换机将它发送给一个或多个被关联任务。

下面是一个非常简单的交换机实现例子：

In [28]:
from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

# Dictionary of all created exchanges
_exchanges = defaultdict(Exchange)

# Return the Exchange instance associated with a given name
def get_exchange(name):
    return _exchanges[name]

一个交换机就是一个普通对象，负责维护一个活跃的订阅者集合，并为绑定、解绑和发送消息提供相应的方法。 

每个交换机通过一个名称定位， `get_exchange()` 通过给定一个名称返回相应的 `Exchange` 实例。

下面是一个简单例子，演示了如何使用一个交换机：

In [29]:
# Example of a task.  Any object with a send() method

class Task:
    def __init__(self,Name):
        self.Name = Name
    
    def send(self, msg):
        print(self.Name ,msg)

task_a = Task("A")
task_b = Task("B")

# Example of getting an exchange
exc = get_exchange('name')

# Examples of subscribing tasks to it
exc.attach(task_a)
exc.attach(task_b)

# Example of sending messages
exc.send('msg1')
print("------")
exc.send('msg2')

# Example of unsubscribing
exc.detach(task_a)
exc.detach(task_b)

A msg1
B msg1
------
A msg2
B msg2


## 讨论

通过 *队列发送消息的任务* 或 *线程* 的模式很容易被实现并且也非常普遍。 

不过，使用 **发布/订阅模式** 的好处更加明显。

首先，使用一个交换机可以简化大部分涉及到线程通信的工作。 

无需去写通过多进程模块来操作多个线程，你只需要使用这个交换机来连接它们。 

某种程度上，这个就跟日志模块的工作原理类似。 实际上，它可以轻松的解耦程序中多个任务。

其次，交换机广播消息给多个订阅者的能力带来了一个全新的通信模式。 

例如，你可以使用多任务系统、广播或扇出。 你还可以通过以普通订阅者身份绑定来构建调试和诊断工具。 

例如，下面是一个简单的诊断类，可以显示被发送的消息：

In [5]:
from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

# Dictionary of all created exchanges
_exchanges = defaultdict(Exchange)

# Return the Exchange instance associated with a given name
def get_exchange(name):
    return _exchanges[name]

# ---------------------------------------


class DisplayMessages:
    def __init__(self):
        self.count = 0
    def send(self, msg):
        self.count += 1
        print('msg[{}]: {!r}'.format(self.count, msg))

exc = get_exchange('name')
d = DisplayMessages()
exc.attach(d)
exc.send('msg1')
exc.send('msg11')
exc.detach(d)

msg[1]: 'msg1'
msg[2]: 'msg11'


最后，该实现的一个重要特点是它能兼容多个 `“task-like”对象` 。 

例如，消息接受者可以是 `actor（12.10小节介绍）` 、协程、网络连接或任何实现了正确的 `send()` 方法的东西。

关于交换机的一个可能问题是对于订阅者的 `正确绑定和解绑` 。 

为了正确的管理资源，每一个绑定的订阅者必须最终要解绑。 在代码中通常会是像下面这样的模式：

```py
exc = get_exchange('name')
exc.attach(some_task)
try:
    ...
finally:
    exc.detach(some_task)
```

某种意义上，这个和使用文件、锁和类似对象很像。 通常很容易会忘记最后的 detach() 步骤。 为了简化这个，你可以考虑使用上下文管理器协议。 例如，在交换机对象上增加一个 subscribe() 方法，如下：

In [6]:
from contextlib import contextmanager
from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    @contextmanager
    def subscribe(self, *tasks):
        for task in tasks:
            self.attach(task)
        try:
            yield
        finally:
            for task in tasks:
                self.detach(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

# Dictionary of all created exchanges
_exchanges = defaultdict(Exchange)

# Return the Exchange instance associated with a given name
def get_exchange(name):
    return _exchanges[name]

# --------------------------------------------


class DisplayMessages:
    def __init__(self):
        self.count = 0
    def send(self, msg):
        self.count += 1
        print('msg[{}]: {!r}'.format(self.count, msg))

exc = get_exchange('name')
d = DisplayMessages()

with exc.subscribe(d):
    exc.send('msg1')
    exc.send('msg2')

msg[1]: 'msg1'
msg[2]: 'msg2'


最后还应该注意的是关于交换机的思想有很多种的扩展实现。 

例如，交换机可以实现一整个消息通道集合或提供交换机名称的模式匹配规则。 

交换机还可以被扩展到分布式计算程序中（比如，将消息路由到不同机器上面的任务中去）。

# [yield]使用生成器代替线程

要使用生成器实现自己的并发，你首先要对生成器函数和 `yield` 语句有深刻理解。

`yield` 语句会让一个生成器挂起它的执行，这样就可以编写一个调度器， 将生成器当做某种“任务”并使用任务协作切换来替换它们的执行。 

要演示这种思想，考虑下面两个使用简单的 `yield` 语句的生成器函数：

In [8]:
# Two simple generator functions
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield
        n -= 1
    print('Blastoff!')

def countup(n):
    x = 0
    while x < n:
        print('Counting up', x)
        yield
        x += 1

# =========== 任務調度 ============

from collections import deque

class TaskScheduler:
    def __init__(self):
        self._task_queue = deque()

    def new_task(self, task):
        '''
        Admit a newly started task to the scheduler

        '''
        self._task_queue.append(task)

    def run(self):
        '''
        Run until there are no more tasks
        '''
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                # Run until the next yield statement
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                # Generator is no longer executing
                pass

# Example use
sched = TaskScheduler()
sched.new_task(countdown(2))
sched.new_task(countdown(3))
sched.new_task(countup(4))
sched.run()

T-minus 2
T-minus 3
Counting up 0
T-minus 1
T-minus 2
Counting up 1
Blastoff!
T-minus 1
Counting up 2
Blastoff!
Counting up 3


生成器函数就是认为，而 `yield` 语句是任务挂起的信号。 调度器循环检查任务列表直到没有任务要执行为止。

实际上，你可能想要使用生成器来实现简单的并发。 那么，在实现 `actor` 或网络服务器的时候你可以使用生成器来替代线程的使用。

下面的代码演示了使用 **生成器** 来实现一个不依赖线程的 `actor` ：

In [10]:
from collections import deque

class ActorScheduler:
    def __init__(self):
        self._actors = { }          # Mapping of names to actors
        self._msg_queue = deque()   # Message queue

    def new_actor(self, name, actor):
        '''
        Admit a newly started actor to the scheduler and give it a name
        '''
        self._msg_queue.append((actor,None))
        self._actors[name] = actor

    def send(self, name, msg):
        '''
        Send a message to a named actor
        '''
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor,msg))

    def run(self):
        '''
        Run as long as there are pending messages.
        '''
        while self._msg_queue:
            actor, msg = self._msg_queue.popleft()
            try:
                 actor.send(msg)
            except StopIteration:
                 pass

# Example use
if __name__ == '__main__':
    def printer():
        while True:
            msg = yield
            print('Got:', msg)

    def counter(sched):
        while True:
            # Receive the current count
            n = yield
            if n == 0:
                break
            # Send to the printer task
            sched.send('printer', n)
            # Send the next count to the counter task (recursive)

            sched.send('counter', n-1)

    sched = ActorScheduler()
    # Create the initial actors
    sched.new_actor('printer', printer())
    sched.new_actor('counter', counter(sched))

    # Send an initial message to the counter to initiate
    sched.send('counter', 3)
    sched.run()

Got: 3
Got: 2
Got: 1


## {情境及應用不瞭解}更加高级的例子

下面是一个更加高级的例子，演示了使用生成器来实现一个并发网络应用程序：

In [1]:

from collections import deque
from select import select

# This class represents a generic yield event in the scheduler
class YieldEvent:
    def handle_yield(self, sched, task):
        pass
    def handle_resume(self, sched, task):
        pass

# Task Scheduler
class Scheduler:
    def __init__(self):
        self._numtasks = 0       # Total num of tasks
        self._ready = deque()    # Tasks ready to run
        self._read_waiting = {}  # Tasks waiting to read
        self._write_waiting = {} # Tasks waiting to write

    # Poll for I/O events and restart waiting tasks
    def _iopoll(self):
        rset,wset,eset = select(self._read_waiting,
                                self._write_waiting,[])
        for r in rset:
            evt, task = self._read_waiting.pop(r)
            evt.handle_resume(self, task)
        for w in wset:
            evt, task = self._write_waiting.pop(w)
            evt.handle_resume(self, task)

    def new(self,task):
        '''
        Add a newly started task to the scheduler
        '''

        self._ready.append((task, None))
        self._numtasks += 1

    def add_ready(self, task, msg=None):
        '''
        Append an already started task to the ready queue.
        msg is what to send into the task when it resumes.
        '''
        self._ready.append((task, msg))

    # Add a task to the reading set
    def _read_wait(self, fileno, evt, task):
        self._read_waiting[fileno] = (evt, task)

    # Add a task to the write set
    def _write_wait(self, fileno, evt, task):
        self._write_waiting[fileno] = (evt, task)

    def run(self):
        '''
        Run the task scheduler until there are no tasks
        '''
        while self._numtasks:
            if not self._ready:
                self._iopoll()
            task, msg = self._ready.popleft()
            try:
                # Run the coroutine to the next yield
                r = task.send(msg)
                if isinstance(r, YieldEvent):
                    r.handle_yield(self, task)
                else:
                    raise RuntimeError('unrecognized yield event')
            except StopIteration:
                self._numtasks -= 1

# Example implementation of coroutine-based socket I/O
class ReadSocket(YieldEvent):
    def __init__(self, sock, nbytes):
        self.sock = sock
        self.nbytes = nbytes
    def handle_yield(self, sched, task):
        sched._read_wait(self.sock.fileno(), self, task)
    def handle_resume(self, sched, task):
        data = self.sock.recv(self.nbytes)
        sched.add_ready(task, data)

class WriteSocket(YieldEvent):
    def __init__(self, sock, data):
        self.sock = sock
        self.data = data
    def handle_yield(self, sched, task):

        sched._write_wait(self.sock.fileno(), self, task)
    def handle_resume(self, sched, task):
        nsent = self.sock.send(self.data)
        sched.add_ready(task, nsent)

class AcceptSocket(YieldEvent):
    def __init__(self, sock):
        self.sock = sock
    def handle_yield(self, sched, task):
        sched._read_wait(self.sock.fileno(), self, task)
    def handle_resume(self, sched, task):
        r = self.sock.accept()
        sched.add_ready(task, r)

# Wrapper around a socket object for use with yield
class Socket(object):
    def __init__(self, sock):
        self._sock = sock
    def recv(self, maxbytes):
        return ReadSocket(self._sock, maxbytes)
    def send(self, data):
        return WriteSocket(self._sock, data)
    def accept(self):
        return AcceptSocket(self._sock)
    def __getattr__(self, name):
        return getattr(self._sock, name)

if __name__ == '__main__':
    from socket import socket, AF_INET, SOCK_STREAM
    import time

    # Example of a function involving generators.  This should
    # be called using line = yield from readline(sock)
    def readline(sock):
        chars = []
        while True:
            c = yield sock.recv(1)
            if not c:
                break
            chars.append(c)
            if c == b'\n':
                break
        return b''.join(chars)

    # Echo server using generators
    class EchoServer:
        def __init__(self,addr,sched):
            self.sched = sched
            sched.new(self.server_loop(addr))

        def server_loop(self,addr):
            s = Socket(socket(AF_INET,SOCK_STREAM))

            s.bind(addr)
            s.listen(5)
            while True:
                c,a = yield s.accept()
                print('Got connection from ', a)
                self.sched.new(self.client_handler(Socket(c)))

        def client_handler(self,client):
            while True:
                line = yield from readline(client)
                if not line:
                    break
                line = b'GOT:' + line
                while line:
                    nsent = yield client.send(line)
                    line = line[nsent:]
            client.close()
            print('Client closed')

    sched = Scheduler()
    EchoServer(('',16000),sched)
    sched.run()

Writing YieldEvent.py


这段代码有点复杂。不过，它实现了一个小型的操作系统。 

有一个就绪的任务队列，并且还有因 `I/O` 休眠的任务等待区域。 

还有很多调度器负责在就绪队列和 `I/O` 等待区域之间移动任务。

## 讨论

在构建基于生成器的并发框架时，通常会使用更常见的yield形式：

```py
def some_generator():
    ...
    result = yield data
    ...
```    

使用这种形式的 `yield` 语句的函数通常被称为“协程”。 

通过调度器， `yield` 语句在一个循环中被处理，如下：

```py
f = some_generator()

# Initial result. Is None to start since nothing has been computed
result = None
while True:
    try:
        data = f.send(result)
        result = ... do some calculation ...
    except StopIteration:
        break
```

这里的逻辑稍微有点复杂。

不过，被传给 `send()` 的值定义了在 `yield` 语句醒来时的返回值。 

因此，如果一个 `yield` 准备在对之前 `yield` 数据的回应中返回结果时，会在下一次 `send()` 操作返回。 

---

如果一个生成器函数刚开始运行，发送一个 `None` 值会让它排在第一个 `yield` 语句前面。

除了发送值外，还可以在一个生成器上面执行一个 `close()` 方法。 

它会导致在执行 `yield` 语句时抛出一个 `GeneratorExit` 异常，从而终止执行。 

如果进一步设计，一个生成器可以捕获这个异常并执行清理操作。 

同样还可以使用生成器的 `throw()` 方法在 `yield` 语句执行时生成一个任意的执行指令。 

一个任务调度器可利用它来在运行的生成器中处理错误。

---

最后一个例子中使用的 `yield from` 语句被用来实现协程，可以被其它生成器作为子程序或过程来调用。 

本质上就是将控制权透明的传输给新的函数。 

不像普通的生成器，一个使用 `yield from` 被调用的函数可以返回一个作为 `yield from` 语句结果的值。 

---

最后，如果使用生成器编程，要提醒你的是它还是有很多缺点的。

特别是，你得不到任何线程可以提供的好处。

例如，如果你执行 `CPU` 依赖或 `I/O` 阻塞程序， 它会将整个任务挂起知道操作完成。

为了解决这个问题， 你只能选择将操作委派给另外一个可以独立运行的线程或进程。

另外一个限制是大部分 `Python` 库并不能很好的兼容基于生成器的线程。

如果你选择这个方案，你会发现你需要自己改写很多标准库函数。



# 多个线程队列轮询

对于轮询问题的一个常见解决方案中有个很少有人知道的技巧，包含了一个隐藏的回路网络连接。 

本质上讲其思想就是：对于每个你想要轮询的队列，你创建一对连接的套接字。 

然后你在其中一个套接字上面编写代码来标识存在的数据， 另外一个套接字被传给 `select()` 或类似的一个轮询数据到达的函数。

In [1]:
import queue
import socket
import os

class PollableQueue(queue.Queue):
    def __init__(self):
        super().__init__()
        # Create a pair of connected sockets
        if os.name == 'posix':
            self._putsocket, self._getsocket = socket.socketpair()
        else:
            # Compatibility on non-POSIX systems
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._putsocket.connect(server.getsockname())
            self._getsocket, _ = server.accept()
            server.close()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return super().get()

在这个代码中，一个新的 `Queue` 实例类型被定义，底层是一个被连接套接字对。

在 `Unix` 机器上的 `socketpair()` 函数能轻松的创建这样的套接字。 

在 `Windows` 上面，你必须使用类似代码来模拟它。 然后定义普通的 `get()` 和 `put()` 方法在这些套接字上面来执行 `I/O` 操作。 

`put()` 方法再将数据放入队列后会写一个单字节到某个套接字中去。 

而 `get()` 方法在从队列中移除一个元素时会从另外一个套接字中读取到这个单字节数据。

`fileno()` 方法使用一个函数比如 `select()` 来让这个队列可以被轮询。 

它仅仅只是暴露了底层被 `get()` 函数使用到的 `socket` 的文件描述符而已。

下面是一个例子，定义了一个为到来的元素监控多个队列的消费者：


In [4]:

import select
import threading

class consumer:
    
    def __init__(self):
        self._running = True

    def stop(self):
        self._running = False
    
    def run(self,queues):
        '''
        Consumer that reads data on multiple queues simultaneously
        '''
        while self._running:
            can_read, _, _ = select.select(queues,[],[])
            for r in can_read:
                item = r.get()
                print('Got:', item)

q1 = PollableQueue()
q2 = PollableQueue()
q3 = PollableQueue()

c = consumer()

t = threading.Thread(target=c.run, args=([q1,q2,q3],))
t.daemon = True
t.start()

# Feed data to the queues
q1.put(1)
q2.put(10)
q3.put('hello')
q2.put(15)


Got: 1
Got: 10
Got: hello
Got: 15


In [5]:
c.stop()

## 讨论

对于轮询非类文件对象，比如队列通常都是比较棘手的问题。 

例如，如果你不使用上面的套接字技术， 你唯一的选择就是编写代码来循环遍历这些队列并使用一个定时器。

像下面这样：

```py
import time
def consumer(queues):
    while True:
        for q in queues:
            if not q.empty():
                item = q.get()
                print('Got:', item)

        # Sleep briefly to avoid 100% CPU
        #time.sleep(0.01)
        time.sleep(0.1)
```

这样做其实不合理，还会引入其他的性能问题。 

例如，如果新的数据被加入到一个队列中，至少要花10毫秒才能被发现。 

如果你之前的轮询还要去轮询其他对象，比如网络套接字那还会有更多问题。 

例如，如果你想同时轮询套接字和队列，你可能要像下面这样使用：

```py
import select

def event_loop(sockets, queues):
    while True:
        # polling with a timeout
        can_read, _, _ = select.select(sockets, [], [], 0.01)
        for r in can_read:
            handle_read(r)
        for q in queues:
            if not q.empty():
                item = q.get()
                print('Got:', item)
```

这个方案通过将队列和套接字等同对待来解决了大部分的问题。 

一个单独的 `select()` 调用可被同时用来轮询。 

使用超时或其他基于时间的机制来执行周期性检查并没有必要。 

甚至，如果数据被加入到一个队列，消费者几乎可以实时的被通知。 

尽管会有一点点底层的 `I/O` 损耗，使用它通常会获得更好的响应时间并简化编程。


# [無法執行]在Unix系统上面启动守护进程

在 windows 上 AttributeError: module 'os' has no attribute 'fork'

無法執行