# 并发编程

## 启动与停止线程

你要为需要并发执行的代码创建/销毁线程

threading 库可以在单独的线程中执行任何的在 Python 中可以调用的对象。你可
以创建一个 Thread 对象并将你要执行的对象以 target 参数的形式提供给该对象。下面
是一个简单的例子

In [2]:
import time

def countdown(n):
    while n>0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)
        
from threading import Thread
t = Thread(target=countdown, args=(10, ))
t.start()

T-minus 10
T-minus 9
T-minus 8
T-minus 7
T-minus 6


In [5]:
t.is_alive()

True

T-minus 8


In [6]:
t.is_alive()

True

T-minus 7
T-minus 6
T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


In [7]:
t.is_alive()

False

当你创建好一个线程对象后，该对象并不会立即执行，除非你调用它的 start()
方法（当你调用 start() 方法时，它会调用你传递进来的函数，并把你传递进来的参
数传递给该函数）。Python 中的线程会在一个单独的系统级线程中执行（比如说一个
POSIX 线程或者一个 Windows 线程），这些线程将由操作系统来全权管理。线程一旦
启动，将独立执行直到目标函数返回。你可以查询一个线程对象的状态，看它是否还在
执行 t.is_alive()：

你也可以将一个线程加入到当前线程，并等待它终止：

In [None]:
t.join()

Python 解释器直到所有线程都终止前仍保持运行。对于需要长时间运行的线程或
者需要一直运行的后台任务，你应当考虑使用后台线程。例如：

In [None]:
t = Thread(target=countdown, args=(10, ), daemon=True)
t.start()

后台线程无法等待，不过，这些线程会在主线程终止时自动销毁。除了如上所示的
两个操作，并没有太多可以对线程做的事情。你无法结束一个线程，无法给它发送信
号，无法调整它的调度，也无法执行其他高级操作。如果需要这些特性，你需要自己添
加。比如说，如果你需要终止线程，那么这个线程必须通过编程在某个特定点轮询来退
出。你可以像下边这样把线程放入一个类中

In [3]:
import time
from threading import Thread

class CountdownTask:
    def __init__(self):
        self._running = True
        
    def terminate(self):
        self._running = False
        
    def run(self, n):
        while self._running and n > 0: # True and int -> int
            print('T-minus', n)
            n -= 1
            time.sleep(5)
            
c = CountdownTask()
t = Thread(target=c.run, args=(10, ))
t.start()
time.sleep(2)
c.terminate()
time.sleep(1)
t.join()


T-minus 10


如果线程执行一些像 I/O 这样的阻塞操作，那么通过轮询来终止线程将使得线程
之间的协调变得非常棘手。比如，如果一个线程一直阻塞在一个 I/O 操作上，它就永
远无法返回，也就无法检查自己是否已经被结束了。要正确处理这些问题，你需要利用
超时循环来小心操作线程。例子如下：

In [None]:
class IOTask:
    def terminate(self):
        self._running = False
        
    def run(self, sock):
        sock.settimeout(5)
        while self._running:
            try:
                data = sock.recv(8192)
                break
            except socket.timeout:
                continue
        return

由于全局解释锁（GIL）的原因，Python 的线程被限制到同一时刻只允许一个线
程执行这样一个执行模型。所以，Python 的线程更适用于处理 I/O 和其他需要并发执
行的阻塞操作（比如等待 I/O、等待从数据库获取数据等等），而不是需要多处理器并
行的计算密集型任务。

有时你会看到下边这种通过继承 Thread 类来实现的线程：

In [4]:
from threading import Thread

class CountdownThread(Thread):
    def __init__(self, n):
        super().__init__()
        self.n = 0
    def run(self):
        while self.n > 0:
            print('T-minus', self.n)
            self.n -= 1
            time.sleep(5)
            
c = CountdownThread(5)
c.start()

尽管这样也可以工作，但这使得你的代码依赖于 threading 库，所以你的这些代
码只能在线程上下文中使用。上文所写的那些代码、函数都是与 threading 库无关的，
这样就使得这些代码可以被用在其他的上下文中，可能与线程有关，也可能与线程无
关。比如，你可以通过 multiprocessing 模块在一个单独的进程中执行你的代码

In [7]:
import multiprocessing
# c = CountDownTask(5)
c = CountdownThread(5)
# c = CountdownThread(5)
p = multiprocessing.Process(target=c.run)
p.start()

NameError: name 'CountDownTask' is not defined

再次重申，这段代码仅适用于 CountdownTask 类是以独立于实际的并发手段（多
线程、多进程等等）实现的情况

## 判断线程是否已经启动

你已经启动了一个线程，但是你想知道它是不是真的已经开始运行了

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其
他线程需要通过判断某个线程的状态来确定自己下一步的操作，这时线程同步问题就
会变得非常棘手。为了解决这些问题，我们需要使用 threading 库中的 Event 对象。
Event 对象包含一个可由线程设置的信号标志，它允许线程等待某些事件的发生。在初
始情况下，event 对象中的信号标志被设置为假。如果有线程等待一个 event 对象，而
这个 event 对象的标志为假，那么这个线程将会被一直阻塞直至该标志为真。一个线程
如果将一个 event 对象的信号标志设置为真，它将唤醒所有等待这个 event 对象的线
程。如果一个线程等待一个已经被设置为真的 event 对象，那么它将忽略这个事件，继
续执行。下边的代码展示了如何使用 Event 来协调线程的启动

In [5]:
from threading import Thread, Event
import time

def countdown(n ,started_evt):
    print('countdown starting')
    started_evt.set()  # Set the internal flag to true.
    while n>0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)
        
started_evt = Event()
print('Launching countdown')
t = Thread(target=countdown, args=(10, started_evt))
t.start()

#started_evt.wait()  # Block until the internal flag is true
time.sleep(2)
print('countdown is running')

Launching countdown
countdown starting
T-minus 10
countdown is running
T-minus 7
T-minus 9
T-minus 6
T-minus 8
T-minus 5
T-minus 7
T-minus 4
T-minus 6
T-minus 3
T-minus 5
T-minus 2
T-minus 4
T-minus 1
T-minus 3
T-minus 2
T-minus 1


当你执行这段代码，“countdown is running”总是显示在“countdown starting”之
后显示。这是由于使用 event 来协调线程，使得主线程要等到 countdown() 函数输出
启动信息后，才能继续执行。

event 对象最好单次使用，就是说，你创建一个 event 对象，让某个线程等待这个
对象，一旦这个对象被设置为真，你就应该丢弃它。尽管可以通过 clear() 方法来重
置 event 对象，但是很难确保安全地清理 event 对象并对它重新赋值。很可能会发生错
过事件、死锁或者其他问题（特别是，你无法保证重置 event 对象的代码会在线程再次
等待这个 event 对象之前执行）。如果一个线程需要不停地重复使用 event 对象，你最
好使用 Condition 对象来代替。

下面的代码使用 Condition 对象实现了一个周期定时
器，每当定时器超时的时候，其他线程都可以监测到：

In [7]:
# 没理解
import threading
import time

class PeriodicTimer:
    def __init__(self, interval):
        self._interval = interval
        self._flag = 0
        # Class that implements a condition variable
        # A condition variable allows one or more threads to wait until they are
        # notified by another thread
        self._cv = threading.Condition()
        
    def start(self):
        t = threading.Thread(target=self.run)
        t.daemon = True
        t.start()
        
    def run(self):
        while True:
            time.sleep(self._interval)
            with self._cv:
                self._flag ^= 1  # ^? 异或 相同为零 相异为一
                # Wake up all threads waiting on this condition
                self._cv.notify_all()
                
    def wait_for_tick(self):
        with self._cv:
            last_flag = self._flag
            while last_flag == self._flag:
                self._cv.wait() # Wait until notified or until a timeout occurs.
            
ptimer = PeriodicTimer(5)
ptimer.start()

def countdown(nticks):
    while nticks > 0:
        ptimer.wait_for_tick()
        print('T-minus', nticks)
        nticks -= 1
        
def countup(last):
    n = 0
    while n < last:
        ptimer.wait_for_tick()
        print('Counting', n)
        n += 1
        
threading.Thread(target=countdown, args=(10, )).start()
threading.Thread(target=countup,args=(5, )).start()

T-minusCounting 0
 10
T-minus 10
Counting 1
CountingT-minus 0
 9
T-minusCountingCountingT-minus 2
 8
 1
 9
T-minusCountingCountingT-minus 3
 8
 7
 2
CountingT-minusT-minus 6
 4
 7Counting
 3
CountingT-minusT-minus  5
 6
4
T-minusT-minus 5
 4
T-minusT-minus 3
 4
T-minusT-minus 3
 2
T-minusT-minus 1
 2
T-minus 1


In [22]:
def worker(n, sema):
    sema.acquire()
    print('Working', n)
    
sema = threading.Semaphore(0)
nworkers = 10

for n in range(nworkers):
    t = threading.Thread(target=worker, args=(n, sema))
    t.start()
    

运行上边的代码将会启动一个线程池，但是并没有什么事情发生。这是因为所有的
线程都在等待获取信号量。每次信号量被释放，只有一个线程会被唤醒并执行，示例如
下：

In [23]:
sema.release()

Working 0


In [24]:
sema.release()

Working 1


编写涉及到大量的线程间同步问题的代码会让你痛不欲生。比较合适的方式是使
用队列来进行线程间通信或者每个把线程当作一个 Actor，利用 Actor 模型来控制并
发。下一节将会介绍到队列，而 Actor 模型将在 12.10 节介绍。

### review@2019.10.26 ↑

## 线程间通信

你的程序中有多个线程，你需要在这些线程之间安全地交换信息或数据

从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列
了。创建一个被多个线程共享的 Queue 对象，这些线程通过使用 put() 和 get() 操作
来向队列中添加或者删除元素。例如

........