# Fast Python For Beginners
___

## Multiprocessing

In [1]:
import os

In [2]:
print("Process (%s) start..." % os.getppid())
pid = os.fork()
if pid == 0:
    print("I am child process (%s) and my parent is %s" % (os.getpid(), os.getppid()))
else:
    print("I (%s) just created a child process (%s)" % (os.getpid(), pid))

Process (15227) start...
I (13835) just created a child process (13852)
I am child process (13852) and my parent is 13835


> 创建子进程时，只需要传入一个执行函数和函数的参数，创建一个Process实例，用start()方法启动，这样创建进程比fork()还要简单。

> join()方法可以等待子进程结束后再继续往下运行，通常用于进程间的同步。

In [3]:
from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))
    
if __name__ == '__main__':
    print("Parent process %s" % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print("Child process will start.")
    p.start()
    p.join()
    print("Child process end")

Parent process 23338
Child process will start.
Run child process test (23357)...
Child process end


### Pool
> 如果要启动大量的子进程，可以用进程池的方式批量创建子进程：

In [4]:
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print("Run child process %s (%s)" % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))
    
if __name__ == '__main__':
    print("Parent process %s." % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print("Waiting for subprocessing done...")
    p.close()
    p.join()
    print("All subprocessing done ")

Parent process 23338.
Run child process 0 (23361)
Run child process 2 (23362)
Run child process 3 (23363)
Run child process 1 (23360)
Waiting for subprocessing done...
Task 2 runs 0.73 seconds.
Run child process 4 (23362)
Task 4 runs 0.70 seconds.
Task 0 runs 1.52 seconds.
Task 1 runs 1.98 seconds.
Task 3 runs 2.98 seconds.
All subprocessing done 


### Subprocessing

In [5]:
import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

$ nslookup www.python.org
Exit code: 0


> 如果子进程还需要输入，则可以通过communicate()方法输入：

In [6]:
import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

$ nslookup
Server:		10.53.216.182
Address:	10.53.216.182#53

Non-authoritative answer:
python.org	mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:


Exit code: 0


### 进程间通信
> Python的multiprocessing模块包装了底层的机制，提供了Queue、Pipes等多种方式来交换数据。

In [7]:
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue，并传给各个子进程：
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw，写入:
    pw.start()
    # 启动子进程pr，读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环，无法等待其结束，只能强行终止:
    pr.terminate()

Process to write: 23401
Put A to queue...
Process to read: 23402
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.


### 分布式进程
> Python的multiprocessing模块不但支持多进程，其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者，将任务分布到其他多个进程中，依靠网络通信。

In [None]:
import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列
task_queue = queue.Queue()
# 接收结果的队列
result_queue = queue.Queue()

class QueueManager(BaseManager):
    pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 8888), authkey=b'abc')

# 启动Queue：
manager.start()

# 获得通过网络访问的Queue对象：
task = manager.get_task_queue()
result = manager.get_result_queue()

# 放几个任务进去：
for i in range(10):
    n = random.randint(0, 100000)
    print("Put task %d..." % n)
    task.put(n)
    
# 从result队列读取结果：
print('Try get result...')
for i in range(10):
    r = result.get(timeout=10)
    print('Redult,%s:' % r)

# 关闭
manager.shutdown()
print('master exit')


### Shared Memory
> 可以通过使用Value数据存储在一个共享的内存表中

In [9]:
import multiprocessing as mp

value1 = mp.Value('i', 0) 
value2 = mp.Value('d', 3.14)

> Array类，可以和共享内存交互，来实现在进程之间共享数据 ,这里的Array和numpy中的不同，它只能是一维的，不能是多维的。

In [10]:
array = mp.Array('i', [1, 2, 3, 4])

In [11]:
import multiprocessing as mp
import time

l = mp.Lock()

def job(v, num):
    l.acquire()
    try:
        for _ in range(10):
            time.sleep(0.1)
            v.value += num
            print(v.value)
            
    finally:
        l.release()
        
def multicore():
    v = mp.Value('i', 0)
    p1 = mp.Process(target=job, args=(v, 1))
    p2 = mp.Process(target=job, args=(v, 3))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("End")
    
if __name__ == '__main__':
    multicore()

1
2
3
4
5
6
7
8
9
10
13
16
19
22
25
28
31
34
37
40
End
