# 协程
- 参考资料
    - http://python.jobbole.com/86481/
    - http://python.jobbole.com/87310/
    - https://segmentfault.com/a/1190000009781688
    
# 迭代器
- 可迭代(Iterable): 直接作用于for循环的变量
- 迭代器(Iterator): 不但可以作用于for循环, 还可以被next调用
- list 是典型的可迭代对象, 但不是迭代器
- 通过isinstance判断
- iterable和iterator可以转换
    - 通过iter函数


In [2]:
# 可迭代
l = [ i for i in range(5)]

# l 是可迭代的, 但不是迭代器
for index in l:
    print(index)
    
# range 是个迭代器
for i in range(5):
    print(i)

0
1
2
3
4
0
1
2
3
4


In [5]:
# isinstance 判断某个变量是否是一个实例
from collections import Iterable ,Iterator

l1 = [1,2,3,4,5]

print(isinstance(l1, Iterable)) # 是否可迭代
print(isinstance(l1, Iterator)) # 是否迭代器

True
False


In [7]:
# iter 函数
from collections import Iterable ,Iterator

s = "i like you"

print(isinstance(s, Iterable))
print(isinstance(s, Iterator))

s_iter = iter(s)

print(isinstance(s_iter, Iterable))
print(isinstance(s_iter, Iterator))

True
False
True
True


# 生成器
- generator: 一边循环一边计算下一个元素的机制/算法
- 需要满足三个条件:
    - 每次调用都生产出for循环需要的下一个元素
    - 如果达到最后一个后,报出Stoplteration异常
    - 可以被next函数调用
- 如何生成一个生成器
    - 直接使用
    - 如果函数中包含yield, 则这个函数就叫生成器
    - next调用函数, 遇到yield返回

In [8]:
# 直接使用生成器

L = [x*x for x in range(5)] # 放在中括号中的是列表生成器
G = (x*x for x in range(5)) # 放在小括号中就是生成器

print(type(L))
print(type(G))

<class 'list'>
<class 'generator'>


In [None]:
# 

In [9]:
# 一般函数的案例

def odd():
    print("Step 1")
    print("Step 2")
    print("Step 3")
    return None

odd()

Step 1
Step 2
Step 3


In [13]:
# 生成器的案例
def odd():
    print("Step 1")
    yield 1
    print("Step 2")
    yield 2
    print("Step 3")
    yield 3
    
odd()
# 不能直接调用

<generator object odd at 0x0000000004BC1DE0>

In [14]:
# 生成器的案例
def odd():
    print("Step 1")
    yield 1
    print("Step 2")
    yield 2
    print("Step 3")
    yield 3
 
# 此时 odd() 是调用生成器
one = next(odd())
print(one)
# 只执行一次 因为遇到yield

Step 1
1


In [15]:
# 生成器的案例
def odd():
    print("Step 1")
    yield 1
    print("Step 2")
    yield 2
    print("Step 3")
    yield 3

# 如果直接传入odd() 则系统默认为 每个函数都调用了一个生成器 而不是同一个生成器
g = odd()

one = next(g)
print(one)

two = next(g)
print(two)

three = next(g)
print(three)

Step 1
1
Step 2
2
Step 3
3


In [16]:
# for循环调用生成器
def fib(num):
    n, a, b = 0, 0, 1
    while n < num:
        print(b)
        a,b = b, a+b
        n += 1
    return 'Done'

fib(5)
    

1
1
2
3
5


'Done'

In [19]:
# 斐波那契额数列的生成写法
def fib(num):
    n, a, b = 0, 0, 1
    while n < num:
        yield b
        a,b = b, a+b
        n += 1
    return 'Done'
    
g = fib(5) # 生成一个生成器

for i in range(6):
    # 按循环的次数调用生成器
    rst = next(g) 
    print(rst)

1
1
2
3
5


StopIteration: Done

In [20]:
# 生成器的典型用法是在for寻循中使用
# 比较常用的典型生成器就是range

ge = fib(10)

for i in ge: # 在for循环中使用生成器
    print(i)

1
1
2
3
5
8
13
21
34
55


# 协程
- 历史历程
    - 3.4引入协程, 用yield实现
    - 3.5引入协程语法
    - 实现的协程比较好的包有asyncio, tornado, gevent
- 定义: 协程是为非抢占式多任务产生子程序的计算机程序组件,协程允许不同入口点在不用位置暂停或开始执行程序.
- 从技术角度讲: 协程就是一个你可以暂停执行的函数, 或者干脆把协程理解成 生成器
- 协程的实现:
    - yield返回
    - send调用
    
- 协程的四个状态
    - inspect.getgeneratorstate(...)函数确定, 该函数会返回下述字符串中的一个:
        - GEN_CREATED: 等待开始执行
        - GEN_RUNNING: 解释器正在执行
        - GEN_SUSPENED: 在yield表达式出暂停
        - GEN_CLOSED: 执行结束
        - next预激(prime)
        
- 协程终止
    - 协程中未处理的异常会向上冒泡, 传给 next函数或 send方法的调用方 (即触发协程的对象)
    - 终止协程的一种方式: 发送某个哨符值, 让协程退出. 内置 None和 Ellipsis等常亮经常用作哨符值==.
    

In [27]:
# 协程代码案例1

def simple_coroutine():
    print('- > start')
    x = yield
    print('- > recived', x)
    
# 主线程
sc = simple_coroutine()
print(111)
sc.send(None) # 预激 也可以用 next(sc)

print(222)
sc.send('hahaha')

111
- > start
222
- > recived hahaha


StopIteration: 

In [28]:
# 协程代码案例2

def simple_coroutine(a):
    print('- > start')
    
    b = yield a
    print('- > recived', a, b)
    
    c = yield a+ b
    print('- > recived',a, b, c)
    
sc = simple_coroutine(5)

aa = next(sc)
print(aa)

bb = sc.send(6)
print(bb)

cc = sc.send(7)
print(cc)


- > start
5
- > recived 5 6
11
- > recived 5 6 7


StopIteration: 

- yield from
    - 调用协程为了得到返回值,协程必须正常终止
    - 生成器正常终止会发出 StopIteration异常, 异常对象的 value属性保存返回值
    - yield from从内部捕获 StopIteration异常

In [29]:
def gen():
    for c in "AB":
        yield c
        
# list直接用生成器作为参数
print(list(gen()))

def gen_new():
    yield from "AB"
print(list(gen_new()))

['A', 'B']
['A', 'B']


- 委派生成器
    - 包含 yield from 表达式的生成器函数
    - 委派生成器在 yield from 表达式处暂停, 调用方可以直接吧数据发给子生成器
    - 子生成器在把产生的值发给调用方
    - 子生成器在最后, 解释器会抛出 StopIteration, 并且把返回值附加到异常对象上

In [38]:
# 委派生成器
from collections import namedtuple

'''
解释:
    1. 外层for循环每次迭代会新建一个grouper实例, 赋值给coroutine变量, grouper是委派生成器
    2. 调用next(coroutine),预激委派生成器 grouper, 此时传入 while True循环, 调用子生成器
    3. 内层for循环调用coroutine.send(value), 直接把值传给子生成器 averager.同时, 当前的
    4. 内层循环结束后, grouper实例依旧在yield from表达式处暂停, 因此, grouper函数定义体中
    5. coroutine.send(None) 终止 averager 子生成器, 子生成器抛出 Stopteration异常并返回的
'''

ResClass = namedtuple('Res', 'count average')

# 子生成器
def averager():
    total = 0.0 
    count = 0
    average = None
    
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    return ResClass(count, average)

# 委派生成器
def grouper(storages, key):
    while True:
        # 获取averager()返回值
        storages[key] = yield from averager()
        
# 客户端代码
def client():
    process_data = {
        'boys_2': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
        'boys_1': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46]
        
    }
    
    storages = {}
    for k, v in process_data.items():# 因为process_data是字典, 所以k代表boys_2 v代表[.....]
        # 获得协程
        coroutine = grouper(storages, k) # 每迭代一次 新建一个委派生成器 并赋值给变量 cooroutine 
                                         # 每次传入的参数是键 'boys_2' , 'boys_1'
                                         # 由于grouper委派生成器是storget参数的键接受 averager()的返回值
                                                # 顾传入的参数是键 保持不变
        
        # 预激函数
        next(coroutine)
        
        # 发送数据到协程
        for dt in v:           # 每次迭代 传入键的值 调用委派生成器 获取averget()的返回值 得到次数和平均值
            coroutine.send(dt)
            
        # 终止协程
        coroutine.send(None)
        
    print(storages)
    # 由于storages定义为字典 最终打印出来的是字典
    
client()

{'boys_2': Res(count=9, average=40.422222222222224), 'boys_1': Res(count=9, average=1.3888888888888888)}


#### asyncio
    - python3.4 开始引入标准库当中, 内置对异步io的支持
    - asyncio本身是一个消息循环
    - 步骤:
        - 创建消息队列
        - 把协程导入
        - 关闭
    

In [1]:
import threading
import asyncio


# 使用协程
@asyncio.coroutine
def hello():
    print("Hello world (%s)" % threading.currentThread())
    print("Start......(%s)" % threading.currentThread())
    yield from asyncio.sleep(2)
    print("Done......(%s)" % threading.currentThread())
    print("Hello again! (%s)" % threading.currentThread())


# 启动消息循环
loop = asyncio.get_event_loop()

# 定义任务
tasks = [hello(), hello()]

# asyncio使用wait等待tasks执行完毕
loop.run_until_complete(asyncio.wait(tasks))

# 关闭消息循环
loop.close()

RuntimeError: This event loop is already running

Hello world (<_MainThread(MainThread, started 8396)>)
Start......(<_MainThread(MainThread, started 8396)>)
Hello world (<_MainThread(MainThread, started 8396)>)
Start......(<_MainThread(MainThread, started 8396)>)
Done......(<_MainThread(MainThread, started 8396)>)
Hello again! (<_MainThread(MainThread, started 8396)>)
Done......(<_MainThread(MainThread, started 8396)>)
Hello again! (<_MainThread(MainThread, started 8396)>)


In [4]:
import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    # 异步请求网络地址
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    
    while True:
        line = yield from reader.readline()
        # http协议的换行使用\r\n
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    writer.close()
    
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()


RuntimeError: This event loop is already running

wget www.163.com...
wget www.sohu.com...
wget www.sina.com.cn...
www.163.com header > HTTP/1.1 302 Moved Temporarily
www.163.com header > Date: Mon, 27 May 2019 14:03:07 GMT
www.163.com header > Content-Length: 0
www.163.com header > Connection: close
www.163.com header > Server: Cdn Cache Server V2.0
www.163.com header > Location: http://www.163.com/special/0077jt/error_isp.html
www.163.com header > X-Via: 1.0 PShndx3ql121:9 (Cdn Cache Server V2.0)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html;charset=UTF-8
www.sohu.com header > Connection: close
www.sohu.com header > Server: nginx
www.sohu.com header > Date: Mon, 27 May 2019 14:02:39 GMT
www.sohu.com header > Cache-Control: max-age=60
www.sohu.com header > X-From-Sohu: X-SRC-Cached
www.sohu.com header > Content-Encoding: gzip
www.sohu.com header > FSS-Cache: HIT from 4029071.6060697.5365934
www.sohu.com header > FSS-Proxy: Powered by 4291219.6584989.5628086
www.sina.com.cn header > HTTP/1.1 302 M

# async and await
- 为了更好的表示异步io
- python3.5引入
- 让协程代码更简洁
- 使用上, 可以简单的进行替换
    - 用 async 替换 @asyncio.coroutine
    - 用 await 替换 yield from 

# aiohttp
- asyncio 实现单线程的并发io, 在客户端用处不大
- 在服务器可以asyncio+coroutine配合, 因为http是io操作
- asyncio实现了tcp, udp, ssl等协议
- aiohttp是给予asyncio实现了http框架
- pip install aiohttp 安装

In [10]:
# aiohttp案例

import asyncio

from aiohttp import web

async def index(request):
    await asyncio.sleep(0.5)
    return web.Response(bos=b'<h1>Index</h1>')

async def hello(request):
    await asyncio.sleep(0.5)
    text = '<h1>hello, %s!</h1>' % request.match_info["name"]
    return web.Response(body=text.encode('utf-8'))

async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_route("GET", "/", index)
    app.router.add_route("GET", "/hello/{name}", hello)
    srv = await loop.create_server(app.make_handler(),"127.0.0.1", 8000)
    print("Server started at http://127.0.0.1:8000...")
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

RuntimeError: This event loop is already running



# concurrent.futures
- python3新增的库
- 类似其他语言的线程池的 概念
- 利用multiprocessing实现真正的并行计算
- 核心原理: 以进制的形式, 并行运行多个python解释器, 从而令python程序可以利用多核CPU来提升执行速度
    - 由于子进程与主解释器相分离, 所以他们的全局解释器锁也是相互独立的
    - 每个子进程都能够完整的使用一个CPU内核
    - concorrent.fitures.Executor
        - ThreadPoolExecutor
        - ProcessPoolExecutor
        - 执行的时候需要自行选择
    - submit(fn, args, kwargs)
        - fn: 异步执行的函数
        - args, kwargs 传入参数

In [5]:
from concurrent.futures import ThreadPoolExecutor
import time

def return_future(msg):
    time.sleep(3)
    return msg

# 创建一个线程池
pool = ThreadPoolExecutor(max_workers=2)

# 往线程池加入2个task
f1 = pool.submit(return_future, "hello")
f2 = pool.submit(return_future, "world")

# done() 等待执行结束
print(f1.done())
time.sleep(3)
print(f2.done())

# 结果
print(f1.result())
print(f2.result())

False
False
hello
world


# current中map函数
- map(fn, \*iterables, timeout=None)
    - 跟map函数类似
    - 函数需要异步执行
    - timeout: 超时时间
    - map跟submit使用一个就行了

In [7]:
# map案例
import time,re
import os, datetime
from concurrent import futures

data = ["1", "2"]

def wait_on(argument):
    print(argument)
    time.sleep(2)
    return "OK"

ex = futures.ThreadPoolExecutor(max_workers=2)
for i in ex.map(wait_on, data):
    print(i)

1
2
OK
OK


# Future
-  代表将来执行或没有执行的任务的结果。它和task上没有本质的区别
    - 协程对象不能直接运行，在注册事件循环的时候，其实是run_until_complete方法将协程包装成为了一个任务（task）对象。所谓task对象是Future类的子类。保存了协程运行后的状态，用于未来获取协程的结果。