# 19. Coroutines (2)

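# asyncio - Asynchronous I/O
- asyncio is a library for writing concurrent code using the async/await syntax.
- asyncio is used as the foundation for many high-performance Python asynchronous frameworks, including network and web servers, database connection libraries, and distributed task queues.
- asyncio is often a perfect fit for IO-bound and high-level structured network code.
- asyncio provides a set of high-level APIs to:
    - run Python coroutines concurrently with full control over their execution;
    - perform network IO and IPC;
    - control subprocesses;
    - distribute tasks via queues;
    - synchronize concurrent code.
- There are also low-level APIs that let library and framework developers:
    - create and manage event loops, which provide asynchronous APIs for networking, running subprocesses, handling OS signals, and so on;
    - implement efficient protocols using transports;
    - bridge callback-based libraries and code with the async/await syntax.
- At its core, asyncio is an event loop (message loop).
- Steps:
    - create the event loop
    - pass in the coroutines
    - close the loop
- Example 1 (two tasks)
- Example 2 (fetching several websites)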
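
The "distribute tasks via queues" bullet above can be sketched with `asyncio.Queue`. This is a minimal illustration, not part of the original notebook: the names `worker` and `main`, the three workers, and the doubling "job" are assumptions chosen for the example, and it uses `asyncio.run` (Python 3.7+) rather than a hand-managed loop.

```python
import asyncio


async def worker(name, queue, results):
    # Pull items forever; task_done() lets queue.join() track progress.
    while True:
        item = await queue.get()
        await asyncio.sleep(0.01)          # stand-in for real I/O
        results.append((name, item * 2))   # hypothetical "job": double the number
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    results = []
    for n in range(6):
        queue.put_nowait(n)

    # Three workers share one queue, so the six items are spread among them.
    workers = [asyncio.create_task(worker(f"w{i}", queue, results)) for i in range(3)]

    await queue.join()                     # wait until every item has been processed
    for w in workers:
        w.cancel()                         # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(value for _, value in results)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Which worker handles which item is not deterministic, which is why the sketch sorts the values before returning them.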

In [1]:
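# asyncio Example 1
# Note: @asyncio.coroutine with `yield from` is the legacy generator-based style;
# the decorator was deprecated in Python 3.8 and removed in Python 3.11.

import threading
# Import the asyncio package
import asyncio

# Declare a generator-based coroutine
@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.current_thread())
    print('Start..... (%s)' % threading.current_thread())
    yield from asyncio.sleep(5)
    print('Done..... (%s)' % threading.current_thread())
    print('Hello again! (%s)' % threading.current_thread())

# Get the event loop
loop = asyncio.get_event_loop()
# Define the tasks (pass in the coroutines)
tasks = [hello(), hello()]
# asyncio.wait() waits until all tasks have finished
loop.run_until_complete(asyncio.wait(tasks))
# Close the event loop
loop.close()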

RuntimeError: This event loop is already running

Hello world! (<_MainThread(MainThread, started 25500)>)
Start..... (<_MainThread(MainThread, started 25500)>)
Hello world! (<_MainThread(MainThread, started 25500)>)
Start..... (<_MainThread(MainThread, started 25500)>)
Done..... (<_MainThread(MainThread, started 25500)>)
Hello again! (<_MainThread(MainThread, started 25500)>)
Done..... (<_MainThread(MainThread, started 25500)>)
Hello again! (<_MainThread(MainThread, started 25500)>)


In [5]:
# asyncio Example 2
import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    # Open the network connection asynchronously
    connect = asyncio.open_connection(host, 80)
    # Note the use of yield from: it suspends until the connection is ready
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        # HTTP lines end with "\r\n"; an empty line ends the headers (b'' means EOF)
        if line in (b'\r\n', b''):
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.google.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

RuntimeError: This event loop is already running

wget www.163.com...
wget www.google.com...
wget www.sina.com.cn...
www.163.com header > HTTP/1.1 302 Moved Temporarily
www.163.com header > Date: Thu, 12 Mar 2020 13:46:31 GMT
www.163.com header > Content-Length: 0
www.163.com header > Connection: close
www.163.com header > Server: Cdn Cache Server V2.0
www.163.com header > Location: http://www.163.com/special/0077jt/error_isp.html
www.163.com header > X-Via: 1.0 PSrbdbOSA2mv97:13 (Cdn Cache Server V2.0)
www.163.com header > X-Ws-Request-Id: 5e6a3d37_PSrbdbOSA2ab99_60908-42096
www.163.com header > Cache-Control: no-cache,no-store,private
www.google.com header > HTTP/1.0 200 OK
www.google.com header > Date: Thu, 12 Mar 2020 13:46:31 GMT
www.google.com header > Expires: -1
www.google.com header > Cache-Control: private, max-age=0
www.google.com header > Content-Type: text/html; charset=ISO-8859-1
www.google.com header > P3P: CP="This is not a P3P policy! See g.co/p3phelp for more info."
www.google.com header > Server: gws
www.google.com 

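# async and await
- Coroutines declared with the async/await syntax are the recommended way to write asyncio applications.
- The syntax was introduced to express asynchronous I/O more clearly
- and to make coroutine code more concise.
- The await expression
    - suspends the execution of the coroutine to wait on an awaitable object;
    - can only be used inside a coroutine function.
- In practice, a simple substitution is enough:
    - replace @asyncio.coroutine with async (written before def)
    - replace yield from with await
- Example 3 is Example 1 with exactly these substitutions applied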

In [5]:
# asyncio Example 3
import threading
import asyncio

#@asyncio.coroutine
async def hello():
    print('Hello world! (%s)' % threading.current_thread())
    print('Start..... (%s)' % threading.current_thread())
    await asyncio.sleep(5)
    print('Done..... (%s)' % threading.current_thread())
    print('Hello again! (%s)' % threading.current_thread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

RuntimeError: This event loop is already running

Hello world! (<_MainThread(MainThread, started 25500)>)
Start..... (<_MainThread(MainThread, started 25500)>)
Hello world! (<_MainThread(MainThread, started 25500)>)
Start..... (<_MainThread(MainThread, started 25500)>)
Done..... (<_MainThread(MainThread, started 25500)>)
Hello again! (<_MainThread(MainThread, started 25500)>)
Done..... (<_MainThread(MainThread, started 25500)>)
Hello again! (<_MainThread(MainThread, started 25500)>)
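
The examples above drive the event loop by hand. On Python 3.7 and later the same pattern is usually written with `asyncio.run()` and `asyncio.gather()`. The following is a sketch under that assumption, not a replacement for the notebook's cells; the `delay` parameter and the returned strings are illustrative choices.

```python
import asyncio
import time


async def hello(delay):
    # await suspends this coroutine and lets the loop run the other one.
    await asyncio.sleep(delay)
    return f"done after {delay}s"


async def main():
    # gather() schedules both coroutines at once and returns their
    # results in argument order, not in completion order.
    return await asyncio.gather(hello(0.2), hello(0.1))


if __name__ == "__main__":
    start = time.perf_counter()
    print(asyncio.run(main()))   # creates the loop, runs main(), closes the loop
    # Elapsed time is close to the longest delay, not the sum of both.
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

Inside a notebook that already runs its own event loop, `asyncio.run()` fails with a RuntimeError much like the one recorded in the outputs above; in recent IPython versions, writing `await main()` directly in a cell is the usual workaround.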


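# aiohttp
- asyncio provides single-threaded concurrent I/O; used only on the client side, its benefit is limited.
- On the server side asyncio works well together with coroutines, because handling HTTP is an I/O operation.
- asyncio implements TCP, UDP, SSL and other protocols.
- aiohttp is an HTTP framework built on top of asyncio.
- Install the module before use: pip install aiohttp
- Example 4 (the module is not installed in this Jupyter environment, so the cell cannot run)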
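
Since aiohttp may not be installed, the TCP layer that asyncio itself provides can be tried with the standard library alone. The sketch below uses `asyncio.start_server` and `asyncio.open_connection`; the handler name `handle_echo`, the helper `roundtrip`, the upper-casing behaviour, and binding to port 0 (let the OS pick a free port) are all assumptions made for the illustration.

```python
import asyncio


async def handle_echo(reader, writer):
    # Called once per client connection: read one line, reply in upper case.
    data = await reader.readline()
    writer.write(data.upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def roundtrip(message):
    # Port 0 asks the OS for any free port, which avoids clashes in a demo.
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(message + b"\n")
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply


if __name__ == "__main__":
    print(asyncio.run(roundtrip(b"ping")))
```

Server and client run in the same thread here; the event loop switches between them at each `await`.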

In [6]:
# aiohttp Example 4
import asyncio
# Import the aiohttp web module
from aiohttp import web

async def index(request):
    await asyncio.sleep(0.5)
    return web.Response(body=b'<h1>Index</h1>', content_type='text/html')

async def hello(request):
    await asyncio.sleep(0.5)
    text = '<h1>hello, %s!</h1>' % request.match_info['name']
    return web.Response(body=text.encode('utf-8'), content_type='text/html')

async def init():
    app = web.Application()
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/hello/{name}', hello)
    # AppRunner/TCPSite replace the deprecated loop= argument and app.make_handler()
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '127.0.0.1', 8000)
    await site.start()
    print('Server started at http://127.0.0.1:8000...')
    return runner

loop = asyncio.get_event_loop()
loop.run_until_complete(init())
loop.run_forever()

ModuleNotFoundError: No module named 'aiohttp'

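# concurrent.futures - Launching parallel tasks
- Similar to the "thread pool" concept in other languages.
- With ProcessPoolExecutor, this module uses multiprocessing to achieve true parallel computation.
- The core idea: concurrent.futures runs multiple Python interpreters in parallel as child processes, so a Python program can use multiple CPU cores to run faster.
- Because the child processes are separate from the main interpreter, each has its own independent global interpreter lock (GIL).
- Each child process can make full use of one CPU core.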

## concurrent.futures.Executor 
- Thread pool: ThreadPoolExecutor
- Process pool: ProcessPoolExecutor
- Choose the one that suits the workload when running
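
Because both classes implement the same Executor interface, switching between them can be as small as passing a different class. The sketch below assumes a toy CPU-bound task (`count_primes`) and a helper (`run_with`); both names are hypothetical. As a rough rule, threads suit I/O-bound work and processes suit CPU-bound work.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def count_primes(limit):
    """CPU-bound toy task: count the primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def run_with(executor_cls, limits, workers=2):
    # The same code works for either executor class.
    with executor_cls(max_workers=workers) as executor:
        return list(executor.map(count_primes, limits))


if __name__ == "__main__":
    # The __main__ guard matters for ProcessPoolExecutor: child processes
    # may re-import this module, and must not start pools of their own.
    limits = [10, 100, 1000]
    print(run_with(ThreadPoolExecutor, limits))
    print(run_with(ProcessPoolExecutor, limits))
```

With a process pool the function and its arguments are pickled and sent to the child processes, so they must be defined at module level.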
    
## submit(fn, \*args, \*\*kwargs)
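- Schedules the callable fn to be executed as fn(\*args, \*\*kwargs) and returns a Future object representing the execution of the callable
- fn: the function to execute asynchronously
- args, kwargs: the arguments passed to fn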
    
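            # Deadlock example from the official documentation
            import time
            from concurrent.futures import ThreadPoolExecutor

            def wait_on_b():
                time.sleep(5)
                print(b.result())  # b will never complete because it is waiting on a
                return 5

            def wait_on_a():
                time.sleep(5)
                print(a.result())  # likewise, a will never complete because it is waiting on b
                return 6


            executor = ThreadPoolExecutor(max_workers=2)
            a = executor.submit(wait_on_b)
            b = executor.submit(wait_on_a)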
            
- Example 5

In [7]:
# concurrent.futures Example 5
from concurrent.futures import ThreadPoolExecutor
import time


def return_future(msg):
    time.sleep(3)
    return msg


# Create a thread pool
pool = ThreadPoolExecutor(max_workers=2)

# Submit 2 tasks to the thread pool
f1 = pool.submit(return_future, 'hello')
f2 = pool.submit(return_future, 'world')

# Check completion before and after waiting
print(f1.done())
time.sleep(3)
print(f2.done())

# Results (result() blocks until the task has finished)
print(f1.result())
print(f2.result())

False
True
hello
world
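
Two details of `submit()` that Example 5 does not exercise are keyword arguments and errors. The sketch below uses a hypothetical `divide` function to show that keyword arguments are forwarded to `fn`, and that an exception raised inside the worker is stored on the Future: `exception()` returns it and `result()` re-raises it.

```python
from concurrent.futures import ThreadPoolExecutor


def divide(a, b, *, round_to=2):
    return round(a / b, round_to)


with ThreadPoolExecutor(max_workers=2) as pool:
    ok = pool.submit(divide, 1, 3, round_to=3)   # positional and keyword arguments
    bad = pool.submit(divide, 1, 0)              # raises ZeroDivisionError in the worker

    print(ok.result())                 # blocks until the value is ready
    error = bad.exception()            # returns the exception instead of raising it
    print(type(error).__name__)

    try:
        bad.result()                   # result() re-raises the stored exception
    except ZeroDivisionError:
        print("result() re-raised the error")
```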


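## The map() function in concurrent.futures
- map(func, *iterables, timeout=None, chunksize=1)
    - Similar to map(func, *iterables), except:
        - the iterables are collected immediately rather than lazily;
        - func is executed asynchronously, and several calls to func may run concurrently.
    - If \_\_next__() is called and the result is not available after timeout seconds from the original call to Executor.map(), the returned iterator raises concurrent.futures.TimeoutError.
    - timeout: the timeout
        - timeout can be an int or a float.
        - If timeout is not specified or is None, there is no limit on the wait time.
    - Example 6
    - Example 7
    - The return value is an iterator, so the results have to be pulled out of it, for example with list():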
    
            with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
                print(list(executor.map(sleeper, x)))

- Use either submit or map, whichever fits the need
- Example 8
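
The ordering and timeout behaviour of `map()` described above can be checked with a small sketch. The `sleeper` function here is an assumption about what the fragment above refers to (it sleeps for the given number of seconds and returns that number); `ordered_results` and `timed_out` are hypothetical helper names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def sleeper(seconds):
    time.sleep(seconds)
    return seconds


def ordered_results():
    # Results come back in input order, even though 0.1 finishes first.
    with ThreadPoolExecutor(max_workers=3) as executor:
        return list(executor.map(sleeper, [0.3, 0.1, 0.2]))


def timed_out():
    # The timeout is counted from the call to map(), and the error
    # surfaces only when a result is requested from the iterator.
    with ThreadPoolExecutor(max_workers=1) as executor:
        results = executor.map(sleeper, [0.5], timeout=0.1)
        try:
            next(results)
        except TimeoutError:
            return True
    return False


if __name__ == "__main__":
    print(ordered_results())
    print(timed_out())
```

Leaving the `with` block still waits for the task that is already running, so a timeout stops the waiting in the caller but does not interrupt the worker.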

In [8]:
# Example 6
import time
from concurrent import futures

data = ['1','2']

def wait_on(argument):
    print(argument)
    time.sleep(2)
    return "ok"

ex = futures.ThreadPoolExecutor(max_workers=2)
for i in ex.map(wait_on,data):
    print(i)

1
2
ok
ok


In [13]:
# Example 7
from concurrent.futures import ThreadPoolExecutor as Pool
#import requests
import urllib
from urllib import request

URLS = ['http://www.google.com', 'http://qq.com', 'http://sina.com']


def task(url, timeout=20):
    #return requests.get(url, timeout=timeout)
    return request.urlopen(url, timeout=timeout)


pool = Pool(max_workers=3)
results = pool.map(task, URLS)

import time
time.sleep(20)
for ret in results:
    print('%s, %s' % (ret.url, len(ret.read())))

http://www.google.com, 13045
https://www.qq.com/?fromdefault, 243655
http://sina.com, 23248


In [10]:
# Example 8
# Note: fcntl is available only on Unix; on Windows the import below fails.
import time,re,fcntl
import os,datetime
from concurrent import futures

count_list = list()
MinuteNum = 1
StartTime = datetime.datetime(2018, 5, 1, 19, 31, 0, 484870)
NowTime = datetime.datetime.now()
os.system(':>new.txt')

f_new = open('new.txt','a')

def conc(CountTimeFormat):
    # Copy the first log line that matches this minute into new.txt
    with open('push_slave.stdout', 'r') as f:
        for line in f:
            if re.search(CountTimeFormat, line):
                # Acquire an exclusive lock on the output file
                fcntl.flock(f_new, fcntl.LOCK_EX)
                f_new.writelines(line)
                f_new.flush()
                # Release the file lock
                fcntl.flock(f_new, fcntl.LOCK_UN)
                break

while 1:
    AfterOneMinute = datetime.timedelta(minutes=MinuteNum)
    CountTime = AfterOneMinute + StartTime
    CountTimeFormat = CountTime.strftime('%Y-%m-%d %H:%M')
    MinuteNum = MinuteNum+1
    count_list.append(CountTimeFormat)
    # strftime('%d') zero-pads the day, so the end marker must use "02"
    if CountTimeFormat == "2018-05-02 16:00":
        break

def exec_cmd():
    with futures.ProcessPoolExecutor(max_workers=24) as executor:
        dict(( executor.submit(conc, times), times) for times in count_list)

if __name__ == '__main__':
    exec_cmd()
    f_new.close()

ModuleNotFoundError: No module named 'fcntl'

## Future
- Represents a task that will complete at some point in the future
- Future instances are created by Executor.submit
- Example 9

In [12]:
# Example 9
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed
import requests

URLS = ['http://qq.com', 'http://sina.com', 'http://www.google.com', ]


def task(url, timeout=10):
    return requests.get(url, timeout=timeout)


with Pool(max_workers=3) as executor:
    future_tasks = [executor.submit(task, url) for url in URLS]

    for f in future_tasks:
        if f.running():
            print('%s is running' % str(f))

    for f in as_completed(future_tasks):
        try:
            ret = f.done()
            if ret:
                f_ret = f.result()
                print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
        except Exception as e:
            f.cancel()
            print(str(e))

<Future at 0x1d10f48b3c8 state=running> is running
<Future at 0x1d10f4d21c8 state=running> is running
<Future at 0x1d10f4d2d48 state=running> is running
<Future at 0x1d10f4d2d48 state=finished returned Response>, done, result: http://www.google.com/, 13083
<Future at 0x1d10f4d21c8 state=finished returned Response>, done, result: http://sina.com/, 23248
<Future at 0x1d10f48b3c8 state=finished returned Response>, done, result: https://www.qq.com?fromdefault, 243655
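
A common companion to `as_completed()` is a dict that maps each Future back to the input that produced it, so that results and errors can be reported per input. The sketch below avoids the network: `fetch` is a hypothetical stand-in for a request, and a negative delay is an arbitrary way to trigger a failure.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(name, delay):
    # Hypothetical stand-in for a network call.
    if delay < 0:
        raise ValueError(f"bad delay for {name}")
    time.sleep(delay)
    return f"{name}: ok"


def collect(jobs):
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Map each Future back to the job name it belongs to.
        future_to_name = {executor.submit(fetch, n, d): n for n, d in jobs.items()}
        for future in as_completed(future_to_name):   # yields futures as they finish
            name = future_to_name[future]
            try:
                results[name] = future.result()
            except Exception as exc:                  # error raised inside fetch()
                errors[name] = str(exc)
    return results, errors


if __name__ == "__main__":
    print(collect({"a": 0.05, "b": 0.01, "c": -1}))
```

Every future yielded by `as_completed()` is already finished, so `result()` returns or raises immediately and no extra `done()` check is needed.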
