# 一、多进程
## 1. 多进程

In [1]:
import os

print('Process (%s) start...' % os.getpid())

# os.fork() exists only on POSIX systems (there is no fork on Windows).  It is
# called once but returns twice: 0 in the newly created child process, and the
# child's PID in the parent process.
pid = os.fork()
if pid == 0:
    # Child process.
    print('I am child process (%s) and my parent is %s' % (os.getpid(), os.getppid()),
          flush=True)
    # Leave the child explicitly.  Without this the child would carry on
    # executing everything that follows, exactly like the parent (in an
    # interactive session the forked copy of the interpreter keeps running).
    # os._exit() does not flush stdio buffers, hence flush=True above.
    os._exit(0)
else:
    # Parent process: pid is the child's PID.
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
    # Reap the child so it does not linger as a zombie process.
    os.waitpid(pid, 0)

Process (28361) start...
I (28361) just created a child process (28461).
I am child process (28461) and my parent is 28361


`os.getpid()`获取当前进程的PID，`os.getppid()`获取父进程的PID。`fork()`调用一次、返回两次：在子进程中返回0，在父进程中返回子进程的PID，因此可以根据返回值区分父子进程。

## 2. multiprocessing
Windows没有`fork`调用，可以使用跨平台的`multiprocessing`模块来创建子进程。

In [2]:
from multiprocessing import Process
import os

def run_proc(name):
    """Body of the child process: announce *name* together with the child's PID."""
    message = 'Run child process %s (%s)...' % (name, os.getpid())
    print(message)

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    # Describe the child; nothing runs until start() is called.
    p = Process(target=run_proc, kwargs={'name': 'test'})
    print('Child process will start.')
    p.start()
    # Block the parent until the child has exited -- a simple sync point.
    p.join()
    print('Child process end.')

Parent process 28361.
Child process will start.
Run child process test (28538)...
Child process end.


`join`方法可以等待子进程结束后再往下运行，通常用于进程间的同步

## 3. Pool
进程池批量创建子进程

In [3]:
from multiprocessing import Pool
import os, time, random

def long_time_task(name, max_seconds=3):
    """Simulate a slow job: sleep a random time and report how long it took.

    name: label printed with the task (the demo passes the task index).
    max_seconds: upper bound of the random sleep, in seconds (default 3,
        the original hard-coded value).
    """
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * max_seconds)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))


if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    # 4 worker processes for 5 tasks: the 5th task waits for a free worker.
    p = Pool(4)
    results = [p.apply_async(long_time_task, args=(i,)) for i in range(5)]
    print('Waiting for all subprocesses done...')
    p.close()  # no further tasks may be submitted; required before join()
    p.join()
    # apply_async is fire-and-forget: an exception raised inside a worker is
    # stored in its AsyncResult and silently lost unless .get() is called.
    for r in results:
        r.get()
    print('All subprocesses done.')

Parent process 28361.
Run task 2 (28593)...
Run task 1 (28592)...
Run task 0 (28591)...
Run task 3 (28594)...
Task 1 runs 0.00 seconds.
Run task 4 (28592)...
Waiting for all subprocesses done...
Task 0 runs 0.91 seconds.
Task 2 runs 1.32 seconds.
Task 4 runs 1.72 seconds.
Task 3 runs 2.11 seconds.
All subprocesses done.


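对`Pool`调用`join()`会等待所有子进程执行完毕，调用`join()`之前必须先调用`close()`；调用`close()`之后就不能再向进程池提交新的任务了。`Pool`的默认大小是CPU的核数（`os.cpu_count()`）。本例中进程池大小为4，所以第5个任务（task 4）要等前面某个任务完成、有空闲进程后才会开始执行。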

## 4. 子进程
很多时候我们还要控制子进程的输入和输出，这可以使用`subprocess`模块

In [7]:
import subprocess

# Run nslookup as a child process that shares this process's stdin/stdout/
# stderr, wait for it to finish, then print its exit status
# (conventionally 0 means success).
print('$ nslookup www.baidu.org')
completed = subprocess.run(['nslookup', 'www.baidu.org'])
r = completed.returncode
print('Exit code: ', r)

$ nslookup www.baidu.org
Exit code:  0


In [9]:
import subprocess

print('$ nslookup')
# Drive nslookup interactively; every line written to its stdin is a command:
#   set q=mx    -> query MX (mail exchanger) records
#   python.org  -> the domain to look up
#   exit        -> leave the interactive prompt
# The context manager guarantees the pipes are closed and the child is waited
# for even if something inside the block raises.
with subprocess.Popen(['nslookup'], stdin=subprocess.PIPE,
                      stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
    # communicate() writes the input, closes stdin, reads both pipes to EOF
    # (avoiding a pipe-buffer deadlock) and waits for the process to exit.
    output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
# The tool's output is not guaranteed to be UTF-8 (e.g. under a non-UTF-8
# locale), so undecodable bytes are replaced instead of raising.
print(output.decode('utf-8', errors='replace'))
# stderr is captured above; show it instead of silently discarding it, so
# errors such as an unreachable DNS server are visible.
if err:
    print(err.decode('utf-8', errors='replace'))
print('Exit Code: ', p.returncode)

$ nslookup
Server:		127.0.0.53
Address:	127.0.0.53#53

Non-authoritative answer:
python.org	mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:


Exit Code:  0


## 5. 进程间通信
`multiprocessing`模块提供了`Queue`、`Pipe`等方式在进程之间交换数据。

In [10]:
from multiprocessing import Process, Queue
import os, time, random

# Writer process: put a few values into the shared queue q.
def write(q):
    """Put 'A', 'B', 'C' into *q*, pausing a random time (< 1 s) after each."""
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# Reader process: consume values from q until the sentinel arrives.
def read(q, sentinel=None):
    """Print every value taken from *q* until a value equal to *sentinel* arrives.

    The sentinel lets the parent stop the reader cleanly; killing it with
    terminate() while it uses the queue can leave the queue corrupted.
    Equality (not identity) is used because values sent through a
    multiprocessing queue are pickled, so the received object is a copy.
    """
    print('Process to read %s' % os.getpid())
    while True:
        value = q.get(True)  # block until an item is available
        if value == sentinel:
            break
        print('Get %s from queue.' % value)

if __name__ == '__main__':
    # The parent creates the queue and hands it to both child processes.
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))

    pw.start()

    pr.start()
    # Wait for the writer to finish, then tell the reader to stop and wait
    # for it to exit on its own.
    pw.join()
    q.put(None)
    pr.join()

Process to write: 28825
Put A to queue...
Process to read 28828
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.


# 二、多线程
使用`threading`模块来操作线程。启动一个线程就是把一个函数传入`Thread`实例，然后调用`start()`执行

In [11]:
import time, threading

def loop():
    """Thread body: print a counter from 1 to 5, one step per second."""
    # Look the thread's name up once; it is 'LoopThread' when started below.
    current = threading.current_thread().name
    print('thread %s is running...' % current)
    for n in range(1, 6):
        print('thread %s >>> %s' % (current, n))
        time.sleep(1)
    print('thread %s ended.' % current)

# This code runs in the interpreter's initial thread.
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
# Block the current thread until LoopThread has finished all five steps.
t.join()
print('thread %s ended.' % threading.current_thread().name)

thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.


MainThread为默认的主线程，在程序运行的时候就会启动，`LoopThread`是我们自行创建的线程。

## 1. Lock
同一进程中的所有线程共享该进程的资源（包括全局变量），因此多个线程同时修改同一份数据时很容易出错。

In [13]:
import time, threading

# Pretend this is your bank balance (shared by both threads, no lock):
balance = 0

def change_it(n):
    """Deposit n, then withdraw n; run sequentially, the net effect is 0."""
    global balance
    # Each statement is a separate read-modify-write of the global. If the
    # other thread runs between the read and the write, one update is lost.
    # This is the race condition this cell demonstrates.
    balance = balance + n
    balance = balance - n

def run_thread(n):
    # Call change_it(n) one million times to make a bad interleaving likely.
    for i in range(1000000):
        change_it(n)

# Two threads mutate the same global concurrently with no synchronization.
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
# Expected 0, but may print a non-zero value; the result can differ between runs.
print(balance)

5


可以看到，我们创建了两个线程，`balance`变量由这两个线程共享，而线程的调度由操作系统决定。`balance = balance + n`并不是原子操作：它要先读取`balance`、计算新值、再写回。当循环次数足够多时，t1和t2可能会在读取和写回之间被切换，导致其中一次更新丢失，最终结果就不为0了（每次运行的结果也可能不同）。 <br>
要确保结果正确，就要给`change_it`加上一把锁。

In [14]:
import time, threading

# Pretend this is your bank balance, shared by both threads:
balance = 0
# Guards every read-modify-write of `balance`.
lock = threading.Lock()

def change_it(n):
    """Deposit n, then withdraw n; the net effect should be 0.

    Callers must hold `lock`: the two statements below are not atomic.
    """
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    """Apply change_it(n) one million times, holding the lock for each call."""
    for i in range(1000000):
        # `with lock:` acquires the lock and always releases it, even if
        # change_it raises. It is the idiomatic form of
        # acquire()/try/finally/release().
        with lock:
            change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

0


由于CPython的全局解释器锁（GIL），多个线程不能同时执行Python字节码，因此多线程无法让纯Python的计算任务利用多核。CPU密集型任务要利用多核，通常使用多进程（或使用能释放GIL的C扩展）。
# 三、ThreadLocal

In [15]:
import threading

# A single global ThreadLocal object; attributes set on it are per-thread.
local_school = threading.local()

def process_student():
    """Greet the student bound to the current thread's local_school."""
    student = local_school.student
    thread_name = threading.current_thread().name
    print('Hello, %s (in %s)' % (student, thread_name))

def process_thread(name):
    """Bind *name* as this thread's student, then greet it."""
    local_school.student = name
    process_student()

t1, t2 = (
    threading.Thread(target=process_thread, args=(who,), name=label)
    for who, label in (('Alice', 'Thread-A'), ('Bob', 'Thread-B'))
)
t1.start()
t2.start()
t1.join()
t2.join()

Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)


全局变量`local_school`就是一个`ThreadLocal`对象，每个`Thread`对它都可以读写`student`属性，但互不影响。你可以把`local_school`看成全局变量，但每个属性如`local_school.student`都是线程的局部变量，可以任意读写而互不干扰，也不用管理锁的问题，`ThreadLocal`内部会处理。

# 四、进程 VS. 线程
# 五、分布式进程


In [16]:
# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# Queue the master puts tasks into:
task_queue = queue.Queue()
# Queue the master reads results from:
result_queue = queue.Queue()

# QueueManager derived from BaseManager:
class QueueManager(BaseManager):
    pass

# Expose both queues over the network; `callable` returns the Queue object.
# NOTE: lambdas cannot be pickled, so this registration only works when the
# manager's server process is started with the 'fork' start method. With
# 'spawn' (e.g. Windows), register module-level functions instead.
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# Bind port 5000 on all interfaces and set the authentication key b'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# Start the manager's server process:
manager.start()
try:
    # Proxies to the queues, accessed through the manager:
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # Put some tasks in:
    for i in range(10):
        n = random.randint(0, 10000)
        print('Put task %d...' % n)
        task.put(n)
    # Read the results; each get() raises queue.Empty if no worker answers
    # within 10 seconds.
    print('Try get results...')
    for i in range(10):
        r = result.get(timeout=10)
        print('Result: %s' % r)
finally:
    # Always stop the server process. Previously a queue.Empty timeout skipped
    # shutdown() and left the manager process running (still bound to port 5000).
    manager.shutdown()
print('master exit.')

Put task 2062...
Put task 2827...
Put task 6076...
Put task 7493...
Put task 7519...
Put task 1849...
Put task 1657...
Put task 1193...
Put task 6016...
Put task 6910...
Try get results...


Empty: 