# 多进程和多线程
- 程序：一堆代码以文本形式存入一个文档
- 进程：程序运行的一个状态，每一个进程提供执行程序所需要的所有资源（进程本质是资源的集合）
    
    - 包含地址空间，可执行的代码，操作系统的接口，安全的上下文（记录保存启动该进程的用户和权限），唯一的进程ID，内存空间，数据栈等，还有至少有一个线程。
    - 每一个进程启动时都会最先产生一个线程，即主线程，然后主线程再创建其他的子线程。
    - 每个进程有自己完全独立的运行环境，多进程数据不共享，除非采用一定的方法
- 线程：操作系统能够进行运算调度的最小单位，被包含在进程中，是进程的实际运作单位
    - 一个线程指的是进程中一个单一顺序的控制流，一个进程中可有多个并发线程
    - 一个线程就是一个执行上下文
- 进程与线程的区别：
    - 同一个进程中的线程共享同一内存空间，进程之间相互独立
    - 同一个进程中的所有线程的数据是共享的（进程通讯），进程之间的数据是独立的
    - 对主线程的修改可能会影响其他线程的行为，但是父进程的修改（除非删除）不会影响子进程
    - 线程是一个上下文的执行指令，进程则是运算相关的资源集合
    - 同一个进程的线程之间可以直接通信，但是进程之间的交流需要借助中间代理来实现
    - 创建新的线程很容易，但是创建新的进程需要对父进程做一次复制
    - 一个线程可以操作同一进程的其他线程，但是进程只能操作其子进程
    - 线程启动速度快，进程启动速度慢（但是两者的运行速度没有可比性）
    

### 理解多任务——理解操作系统的CPU上下文
            CPU一个时间段只能运行单个任务，只不过在很短的时间内，CPU快速切换到不同的任务执行，造成一种多任务同时执行的错觉
            而CPU在切换到其他任务执行之前，为了确保切换任务之后还能够继续切换到原来的任务执行，并且看起来是一种连续的状态，就必须将任务的状态保持起来，以便恢复上个任务的执行状态，而该状态保存在CPU的寄存器和程序计数器中。寄存器：CPU内置的容量小，但速度极快的内存，用来保存程序的堆栈信息即数据段信息。程序计数器：保存程序的下一条指令的位置即代码段信息。
            所以，CPU上下文就是指CPU寄存器和程序计数器中保存的任务状态信息，CPU上下文切换就是把前一个任务的CPU上下文保存起来，然后加载下一个任务的上下文到这些寄存器和程序计数器，再跳转到程序计数器所指定的位置运行程序。
- 多任务的执行方法
    - Python程序默认都是执行单任务的程序，也就是只有一个线程，若要执行多任务，办法有以下两种：
        - 第一种：启动多个进程，每个进程只有一个线程，但多个进程可以一块执行多个任务
        - 第二种：启动一个进程，在这个进程内启动多个线程
        - 第三种：启动多个进程，每个进程再启动多个线程。此方法目前不常用
        
# 全局解释器锁（GIL）
- 上面说到，Python只能默认执行单任务，究其原因就是全局解释器的存在
- 其全称为Global Interpreter Lock(全局解释器锁）
            设计之初是为了数据安全所做的决定：某个线程想要执行，必须拿到GIL,否则就不允许进入CPU执行。所以可以把它看成是通行证，在一个进程中，GIL只有一个、只有cpython中有GIL，pypy和jpython中没有它。
- python代码的执行时由Python虚拟机进行控制
- 在主循环中只能够有一个控制线程在运行


# 多线程
- Python多线程的工作过程
    - Python在使用多线程的时候，调用的是C语言的原生线程
        - 1.拿到共有数据
        - 2.申请GIL
        - 3.Python解释器调用os原生线程
        - 4.os操作CPU执行运算
        - 5.当该线程执行时间到后，无论运算是否执行完，GIL都内要求释放
        - 6.GIL被另一个线程拿到，重复上述操作
        - 7.等到其他线程执行完毕，又会切换到之前的线程（根据其保存的执行上下文继续执行。Python3中由程序计数器改成程序计时器，时间一到，GIL就释放
        
# Python实现多线程的方法
- python2中，采用thread方法，Python3中_thread，threading在两者中通用
### 创建线程常用的两种方法
- threading
    - 直接利用threading.Thread生成Thread实例
        - t = threading.Thread(target=func_name, args=元组），元组内容是前面函数所要输入的参数。注意，只有一个参数时末尾要加逗号如：args = (age,)
- 继承threading.Thread,其本质是重写Thread类中的run函数以及__init__方法
### threading.Thread()提供的线程对象方法和属性
- start()：创建线程后通过start启动线程，等待CPU调度，为run函数执行做准备；
- ident：整数类型的线程标识符，线程开始执行前（调用start之前）为None；
- isAlive()、is_alive()：测试线程是否处于活跃状态；
- daemon、isDaemon()&setDaemon()：守护线程相关；
    - 守护线程的含义：守护线程会在主线程结束的时候一起结束，无论它有没有执行完毕
    - t.dameon = True 和 t.setDaemon(True)都是将t设置为守护线程，必须在start()之
    前执行
    - 不适用于IDLE环境中的交互模式或者脚本运行模式,如jupyternotebook
- setName()：为线程设置名称
- getName()：获取线程名称

- join([join])：阻塞当前线程，等待被调线程结束或超时后（即让调用了join方法的线程必须同时执行完或超时后才能继续执行之后的代码）再继续执行当前线程的后续代码，参数 timeout 用来指定最长等待时间，单位为秒。
- run()：线程被cpu调度后自动执行线程对象的run方法，如果想自定义线程类，直接重写run方法就行了


### threading提供的方法和属性
- active_count(),activeCount()返回活跃的线程对象数量 
- current_thread(), currentThread():返回当前的Thread对象
- get_ident()：返回当前线程的线程标识符，线程标识符是一个非负整数，并无特殊含义，只是用来标识线程，该整数可能会被循环利用
- enumerate():返回当前活跃状态的所有线程对象列表
- stack_size([size]):返回创建线程时使用的栈的大小，如果指定size参数，则用啊指定后续创建的线程使用的栈的大小size必须是0或者大于大于32000的正整数
- main_thread() 返回主线程对象，即启动Python解释器的线程对象


# 多线程实现同步的方法:锁，条件变量，信号量，事件类

## 线程锁
- 在一个进程中，线程间可以共享向系统申请的内存空间，当同一个资源被多个线程竞争使用时，例如线程共享进程的变量，其就有可能被任何一个线程修改。所以对这种共享资源的访问就需要加上锁来保护，以免该资源对象被改的乱七八糟。


### 互斥锁
- 为了防止上面情况的发生，就出现了互斥锁（LOCK)
- 互斥锁只允许一个线程更改数据
    - lock = threading.Lock():产生一把锁
        - lock.acquire():获取锁
        - lock.release():释放锁
            Lock是 比较低级的同步原语，当被锁定以后不属于特定的线程。一个锁有两种状态: locked和unlocked。 如果锁处于unclocked状态，acquire(方 法将其修改为locked并立即返回;如果锁已处于locked状态，则阻塞当前线程并等待其他线程释放锁然后将其修改为locked并立即返回，或等待一定的时间后返回但不修改锁的状态。release()方法将锁状态由locked修改为unlocked并立即返回，如果锁状态本来已经是unlocked,调用该方法将会抛出异常。
- 线程安全问题
    - 如果一个资源/变量，它对于多线程来说，不用枷锁也不会引起任何问题，则称为线程安全
    - 线程不安全的变量类型：list,set,dict
    - 线程安全变量类型：queue
    
    
### 可重入锁，也称递归锁
- 一个锁，可以被一个线程多次申请
- 主要解决递归调用的时候，需要申请锁的情况
- 与LOCK的使用方法一样，但他支持嵌套，在多个锁没有释放的时候一般会使用RLOCK类
    - lock = threading.RLock()
            RLock与Lock的主要区别：在同一线程内，对RLock进行多次acquire()操作，程序不会阻塞。也就是说，在一个线程内，可以执行多个lock.acquire()，同样当我们想要解除阻塞的时候需要执行同样个数的lock.release()才可以。
            
            
### 信号量（Semaphore和BoundedSemaphore）
#### threading.Semaphore(num)普通信号量
- 控制能够并发执行的线程数，超出的线程先阻塞，直到前面有线程运行完毕再进去执行
    - Semaphore管理一个内置计数器，每当调用acquire（）时内置计数器-1，调用release（）时+1。计数器不能小于0，当计数器为0时，acquire（）将阻塞线程直到其他线程调用release
    - 限制一个时间点内进程的数量为num，保证如果在主机执行IO密集型任务的时候再执行这种类型的程序时，不会出现计算机宕机情况的发生
    - 普通信号量可以无限制释放,记住请求信号量一定要小于等于释放的信号量，否则程序可能无法正常运行
    - semaphore = threading.Semaphore(num)
    - semaphore.acquire()
    - semaphore.release()
    

### threading.BoundedSemaphore(num)限制信号量
- 和普通的信号量一样，有限的信号量内部维护一个计数器，该计数器=initialValue+release-acquire。当 计数器的值为0时，acquire方法调用会被阻塞。计数器初始值为1。
- 限制信号量只能被释放num次,也就是说信号量不能通过释放来大于初始设定值num

In [1]:
# 没有设置多线程时的运行代码
import time

def loop1():
    start_time = time.time()
    print("Start loop 1 at :", time.ctime(start_time))
    time.sleep(4)
    end_time = time.time()
    print("End loop 1 at:", time.ctime(end_time))
    print("We have spent %.5f seconds"%(end_time - start_time))

def loop2():
    start_time = time.time()
    print("Start loop 2 at :", time.ctime(start_time))
    time.sleep(2)
    end_time = time.time()
    print("End loop 2 at:", time.ctime(end_time))
    print("We have spent %.5f seconds"%(end_time - start_time))

    
def main():
    print("Starting at:", time.ctime())
    loop1()
    loop2()
    print("All done at:", time.ctime())
    
if __name__ == '__main__':
    main()

Starting at: Wed Sep  9 21:45:20 2020
Start loop 1 at : Wed Sep  9 21:45:20 2020
End loop 1 at: Wed Sep  9 21:45:24 2020
We have spent 4.00088 seconds
Start loop 2 at : Wed Sep  9 21:45:24 2020
End loop 2 at: Wed Sep  9 21:45:26 2020
We have spent 2.00002 seconds
All done at: Wed Sep  9 21:45:26 2020


In [1]:
# _thread案例，_thread.start_new_thread新建一个线程
import time
import _thread as thread

def loop1():
    start_time = time.time()
    print("Start loop 1 at :", time.ctime(start_time))
    time.sleep(4)
    end_time = time.time()
    print("End loop 1 at:", time.ctime(end_time))
    print("We have spent %.5f seconds"%(end_time - start_time))

def loop2():
    start_time = time.time()
    print("Start loop 2 at :", time.ctime(start_time))
    time.sleep(2)
    end_time = time.time()
    print("End loop 2 at:", time.ctime(end_time))
    print("We have spent %.5f seconds"%(end_time - start_time))

    
def main():
    t1 = time.time()
    print("Starting at:", time.ctime())
    thread.start_new_thread(loop1, ())
    print("----------")
    thread.start_new_thread(loop2, ())
    t2 = time.time()
    print("All done spent {}:", time.ctime())
    
if __name__ == '__main__':
    main()
    
# 注意打印结果，按道理来说主线程运行完毕，子线程就结束不再运行，所以这里的结果又问题
# 使用pycharm

Starting at: Sun Sep 27 21:10:51 2020
----------
All done spent {}: Sun Sep 27 21:10:51 2020
Start loop 1 at : Sun Sep 27 21:10:51 2020
Start loop 2 at : Sun Sep 27 21:10:51 2020
End loop 2 at: Sun Sep 27 21:10:53 2020
We have spent 2.00374 seconds
End loop 1 at: Sun Sep 27 21:10:55 2020
We have spent 4.00450 seconds


In [None]:
# threading.Thread创建多线程
import time
from threading import Thread

def loop1(num):
    start_time = time.time()
    print("Start loop 1 at :", time.ctime(start_time))
    time.sleep(4)
    print("hihiahihia {0}".format(num))
    end_time = time.time()
    print("We have spent %.5f seconds"%(end_time - start_time))

def loop2(num):
    start_time = time.time()
    print("Start loop 2 at :", time.ctime(start_time))
    time.sleep(2)
    print("huhahuhahuha {0}".format(num))
    end_time = time.time()
    print("We have spent %.5f seconds"%(end_time - start_time))

    
def main():
    sta = time.time()
    print("Starting at:", time.ctime())
    t1 = Thread(target=loop1, args=("pa",))
    t1.start()
    
    t2 = Thread(target=loop2, args=("pia",))    
    t2.start()
    t1.join()
    t2.join()
    end = time.time()
    print("All done at:", (end - sta))
    
if __name__ == '__main__':
    main()

Start loop 2 at : Wed Sep  9 21:45:26 2020
Starting at: Wed Sep  9 21:45:26 2020
Start loop 1 at : Wed Sep  9 21:45:26 2020
Start loop 2 at : Wed Sep  9 21:45:26 2020
End loop 2 at: Wed Sep  9 21:45:28 2020
We have spent 2.00585 seconds
huhahuhahuha pia
We have spent 2.00528 seconds


In [None]:
# 守护线程案例
from threading import Thread
import time
def fun():
    print("Start fun")
    time.sleep(2)
    print("End fun")
    
if __name__  == "__main__":
    print("Main thread")
    t1 = Thread(target=fun, args=())
    t1.setDaemon(True)
    # t1.daemon = True和上面功能一样
    t1.start()
    time.sleep(1)
    print("Main thread end")

In [None]:
# 非守护线程案例
from threading import Thread
import time
def fun():
    print("Start fun")
    time.sleep(2)
    print("End fun")
    
if __name__  == "__main__":
    print("Main thread")
    t1 = Thread(target=fun, args=())
    t1.start()
    time.sleep(1)
    print("Main thread end")
    # 得到的结果和上面一样，是由于daemon在jupyter中不起作用

In [None]:
# threading.Semaphore(num),num默认值为1
import threading
import time

def fun(semaphore, num):
    # 获得信号量，信号量减一
    """#semaphore.acquire():若加上该语句，则表明信号量再减一，
    而后面又没有释放该信号量，所以后面的线程会一直等待该信号量释放然后拿到它去执行
    ，一直等一直等但他永远不会释放，所有后面的线程就永远不可能执行，所以打印结果只有
    thread 1 is running"""
    semaphore.acquire()
    print("Thread %d is running." % num)
    time.sleep(3)
    # 释放信号量，信号量加一
    semaphore.release()



if __name__ == '__main__':
    # 初始化信号量，数量为2
    semaphore = threading.Semaphore(2)

    # 运行4个线程
    for num in range(6):
        t = threading.Thread(target=fun, args=(semaphore, num))
        t.start()


In [None]:
# threading.BoundedSemaphore(num)

import threading
import time


def fun(semaphore, num):
    # 获得信号量，信号量减一
    semaphore.acquire()
    print("Thread %d is running." % num)
    time.sleep(3)
    # 释放信号量，信号量加一
    semaphore.release()
    # 再次释放信号量，信号量加一，这是超过限定的信号量数目，这时会报错ValueError: Semaphore released too many times
    #semaphore.release()


if __name__ == '__main__':
    # 初始化信号量，数量为2，最多有2个线程获得信号量，信号量不能通过释放而大于2
    semaphore = threading.BoundedSemaphore(3)

    # 运行4个线程
    for num in range(4):
        t = threading.Thread(target=fun, args=(semaphore, num))
        t.start()


In [None]:
# 创建线程的第二种方法：继承threading.Thread()

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, arg):
        super(MyThread, self).__init__()
        self.arg = arg
    # 必须重写run函数
    def run(self):
        time.sleep(2)
        print("the args for this class is {0}".format(self.arg))

print(time.ctime())
for i in range(4):
    t = MyThread(i)
    t.start()
    t.join()
print("main thread is done!!!!!!!!!")
print(time.ctime())

In [None]:
# threading.Thread()的工业写法
import threading
import time

loop = [4,2]
class ThreadFunc():
    def __init__(self, name):
        self.name = name
    def loop(self, nloop, nsec):
        """
        param nloop:loop函数的名称
        param nsec:系统休眠时间
        """
        print("Start loop:", nloop, 'at', time.ctime())
        time.sleep(nsec)
    
def main():
    print("Starting at:", time.ctime())
    # ThreadingFunc("loop").loop与 t = ThreadFunc("loop") 再 t.loop相同
    # 传入的是类中的函数
    t = ThreadFunc("loop")
    t1 = threading.Thread(target = t.loop, args=("LOOP one", 3))
    # 下面这种写法更加工业
    t2 = threading.Thread(target = ThreadFunc('loop').loop, args=("LOOP two", 1))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("All done")
    
if __name__ == "__main__":
    main()

In [None]:
# threading.Lock():给变量上锁，保护它被使用的唯一性
import time
import threading
sum = 0
loopsum = 1000000
lock = threading.Lock()
def myAdd():
    global sum, loopsum
    for i in range(1, loopsum):
        lock.acquire()
        sum += 1
        lock.release()
def myMinu():
    global sum, loopsum
    for i in range(1, loopsum):
        lock.acquire()
        sum -= 1
        lock.release()


if __name__ == "__main__":
    print("start at:", time.ctime())
    print("starting from {}".format(sum))
    t1 = threading.Thread(target=myAdd, args=())
    t2 = threading.Thread(target=myMinu, args=())
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Done to {}".format(sum))
    print("end at ", time.ctime())

In [None]:
# 可重入锁（递归锁）示例---RLock
import threading, time

class MyThread(threading.Thread):
    def run(self):
        global num
        time.sleep(1)
        if mutex.acquire(timeout=1):
            num += 1
            msg = self.name + 'set num to ' + str(num)
            print(msg)
            mutex.acquire()
            mutex.release()
            mutex.release()
            
num = 0
mutex = threading.RLock() # 若该处为 mutex = threading.Lock(),则就会报错

def testing():
    for i in range(5):
        t = MyThread()
        t.start()
        
if __name__ == "__main__":
    testing()

# 队列类型
## 栈（Stack）：后进先出（LIFO：last in, first out）的数据结构
### Python中后进先出队列：queue.LifoQueue()类
- 栈：又名堆栈，是一种运算受限的线性表
        其限制是仅允许在表的一端进行插入和删除运算，这一端被称为栈顶，另一端被称为栈底。向一个栈插入新的元素称为进栈、入栈或压栈。它是把新的元素放到栈顶元素的上面，使之成为新的栈顶元素。从一个栈删除元素又称作出栈或者退栈，它是先从栈顶元素开始删除，而第二层的元素成为新的栈顶元素
- 栈的非正式定义如下：
            栈是一个满足如下条件的数据结构。

            （1）数据项一个叠一个呈线性排列；

            （2）所有的插入与删除操作在一端进行；

            （3）最后插入的记录是第一个被删除的记录；即后进先出

            （4）栈的基本操作有三个：

            ①入栈，压入一个元素或记录到栈顶；

            ②出栈，将栈顶元素或记录移除；

            ③读栈，取出栈顶元素或记录。
            


            
## 队列（Queue）：先进先出（FIFO:first in, first out）的数据结构
### python中先进先出队列：queue.Queue()类
- 队列是一种受限制的线性表，特殊之处在于它只允许在表的前端进行删除操作，在表的后端进行插入操作。进行插入操作的端称为队尾，进行删除操作的端称为队头。
- 队列是满足如下条件的数据结构

        （1）数据项一个挨着一个呈线性；

        （2）所有的插入操作在一端进行（对头）；

        （3）所有删除操作在另一端进行（对尾）；

        （4）最先插入的记录是第一个被删除的记录；即先进先出

        （5）队列的根本操作有三个：

        ①入队，在队尾加入一个元素或记录；

        ②出队，将队头元素或记录移除；

        ③读队，取出队头元素或记录。
        

        
   
# 优先级队列(priority)
- 优先级队列是一种容器型数据结构，它能管理一队记录，并按照排序字段（例如一个数字类型的权重值）为其排序。由于是排序的，所以在优先级队列中你可以快速获取到最大的和最小的值。
            优先级队列依据优先级来获取下一个记录，而优先级取决于排序字段的值，经常用来解决调度问题，比如给更紧急的任务更高的优先级。
            
## 实现优先级队列的方法
### 手动维护排序列表
- sorted()函数：
- 使用排序列表你可以快速地获取或删除最大的或者最小的元素，缺点是向列表中插入元素是一个很慢的操作，复杂度在O(n)。

### heapa模块
- heapq是一个二叉堆的实现，它内部使用内置的list对象，它无论插入还是获取最小元素复杂度都在O(log n)。
- 还需要添加很多额外的代码来保证顺序以及其它必不可少的功能

### queue.PriorityQueue类
- 这个优先级队列内部使用了heapq，所以它的时间复杂度和heapq是相同的。
- 不同的是PriorityQueue的操作是同步的，提供锁操作，支持并发的生产者和消费者
- 通常来说它的基于类接口要比heapq的基于函数的接口更友好。

# Queue、LifoQueue、PriorityQueue可用方法
- put(block=True, timeout=None)：block用于设置是否阻塞默认为True，timeout用于设置阻塞时的等待时长
    - 该方法则最多阻塞timeout秒并抛出Full异常。如果block是False并且队列满，则直接抛出Full异常（这时timeout将被忽略）
- q.qsize() 返回队列的大小
- q.empty() 如果队列为空，返回True,反之False
- q.full() 如果队列满了，返回True,反之False
- q.full 与 maxsize 大小对应
- q.get(item, block=True, timeout=None) 从队列中移除这个值
    - 如果队列为空的话，且blocking = False 则直接报 empty异常。如果blocking = True，就是等一会，timeout必须为 0 或正数。为None则一直等下去，为0不等待，正数n为等待n秒还不能读取，报empty异常。
- q.get_nowait() 相当q.get(False)：没有等待时间，一旦队列为空会立即抛出异常
- 非阻塞 q.put(item) 写入队列，timeout等待时间
- q.put_nowait(item) 相当q.put(item, False)，没有等待时间，一旦队列为满无法加入会立即抛出异常
- q.task_done() 在完成一项工作之后，q.task_done() 函数向任务已经完成的队列发送一个信号
- q.join() 实际上意味着等到队列为空，再执行别的操作

# 生产者消费者模型（主要用于解耦）

            在多线程开发当中，如果生产线程处理速度很快，而消费线程处理速度很慢，那么生产线程就必须等待消费线程处理完，才能继续生产数据。同样的道理，如果消费线程的处理能力大于生产线程，那么消费线程就必须等待生产线程。为了解决这个问题于是引入了生产者和消费者模式
            生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯，而通过阻塞队列来进行通讯，所以生产者生产完数据之后不用等待消费者处理，直接扔给阻塞队列，消费者不找生产者要数据，而是直接从阻塞队列里取，阻塞队列就相当于一个缓冲区，平衡了生产者和消费者的处理能力。

In [None]:
from collections import deque # 先进先出
import queue

q = queue.Queue(maxsize=10)
for i in range(10):
    q.put(i)
q.get()
q.get()
print(q.full())
print("the size is :", q.qsize())
q.put("haha")
while not q.empty():
    print(q.get())

In [None]:
from collections import deque # 先进先出
import queue

q = queue.Queue(maxsize=10)
for i in range(10):
    q.put(i)
q.get()
q.get()
q.get()
q.get()
q.get()
q.get()
q.get()
q.get()
q.get()
q.get()

print(q.full())
print("the size is :", q.qsize())
q.put("haha")
while not q.empty():
    print(q.get())



In [None]:
# 生产者消费者模型举例
import threading
import time 
import queue

def producer():
    count = 1
    while True:
        q.put("No.%d"%count)
        print("Producer put No.%d"%count)
        time.sleep(0.5)
        for i in range(10):
            count += i
        
def consumer(name):
    while True:
        print("%s get %s"% (name, q.get()))
        time.sleep(1.5)
        
q = queue.Queue(maxsize=5)
p = threading.Thread(target=producer, args=())
c = threading.Thread(target=consumer, args=("Tom", ))
#p.start()
#c.start()
# 这段代码停不下来

In [None]:
import queue
import time
class Producer(threading.Thread):
    def run(self):
        global q
        count = 0
        while True:
            if q.qsize() < 1000:
                for i in range(100):
                    count += 1
                    msg = "Producing" + str(count) + "product"
                    q.put(msg)
                    print(msg)
            time.sleep(0.5)
            
class Consumer(threading.Thread):
    def run(self):
        global q
        while True:
            if q.qsize() > 100:
                for i in range(3):
                    msg = self.name + "consums" + q.get()
                    print(msg)
            time.sleep(1)

if __name__ == "__main__":
    q = queue.Queue()
    for i in range(500):
        q.put("初始产品：" + str(i))
    for i in range(2):
        p = Producer()
      """  p.start()
    for i in range(5):
        c = Consumer()
        c.start()
    print(threading.active_count())"""

# 多线程死锁问题
- 死锁的一个原因是互斥锁。两个线程互相等待对方的锁，互相占用着资源不放。归根结底，还是由于线程同时获取多个锁造成的。
- 比如：一个线程获取了第一个锁，然后在获取第二个锁的时候发生阻塞，那么这个线程就可能阻塞其他线程的执行，从而导致整个程序假死。
- 死锁是每一个多线程程序都会面临的一个问题。根据经验来讲，尽可能保证每一个线程只能同时保持一个锁，这样程序就不会被死锁。
- 解决死锁的一种方法是：在进程获取锁的时候严格按照对象id升序排列获取
- 解决死锁问题没有什么太优雅的办法，凭借经验


In [None]:
# 死锁实例
import threading,time
lock_1 = threading.Lock()
lock_2 = threading.Lock()

def func_1():
    print("func 1 starting..............")
    lock_1.acquire()
    print("func_1 申请了lock_1....")
    time.sleep(2)#延长程序时间，保证可以产生死锁
    print("func 1 等待 lock_2......")
    lock_2.acquire()
    print("func 1 申请了 lock_2.......")
    lock_2.release()
    print("func_1 释放了 lock_2......")
    lock_1.release()
    print("func_1 释放了 lock_1......")
    
def func_2():
    print("func 2 starting..............")
    lock_2.acquire()
    print("func_2 申请了lock_2....")
    time.sleep(4)#延长程序时间，保证可以产生死锁
    print("func 2 等待 lock_1......")
    lock_1.acquire()
    print("func 2 申请了 lock_1.......")
    lock_1.release()
    print("func_2 释放了 lock_1......")
    lock_2.release()
    print("func_2 释放了 lock_2......")

    
t1 = threading.Thread(target=func_1, args=())
t2 = threading.Thread(target=func_2, args=())
t1.start()
t2.start()
print("All done")

# 多线程事件（Event类）：同步条件
- threading.Event()
- 同步：当线程在系统中运行时，线程的调度具有一定的透明性，通常程序无法准确控制线程的轮换执行。因此，如果有需要，可通过线程通信来保证线程协调运行，即达到线程同步的目的。
- 实现同步的方法：Event
- Event：是线程间通信机制之一：一个线程发送一个event信号，其他的线程则等待这个信号。常用在一个线程需要根据另外一个线程的状态来确定自己的下一步操作的情况。
- Event原理：
    - 通过生成一个“事件”对象（可以看成是一个传令兵）来管理内部的一个标志，传令兵通过set()方法将标志设置为True，并使用clear()方法将标志设置为False。wait（）方法就是号令所有线程等待，即阻塞，直到标志变成True（标志要靠set方法将其变为True），才继续进行下面的代码。该标志初始为False
    

### Event类常用方法：
    - is_set()：当且仅当内部标志为True时返回True。
    - set()：将内部标志设置为True。所有等待它成为True的线程都被唤醒。当标志保持在True的状态时，线程调用wait()是不会阻塞的（好比传令兵发信号要成员去攻打敌人，即set将标志设置为True，如果没有把该信号清除，然后只是大叫要成员等待，那么成员是不会停止的，他们只认信号。除非用clear()重置，wait才能生效）。
    - clear()：将内部标志重置为False。随后，调用wait()的线程将阻塞，直到另一个线程调用set()将内部标志重新设置为True。
    - wait(timeout=None)：阻塞直到内部标志变成真。如果内部标志在wait()方法调用时为True，则立即返回。否则，则阻塞，直到另一个线程调用set()将标志设置为True，或发生超时。该方法总是返回True，除非设置了timeout并发生超时。

In [None]:
# Event类示例:直接调用threading.Thread
import threading, time

event = threading.Event()
def chihuoguo(name):
    print("{} 已经启动".format(threading.current_thread().getName()))
    print("小伙伴 %s 已经进入就餐状态" % name)
    time.sleep(1)
    event.wait()
    print("%s 收到通知了"% threading.current_thread().getName())
    print("小伙伴 {} 开始动筷子了".format(name))
    
t1 = threading.Thread(target=chihuoguo, args=('xiaoming', ))
t2 = threading.Thread(target=chihuoguo, args=('xiaobai', ))
t1.start()
t2.start()                                  
time.sleep(1)
print("主线程告诉大家可以开始吃喽")
event.set()

In [None]:
# Event类示例:继承threading.Thread
event = threading.Event()

def chihuoguo(name):
    print("{} 已经启动".format(threading.current_thread().getName()))
    print("小伙伴 %s 已经进入就餐状态" % name)
    time.sleep(1)
    event.wait()
    print("%s 收到通知了"% threading.current_thread().getName())
    print("小伙伴 {} 开始动筷子了".format(name))
    
    
class Myhuoguo(threading.Thread):
    def __init__(self,name):
        threading.Thread.__init__(self)
        self.people = name
    
    def run(self):
        chihuoguo(self.people)
        print("结束线程：{}".format(threading.current_thread().getName()))
        
for i in ['xb', 'xh', 'xc']:
    t = Myhuoguo(i)
    t.start()
time.sleep(0.2)
print("都到齐了，大家开始吃吧")
event.set()

In [None]:
# Event类wait中timeout超时案例
import threading, time

event = threading.Event()
def chihuoguo(name):
    print("{} 已经启动".format(threading.current_thread().getName()))
    print("小伙伴 %s 已经进入就餐状态" % name)
    time.sleep(1)
    event.wait(timeout=0.1)
    print("%s 收到通知了"% threading.current_thread().getName())
    print("小伙伴 {} 开始动筷子了".format(name))


t1 = threading.Thread(target=chihuoguo, args=('xiaoming', ))
t2 = threading.Thread(target=chihuoguo, args=('xiaobai', ))
t1.start()
t2.start()                                  
time.sleep(3)
print("主线程告诉大家可以开始吃喽")
event.set()
# 可以看出最后才打印主线程告诉大家可以开始吃了，因为wait超时，就直接运行下去了
# 就好比约定大家几点开始一起进攻，但是主帅睡过头了，其他人不管他就按照约定去攻打了

In [None]:
# 一旦标志被set激活，而又没clear，则wait不会阻塞
import threading, time

event = threading.Event()
def chihuoguo(name):
    print("{} 已经启动".format(threading.current_thread().getName()))
    print("小伙伴 %s 已经进入就餐状态" % name)
    time.sleep(1)
    event.wait()
    print("%s 收到通知了"% threading.current_thread().getName())
    print("小伙伴 {} 开始动筷子了".format(name))

event.set()
t1 = threading.Thread(target=chihuoguo, args=('xiaoming', ))
t2 = threading.Thread(target=chihuoguo, args=('xiaobai', ))
t1.start()
t2.start()                                  
time.sleep(1)
print("主线程告诉大家可以开始吃喽")
event.set()

# 多线程条件类（Condition）:条件变量同步
- 满足条件之后才能够执行
- class threading.Condition(lock=None)
    - 这个类实现条件变量对象。条件变量允许一个或多个线程等待，直到它们被另一个线程唤醒。
    - 如果给出了lock参数而不是None，则它必须是Lcok或RLock对象，并以它作为底层的锁。否则将默认创建一个RLock对象。
    - Condition遵循上下文管理协议。
    - 和Event事件类差不多，只是多了锁功能

- 实例方法
    - acquire()：申请锁
    - release()
        - 释放锁。这个方法调用底层锁的相应方法。

    - wait(timeout=None)
        - 当前线程处于等待状态，并且会释放锁，可以被其他线程使用notify或者notify_all唤醒，被唤醒后等待上锁，上锁后继续执行下面的代码
        - 线程挂起，等待被唤醒(其他线程的notify方法)或者发生超时。调用该方法的线程必须先获得锁，否则引发RuntimeError。
        该方法会释放底层锁，然后阻塞，直到它被另一个线程中的相同条件变量的notify()或notify_all()方法唤醒，或者发生超时。一旦被唤醒或超时，它会重新获取锁并返回。
        返回值为True，如果给定timeout并发生超时，则返回False。

    - wait_for(predicate, timeout=None)
        - 等待直到条件变量的返回值为True。predicate应该是一个返回值可以解释为布尔值的可调用对象。可以设置timeout以给定最大等待时间。
        - 该方法可以重复调用wait()，直到predicate的返回值解释为True，或发生超时。该方法的返回值就是predicate的最后一个返回值，如果发生超时，返回值为False。它与wait()的规则相同：调用前必须先获取锁，阻塞时释放锁，并在被唤醒时重新获取锁并返回。
    - notify(n=1)
        - 通知某个正在等待的线程，默认是1个等待的线程
        - 默认情况下，唤醒等待此条件变量的一个线程(如果有)。调用该方法的线程必须先获得锁，否则引发RuntimeError。
        - 该方法最多唤醒n个等待中的线程，如果没有线程在等待，它就是要给无动作的操作。
        - 注意：要被唤醒的线程实际上不会马上从wait()方法返回(唤醒)，而是等到它重新获取锁。这是因为notify()并不会释放锁，需要线程本身来释放(通过wait()或者release())

     - notify_all()
         - 此方法类似于notify()，但唤醒的是所有等待的线程。
         - notify和notify_all是不会释放锁的，并且在release之前使用



In [1]:
# 多线程条件类示例：threading.Condition()
import threading
import time


num = 0
con = threading.Condition()


class Producer(threading.Thread):
    """生产者"""
    def run(self):
        global num
        # 获取锁
        con.acquire()
        while True:
            num += 1
            print('生产了1个，现在有{0}个'.format(num))
            time.sleep(1)
            if num >= 5:
                print('已达到5个，不再生产')
                # 唤醒消费者
                con.notify()
                # 等待-释放锁；被唤醒-获取锁
                con.wait()
        # 释放锁
        con.release()


class Customer(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.money = 3

    def run(self):
        global num
        while self.money > 0:
            # 由于场景是多个消费者进行抢购，如果将获取锁操作放在循环外(如生产者),
            # 那么一个消费者线程被唤醒时会锁住整个循环，无法实现另一个消费者的抢购。
            # 在循环中添加一套"获取锁-释放锁",一个消费者购买完成后释放锁，其他消费者
            # 就可以获取锁来参与购买。
            con.acquire()
            if num <= 0:
                print('没货了，{0}通知生产者'.format(
                    threading.current_thread().name))
                con.notify()
                con.wait()
            self.money -= 1
            num -= 1
            print('{0}消费了1个, 剩余{1}个'.format(
                threading.current_thread().name, num))
            con.release()
            time.sleep(1)
        print('{0}没钱了-回老家'.format(threading.current_thread().name))


if __name__ == '__main__':
    p = Producer(daemon=True)
    c1 = Customer(name='Customer-1')
    c2 = Customer(name='Customer-2')
    p.start()
    c1.start()
    c2.start()
    c1.join()
    c2.join()

生产了1个，现在有1个
生产了1个，现在有2个
生产了1个，现在有3个
生产了1个，现在有4个
生产了1个，现在有5个
已达到5个，不再生产
Customer-1消费了1个, 剩余4个
Customer-2消费了1个, 剩余3个
Customer-1消费了1个, 剩余2个
Customer-2消费了1个, 剩余1个
Customer-1消费了1个, 剩余0个
没货了，Customer-2通知生产者
生产了1个，现在有1个
Customer-1没钱了-回老家
生产了1个，现在有2个
生产了1个，现在有3个
生产了1个，现在有4个
生产了1个，现在有5个
已达到5个，不再生产
Customer-2消费了1个, 剩余4个
Customer-2没钱了-回老家


In [1]:
# condition案例2
import threading
import random
import time

gProduct = 1000
gCondition = threading.Condition()
gTotalTimes = 10
gTimes = 0


class Producer(threading.Thread):
    def run(self):
        global gTimes
        global gProduct
        while True:
            gCondition.acquire()
            product = random.randint(100, 1000)
            if gTimes >= gTotalTimes:
                gCondition.release()
                break
            gProduct += product
            print(f"第{gTimes + 1}次生产：{threading.current_thread().getName()}生产了{product}件产品，现有{gProduct}件产品")
            gTimes += 1
            # 通知等待的线程，但不会释放锁
            gCondition.notify_all()
            # 需要手动释放锁
            gCondition.release()
            time.sleep(0.5)


class Consumer(threading.Thread):
    def run(self):
        global gTimes
        global gProduct
        while True:
            gCondition.acquire()
            product = random.randint(100, 1000)
            while gProduct < product:
                if gTimes >= gTotalTimes:
                    gCondition.release()
                    return
                print("商品正在备货中，请耐心等待")
                gCondition.wait()
            gProduct -= product
            print(f"{threading.current_thread()} 消费了{product}件产品, 还剩余{gProduct}件产品")
            gCondition.release()
            time.sleep(0.5)

            


def main():
    # 5个消费者
    for i in range(5):
        c = Consumer(name=f"消费者{i}")
        c.start()
    # 4个生产者
    for i in range(4):
        p = Producer(name=f"生产者{i}")
        p.start()

if __name__ == "__main__":
    main()
# 不知道为啥运行不成功，pycharm可以

<Consumer(消费者0, started 9544)> 消费了407件产品, 还剩余593件产品
<Consumer(消费者1, started 16320)> 消费了133件产品, 还剩余460件产品
商品正在备货中，请耐心等待
商品正在备货中，请耐心等待
<Consumer(消费者4, started 1352)> 消费了404件产品, 还剩余56件产品
第1次生产：生产者0生产了198件产品，现有254件产品
商品正在备货中，请耐心等待
商品正在备货中，请耐心等待
第2次生产：生产者1生产了473件产品，现有727件产品
第3次生产：生产者2生产了823件产品，现有1550件产品
<Consumer(消费者2, started 15952)> 消费了887件产品, 还剩余663件产品
商品正在备货中，请耐心等待
第4次生产：生产者3生产了704件产品，现有1367件产品
<Consumer(消费者3, started 15520)> 消费了775件产品, 还剩余592件产品
商品正在备货中，请耐心等待
商品正在备货中，请耐心等待
商品正在备货中，请耐心等待
第5次生产：生产者0生产了792件产品，现有1384件产品
<Consumer(消费者0, started 9544)> 消费了606件产品, 还剩余778件产品
<Consumer(消费者1, started 16320)> 消费了662件产品, 还剩余116件产品
商品正在备货中，请耐心等待
第6次生产：生产者1生产了589件产品，现有705件产品
商品正在备货中，请耐心等待
<Consumer(消费者2, started 15952)> 消费了152件产品, 还剩余553件产品
第7次生产：生产者2生产了965件产品，现有1518件产品
<Consumer(消费者4, started 1352)> 消费了947件产品, 还剩余571件产品
第8次生产：生产者3生产了626件产品，现有1197件产品
<Consumer(消费者3, started 15520)> 消费了290件产品, 还剩余907件产品
第9次生产：生产者0生产了551件产品，现有1458件产品
<Consumer(消费者1, started 16320)> 消费了458件产品, 还剩余1000件产品
<Consumer(消费者0

In [3]:
import threading
import random
import time

gProduct = 1000
gTotalTimes = 10
gTimes = 0
gCondition = threading.Condition()


class Producer(threading.Thread):
    def run(self):
        global gProduct
        global gTimes
        while True:
            new_product = random.randint(100, 1000)
            gCondition.acquire()
            if gTimes >= gTotalTimes:
                gCondition.release()
                break
            gProduct += new_product
            print(f"第{gTimes + 1}次生产：{threading.current_thread().getName()}生产了{new_product}件产品，现有{gProduct}件产品")
            gTimes += 1
            gCondition.notify_all()
            gCondition.release()
            time.sleep(0.5)


class Consumer(threading.Thread):
    def run(self):
        global gProduct
        global gTimes
        while True:
            con_prod = random.randint(100, 1000)
            gCondition.acquire()
            while gProduct < con_prod:
                if gTimes >= gTotalTimes:
                    gCondition.release()
                    # 使用return直接退出循环，而break的话只退出一层循环
                    return
                print("商品不足，正在备货中，请耐心等待")
                gCondition.wait()
            gProduct -= con_prod
            print(f"{threading.current_thread()}消费了{con_prod}件产品，还剩余{gProduct}件产品")
            gCondition.release()
            time.sleep(0.4)


def main():
    for i in range(5):
        p = Producer(name=f"生产者{i}")
        p.start()

    for i in range(5):
        c = Consumer(name=f"消费者{i}")
        c.start()


if __name__ == "__main__":
    main()

第1次生产：生产者0生产了336件产品，现有1336件产品
第2次生产：生产者1生产了750件产品，现有2086件产品
第3次生产：生产者2生产了280件产品，现有2366件产品
第4次生产：生产者3生产了930件产品，现有3296件产品
第5次生产：生产者4生产了391件产品，现有3687件产品
<Consumer(消费者0, started 14352)>消费了747件产品，还剩余2940件产品
<Consumer(消费者1, started 14456)>消费了356件产品，还剩余2584件产品
<Consumer(消费者2, started 9724)>消费了184件产品，还剩余2400件产品
<Consumer(消费者3, started 7356)>消费了778件产品，还剩余1622件产品
<Consumer(消费者4, started 1232)>消费了614件产品，还剩余1008件产品
<Consumer(消费者0, started 14352)>消费了584件产品，还剩余424件产品
商品不足，正在备货中，请耐心等待
商品不足，正在备货中，请耐心等待
<Consumer(消费者3, started 7356)>消费了375件产品，还剩余49件产品
商品不足，正在备货中，请耐心等待
第6次生产：生产者0生产了844件产品，现有893件产品
<Consumer(消费者1, started 14456)>消费了653件产品，还剩余240件产品
商品不足，正在备货中，请耐心等待
商品不足，正在备货中，请耐心等待
第7次生产：生产者1生产了453件产品，现有693件产品
商品不足，正在备货中，请耐心等待
<Consumer(消费者4, started 1232)>消费了284件产品，还剩余409件产品
第8次生产：生产者2生产了381件产品，现有790件产品
<Consumer(消费者2, started 9724)>消费了741件产品，还剩余49件产品
第9次生产：生产者3生产了327件产品，现有376件产品
第10次生产：生产者4生产了451件产品，现有827件产品
<Consumer(消费者0, started 14352)>消费了326件产品，还剩余501件产品
<Consumer(消费者3, started 7356)>消费了174件产品，还剩余3

# 多线程定时类（Timer）:threading.Timer(interval, function)
- 定时器 就是隔多长时间去触发任务执行,指定n秒后执行某操作,可以使用cancel提前取消。
- interval 第一个参数传 间隔时间;function  传执行任务的函数  隔了多少秒后执行这个函数
- Timer从Thread派生，没有增加实例方法。

In [None]:
from threading import Timer
 
def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

# 多进程
- fork()函数：也可以创建多进程
    - 在Unix/Linux系统中，提供了一个fork()函数调用，相较于普通函数调用一次，返回一次的机制，fork()调用一次，返回两次，具体表现为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程)，然后分别在父进程和子进程内返回。
    - 子进程永远返回0，而父进程返回子进程的ID，这样一个父进程可以轻松fork出很多子进程。且父进程会记下每个子进程的ID，而子进程只需要调用getppid()就可以拿到父进程的ID。
    - 通过fork调用这种方法，一个进程在接到新任务时就可以复制出一个子进程来处理新任务，例如nginx就是由父进程(master process)监听端口，再fork出子进程(work process)来处理新的http请求。
    - Windows没有fork调用，所以在window pycharm上运行以上代码无法实现以上效果。
- 在os中可以通过os.getppid(),os.getpid()查看父进程和自身进程的ID
- 守护进程：父进程中将一个子进程设置为守护进程,那么这个子进程会随着主进程的结束而结束
    - 特点：
        1.守护进程会在主进程结束的时候而终止
        2.守护进程将无法再创建子进程
- 主进程创建守护进程
    - 同线程创建方法一样
- 值得注意的是：如果在window操作系统下，所有和进程相关的代码都必须放在__name__ == "__main__"下面，否则会报错
        
        
# Python中实现多进程之multiprocessing模块
- 本质来说，multiprocessing不是一个模块，而是一个包
- 其包含内容大致可以分为四个部分：
    - 创建多进程部分：multiprocessing.Process,subprocess模块（不是multiprocessing中的模块）
    - 多进程同步部分：进程锁multiprocessing.Lock
    - 进程池部分：multiprocessing.Pool
    - 多进程数据共享部分:multiprocessing.Queue, multiprocessing.Pipe
    

# 创建进程的方法
- 直接实例化multiprocessing.Process()
- 派生multiprocessing.Process()的子类
- subprocess模块
- 进程池方法

# multiprocessing.Process()方法
- 实例化一个multiprocessing.Process的对象，并传入一个初始化函数对象（initial function )作为新建进程执行入口；


In [2]:
import threading, multiprocessing
print(multiprocessing.__all__)
print("------------------")
print(threading.__all__)


['Array', 'AuthenticationError', 'Barrier', 'BoundedSemaphore', 'BufferTooShort', 'Condition', 'Event', 'JoinableQueue', 'Lock', 'Manager', 'Pipe', 'Pool', 'Process', 'ProcessError', 'Queue', 'RLock', 'RawArray', 'RawValue', 'Semaphore', 'SimpleQueue', 'TimeoutError', 'Value', 'active_children', 'allow_connection_pickling', 'cpu_count', 'current_process', 'freeze_support', 'get_all_start_methods', 'get_context', 'get_logger', 'get_start_method', 'log_to_stderr', 'reducer', 'set_executable', 'set_forkserver_preload', 'set_start_method']
------------------
['get_ident', 'active_count', 'Condition', 'current_thread', 'enumerate', 'main_thread', 'TIMEOUT_MAX', 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size']


In [None]:
import os
from random import randint
import time

def download(filename):
    print("进程号是%s"% os.getpid())
    print("现在开始下载{}".format(filename))
    time.sleep(1)
    
def runtask():
    start_time = time.time()
    download("新白娘子传奇")
    download("哪吒")
    stop_time = time.time()
    print("下载耗时{}".format(stop_time-start_time))
    
if __name__ == "__main__":
    runtask()

In [3]:
# 调用Process模拟开启两个进程
import time
import os
import multiprocessing

def download(filename):
    print("进程号是%s"% os.getpid())
    print("父进程是{}".format(os.getppid()))
    print("现在开始下载{}".format(filename))
    time.sleep(1)
    
def main():
    start_time = time.time()
    p1 = multiprocessing.Process(target=download, args=("新白娘子传奇", ))
    p2 = multiprocessing.Process(target=download, args=("西游记", ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end_time = time.time()
    print("总共耗时：{}".format(end_time - start_time))

if __name__ == "__main__":
    main()
# 运行有bug，不知道为啥，可以使用pycharm

总共耗时：0.5265917778015137


# 继承multiprocessing.Process()
- 要构造__init__函数
- 重写run函数

In [None]:
import multiprocessing
import os, time

class myProcess(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        
    def run(self):
        print("子进程开始>>> pid={0}, ppid={1}".format(os.getpid(), os.getppid()))
        time.sleep(1)
        print("子进程终止>>> pid={}".format(os.getpid()))
        
def main():
    print("主进程开始{}".format(__name__))
    p1 = myProcess()
    p2 = myProcess()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("程序运行完毕")
    
if __name__ == "__main__":
    main()

# subprocess模块
- 目的是允许产生一个新的进程，连接输入(input)/输出(output)/错误(error)的管道，返回子进程的返回值，替代老模块如os.system, os.spawn
- Sunprocess模块可以在程序执行过程中，调用外部的程序。
- 比如我们可以在python程序中打开记事本，打开cmd，或者在某个时间点关机


In [6]:
import subprocess
import os
a = os.system("df - Th")
print(a)
#subprocess.run(["df", "-h"])

1


# 进程同步：进程锁
- 加锁可以保证多个进程修改同一块数据时，同一时间只能有一个任务可以进行修改，即串行的修改，牺牲了速度却保证了数据安全。进程锁的问题：

        效率低（共享数据基于文件，而文件是硬盘上的数据）
        需要自己加锁处理


# 进程池Pool
- 用来装多个进程的池子。有时工作需要创建多个子进程，但子进程数量过多，会给计算机带来负担，为了避免无限的消耗资源，通过Pool可以规定最多创建几个子进程，同时让子进程循环使用。
- 初始化Pool时，可以规定一个最大的进程数，当有新的请求提交到Pool中时，如果池还没满，就会创建一个新的进程执行该请求，但若进程中的进程数已达到最大值，那么直到池中某个进程结束才会用该进程执行该请求。

# Pool常用函数
- apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
    - 异步非阻塞式的方法，子进程与父进程并行执行
    - callback:回调函数，若callback= func, 子进程执行完后，才会执行callback中的函数，否则callback不执行（而且callback中的函数是由父进程来执行了）
                进程池中任何一个任务一旦处理完了，就立即告知主进程：我好了额，你可以处理我的结果了。主进程则调用一个函数去处理该结果，该函数即回调函数
- apply(func, args, kwds)
    - 同步阻塞式方法，子进程与父进程同步执行
    - 同步与异步的的区分
        - 同步
            - 同步是指一个进程执行结束后才能执行另外一个进程，父进程是最后执行完毕的
        - 异步
            - 异步是指进程在执行某个请求时，不管其他的进程的状态，这个进程就执行后续操作；当有消息返回时系统会通知进程进行处理，这样可以提高执行的效率。所以异步执行父进程不会等待子进程，所以需要使用join()
- terminate():不管任务是否完成， ⽴即关闭进程池
- join():主进程等待所有子进程执行完毕，必须在close或terminete之后
- close():关闭Pool， 使其不再接受新的任务；等待所有进程结束才关闭线程池

In [None]:
# 异步非阻塞执行apply_async()
# 同步阻塞速度很慢，建议弃用
from multiprocessing import Pool
from random import randint
import os,time

def download(taskname):
    print("进程号是{}".format(os.getpid()))
    downloadtime = randint(1,3)
    print("现在开始下载{}".format(taskname))
    time.sleep(downloadtime)

def runtask():
    start_time = time.time()
    pool = Pool(4)
    for task in range(8):
        pool.apply_async(download, args=(task, ))
    pool.close()
    pool.join()
    end_time = time.time()
    print("下载耗时：{}".format(end_time - start_time))
    
if __name__ == "__main__":
    runtask()

In [None]:
# apply_async中的callback回调函数
from multiprocessing import Pool
import os, time, random

# 子进程任务函数
def download(f):
    print("进程中的进程：pid = {}, ppid = {}".format(os.getpid(), os.getppid()))
    for i in range(3):
        print(f, "--文件--%d"%i)
        time.sleep(1)
    return {"result":1, "info": "下载完成！"}

# 主进程调用回调函数
# 此处msg参数是从子进程任务函数return得到的字典
def alterUser(msg):
    print("callback function: pid = {}".format(os.getpid()))
    print("get result:", msg['info'])

if __name__ == "__main__":
    p = Pool(3)
    for arg in ['1111', '2222', '3333', '4444']:
        p.apply_async(download, args=(arg, ), callback=alterUser)
    p.close()
    p.join()
    print("程序结束")

In [1]:
# 进程锁
from multiprocessing import Process, Lock
import os


def f(l, i):
    l.acquire()
    try:
        print(os.getpid())
        print('hello world', i)
    finally:
        l.release()


if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

# 进程间通信
- 用来解决进程锁存在的问题
- 由于进程间数据是不共享的，所以不会出现多线程GIL带来的问题，多进程之间的通信通过Queue或Pipe来实现

## 队列：multiprocessing.Queue()
- 使用方法和threading里面的Queue差不多
- 底层队列使用管道和锁实现
- Queue常用方法介绍
    - q = Queue([maxsize]) 
        - 创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数，则无大小限制。底层队列使用管道和锁定实现。另外，还需要运行支持线程以便队列中的数据传输到底层管道中。 
    - Queue的实例方法：

    - q.get( [ block [ ,timeout ] ] ) 
        - 返回q中的一个项目。如果q为空，此方法将阻塞，直到队列中有项目可用为止。block用于控制阻塞行为，默认为True. 如果设置为False，将引发Queue.Empty异常（定义在Queue模块中）。timeout是可选超时时间，用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用，将引发Queue.Empty异常。

    - q.get_nowait( ) 
        - 同q.get(False)方法。
    - q.put(item [, block [,timeout ] ] ) 
        - 将item放入队列。如果队列已满，此方法将阻塞至有空间可用为止。block控制阻塞行为，默认为True。如果设置为False，将引发Queue.Empty异常（定义在Queue库模块中）。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
    - q.qsize() 
        - 返回队列中目前项目的正确数量。此函数的结果并不可靠，因为在返回结果和在稍后程序中使用结果之间，队列中可能添加或删除了项目。在某些系统上，此方法可能引发NotImplementedError异常。
    - q.empty() 
        - 如果调用此方法时 q为空，返回True。如果其他进程或线程正在往队列中添加项目，结果是不可靠的。也就是说，在返回和使用结果之间，队列中可能已经加入新的项目。
    - q.full() 
        - 如果q已满，返回为True. 由于线程的存在，结果也可能是不可靠的（参考q.empty（）方法）
    - q.close() 
        - 关闭队列，防止队列中加入更多数据。调用此方法时，后台线程将继续写入那些已入队列但尚未写入的数据，但将在此方法完成时马上关闭。如果q被垃圾收集，将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如，如果某个使用者正被阻塞在get（）操作上，关闭生产者中的队列不会导致get（）方法返回错误。
    - q.cancel_join_thread() 
        - 不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
    - q.join_thread() 
        - 连接队列的后台线程。此方法用于在调用q.close()方法后，等待所有队列项被消耗。默认情况下，此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
        
### 解决发送信号结束问题：JoinableQueue
- 但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
- 除了与Queue对象相同的方法之外还具有：
    - q.task_done()：使用者使用此方法发出信号，表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量，将引发ValueError异常
    - q.join():生产者调用此方法进行阻塞，直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止，也就是队列中的数据全部被get拿走了。


## 管道：multiprocessing.Pipe（）
- 本质是数据传递而不是数据共享，管道有两道口，两个进程分别位于管道的两端，一端用来发送数据，一端用来接收数据。每端都有send()和recv()方法，如果两个进程试图在同一时间的同一端进行读取和写入，那么有可能会损坏管中的数据。
- 在进程间创建一条管道，并返回元组（conn1,conn2）,其中conn1和conn2是表示管道两端的Connection对象。默认控制下管道是双向的。如果将duplex设为False，conn1只能接受，conn2只能用于发送。(conn1, conn2) = multiprocessing.Pipe(duplex=True)
- 实例方法：
    - conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收，recv方法会一直阻塞。如果连接的另外一端已经关闭，那么recv方法会抛出EOFError。
    - conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象

    - conn1.close():关闭连接。如果conn1被垃圾回收，将自动调用此方法

    - conn1.fileno():返回连接使用的整数文件描述符

    - conn1.poll([timeout]):如果连接上的数据可用，返回True。timeout指定等待的最长时限。如果省略此参数，方法将立即返回结果。如果将timeout射成None，操作将无限期地等待数据到达。

    - conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息，超过了这个最大值，将引发IOError异常，并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭，再也不存在任何数据，将引发EOFError异常。

    - conn.send_bytes(buffer [, offset [, size]])：通过连接发送字节数据缓冲区，buffer是支持缓冲区接口的任意对象，offset是缓冲区中的字节偏移量，而size是要发送字节数。结果数据以单条消息的形式发出，然后调用c.recv_bytes()函数进行接收    

    - conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息，并把它保存在buffer对象中，该对象支持可写入的缓冲区接口（即bytearray对象或类似的对象）。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间，将引发BufferTooShort异常。
- 如果是生产者或消费者中都没有使用管道的某个端点，就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端，在消费者中关闭管道的输入端。
- 多消费者时调用需要加锁

## multiprocessing.Manager()
- 实现进程间数据共享。Manager()返回的manager对象会通过一个服务进程，来使其他进程通过代理的方式操作python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array.
- 常用方法与属性与multiprocessing中的一致

In [12]:
# multiprocessing.Queue简单举例
from multiprocessing import Process, Queue
import time
import os

q = Queue(15)
def put(q):
    try:
        for i in range(3):
            q.put_nowait(i)
            print("{}放入了{}".format(os.getpid(), i))
    except:
        print("放入程序被阻塞了")

def out(q):
    try:
        for i in range(3):
            q.get_nowait()
            print("{}取出了{}".format(os.getpid(), i))
    except:
        print("{}的取出程序被阻塞了".format(os.getpid()))
if __name__ == "__main__":
    start_time = time.time()
    print("主程序开始了")
    q.put("游戏开始")
    put_l, out_l = [], []
    for i in range(5):
        o = Process(target=out, args=(q, ))
        p = Process(target=put, args=(q,))
        p.start()
        o.start()
        put_l.append(p)
        out_l.append(o)
    for p in put_l:
        p.join()
    for o in out_l:
        o.join()
    end_time = time.time()
    # 一旦取出程序被阻塞，那么q中的数据就会还有剩余
    print("q中含有的数据为：{}".format(q.qsize()))
    print("总共耗时:{}".format(end_time - start_time))

soryy, the queue is full
0
1
2
抱歉啊，队列已经空空的了


In [13]:
# 消费者生产者模型
from multiprocessing import Process, Queue
import time
import os

def prodecer(q):
    for i in range(10):
        s = "厨师制作的" + str(i) + "号菜品"
        q.put(s)

def consumer(q):
    while True:
        time.sleep(0.2)
        try:
                ifo = q.get_nowait()
                print("{} 吃掉了 {}".format(os.getpid(), ifo))
        except:
            break

if __name__ == '__main__':
    print("主程序开始运行")
    q = Queue()
    p = Process(target=prodecer, args=(q, ))
    c = Process(target=consumer, args=(q, ))
    p.start()
    c.start()
    p.join()
    c.join()
    print("程序运行结束")
    
"""
#另外一种写法
def prodecer(q):
    for i in range(10):
        s = "厨师制作的" + str(i) + "号菜品"
        q.put(s)
    q.put(None)
def consumer(q):
    while True:
        time.sleep(0.2)
        ifo = q.get_nowait()
        if ifo is None:
            break
        print("{} 吃掉了 {}".format(os.getpid(), ifo))
"""

主程序开始了
所有程序结束


In [14]:
# 多生产者多消费者模型，发送结束信号是个麻烦
from multiprocessing import Process, Queue
import time
import os

def prodecer(q, j):
    print("第{}个子进程开始生产".format(j+1))
    for i in range(10):
        s = "第"+ str(j+1) +"个进程制作的" + str(i+1) + "号菜品"
        q.put(s)
    print("这是进程{}".format(os.getpid()))
def consumer(q):
    while True:
        try:
            time.sleep(0.2)
            ifo = q.get_nowait()
            print("{} 吃掉了 {}".format(os.getpid(), ifo))
        # 下面语法是先让程序休眠，因为消费者可能比生产者跑得快，还没生产出来
        # 为了等一会生产者就休眠，一旦等到生产者生产完毕不再生产，
        # 消费者等了0.1秒后没反应就直接结束程序
        # 虽然能够结束程序，但是不好的点是休眠时间是自己设定的，
        # 时间设定的不好就会过早或过晚结束程序，造成错误或浪费资源
        except:
            time.sleep(0.1)
            if q.empty() is True:
                break
            else:
                pass

if __name__ == '__main__':
    print("主程序开始运行")
    q = Queue()
    pro, con = [], []
    for i in range(10):
        p = Process(target=prodecer, args=(q, i))
        c = Process(target=consumer, args=(q, ))
        p.start()
        c.start()
        pro.append(p)
        con.append(c)
    for p in pro:
        p.join()
    for c in con:
        c.join()
    print("程序运行结束")

主程序开始运行
程序运行结束


In [15]:
# JoinableQueue举例实现多生产者多消费者模型
import multiprocessing
from multiprocessing import Process, JoinableQueue
import time, os


def producer(q, j):
    print("第{}个进程开始生产".format(j + 1))
    for i in range(10):
        s = "进程" + str(j+1) + "生产" + "第" + str(i + 1) + "个产品"
        q.put(s)
    print("进程{}的生产完毕".format(j + 1))
    # 生产完毕，使用此方法进行阻塞，直到队列中所有项目均被消费者消费完。
    # 等到收到消费者返还的task_done信息，新的进程才开始接着生产
    q.join()


def consumer(q):
    while True:
        ifo = q.get()
        print("进程{}吃掉了{}".format(os.getpid(), ifo))
        q.task_done()  # 向q.join()发送一次信号,表明队列中的所有项目已经被消费完毕


if __name__ == "__main__":
    print("主程序开始运行")
    p_l, c_l = [], []
    q = JoinableQueue()
    for i in range(5):
        p = Process(target=producer, args=(q, i))
        c = Process(target=consumer, args=(q,))
        # 如果不加守护，那么子进程永远结束不了，但是加了守护之后，
        # 必须确保生产者的内容生产完并且被处理完了，
        # 所以必须还要在主进程给生产者设置join，才能确保生产者生产的任务被执行完了
        # 并且能够确保守护进程在所有任务执行完成之后才随着主进程的结束而结束。
        c.daemon = True  # 或者在c中加入daemon=True,daemon必须要设置在start以前
        p.start()
        c.start()
        p_l.append(p)


    for p in p_l:
        p.join()
    # 不能再加c.join(),加了之后主程序无法结束
    print("程序运行完毕")

SyntaxError: invalid syntax (<ipython-input-15-1dc88417c600>, line 19)

In [5]:
# pipe实现生产者消费者模型
from multiprocessing import Process, Pipe
import os

def producer(seq, pipe):
    print("生产者开始生产")
    prd, con = pipe
    con.close()
    for i in seq:
        s = str(os.getpid()) + "发送红包" + str(i)
        prd.send(s)
        print(s)


def consumer(pipe):
    while True:
        try:
            prd, con = pipe
            prd.close()
            rv = con.recv()
            print("消费者接受了{}".format(rv))
        except EOFError: #当出现recv阻塞情况时退出，有点类似Queue中发送结束信号
            break

def main():
    seq = [i for i in range(10)]
    pro, con = Pipe()

    c = Process(target=consumer, args=((pro, con), ))
    c.start()
    producer(seq, (pro, con))
    pro.close()
    con.close()
    c.join()

if __name__ == '__main__':
    print("程序开始")
    main()
    print("程序结束")

In [16]:
# pipe示例
import multiprocessing
import time

def consumer(output_p):
    while True:
        item = output_p.recv()
        print("item {} is received".format(item))


def producer(input_p):
    for item in range(10):
        input_p.send(item)


if __name__ == "__main__":
    (output_p, input_p) = multiprocessing.Pipe()
    producer(input_p)

    consumer_p = multiprocessing.Process(target=consumer, args=(output_p,  ))
    consumer_p.start()
    time.sleep(5)
    consumer_p.terminate()
    consumer_p.join()
    print("程序运行结束")


程序运行结束


# 为什么多处理模块需要调用特定的 freeze_support模块才能“冻结”来生成Windows可执行文件？
- 原因是在Windows上缺少fork()(这是 not完全正确).因此,在Windows上,通过创建一个新的进程来模拟叉,其中正在运行在Linux上运行的代码(在子进程中运行).由于代码将在技术上无关的流程中运行,因此必须在代码运行之前交付代码.它的交付方式首先是被腌制,然后通过管道从原始流程发送到新的流程.此外,这个新进程被通知它必须运行通过管道传递的代码,通过将–multiprocessing-fork命令行参数传递给它.如果您查看了一个关于freeze_support()函数的 implementation,那么它的任务是检查它正在运行的进程是否运行通过管道传递的代码.

In [3]:
# multiprocessing.Manager示例

import multiprocessing


# 定义简单函数对不可变元素进行修改
def simple_test(id, test_dict):
    test_dict['name'] = id
    print(test_dict['name'])

# 定义复杂函数对可变对象进行修改
def complex_test(id, test_dict):
    """
    test_dict['name'] = ['jiaojiao', 'xiaoxiao']
    test_dict['name'][0] = id
    """
    # 以上方法无法对可变元素进行修改，只有赋值才能解决问题
    test_dict['name'] = ['jiaojiao', 'xiaoxiao']
    value = test_dict['name']
    value[0] = id
    test_dict['name'] = value

    print(test_dict)

if __name__ == "__main__":
    # windows中必须要有这句语句，否则程序报错
    multiprocessing.freeze_support()
    with multiprocessing.Manager() as manager:  # 使用with语句可自动给manager上锁，否则需自己上锁
        test_dict = manager.dict()
        test_dict['name'] = 'xiaoxiao'
        for i in range(10):
            p = multiprocessing.Process(target=complex_test, args=(i, test_dict))
            p.start()
        p.join()

In [None]:
# Pool+ Manager
# multiprocessing.Manager示例

import multiprocessing
from multiprocessing import Pool

# 定义简单函数对不可变元素进行修改
def simple_test(id, test_dict):
    test_dict['name'] = id
    print(test_dict['name'])

# 定义复杂函数对可变对象进行修改
def complex_test(id, test_dict):
    """
    test_dict['name'] = ['jiaojiao', 'xiaoxiao']
    test_dict['name'][0] = id
    """
    # 以上方法无法对可变元素进行修改，只有赋值才能解决问题
    test_dict['name'] = ['jiaojiao', 'xiaoxiao']
    value = test_dict['name']
    value[0] = id
    test_dict['name'] = value

    print(test_dict)

if __name__ == "__main__":
    # windows中必须要有这句语句，否则程序报错
    multiprocessing.freeze_support()
    pool = Pool(4)
    with multiprocessing.Manager() as manager:  # 使用with语句可自动给manager上锁，否则需自己上锁
        test_dict = manager.dict()
        test_dict['name'] = 'xiaoxiao'
        for i in range(20):
            pool.apply_async(complex_test, args=(i, test_dict))
        pool.close()
        pool.join()

# 多进程之信号量Semaphore
- 互斥锁同时只允许一个线程更改数据，而信号量Semaphore是同时允许一定数量的线程更改数据 。
- 信号量同步基于内部计数器，每调用一次acquire()，计数器减1；每调用一次release()，计数器加1.当计数器为0时，acquire()调用被阻塞。这是迪科斯彻（Dijkstra）信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
- 和多线程Semaphore的用法很像


In [1]:
# 信号量举例
from multiprocessing import Process, Semaphore
import time, random


def go_ktv(sem, user):
    sem.acquire()
    print('%s 占到一间ktv小屋' % user)
    time.sleep(random.randint(0, 3))  # 模拟每个人在ktv中待的时间不同
    sem.release()


if __name__ == '__main__':
    sem = Semaphore(4)
    p_l = []
    for i in range(13):
        p = Process(target=go_ktv, args=(sem, 'user%s' % i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')



# 多进程之事件类
- python线程的事件(Event)用于主线程控制其他线程的执行，事件主要提供了三个方法 set、wait、clear。
    - 事件处理的机制：全局定义了一个“Flag”，如果“Flag”值为 False，那么当程序执行 event.wait 方法时就会阻塞，如果“Flag”值为True，那么event.wait 方法时便不再阻塞。
    - clear：将“Flag”设置为False
    - set：将“Flag”设置为True

In [None]:
# 事件方法的使用
from multiprocessing import Event

e = Event()  # 创建一个事件对象
print(e.is_set())  # is_set()查看一个事件的状态，默认为False，可通过set方法改为True
print('look here！')
# e.set()          #将is_set()的状态改为True。
# print(e.is_set())#is_set()查看一个事件的状态，默认为False，可通过set方法改为Tr
# e.clear()        #将is_set()的状态改为False
# print(e.is_set())#is_set()查看一个事件的状态，默认为False，可通过set方法改为Tr
e.wait()  # 根据is_set()的状态结果来决定是否在这阻塞住，is_set()=False那么就阻塞，is_set()=True就不阻塞
print('give me！！')

# set和clear  修改事件的状态 set-->True   clear-->False
# is_set     用来查看一个事件的状态
# wait       依据事件的状态来决定是否阻塞 False-->阻塞  True-->不阻塞

False
look here！
