#  parallel processing

并行处理:一种运行模式，一台电脑上多个处理器同时执行任务
    
然而对于小任务而言，并行处理实际上增加运行时间，因此处理器间的通信

## 线程与进程 

对于操作系统来说，一个任务就是一个进程(process),使用top查看进程,进程是电脑应用程序执行的实例

有些进程还不止同时干一件事，比如word，可以同时打字、拼写检查等。在一个进程内部，要同时干多件事，就需要同时运行多个'子任务',我们把进程内的这些'子任务'称为线程(thread)

由于每个线程至少要干一件事，所以一个进程至少有一个线程，进程内的多个线程共享内存空间　

当然，真正地同时执行多线程需要多核CPU才可能实现

技术细节:

    1. 进程内的所有线程共享相同的内存空间，然而进程有自己独立的内存空间
    
    2. 线程是轻量级的，有更低的开销相比于进程,生成进程的比生成线程慢一点
    
    3. 线程间共享对象很方便，因为它们共享内存空间，然而进程间需要一些IPC(进程间通信)模型，通常有操作系统提供


并行计算的陷进:

    1. 稀缺环境:
    
        在上面我们谈到不同线程间共享着内存空间，然而当多个线程尝试同时改变一个变量这个稀缺环境就产生了。这时候，线程调度器将随意地在不同线程间进行切换，因此我们无法知道线程改变数据的顺序。为了防止这个现象产生，在修改变量的代码块中放置一个互斥锁，保证在某个时刻只能有一个线程可以修改变量

    2. 饥饿:
    
        一个或者多个线程因为种种原因无法获得所需要的资源，导致一直无法执行的状态，这将导致程序的整体速度下降，这个通常发生在线程调度算法设计欠佳所带来的影响


    3. 死锁:
        
        过度使用互斥锁(mutex lock)有一个缺点，它可能在程序中引入死锁。死锁是一种状态: 如果线程A锁住了记录1并等待记录2，而线程B锁住了记录2并等待记录1，这样两个线程就发生了死锁现象，死锁被认为是饥饿的极端，为了避免这个我们不能引入太多的互斥锁
        
    4. 活锁
        当线程一直在一个循环中运行，然而没有任何进展。这种状态也是由于设计欠佳或者不正确的使用互斥锁导致
        
  
　　活锁指的是任务或者执行者没有被阻塞，由于某些条件没有满足，导致一直重复尝试，失败，尝试，失败。 活锁和死锁的区别在于，处于活锁的实体是在不断的改变状态，所谓的“活”， 而处于死锁的实体表现为等待；活锁有可能自行解开，死锁则不能

### python中的并行计算

我们编写的python程序都是执行单任务的进程，也就是只有一个线程

那么我们怎么同时执行多个任务:

1. 启动多个进程，每个进程虽然只有一个线程，但多个进程可以一块执行多个任务

2. 启动一个进程，在一个进程内启动多个线程，这样多个线程也可以执行多个任务

3. 第三种就是，启动多个进程，每个进程再启动多个线程，这样同时执行的任务就更多了

总结一下，多任务实现的3中方式:

1. 多进程模式
2. 多线程模式
3. 多进程+多线程

线程是最小的执行单元，而进程由至少一个线程组成。如何调度进程和线程，完全由操作系统决定，程序自己不能决定什么时候执行，执行多长时间。

多进程和多线程的程序涉及到同步、数据共享的问题，编写起来更复杂。

### python全局解释锁(GIL,Global Interpreter Lock)

当涉及到python时，有一些特别的地方需要记住:

    我们知道线程共享内存空间，因此必须执行特殊的预防措施以使两个线程不同写入同一个内存位置。
    CPython解释器使用GIL机制解决了这个问题
    
GIL:

    CPython中，GIL是一个互斥锁，防止多个线程同时执行Python字节流，这个锁是必须的，由于CPython的内存管理不是线程安全的
    
    GIL虽然保护了线程安全，但是一定程度上付出了一些代价。GIL有效的在编译水平串行化了指令。工作机制是: 任何线程上的任何函数，特必须获得一个GIL，一次只能有一个线程获得全局锁。解释器最终按顺序运行指令。这个设计让内存管理线程安全，但是这种机制它不能利用多个CPU核
    GIL在单核CPU中不存在太多问题，然而在多核CPU中全局锁将会是一个瓶颈
    
    如果你的程序在其他地方存在更严重的瓶颈，那么GIL导致的瓶颈将变得无关紧要，例如网络、IO、用户交互等。在一些情况下，线程是完全有效的并行化方法。但是受CPU约束的程序，线程化最终让程序变慢

### 多线程使用案例

GUI程序使用线程化让程序响应。如文本编辑器中，一个线程关心用户输入，一个线程负责展示文本，还有一个线程负责拼写检查等，在这里等待用户交互是最大的瓶颈。因此使用多进程不会让程序更快

另一个使用案例是程序受到IO或网络约束。如网页爬取，多个线程同时爬取多个网页,爬取网页时网络是最大的瓶颈

还有一个案例是Tensorflow，使用一个线程池并行转换数据

### 多进程使用案例

当程序占用大量CPU以及不需要任何IO或者用户交互时，使用多进程优于多线程。例如任何仅处理数字的程序多进程明显提高速度

## Threading

In [2]:
import threading
import random
from functools import reduce

def func(number):
    random_list = random.sample(range(1000000), number)
    return reduce(lambda x, y: x*y, random_list)

number = 50000
thread1 = threading.Thread(target=func, args=(number,))
thread2 = threading.Thread(target=func, args=(number,))

# start() 异步地开启线程
thread1.start()
thread2.start()

# join() 等待线程终止然后执行主线程
thread1.join()
thread2.join()


## Multiprocessing

In [3]:
import multiprocessing
import random
from functools import reduce


def func(number):
    random_list = random.sample(range(1000000), number)
    return reduce(lambda x, y: x*y, random_list)

    
number = 50000
process1 = multiprocessing.Process(target=func, args=(number,))
process2 = multiprocessing.Process(target=func, args=(number,))

process1.start()
process2.start()

process1.join()
process2.join()


## 总结

对于CPU-bound,computation-intensive programs,多进程优于多线程

对于IO-bound,user interaction，多线程优于多进程

进程调度由操作系统处理，然而线程调度由python解释器处理

子进程是可以中断和杀死的，然而子线程不能，你必须等待线程结束或者join

## Threading module

### 线程启动

threading.Thread(
    group=None,
    target=None,
    name=None,
    args=(),
    kwargs=None,
    *,
    daemon=None,
)

target 线程调用的对象，就是目标函数
name　为线程起个名字
args 　为目标函数传递实参，元组
kwargs 为目标函数关键字传参，字典

In [4]:
import threading

# 线程程序
def jobs():
    print('i am working')
    print('finished')
    
# 定义线程对象

t = threading.Thread(target=jobs,name="worker")

# 启动
t.start()

i am working
finished


In [5]:
print(help(threading.Thread()))

Help on Thread in module threading object:

class Thread(builtins.object)
 |  Thread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
 |  
 |  A class that represents a thread of control.
 |  
 |  This class can be safely subclassed in a limited fashion. There are two ways
 |  to specify the activity: by passing a callable object to the constructor, or
 |  by overriding the run() method in a subclass.
 |  
 |  Methods defined here:
 |  
 |  __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
 |      This constructor should always be called with keyword arguments. Arguments are:
 |      
 |      *group* should be None; reserved for future extension when a ThreadGroup
 |      class is implemented.
 |      
 |      *target* is the callable object to be invoked by the run()
 |      method. Defaults to None, meaning nothing is called.
 |      
 |      *name* is the thread name. By default, a unique name is constructed of
 |      

### 线程的退出

python没有提供线程退出的方法，线程在下面情况时退出:

1. 线程函数内语句执行完毕
2. 线程函数中抛出异常

In [6]:
import threading
from time import sleep
def worker():
    i = 0
    while i < 5:
        sleep(2)
        print('i am a thread')
        i += 1
t = threading.Thread(target=worker,name="thread")

t.start()

print('==end==')

==end==


### 线程的传参

线程传参本质上就是函数传参

In [7]:
def re_seq(seq):
    
    if isinstance(seq,str):
        print(seq[::-1].translate(str.maketrans('ATCGatcg','TAGCtagc')))
        print(threading.current_thread())
    else:
        exit()
        
t = threading.Thread(target=re_seq,args=('atcgatcg',),name="chenzhi")

t.start()
print(threading.current_thread())

cgatcgat
<Thread(chenzhi, started 140353684367104)>
<_MainThread(MainThread, started 140354176636736)>


### threading的属性和方法

#### active_count()

返回目前活跃的线程数量

In [8]:
print(threading.active_count())

6


#### current_thread()

返回目前线程对象

In [9]:
print(threading.current_thread())

<_MainThread(MainThread, started 140354176636736)>


#### get_ident()

返回目前线程的标识ID

In [10]:
print(threading.get_ident())

140354176636736


#### enumerate()

返回一个list,包含所有活跃的线程

In [11]:
for thread in threading.enumerate():
    print(thread)

<_MainThread(MainThread, started 140354176636736)>
<Thread(Thread-2, started daemon 140354081605376)>
<Heartbeat(Thread-3, started daemon 140354073212672)>
<HistorySavingThread(IPythonHistorySavingThread, started 140354048034560)>
<ParentPollerUnix(Thread-1, started daemon 140354038331136)>
<Thread(thread, started 140353692759808)>


#### main_thread()

返回主线程对象，正常情况下，主线程是python解释器启动所在的线程

In [12]:
print(threading.main_thread())

<_MainThread(MainThread, started 140354176636736)>


### 线程对象的方法

一旦线程对象被创建，start()启动线程,is_alive()检查线程是否存活
join()使得主线程等待子线程结束后才继续执行

### 锁

锁是控制多个线程对共享资源进行访问的工具。通常，锁提供了对共享资源的独占访问，每次只能有一个线程对 Lock 对象加锁，线程在开始访问共享资源之前应先请求获得 Lock 对象。当对共享资源访问完成后，程序释放对 Lock 对象的锁定。



In [13]:
num = 1
lock = threading.Lock()
def add1():
    global num,lock
    lock.acquire()
    for i in range(10):
        num += 1
        print('add1',num)
    lock.release()
    
def add2():
    global num, lock
    lock.acquire()
    for i in range(10):
        num += 10
        print('add2', num)
    lock.release()
    
t1 = threading.Thread(target=add1)
t2 = threading.Thread(target=add2)

t1.start()
t2.start()

t1.join()
t2.join()

add1 2
add1 3
add1 4
add1 5
add1 6
add1 7
add1 8
add1 9
add1 10
add1 11
add2 21
add2 31
add2 41
add2 51
add2 61
add2 71
add2 81
add2 91
add2 101
add2 111


In [14]:
import threading
import time
class Account:
    # 定义构造器
    def __init__(self, account_no, balance):
        # 封装账户编号、账户余额的两个成员变量
        self.account_no = account_no
        self._balance = balance
        self.lock = threading.RLock()
    # 因为账户余额不允许随便修改，所以只为self._balance提供getter方法
    def getBalance(self):
        return self._balance
    
    # 提供一个线程安全的draw()方法来完成取钱操作
    def draw(self, draw_amount):
        # 加锁
        self.lock.acquire()
        try:
            # 账户余额大于取钱数目
            if self._balance >= draw_amount:
                # 吐出钞票
                print(threading.current_thread().name\
                    + "取钱成功！吐出钞票:" + str(draw_amount))
                time.sleep(0.001)
                # 修改余额
                self._balance -= draw_amount
                print("\t余额为: " + str(self._balance))
            else:
                print(threading.current_thread().name\
                    + "取钱失败！余额不足！")
        finally:
            # 修改完成，释放锁
            self.lock.release()

count = Account(1,200)
t1 = threading.Thread(target=count.draw,args=(100,))
t2 = threading.Thread(target=count.draw,args=(200,))

t1.start()
t2.start()

t1.join()
t2.join()

Thread-9取钱成功！吐出钞票:100
	余额为: 100
Thread-10取钱失败！余额不足！


### daemon属性

daemon设置是否为后台线程(又称为守护线程)，它的任务是为其他线程提供服务。后台线程有一个特征，如果所有的前台线程都死亡了，那么后台线程会自动死亡。

python中，在构造对象时，可以设置dameon属性，这个设置必须在start方法前设置好

daemon属性可以不设置，默认是None,主线程默认是False,主线程是程序启动的第一个线程，主线程可以再启动n个子线程

每个进程至少要有一个线程，病作为程序的入口，这个就是主线程

父线程: 如果线程A启动了一个线程B,A就是B的父线程
子线程: B就是A的子线程

#### daemon=False

当daemon为False,父线程在运行完毕后，会等待所有子线程退出才结束程序(非后台线程)

In [15]:
import time

def foo():

    for i in range(3):
        print(f"i={i},foo thread daemon is {threading.current_thread().isDaemon()}")
        time.sleep(1)

t = threading.Thread(target=foo,daemon=False)
t.start()

print(f"Main thread daemon is {threading.current_thread().isDaemon()}")
print("Main thread Exit")

# sleep(1)模拟IO,CPU发现子线程阻塞了，就立即切换到主线程
# 主线程执行到结尾，因为子线程设置了daemon属性为False,这时右切换到子线程
# 子线程执行完毕后，主线程看到子线程退出后也立即退出，整个程序结束

i=0,foo thread daemon is FalseMain thread daemon is False

Main thread Exit


#### daemon=True

当daemon为True时，父线程在运行完毕后，子线程无论是否正在运行，都会伴随主线程一起退出

In [5]:
import threading,time
def foo1():
    for i in range(3):
        print(f"i={i},foo1 thread daemon is {threading.current_thread().isDaemon()}")
        time.sleep(1)
        
t = threading.Thread(target=foo1,daemon=True)
t.start()

print(f"Main thread daemon is {threading.current_thread().isDaemon()}")
print("Main thread Exit")

i=0,foo1 thread daemon is True
Main thread daemon is False
Main thread Exit
i=1,foo1 thread daemon is True
i=2,foo1 thread daemon is True


#### daemon=None

daemon属性可以不设置，默认值是None,该目标线程的值就取父线程的daemon值作为自己的daemon的值

In [6]:
import threading
import time
 
 
def bar():
    while True: # 无限循环的子子线程
        print('【bar】 daemon is {}'.format(threading.current_thread().isDaemon()))
        time.sleep(1)
 
def foo():
    for i in range(3): #启动3个子线程
        print('i={},【foo】 thread daemon is {}'.format(i,threading.current_thread().isDaemon()))
        t1 = threading.Thread(target=bar,daemon=None) 
        t1.start()
 
t = threading.Thread(target=foo,daemon=True)
t.start()
 
print("Main thread daemon is {}".format(threading.current_thread().isDaemon()))
time.sleep(2)
print("Main Thread Exit.")

i=0,【foo】 thread daemon is True
【bar】 daemon is Truei=1,【foo】 thread daemon is True
【bar】 daemon is TrueMain thread daemon is False


i=2,【foo】 thread daemon is True
【bar】 daemon is True
【bar】 daemon is True
【bar】 daemon is True【bar】 daemon is True

Main Thread Exit.
【bar】 daemon is True
【bar】 daemon is True【bar】 daemon is True

【bar】 daemon is True
【bar】 daemon is True
【bar】 daemon is True
【bar】 daemon is True【bar】 daemon is True

【bar】 daemon is True
【bar】 daemon is True【bar】 daemon is True

【bar】 daemon is True
【bar】 daemon is True【bar】 daemon is True

【bar】 daemon is True
【bar】 daemon is True【bar】 daemon is True

【bar】 daemon is True
【bar】 daemon is True【bar】 daemon is True

【bar】 daemon is True
【bar】 daemon is True
【bar】 daemon is True
【bar】 daemon is True
【bar】 daemon is True
【bar】 daemon is True【bar】 daemon is True

【bar】 daemon is True
【bar】 daemon is True
【bar】 daemon is True
【bar】 daemon is True【bar】 daemon is True

【bar】 daemon is True
【bar】 daemon is True【bar】 daemon is Tru

### join()方法

使用了join()方法后，daemon线程执行完了，主线程才退出来

一个线程中调用另一个线程的join方法，调用者被阻塞，直至被调用进程终止。

调用谁的join方法，就要等谁

In [4]:
import threading,time
def job():
    for i in range(10):
        print("hello world")
        time.sleep(1)
        
t = threading.Thread(target=job)
t.start()
t.join()

print("==end==")

hello world
==end==
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world


### threading.local

我们知道多线程情况下，每一个线程均可以使用所属进程的全局变量。如果一个线程对全局变量进行了修改，将会影响其他所有的线程。为了避免多个线程同时对变量进行修改，引入线程同步机制，通过互斥锁来控制对全局变量的访问

很多时候线程还需要拥有自己的私有数据，这些数据对其他线程也是不可见的

有时候使用局部变量不太方便，因此python提供了ThreadLocal变量，它本身是一个全局变量，但是每个线程可以用它来保存自己的私有数据

In [12]:
num = 1
l = threading.Lock()
def cal():
    global num,l
    l.acquire()
    for i in range(1000):
        num += 1 
    l.release()
    
threads = []

for i in range(10):
    threads.append(threading.Thread(target=cal))
    threads[i].start()
for i in range(10):
    threads[i].join()
    
print(num)

10001


In [None]:
def show(num):
    print threading.current_thread().getName(), num
def thread_cal():
    local_num = 0
    for _ in xrange(1000):
        local_num += 1
    show(local_num)
threads = []
for i in range(10):
    threads.append(threading.Thread(target=thread_cal))
    threads[i].start()
for i in range(10):
    threads[i].join()
    
print(num)

# 每个线程将自己的局部变量传递给show()函数，但在实际生产环境中，
# 我们可能会调用很多函数，每个函数都需要很多局部变量，这时候传递
# 参数的方式会很不友好

为了更好的解决这个问题，python线程库实现了ThreadLocal变量，真正做到了线程之间的数据隔离

In [None]:
data = threading.local()
def show(num):
    print threading.current_thread().getName(), num
def thread_cal():
    data.num = 0
    for _ in xrange(1000):
        data.num += 1
    show(data.num)
    
threads = []
for i in range(10):
    threads.append(threading.Thread(target=thread_cal))
    threads[i].start()
for i in range(10):
    threads[i].join()

## Multiprocessing

Queue \ Pipe 只是实现进程间数据的传递

Manager 实现了进程间数据的共享，即多个进程可以修改同一份数据

Lock 进程锁，主要用在输出到屏幕的时候独占屏幕，即多份数据不会出现打印一半就去打印别的

In [13]:
from multiprocessing import Pool

def f(x):
    return x*x

with Pool(5) as p:
    print(p.map(f,[1,2,3]))

[1, 4, 9]


In [14]:
#查看机器CPU数量
import multiprocessing
multiprocessing.cpu_count()

4

### Process

创建进程，参数形式与multithreading.Thread()一样

#### 方法

##### is_alive()

In [54]:
def fun2():
    print('hello, i am a child process')
    print(multiprocessing.current_process().is_alive())
    
p = multiprocessing.Process(target=fun2)

p.start()

p.join()
print(p.is_alive())

hello, i am a child process
True
False


In [5]:
import os 
import multiprocessing

def foo():
    print(f"父进程id: {os.getppid()}")
    print(f"当前子进程id: {os.getpid()}")
    print('-----------------')
    
for _ in range(4):
    p = multiprocessing.Process(target=foo)
    p.start()

父进程id: 3677
父进程id: 3677
当前子进程id: 4515
-----------------
父进程id: 3677
当前子进程id: 4514
当前子进程id: 4518
-----------------
-----------------
父进程id: 3677
当前子进程id: 4523
-----------------


#### 属性

authkey,name,pid,daemon

In [None]:
def fun2():
    print('hello, i am a child process')
    print(multiprocessing.current_process().is_alive())
    print(multiprocessing.current_process().name)
p = multiprocessing.Process(target=fun2)

p.start()

p.join()

#### 进程定义为类

In [58]:
import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval
    # start()方法自动调用run()
    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1


p = ClockProcess(3)
p.start() # 调用run()



the time is Fri Jan 31 20:09:54 2020
the time is Fri Jan 31 20:09:57 2020
the time is Fri Jan 31 20:10:00 2020
the time is Fri Jan 31 20:10:03 2020
the time is Fri Jan 31 20:10:06 2020


In [16]:
def testing():
    print('works')
    
def square(n):
    print('the number squares to',n**2)
    
def cube(n):
    print(f"the number cubes to {n**3}")
    
p1 = multiprocessing.Process(target=square,args=(7,))
p2 = multiprocessing.Process(target=cube,args=(5,))
p3 = multiprocessing.Process(target=testing)

p1.start()
p2.start()
p3.start()

p1.join()
p2.join()
p3.join()

print('we are done')

the number squares to 49
the number cubes to 125
works
we are done


In [18]:
def prin(string = "asia"):
    print(f"the name of continent is: {string}")
    
pro = []
name = ['africa','latein']

pro2 = multiprocessing.Process(target=prin)
pro2.start()
pro.append(pro2)
for ne in name:
    pro1 = multiprocessing.Process(target=prin,args=(ne,))
    pro.append(pro1)
    pro1.start()
    
for p in pro:
    p.join()

the name of continent is: asia
the name of continent is: africa
the name of continent is: latein


### 进程间交互对象

multiprocessing提供两个类型的进程间的交流通道｀

#### Queues

In [None]:
from multiprocessing import Process,Queue

def f(q):
    q.put([42,None,"hello"])
    
q = Queue()
p = Process(target=f,args=(q,))
p.start()
print(q.get())
p.join()

#### Pipes

Pipe()函数返回通过pipe进程间的连接对象

类似于电话连线，pipe两端相当于两个进程，一端send()发送数据，另一端recv()接收数据

In [16]:
from multiprocessing import Process,Pipe

def f(conn):
    conn.send([24,"hello"])
    conn.close()


parent_conn,child_conn = Pipe()
p = Process(target=f,args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
print('hello')

[24, 'hello']
hello


### 进程间同步

使用锁,使得多线程同步编程，一次只能一个进行运行

In [27]:
from multiprocessing import Process,Lock

def f(l,i):
    l.acquire()
    try:
        print("hello",i)
    finally:
        l.release()
        
lock = Lock()

for num in range(10):
    Process(target=f,args=(lock,num)).start()

hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 9
hello 8


In [23]:
def f(i):
    print('hello',i)
    
for num in range(4):
    Process(target=f,args=(num,)).start()

hello 0
hello 1
hello 2
hello 3


### 进程间共享变量

在同步编程时，避免使用共享变量，然而在多进程异步编程时可以使用

在python中可以使用使用共享内存和进程服务

Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. 

Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory

#### 共享内存

数据保存在共享内存使用Value或Array

In [29]:
from multiprocessing import Process,Value,Array

def f(n,a):
    n.value = 3.14
    for i in range(len(a)):
        a[i] = -a[i]
        
num = Value('d',0.0)
print(num.value)
arr = Array('i',range(10))

p = Process(target=f, args=(num, arr))
p.start()
p.join()

print(num.value)
print(arr[:])


0.0
3.14
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]


#### 进程服务

通过Manager实现数据共享,Manager会创建一个服务进程，其他的进程都统一来访问这个进程，从而达到多进程之间的数据通信

In [11]:
# 普通list进程间数据不共享
from multiprocessing import Manager,Process

def ap(list):
    list.append("c")
    print(list)
    print(id(list))
li = ["a","b"]

p = multiprocessing.Process(target=ap,args=(li,))

p.start()

p.join()

print(li)
print(id(li))

['a', 'b', 'c']
139949987455856
['a', 'b']
139949987455856


In [12]:
# 使用Manager实现进程间数据共享
m = Manager()
my_list = m.list(["a","b"])

p = multiprocessing.Process(target=ap,args=(my_list,))

p.start()

p.join()

print(my_list)

['a', 'b', 'c']
139949981820496
['a', 'b', 'c']


### Pools

在之前的例子中，我们已经学会如何开启单个进程，但是当涉及到长时间运行的程序时，最好的方式是创建一个进程池(Pool),这样就可以避免每一次有个新任务执行的时候，创建和和销毁一个进程。

使用Pool，会使得程序表现更佳,同时Pool有返回值，而Process没有返回值

在利用Python进行系统管理的时候，特别是同时操作多个文件目录，或者远程控制多台主机，并行操作可以节约大量的时间。当被操作对象数目不大时，可以直接利用multiprocessing中的Process动态成生多个进程，十几个还好，但如果是上百个，上千个目标，手动的去限制进程数量却又太过繁琐，此时可以发挥进程池的功效。
Pool可以提供指定数量的进程，供用户调用，当有新的请求提交到pool中时，如果池还没有满，那么就会创建一个新的进程用来执行该请求；但如果池中的进程数已经达到规定最大值，那么该请求就会等待，直到池中有进程结束，才会创建新的进程来它

#### 常用方法

                  | Multi-args   Concurrence    Blocking     Ordered-results
---------------------------------------------------------------------
Pool.map          | no           yes            yes          yes
Pool.map_async    | no           yes            no           yes
Pool.apply        | yes          no             yes          no
Pool.apply_async  | yes          yes            no           no
Pool.starmap      | yes          yes            no           yes
Pool.stamap_async | yes          yes            no           no

##### 非阻塞Async

multiprocessing中有两类非阻塞进程池方法，apply_async()/map_async(),两者都存在回调

Async methods submit all the processes at once and retrieve the results once they are finished. 

Use get method to obtain the results

###### apply_async()

apply_async(func[,args[,kwds[,callback = None]]])

非阻塞且支持结果返回进行回调，返回结果为AsyncResult对象，你需要使用get()方法获取结果，get()方法阻塞直到进程结束

你也可以在进程结束后将结果赋给回调函数，可以代替get()方法

结果发现主进程直接执行完毕,没有等待子进程,async是真异步.如果要改变成通常状态,需要在所有的任务提交完毕之后,先用pool.close()让进程池不再接受新的任务.此后再用pool.join()来感知进程池中的任务执行结束(进程池中的进程一直存在,但是任务会结束

对于多个子任务，要分别多次调用apply_async()一一添加，不过这可以通过列表解析实现，以让多个进程的结果返回保存在一个列表中

In [42]:
import time
from multiprocessing import Pool
import os


def test(n):
    print('This is fun{} running'.format(n))
    print('My id is', os.getpid())
    time.sleep(1)
    print('This is fun{} ending'.format(n))


if __name__ == '__main__':
    pool = Pool(9)
    for i in range(10):
        pool.apply_async(test, args=(i,))
    pool.close()
    pool.join()

This is fun1 running
This is fun2 running
This is fun3 running
This is fun5 running
This is fun0 running
This is fun7 running
This is fun8 running
This is fun6 running
My id is 30309
My id is 30313
My id is 30306
My id is 30308
This is fun4 running
My id is 30307
My id is 30314
My id is 30312
My id is 30310
My id is 30311
This is fun3 ending
This is fun2 ending
This is fun0 ending
This is fun7 ending
This is fun9 running
This is fun1 ending
This is fun4 ending
This is fun6 ending
This is fun8 ending
My id is 30308
This is fun5 ending
This is fun9 ending


In [69]:
# 使用回调，避免使用get()
from multiprocessing import Pool

def fun(x):
    return x*x
li = []
def callback(x):
    li.append(x)

p = Pool(4)

[p.apply_async(func=fun,
              args=(i,),
               callback = callback) for i in range(6)]

p.close()
p.join()
print(li)

[0, 1, 4, 25, 16, 9]


从输出结果也可以看出，apply_async()输出结果的顺序与参数的顺序不一致，然而map()、map_async()传入结果与参数顺序一致

###### map_async()

map_async(
    func,
    iterable,
    chunksize=None,
    callback=None,
    error_callback=None,
)

如果多个子任务通过同一函数执行，只是参数不同，那么可以把拆分后的参数以列表形式通过iterable传入

非阻塞且支持结果返回进行回调，返回结果为MapResult对象，你需要使用get()方法获取结果，get()方法阻塞直到进程结束

In [84]:

p = Pool(4)

def fun(x):
    return x*x
res = p.map_async(fun,[3,4,5,1])

p.close()
p.join()
print(res)
print(res.get())

<multiprocessing.pool.MapResult object at 0x7f48a4731e50>
[9, 16, 25, 1]


In [85]:
# 使用回调，避免使用get()
from multiprocessing import Pool

def fun(x):
    return x*x
li = []
def callback(x):
    li.append(x)

p = Pool(4)

p.map_async(func = fun,iterable=[3,4,5,1],
               callback = callback)

p.close()
p.join()
print(li)

[[9, 16, 25, 1]]


###### map_async() vs apply_async()

若是通过apply_async()方法，由于是手动指定进程并添加任务，这样每个进程的执行结果之间是独立的，会分别保存，这样的好处在于，尽管可能其中某几个进程出现了错误，抛出异常，但是并不会导致其他的进程也失败，其他的进程并不会受影响，而且当获取这个抛出异常的进程的结果时，还会返回异常信息；但是如果是map_async()方法，其子参数任务并不是独立的，如果其中的某个子参数任务抛出异常，同时也会导致其他的子参数任务停止，也就是说，并不是通过独立线程来执行不同的子参数任务的

两者都可以回调

###### 回调函数

编程分为两类: 系统编程和应用编程

系统编程: 简单来说就是编写库

应用编程: 利用写好的各种库来编写某种功能的程序，即应用

系统程序员会给自己写的库留下一些接口，即API，以供应用程序员使用

当程序跑起来，一般情况下，应用程序会通过API调用库里所预先备好的函数。但是有些库函数却要求应用先传给它一个函数，好在适合时调用。这个被传入、后又被调用的函数就成为回调函数(callback function)

回调函数通常和应用处于同一抽象层，而回调就成了一个高层调用底层，底层再回头调用高层的过程

  需要回调函数的场景：进程池中任何一个任务一旦处理完了，就立即告知主进程：我好了额，你可以处理我的结果了。主进程则调用一个函数去处理该结果，该函数即回调函数
  
  回调函数是在主进程里执行

apply_async 和 map_async 可选参数 callback，将在主进程中调用它来处理返回结果。注意若 callback 耗时太长会阻塞主进程

In [40]:
from multiprocessing import Process,Pool


def a(x):
    print("this is a start")
    print(x)
    print("this is a stop")


def b(num):
    return(num)


if __name__ == '__main__':
    p = Pool(5)
    for i in range(10):
    # 这里表示，当b函数执行完成之后就会调用a函数，并且把b函数的返回值传给a函数。
        p.apply_async(b, args=(i,), callback=a)
    p.close()
    p.join()

TypeError: apply() got an unexpected keyword argument 'callback'

##### 阻塞

pool.apply(f, args): f is only executed in ONE of the workers of the pool. So ONE of the processes in the pool will run f(args)

pool.map(f, iterable): This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. So you take advantage of all the processes in the pool

Pool.map(or Pool.apply)methods are very much similar to Python built-in map(or apply). They block the main process until all the processes complete and return the result

###### apply()

使用阻塞方式调用指定方法，上一个进程退出后，下一个进程才运行

In [32]:
from multiprocessing import Pool
import time
import os
import random

def fun(name):
    print(f"running,pid={os.getpid()},parent={os.getppid()}")
    t_start = time.time()
    time.sleep(random.random()*3)
    t_end = time.time()
    print(f"time is {t_end - t_start}")
    
pool=Pool(4)#设置线程池中最大线程数量为5
for _ in range(0,7):
    #非阻塞运行
    pool.apply_async(fun,("mark"+str(id),))
print("--start1--")
pool.close()#关闭线程池，关闭后不再接受进的请求
pool.join()#等待进程池所有进程都执行完毕后，开始执行下面语句
print("--end1--")

pool=Pool(4)#设置线程池中最大线程数量为5
for _ in range(0,7):
    #阻塞运行
    pool.apply(fun,("mark"+str(id),))
print("--start2--")
pool.close()#关闭线程池，关闭后不再接受进的请求
pool.join()#等待进程池所有进程都执行完毕后，开始执行下面语句
print("--end2--")

running,pid=25154,parent=3677
running,pid=25153,parent=3677
running,pid=25155,parent=3677
running,pid=25156,parent=3677
--start1--
time is 0.8222508430480957
running,pid=25153,parent=3677
time is 0.42098402976989746
running,pid=25153,parent=3677
time is 1.5531089305877686
running,pid=25156,parent=3677
time is 1.6559772491455078
time is 0.5978152751922607
time is 2.621100902557373
time is 2.427583932876587
--end1--
running,pid=25188,parent=3677
time is 2.5747923851013184
running,pid=25189,parent=3677
time is 1.2197203636169434
running,pid=25190,parent=3677
time is 0.5291950702667236
running,pid=25191,parent=3677
time is 2.189556121826172
running,pid=25188,parent=3677
time is 0.8408067226409912
running,pid=25189,parent=3677
time is 0.4978172779083252
running,pid=25190,parent=3677
time is 2.132107973098755
--start2--
--end2--


In [54]:
from multiprocessing import Pool

def job(n):
    return n-1

pool = Pool(4)
n = 10
l = []
for i in range(6):
    res = pool.apply_async(func=job,args=(n,)) # 得到线程池中的运算结果
    l.append(res)

print([x.get() for x in l])

[9, 9, 9, 9, 9, 9]


#### map()

map()放入迭代参数，返回多个结果

用map方法执行的时候,默认异步调用,然后自带.close()和.join()方法

In [5]:
import multiprocessing as mp

def func(x):
    return x**x

pool = mp.Pool(2)
result = pool.map(func,range(10))
print(result)

[1, 1, 4, 27, 256, 3125, 46656, 823543, 16777216, 387420489]


In [45]:
def my_func(x):
  print(mp.current_process())
  return x**x


pool = mp.Pool(3)

# map()将第二个参数(可迭代)依次传递给函数
result = pool.map(my_func, [4,2,3,5,3,2,1,2])
result_set_2 = pool.map(my_func, [4,6,5,4,6,3,23,4,6])

print(result)
print(result_set_2)


<ForkProcess(ForkPoolWorker-26, started daemon)>
<ForkProcess(ForkPoolWorker-25, started daemon)>
<ForkProcess(ForkPoolWorker-24, started daemon)>
<ForkProcess(ForkPoolWorker-24, started daemon)>
<ForkProcess(ForkPoolWorker-24, started daemon)>
<ForkProcess(ForkPoolWorker-26, started daemon)>
<ForkProcess(ForkPoolWorker-24, started daemon)>
<ForkProcess(ForkPoolWorker-25, started daemon)>
<ForkProcess(ForkPoolWorker-24, started daemon)>
<ForkProcess(ForkPoolWorker-25, started daemon)>
<ForkProcess(ForkPoolWorker-26, started daemon)>
<ForkProcess(ForkPoolWorker-25, started daemon)>
<ForkProcess(ForkPoolWorker-24, started daemon)>
<ForkProcess(ForkPoolWorker-26, started daemon)>
<ForkProcess(ForkPoolWorker-24, started daemon)>
<ForkProcess(ForkPoolWorker-26, started daemon)>
<ForkProcess(ForkPoolWorker-24, started daemon)>
[256, 4, 27, 3125, 27, 4, 1, 4]
[256, 46656, 3125, 256, 46656, 27, 20880467999847912034355032910567, 256, 46656]


Process ForkPoolWorker-24:
Process ForkPoolWorker-26:
Process ForkPoolWorker-25:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = g

In [None]:
pool = mp.Pool(2)
pool.c

### Queue进程间传递数据

python multiprocessing模块提供Queue类，这是一个先进先出的数据结构，在不同进程间共享数据

In [50]:
from multiprocessing import Queue,Process
import random
def rand_num(queue):
    num = random.random()
    queue.put(num)

queue = Queue()

processes = [Process(target=rand_num, args=(queue,)) for x in range(4)]

for p in processes:
    p.start()

for p in processes:
    p.join()

results = [queue.get() for p in processes]

print(results)

[0.7663637549273589, 0.10272604880025493, 0.15532028967453892, 0.9202729571339653]


In [60]:
import multiprocessing as mp

def job(q):
    res = 0
    for i in range(1000):
        res += i+i**2+i**3
    q.put(res) # queue


q = mp.Queue()
p1 = mp.Process(target=job, args=(q,))
p2 = mp.Process(target=job, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print(res1+res2)

249833583000
249833583000
499667166000


In [63]:
import multiprocessing as mp
import threading as td
import time

def job(q):
    res = 0
    for i in range(1000000):
        res += i+i**2+i**3
    q.put(res) # queue

def multicore():
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('multicore:' , res1+res2)

def normal():
    res = 0
    for _ in range(2):
        for i in range(1000000):
            res += i+i**2+i**3
    print('normal:', res)

def multithread():
    q = mp.Queue()
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('multithread:', res1+res2)


st = time.time()
normal()
st1= time.time()
print('normal time:', st1 - st)
multithread()
st2 = time.time()
print('multithread time:', st2 - st1)
multicore()
print('multicore time:', time.time()-st2)

normal: 499999666667166666000000
normal time: 3.4349758625030518
multithread: 499999666667166666000000
multithread time: 4.513598442077637
multicore: 499999666667166666000000
multicore time: 1.7812421321868896


In [19]:
col = ['red','blue','green']
cnt = 1
queue = multiprocessing.Queue()
print('pushing items to queue:')

for cl in col:
    print(f"item no: {cnt} {cl}")
    queue.put(cl)
    cnt += 1
    
print('\npopping items from queue:')
cnt = 0
while not queue.empty():
    print('item no: ', cnt, ' ', queue.get())
    cnt += 1

pushing items to queue:
item no: 1 red
item no: 2 blue
item no: 3 green

popping items from queue:
item no:  0   red
item no:  1   blue
item no:  2   green


In [64]:
import multiprocessing as mp

def job(x):
    return x*x

def multicore():
    pool = mp.Pool(processes=2)
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    print(res.get())
    multi_res =[pool.apply_async(job, (i,)) for i in range(10)]
    print([res.get() for res in multi_res])


multicore()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


#### queue模块

queue模块定义了四种不同类型的队列，它们之间的区别在于数据入队列之后出队列的顺序不同

##### Queue

先进先出(First In First Out: FIFO)队列，最早进入队列的数据拥有出队列的优先权

queue.Queue([maxsize])

入参 maxsize 是一个整数，用于设置队列的最大长度。一旦队列达到上限，插入数据将会被阻塞，直到有数据出队列之后才可以继续插入。如果 maxsize 设置为小于或等于零，则队列的长度没有限制

In [26]:
import queue

# 构造队列，可以定义容量，也可以不指定，表示无限多
q = queue.Queue(10)

q.put({'name':'chenzhi','age':14})
q.put(['a','b','c'])

print(q.get())
print(q.get())

{'name': 'chenzhi', 'age': 14}
['a', 'b', 'c']


##### LifoQueue

后进先出(Last In First Out: LIFO)队列，最后进入队列的数据拥有出队列的优先权，就像栈一样

In [28]:
q1 = queue.LifoQueue()

for i in range(3):
    q1.put(i)
    
for i in range(3):
    print(q1.get())

2
1
0


##### PriorityQueue

优先级队列，比较队列中每个数据的大小，值最小的数据拥有出队列的优先权。数据一般以元组的形式插入，典型形式为(priority_number, data)。

In [29]:
q2 = queue.PriorityQueue()  # 创建 PriorityQueue 队列
data1 = (1, 'python')
data2 = (2, '-')
data3 = (3, '100')
style = (data2, data3, data1)
for i in style:
    q2.put(i)  # 在队列中依次插入元素 data2、data3、data1
for i in range(3):
    print(q2.get())

(1, 'python')
(2, '-')
(3, '100')


##### SimpleQueue

先进先出类型的简单队列，没有大小限制。由于它是简单队列，相比于 Queue 队列会缺少一些高级功能

In [30]:
import queue
q3 = queue.SimpleQueue()  # 创建 SimpleQueue 队列
for i in range(3):
    q3.put(i)  # 在队列中依次插入0、1、2元素
for i in range(3):
    print(q3.get())

0
1
2


##### 异常

###### queue.Empty

当队列中没有数据元素时，取出队列中的数据会引发 queue.Empty 异常，主要是不正当使用 get() 和 get_nowait() 引起的

In [31]:
import queue
try:
    q = queue.Queue(3)  # 设置队列上限为3
    q.put('python')  # 在队列中插入字符串 'python'
    q.put('-') # 在队列中插入字符串 '-'
    q.put('100') # 在队列中插入字符串 '100'
    for i in range(4):  # 从队列中取数据，取出次数为4次，引发 queue.Empty 异常
        print(q.get(block=False))
except queue.Empty:
    print('queue.Empty')

python
-
100
queue.Empty


###### queue.Full

当队列数据元素容量达到上限时，继续往队列中放入数据会引发 queue.Empty 异常，主要是不正当使用 put() 和 put_nowait() 引起的

In [None]:
import queue
try:
    q = queue.Queue(3)  # 设置队列上限为3
    q.put('python')  # 在队列中插入字符串 'python'
    q.put('-') # 在队列中插入字符串 '-'
    q.put('100') # 在队列中插入字符串 '100'
    q.put('stay hungry, stay foolish', block=False)  # 队列已满，继续往队列中放入数据，引发 queue.Full 异常
except queue.Full:
    print('queue.Full')

##### queue对象基本使用方法

###### qsize()

返回队列中数据元素的个数

In [34]:
q.put('string')
print(q.qsize())

1


###### empty()

如果队列为空，返回 True，否则返回 False

In [35]:
q.get()
print(q.empty())

True


###### full()

如果队列中元素个数达到上限，返回 True，否则返回 False

In [38]:
q4 = queue.Queue(1)
q4.put('a')
print(q4.full())

True


###### put

put(item, block=True, timeout=None)

block,当队列中元素个数达到上限继续往里放数据时：如果 block=False，直接引发 queue.Full 异常；如果 block=True，且 timeout=None，则一直等待直到有数据出队列后可以放入数据；如果 block=True，且 timeout=N，N 为某一正整数时，则等待 N 秒，如果队列中还没有位置放入数据就引发 queue.Full 异常

timeout，设置超时时间

In [39]:
q5 = queue.Queue(1)
q5.put(1)
q5.put(2,block = True,timeout = 2)

Full: 

###### put_nowait()

put_nowait(item)

相当于 Queue.put(item, block=False)，当队列中元素个数达到上限继续往里放数据时直接引发 queue.Full 异常

In [40]:
q5.put_nowait(2)

Full: 

###### get

get(block=True, timeout=None)

block，当队列中没有数据元素继续取数据时：如果 block=False，直接引发 queue.Empty 异常；如果 block=True，且 timeout=None，则一直等待直到有数据入队列后可以取出数据；如果 block=True，且 timeout=N，N 为某一正整数时，则等待 N 秒，如果队列中还没有数据放入的话就引发 queue.Empty 异常

timeout，设置超时时间

In [41]:
q6 = queue.Queue()
q6.get(block=True,timeout=1)

Empty: 

###### get_nowait()

相当于 Queue.get(block=False)block，当队列中没有数据元素继续取数据时直接引发 queue.Empty 异常

In [42]:
q6.get_nowait()

Empty: 