In [23]:
from multiprocessing import  Process, Pool

# 实现多进程的两种方式

In [None]:
def fun1(name):
    """Print a greeting that mentions ``name``; used as a Process target."""
    message = '测试%s多进程' % name
    print(message)

if __name__ == '__main__':
    process_list = []
    for _ in range(5):  # start 5 child processes, each running fun1('Python')
        p = Process(target=fun1, args=('Python',))  # create the process object
        p.start()
        process_list.append(p)

    # Wait for every child. Joining only one of them would let '结束测试'
    # print while the other children may still be running.
    for p in process_list:
        p.join()

    print('结束测试')

In [None]:
class MyProcess(Process):
    """A Process subclass whose run() prints a greeting naming the process.

    Args:
        name: assigned to ``self.name``, the name attribute that
            ``multiprocessing.Process`` exposes for the process.
    """

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        # run() is the method start() executes in the child process.
        greeting = '测试{}多进程'.format(self.name)
        print(greeting)


if __name__ == '__main__':
    process_list = []
    for _ in range(5):  # start 5 child processes, each running MyProcess.run
        p = MyProcess('Python')  # create the process object
        p.start()
        process_list.append(p)

    # Wait for every child. Joining only one of them would let '结束测试'
    # print while the other children may still be running.
    for p in process_list:
        p.join()

    print('结束测试')

In [24]:

def show(num):
    """Print ``num`` prefixed with ``'num : '``."""
    # print() joins its arguments with a single space and applies str() to each.
    print('num :', num)

if __name__=="__main__":
    pool = Pool(processes = 3)
    async_results = []
    for i in range(6):
        # At most 3 tasks run at once; an idle worker picks up the next queued task.
        async_results.append(pool.apply_async(show, args=(i, )))
    print('======  apply_async  ======')
    # close() must be called before join(): after close() no new tasks can be
    # submitted, and join() then waits for all worker processes to exit.
    pool.close()
    pool.join()
    # apply_async only reports a worker-side exception through .get(); call it
    # so failures are raised here instead of being silently dropped.
    for async_result in async_results:
        async_result.get()



KeyboardInterrupt: 

In [None]:
import multiprocessing as mp

def job(q):
    """Compute sum(i + i**2 + i**3 for i in 0..999) and put it on queue ``q``."""
    total = sum(i + i ** 2 + i ** 3 for i in range(1000))
    q.put(total)

if __name__ == '__main__':
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    # Drain the queue before joining: a process that has put items on a queue
    # does not terminate until those items are flushed to the pipe, so
    # join-then-get can deadlock once the payload is large.
    res1 = q.get()
    res2 = q.get()
    p1.join()
    p2.join()
    print(res1+res2)

In [None]:
import multiprocessing as mp

def job(x):
    """Return ``x`` multiplied by itself (pool workers can return values)."""
    squared = x * x
    return squared


def multicore(processes=2):
    """Demonstrate Pool.map and Pool.apply_async with the ``job`` function.

    Prints ``job`` mapped over range(10), then ``job(2)`` computed
    asynchronously, then ``job`` over range(10) submitted as individual
    async tasks.

    Args:
        processes: number of worker processes in the pool (default 2).
    """
    # Leaving the `with` block terminates the pool, so its worker processes are
    # not leaked; every result is fetched with .get() before that happens.
    with mp.Pool(processes=processes) as pool:
        # map applies job to every element of range(10); results keep input order.
        print(pool.map(job, range(10)))
        # apply_async submits one task and immediately returns an AsyncResult.
        single = pool.apply_async(job, (2,))
        print(single.get())
        # Submit ten single tasks, then collect their results in submission order.
        multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
        print([r.get() for r in multi_res])

if __name__ == '__main__':
    multicore()

# 进程之间共享内存

In [None]:
# Shared-memory scalar. Typecode 'i' is a C int ('d' would be a double, 'f' a
# float); the initial value is 0.
value = mp.Value('i', 0)
# Shared-memory array of C ints holding 1..5; such arrays are one-dimensional.
array = mp.Array('i', list(range(1, 6)))

In [None]:
import multiprocessing as mp
import time

def job(v, num, l, delay=0.1):
    """Add ``num`` to the shared ``v.value`` ten times while holding lock ``l``.

    Each step sleeps ``delay`` seconds, adds ``num`` and prints the new value.
    The lock is held for all ten steps, so processes sharing ``l`` never
    interleave their steps.

    Args:
        v: shared value exposing a ``.value`` attribute (e.g. ``mp.Value('i')``).
        num: amount added per step.
        l: lock shared by the cooperating processes (e.g. ``mp.Lock()``).
        delay: seconds to sleep before each step (default 0.1).
    """
    # `with` releases the lock even if the loop raises; a bare acquire/release
    # pair would leave it held and block the other process forever.
    with l:
        for _ in range(10):
            time.sleep(delay)
            # v.value reads and writes the shared variable.
            v.value += num
            print(v.value)
def multicore():
    """Run two processes that add 1 and 3 (ten times each) to one shared int.

    Both processes receive the same lock, so their updates do not interleave.
    """
    lock = mp.Lock()
    shared = mp.Value('i', 0)
    # The target function, its arguments and the lock are all handed to each
    # child process through Process(args=...).
    workers = [mp.Process(target=job, args=(shared, step, lock)) for step in (1, 3)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
if __name__ == '__main__':
    multicore()

# 官方给的样例分析

In [None]:
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    """Return ``x`` multiplied by itself."""
    squared = x * x
    return squared

if __name__ == '__main__':
    # Spin up 4 worker processes; leaving the `with` block stops the pool.
    with Pool(processes=4) as workers:

        # Blocking, order-preserving map: prints "[0, 1, 4,..., 81]".
        squares = workers.map(f, range(10))
        print(squares)

        # Same numbers, yielded in whatever order the workers finish.
        for value in workers.imap_unordered(f, range(10)):
            print(value)

        # One async call runs in *only* one worker; prints "400".
        pending = workers.apply_async(f, (20,))
        print(pending.get(timeout=1))

        # Ask a single worker for its PID.
        pending = workers.apply_async(os.getpid, ())
        print(pending.get(timeout=1))

        # Several async calls *may* be spread across several workers.
        pid_jobs = [workers.apply_async(os.getpid, ()) for _ in range(4)]
        print([pid_job.get(timeout=1) for pid_job in pid_jobs])

        # Keep one worker busy for 10 s, but only wait 1 s for it.
        pending = workers.apply_async(time.sleep, (10,))
        try:
            print(pending.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # Past the `with` block the pool has been stopped.
    print("Now the pool is closed and no longer available")

In [9]:
import os
import time
import multiprocessing as mp
import PIL.Image as Image

# Directory holding the training images. Set the ISC_TRAIN_IMG_DIR environment
# variable to point elsewhere instead of editing this machine-specific default.
img_base_path = os.environ.get(
    "ISC_TRAIN_IMG_DIR",
    "/datassd2/sswang/image_matching/data/isc_data/training_imgs/training",
)

# Full path of every directory entry. os.listdir returns names in arbitrary
# order, so sort them: any "first N images" subset is then reproducible.
# NOTE(review): assumes the directory contains only image files — confirm.
img_list = [os.path.join(img_base_path, img_name)
            for img_name in sorted(os.listdir(img_base_path))]

In [None]:
"""
使用多进程进行数据增强的设计思路：
    1. 将数据增强函数封装为一个完整的函数，作为多进程的target函数时可以直接调用
    2. 将累加变量设置为一个进程共享的全局变量，进程拿到累加变量之后先锁定，然后进行累加，最后解锁，之后再进行一次数据增强操作
    3. 进程之间需要共享数据，使用进程池的时候，可以使用进程池的map函数，将数据增强函数作用于数据集中的每一个元素

对多进程使用效果的评估
    1. 让程序IO训练集中的7W张图片，观察时间的差异
    2. 通过time.time()函数，计算数据IO的时间
"""

# def job(v, num, l):
#     l.acquire()
#     for _ in range(10):
#         time.sleep(0.1)
#         # 通过value.value拿到共享变量的值
#         v.value += num
#         print(v.value)
#     l.release()
# def multicore():
#     # 声明一个进程锁对象
#     l = mp.Lock()
#     v = mp.Value('i', 0)
#     # 需要把目标函数，参数，进程锁对象传入进程中
#     p1 = mp.Process(target=job, args=(v, 1, l))
#     p2 = mp.Process(target=job, args=(v, 3, l))
#     p1.start()
#     p2.start()
#     p1.join()
#     p2.join()
# if __name__ == '__main__':
#     multicore()





# Timing decorator: report how long each call to the wrapped function takes.
def timer(func):
    """Decorate ``func`` so every call prints its elapsed wall-clock time.

    The wrapper forwards all positional and keyword arguments and returns the
    wrapped function's result unchanged. After the call returns it prints
    ``'<func name> cost time: <seconds> s'`` with three decimals. Nothing is
    printed if ``func`` raises.
    """
    import functools

    # wraps() copies __name__/__doc__/__qualname__, so the decorated function
    # keeps its identity (and stays picklable by reference).
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution; time.time() can jump
        # if the system clock is adjusted during the measurement.
        time_start = time.perf_counter()
        result = func(*args, **kwargs)
        time_spend = time.perf_counter() - time_start
        print('%s cost time: %.3f s' % (func.__name__, time_spend))
        return result
    return func_wrapper


# Open a single image and immediately close it again.
def img_IO(img_path):
    """Open the image at ``img_path`` with PIL and close it right away.

    Per the Pillow docs, ``Image.open`` is lazy: it identifies the file, but
    pixel data is only read on ``load()``. This therefore times file open plus
    header parsing, not full decoding.
    """
    Image.open(img_path).close()


In [None]:

# Baseline: open every image sequentially in the current process.
@timer
def img_IO_singleprocessing(img_list):
    """Open and close each path in ``img_list`` one after another.

    Returns:
        The number of paths processed.
    """
    n_opened = 0
    for path in img_list:
        Image.open(path).close()
        n_opened += 1
    return n_opened


if __name__ == '__main__':
    counter = img_IO_singleprocessing(img_list)
    print("一共读取了{}张图片".format(counter))


In [22]:
# Worker task: open one image, then bump the shared counter.
def img_IO_with_ptocessing(img_path, counter, counter_lock):
    """Open and close one image, then increment ``counter`` under ``counter_lock``.

    Args:
        img_path: path of the image file to open.
        counter: shared integer exposing ``.value`` (e.g. ``mp.Value('i', 0)``).
        counter_lock: lock guarding the increment (e.g. ``mp.Lock()``).
    """
    Image.open(img_path).close()
    # `counter.value += 1` is a separate read and write, so concurrent workers
    # lose increments unless the whole update runs under the lock.
    with counter_lock:
        counter.value += 1


# Per-worker handles to the shared counter and lock; set by _init_counter_worker.
_worker_counter = None
_worker_lock = None


def _init_counter_worker(counter, counter_lock):
    """Pool initializer: store the shared counter and lock in this worker's globals."""
    global _worker_counter, _worker_lock
    _worker_counter = counter
    _worker_lock = counter_lock


def _img_IO_pool_task(img_path):
    """Pool task: read one image using the counter and lock from the initializer."""
    img_IO_with_ptocessing(img_path, _worker_counter, _worker_lock)


# Read images with a process pool, counting them in a shared counter.
@timer
def img_IO_multiprocessing(img_list, counter, counter_lock, processes=10, limit=70000):
    """Open images in a process pool and report how many were read.

    Args:
        img_list: list of image paths.
        counter: shared integer (e.g. ``mp.Value('i', 0)``), incremented once per image.
        counter_lock: lock (e.g. ``mp.Lock()``) guarding ``counter``.
        processes: number of worker processes (default 10).
        limit: maximum number of paths taken from the front of ``img_list``
            (default 70000); ``None`` reads them all. Shorter lists are read
            completely instead of raising IndexError.

    Returns:
        ``counter.value`` after all tasks have finished.

    Raises:
        The first exception raised by any worker task.
    """
    # mp.Value / mp.Lock cannot be pickled into apply_async arguments (pickling
    # raises "... should only be shared between processes through inheritance"),
    # so they are handed to each worker once, through the pool initializer.
    pool = mp.Pool(processes=processes,
                   initializer=_init_counter_worker,
                   initargs=(counter, counter_lock))
    try:
        pending = [pool.apply_async(_img_IO_pool_task, args=(img_path,))
                   for img_path in img_list[:limit]]
        # Must close() before join(); join() waits for every worker to exit.
        pool.close()
        pool.join()
    except BaseException:
        # e.g. KeyboardInterrupt: do not leave worker processes behind.
        pool.terminate()
        raise
    # apply_async only surfaces worker exceptions through .get(); re-raise
    # them instead of silently reporting a short count.
    for result in pending:
        result.get()
    print("一共读取了{}张图片".format(counter.value))
    return counter.value



if __name__ == '__main__':
    counter = mp.Value('i', 0)
    counter_lock = mp.Lock()
    img_IO_multiprocessing(img_list, counter, counter_lock)

KeyboardInterrupt: 

In [20]:
from multiprocessing import Process, Value, Array

def f(n, a):
    """Set ``n.value`` to 3.1415927 and negate every element of ``a`` in place."""
    n.value = 3.1415927
    for idx, item in enumerate(a[:]):
        a[idx] = -item

if __name__ == '__main__':
    # 'd' -> shared C double starting at 0.0; 'i' -> shared C int array of 0..9.
    num = Value('d', 0.0)
    arr = Array('i', list(range(10)))

    # Run f in one child process and wait for it to finish.
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    # The child's writes are visible here because both objects are shared memory.
    print(num.value, arr[:], sep='\n')

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
