In [1]:
from IPython.core.interactiveshell import InteractiveShell
# Ask IPython to display the value of every top-level expression statement in
# a cell, not only the last one (the default setting is "last_expr").
InteractiveShell.ast_node_interactivity = "all"

In [3]:
import os 
# Identify the operating system the kernel runs on (os.uname() is Unix-only).
os.uname()
# Mapping of the environment variables visible to this process.
os.environ
# Look up a single variable; .get() returns None when it is not set.
# NOTE(review): the trailing ';' looks intended to suppress IPython's Out[]
# display for this cell - confirm against the IPython version in use.
os.environ.get('PS1');

## How does `os.fork()` work?

In [6]:
# Bind both names to the same markup string, then confirm they compare equal.
a = b = '<span style="font-family:New York Times; font-size:1em; color:green;">'
a == b

True

<span style="font-family:New York Times; font-size:1em; color:green;">
With the return value of `fork()` we can decide which process we are in: 0 means that we are in the child process, while a positive return value (the child's PID) means that we are in the parent process. In C, a negative return value signals that the fork failed; Python's `os.fork()` raises `OSError` instead of returning a negative value.
    
Unix/Linux操作系统提供了一个fork()系统调用，它非常特殊。普通的函数调用，调用一次，返回一次，但是fork()调用一次，返回两次，因为操作系统自动把当前进程（称为父进程）复制了一份（称为子进程），然后，分别在父进程和子进程内返回。子进程永远返回0，而父进程返回子进程的ID。这样做的理由是，一个父进程可以fork出很多子进程，所以，父进程要记下每个子进程的ID，而子进程只需要调用getppid()就可以拿到父进程的ID。

In [2]:
# Imported here so the cell does not rely on another cell having run first.
import os
import sys

print('Process (%s) start...' % os.getpid())
print('Process (%s) start...' % os.getpid())

# Flush before forking: the child inherits a copy of any text still sitting in
# the stdout buffer and would otherwise emit it a second time.
sys.stdout.flush()

# Only works on Unix/Linux/Mac.
# Python's os module wraps common system calls, fork() among them, so a child
# process can be created directly from Python code.
pid = os.fork()
print('pid = {} '.format(pid))
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
    # The child is a full copy of the calling process. End it here so it does
    # not carry on executing whatever the parent would run next (in a notebook,
    # a second copy of the kernel). os._exit() skips normal interpreter
    # shutdown, so the output has to be flushed explicitly first.
    sys.stdout.flush()
    os._exit(0)
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
    # Reap the child so it does not linger as a zombie process.
    os.waitpid(pid, 0)

Process (1360) start...
Process (1360) start...
pid = 1395 
I (1360) just created a child process (1395).
pid = 0 
I am child process (1395) and my parent is 1360.


[Liao XueFeng](https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431927781401bb47ccf187b24c3b955157bb12c5882d000)

In [None]:
from multiprocessing import Process
import os

# Code executed by the child process.
def run_proc(name):
    """Print the given task name together with the PID of the running process."""
    banner = 'Run child process %s (%s)...' % (name, os.getpid())
    print(banner)

# Demo driver: start one child process that runs run_proc('test') and wait
# for it to finish.
if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    # args is the tuple of positional arguments handed to run_proc in the child.
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    # join() blocks until the child process has exited.
    p.join()
    print('Child process end.')

In [11]:
def consumer():
    """Generator-based coroutine that consumes values pushed in via send().

    The priming step yields ''. Every truthy value sent in is printed and
    answered with '200 OK'; a falsy value (None, 0, '') ends the generator.
    """
    reply = ''
    while True:
        item = yield reply
        if item:
            print('[CONSUMER] Consuming %s...' % item)
            reply = '200 OK'
        else:
            return

def produce(c):
    """Drive the generator ``c``: prime it, send 1..5, then close it.

    Each number sent and each reply received is printed.
    """
    # Prime the generator so it advances to its first yield.
    c.send(None)
    for n in range(1, 6):
        print('[PRODUCER] Producing %s...' % n)
        reply = c.send(n)
        print('[PRODUCER] Consumer return: %s' % reply)
    c.close()

# Create the consumer generator and let produce() drive it.
c = consumer()
produce(c)


[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK


Multithreading in Python 

将进程挂起(Suspend) 而非 阻塞(Block)

如果用sleep() 进程将阻塞

假设进程下有两个线程 那么这两个线程会继续运行

In [None]:
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    """Simulate a slow task: sleep for a random time below 3 s and report it.

    Prints the task name with the PID running it, then the measured duration.
    """
    print('Run task %s (%s)...' % (name, os.getpid()))
    began = time.time()
    time.sleep(random.random() * 3)
    elapsed = time.time() - began
    print('Task %s runs %0.2f seconds.' % (name, elapsed))

# Demo driver: run five tasks on a pool of four worker processes, so one task
# has to wait until a worker becomes free.
if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    # Keep the AsyncResult handles: an exception raised inside a worker is only
    # reported through them and would otherwise be silently lost.
    pending = []
    for i in range(5):
        pending.append(p.apply_async(long_time_task, args=(i,)))
    print('Waiting for all subprocesses done...')
    # close() stops new submissions; it must be called before join().
    p.close()
    p.join()
    # Every task has finished by now, so get() returns immediately and
    # re-raises any exception that occurred in a worker.
    for result in pending:
        result.get()
    print('All subprocesses done.')

## 进程间通信
<span style="font-family:New York Times; font-size:1em; color:green;">
Process之间肯定是需要通信的，操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制，提供了Queue、Pipes等多种方式来交换数据。

我们以Queue为例，在父进程中创建两个子进程，一个往Queue里写数据，一个从Queue里读数据：

In [None]:
from multiprocessing import Process, Queue
import time, random

# Code executed by the writer process.
def write(q):
    """Put 'A', 'B' and 'C' on ``q``, pausing a random time below 1 s after each."""
    print('Process to write: %s' % os.getpid())
    for item in ('A', 'B', 'C'):
        print('Put %s to queue...' % item)
        q.put(item)
        time.sleep(random.random())

# Code executed by the reader process.
def read(q):
    """Print every value taken from ``q``, forever.

    The loop has no exit condition; it only stops when q.get() raises or the
    process running it is terminated from outside.
    """
    print('Process to read: %s' % os.getpid())
    while True:
        # Blocking get: waits until an item is available.
        print('Get %s from queue.' % q.get(True))

# Demo driver: one child process writes to a shared Queue, another reads from it.
if __name__=='__main__':
    # The parent creates the Queue and hands it to both children.
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start the writer child.
    pw.start()
    # Start the reader child.
    pr.start()
    # Wait for the writer to finish.
    pw.join()
    # The reader loops forever, so it cannot be waited on; stop it forcibly...
    pr.terminate()
    # ...then join it so the terminated process is reaped instead of being
    # left behind as a zombie.
    pr.join()

In [1]:
from threading import Thread
from queue import Queue
class WorkerThread(Thread):
    """Thread that processes items sent to it through a private input queue.

    Items are handed over with send(); close() queues a None sentinel and
    blocks until every queued item, the sentinel included, has been marked
    done.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.input_queue = Queue()

    def send(self, item):
        """Queue ``item`` for the worker."""
        self.input_queue.put(item)

    def close(self):
        """Queue the stop sentinel and wait until the queue is fully processed."""
        self.input_queue.put(None)
        self.input_queue.join()

    def run(self):
        """Handle queued items until the None sentinel arrives."""
        fetch = self.input_queue.get
        mark_done = self.input_queue.task_done
        item = fetch()
        while item is not None:
            # In real code, replace this with useful work.
            print(item)
            mark_done()
            item = fetch()
        # Acknowledge the sentinel as well, so the join() in close() can return.
        mark_done()

# Demo driver: start a worker, send it three strings, then close it.
if __name__=="__main__":
    w=WorkerThread()
    w.start()
    w.send("Mark")
    w.send("好")
    w.send("？")
    # close() returns only after the worker has handled everything sent above.
    w.close()



Mark
好
？


## `concurrent.futures`

* [Python: A quick introduction to the `concurrent.futures` module](http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html)
* [ThreadPoolExecutor线程池](https://www.jianshu.com/p/b9b3d66aa0be)
* [https://realpython.com/python-concurrency/](https://realpython.com/python-concurrency/)
* [http://python.jobbole.com/87272/](http://python.jobbole.com/87272/)

### `ThreadPoolExecutor`

In [12]:
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def return_after_5_secs(message):
    """Sleep for five seconds, then return ``message`` unchanged."""
    delay_seconds = 5
    sleep(delay_seconds)
    return message

# Use the executor as a context manager so its worker threads are shut down
# when the block ends instead of piling up each time the cell is re-run.
with ThreadPoolExecutor(3) as pool:
    # submit() schedules the call on the pool and returns a Future at once.
    # "hello" is passed as a plain positional argument.
    future = pool.submit(return_after_5_secs, "hello")
    print(future.done())  # tells us whether the future has resolved yet
    sleep(5)
    print(future.done())
    # result() blocks until the task has finished, then returns its value.
    print(future.result())

False
True
hello


In [3]:
from concurrent.futures import ThreadPoolExecutor
import time

# The `times` argument simulates how long a network request takes.
def get_html(times):
    """Sleep for ``times`` seconds, report completion, and return ``times``."""
    time.sleep(times)
    report = "get page {}s finished".format(times)
    print(report)
    return times

# Use the executor as a context manager so its worker threads are shut down
# when the block ends instead of piling up each time the cell is re-run.
with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() hands the function to the pool and returns immediately
    # without blocking.
    task1 = executor.submit(get_html, 3)
    task2 = executor.submit(get_html, 2)
    # done() reports whether a task has finished.
    print(task1.done())
    # cancel() only succeeds for a task that has not started running yet.
    print(task2.cancel())
    time.sleep(4)
    print(task1.done())
    # result() returns the task's return value.
    print(task1.result())

# Expected output
# False  # task1 has not finished yet
# False  # task2 could not be cancelled: a worker had already picked it up
# get page 2s finished
# get page 3s finished
# True   # printed after "get page 3s finished", so task1 must be complete
# 3      # task1's return value

False
False
get page 2s finished
get page 3s finished
True
3


### `as_completed`

In [9]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# The `times` argument simulates how long a network request takes.
def get_html(times):
    """Sleep for ``times`` seconds, report completion, and return ``times``."""
    time.sleep(times)
    report = "get page {}s finished".format(times)
    print(report)
    return times

# Use the executor as a context manager so its worker threads are shut down
# when the block ends instead of piling up each time the cell is re-run.
with ThreadPoolExecutor(max_workers=2) as executor:
    urls = [3, 2, 4]  # not real URLs: each number is a simulated duration
    all_task = [executor.submit(get_html, url) for url in urls]

    # as_completed() yields each future as soon as it finishes, regardless of
    # the order in which the tasks were submitted.
    for future in as_completed(all_task):
        data = future.result()
        print("in main: get page {}s success".format(data))

# Expected output
# get page 2s finished
# in main: get page 2s success
# get page 3s finished
# in main: get page 3s success
# get page 4s finished
# in main: get page 4s success

get page 2s finished
in main: get page 2s success
get page 3s finished
in main: get page 3s success
get page 4s finished
in main: get page 4s success


### `wait`

<span style="font-family:New York Times; font-size:1em; color:green;">wait方法接收3个参数，等待的任务序列、超时时间以及等待条件。等待条件return_when默认为ALL_COMPLETED，表明要等待所有的任务都结束。可以看到运行结果中，确实是所有任务都完成了，主线程才打印出main。等待条件还可以设置为FIRST_COMPLETED，表示第一个任务完成就停止等待。

In [4]:
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time

# The `times` argument simulates how long a network request takes.
def get_html(times):
    """Sleep for ``times`` seconds, report completion, and return ``times``."""
    time.sleep(times)
    report = "get page {}s finished".format(times)
    print(report)
    return times

# Use the executor as a context manager so its worker threads are shut down
# when the block ends instead of piling up each time the cell is re-run.
with ThreadPoolExecutor(max_workers=2) as executor:
    urls = [3, 2, 4]  # not real URLs: each number is a simulated duration
    all_task = [executor.submit(get_html, url) for url in urls]
    # Block until every submitted task has finished.
    wait(all_task, return_when=ALL_COMPLETED)
    print("main")

# Expected output
# get page 2s finished
# get page 3s finished
# get page 4s finished
# main

get page 2s finished
get page 3s finished
get page 4s finished
main


In [8]:
def get_html(times):
    """Sleep for ``times`` seconds, report completion, and return ``times``."""
    time.sleep(times)
    report = "get page {}s finished".format(times)
    print(report)
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]  # not real URLs: each number is a simulated duration
all_task = [executor.submit(get_html, url) for url in urls]
# Return as soon as the first task finishes; the others keep running.
wait(all_task, return_when=FIRST_COMPLETED)
print("main")
# Release the pool without blocking: already-submitted tasks still run to
# completion, after which the worker threads are freed instead of leaking.
executor.shutdown(wait=False)

get page 2s finished
main
