In [15]:
# 11-1 python 中的 GIL

#gil global interpreter lock （cpython）
#python中一个线程对应于c语言中的一个线程
#gil使得同一个时刻只有一个线程在一个cpu上执行字节码, 无法将多个线程映射到多个cpu上执行

#gil会根据执行的字节码行数以及时间片释放gil，gil在遇到io的操作时候主动释放  ---  该特性使得在处理io密集型的操作时是适用的
# import dis
# def add(a):
#     a = a+1
#     return a
# 
# print(dis.dis(add))

total = 0

def add():
    #1. dosomething1
    #2. io操作
    # 1. dosomething3
    global total
    for i in range(1000000):
        total += 1
def desc():
    global total
    for i in range(1000000):
        total -= 1

import threading
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()

thread1.join()
thread2.join()
print(total)   # 出现由于GIL导致的数据错乱的问题

'''
加锁版本解决问题：
    def add():
        #1. dosomething1
        #2. io操作
        # 1. dosomething3
        global total
        for i in range(1000000):
            lock.acquire()
            total += 1
            lock.release()
    def desc():
        global total
        for i in range(1000000):
            lock.acquire()
            total -= 1
            lock.release()
    
    import threading
    lock = threading.Lock()
'''
print()     #  实际上 Python的gil使得其内置的一些类型的实现为多线程的效率高            GIL锁的是字节码 
# gil使得当一个线程到达最高的gil释放条件的时候释放gil，此时其他的线程就能执行操作   ---  说Python多线程不好，就是因为gil导致释放的问题导致使得同一个时刻只有一个线程在一个cpu上执行字节码(锁住字节码，其他cpu核心无法使用), 
# 无法将多个线程映射到多个cpu上执行，则实际上只利用了一个核心
# 所以Python多线程不适应计算密集型操作做因为计算密集型不会主动释放gil，只有等到gil达到释放条件才释放，（且同一个时刻只允许一个线程在一个cpu上执行字节码），多线程的优势没有体现出来
'''
因为使用多线程就是想让你同时执行多个线程的（这在Python中本身就不成立，不能并行，而只能并发，即无法使用多核cpu的性能），或者是线程阻塞的在间隔时间段内执行其他的线程，但是计算密集型的操作被阻塞的时间段，
其他的线程想插上来，没有空挡，无法执行其他的线程，只能等待gil去主动是释放，所以并发的性能就不好了（且还可以会有数据错误，这里不讨论），但是io密集型的操作就不一样了，因为一旦执行io操作，gil就会被迫释放，
所以对io密集型的操作，Python多线程效率高
'''
# Python多线程适应io密集型的操作，因为io操作时，gil会主动释放，这样多线程的优势就体现出来了（当然还不如替他的语言，任然是同一个时刻只有一个线程在一个cpu上执行，但切换频繁了）   --- 还是有意义的，毕竟会释放gil（io阻塞或者达到释放gil条件），
# 此时就可以执行其他的线程

# 此外也由于gil的原因，会导致Python多线程的时候，容易导致数据错乱的问题  上面的例子就是这个问题  ---  可以对共享数据加锁解决             一般只讨论性能问题，不说这个，Python的多线程性能远不如其他的语言

89051



In [9]:
total = 0

def add():
    #1. dosomething1
    #2. io操作
    # 1. dosomething3
    global total
    for i in range(1000000):
        print("add")
        lock.acquire()
        total += 1
        lock.release()
def desc():
    global total
    for i in range(1000000):
        print("desc")
        lock.acquire()
        total -= 1
        lock.release()
    
import threading
lock = threading.Lock()
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()

thread1.join()
thread2.join()
print(total)   # 出现由于GIL导致的数据错乱的问题

0


In [2]:
# 11-2 多线程编程 - threading

#对于io操作来说，多线程和多进程性能差别不大
#1.通过Thread类实例化
# socket编程属于io编程，当请求的等待网络返回的时候，可以切换线程给其他的线程适用
import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")


def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")


#2. 通过集成Thread来实现多线程


class GetDetailHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)          # 重写后，一定要调用父类的原方法

    def run(self):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")


class GetDetailUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")

if  __name__ == "__main__":
    thread1 = GetDetailHtml("get_detail_html")
    thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join() 
    #当主线程退出的时候， 子线程kill掉
    print ("last time: {}".format(time.time()-start_time))


get detail html started
get detail url started
get detail html end
get detail url end
last time: 4.022992849349976


In [None]:
# 11-3 线程间通信 - 共享变量和 Queue

In [23]:
# 可变类型可以局部修改，不可变类型不可以，需使用global进行声明
'''
此外，在多线程中可以使用list的特性（也可以声明golbal不可变类型），进行多线程之间的通信（变量共享），但是    多进程  不行
'''
a = []   # a是list，为可变类型，可变类型的变量可以在局部进行修改 
def aa():
    a.append(21)
    print(a)
    
aa()

# 而当a是int不可变对象的时候，只能在内部进行使用，当不能更改操作   可变类型 如list就可以

[21]


<font color='red' size='12'> 可变类型可以局部修改，不可变类型不可以，需使用global进行声明  </font>  </br> 
<font color='green' size='5'> 所以使用多线程的时候，可以使用可变类型作为共享变量（这个是可以作为参数传递进函数，也可以在函数中直接修改，即使要修改也行，因为是可变类型，如Queue），或者不可变类型（不可变类型，必须使用global声明，才能在函数中修改） </font>
<font color='black' size='5'> 由于多进程之间不能共享变量（除非一些手段），所以有参数的进程函数的参数需要手动当做参数传进去，如pipe、Queue

In [22]:
a = 1   
def aa(a):  # 此外，当函数传递接受的参数名和全局变量名同名的时候，使用的是参数名 
    global a  # SyntaxError: name 'a' is parameter and global    ，变量不能同时作为global变量和函数之间传递的变量
    a += 1
    print()
    
aa(1)

SyntaxError: name 'a' is parameter and global (cell_name, line 6)

In [None]:
# https://blog.csdn.net/weixin_43431189/article/details/90812510

#线程间通信   爬取文章列表后将url 与 爬取文章详情页线程进行共享

import time
import threading
# form chapter11 import variables   # 将共享变量放在一个文件中单独管理，便于维护
# from chapter11.variables import detail_url_list  # 这样做很容易错误
import variables  # 但是由于gil的存在，实际不推荐使用共享变量的方式去实现通信，除非对锁足够了解

from threading import Condition

#1. 生产者当生产10个url以后就就等待，保证detail_url_list中最多只有十个url
#2. 当url_list为空的时候，消费者就暂停

def get_detail_html(lock):
    #爬取文章详情页
    detail_url_list = variables.detail_url_list
    while True:

        if len(variables.detail_url_list):
            lock.acquire()
            if len(detail_url_list):
                url = detail_url_list.pop()
                lock.release()
                # for url in detail_url_list:
                print("get detail html started")
                time.sleep(2)
                print("get detail html end")
            else:
                lock.release()
                time.sleep(1)


def get_detail_url(lock):
    # 爬取文章列表页
    detail_url_list = variables.detail_url_list
    while True:
        print("get detail url started")
        time.sleep(4)
        for i in range(20):
            lock.acquire()
            if len(detail_url_list) >= 10:
                lock.release()
                time.sleep(1)
            else:
                detail_url_list.append("http://projectsedu.com/{id}".format(id=i))
                lock.release()
        print("get detail url end")


#1. 线程通信方式- 共享变量

if  __name__ == "__main__":
    lock = RLock()
    thread_detail_url = threading.Thread(target=get_detail_url, args=(lock,))
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(lock,))
        html_thread.start()
    # # thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    # thread_detail_url.start()
    # thread_detail_url1.start()
    #
    # thread1.join()
    # thread2.join()

    #当主线程退出的时候， 子线程kill掉
    print ("last time: {}".format(time.time()-start_time))

In [None]:
# 使用Queue实现线程之间的通信   因为 Queue是线程安全的

#通过queue的方式进行线程间同步
from queue import Queue
from collections import deque

import time
import threading


def get_detail_html(queue):
    #爬取文章详情页
    while True:
        url = queue.get()    # get()是一个阻塞方法，没有内容的时候就一致阻塞，等待内容   queue本省是线程安全的---当有多个线程去操作统一个queue的时候不会造成数据错误
        # for url in detail_url_list:
        print("get detail html started")
        time.sleep(1)
        print("get detail html end")


def get_detail_url(queue):
    # 爬取文章列表页
    while True:
        print("get detail url started")
        time.sleep(1)
        for i in range(20):
            queue.put("http://projectsedu.com/{id}".format(id=i))     # put 会在empty判断为满的时候阻塞 等待内容被get
        print("get detail url end")


#1. 线程通信方式- 共享变量

if  __name__ == "__main__":
    detail_url_queue = Queue(maxsize=1000)  # 设置消息队列的最大值 能放的最大数据量


    thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
    for i in range(2):
        html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
        html_thread.start()
    # # thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    thread_detail_url.start()
    # thread_detail_url1.start()
    #
    # thread1.join()
    # thread2.join()
    # detail_url_queue.task_done()  # 放到 某个线程中当做退出的标记
    # detail_url_queue.join()   # 有了join函数，和Thread的join功能类似，必须等待queue完成，才让程序退出，   但是 需要使用detail_url_queue.task_done()  来通知join queue结束了，而不是像Thread的join那样会自动完成退出


    #当主线程退出的时候， 子线程kill掉
    print ("last time: {}".format(time.time()-start_time))

In [None]:
# 关于 task_down 和 join的用法
# http://www.vuln.cn/8610

'''
首选用Queue去完成线程之间的通信， 但是消息队列Queue是队列，很多时候共用的变量可能是dict，或是各种类型的数据，此时也可以使用  共享变量 的方式来完成（但是使用共享变量（如int类型）的方式会有线程不同步的问题） 
但是Python的标准容器 像list dict都是线程安全的 
Python标准库容器是线程安全的 ---  https://www.jianshu.com/p/9709718b571d


'''


In [None]:
# 11-4 线程同步 - Lock、RLock



In [25]:
a = 1
def add(a):
    a += 1
def desc(a):
    a -= 1
    
import dis
dis.dis(add)
print("---"*30)
dis.dis(desc)
'''
从字节码的角度解释
add的字节码执行过程（将a变量当做全局变量看，为add和desc共用）
1  加载a
2  加载1
3  执行+操作
4  将结果赋值给a

gil的释放是按照字节码角度来的
在执行到某一行字节码的时候gil都有可能会释放（时间片已经满 或执行的字节码数量已经满了）

最极限的可能是每执行一行就是释放了，这里我假设是这样的
假设a是全局变量
----------add---------
1  加载a                     a = 0
2  加载1                     1
3  执行+操作                 a  + 1
4  将结果赋值给a             a = 1

---------desc---------
1  加载a                     a = 0
2  加载1                     1
3  执行-操作                 a - 1
4  将结果赋值给a             a = -1 

以上假设每执行一行字节码gil都会释放，由于a变量共享(我们希望获得a变量后，在desc执行的a是add执行完的1，这样最终结果才为0，但事实上是不是)，就会造成 最后的结果不是 a= 1 就是 a= -1 
但是结果应该是0 
由开始的例子可见：
当数据循环次数较少的时候，是没有问题的，最终结果为0，但是循环次数较大1000000的时候就出现多了数据错误，因为gil的问题（时间片已经满 或执行的字节码数量已经满了的时候会强制释放执行其他的线程，变量a的值可能改变，此时就可能数据错误了），


解决方式：希望能够做到 只有一个代码段在运行  ---  
使用线程同步机制： https://baike.baidu.com/item/%E7%BA%BF%E7%A8%8B%E5%90%8C%E6%AD%A5/4855164?fr=aladdin
1 锁的机制  （锁完了  要释放） 
        问题：使用锁会影响性能，获取锁和释放锁都需要时间（必然存在，无法避免，出现了很多中锁来解决性能问题，在多线程编程中无法解决，后面使用协程，因为是单线程对锁要求不高） 
              其次会引起死锁的问题，死锁有两种形式：如同一把锁没有释放又获取，就会死锁   第二种情况 两个锁相互等待 a要等待b b要等待a      
              解决死锁的方法：使用RLock 可重入锁 在用一个线程里面，可以连续调用多次acquire 但是注意acquire的次数要和release的次数一致  -- 注意 ：RLoc只能在 同一个 线程中使用，多个线程之间不行，多线程之间还是竞争关系，会因为锁而等待
              
2 condition 条件变量  用于复杂的线程之间通信的锁  --  实现当达成某个条件的时候触发其他线程

3 Semaphore 信号量
'''

print()

  3           0 LOAD_FAST                0 (a)
              2 LOAD_CONST               1 (1)
              4 INPLACE_ADD
              6 STORE_FAST               0 (a)
              8 LOAD_CONST               0 (None)
             10 RETURN_VALUE
------------------------------------------------------------------------------------------
  5           0 LOAD_FAST                0 (a)
              2 LOAD_CONST               1 (1)
              4 INPLACE_SUBTRACT
              6 STORE_FAST               0 (a)
              8 LOAD_CONST               0 (None)
             10 RETURN_VALUE



In [None]:
# 11-5 线程同步 - condition 使用以及源码分析    源码分析 http://timd.cn/python/threading/condition/

In [None]:
# Condition 中实现了__enter__ 和 __exit__，说明可以使用with方式来使用
# 由代码可见 Condition的内部锁在没有传递锁的情况下，使用默认的RLock锁来完成  Condition的acquire和release方法都是调用的RLock或者Lock的同名方法
# 其主要实现的方法是 wait 和 notify 方法
# wait设置等待某个条件变量的通知 

In [1]:
from threading import RLock 
a = RLock()
print(a)

<unlocked _thread.RLock object owner=0 count=0 at 0x000001BC02C50788>


In [4]:
from itertools import islice

l = [1, 2, 3, 4, 5, 6, 7, 8, 9]
t = iter(l)
for x in islice(t, 1):
    print(x)


1


In [6]:
def a():
    try:
        return 1
    finally:
        print("dfasd")
a()
# 无论是在try还是在except中，遇到return时，只要设定了finally语句，就会中断当前的return语句，跳转到finally中执行，
# 如果finally中遇到return语句，就直接返回，不再跳转回try/excpet中被中断的return语句        

dfasd


1

In [None]:
import threading
#通过condition完成协同读诗

class XiaoAi(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="小爱")
        self.cond = cond

    def run(self):
        with self.cond:   # 上下文管理器，会自动调用Condition 的 __enter__ 和 __exit__ 方法， __enter__ 和 __exit__处理的分别是 获取锁和释放锁
            self.cond.wait()   # 将当前线程处于等待的状况 会释放锁   被唤醒后将自动acquire获得锁 获得的是底层的锁
            print("{} : 在 ".format(self.name))
            self.cond.notify()  # 通知当前正在等待的线程可以运行   不会释放锁

            self.cond.wait()
            print("{} : 好啊 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 君住长江尾 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 共饮长江水 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 此恨何时已 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 定不负相思意 ".format(self.name))
            self.cond.notify()

class TianMao(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="天猫精灵")
        self.cond = cond

    def run(self):
        with self.cond:
            print("{} : 小爱同学 ".format(self.name))
            self.cond.notify()   # notify不会释放锁 只会提示wait()  此时上面的wait()得到了信号也不行，因为锁没有释放   一定要接受到 notify的信号才能继续执行
            self.cond.wait()  # 释放了锁， 上面的wait()可以获得锁继续运行

            print("{} : 我们来对古诗吧 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我住长江头 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 日日思君不见君 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 此水几时休 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 只愿君心似我心 ".format(self.name))
            self.cond.notify()
            self.cond.wait()



if __name__ == "__main__":
    from concurrent import futures
    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)

    #启动顺序很重要
    #在调用with cond之后才能调用wait或者notify方法
    #condition有两层锁， 一把底层锁会在线程调用了wait方法的时候释放(此时其他的线程才能获得condition启动with cond)， 上面的锁会在每次调用wait的时候分配一把并放入到cond的等待队列中，等到notify方法的唤醒
    xiaoai.start()  # 一定是先启动小爱的线程，因为若先启动天猫的线程，则当小爱的wait还没有执行的时候，天猫的notify信号就已经发出去了，后启动的小爱的wait就得不到信号一直阻塞了。
    tianmao.start()

# allocate_lock 为c实现的锁，Python调用的lock就是该锁

# wait()释放的是condition底层锁，
#  wait()被生产者使用notify唤醒之后，还需要和生产者竞争condition底层锁
# wait()方法获得锁和释放的锁都是condition底层锁    在其内部还有一个锁，用于和notify配合使用


In [None]:
# 11-6 线程同步 - Semaphore 使用以及源码分析

#Semaphore 是用于控制进入数量的锁
#文件， 读、写， 写一般只是用于一个线程写，读可以允许有多个

#做爬虫  控制爬虫的数量 请求url次数过多会被屏蔽， 可以通过Semaphore可以控制某个线程的数量
import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release()

class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()
            html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
            html_thread.start()

if __name__ == "__main__":
    '''
       Semaphore设定一段时间内可以运行的线程数，每调用一次acquire就减少1，release一次就加上1，当允许的数量为0的时候，就会阻塞，无法执行当前线程
       使用的时候
                当前实例中，我们希望控制爬取网站的爬虫HtmlSpider能控制在一定的数量中，所以在产生网站爬虫的线程UrlProducer中使用Semaphore对生产进行数量的控制
                当acquire的数量到0的时候，生产爬虫的线程就被阻塞了，等待爬虫线程release释放数量。
       这样就可以控制线程数量了，防止同时过多的线程同时工作
    '''
    sem = threading.Semaphore(3)
    url_producer = UrlProducer(sem)
    url_producer.start()


# 线程对于操作系统来说，线程过多，整个操作系统的切换回非常的慢，所以控制线程并发的数量是有意义的

# Semaphore内部使用的是Condition实现  所以Semaphore是Condition的一个实践
# Queue内部也是通过Condition是实现的
# 想了解Condition 可以看Queue和Semaphore的实现 

#### 线程池   https://www.cnblogs.com/hoojjack/p/10846010.html

In [3]:
# 11-7 ThreadPoolExecutor线程池

# concurrent.futures是Python3.2中引入此包来做线程池和进程池编程的  且线程池和进程池的接口是一致的

#线程池， 为什么要线程池
#主线程中可以获取某一个线程的状态或者某一个任务的状态，以及返回值
#当一个线程完成的时候我们主线程能立即知道
#futures可以让多线程和多进程编码接口一致
from concurrent.futures import ThreadPoolExecutor
import time

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times



executor = ThreadPoolExecutor(max_workers=2)    # 实例化线程池 max_workers=2 允许同时运行的线程数
#通过submit函数提交执行的函数到线程池中, submit 是立即返回，非阻塞的，提交后会立马返回，即在主线程中提交到线程池后立即返回
'''通过submit函数提交执行的函数到线程池等待中执行，并立即返回一个Future对象，通过Future可以执行一些方法来检测这些任务的状态'''
task1 = executor.submit(get_html, 3)    # 有done方法，用于判定某个任务是否完成
# submit接受参数为函数名， 
task2 = executor.submit(get_html, (2))                                                                                        # 是提交后函数就开始执行吗   是的

print(task1)        # submit得到的是 Future对象 <Future at 0x25b6fb25f28 state=running>
print("task1的状态：",task1.done())
'''
当某个任务在执行中，或者是已经执行完成则该任务是取消不了的，此时会返回False
只有在没有开始执行的时候，才可以取消
'''
print(task2.cancel())  # 将某个任务取消，成功返回True，失败返回False     
# result方法 阻塞的方法，能得到任务执行的返回结果
print(task1.result())  # 得到任务线程的返回值
time.sleep(3)
print("task1的状态：",task1.done())   # 最后的时候 已经是完成的状态，所以此时返回了True
print(task1)   # submit得到的是 Future对象 <Future at 0x25b6fb25f28 state=finished returned int>

<Future at 0x2270807ee48 state=running>
task1的状态： False
False
get page 2 success
get page 3 success
3
task1的状态： True
<Future at 0x2270807ee48 state=finished returned int>


In [4]:
# 通过as_completed得到完成的任务对象
from concurrent.futures import as_completed
executor = ThreadPoolExecutor(max_workers=2) 
urls = [3,2,4]
all_task = [executor.submit(get_html, (url)) for url in urls]  # 实现了批量提交任务并返回对象
'''
    as_completed接受的是一个可迭代对象，是一个Future对象的集合
    分两种情况，第一种是返回执行该语句时已经完成的任务yield出去，
    第二种 在等待完成的任务中不停的yield，一有其他的任务完成就yield出来
    所以能过输出已经完成的任务的future对象
'''
for future in as_completed(all_task):  # as_completed方法 里面有yield语句，所以是个生成器  可以使用for循环取值 得到的值是已经完成的任务线程
    data = future.result()  
    print("get {} page".format(data)) 
   

get page 2 success
get 2 page
get page 3 success
get 3 page
get page 4 success
get 4 page


In [65]:
#通过executor的map获取已经完成的task的值
'''
和Python的map方法类似，将urls的值传进get_html中   但是有弊端就是执行函数是一样的get_html
得到的返回值类型不是future了
帮我们完成了提交任务的步骤
它还帮我们完成了future.result()这个步骤，得到的是data
实际上 map方法也有yield，是个生成器，返回的是 future.result()   
'''
for data in executor.map(get_html, urls):     # map和submit类似都是ThreadPoolExecutor的方法     但是都是继承的_base.py中的   as_completed方法和下面的wait都是_base.py中的，里面定义了Future
    print("get {} page".format(data))
    
# 此外map方法和as_completed还有一点不同，就是
      # as_completed是处理完一个就返回一个值
      # map()首先不是处理完一个就返回一个  而且，它的返回值按照urls的传入顺序 （尽管可能先执行的其他的任务）                             一般推荐使用as_completed方法

get page 2 success
get page 3 success
get 3 page
get 2 page
get page 4 success
get 4 page


In [12]:
# wait()阻塞，等待某些任务完成，才能继续执行被阻塞之后的内容
from concurrent.futures import wait, FIRST_COMPLETED 
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2) 
urls = [3,2,4]
task6 = executor.submit(get_html, (6))
task6 = [task6]  # 因为wait()和 as_completed()一样都只接受可迭代对象，所以这里放到一个list中
print("将task包装成一个可迭代对象：",task6)
all_task = [executor.submit(get_html, (url)) for url in urls]  # 批量执行任务 并得到返回的任务的Future对象
'''
    接受的是一个可迭代对象，是一个Future对象的集合
    接受任务future对象的集合，指定只有当该集合中的任务都完成的时候，才继续往下执行wait()之后的操作
    
    wait还可以接受一个return_when参数，指定当什么条件下wait()就不阻塞了，有以下四种（使用这些变量的时候需要导入）：
    
        FIRST_COMPLETED = 'FIRST_COMPLETED'  第一个任务完成   （这里的第一个指的是，只要有一个完成，这个就是第一个，而不是按照传进的集合中的任务的顺序）
        FIRST_EXCEPTION = 'FIRST_EXCEPTION'  第一个除外
        ALL_COMPLETED = 'ALL_COMPLETED'      所有的任务都完成  默认是该项
        _AS_COMPLETED = '_AS_COMPLETED'   该项用户不可用，该参数为内部调用，不让用户主动调用

'''
wait(task6)  # 此时会等待所有的任务都完成后 才继续执行主线程的内容
# wait(all_task, return_when=FIRST_COMPLETED  )
print("main")  # 可见只有当task6完成的时候，主线程才继续打印了

将task包装成一个可迭代对象： [<Future at 0x22708118da0 state=running>]
get page 3 success
get page 2 success
get page 6 success
main
get page 4 success


In [None]:
# 11-8 ThreadPoolExecutor源码分析
from concurrent.futures import Future
# 未来对象（因为返回任务状态的时候可能当前任务还没有开始执行，是未来的时候执行的），更合理的叫法应该是task的返回容器（得到该对象，就能通过一些方法知道任务的执行状况）
# 未来对象设计理念在进程池和协程中都是一样的

In [None]:
# 11-9 多线程和多进程对比


#多进程编程
# 耗cpu的操作（计算密集型操作），由于gil锁的原因（同一个时刻只有一个线程在一个cpu上执行，多核心的时候也是这样），使用多线程时无法充分利用多核优势（使用多线程无法达到并行的操作），此时应该使用多进程编程，
# 多cpu的操作用多进程比多线程好
# 对于io操作来说， 使用多线程编程，
#
# 对于操作系统来说，进程切换代价要高于线程   所以能够使用多线程的时候应该使用多线程（如io密集型的操作）

# python多线程适应io密集型的操作，因为io操作时，gil会主动释放，这样多线程的优势就体现出来了（当然还不如替他的语言，任然是同一个时刻只有一个线程在一个cpu上执行，但切换频繁了）
# 还是有意义的，毕竟会释放gil（io阻塞或者达到释放gil条件），此时就可以执行其他的线程

#1. 对于耗费cpu的操作（计算类型的操作：图像处理，数学运算，机器学习），多进程优于多线程
# 因为多线程由于gil锁的原因，同一个时刻只有一个线程在一个cpu上执行
# 而使用多进程的时候，每个进程都有一个单独的gil锁，就可以实现过个进程之间的并发运算了， 一个是同一时间只能执行一个，而另一个是同一时间执行多个运算，效率当然是多进程高了



In [4]:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor

# 多线程执行fib运算
def fib(n):
    if n<=2:
        return 1
    return fib(n-1)+fib(n-2)

if __name__ == "__main__":
    with ThreadPoolExecutor(3) as executor:    # 使用with线程池 可以自动打开和关闭 线程池
        all_task = [executor.submit(fib, (num)) for num in range(25,40)]
        start_time = time.time()
        for future in as_completed(all_task):     # 这里因为as_completed的线程和最后打印总时间的线程都是主线程的内容，按照主线程的执行顺序，坑是是先执行as_completed操作后，再计算总时间操作，这个时间计算是没有问题的
            data = future.result()              # 从打印结果看到 是顺序的（其实内部还是多线程的），只是因为 传值进去的时候是从小到大的，25到40，因为实际的运算数越大越耗时，所以即使是多线程的，值也是由小到大的，因为耗时的计算花时间多，就慢
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))  # 打印最后的运行时间

exe result: 75025
exe result: 121393
exe result: 196418
exe result: 317811
exe result: 514229
exe result: 832040
exe result: 1346269
exe result: 2178309
exe result: 3524578
exe result: 5702887
exe result: 9227465
exe result: 14930352
exe result: 24157817
exe result: 39088169
exe result: 63245986
last time is: 70.26613402366638


In [9]:
# 多进程执行fib运算
def fib(n):
    if n<=2:
        return 1
    return fib(n-1)+fib(n-2)

if __name__ == "__main__":   #  使用进程池在windows系统上会出现如下问题：BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending. 解决方式是，将进程池放到 __main__里面
    with ProcessPoolExecutor(3) as executor:    # 使用with线程池 可以自动打开和关闭 线程池
        all_task = [executor.submit(fib, (num)) for num in range(25,40)]
        start_time = time.time()
        for future in as_completed(all_task):     # 这里因为as_completed的线程和最后打印总时间的线程都是主线程的内容，所以会执行完as_completed操作后，再计算总时间，这个时间计算是没有问题的
            data = future.result()              # 从打印结果看到 是顺序的（其实内部还是多线程的），只是因为 传值进去的时候是从小到大的，25到40，因为实际的运算数越大越耗时，所以即使是多线程的，值也是由小到大的，因为耗时的计算花时间多，就慢
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))  # 打印最后的运行时间

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

In [None]:
# 对于io操作来说，多线程优于多进程
def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == "__main__":
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, (num)) for num in [2]*30]   # [2]*30 产生一个有30个2 的列表
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))

In [1]:
# 11-10 multiprocessing 多进程编程

# import os
# #fork只能用于linux/unix中   fork是底层的子进程
# pid = os.fork()   #  fork是会新建一个子进程
# print("haha")
# if pid == 0:  # 为0的是新建的子进程
#   print('子进程 {} ，父进程是： {}.' .format(os.getpid(), os.getppid()))
# else:
#   print('我是父进程：{}.'.format(pid))
# time.sleep(2)
# 打印结果：
# haha
# 我是父进程： 24474
# haha
# 子进程 24474 ， 父进程是： 24473

# 线程通过全局变量是可以进行通信的， 但是进程不可以，数据是完全隔离的，每个进程都有一套完整的数据  这是因为线程在一个进程中，共享数据，所以通过全局变量可以进行通信，但是进程之间是相互隔离的

# 此外在主进程中开辟一个子进程后，当主进程退出的时候，子进程任然会继续执行
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
# ProcessPoolExecutor和multiprocessing都可用于多进程编程，ProcessPoolExecutor使用multiprocessing实现  推荐使用 ProcessPoolExecutor
#多进程编程
import time
def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":
    progress = multiprocessing.Process(target=get_html, args=(2,)) # 使用方式和Thread类似
    print(progress.pid)
    progress.start()
    print(progress.pid)
    progress.join()    # 等待子进程完成后 执行主进程
    print("main progress end")


None
14316
main progress end


In [None]:

# 使用multiprocessing中的进程池
pool = multiprocessing.Pool(multiprocessing.cpu_count())     # multiprocessing中的进程池要求传递开启的进程数，没有传递的时候调用os.cpu_count()，开启cpu核心数的进程数（不多于cpu核心数最好）
result = pool.apply_async(get_html, args=(3,)) # 使用apply_async异步提交一个任务 返回ApplyResult对象

#等待所有任务完成
pool.close()  # 在使用pool.join()方法之前，一定先使用pool.close方法，关闭进程池，才能使用join   否则报错 ValueError: Pool is still running
pool.join()   # 给每个进程添加join, 等待所有任务执行完成

print(result.get())  # 使用join等待所有进程都运行完后， 就可以使用get得到执行玩的进程的返回值了
"""multiprocessing线程池的使用方式和进程池类似，都是先声明进程池，然后添加任务，但是没有ProcessPoolExecutor的as_completed类似方法"""

In [None]:
#imap  对应ProcessPoolExecutor 的map方法   完成的顺序也和ProcessPoolExecutor 的map方法一样 ，都是按照传参的顺序打印（即使有先完成的也不打印）
    for result in pool.imap(get_html, [1,5,3]):
        print("{} sleep success".format(result))

    for result in pool.imap_unordered(get_html, [1,5,3]):   # 与map不同的在于不是按照传参顺序打印了，而是按照先完成的顺序打印
        print("{} sleep success".format(result))

In [26]:
# 11-11 进程间通信 - Queue、Pipe，Manager
from multiprocessing import Process, Queue, Pool, Manager, Pipe


def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Queue(10)   # 这里使用的Queue是 multiprocessing进程中的Queue队列，而不是线程调用的dequeue中的Queue，但是两个Queue的使用方式相同
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
# 在使用join的地方的作用都是等待当前线程（进程）执行完，再执行主线程（进程） 但是你要知晓的一点就是 在执行到join语句的时候，因为之前已经开启了其他的子线程，
# 子线程已经开始运行了，可能轮到主线程去设置某个子线程join的时候，该子线程已经执行完了，此时join设置就是无效的，碰巧遇到没有执行完的子线程，子线程与主线程竞争，
# 但是能保证，在执行设置join之后的主线程代码前，一定是会等待所有的子线程完成

In [None]:
# Queue不能使用在multiprocessing的Pool线程池中的

#multiprocessing中的queue不能用于pool进程池
#pool中的进程间通信需要使用manager中的queue
def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Manager().Queue(10)   # 需要使用 Manager().Queue  该Queue的用法和之前都一样
    pool = Pool(2)

    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))

    pool.close()
    pool.join()


In [None]:
#共享全局变量通信
#共享全局变量不能适用于多进程编程，可以适用于多线程

# 使用global声明全局的时候，会提示两个进程中的 name 'a' is not defined  说明进程之间 变量是不共享的

# 以下这个例子是不对的，即使是多线程中也是不对的，应为a是不可变类型，想在函数中对其进行操作，必须使用global进行声明 而不是直接当作参数传递
from multiprocessing import Process
def producer(a):
    a += 100
    time.sleep(2)

def consumer(a):
    time.sleep(2)
    print(a)

if __name__ == "__main__":
    a = 1                                           
    my_producer = Process(target=producer, args=(a,))
    my_consumer = Process(target=consumer, args=(a,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

In [None]:
# 使用Pipe管道实现进程间的通信
# 通过pipe实现进程间通信    pipe 管道  简化版的queue
# pipe的存在意义：虽然是Queue功能的子集，但是pipe的性能高于queue   因为queue用来做进程间的同步加了很多锁，做同步的时候降低了性能，所以只有两个进程间的通信的时候，优先使用pipe

def producer(pipe):            # 这里使用和 其他的可变类型使用方式不一样，必须要用参数传递进去 而其他的可变类型或者Queue都是可以直接使用的
    pipe.send("bobby")

def consumer(pipe):
    print(pipe.recv())

if __name__ == "__main__":
    recevie_pipe, send_pipe = Pipe()  # 函数Pipe有两个返回值 都是Connection()实例对象 该Connection()是在multiprocessing中实现的
    #pipe只能适用于两个进程间的通信
    my_producer= Process(target=producer, args=(send_pipe, ))
    my_consumer = Process(target=consumer, args=(recevie_pipe,))

    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
    
    
    
    
#  在multiprocessing的进程池中是可以使用pipe，但只能是两个进程

def producer(pipe):
    pipe.send("a")
    time.sleep(2)

def consumer(pipe):
    time.sleep(2)
    print(pipe.recv())

if __name__ == "__main__":
    receive_pipe, send_pipe = Pipe()  # Queue的用法和之前都一样
    pool = Pool(2)

    pool.apply_async(producer, args=(send_pipe,))
    pool.apply_async(consumer, args=(receive_pipe,))

    pool.close()
    pool.join()


In [None]:
# 多进程之前的共享变量

# 进程间通信的其他方法， 进程间的变量是完全隔离的，希望在进程间维护一个公共的内存模块（变量） ----  共享内存  可以使用Manager模块  使用其中的数据类型 然后像使用多线程那样使用
# 使用Manager()内实现的类型类型（实际是SyncManager的方法）来像线程那样实现进程间的数据共享
# def Manager():
#     return multiprocessing.SyncManager()
# 特别注意： 使用内存共享的时候，一定要注意进程同步，此时需要使用Manager内的Lock Condition 来实现进程同步 像使用多线程那样使用


def add_data(p_dict, key, value):
    p_dict[key] = value

if __name__ == "__main__":
    progress_dict = Manager().dict()

    first_progress = Process(target=add_data, args=(progress_dict, "bobby1", 22))
    second_progress = Process(target=add_data, args=(progress_dict, "bobby2", 23))

    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()

    print(progress_dict)
    
    
    
#     from queue import PriorityQueue    Queue的子类   是可以设置优先级的队列


In [None]:
python主进程或者主线程是否会等待子线程或子进程的问题     https://www.cnblogs.com/ahliucong/p/9404308.html
1.主进程会等待所有子进程结束后才会程序结束

2.主线程也会等待所有子线程结束后才会主线程结束

3.from multiprocessing import Pool这个进程池,并不会等待所有的进程运行完成,而是主线程代码执行完成后程序就立即结束 .

所以这个进程池需要加p.close()和p.join()

4.from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor  的进程池和线程池,主进程或者主线程会等进程池内或者线程 池内的任务运行完成,整 个程序才会结速

5.协程的问题,所有协程,主线程必须加join()阻塞,否则其他协程不运行.

6.关于守护进程的问题,守护进程是在主进程的代码结束后,立刻结束.守护线程是在所有子线程都结束后,守护线程结束.