Skip to content

Latest commit

 

History

History
654 lines (469 loc) · 32.8 KB

File metadata and controls

654 lines (469 loc) · 32.8 KB

七、异步 IO——无线程的多线程

上一章向我们展示了同步协程的基本实现。然而,无论何时处理外部资源,同步协同路由都不是一个好主意。仅一次暂停远程连接就可能导致整个进程挂起,除非您正在使用多进程(在第 13 章多进程中进行了解释—当单个 CPU 核心不够时)或异步功能。

异步 IO 使访问外部资源成为可能,而无需担心应用的速度减慢或暂停。Python 解释器不需要主动等待结果,只需继续执行其他任务,直到再次需要它为止。这与 JavaScript 中 Node.js 和 AJAX 调用的功能非常相似。在 Python 中,我们已经看到了asyncoregeventeventlet等库,这些库多年来使这成为可能。然而,随着asyncio模块的引入,它变得更加易于使用。

本章将解释如何在 Python 中使用异步函数(特别是 3.5 及更高版本),以及如何重新构造代码,使其在不遵循返回值的标准过程编码模式的情况下仍能正常工作。

本章将介绍以下主题:

  • 功能使用:
    • async def
    • async for
    • async with
    • await
  • 并行执行
  • 服务器
  • 客户
  • 使用Future的最终结果

引入 asyncio 库

创建asyncio库是为了使异步处理更容易,结果更可预测。它的引入是为了替换asyncore模块,该模块已经使用了很长时间(实际上是从 Python 1.5 开始)。asyncore模块从来都不是很有用,这促使了geventeventlet第三方库的创建。geventeventlet都使异步编程比asyncore容易得多,但我觉得随着asyncio的引入,这两种编程方式在很大程度上已经过时了。尽管我不得不承认asyncio仍然有很多不好的地方,但它正处于非常活跃的开发阶段,这让我认为所有这些不好的地方很快就会被核心 Python 库或第三方包装器修复。

为 Python3.4 正式引入了asyncio库,但 Python3.3 的后端口可通过 Python 包索引获得。考虑到这一点,虽然本章的某些部分可以在 Python3.3 上运行,但大部分内容都是用 Python3.5 编写的,并考虑了新引入的asyncawait关键字。

异步和等待语句

在我们继续任何示例之前,了解 Python 3.4 和 Python 3.5 代码语法的关系非常重要。尽管asyncio库仅在 Python3.4 中引入,但 Python3.5 中已经替换了大部分通用语法。不是强制的,但是引入了更简单的、因此推荐的使用asyncawait的语法。

Python 3.4

对于传统的 Python 3.4 用法,需要考虑以下几点:

  • 函数应该使用asyncio.coroutine修饰符声明
  • 异步结果应使用yield from coroutine()获取
  • 不直接支持异步循环,但可以使用while True: yield from coroutine()进行模拟

以下是一个例子:

import asyncio

@asyncio.coroutine
def sleeper():
    yield from asyncio.sleep(1)

Python 3.5

在 Python3.5 中,引入了一种新语法来将函数标记为异步函数。可以使用async关键字代替asyncio.coroutine装饰符。此外,Python 现在支持await语句,而不是混乱的yield from语法。yield from语句有点让人困惑,因为它可能会让人觉得某个值正在被交换,但情况并非总是如此。

以下为async声明:

async def some_coroutine():
    pass

它可以用来代替装饰器:

import asyncio

@asyncio.coroutine
def some_coroutine():
    pass

在 Python3.5 中,很可能在未来的版本中,coroutine装饰器仍将受到支持,但如果向后兼容性不是问题,我强烈建议使用新语法。

此外,我们可以使用逻辑性更强的await语句来代替yield from语句。因此,上一段中的示例变得非常简单,如下所示:

import asyncio

async def sleeper():
    await asyncio.sleep(1)

yield from语句源自 Python 中最初的协同路由实现,是同步协同路由中使用的yield语句的逻辑扩展。实际上,yield from语句仍然有效,await语句只是它的包装,并添加了一些检查。在使用await时,解释器检查该对象是否为可等待对象,这意味着它需要是以下对象之一:

  • 使用async def语句创建的本机协同程序
  • 使用asyncio.coroutine装饰器创建的协同程序
  • 实现__await__方法的对象

仅此检查就使得await语句比yield from语句更可取,但我个人认为await也能更好地传达该语句的含义。

要总结,要转换为新语法,请对进行以下更改:

  • 应使用async def而不是def声明函数
  • 异步结果应使用await coroutine()获取
  • 可以使用async for ... in ...创建异步循环
  • 可以使用async with ...创建异步with语句

在 3.4 和 3.5 语法之间选择

除非您真的需要 Python 3.3 或 3.4 支持,否则我强烈推荐 Python 3.5 语法。新语法更清晰,支持更多功能,如异步for循环和with语句。不幸的是,它们并不完全兼容,因此您需要做出选择。在async def(3.5)中,我们不能使用yield from,但我们需要做的就是将yield from替换为await

单线程并行处理的一个简单示例

并行处理有很多用途:服务器同时处理多个请求、加快繁重的任务、等待外部资源等等。在某些情况下,通用协同路由可以帮助处理多个请求和外部资源,但它们仍然是同步的,因此受到限制。有了asyncio,我们可以超越泛型协程的限制,轻松处理暂停的资源,而不必担心阻塞主线程。让我们看一个简单的示例,说明代码如何在多个并行函数中不暂停:

>>> import asyncio

>>> async def sleeper(delay):
...     await asyncio.sleep(delay)
...     print('Finished sleeper with delay: %d' % delay)

>>> loop = asyncio.get_event_loop()
>>> results = loop.run_until_complete(asyncio.wait((
...     sleeper(1),
...     sleeper(3),
...     sleeper(2),
... )))
Finished sleeper with delay: 1
Finished sleeper with delay: 2
Finished sleeper with delay: 3

即使我们以 1、3、2 的顺序开始睡眠,睡眠时间也相当长,asyncio.sleep加上await语句实际上告诉 Python 它应该继续执行此时需要实际处理的任务。常规的time.sleep实际上会暂停 Python 任务,这意味着它们将按顺序执行。这使得它在某种程度上更加透明,因为它处理任何类型的等待,我们可以将其交给asyncio,而不是让整个 Python 线程保持忙碌。因此,我们不需要while True: fh.read(),只要有新数据,我们就可以响应。

让我们分析一下本例中使用的组件:

  • asyncio.coroutine:此装饰器允许从async def协同程序中产生收益。除非您使用这种语法,否则实际上不需要 decorator,但如果只是作为文档使用,这是一个很好的默认值。
  • asyncio.sleep:这是time.sleep的异步版本。这两者之间的最大区别是time.sleep将使 Python 进程在睡眠时保持忙碌,而asyncio.sleep将允许切换到事件循环中的不同任务。这个过程与大多数操作系统中任务切换的工作原理非常相似。
  • asyncio.get_event_loop:默认事件循环实际上是asyncio任务切换器;我们将在下一段中详细解释这些。
  • asyncio.wait:这是用于包装一系列协同过程或未来并等待结果的协同过程。等待时间是可配置的,等待方式也是可配置的(第一次完成、全部完成或第一次异常)。

这应该可以解释示例的基本工作原理:sleeper函数是异步协程,在给定延迟后退出。wait函数在退出之前等待所有协同路由完成,event循环用于在三个协同路由之间切换。

异步的概念

asyncio库有几个基本概念,在我们进一步探讨示例和用法之前,必须对这些概念进行解释。上一段中显示的示例实际上使用了其中的大部分,但是关于如何和为什么的一点解释可能仍然有用。

asyncio的主要概念是协程事件循环。在中,有几个助手类可用,例如StreamsFuturesProcesses。接下来的几段将解释基础知识,以便您能够理解后面几段中示例中的实现。

未来与任务

asyncio.Future类本质上是对结果的承诺;如果结果可用,则返回结果,一旦收到结果,则将其传递给所有已注册的回调。它在内部维护一个状态变量,允许外部方将未来标记为已取消。API 与concurrent.futures.Future类非常相似,但由于它们不完全兼容,请确保不要混淆两者。

不过,Future类本身使用起来并不方便,所以这就是asyncio.Task的用武之地。Task类包装了一个协程,并自动为您处理执行、结果和状态。协同程序将通过给定的事件循环执行,如果没有给定,则通过默认的事件循环执行。

创建这些类不需要直接担心。这是因为推荐的方法不是自己创建类,而是通过asyncio.ensure_futureloop.create_task创建类。前者实际上在内部执行loop.create_task,但如果您只想在主/默认事件循环上执行它,而不必首先指定它,则更方便。用法非常简单。要手动创建自己的未来,只需告诉事件循环为您执行create_task。下面的例子有点复杂,因为所有的设置代码,但是 C 的用法应该足够清楚。需要注意的最重要方面是,应链接事件循环,以便任务知道如何/在何处运行:

>>> import asyncio

>>> async def sleeper(delay):
...     await asyncio.sleep(delay)
...     print('Finished sleeper with delay: %d' % delay)

# Create an event loop
>>> loop = asyncio.get_event_loop()

# Create the task
>>> result = loop.call_soon(loop.create_task, sleeper(1))

# Make sure the loop stops after 2 seconds
>>> result = loop.call_later(2, loop.stop)

# Start the loop and make it run forever. Or at least until the loop.stop gets
# called in 2 seconds.
>>> loop.run_forever()
Finished sleeper with delay: 1

现在,有一点关于调试异步函数的内容。调试异步函数过去是非常困难的,如果不是不可能的话,因为没有很好的方法来查看函数在哪里以及如何停止。幸运的是,情况已经改变。对于Task类,只需调用task.get_stacktask.print_stack即可查看其当前位置。用法可以简单到如下所示:

>>> import asyncio

>>> async def stack_printer():
...     for task in asyncio.Task.all_tasks():
...         task.print_stack()

# Create an event loop
>>> loop = asyncio.get_event_loop()

# Create the task
>>> result = loop.run_until_complete(stack_printer())

事件循环

事件循环的概念实际上是asyncio中最重要的概念。您可能已经怀疑协同程序本身就是所有事情的核心,但是如果没有事件循环,它们是无用的。事件循环充当任务切换器,就像操作系统在 CPU 上的活动任务之间切换一样。即使使用多核处理器,仍然需要一个主进程来告诉 CPU 哪些任务必须运行,哪些任务需要等待/睡眠一段时间。这正是事件循环的作用:它决定运行哪个任务。

事件循环实现

到目前为止,我们只看到了,它使用默认事件循环策略返回默认事件循环。目前,有两种捆绑的事件循环实现:async.SelectorEventLoopasync.ProactorEventLoop实现。这两个选项中哪一个可用取决于您的操作系统。后一个事件循环仅在 Windows 计算机上可用,并使用 I/O 完成端口,这是一个据说比asyncio.SelectorEventLoopSelect实现更快、更高效的系统。如果性能是一个问题,这是一个值得考虑的问题。使用非常简单,幸运的是:

import asyncio

loop = asyncio.ProActorEventLoop()
asyncio.set_event_loop(loop)

备选事件循环基于选择器,自 Python 3.4 以来,选择器可通过核心 Python 安装中的selectors模块获得。在 Python 3.4 中引入了selectors模块,以方便访问低级异步 I/O 操作。基本上,它允许您使用 I/O 多路复用打开和读取许多文件。由于asyncio为您处理了所有的复杂问题,因此通常不需要直接使用该模块,但如果您需要,它的使用非常简单。下面是一个将函数绑定到标准输入上的读取事件(EVENT_READ的示例。代码只需等待其中一个注册文件提供新数据:

import sys
import selectors

def read(fh):
    print('Got input from stdin: %r' % fh.readline())

if __name__ == '__main__':
    # Create the default selector
    selector = selectors.DefaultSelector()

    # Register the read function for the READ event on stdin
    selector.register(sys.stdin, selectors.EVENT_READ, read)

    while True:
        for key, mask in selector.select():
            # The data attribute contains the read function here
            callback = key.data
            # Call it with the fileobj (stdin here)
            callback(key.fileobj)

有几种选择器可用,如传统的selectors.SelectSelector(内部使用select.select,但也有更现代的解决方案,如selectors.KqueueSelectorselectors.EpollSelectorselectors.DevpollSelector。尽管默认情况下它应该选择最有效的选择器,但在某些情况下,最有效的选择器在某些方面并不合适。在这些情况下,选择器事件循环允许您指定不同的选择器:

import asyncio
import selectors

selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)

应该注意的是,这些选择器之间的差异通常太小,在大多数实际应用中都无法注意到。我遇到的唯一一种情况是,在构建一个必须同时处理大量连接的服务器时,这种优化会产生影响。在“很多”一词中,我指的是单个服务器上超过 100000 个并发连接,这是这个星球上只有少数人必须解决的问题。

事件循环策略

事件循环策略是为您创建和存储实际事件循环的对象。它们在编写时考虑了最大的灵活性,但不是您经常需要修改的对象。我可以考虑修改事件循环策略的唯一原因是,如果希望在特定处理器和/或系统上运行特定的事件循环,或者如果希望更改默认的事件循环类型。除此之外,它提供了比大多数人所需要的更多的灵活性。通过以下代码,您可以创建自己的事件循环(本例中为ProActorEventLoop)默认值:

import asyncio

class ProActorEventLoopPolicy(
        asyncio.events.BaseDefaultEventLoopPolicy):
    _loop_factory = asyncio.SelectorEventLoop

policy = ProActorEventLoopPolicy()
asyncio.set_event_loop_policy(policy)

事件循环使用

到目前为止,我们只看到了loop.run_until_complete方法。当然,还有一些其他的。您最常使用的是loop.run_forever。正如所预期的,这个方法会一直运行,或者至少直到loop.stop被运行为止。

因此,假设我们现在有一个永远运行的事件循环,我们需要向它添加任务。这就是事情变得有趣的地方。默认事件循环中有很多选项:

  • call_soon:将项添加到(FIFO)队列的末尾,以便按插入顺序执行函数。
  • call_soon_threadsafe:与call_soon相同,只是线程安全。call_soon方法不是线程安全的,因为线程安全需要使用全局解释器锁(GIL),这在线程安全的时刻有效地使程序成为单线程。性能一章将更详细地解释这一点。
  • call_later:在给定的秒数后调用该函数。如果两个作业同时运行,它们将以未定义的顺序运行。请注意,延迟是最小值。如果事件循环处于锁定/忙碌状态,则可以稍后运行。
  • call_at:在与loop.time输出相关的特定时间调用函数。loop.time之后的每个整数加一秒。

所有这些函数都返回asyncio.Handle对象。这些对象允许通过handle.cancel函数取消任务,只要任务尚未执行。但是,要小心从其他线程取消,因为取消也不是线程安全的。为了以线程安全的方式执行它,我们还必须将取消功能作为任务执行:loop.call_soon_threadsafe(handle.cancel)。以下是一个示例用法:

>>> import time
>>> import asyncio

>>> t = time.time()

>>> def printer(name):
...     print('Started %s at %.1f' % (name, time.time() - t))
...     time.sleep(0.2)
...     print('Finished %s at %.1f' % (name, time.time() - t))

>>> loop = asyncio.get_event_loop()
>>> result = loop.call_at(loop.time() + .2, printer, 'call_at')
>>> result = loop.call_later(.1, printer, 'call_later')
>>> result = loop.call_soon(printer, 'call_soon')
>>> result = loop.call_soon_threadsafe(printer, 'call_soon_threadsafe')

>>> # Make sure we stop after a second
>>> result = loop.call_later(1, loop.stop)

>>> loop.run_forever()
Started call_soon at 0.0
Finished call_soon at 0.2
Started call_soon_threadsafe at 0.2
Finished call_soon_threadsafe at 0.4
Started call_later at 0.4
Finished call_later at 0.6
Started call_at at 0.6
Finished call_at at 0.8

你可能想知道我们为什么不在这里使用协同程序装饰器。原因是循环不允许直接运行协同路由。要通过这些调用函数运行协同路由,我们需要确保它包装在一个asyncio.Task中。正如我们在上一段中所看到的,幸运的是,这很容易:

>>> import time
>>> import asyncio

>>> t = time.time()

>>> async def printer(name):
...     print('Started %s at %.1f' % (name, time.time() - t))
...     await asyncio.sleep(0.2)
...     print('Finished %s at %.1f' % (name, time.time() - t))

>>> loop = asyncio.get_event_loop()

>>> result = loop.call_at(
...     loop.time() + .2, loop.create_task, printer('call_at'))
>>> result = loop.call_later(.1, loop.create_task,
...     printer('call_later'))
>>> result = loop.call_soon(loop.create_task,
...     printer('call_soon'))

>>> result = loop.call_soon_threadsafe(
...     loop.create_task, printer('call_soon_threadsafe'))

>>> # Make sure we stop after a second
>>> result = loop.call_later(1, loop.stop)

>>> loop.run_forever()
Started call_soon at 0.0
Started call_soon_threadsafe at 0.0
Started call_later at 0.1
Started call_at at 0.2
Finished call_soon at 0.2
Finished call_soon_threadsafe at 0.2
Finished call_later at 0.3
Finished call_at at 0.4

这些调用方法看起来可能略有不同,但内部实际上可以归结为两个队列,它们是通过heapq实现的。loop._scheduled用于预定操作,loop._ready用于立即执行。当调用_run_once方法时(run_forever方法将此方法包装在while True循环中),循环将首先尝试使用特定的循环实现(例如SelectorEventLoop处理loop._ready堆中的所有项)。一旦处理了loop._ready中的所有内容,循环将继续将项目从loop._scheduled堆移动到loop._ready堆(如果到期)。

call_sooncall_soon_threadsafe都写入loop._ready堆。而call_later方法只是call_at的包装器,当前值asyncio.time添加到计划时间,写入loop._scheduled堆。

这种处理方法的结果是,通过call_soon*方法添加的所有内容将始终在通过call_at/call_later方法添加的所有内容之后执行。

至于ensure_futures函数,它将在内部调用loop.create_task以将协程封装在Task对象中,当然,这是Future对象的子类。如果出于某种原因需要扩展Task类,那么可以通过loop.set_task_factory方法轻松实现。

根据事件循环的类型,实际上还有许多其他方法可用于创建连接、文件处理程序等。这些将在后面的段落中通过示例进行解释,因为它们与事件循环关系不大,更多的是使用协同程序进行编程。

过程

到目前为止,我们只执行了特定的异步 Python 函数,但有些东西在 Python 中异步运行有点困难。例如,假设我们有一个希望运行的长期运行的外部应用。subprocess模块将是运行外部应用的标准方法,并且工作得相当好。只要小心一点,甚至可以通过轮询输出来确保它们不会阻塞主线程。然而,这仍然需要投票。然而,在我们等待结果的同时,事件是否会变得更好,以便我们可以做其他事情?幸运的是,这很容易通过asyncio.Process安排。类似于FutureTask类,该类是通过事件循环创建的。就用法而言,该类与subprocess.Popen类非常相似,只是函数被设置为异步。当然,这会导致删除轮询功能。

首先,让我们看看传统的顺序版本:

>>> import time
>>> import subprocess
>>>
>>>
>>> t = time.time()
>>>
>>>
>>> def process_sleeper():
...     print('Started sleep at %.1f' % (time.time() - t))
...     process = subprocess.Popen(['sleep', '0.1'])
...     process.wait()
...     print('Finished sleep at %.1f' % (time.time() - t))
...
>>>
>>> for i in range(3):
...     process_sleeper()
Started sleep at 0.0
Finished sleep at 0.1
Started sleep at 0.1
Finished sleep at 0.2
Started sleep at 0.2
Finished sleep at 0.3

由于所有操作都是按顺序执行的,因此 sleep 命令睡眠的时间是 0.1 秒的三倍。因此,与其同时等待它们,不如让我们这次并行运行它们:

>>> import time
>>> import subprocess 

>>> t = time.time()

>>> def process_sleeper():
...     print('Started sleep at %.1f' % (time.time() - t))
...     return subprocess.Popen(['sleep', '0.1'])
...
>>>
>>> processes = []
>>> for i in range(5):
...     processes.append(process_sleeper())
Started sleep at 0.0
Started sleep at 0.0
Started sleep at 0.0
Started sleep at 0.0
Started sleep at 0.0

>>> for process in processes:
...     returncode = process.wait()
...     print('Finished sleep at %.1f' % (time.time() - t))
Finished sleep at 0.1
Finished sleep at 0.1
Finished sleep at 0.1
Finished sleep at 0.1
Finished sleep at 0.1

虽然这个在运行时方面看起来好多了,但我们的程序结构现在有点混乱。我们需要两个循环,一个用于启动流程,另一个用于测量完成时间。此外,我们必须将 print 语句移到函数之外,这通常也是不可取的。这次我们将尝试asyncio版本:

>>> import time
>>> import asyncio

>>> t = time.time()

>>> async def async_process_sleeper():
...     print('Started sleep at %.1f' % (time.time() - t))
...     process = await asyncio.create_subprocess_exec('sleep', '0.1')
...     await process.wait()
...     print('Finished sleep at %.1f' % (time.time() - t))

>>> loop = asyncio.get_event_loop()
>>> for i in range(5):
...     task = loop.create_task(async_process_sleeper())

>>> future = loop.call_later(.5, loop.stop)

>>> loop.run_forever()
Started sleep at 0.0
Started sleep at 0.0
Started sleep at 0.0
Started sleep at 0.0
Started sleep at 0.0
Finished sleep at 0.1
Finished sleep at 0.1
Finished sleep at 0.1
Finished sleep at 0.1
Finished sleep at 0.1

如您所见,通过这种方式,同时运行多个应用非常容易。但这是容易的部分;过程的难点在于交互输入和输出。asyncio模块有几种措施使其更容易,但在实际处理结果时仍然很困难。下面是调用 Python 解释器、执行一些代码并再次退出的示例:

import asyncio

async def run_script():
    process = await asyncio.create_subprocess_shell(
        'python3',
        stdout=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # Write a simple Python script to the interpreter
    process.stdin.write(b'\n'.join((
        b'import math',
        b'x = 2 ** 8',
        b'y = math.sqrt(x)',
        b'z = math.sqrt(y)',
        b'print("x: %d" % x)',
        b'print("y: %d" % y)',
        b'print("z: %d" % z)',
        b'for i in range(int(z)):',
        b'    print("i: %d" % i)',
    )))
    # Make sure the stdin is flushed asynchronously
    await process.stdin.drain()
    # And send the end of file so the Python interpreter will
    # start processing the input. Without this the process will
    # stall forever.
    process.stdin.write_eof()

    # Fetch the lines from the stdout asynchronously
    async for out in process.stdout:
        # Decode the output from bytes and strip the whitespace
        # (newline) at the right
        print(out.decode('utf-8').rstrip())

    # Wait for the process to exit
    await process.wait()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run_script())
    loop.close()

代码非常简单,但该代码中有几个部分对我们来说并不明显,但需要发挥作用。虽然创建子流程和编写代码是很明显的,但您可能会对process.stdin.write_eof()行感到疑惑。这里的问题是缓冲。为了提高性能,大多数程序在默认情况下都会缓冲输入和输出。对于 Python 程序,结果是除非我们发送文件eof结尾,否则程序将继续等待更多输入。另一种解决方案是关闭stdin流或以某种方式与 Python 程序通信,我们将不再发送任何输入。然而,这当然是需要考虑的。另一种选择是使用process.stdin.drain()中的yield,但这只考虑代码的发送端;接收端可能仍在等待更多输入。让我们看看输出:

# python3 processes.py
x: 256
y: 16
z: 4
i: 0
i: 1
i: 2
i: 3

在这个实现中,我们仍然需要一个循环来获得来自stdout流的所有结果。不幸的是,asyncio.StreamReader(也就是process.stdout类)还不支持async for语法。如果是这样的话,一个简单的async for out in process.stdout就行了。一个简单的yield from process.stdout.read()也可以,但每行阅读通常更方便使用。

如果可能,我建议您避免使用stdin向子流程发送数据,而是使用进行一些网络、管道或文件通信。正如我们将在接下来的段落中看到的那样,处理这些问题要方便得多。

异步服务器和客户端

暂停脚本和应用的最常见原因之一是使用远程资源。有了asyncio,其中至少有很大一部分很容易修复。获取多个远程资源并为多个客户机提供服务比过去更简单、更轻量级。虽然多线程和多进程也可以用于这些情况,asyncio是一种更轻的选择,实际上更易于管理。创建客户端和服务器有两种主要方法。协同方式为使用asyncio.open_connectionasyncio.start_server。基于类的方法要求您继承asyncio.Protocol类。虽然这些基本上是相同的东西,但工作原理略有不同。

基本回音服务器

基本的客户端和服务器版本非常简单,可以编写。asyncio模块负责所有低级连接处理,只剩下正确连接方法的要求。对于服务器,我们需要一个方法来处理传入的连接,对于客户端,我们需要一个函数来创建连接。为了说明发生了什么以及在哪个时间点,我们将添加一个专用的打印函数,该函数将打印自服务器进程启动以来的时间和给定的参数:

import time
import sys
import asyncio

HOST = '127.0.0.1'
PORT = 1234

start_time = time.time()

def printer(start_time, *args, **kwargs):
    '''Simple function to print a message prefixed with the
    time relative to the given start_time'''
    print('%.1f' % (time.time() - start_time), *args, **kwargs)

async def handle_connection(reader, writer):
    client_address = writer.get_extra_info('peername')
    printer(start_time, 'Client connected', client_address)

    # Send over the server start time to get consistent
    # timestamps
    writer.write(b'%.2f\n' % start_time)
    await writer.drain()

    repetitions = int((await reader.readline()))
    printer(start_time, 'Started sending to', client_address)

    for i in range(repetitions):
        message = 'client: %r, %d\n' % (client_address, i)
        printer(start_time, message, end='')
        writer.write(message.encode())
        await writer.drain()

    printer(start_time, 'Finished sending to', client_address)
    writer.close()

async def create_connection(repetitions):
    reader, writer = await asyncio.open_connection(
        host=HOST, port=PORT)

    start_time = float((await reader.readline()))

    writer.write(repetitions.encode() + b'\n')
    await writer.drain()

    async for line in reader:
        # Sleeping a little to emulate processing time and make
        # it easier to add more simultaneous clients
        await asyncio.sleep(1)

        printer(start_time, 'Got line: ', line.decode(),
                end='')

    writer.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    if sys.argv[1] == 'server':
        server = asyncio.start_server(
            handle_connection,
            host=HOST,
            port=PORT,
        )
        running_server = loop.run_until_complete(server)

        try:
            result = loop.call_later(5, loop.stop)
            loop.run_forever()
        except KeyboardInterrupt:
            pass

        running_server.close()
        loop.run_until_complete(running_server.wait_closed())
    elif sys.argv[1] == 'client':
        loop.run_until_complete(create_connection(sys.argv[2]))

    loop.close()

现在我们将运行服务器和两个同时运行的客户端。由于这些并行运行,服务器输出当然有点奇怪。因此,我们同步从服务器到客户端的开始时间,并将服务器启动后的秒数作为所有打印语句的前缀。

服务器:

# python3 simple_connections.py server
0.4 Client connected ('127.0.0.1', 59990)
0.4 Started sending to ('127.0.0.1', 59990)
0.4 client: ('127.0.0.1', 59990), 0
0.4 client: ('127.0.0.1', 59990), 1
0.4 client: ('127.0.0.1', 59990), 2
0.4 Finished sending to ('127.0.0.1', 59990)
2.0 Client connected ('127.0.0.1', 59991)
2.0 Started sending to ('127.0.0.1', 59991)
2.0 client: ('127.0.0.1', 59991), 0
2.0 client: ('127.0.0.1', 59991), 1
2.0 Finished sending to ('127.0.0.1', 59991)

第一个客户:

# python3 simple_connections.py client 3
1.4 Got line:  client: ('127.0.0.1', 59990), 0
2.4 Got line:  client: ('127.0.0.1', 59990), 1
3.4 Got line:  client: ('127.0.0.1', 59990), 2

第二个客户:

# python3 simple_connections.py client 2
3.0 Got line:  client: ('127.0.0.1', 59991), 0
4.0 Got line:  client: ('127.0.0.1', 59991), 1

由于输入和输出都有缓冲区,我们需要在写入后手动清空输入,并在读取另一方的输出时使用yield from。这正是与常规外部流程的通信比网络交互更困难的原因。与计算机输入相比,流程的标准输入更注重用户输入,这使得其使用不太方便。

如果您希望使用reader.read(BUFFER)而不是reader.readline(),这也是可能的。只需注意,您需要专门分隔数据,因为它可能会意外地附加到其他数据中。所有写入操作都写入同一个缓冲区,从而产生一个长返回流。另一方面,试图在没有reader.readline()识别的新行(\n)的情况下写入将导致客户端永远等待。

总结

在本章中,我们看到了如何使用asyncio在 Python 中使用异步 I/O。对于许多场景,asyncio模块仍然有点原始和未完成,但在使用它时应该没有任何障碍。创建一个功能齐全的服务器/客户端设置仍然有点复杂,但是asyncio最明显的用途是处理基本的网络 I/O(如数据库连接)和外部资源(如网站)。特别是,使用asyncio只需几行就可以实现后者,从而消除代码中一些非常重要的瓶颈。

本章的重点是理解如何告诉 Python 在后台等待结果,而不是像往常一样简单地等待或轮询结果。在第 13 章多进程–当单个 CPU 内核不够时您将了解多进程,这也是处理暂停资源的一个选项。然而,多进程的目标实际上是使用多个处理器,而不是处理暂停的资源。如果可能的话,当涉及到潜在的缓慢的外部资源时,我建议您始终使用asyncio

在基于asyncio库构建实用程序时,请确保搜索预制库以解决您的问题,因为许多库目前正在开发中。在编写本章时,Python3.5 还没有正式发布,因此很可能会出现更多使用async/await语法的文档和库。为确保您不会重复他人所做的工作,请在asyncio上编写自己的扩展代码之前,彻底搜索互联网。

下一章将解释一个完全不同的主题——使用元类构造类。常规类是使用类型类创建的,但现在我们将看到如何扩展和修改默认行为,使类可以执行我们想要的任何操作。元类甚至可以自动注册插件,并以一种非常神奇的方式向类添加特性。简言之,如何不仅自定义类实例,而且自定义类定义本身。