In [None]:
import os
import time
from multiprocessing import Process


def sonDo(name):
    """进程要做的事情"""
    print("进程的名称：{0}，pid:{1}".format(name, os.getpid()))
    time.sleep(5)
    print("进程要做的事情")


# 使用类的方法
class MyProcess(Process):
    def __init__(self, name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name

    def run(self):
        print("进程的名称：{0}，pid:{1}".format(self.name, os.getpid()))  # 进程的名称：my process class，pid:5920
        time.sleep(5)
        print("进程要做的事情")


if __name__ == "__main__":
    # 创建一个进程
    # p = Process(target=sonDo, args=('my process',))
    p = MyProcess('my process class')
    # 启动进程
    p.start()
    # 挂起进程
    p.join()

In [None]:
# 进程之间的通讯
"""进程之间的通讯"""
import random
import time
from multiprocessing import Process, Queue


class WriteProcess(Process):
    """ 写的进程 """

    def __init__(self, q, *args, **kwargs):
        self.q = q
        super().__init__(*args, **kwargs)

    def run(self):
        """ 实现进程的业务逻辑 """
        ls = ["name", "is", "yanyue"]
        for line in ls:
            print("写入内容：{0}".format(line))
            self.q.put(line)
            # 每写入一次休息1-5s
            time.sleep(random.randint(1, 5))


class ReadProcess(Process):
    """读取内容进程"""

    def __init__(self, q, *args, **kwargs):
        self.q = q
        super().__init__(*args, **kwargs)

    def run(self):
        while True:
            content = self.q.get()
            print("读取到的内容：{0}".format(content))


if __name__ == "__main__":
    # 通过Queue共享数据
    q = Queue()
    # 写入内容的进程
    tWrite = WriteProcess(q)
    tWrite.start()
    # 读取进程启动
    tRead = ReadProcess(q)
    tRead.start()

    tWrite.join()
    # tRead.join()
    # 因为读的进程是死循环，无法等待其结束，只能够强制终止 terminate
    tRead.terminate()

In [None]:
# 进程中的锁
""" 多进程中的锁 """
import random
import time
from multiprocessing import Process, Lock


class WriteProcess(Process):
    """写入文件"""

    def __init__(self, fileName, num, lock, *args, **kwargs):
        self.fileName = fileName
        self.num = num
        # 锁对象
        self.lock = lock
        super().__init__(*args, **kwargs)

    def run(self):
        """写入文件的主要业务逻辑"""
        try:
            # 添加锁
            self.lock.acquire()
            for i in range(1, 5):
                content = "现在是：{0}：{1}-{2}\n".format(self.name, self.pid, self.num)
                with open(self.fileName, 'a+', encoding='utf-8') as fileObj:
                    fileObj.write(content)
                    time.sleep(random.randint(1, 5))
                    print(content)
        finally:
            # 释放锁
            self.lock.release()


if __name__ == "__main__":
    fileName = "test.txt"
    for x in range(1, 5):
        # 创建5个进程 -- 5个进程的执行是没有顺序的，希望一个进行执行逻辑函数完毕，再执行下一个进行，需要锁的处理
        # 创建一个锁的对象
        lock = Lock()
        p = WriteProcess(fileName, x, lock)
        p.start()

In [None]:
# 使用进程池
"""使用进程池"""
import random
import time
from multiprocessing import current_process, Pool
import os


def run(fileName, num):
    with open(fileName, 'a+', encoding='utf-8') as fileObj:
        # 当前的进程
        now_process = current_process()
        # 写入内容
        content = "{0}-{1}-{2}".format(now_process.name, now_process.pid, num)
        fileObj.write(content)
        fileObj.write('\n')
        # 写完之后任意休息1-5s
        time.sleep(random.randint(1, 5))
        print(content)
    return "OK"


if __name__ == "__main__":
    fileName = "process_pool.txt"
    print(os.cpu_count())  # 8 也就是说当前系统是一个8核的cpu
    # 创建2个进程的进程池
    pool = Pool(2)
    for i in range(20):
        # 同步apply添加任务
        # result = pool.apply(run, args=(fileName, i))
        # apply_async异步添加任务
        result = pool.apply_async(run, args=(fileName, i))
        print('{0}-{1}'.format(i, result))
    # 关闭线程池
    pool.close()
    pool.join()

# 结果显示。20个线程池，但是执行过程中的线程id一直是那2个，说明线程池中的线程是被复用看的