## 创建线程

In [None]:
# 代码在独立线程中执行
import time
def countdown(n):
    """Print a 'T-minus' line for each count from n downwards.

    After every printed line the function sleeps for five seconds.
    Nothing is printed when n is not greater than zero.
    """
    remaining = n
    while True:
        if not remaining > 0:
            return
        print('T-minus', remaining)
        remaining -= 1
        time.sleep(5)

# Create a thread that will call countdown(10); nothing runs until start()
from threading import Thread
t = Thread(target=countdown, args=(10,))

# Start the thread
t.start()

## 把线程放入类中

In [5]:
class CountdownTask:
    """Countdown loop that can be asked to stop from another thread.

    ``run`` checks a flag before every step; ``terminate`` clears that flag.
    """

    def __init__(self):
        # Stays True until terminate() is called.
        self._running = True

    def terminate(self):
        """Request that a running ``run`` loop stop at its next check."""
        self._running = False

    def run(self, n):
        """Count down from n, printing each value and sleeping five seconds.

        The loop ends when the count is no longer positive or once the
        task has been terminated.
        """
        remaining = n
        while self._running:
            if not remaining > 0:
                break
            print('T-minus', remaining)
            remaining -= 1
            time.sleep(5)

# Run the countdown in a thread, then immediately ask it to stop.
# run() only re-checks the flag after its sleep, so join() can block for
# up to one sleep interval.
c = CountdownTask()
t = Thread(target=c.run, args=(10,))
t.start()
c.terminate() # Signal termination
t.join()      # Wait for actual termination

T-minus 10


## 给线程设置超时

In [6]:
class IOTask:
    """Worker that polls a socket with a timeout so it can be stopped.

    A blocking ``recv`` cannot notice a stop request, so ``run`` uses a
    receive timeout and re-checks the running flag after every timeout.
    """

    def __init__(self):
        # Initialise the flag up front; without this, run() raised
        # AttributeError unless terminate() had already been called.
        self._running = True

    def terminate(self):
        """Ask the polling loop in ``run`` to stop."""
        self._running = False

    def run(self, sock):
        """Wait for one chunk of data on sock, retrying after each timeout.

        Args:
            sock: Socket-like object providing ``settimeout`` and ``recv``.

        Returns:
            None. The loop ends after the first successful ``recv`` or once
            ``terminate`` has been called.
        """
        # Imported locally so the except clause can resolve socket.timeout
        # even when the module is not imported at file level.
        import socket

        sock.settimeout(5)        # Set the timeout period
        while self._running:
            # Perform a blocking I/O operation w/ timeout
            try:
                data = sock.recv(8192)
            except socket.timeout:
                # Nothing arrived in time; loop to re-check the stop flag.
                continue
            # Data arrived: processing of `data` would go here.
            break
        # Terminated
        return

## 通过继承Thread实现线程

In [7]:
from threading import Thread

class CountdownThread(Thread):
    """Thread subclass whose ``run`` counts its ``n`` attribute down."""

    def __init__(self, n):
        super().__init__()
        # Remaining count; run() decrements it in place.
        self.n = n

    def run(self):
        """Print 'T-minus <n>' and sleep five seconds while n is positive."""
        while True:
            if not self.n > 0:
                break
            print('T-minus', self.n)
            self.n -= 1
            time.sleep(5)

# Count down from 5 in a new thread; start() arranges for run() to be invoked there
c = CountdownThread(5)
c.start()

T-minus 5


In [9]:
# Use with care: the thread object itself is never started here; its run()
# method is used as the target of a separate process instead.
# NOTE(review): presumably the countdown then mutates the child process's
# copy of c, leaving c.n in this process unchanged - confirm.
import multiprocessing
c = CountdownThread(5)
p = multiprocessing.Process(target=c.run)
p.start()

T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


## 线程是否已经启动

In [10]:
from threading import Thread, Event
import time

# Code that runs in its own thread
def countdown(n, started_evt):
    """Announce startup, set started_evt, then count down from n.

    Args:
        n: Starting count; one line is printed per value while it is
            positive, with a five-second sleep after each.
        started_evt: Event-like object whose ``set`` method is called once,
            before the first count is printed.
    """
    print('countdown starting')
    started_evt.set()
    remaining = n
    while True:
        if not remaining > 0:
            return
        print('T-minus', remaining)
        remaining -= 1
        time.sleep(5)

# Create the event object the thread will use to signal that it has started
started_evt = Event()

# Launch the thread, passing it the startup event
print('Launching countdown')
t = Thread(target=countdown, args=(10,started_evt))
t.start()

# Wait for the thread to start:
# the main thread blocks here until countdown calls started_evt.set()
started_evt.wait()
print('countdown is running')

Launching countdown
countdown starting
T-minus 10
countdown is running


## 周期定时器Condition

In [12]:
import threading
import time

class PeriodicTimer:
    """Background ticker that wakes waiting threads once per interval.

    A flag is toggled on every tick under a condition variable; waiters
    detect a tick by seeing the flag differ from the value they recorded.
    """

    def __init__(self, interval):
        self._interval = interval
        self._flag = 0
        self._cv = threading.Condition()

    def start(self):
        """Launch ``run`` in a daemon thread."""
        threading.Thread(target=self.run, daemon=True).start()

    def run(self):
        """Loop forever: sleep one interval, flip the flag, notify all waiters."""
        while True:
            time.sleep(self._interval)
            with self._cv:
                self._flag = self._flag ^ 1
                self._cv.notify_all()

    def wait_for_tick(self):
        """Block until the flag changes from the value seen on entry."""
        with self._cv:
            seen = self._flag
            # wait_for re-checks the predicate after every wake-up.
            self._cv.wait_for(lambda: self._flag != seen)

# Example use of the timer: tick every 5 seconds
ptimer = PeriodicTimer(5)
ptimer.start()

# Two threads that synchronize on the timer
def countdown(nticks):
    """Print one 'T-minus' line per ptimer tick while the count is positive."""
    ticks_left = nticks
    while True:
        if not ticks_left > 0:
            return
        ptimer.wait_for_tick()
        print('T-minus', ticks_left)
        ticks_left -= 1

def countup(last):
    """Print 'Counting <k>' once per ptimer tick, from 0 while k is below last."""
    counter = 0
    while True:
        if not counter < last:
            return
        ptimer.wait_for_tick()
        print('Counting', counter)
        counter += 1

# Run both counters in their own threads; each advances once per ptimer tick
threading.Thread(target=countdown, args=(10,)).start()
threading.Thread(target=countup, args=(5,)).start()

## 使用信号量实现单个线程唤醒

In [13]:
def worker(n, sema):
    """Block until sema can be acquired, then report that worker n is working.

    The semaphore is acquired but not released by this function.
    """
    sema.acquire()  # wait to be signalled
    print('Working', n)  # do some work

# Create some threads.
# The semaphore starts at 0, so every worker blocks in acquire() until
# something calls sema.release(); each release lets one worker proceed.
sema = threading.Semaphore(0)
nworkers = 10
for n in range(nworkers):
    t = threading.Thread(target=worker, args=(n, sema,))
    t.start()

## 线程间的通信

In [None]:
from queue import Queue
from threading import Thread

# Unique marker a producer enqueues to signal that no more data is coming.
_sentinel = object()

# A thread that produces data
def producer(out_q):
    """Signal completion on out_q.

    This is a skeleton: a real producer would put its data items on out_q
    before the sentinel. The sentinel is always sent so that consumers
    blocked on the queue can shut down instead of waiting forever.
    """
    # Produce data here, e.g. out_q.put(item), then signal completion.
    out_q.put(_sentinel)

# A thread that consumes data
def consumer(in_q):
    """Print items taken from in_q until the sentinel is received.

    On receiving the sentinel it is put back on the queue, so any other
    consumer sharing the queue also sees it, and the loop ends. The
    sentinel itself is not printed.
    """
    while True:
        # Get some data
        data = in_q.get()
        if data is _sentinel:
            # Re-queue for other consumers, then stop. Without the break
            # this loop re-read its own sentinel forever.
            in_q.put(_sentinel)
            break
        # Process the data
        print(data)

# Create the shared queue and launch both threads.
# The consumer is started first and blocks in in_q.get() until something
# is put on the queue.
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

In [1]:
# 堆列
import heapq
import threading

class PriorityQueue:
    """Thread-safe priority queue built on a heap and a condition variable.

    Items with a higher priority are returned first; items with equal
    priority come out in insertion order.
    """

    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()

    def put(self, item, priority):
        """Add item with the given priority and wake one waiting getter."""
        with self._cv:
            # Negated priority turns the min-heap into highest-first order;
            # the running count breaks ties so items are never compared.
            entry = (-priority, self._count, item)
            heapq.heappush(self._queue, entry)
            self._count += 1
            self._cv.notify()

    def get(self):
        """Remove and return the highest-priority item, blocking while empty."""
        with self._cv:
            while not self._queue:
                self._cv.wait()
            _, _, item = heapq.heappop(self._queue)
            return item

## 加锁

In [None]:
import threading

class SharedCounter:
    """Counter object that can be shared by multiple threads.

    Every update of the stored value happens while holding a lock.
    """

    def __init__(self, initial_value=0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self, delta=1):
        """Add delta to the counter while holding the lock."""
        with self._value_lock:
            self._value += delta

    def decr(self, delta=1):
        """Subtract delta from the counter while holding the lock."""
        with self._value_lock:
            self._value -= delta

## 限制并发量

In [None]:
from threading import Semaphore
import urllib.request

# At most five threads may be inside urlopen() at the same time.
_fetch_url_sema = Semaphore(5)

def fetch_url(url):
    """Open url while holding the shared semaphore and return the response.

    The semaphore is released as soon as urlopen() returns, before the
    caller reads from the returned object.
    """
    with _fetch_url_sema:
        response = urllib.request.urlopen(url)
    return response