## 多线程
`threadObj=threading.Thread(target=...,args=...,kwargs=...)  
 threadObj.start()`   
 --创建单独的线程，传入参数args及关键词参数kwargs,运行target

In [1]:
import threading,time
print('Start of program')

def takeANap():
    time.sleep(5)
    print('Wake up!')

threadObj=threading.Thread(target=takeANap)
threadObj.start()

print('End of program')

Start of program
End of program
Wake up!


- `threadObj`创建新的线程，执行`takeANap`函数，延时 5s 再`print('Wake up!')`;  
- 主线程继续执行`print('End of program')`;所以后者反而比前者先执行

In [2]:
if threadObj.is_alive():
    print('still running')
else:
    print('Completed')

Completed


- 判断线程是否运行，此时在主线程中执行

In [3]:
threadObj.join()

- 请求连接(join)到线程，这样做会等待线程结束

In [9]:
def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)
        
t = threading.Thread(target=countdown, args=(10,), daemon=True)
t.start()

T-minus 10
T-minus 9
T-minus 8
T-minus 7
T-minus 6
T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


- 守护线程（daemon），需长时间或不断运行的线程
- 守护线程无法被连接，主线程结束后自动销毁

In [12]:
class CountdownTask():
    def __init__(self):
        self._running = True
    def terminate(self):
        self._running = False
    def run(self, n):        
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(5)
            
c = CountdownTask()
t = threading.Thread(target=c.run, args=(10,))            
t.start()

c.terminate()
t.join()

T-minus 10
T-minus 6
T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


- 终止线程、给线程发信号、调整线程属性等高级操作需要自己构建

In [15]:
class IOTask():
    def terminate(self):
        self._running = False
    def run(self, sock):
        sock.settimeout(5)
        while self._running:
            try:
                data = sock.recv(8192)
                break
            except socket.timeout:
                continue  
        return

- 线程执行阻塞性的操作如I/O，需要加上超时循环

In [19]:
class CountdownThread(threading.Thread):
    def __init__(self, n):
        super().__init__()
        self.n = 0
    def run(self):
        while self.n > 0:
            print('T-minus', n)
            self.n -= 1
            time.sleep(5)
c = CountdownThread(5)            
c.run()

- 继承Thread类，将任务封装为方法
- 待执行的任务和threading库引入了一层依赖关系，导致任务代码**无法被重用**

#### 线程同步问题，使用Event对象
- 线程的核心特征是以非确定性的方式(完全由操作系统调度管理)独立执行
- 判断线程是够已经启动，Event对象的`.set()`方法和`.wait()`方法配合使用;
- 用于一次性的事件。因为安全的清除事件并等待再次设置的过程很难同步协调

In [21]:
from threading import Thread, Event
import time 

start_evt = Event()

def countdown(n, start_evt):
    print('Countdown starting')
    start_evt.set()
    while n > 0:
        print("T-minus", n)
        n -= 1
        time.sleep(5)
        
print("Lauching Countdown")
t = Thread(target=countdown, args=(10, start_evt))
t.start()

start_evt.wait()
print('countdown is running')

Lauching Countdown
Countdown starting
T-minus 10countdown is running

T-minus 9
T-minus 8
T-minus 7
T-minus 6
T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


#### 周期性重复事件，使用Condition对象

In [22]:
import threading
import time

class PeriodicTimer:
    def __init__(self, interval):
        self._interval = interval
        self._flag = 0
        self._cv = threading.Condition()
    def start(self):        
        t = threading.Thread(target=self.run)
        t.daemon = True
        t.start()
    def run(self):        
        while True:
            time.sleep(self._interval)
            with self._cv:
                self._flag ^= 1
                self._cv.notify_all()
    def wait_for_tick(self):
        with self._cv:
            last_flag = self._flag
            while last_flag == self._flag:
                self._cv.wait()

ptimer = PeriodicTimer(5)
ptimer.start()

def countdown(nticks):
    while nticks > 0:
        ptimer.wait_for_tick()
        print("T-minus", nticks)
        nticks -= 1

def countup(last):
    n = 0
    while n < last:
        ptimer.wait_for_tick()
        print('Counting', n)
        n += 1
        
threading.Thread(target=countdown, args=(10,)).start()                
threading.Thread(target=countup, args=(5,)).start()   

T-minusCounting 0
 10
CountingT-minus 9
 1
CountingT-minus 2
 8
CountingT-minus 7
 3
T-minusCounting 4
 6
T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


### 小项目：多线程下载

In [None]:
# %load scripts\multidownloadXkcd.py
#!python3
# multidownloadXkcd.py - Download XKCD comics using multipile threads.

import requests
import os
import bs4
import threading


os.makedirs('xkcd', exist_ok=True)


def downloadXkcd(startComic, endComic):
    for urlNumber in range(startComic, endComic):
        print('Downloading page http://xkcd.com/%s...' % urlNumber)
        res = requests.get('http://xkcd.com/%s...' % urlNumber)
        res.raise_for_status()

        soup = bs4.BeautifulSoup(res.text)
        comicElem = soup.select('#comic.image')
        if not comicElem:
            print('Could not find comic image.')
        else:
            comicUrl = comicElem[0].get('src')
            print('Downloading images %s...' % comicUrl)
            res = requests.get(comicUrl)
            res.raise_for_status()

            imageFile = open(os.path.join('xkcd', os.path.basename(comicUrl)), 'wb')
            for chunk in res.iter_content(10000):
                imageFile.write(chunk)
            imageFile.close()


downloadThreads = []
for i in range(1, 1400, 100):
    downloadThread = threading.Thread(target=downloadXkcd, args=(i, i+99))
    downloadThreads.append(downloadThread)
    downloadThread.start()

for downloadThread in downloadThreads:
    downloadThread.join()

print('Done')

## 多进程
`concurrent.futures`模块

In [None]:
# %load scripts\makeThumbnail.py
#!python3
# makeThumbnail.py - Creates thumbnails of given images

from PIL import Image
import os
import glob


def make_thumbnail(filename):
    basename, file_extension = os.path.splitext(filename)
    thumbnail_name = f'{basename}_thumbnail{file_extension}'

    image = Image.open(filename)
    image.thumbnail((128, 128))
    image.save(thumbnail_name, 'JPEG')

    return thumbnail_name


for im in glob.glob('*.jpg'):
    thumbnail_file = make_thumbnail(im)
    print(f'A thumbnail for {im} was saved as {thumbnail_file}')

* 上述程序的多进程版本：

In [None]:
# %load scripts\makeThumbnail2.py
#!python3
# makeThumbnial2.py - A updated version of makeThumbnail.py,using parallel computing

import glob
import concurrent.futures
from makeThumbnail import make_thumbnail


with concurrent.futures.ProcessPoolExecutor() as executor:
    images = glob.glob('*.jpg')
    for im, thumbnail_file in zip(images, executor.map(make_thumbnail, images)):
        print((f'A thumbnail for {im} was saved as {thumbnail_file}'))


## 基础知识

* 进程(Process)：执行中的程序，拥有自己的地址空间、内存、数据栈以及其他用于跟踪执行的辅助数据。不同进程间采取进程间通信(ICP)的方式共享信息。
* 线程(Thread)：又称轻量级进程(Lightweight Process,LWP),不同线程共享相同的上下文。

- Python代码的执行是由Python虚拟机(又名解释器主循环)进行控制的，在主循环中可以运行多个线程，但在任意给定时刻只能有一个控制线程在执行。只有线程在执行I/O密集型的应用时才能更好发挥Python的并发性
- 由全局解释器锁(GIL)来保证同时只能有一个线程在运行.其机制如下：
    1. 设置GIL  
    2. 切换进一个线程去执行
    3. 执行下面操作之一：     
        a. 指定数量的字节码指令      
        b. 线程主动让出控制权(可以通过调用`time.sleep(0)`来完成)      
    4. 切换出线程
    5. 解锁GIL
    6. 重复上述步骤
- 计算密集型任务适合在多CPU核心上实现并行处理，python线程适合于I/O处理以及涉及阻塞操作的并发执行任务    
    

#### 创建子进程

In [None]:
from multiprocessing import Process
import os
def run_proc(name):
    print('Run child process %s (%s)...'%(name,os.getpid()))

if __name__=='__main__':
    print('Parent process %s.'%os.getpid())
    p=Process(target=run_proc,args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('child process end')

#### 启动大量子进程

In [None]:
from multiprocessing import Pool
import os,time,random

def long_time_task(name):
    print('Run task %s (%s)...'%(name,os.getpid)
    start=time.time()
    time.sleep(random.random()*3)
    end=time.time()
    print('Task %s run %0.2f seconds.'%(name,(end-start)))

if __name__=='__main__':
    print('Parent process %s.'%os.getpid())
    p=Pool(4)
    for i in range(5)
        p.apply_async(long_time_task,args=(i,))
    print('Waiting for all subprocess done...')
    p.close()
    p.join()
    print('All subprocess done')

### 示例：比较多线程及多进程的运行效率
1. 创建计时器，简单测试多线程及线程池

In [8]:
# %load scripts\simpleTimer.py
#!python3
# simpleTimer.py - 简单的计时器,比较多线程、线程池

import time
from functools import wraps
import threading
from multiprocessing.dummy import Pool


def fn_timer(func):
    @wraps(func)
    def func_timer(*args, **kwargs):
        t0 = time.time()
        results = func(*args, **kwargs)
        t1 = time.time()
        print(f'[finished function:{func.__name__} in {t1-t0}s]')
        return results
    return func_timer


def music(name):
    print(f"I'm listening to music {name}")
    time.sleep(1)


def movie(name):
    print(f"I'm watching movie {name}")
    time.sleep(5)


@fn_timer
def single_thread():
    for i in range(10):
        music(i)
    for i in range(2):
        movie(i)


@fn_timer
def multi_thread():
    threads = []
    for i in range(10):
        threads.append(threading.Thread(target=music, args=(i,)))
    for i in range(2):
        threads.append(threading.Thread(target=movie, args=(i,)))
    for t in threads:
        t.setDaemon(True)
        t.start()
    for t in threads:
        t.join()


@fn_timer
def use_pool():
    pool = Pool()
    pool.map(music, range(10))
    pool.map(movie, range(2))
    pool.close()
    pool.join()


if __name__ == '__main__':
    single_thread()
    multi_thread()
    use_pool()

I'm listening to music 0
I'm listening to music 1
I'm listening to music 2
I'm listening to music 3
I'm listening to music 4
I'm listening to music 5
I'm listening to music 6
I'm listening to music 7
I'm listening to music 8
I'm listening to music 9
I'm watching movie 0
I'm watching movie 1
[finished function:single_thread in 20.087326049804688s]
I'm listening to music 0
I'm listening to music 1
I'm listening to music 2
I'm listening to music 3
I'm listening to music 4
I'm listening to music 5
I'm listening to music 6
I'm listening to music 7
I'm listening to music 8
I'm listening to music 9
I'm watching movie 0
I'm watching movie 1
[finished function:multi_thread in 5.125583171844482s]
I'm listening to music 0
I'm listening to music 1I'm listening to music 2
I'm listening to music 3

I'm listening to music 4
I'm listening to music 5
I'm listening to music 6I'm listening to music 7

I'm listening to music 8
I'm listening to music 9
I'm watching movie 0I'm watching movie 1

[finished fu

2. 爬虫测试对比多线程及进程池

In [None]:
# %load scripts\multiThreads.py
#!python3
# multiThreads.py - 爬虫实例比较单线程与多线程


import threading
import time
from multiprocessing.dummy import Pool
import requests
from simpleTimer import fn_timer


urls = ['https://baike.baidu.com/item/%E8%87%AA%E7%94%B1%E8%BD%AF%E4%BB%B6',
        'https://baike.baidu.com/item/%E8%AE%A1%E7%AE%97%E6%9C%BA%E7%A8%8B%E5%BA%8F%E8%AE%BE%E8%AE%A1%E8%AF%AD%E8%A8%80',
        'https://baike.baidu.com/item/%E5%9F%BA%E9%87%91%E4%BC%9A',
        'https://baike.baidu.com/item/%E5%88%9B%E6%96%B02.0',
        'https://baike.baidu.com/item/%E5%95%86%E4%B8%9A%E8%BD%AF%E4%BB%B6',
        'https://baike.baidu.com/item/%E5%BC%80%E6%94%BE%E6%BA%90%E4%BB%A3%E7%A0%81',
        'https://baike.baidu.com/item/OpenBSD',
        'https://baike.baidu.com/item/%E8%A7%A3%E9%87%8A%E5%99%A8',
        'https://baike.baidu.com/item/%E7%A8%8B%E5%BA%8F/71525',
        'https://baike.baidu.com/item/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80',
        'https://baike.baidu.com/item/C%2B%2B',
        'https://baike.baidu.com/item/%E8%B7%A8%E5%B9%B3%E5%8F%B0',
        'https://baike.baidu.com/item/Web/150564',
        'https://baike.baidu.com/item/%E7%88%B1%E5%A5%BD%E8%80%85',
        'https://baike.baidu.com/item/%E6%95%99%E5%AD%A6',
        'https://baike.baidu.com/item/Unix%20shell',
        'https://baike.baidu.com/item/TIOBE',
        'https://baike.baidu.com/item/%E8%AF%BE%E7%A8%8B',
        'https://baike.baidu.com/item/MATLAB',
        'https://baike.baidu.com/item/Perl']



@fn_timer
def download_using_single_thread(urls):
    resps = []
    for url in urls:
        resp = requests.get(url, allow_redirects=False)
        resps.append(resp)
    return resps


@fn_timer
def download_using_multithread(urls):
    threads = []
    for url in urls:
        threads.append(threading.Thread(target=requests.get, args=(url,), kwargs={'allow_redirects': False}))
    for t in threads:
        t.setDaemon(True)
        t.start()
    for t in threads:
        t.join()
    return threads


@fn_timer
def download_using_pool(urls):
    pool = Pool()
    reps = pool.map(requests.get, urls)
    pool.close()
    pool.join()
    return reps


def main():
    download_using_single_thread(urls)
    download_using_multithread(urls)



if __name__ == '__main__':
    main()

3. 比较多进程及进程池

In [None]:
# %load scripts\multiProcesses.py
#!python3
# multiProcesses.py - 多进程及进程池的使用

import os
import time
import random
from multiprocessing import Process, Pool, Queue
from simpleTimer import fn_timer


@fn_timer
def do_sample_task(name):
    print(f'Run child process {os.getpid()},task name is {name}')
    time.sleep(1.2)
    return(name)


@fn_timer
def test_simple_multi_process():
    p1 = Process(target=do_sample_task, args=('task1',))
    p2 = Process(target=do_sample_task, args=('task2',))
    print('Process will start')
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('Process end')


@fn_timer
def test_process_pool():
    pool = Pool()
    results = []
    task_name = []
    for i in range(7):
        task_name.append(f'task{i}')
    results = pool.map_async(do_sample_task, task_name)
    print('Many Process will start...')
    pool.close()
    pool.join()
    print(f'All processes end,results is:{results.get()}')


def main():
    test_simple_multi_process()
    print('\n\n\n')
    test_process_pool()

if __name__ == '__main__':
    main()

4. 进程间通信

In [None]:
# %load scripts\processCommunication.py
#!python3
# processCommunication.py - 测试进程间的通信

import os
import time
import random
from multiprocessing import Pool, Process, Queue
from simpleTimer import fn_timer


def write(q):
    for value in ['A','B','C']:
        print(f'Put value:{value} to queue')
        q.put(value)
        time.sleep(random.random())


def read(q):
    while True:
        value = q.get(True)
        print(f'Get value:{value} from queue')


def test_communication():
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()


def main():
    test_communication()

if __name__ == '__main__':
    main()

5. 协程的用法

### 示例

1. 简单的单线程执行循环

In [None]:
# %load scripts\onethr.py
#!python3
# onethr.py - 不利用多线程的示例

from time import sleep, ctime


def loop0():
    print(f'start loop 0 at:{ctime()}')
    sleep(4)
    print(f'loop 0 done at:{ctime()}')


def loop1():
    print(f'start loop 1 at:{ctime()}')
    sleep(2)
    print(f'loop 1 done at:{ctime()}')


def main():
    print(f'starting at:{ctime()}')
    loop0()
    loop1()
    print(f'all done at:{ctime()}')


if __name__ == '__main__':
    main()

2. 创建Thread实例，传给它一个函数

In [None]:
# %load scripts\mtsleepC.py
#!python3
# mtsleepC.py - 多线程循环,传给Thread实例一个函数

import threading
from time import sleep, ctime

loops = [4, 2]


def loop(nloop, nsec):
    print(f'start loop {nloop} at:{ctime()}')
    sleep(nsec)
    print(f'loop {nloop} done at:{ctime()}')


def main():
    print(f'starting at:{ctime()}')
    threads = []
    nloops = range(len(loops))
    for i in nloops:
        t = threading.Thread(target=loop, args=(i, loops[i]))
        threads.append(t)
    for i in nloops:
        threads[i].start()
    for i in nloops:
        threads[i].join()
    print(f'all done at:{ctime()}')


if __name__ == '__main__':
    main()

3. 创建Thread实例，传给它一个类实例，更通用

In [None]:
# %load scripts\mtsleepD.py
#!python3
# mtsleepD.py - 创建Thread实例，传给它一个可调用的类实例

import threading
from time import sleep, ctime

loops = [4, 2]


class ThreadFunc(object):
    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        self.func(*self.args)


def loop(nloop, nsec):
    print(f'start loop {nloop} at:{ctime()}')
    sleep(nsec)
    print(f'loop {nloop} done at:{ctime()}')


def main():
    print(f'starting at:{ctime()}')
    threads = []
    nloops = range(len(loops))
    for i in nloops:
        t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
        threads.append(t)
    for i in nloops:
        threads[i].start()
    for i in nloops:
        threads[i].join()
    print(f'all done at:{ctime()}')

if __name__ == '__main__':
    main()



4. Thread子类化，而不是直接对其实例化，定制线程对象

In [None]:
# %load scripts\mtsleepE.py
#!python3
# mtsleepE.py - Thread子类化，而不是直接对其实例化，定制线程对象

import threading
from time import sleep, ctime

loops = (4,2)


class MyThread(threading.Thread):
    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.func = func
        self.name = name
        self.args = args

    def run(self):
        self.func(*args)


def loop(nloop, nsec):
    print(f'start loop {nloop} at:{ctime()}')
    sleep(nsec)
    print(f'loop {nloop} done at:{ctime()}')


def main():
    print(f'starting at:{ctime()}')
    threads = []
    nloops = range(len(loops))
    for i in nloops:
        t = MyThread(loop, (i, loops[i]), loop.__name__)
        threads.append(t)
    for i in nloops:
        threads[i].start()
    for i in nloops:
        threads[i].join()
    print(f'all done at:{ctime()}')


if __name__ == '__main__':
    main()


5. MyThread作为专门的模块，并添加功能，使其更通用

In [None]:
# %load scripts\MyThread.py
#!python3
# MyThread.py - MyThread作为专门的模块，并添加功能，使其更通用

import threading
from time import ctime


class MyThread(threading.Thread):
    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def getResult(self):
        return self.res

    def run(self):
        print('starting ', self.name, 'at:', ctime())
        self.res = self.func(*self.args)
        print(self.name, ' finished at:', ctime())

### 单线程与多线程比较

In [None]:
# %load scripts\mtfacfib.py
#!python3
# mtfacfib.py - 单线程与多线程分别执行斐波那契、阶乘与累加

from MyThread import MyThread
from time import ctime, sleep


def fib(x):
    sleep(0.05)
    if x < 2:
        return 1
    return fib(x-2)+fib(x-1)


def fac(x):
    sleep(0.1)
    if x < 2:
        return 1
    return x*fac(x-1)


def my_sum(x):
    sleep(0.1)
    if x < 2:
        return 1
    return x+my_sum(x-1)


func = [fib, fac, my_sum]
n = 120

def main():
    nfuncs = range(len(func))
    print('*** SINGLE THREAD')
    for i in nfuncs:
        print('Starting ', func[i].__name__, 'at:', ctime())
        print(func[i](n))
        print(func[i].__name__, 'finished at:', ctime())

    print('\n***MULTIPILE THREADs')
    threads = []
    for i in nfuncs:
        t = MyThread(func[i], (n,), func[i].__name__)
        threads.append(t)
    for i in nfuncs:
        threads[i].start()
    for i in nfuncs:
        threads[i].join()
        print(threads[i].getResult())
    print('ALL DONE')


if __name__ == '__main__':
    main()

## 多线程实践

### 图书排名查询

In [None]:
# %load scripts\bookrank.py
#!python3
# bookrank.py - 多线程下载图书排名信息调用

from atexit import register
from re import compile
from threading import Thread
from time import ctime
from urllib.request import urlopen

regex = compile('#([\d,]+) in Books')
amzn = 'http://amazon.com/dp/'
isbns = {'0132269937': 'Core Python Programming',
         '0132356139': 'Python Web Development with Django',
         '0137143419': 'Python Fundamentals'}


def get_ranking(isbn):
    page = urlopen('%s%s'%(amzn, isbn))
    data = page.read()
    page.close()
    return regex.findall(data)[0]


def _show_ranking(isbn):
    print('- %r ranked %s'%(isbns[isbn], get_ranking(isbn)))


def _main():
    print('At ', ctime(), ' on Amazon...')
    for isbn in isbns:
        Thread(target=_show_ranking, args=(isbn,)).start()
        

@register()
def _atexit():
    print('all Done at:', ctime())


if __name__ == '__main__':
    _main()


- **同步**：  
    1. 多线程代码中，通常有特定代码块或函数不应该被多个线程同时执行，包括修改数据库、更新文件或其它会产生竞态条件`(race condition)`的类似情况
    2. 当任意数量的线程可以访问临界区`(critical section)`的代码时，但在给定时刻只有一个线程可以通过，使用同步操作   
- 同步原语：
    1. 锁：`Lock().acquire(); Lock().release()`
    2. 信号量：

1. 产生竞态条件(race condition)的情况

In [None]:
# %load scripts\mtsleepF.py
#!python3
# mtsleepF.py - 产生竞态条件(race condition)的情况


from atexit import register
from threading import Thread, current_thread
from time import ctime, sleep
from random import randrange


class CleanOutSet(set):
    def __str__(self):
        return ','.join(x for x in self)


loops = (randrange(2, 5) for x in range(randrange(3, 7)))
remaining = CleanOutSet()


def loop(nsec):
    myname = current_thread().name
    remaining.add(myname)
    print(f'{ctime()} started {myname}')
    sleep(nsec)
    remaining.remove(myname)
    print(f'{ctime()} completed {myname} at {nsec}s')
    print('remaining:%s'%(remaining or 'None'))


def _main():
    for pause in loops:
        Thread(target=loop, args=(pause,)).start()


@register
def _atexit():
    print('All done at:', ctime())

if __name__ == '__main__':
    _main()

> `remaining`参数为可变参数，多个线程同时对其进行操作，最终`print`输出结果不规则

2. 使用锁避免输出混乱，获得锁才能进行相应操作

In [None]:
lock = Lock()

def loop(nsec):
    myname = current_thread().name

    lock.acquire()
    remaining.add(myname)
    print(f'{ctime()} started {myname}')
    lock.release()

    sleep(nsec)

    lock.acquire()
    remaining.remove(myname)
    print(f'{ctime()} completed {myname} at {nsec}s')
    print('remaining:%s'%(remaining or 'None'))
    lock.release()

3. 使用上下文管理器

In [8]:
def loop(nsec):
    myname = current_thread().name
    with lock:
        remaining.add(myname)
        print(f'{ctime()} started {myname}')
    sleep(nsec)
    with lock:
        remaining.remove(myname)
        print(f'{ctime()} completed {myname} at {nsec}s')
        print('remaining:%s'%(remaining or 'None'))

4. 使用信号量  
信号量为一个计时器，资源消耗时递减，资源释放时递增

In [None]:
# %load scripts\candy.py
#!python3
# candy.py - 利用锁和信号量来模拟糖果机。该糖果机有5个可用的卡槽保存糖果，所有的槽满了，就不能添加糖果；都空了，就不能买到糖果

from atexit import register
from random import randrange
from threading import BoundedSemaphore, Lock, Thread
from time import sleep, ctime

lock = Lock()
max = 5
candytray = BoundedSemaphore(max)


def refill():
    lock.acquire()
    print('Refilling candy...')
    try:
        candytray.release()
    except ValueError:
        print('full,skipping')
    else:
        print('OK')
    lock.release()


def buy():
    lock.acquire()
    print('Buying candy...')
    if candytray.acquire(False):
        print('OK')
    else:
        print('empty,skipping')
    lock.release()

def producer(loops):
    for i in range(loops):
        refill()
        sleep(randrange(3))


def consumer(loops):
    for i in range(loops):
        buy()
        sleep(randrange(3))


def _main():
    print('starting at:', ctime())
    nloops = randrange(2, 6)
    print('THE Candy Machine (full with %d bars!)' % max)
    Thread(target=consumer, args=(randrange(nloops, nloops+max+2),)).start()
    Thread(target=producer, args=(nloops,)).start()


@register
def _atexit():
    print('All done at:', ctime())


if __name__ == '__main__':
    _main()



5. 利用queue模块提供线程间通信机制，相互间发现数据

In [None]:
# %load scripts\prodcons.py
#!python3
# prodcons.py - 使用queue对象，以及随机生产和消费对的商品的数量。生产者和消费者独立并且并发的执行线程

from random import randint
from time import sleep
from queue import Queue
from MyThread import MyThread


def writeQ(queue):
    print('producing object for Q...')
    queue.put('xxx',1)
    print('size now ', queue.qsize())


def readQ(queue):
    val = queue.get(1)
    print('consumed object from Q... size now ', queue.qsize())


def writer(queue, loops):
    for i in range(loops):
        writeQ(queue)
        sleep(randint(1, 3))


def reader(queue, loops):
    for i in range(loops):
        readQ(queue)
        sleep(randint(2, 5))

funcs = [writer, reader]
nfuncs = range(len(funcs))


def main():
    nloops = randint(2, 5)
    q = Queue(32)
    threads = []
    for i in nfuncs:
        t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
        threads.append(t)
    for i in nfuncs:
        threads[i].start()
    for i in nfuncs:
        threads[i].join()
    print('all Done')


if __name__ == '__main__':
    main()

6. 线程的替代方案   
    - `subprocess`模块：派生进程的主要替代方案，可以单纯的执行任务，或通过了标准文件(stdin,stdout,stderr)进行进程间通信
    - `multiprocessing`模块：允许为多核或多CPU派生进程 
    - `concurrent.futures`模块：在‘任务’级别进行操作，无需过分关注同步及进程\线程的管理了

In [None]:
# %load scripts\bookrank3CF.py
#!python3
# bookrank3CF.py - 利用 concurrent.futures模块的图书排名


from concurrent.futures import ThreadPoolExecutor
from re import compile
from time import ctime
from urllib.request import urlopen

regex = compile('#([\d,]+) in Books')
amzn = 'http://amazon.com/dp/'
isbns = {'0132269937': 'Core Python Programming',
         '0132356139': 'Python Web Development with Django',
         '0137143419': 'Python Fundamentals'}


def get_ranking(isbn):
    with urlopen('%s%s' % (amzn, isbn)) as page:
        return str(regex.findall(page)[0], 'utf-8')


def _main():
    print('At ', ctime(), ' on Amazon...')
    with ThreadPoolExecutor(3) as excutor:
        for isbn, ranking in zip(isbns, excutor.map(get_ranking, isbns)):
            print('- %r ranked %s' % (isbns[isbn], ranking))
    print('all Done at:', ctime())


if __name__ == '__main__':
    _main()


## threading 模块

In [16]:
import threading
print([method for method in dir(threading) if not method.startswith('_')])

['Barrier', 'BoundedSemaphore', 'BrokenBarrierError', 'Condition', 'Event', 'Lock', 'RLock', 'Semaphore', 'TIMEOUT_MAX', 'Thread', 'ThreadError', 'Timer', 'WeakSet', 'activeCount', 'active_count', 'currentThread', 'current_thread', 'enumerate', 'get_ident', 'local', 'main_thread', 'setprofile', 'settrace', 'stack_size']


**threading 模块函数**
- `threading.active_count()`：返回当前活动的 Thread 对象的个数，等价于 enumerate()方法返回的列表的长度   <p> 
- `threading.current_thread()` ：返回当前的 Thread 对象  <p> 
   
- ` threading.get_ident()`：返回当前线程的标识符。非零整数，无直接含义。可作为`magic cookie`,例如，用来索引一个线程数据词典    <p> 
  
- `thereading.enumerate()`：返回当前活动的 Thread 对象列表  <p> 
  
- `threading.main_thread()`：返回主进程对象    <p> 
  
- `threading.settrace(func)`：为所有线程设置一个 trace 函数   <p> 
  
- `threading.setprofile(func)`：为所有线程设置一个 profile 函数  <p>   
   
- `threading.stack_size([size])`：返回新创造线程时所使用的stack size<p> 

**threading 模块对象**：
- `class threading.local`：表示线程特有数据     
>    `mydata=threading.local()`    
    `mydata.x=1`
    
- `class threading.Thread(group=None,target=None,name=None,args=(),kwargs={},*,daemeon=None)`：表示在独立的线程中运行的任务

- `class threading.Lock()`
- `class threading.RLock()`
- `class threading.Semaphore()`