## 进程间通信

- 多线程中的 Queue 不再能适用多进程

In [None]:
from multiprocessing import Process
from queue import Queue
import time

def producer(queue):
    queue.put("a")

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print("data = ", data)

if __name__ == "__main__":
    queue = Queue(2)
    my_producer = Process(target=producer, args=(queue, ))
    my_consumer = Process(target=consumer, args=(queue, ))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

- 多进程中，可以使用多进程中的 multiprocessing::Queue

In [10]:
from multiprocessing import Process, Queue
import time

def producer(queue):
    queue.put("a")

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print("data = ", data)

if __name__ == "__main__":
    queue = Queue(2)
    my_producer = Process(target=producer, args=(queue, ))
    my_consumer = Process(target=consumer, args=(queue, ))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

data =  a


- 共享全局变量通信，不适用于多进程编程

In [11]:
from multiprocessing import Process, Queue
import time

def producer(a):
    a += 1
    time.sleep(2)

def consumer(a):
    time.sleep(2)
    print("a = ", a)

if __name__ == "__main__":
    a = 1
    my_producer = Process(target=producer, args=(a, ))
    my_consumer = Process(target=consumer, args=(a, ))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

a =  1


- multiprocessing 中的 Queue 不能用于 pool 进程

In [12]:
from multiprocessing import Process, Queue, Pool
import time

def producer(queue):
    queue.put("a")

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print("data = ", data)

if __name__ == "__main__":
    queue = Queue(2)
    pool = Pool(2)

    pool.apply_async(producer, args=(queue, ))
    pool.apply_async(consumer, args=(queue, ))

    pool.close()
    pool.join()

-  pool 进程间的通信，可以使用 multiprocessing 中的 Manager 中的 Queue

In [13]:
from multiprocessing import Process, Pool, Manager
import time

def producer(queue):
    queue.put("a")

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print("data = ", data)

if __name__ == "__main__":
    queue = Manager().Queue(2)
    pool = Pool(2)

    pool.apply_async(producer, args=(queue, ))
    pool.apply_async(consumer, args=(queue, ))

    pool.close()
    pool.join()

data =  a


- 通过 pipe（管道） 实现进程间的通信
    - pipe 只能适用于两个进程
    - 性能比 queue 高，锁相对于 queue 更少

In [None]:
from multiprocessing import Process, Pipe
import time

def producer(queue):
    queue.send("a")
    time.sleep(1)

def consumer(queue):
    time.sleep(2)
    data = queue.recv()
    print("data = ", data)

if __name__ == "__main__":
    rec_pipe, send_pipe = Pipe()
    my_producer = Process(target=producer, args=(send_pipe, ))
    my_consumer = Process(target=consumer, args=(rec_pipe, ))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

- 进程间共享内存

In [None]:
from multiprocessing import Process, Manager
import time

def producer(p_dict):
    p_dict["a"] = "b"
    time.sleep(1)

def consumer(p_dict):
    time.sleep(2)
    data = p_dict["a"]
    print("data = ", data)

if __name__ == "__main__":
    progress_dict = Manager().dict()
    my_producer = Process(target=producer, args=(progress_dict, ))
    my_consumer = Process(target=consumer, args=(progress_dict, ))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
