Skip to content

Latest commit

 

History

History
1423 lines (1123 loc) · 77.8 KB

File metadata and controls

1423 lines (1123 loc) · 77.8 KB

十五、服务剖析

hms_sys中要攻击的下一个逻辑功能块是 Artisan 网关服务。此服务等待 Artisan 或 Central Office 最终用户的输入,根据需要创建或更新对象数据,并可能将该数据与 web store 系统的数据库同步。预期两个最终用户应用程序都将在完全随机的基础上与 Artisan 网关服务通信;每当有人想要对数据进行更改时,它就会准备好并等待处理该请求。

然而,在我们真正实现这个服务之前,我们需要弄清楚任何服务可以或应该如何工作,用 Python 编写。为此,我们必须研究并理解以下内容:

  • 服务结构的基本实现,包括以下内容:
    • 用于管理服务实例配置的选项
    • 服务如何读取和响应请求
  • 在以下环境中启动服务的方式和时间:
    • 一个相当现代、兼容 POSIX 的系统(例如 Linux)
    • 窗户
  • 是否有其他更好的设计可以在 Python 可用的任何操作系统中工作

为了更好地理解服务的实现和执行的这些方面,我们将从基础上构建一个基本的服务结构,然后它可以作为最终的工匠网关服务的基础。

什么是服务?

最基本的服务只是在计算机后台运行的程序。它们通常在某处等待输入,根据输入执行一些操作,并返回数据,这些数据至少表明所执行的操作成功或失败。在最基本的层面上,输入甚至可能不是用户可见的东西;在今天的许多操作系统中,等待网络活动、监视文件系统甚至只是在某种计时器控制的基础上运行的服务非常常见。

只要主机在运行,服务就应该始终可用并持续运行;这对如何编写和实现它们有一些影响,如下所示:

  • 它们必须具有很强的容错性:每当发生意外事件时,服务就会崩溃并死亡,因此必须重新启动,这是毫无用处的。
  • 可以说,它们应该在功能上尽可能独立;对于可能失败(并导致正在运行的服务崩溃)的外部依赖关系,应该以批判的眼光进行检查。
  • 由于用户可能完全看不到它们的操作,因此设计或实现糟糕的服务很可能会占用系统资源,最终可能导致整个机器停机。即使不涉及多个处理,也需要小心并严格遵守规则,以避免出现诸如永不终止的循环或在内存中留下孤立对象、数据或函数的功能等情况。如果出现这种情况,内存或可用 CPU 将减少到零只是时间(或服务负载)的问题。

服务结构

尽管如此,服务并不一定那么复杂。如果存在可用于管理实际代码执行(启动和关闭)的操作系统功能,则它们在结构上可能不会比以下代码更复杂:

#!/usr/bin/env python
"""
A simple daemon-like function that can be started from the command-line.
"""
    import syslog
    from time import sleep

    def main_program():
        iterations = 0
        syslog.syslog('Starting %s' % __file__)
        while True:
            # TODO: Perform whatever request-acquisition and response-
            #       generation is needed here...
            syslog.syslog('Event Loop (%d)' % iterations)
            sleep(10)
            iterations += 1
        syslog.syslog('Exiting %s' % __file__)

    if __name__ == '__main__':
        main_program()

当前面的代码运行时,它不会生成任何用户可见的输出,但是观察系统日志(在 Linux 机器上使用tail -f /var/log/syslog)表明它正在做它应该做的事情,如下所示:

  • 它在进入主循环之前将开始消息写入日志文件。
  • 在循环的每个过程中,它执行以下操作:
    • 将带有迭代编号的消息写入日志
    • 睡 10 秒钟
    • 递增迭代计数器

正在退出的消息没有写入日志文件,但这在此时是意料之中的,因为停止主循环的唯一方法是终止程序本身,并且在不退出循环的情况下终止程序。从启动到几个迭代,典型的日志输出如下所示:

诚然,这不是一个很好的服务,但它说明了可能被认为是任何服务所共有的最低限度的功能。

大多数服务的核心是一个循环,它一直运行到服务关闭或终止。在该循环中,服务将通过以下几种方式之一实际检查输入。一些较常见的变体包括:

  • 它可能正在等待通过网络套接字传入的请求(web 服务将使用这种方法)。
  • 它可能正在等待来自标准输入(stdin的传入数据。
  • 它可以主动轮询来自外部队列系统(如 RabbitMQ)或基于云的等效系统(如 AWS 的 SQS 或 Google 云平台的 cloud Pub/Sub)的传入消息。

These are only a few of the possibilities for service input. Other mechanisms that don't lend themselves to a direct waiting-for-something model could always push events into a local queue, and have the service watching or polling from that queue mechanism.

在除最基本的服务外的所有服务中,必须对传入的请求进行评估,以确定必须调用什么功能才能处理请求。将传入请求数据与特定功能关联的最常见机制可能是一个大型的if…elif…else结构,它将处理请求的责任传递给特定的专用功能,如下所示:

# - Evaluate the incoming request:
    if request['path'].startswith('/product'):
       return handle_product_request(request)
    elif request['path'].startswith('/artisan'):
       return handle_artisan_request(request)
    elif request['path'].startswith('/customer'):
       return handle_customer_request(request)
    else:
# - Invalid request, so return an error
       return handle_invalid_request(request)

然后,每个handle_{something}_request函数将负责接收传入的请求,确定如何处理它,并返回结果数据。

有一个标准的 Python 库python-daemon,它进一步采用了这种基本方法,允许在基本守护进程上下文中包装函数。同样的基本函数,有一个python-daemon DaemonContext环绕,非常相似,如下代码片段所示:

#!/usr/bin/env python
"""
A bare-bones daemon implementation.
"""
    import syslog
    from daemon import DaemonContext
    from time import sleep

    def main_program():
        iterations = 0
        syslog.syslog('Starting %s' % __file__)
        while True:
        # TODO: Perform whatever request-acquisition and response-
        #       generation is needed here...
            syslog.syslog('Event Loop (%d)' % iterations)
            sleep(10)
            iterations += 1
        syslog.syslog('Exiting %s' % __file__)

    if __name__ == '__main__':
        with DaemonContext():
            main_program()

The terms service and daemon are, for the purposes of this book, interchangeable; they both refer to the same sort of background process program.

执行此代码会产生几乎相同的结果(除了日志消息中显示的文件名,实际上是相同的)。一旦守护程序代码运行,实际的差异实际上是看不见的。使用DaemonContext提供了一些基本的、仅功能的代码无法处理的操作方面,这被认为是守护程序进程的最佳实践:

  • 确保在启动期间关闭与命令关联的所有打开的文件
  • 将进程的工作目录更改为已知和/或安全目录
  • 设置文件创建权限掩码,以便进程创建的文件将具有已知(安全)权限集
  • 执行系统级进程设置,以允许进程本身在后台运行
  • 将进程与任何终端活动分离,这样一旦启动守护进程,它就不会响应终端输入

Although python-daemon is a standard library, it may not be part of a standard Python installation. If not, it can be installed with pip install python-daemon.

因此,python-daemon模块提供了一种非常简单的方法来管理许多编写守护进程和服务的最佳实践操作。但是,使用它有一个潜在的问题。它在没有类似 Unix 的密码数据库的系统上不起作用(它取决于pwd模块,该模块仅适用于 Unix)。这至少排除了需要在 Windows 系统上运行的服务。

不过,归根结底,要知道服务实现不必是一个具有永久循环的单个函数调用,主要关注点(服务逻辑实现之外)可能是如何让主机操作系统启动、停止和管理服务实例。在本章末尾,我们将更详细地研究这一点,但首先,还有一些其他公共服务实现模式和关注点需要进行一些研究。

配置

服务通常必须在不更改实际服务代码的情况下进行配置,以便活动服务的最终用户或管理者本身不必是开发人员,从而能够有效地管理正在运行的服务实例。有几个选项可用于从文件中读取配置和设置值,每个选项都有自己的优缺点。为了更好地比较和对比它们,让我们检查为执行以下操作的服务提供配置的变体:

  • 记录信息、警告、错误和关键级别消息:
    • 发送到控制台的信息和警告级别消息
    • 将所有信息(包括信息和警告级别消息)都添加到一个通用日志文件中,该文件的位置是可配置的
  • 侦听来自队列服务(如 RabbitMQ)或基于云的队列服务(如 AWS 的 SQS 或 Google cloud Platform 的 Pub/Sub)的输入消息,并需要了解以下信息:
    • 要侦听的队列名称或 URL
    • 检查传入消息的频率
    • 访问相关队列的凭据

Windows 样式.ini 文件

Python 有一个用于处理 INI 文件的标准包(或者至少是类似于基本 Windows INI 文件的文件):configparser。为先前列出的项目提供配置的兼容 INI 类文件可能如下所示:

[DEFAULT]
# This section handles settings-values that are available in other 
# sections.
# - The minimum log-level that's in play
log_level:      INFO
queue_type:     rabbit
queue_check:    5

[console_log]
# Settings for logging of messages to the console
# - Message-types to log to a console
capture:        INFO, WARNING

[file_log]
# Settings for file-logging
log_file:       /var/log/myservice/activity.log

[rabbit_config]
# Configuration for the RabbitMQ server, if queue_type is "rabbit"
server:         10.1.10.1
port:           5672
queue_name:     my-queue
user:           username
password:       password

INI 样式配置文件的一些优点包括:

  • 文件结构允许使用注释。任何以#;开头的行都是注释,不会被解析,这就允许以内联方式记录配置文件。
  • [DEFAULT]部分中指定的值由所有其他部分继承,并且可以按照最初的规定使用,或者在以后的部分中重写。
  • 格式本身已经存在很长时间了,所以它非常成熟和稳定。

可以使用一个简单的脚本检查此配置文件的值,列出每个配置部分中的可用值,并显示该格式的一些潜在缺点,如使用configparser工具解析的:

The script that generated this output is in the code for Iteration 3, at hms-gateway/scratch-space/configuration-examples/ini_config.py.

该格式的一些潜在缺点包括:

  • [DEFAULT]配置部分中的值由所有其他部分继承,即使它们不相关。例如,queue_typequeue_check值可在console_logfile_log部分中找到,它们实际上并不相关。

  • 所有配置值都是字符串,可能必须转换为它们的实际值类型:对于queue_checkrabbit_config:port,可能是console_log:capturestr值中的list,对于任何可能出现的值,都可能转换为bool值,等等。

  • 该格式实际上只支持两个级别的配置数据(节及其成员)。

不过,这些约束都不太可能有问题。知道它们的存在通常足以计划如何容纳它们,而容纳的形状可能并不比没有[DEFAULT]部分更复杂,并且将配置值分组为更连贯的部分,例如loggingqueue

JSON 文件

JSON 数据结构也是存储配置文件数据的可行候选。JSON 支持不同类型的数据和复杂的数据结构。与基本的 INI 文件结构相比,两者都是优点,尽管它们可能微不足道。不过,没有预定义的组织结构,因此开发人员必须考虑如何对配置值进行分组或组织。也没有跨节继承配置数据,因为没有可继承的节。尽管如此,它还是一个简单、健壮、易于理解的选项。类似于前面 INI 风格的配置文件的 JSON 可能如下所示:

{
    "logging": {
        "log_level": "INFO",
        "console_capture": ["INFO","WARNING"],
        "log_file": "/var/log/myservice/activity.log"
    },
    "queue": {
        "queue_type": "rabbit",
        "queue_check": 5,
        "server": "10.1.10.1",
        "port": 5672,
        "queue_name": "my-queue",
        "user": "username",
        "password": "password"
    }
}

如果 JSON 有任何缺点(关于它作为配置文件格式的使用),它们会包括这样一个事实,即没有一种好的方式允许文件内注释。Python 的json模块提供的loadloads函数(分别用于转换 JSON 字符串和 JSON 文件)会引发错误JSONDecodeError,如果正在解析的 JSON 数据中存在数据结构以外的内容。这不是一个事务破坏者,但是能够向配置文件添加注释(从而添加文档)肯定有好处,特别是当配置将由非开发人员或不愿意(或能够)深入研究代码本身的人管理时,以了解如何配置系统的某些方面。

YAML 文件

另一个很好的配置文件竞争者是 YAML。YAML 在许多方面与 JSON 类似,因为它提供结构化和类型化的数据表示,并且可以支持复杂的嵌套数据结构。此外,它允许内联注释,pyyaml模块支持暗示在基于 JSON 的方法中根本不可用的数据结构。YAML 和 Python 一样,使用缩进作为结构组织机制,指示(在 YAML 的例子中)项之间的键/值关系。类似于前面的 JSON 配置文件(带有注释,并将文件中的所有元素(对象、列表成员等)分解为离散项),如下所示:

# Logging configuration
logging:
    console_capture:
        - INFO
        - WARNING
    log_file: /var/log/myservice/activity.log
    log_level: INFO
# Queue configuration
queue:
    queue_type: rabbit
    # Credentials
    user: username
    password: password
    # Network
    server: 10.1.10.1
    port: 5672
    # Queue settings
    queue_name: my-queue
    queue_check: 5

在本章后面,我们将以使用 YAML 配置服务的思想为基础。YAML 显然不是唯一的选择,但它是更好的选择之一,可以很好地结合易理解性、注释/文档能力以及多种值类型的可用性。

日志服务活动

由于服务通常在后台以不可见的方式运行,因此它们通常以某种方式记录它们的活动,如果只是为了提供对服务调用期间发生的错误的一些可见性的话。Python 提供了一个模块logging,它为记录运行程序中的事件和消息提供了很大的灵活性。以下是一个非常简单、暴力的完整日志记录过程示例:

import logging

# - Define a format for log-output
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# - Get a logger. Once defned anywhere, loggers (with all their 
#   settings and attached formats and handlers) can be retrieved 
#   elsewhere by getting a logger instance using the same name.
logger = logging.getLogger('logging-example')
logger.setLevel(logging.DEBUG)
# - Create a file-handler to write log-messages to a file
file_handler = logging.FileHandler('example.log')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
# - Attach handler to logger
logger.addHandler(file_handler)

# - Log some messages to show that it works:
logger.critical('This is a CRITICAL-level message')
logger.debug('This is a DEBUG-level message')
logger.error('This is an ERROR-level message')
logger.info('This is an INFO-level message')
logger.warn('This is a WARNING-level message')

执行时,前面的脚本将生成以下日志输出:

可以设置 PythonLogger对象(由getLogger调用返回)来关注不同优先级的日志消息。从最小到最关键的顺序(从生产系统的角度来看),可用的默认级别(以及它们的一些典型用途)如下所示:

  • DEBUG:记录流程运行时的信息、流程执行的步骤等,以期提供代码执行细节的可视性。
  • INFO:信息项,如请求处理流程的开始和结束时间;可能还有流程本身的细节或指标,比如传递了哪些参数,或者给定的执行是否比预期的时间长,但仍然完成。
  • WARNING:没有阻止流程或操作完成的情况,但由于某些原因可能会受到怀疑,例如完成的时间比预期要长很多。
  • ERROR:在代码执行过程中遇到的实际错误,可能包括详细的追溯信息,这些信息有助于开发人员找出问题错误的实际原因。
  • CRITICAL:记录在正在运行的代码发生严重/致命故障之前截获的信息——实际上导致执行失败的信息。在设计和实现良好的代码中,特别是对于希望始终可用的服务,很少需要这种级别的消息记录。错误将被捕获并记录为ERROR级别的项目,遇到错误后需要进行的任何清理将被执行,指示发生错误的响应将被发送回请求者,服务将继续运行,等待下一个请求。

任何给定级别的消息的实际处理和记录由Logger对象和/或其各种处理程序控制。Logger对象本身不会接受优先级低于其设置优先级的消息。在示例代码中使用logger.setLevel(logging.DEBUG)将允许任何标准消息优先级,而将其更改为logger.setLevel(logging.ERROR)将只允许接受ERRORCRITICAL消息。类似地,处理程序将忽略任何低于其配置为接受的优先级的传入消息–file_handler.setLevel(logging.DEBUG),在前面的示例中。

通过结合代码本身的详细日志记录,包括必要的DEBUG级别的项目,以及允许的消息优先级的一些配置,同一代码可以针对不同的环境微调自己的日志输出。例如:

def some_function(*args, **kwargs):
    logger.info('some_function(%s, %s) called' % (str(args), str(kwargs)))
    if not args and not kwargs:
        logger.warn(
            'some_function was called with no arguments'
        )
    elif args:
        logger.debug('*args exists: %s' % (str(args)))
        try:
            x, y = args[0:2]
            logger.debug('x = %s, y = %s' % (x, y))
            return x / y
        except ValueError as error:
            logger.error(
                '%s: Could not get x and y values from '
                'args %s' % 
                (error.__class__.__name__, str(args))
            )
        except Exception as error:
            logger.error(
                '%s in some_function: %s' % 
                (error.__class__.__name__, error)
            )
    logger.info('some_function complete')

根据logger中设置的日志优先级的不同,此代码集记录以下内容:

The complete script that generates this log information is in the Iteration 3 code, at hms-gateway/scratch-space/logging-examples/logging-example.py.

与 YAML 配置一样,我们将在本章后面构建此日志记录结构,作为构建可重用基本守护程序结构的一部分。

处理请求并生成响应

大多数服务将遵循某种请求-响应过程模型。接收到请求,无论是来自与服务交互的人类用户还是来自某个其他进程;然后,服务读取请求,确定如何处理它,执行所需的任何操作,并生成并返回响应。

至少有三种不同的请求类型非常常见,足以保证进行详细的检查——文件系统、HTTP/web 消息和基于队列的请求——每种类型都有自己关于如何将请求呈现给服务的基线假设,并且每种类型都有自己的设计和执行结果。

为任何给定请求类型生成的响应通常意味着相同基本类型的响应机制。也就是说,来自某个文件系统变量的请求通常会生成一个响应,该响应也表示为某种文件系统输出。这种情况可能并不总是如此,但在许多(也许是大多数)情况下,这种情况很有可能发生。

基于文件系统

来自和到本地文件系统的请求和响应通常(毫不奇怪)与从本地文件读取和写入数据有关。这种类型的最简单的请求和响应结构是一种服务,它从一个文件中读取数据,对其进行处理,并将结果写入另一个文件,可能在每次读取时删除或清除传入文件,或者在每次写入时替换输出文件,或者在生成和返回每个响应时附加到输出文件。单个输入和输出文件的实现可以利用 Python 的sys模块的stdinstdout功能,或者覆盖其中一个(或两个)。

Windows 和 POSIX 操作系统(Linux、macOS)都有特殊的文件类型,称为命名管道,它们驻留在文件系统上,就像文件一样,可以使用标准文件访问代码打开、读取和写入。主要区别在于,命名管道文件可以由多个不同的进程同时打开和写入/读取。然后,它允许任意数量的进程向文件中添加请求,将它们排队等待服务读取和处理。命名管道也可用于服务输出。

另一种变体是监视对本地文件系统中文件的更改,包括创建新文件,以及对给定位置中现有文件的更改(甚至删除)。最基本的是,这将涉及生成和维护一个要跟踪的文件列表,并定期检查这些文件的存在和修改时间的实际文件系统结构。遵循此模式的实现可能有一个公共输入文件目录,并且在主服务循环的每次迭代发生时,它将检查新文件,读取它们,执行,并在处理完成后删除该文件(以便将被监视的文件数量保持在合理的小范围内)。

对于被监视的文件数量大到创建和刷新该列表的计算成本太高而不实用的场景,使用pyinotify库的功能监视文件系统事件是一种可行的替代方案,尽管 POSIX/Linux 和 Windows 版本的库在可用性方面存在差异。

基于 HTTP 或 web 的

顾名思义,基于 HTTP 的服务(web 服务)使用 HTTP 协议接收请求并发送对这些请求的响应。作为网络感知服务的子集,web 服务允许从实际运行服务的机器以外的机器访问服务。Web 服务不必在公共互联网上访问;它们可以完全生活在一个本地网络中,并且在这些边界内也能很好地运行。然而,他们必须遵守一些基本的最低标准,并且可以从遵守其他标准中获益。

这些标准中最重要的可能是遵守 HTTP 协议的请求方法。在网站中最常见的方法,以及任何名副其实的 web 浏览器都支持的方法如下:

  • GET:用于检索数据
  • POST:用于使用连接的有效负载创建数据,即使POST通常用于 web 应用程序中,用于createupdate操作

协议中还有其他几种方法,包括:

  • PUTPATCH:分别用于使用连接的有效载荷全部或部分更新数据

  • DELETE:用于删除数据

  • OPTIONS:旨在提供指示可用方法的数据,特别是可以在接收系统上创建或更改数据的方法,如POSTPUTDELETE请求,尤其是,如果从服务域本身以外的其他地方向服务发出请求,则提供数据

其他可能发挥作用的方法包括HEADCONNECTTRACE。根据服务的设计和实现,每个 HTTP 方法都可以实现为类的特定功能或方法,允许每个请求类型能够执行特定于它的任何要求,同时仍然允许一些用于常见需求的功能,例如为POSTPUTPATCH请求提取有效负载。

web 服务调用的响应(即使是空响应)实际上是必需的;否则,调用客户端将等待请求超时。Web 服务响应仅限于可以通过 HTTP 协议传输的数据类型,这并不十分有限,但可能需要一些额外的开发工作来支持二进制资源响应(例如,图像)。就目前情况而言,在撰写本书时,大多数可以纯文本表示的响应似乎都是以 JSON 数据结构返回的,但 XML、HTML 和纯文本响应也是可能的。

虽然完全可以用 Python 编写一个完整的 web 服务,但是有相当数量的协议相关项可能会更好地由几个库、包或框架中的任何一个来处理,因为这样做会减少要编写、测试和维护的代码量。选项包括但不限于以下内容:

  • 将 web 服务编写为可通过 Apache 或 NGINX web 服务器访问的web 服务器网关接口*WSGI应用程序* ** 使用 Django REST 框架* 对 Flask 框架使用 Flask RESTful 扩展*

*基于 web 服务器和框架的解决方案还将受益于底层 web 服务器和框架软件的安全更新,而无需内部安全审计。

If a web service is expected to be exposed to the public internet, any of these are much better options than writing a service from the ground up, for that reason alone. It won't eliminate the need to be conscious of potential security concerns, but it will reduce the scope of those concerns to the code for the service's functionality itself.

基于消息队列

消息队列系统,如 RabbitMQ 和各种基于云的选项,在某些类型的应用程序中具有一些优势。它们通常允许使用几乎任何消息格式,前提是可以将其表示为文本,并且它们允许消息保持挂起状态,直到显式检索和处理它们,从而确保消息安全并随时可用,直到这些消息的最终消费者准备使用它们。举个例子,考虑下面的场景:

  1. 两个用户通过位于消息队列服务器上的分布式队列向服务发送消息
  2. 用户#1 发送他们的第一条消息
  3. 服务接收并处理该消息,但可能尚未在队列中删除该消息
  4. 由于某种原因重新启动服务—可能是为了将其更新为新版本,或者是因为服务器本身正在重新启动
  5. 在任何情况下,在服务恢复在线之前,用户 2 会发送他们的第一条消息。
  6. 用户#1 发送另一条消息

在目标服务完成启动之前,场景如下所示:

一旦目标服务完成启动,它要完成这些消息中挂起的请求所需做的就是轮询消息队列服务器以检索任何挂起的消息,并对其执行,就像它在重新启动之前所做的那样。

从用户#1 和用户#2 的角度来看,对服务的访问没有中断(尽管在获得回复方面可能有明显的,甚至是明显的延迟)。无论目标服务的非活动期是几秒钟还是几小时,这都是正确的。无论哪种方式,最终用户发送的消息/命令都会被保存,直到可以对它们采取行动为止,因此不会白费力气。

如果对这些请求的响应也通过基于队列的进程传输,则消息的相同持久性也将成立。因此,一旦目标服务生成并发送响应,用户就能够接收到它们,即使他们在发送前一天关机回家。响应消息将等待接收系统再次激活,此时它们将被传递并执行操作。

因此,基于队列的请求和响应周期非常适合于管理日志运行和/或异步进程,前提是作用于消息的代码考虑了这种可能性。

其他请求类型

Python 提供了对足够多的通用网络功能的访问,可以从头开始编写服务来读取和响应所需的几乎任何类型的网络流量。基于 web 和队列的服务类型是该功能的特定应用程序,由其他库在不同程度上提供支持,这些库可以满足特定于每个服务类型的一些需求,如下所示:

  • Web 服务可能至少会利用http.serversocket模块提供的功能;http.server.HTTPServersocketserver.TCPServer类是最有可能的起点,但http.server.ThreadingHTTPServer类也可能是可行的。
  • 基于队列的服务可能有一些库,这些库是专门为与它们所连接的基础队列服务进行交互而构建的,包括:
    • pika,用于 RabbitMQ 队列服务
    • boto3,对于 AWS SQS 服务,从创建boto3.SQS.Client对象开始

没有某种支持库的基于套接字的服务可能从前面列表中提到的socketserver.TCPServer类开始,或者可能从其 UDP 等价物socketserver.UDPServer开始。还有ThreadingForking混合类可用,可用于提供支持服务器线程或(在 POSIX 兼容系统上)分叉的基本服务器类,以处理更大的用户负载级别。

请求和响应格式

从纯技术/功能的角度来看,服务实现可以是数据和格式无关的。也就是说,服务不能接受原始二进制数据输入并返回原始二进制输出,这在功能上是没有原因的。毕竟,数据就是数据。然而,即使在服务真正涉及人类不容易读取的数据的情况下,格式化传入请求和传出响应也有一些优势,可以提供一定程度的人类可读性。至少,它使请求和响应的调试更容易。

在这方面,请求和响应数据有许多关于配置文件需求的问题,如下所示:

  • 能够传递结构化和类型化的数据也是同样有利的
  • 允许数据结构至少在某种程度上被一个普通的读者/观察者理解也是一件好事
  • 表示合理复杂的数据结构、列表和嵌套对象的能力也让人觉得很有优势

考虑到相同类型的关注点,解决它们的类似解决方案是有意义的,这意味着使用序列化格式(如 JSON 或 YAML)也是有意义的。这样做会带来一些额外的开发工作开销;例如,将传入数据从 JSON 转换为本机数据结构,或将出站本机数据结构响应转换为 JSON。不过,这种努力通常是微不足道的。

在这两种格式中,JSON 可以说是更好的通用*-*解决方案。它已经建立了良好的基础,并在更广泛的潜在服务客户机中直接得到支持,这仅仅是因为它本质上是 web 浏览器的本机数据格式。尽管如此,YAML 仍然是一个可行的替代方案,特别是在不需要 web 浏览器客户端支持的情况下。

通用服务设计

鉴于到目前为止我们已经探索过的配置和日志记录的可能性,基本服务即功能(service-as-a-function)方法感觉越来越不可行,除非有理由认为只需要编写一个服务。可以肯定的是,采用这种基本方法仍然是可能的,但是如果需要创建另一个服务,那么如果创建任何服务都有一个共同的起点,那么无论它期望做什么,它都会更加高效(并且至少在某种程度上更有效地利用开发人员的时间)。为此,我们将定义一组抽象基类ABC),这些抽象基类定义了未来任何服务或守护进程所期望的特性和功能的最低公分母,并将其用作 Artisan 网关服务hms_sys的起点。

将服务定义为类而不是函数的基本原理是这样一个事实,即我们可以合理地期望至少有一些属性和方法是所有服务/守护进程所共有的,在简单的基于函数的设计中,这些属性和方法将是困难、乏味和/或难以维护的。这些措施包括:

  • 一个集中式日志记录工具,按照前面介绍的示例日志记录代码构建
  • 很有可能需要跨多个端点访问服务的配置值,使用基于类的设计可能更易于管理
  • 使用所谓的可插拔请求、响应和格式化机制的能力几乎肯定会更容易开发和维护,因为这些机制将由封装所有必要功能的类表示

此处定义的类不利用前面提到的任何可用标准库实体(例如,socketserver.TCPServer的普通、线程或分叉变体)。相反,它们是任何服务的基线起点,至少在一个级别,如果需要,它们可能会将这些服务器类中的任何一个用作额外的混入。在另一个层次上,它们可以被认为是服务类所需的功能类型的纯粹说明,尽管对于某些应用程序来说,它们也可以作为服务类使用。

这些类也是纯同步的*。*他们一次处理一个请求,处理到完成并返回响应,然后获取下一个请求并进行处理。这可能足以满足hms_sys系统项目中预期的低负载场景,但可能不足以满足其他用例,尤其是在涉及实时响应和较高计算成本的过程时。我们将在第 19 章多进程和 Python 中的 HPC中研究处理此类场景的一些选项,同时讨论本地进程扩展选项。

我们将要构建的 ABC 集合如下:

考虑以下事项:

  • BaseDaemon是创建实际提供服务本身的类的起点
  • BaseRequestHandler提供了定义可调用对象的起点,这些对象将用于实际处理传入请求,并将负责使用从BaseResponseFormatter派生的类的实例格式化结果
  • BaseResponseFormatter是一个类似的可调用对象类,它将响应数据结构转换为序列化的字符串值,可以作为队列中的消息、HTTP 响应或最适合特定响应要求的任何其他格式返回

基地守护程序 ABC

不出所料,BaseDaemon的实现从标准 ABC 定义和一些类级属性/常量开始,如下所示:

class BaseDaemon(metaclass=abc.ABCMeta):
"""
Provides baseline functionality, interface requirements, and type-identity for objects that can act as a daemon/service managed by facilities in the local OS 
(like systemd) or by third-party service-configurators (like NSSM)
"""
    ###################################
    #   Class attributes/constants    #
    ###################################

    _handler_classes = {}
    _handler_keys = []

由于日志记录是任何服务的一个关键方面,因此确保某些日志记录参数始终可用是一个好主意。首先,设置存储默认日志记录配置的类级别常量,如下所示:

# - Default logging information
    _logging = {
        'name':None,
        'format':'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        'file':{
            'logfile':None,
            'level':logging.INFO,
        },
        'console':{
            'level':logging.ERROR,
        }
    }

这些默认值由一个通用的_create_logger方法使用,该方法由类作为具体方法提供,以确保日志记录始终可用,但控制日志记录的参数可以被覆盖:

def _create_logger(self):
    """
Creates the instance's logger object, sets up formatting for log-entries, and 
handlers for various log-output destinations
"""
    if not self.__class__._logging.get('name'):
        raise AttributeError(
            '%s cannot establish a logging facility because no '
            'logging-name value was set in the class itself, or '
            'through configuration settings (in %s).' % 
            (self.__class__.__name__, self.config_file)
        )

检查是否指定了记录器名称后,_logging类属性用于定义通用日志输出格式,如下所示:

    try:
        logging_settings = self.__class__._logging
        # - Global log-format
        formatter = logging.Formatter(logging_settings['format'])
        # - The main logger
        self._logger = logging.getLogger(
            logging_settings['name']
        )
        # - By default, the top-level logger instance will accept anything. 
        #   We'll change that to the appropriate level after checking the 
        #   various log-level settings:
        final_level = logging.DEBUG

相同的日志记录设置允许独立控制日志记录的文件和控制台输出。基于文件的日志输出需要logfile规范,并允许独立的level

        if logging_settings.get('file'):
            # - We're logging *something* to a file, so create a handler 
            #   to that purpose:
            if not self.__class__._logging['file'].get('logfile'):
                raise AttributeError(
                    '%s cannot establish a logging facility because no '
                    'log-file value was set in the class itself, or '
                    'through configuration settings (in %s).' % 
                    (self.__class__.__name__, self.config_file)
                )
            # - The actual file-handler
            file_handler = logging.FileHandler(
                logging_settings['file']['logfile']
            )
            # - Set the logging-level accordingly, and adjust final_level
            file_handler.setLevel(logging_settings['file']['level'])
            final_level = min(
                [
                     logging_settings['file']['level'],
                     final_level
                ]
            )
            # - Set formatting and attach it to the main logger:
            file_handler.setFormatter(formatter)
            self._logger.addHandler(file_handler)

在创建和附加每个日志输出时,日志级别用于重置final_level值,这将最终允许设置过程微调输出所附加到的记录器对象的日志级别。控制台记录器输出设置看起来与文件记录器输出基本相同,但不需要文件名:

    if logging_settings.get('console'):
        # - We're logging *something* to the console, so create a 
        #   handler to that purpose:
        # - The actual console-handler
        console_handler = logging.StreamHandler()
        # - Set the logging-level accordingly, and adjust final_level
        console_handler.setLevel(
            logging_settings['console']['level']
        )
        final_level = min(
            [
                 logging_settings['console']['level'],
                 final_level
            ]
         )
        # - Set formatting and attach it to the main logger:
        console_handler.setFormatter(formatter)
        self._logger.addHandler(console_handler)
        # - For efficiency's sake, use the final_level at the logger itself. 
        #   That should (hopefully) allow logging to run (trivially) 
        #   faster, since it'll know to skip anything that isn't handled by 
        #   at least one handler...
        self._logger.setLevel(final_level)

为了确保日志记录始终可用,到目前为止,所有设置都以try…except结构执行。如果在设置日志记录的过程中出现任何错误,则会引发最终的RuntimeError,目的是停止所有执行,因此必须修复导致日志记录失败的原因:

except Exception as error:
    raise RuntimeError(
        '%s could not complete the set-up of its logging '
        'facilities because %s was raised: %s' % 
            (
                self.__class__.__name__, error.__class__.__name__, 
                error
            )
    )
# - Log the fact that we can log stuff now :-)
    self.info(
        'Logging started. Other messages may have been output to '
        'stdout/terminal prior to now'
    )

创建实例的logger对象属性后,记录任何消息只需调用实例的各种记录方法之一。这些方法——criticaldebugerrorinfowarn——看起来或多或少都很相似,它们将以适当的优先级将提供的消息写入各个记录器输出,或者在尚未创建logger的情况下返回打印消息:

###################################
#        Logging methods          #
###################################

def critical(self, msg, *args, **kwargs):
    if self.logger:
        self.logger.critical(msg, *args, **kwargs)
    else:
        print('CRITICAL - %s' % msg)

def debug(self, msg, *args, **kwargs):
    if self.logger:
        self.logger.debug(msg, *args, **kwargs)
    else:
        print('DEBUG    - %s' % msg)

在大多数情况下,类的属性是早期代码中使用的结构和模式的典型属性,典型的类型和值检查附加到它们的相关 setter 方法:

    ###################################
    #  Instance property definitions  #
    ###################################

    config_file = property(
        _get_config_file, None, None, 
        'Gets the configuration-file used to set up the instance'
    )
    logger = property(
        _get_logger, None, None, 
        'Gets the logger for the instance'
    )

config_file属性的 setter 方法值得仔细研究,因为它执行一些检查,以确保传递的值是可读文件:

def _set_config_file(self, value:(str,)):
    if type(value) != str:
        raise TypeError(
            '%s.config_file expects a string value that points '
            'to a readable configuration-file on the local file-'
            'system, but was passed "%s" (%s)' % 
            (self.__class__.__name__, value, type(value).__name__)
        )
    if not os.path.isfile(value):
        if type(value) != str:
            raise TypeError(
                '%s.config_file expects a string value that '
                'points to a readable configuration-file on the '
                'local file-system, but was passed "%s" (%s), '
                'which is not a file' % 
                (
                    self.__class__.__name__, value, 
                    type(value).__name__
                )
            )
    if not os.access(value, os.R_OK):
        if type(value) != str:
            raise TypeError(
                '%s.config_file expects a string value that '
                'points to a readable configuration-file on the '
                'local file-system, but was passed "%s" (%s), '
                'which is not a READABLE file' % 
                (
                    self.__class__.__name__, value, 
                    type(value).__name__
                )
            )
    self.debug(
        '%s.config_file set to %s' % (self.__class__.__name__, value)
    )
    self._config_file = value

一旦配置文件被验证为可以使用,就可以调用类提供的另一个具体方法configure来读取并将其应用于类的实例。configure方法负责读取文件,将其转换为公共数据结构,并将其传递给一个必需/抽象方法,该方法实际将配置数据应用于实例:_on_configuration_loaded

这种责任划分允许单一的通用方法configure始终可用,同时允许抽象任何给定类的特定需求,并将其作为派生类_on_configuration_loaded的责任:

def configure(self):
    """
Reads the instance's configuration-file, converts it to a dictionary of values, then hands the responsibility for actually configuring the instance off to its required _on_configuration_loaded method
"""
    try:
        self.info('Loading configuration for %s' % self.__class__.__name__)
    except RuntimeError:
        # - This should only happen during start-up...
        print('Loading configuration for %s' % self.__class__.__name__)
    try:
        fp = open(self.config_file, 'r')
        config_data = yaml.load(fp)
        fp.close()
    except Exception as error:
        raise RuntimeError(
            '%s.config could not read configuration-data from '
            '%s, %s was raised: %s' % 
            (
                self.__class__.__name__, config_file, 
                error.__class__.__name__, error
            )
        )
    # - With the configuration read, it's time to actually 
    #   configure the instance
    self._on_configuration_loaded(**config_data)

_on_configuration_loaded方法可以包含一些其他类可以选择使用的具体代码,如下所示:

@abc.abstractmethod
def _on_configuration_loaded(self, **config_data):
    """
Applies the configuration to the instance. Since there are configuration values that may exist for any instance of the class, this method should be called by derived classes in addition to any local configuration.
"""
    if config_data.get('logging'):
        # - Since the class' logging settings are just a dict, we can 
        #   just update that dict, at least to start with:
        self.__class__._logging.update(config_data['logging'])
        # - Once the update is complete, we do need to change any logging-
        #   level items, though. We'll start with the file-logging:
        file_logging = self.__class__._logging.get('file')
        if file_logging:
            file_level = file_logging.get('level')
            if not file_level:
                file_logging['level'] = logging.INFO
            elif type(file_level) == str:
                try:
                    file_logging['level'] = getattr(
                        logging, file_level.upper()
                    )
                except AttributeError:
                    file_logging['level'] = logging.INFO
        # - Similarly, console-logging
        console_logging = self.__class__._logging.get('console')
        if console_logging:
            console_level = console_logging.get('level')
            if not console_level:
                console_logging['level'] = logging.INFO
            elif type(console_level) == str:
                try:
                    console_logging['level'] = getattr(
                        logging, console_level.upper()
                    )
                except AttributeError:
                    console_logging['level'] = logging.INFO

如果使用此标准配置,它将查找可能类似以下内容的 YAML 配置文件:

logging:
    console:
        level: error
    file:
        level: debug
        logfile: /var/log/daemon-name.log
    format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    name: daemon-name

值得注意的是,各种配置方法可以很好地处理日志设置,并且需要在日志记录完成之前记录消息。这就是为什么前面显示的日志记录方法具有返回打印功能的原因。

刚才显示的默认实现正是这样做的。这说明了创建BaseDaemon实例时执行的所有代码。初始化本身非常基本,但其中有几个新的值得注意的项目,如下所示:

def __init__(self, config_file:(str,)):
    """
Object initialization.
self .............. (BaseDaemon instance, required) The instance to 
                    execute against
config_file ....... (str, file-path, required) The location of the 
                    configuration-file to be used to configure the 
                    daemon instance
"""
    # - Call parent initializers if needed
    # - Set default instance property-values using _del_... methods
    self._del_config_file()
    self._del_logger()
    # - Set instance property-values from arguments using 
    #   _set_... methods
    self._set_config_file(config_file)
    # - Perform any other initialization needed
    # - Read configuration and override items as needed
    self.configure()
    # - Set up logging
    self._create_logger()
    # - Set up handlers to allow graceful shut-down
    signal.signal(signal.SIGINT, self.stop)
    signal.signal(signal.SIGTERM, self.stop)
    self.debug(
        'SIGINT and SIGTERM handlers for %s created' % 
        (self.__class__.__name__)
    )
    # - Set up the local flag that indicates whether we're expected 
    #   to be running or not:
    self._running = False

首先要注意的是对signal.signal()的调用。它们使用 Python 的signal模块来设置信号事件处理过程,这样,如果类的运行实例在操作系统级别被杀死或在终端会话中被中断,它就不会立即死亡。相反,这些调用捕获操作系统发出的终止(SIGTERM和中断(SIGINT信号,并允许运行的代码在终止执行之前对它们作出反应。在这种情况下,它们都调用实例的stop方法,这使服务实例有机会通知其main循环终止,从而允许正常关闭。

实现该功能最简单的方法是拥有一个实例值(在本例中为self._running),该值由服务的主循环用于确定是否继续。该标志值在上一个__init__方法的末尾设置。

尽管服务类的main循环方法是该类最重要的方面(没有它,服务实际上不会做任何事情),但main循环是特定于派生类的。它是必需的,但不能在 ABC 级别真正实现,因此它是一种抽象方法,如下所示:

@abc.abstractmethod
def main(self):
    """
The main event-loop (or whatever is equivalent) for the service instance.
"""
    raise NotImplementedError(
        '%s.main has not been implemented as required by '
        'BaseDaemon' % (self.__class__.__name__)
    )

为了允许在服务启动之前和终止之后需要启动的进程,我们为每个preflightcleanup提供了具体的方法。这些方法是具体的,而不是抽象的,因此它们总是可用的,但可以根据需要进行重写。它们只会记录在默认实现中调用它们的情况:

    def cleanup(self):
        """
Performs whatever clean-up actions/activities need to be executed after the main process-loop terminates. Override this in your daemon-class if needed, otherwise it can be left alone.
"""
        self.info('%s.cleanup called' % (self.__class__.__name__))

    def preflight(self):
        """
Performs whatever pre-flight actions/activities need to be executed before starting the main process. Override this in your daemon-class if needed, otherwise it can be left alone.
"""
        self.info('%s.preflight called' % (self.__class__.__name__))

preflight方法可能有助于实现reload方法(一个在不停止服务实例的情况下,在恢复之前重新获取任何本地的、可能更改的数据的过程),以使服务从中受益。

最后,服务实例需要能够通过单个简单的命令启动、停止,甚至重新启动。与这些方法相对应的方法非常简单,如下所示:

def start(self):
    """
Starts the daemon/service that the instance provides.
"""
    if self._running:
        self.info(
            '%s instance is already running' %     (self.__class__.__name__)
        )
        return
    self.preflight()
    self.info('Starting %s.main' % self.__class__.__name__)
    self.main()
    self.cleanup()

def stop(self, signal_num:(int,None)=None, frame:(FrameType,None)=None):
    """
Stops the daemon-process. May be called by a signal event-handler, in which case the signal_num and frame values will be passed. Can also be called directly without those argument-values.

signal_num ........ (int, optional, defaults to None) The signal-number, if any, that prompted the shutdown.
frame ............. (Stack-frame, optional, defaults to None) The associated stack-frame.
"""
    self.info('Stopping %s' % self.__class__.__name__)
    self.debug('+- signal_num ... %s' % (signal_num))
    self.debug('+- frame ........ %s' % (frame))
    self._running = False

def restart(self):
    """
Restarts the daemon-process by calling the instance's stop then start methods. This may not be directly accessible (at least not in any useful fashion) outside the running instance, but external daemon/service managers should be able to simply kill the running process and start it up again.
"""
    self.info('Restarting %s' % self.__class__.__name__)
    self.stop()
    self.start()

此类使用了几个需要包含的包/库,因此我们必须确保将它们包含在类所在的模块中,如下所示:

#######################################
#   Standard library imports needed   #
#######################################

import atexit
import logging
import os
import signal
import yaml

from types import FrameType    # Used by the signal handlers 

有了这段代码,创建一个新的服务类(相当于本章开头简单的基于函数的示例)非常简单:

class testdaemon(BaseDaemonizable):
    def _on_configuration_loaded(self, **config_data):
        try:
            BaseDaemonizable._on_configuration_loaded(self, **config_data)
            self.info('%s configuration has been loaded:' % 
                (self.__class__.__name__)
            )
        except Exception as error:
            self.error(‘%s: %s' % (error.__class__.__name__, error))
    def main(self):
        iteration = 0
        self._running = True
        self.info('Starting main daemon event-loop')
        while self._running:
            iteration += 1
            msg = 'Iteration %d' % iteration
            self.info(msg)
            sleep(10)
        self.info('%s main loop terminated' % (self.__class__.__name__))

下面的屏幕截图显示了从启动testdaemon到在几次迭代后终止testdaemon的输出和记录的消息。它显示了我们期望从现有代码中获得的所有行为:

这个基本的服务不使用任何请求处理程序类,只是太简单了,不需要它们,但是一个更现实的服务实现几乎肯定需要这种能力。每个处理程序类都需要在服务实例启动之前注册,并且需要一种方法来关联传入请求中的某些属性或值,以标识要创建的处理程序类,从而生成对请求的响应。

在执行过程中,当请求传入时,必须检查这些请求,以确定用于创建实例的处理程序类的键。然后,可以将执行交给该实例来创建响应。

处理程序类注册过程并不困难,但其中有相当数量的类型和值检查,以避免以后出现错误、不明确或冲突的结果。它被实现为一个类方法,这样在服务实例化之前,就可以建立键(端点、命令、消息类型或应用于传入请求的任何内容)与这些键后面的处理程序类之间的关联:

    @classmethod
    def register_handler(cls, handler_class:(type,), *keys):
        """
Registers a BaseRequestHandler *class* as a candidate for handling 
requests for the specified keys
"""
        if type(handler_class) != type \
            or not issubclass(handler_class, BaseRequestHandler):
            raise TypeError(
                '%s.register_handler expects a *class* derived from '
                'BaseRequestHandler as its handler_class argument, but '
                'was passed "%s" (%s), which is not such a class' % 
                (cls.__name__, value, type(value).__name__)
            )
        if not keys:
            raise ValueError(
                '%s.register_handler expects one or more keys, each '
                'a string-value, to register the handler-class with, '
                'but none were provided' % (cls.__name__)
            )
        # - Check for malformed keys
        bad_keys = [
            key for key in keys
            if type(key) != str or '\n' in key or '\r' in key
            or '\t' in key or key.strip() != key or not key.strip()
        ]
        if bad_keys:
            raise ValueError(
                '%s.register_handler expects one or more keys, each a '
                'single-line, non-empty string-value with no leading '
                'or trailing white-space, and no white-space other '
                'than spaces, but was passed a list including %s, '
                'which do not meet these criteria' % 
                (cls.__name__, '"' + '", "'.join(bad_keys) + '"')
            )
        # - Check for keys already registered
        existing_keys = [
            key for key in keys if key in cls._handler_classes.keys()
        ]
        if existing_keys:
            raise KeyError(
                '%s.register_handler is not allowed to replace handler-'
                'classes already registered, but is being asked to do '
                'so for %s keys' % 
                (cls.__name__, '"' + '", "'.join(existing_keys) + '"')
            )
        # - If this point is reached, everything is hunky-dory, so add 
        #   the handler_class for each key:
        for key in keys:
            cls._handler_classes[key] = handler_class

查找要实例化以处理给定请求、给定密钥的类的过程也不难;请参阅以下代码:

def find_request_handler(self, key:(str,)):
    """
Finds a registered BaseRequestHandler class that is expected to be able 
to handle the request signified by the key value, creates an instance 
of the class, and returns it.
"""
    # - Set up the _handler_keys if it hasn't been defined yet. 
    #   The goal here is to have a list of registered keys, sorted from 
    #   longest to shortest so that we can match based on the 
    #   longest registered key/path/command-name/whatever that 
    #   matches the incoming value:
    if not self.__class__._handler_keys:
        self.__class__._handler_keys = sorted(
            self.__class__._handler_classes.keys(),
            key=lambda k: len(k), 
            reverse=True
        )
    # - Find the first (longest) key that matches the incoming key:
    for candidate_key in self.__class__._handler_keys:
        if candidate_key.startswith(key):
        # - If we find a match, then create an instance of 
        #   the class and return it
            result = self.__class__._handler_classes[candidate_key]
            return result(self)
    return None

此方法将返回它可以找到的第一个类的实例,该实例与传入的请求密钥匹配,并返回它可以找到的最长密钥匹配,以便允许同一个类处理多个密钥,并(希望)消除密钥匹配错误的可能性。考虑一个 Web 服务,该服务与具有下级的 Ty1Ty 对象的 AutoT0.对象交互,允许使用以下路径访问这些客户端:

  • /client/{client_id}:使用client_handler对象处理请求
  • /client/{client_id}/client/{subordinate_id``}:使用subordinate_handler对象处理请求

为了确保应该由subordinate_handler处理的请求不会意外地获取和使用client_handler,匹配过程从最长到最短对端点键列表进行迭代,首先匹配较长的端点键,然后返回相应的类。

BaseRequestHandler 和 BaseResponseFormatter ABCs

如果没有从这些类派生的具体实现,它们就没有什么意义。它们使用本书中为其属性所使用的相同标准属性结构,以及典型的类型检查。他们提出的唯一新概念是抽象(这并不是什么新鲜事)和利用 Python 的__call__魔术方法的结合。

We'll look at these classes (indirectly, at least) when the concrete implementations derived from them are created for the hms_sys Artisan Gateway Service, in the next chapter.

当一个类有一个__call__方法时,可以像调用函数一样调用该类的实例,并在__call__方法本身的签名中定义所需的参数。实际上,可调用类实例可以看作是可配置函数。可调用类的每个实例都可以具有完全不同的状态数据,这些数据在其自身范围内保持一致。作为一个简单的例子,考虑下面的代码:

class callable_class:
    def __init__(self, some_arg, some_other_arg):
        self._some_arg = some_arg
        self._some_other_arg = some_other_arg

    def __call__(self, arg):
        print('%s(%s) called:' % (self.__class__.__name__, arg))
        print('+- self._some_arg ......... %s' % (self._some_arg))
        print('+- self._some_other_arg ... %s' % (self._some_other_arg))

假设我们创建了一个实例,并将其称为:

instance1 = callable_class('instance 1', 'other arg')
instance1('calling instance 1')

然后我们将获得以下输出:

我们可以创建其他实例并调用它们,而不会影响第一个实例的结果:

instance2 = callable_class('instance 2', 'yet other arg')
instance2('calling instance 2')

上述代码产生以下结果:

通过抽象这两个类的__call__方法,我们有效地要求它们实现一个__call__方法,该方法允许调用每个实例,就像调用函数一样,同时允许每个实例访问该类的任何实例可用的属性和方法。

将其应用于BaseRequestHandler,意味着每个实例将直接引用daemon实例及其所有日志记录设施、其startstoprestart方法以及原始配置文件;因此,以下规定将适用:

  • 在处理请求时,请求处理程序实例不必做任何非常复杂的事情来记录流程细节

  • 单个请求处理程序的配置是可行的,甚至可以存在于守护进程本身使用的同一配置文件中,尽管目前仍需要读取配置并对其执行操作

  • 可以编写一个或多个处理程序(适当谨慎,包括身份验证和授权),以允许服务请求重新启动服务

在服务实例本身级别具有更多/其他功能的其他服务守护进程也可以提供每个端点都可以访问的公共功能。因此,从结构上讲,使用一整套请求处理程序和响应格式化程序对象的服务将包含以下内容:

  • 源于BaseDaemon的单个服务实例,具有以下特性:
    • 一对多BaseRequestHandler派生类已注册并可用于实例化和调用,以响应传入请求,每个派生类可以依次创建和调用多个BaseResponseFormatter派生类的实例,以生成最终输出数据
  • 通过main的实现,根据这些类的注册,确定为每个请求创建和调用哪个类。

Artisan 网关服务的请求-响应周期的流程(使用 Artisan 和产品交互的请求处理程序以及响应格式化程序实现)可能如下所示:

一步一步地:

  1. Artisan 网关服务发送请求
  2. 服务根据请求中预定义的context确定Artisan 处理程序类应该实例化并调用
  3. 该处理程序知道它需要生成 JSON 输出,因此,在执行生成可格式化响应所需的任何处理后,它获取一个JSON 格式化程序实例,并调用该实例生成最终的响应
  4. 响应返回给Artisan 处理程序
  5. Artisan 处理程序响应返回给Artisan 网关服务
  6. Artisan 网关服务响应返回给请求的发起人

这一过程的大部分取决于BaseRequestHandlerBaseResponseFormatter类没有提供的具体实现。如上图所示,它们非常简单。BaseRequestHandler以标准抽象类结构开始,如下所示:

class BaseRequestHandler(metaclass=abc.ABCMeta):
    """
Provides baseline functionality, interface requirements, and 
type-identity for objects that can process daemon/service requests, 
generating and returning a response, serialized to some string-based 
format.
"""

每个派生类都可以有一个与之关联的默认格式化程序类,因此该类实例的最终调用不需要指定格式化程序,如下所示:

    ###################################
    #    Class attributes/constants   #
    ###################################

    _default_formatter = None

请求处理程序可以从访问它们所创建的服务/守护进程实例中获益。如果没有其他内容,则允许处理程序类使用守护程序的日志记录功能。因此,我们将跟踪该守护进程作为实例的属性,如下所示:

    ###################################
    #    Property-getter methods      #
    ###################################

    def _get_daemon(self) -> (BaseDaemon,):
        return self._daemon
    ###################################
    #    Property-setter methods      #
    ###################################

    def _set_daemon(self, value:(BaseDaemon,)) -> None:
        if not isinstance(value, BaseDaemon):
            raise TypeError(
                '%s.daemon expects an instance of a class derived '
                'from BaseDaemon, but was passed "%s" (%s)' % 
                (self.__class__.__name__, value, type(value).__name__)
            )
        self._daemon = value

    ###################################
    #    Property-deleter methods     #
    ###################################

    def _del_daemon(self) -> None:
        self._daemon = None

    ###################################
    #  Instance property definitions  #
    ###################################

    daemon = property(
        _get_daemon, None, None, 
        'Gets, sets or deletes the daemon associated with the instance'
    )

实例的初始化必须提供一个参数来设置实例的daemon属性,但除此之外没有其他内容:

    ###################################
    #     Object initialization       #
    ###################################

    def __init__(self, daemon:(BaseDaemon,)):
        """
Object initialization.
self .............. (BaseRequestHandler instance, required) The 
                    instance to execute against
daemon ............ (BaseDaemon instance, required) The daemon that the 
                    request to be handled originated with.
"""
# - Set default instance property-values using _del_... methods
        self._del_daemon()
# - Set instance property-values from arguments using 
#   _set_... methods
        self._set_daemon(daemon)

由于 ABC 的全部要点是要求创建实例的服务可以调用实例,因此我们需要一个__call__方法。无论何时调用实例,它都会有一个需要处理和响应的传入请求。允许传递一个可以覆盖默认的formatter类型(指定为类属性)的formatter也是一个好主意。在编写处理程序类的具体实现时,需要考虑如何处理类没有指定formatter类型,并且调用本身没有提供formatter类型的情况。不过,这在不同的请求类型中可能会有很大的差异,因此目前深入研究这一问题没有什么意义:

    ###################################
    #        Abstract methods         #
    ###################################

    @abc.abstractmethod
    def __call__(self, request:(dict,), formatter=None) -> (str,):
"""
Makes the instance callable, providing a mechanism for processing the 
supplied request, generating a data-structure containing the response 
for the request, formatting that response, and returning it.
self .............. (BaseRequestHandler instance, required) The instance to execute against
request ........... (dict, required) The request to be handled
formatter ......... (BaseResponseFormatter instance, optional, if not 
"""
        pass

BaseResponseFormatterABC 也开始作为一个标准的抽象类。它还使用相同的daemon属性,并添加一个request_handler属性,该属性使用类似的 setter 方法,允许格式化程序实例访问创建它的请求实例,以及接收请求的守护进程实例:

    def _set_request_handler(self, value:(BaseRequestHandler,)) -> None:
        if not isinstance(value, BaseRequestHandler):
            raise TypeError(
                '%s.request_handler expects an instance of a class '
                'derived from BaseRequestHandler, but was passed '
                '"%s" (%s)' % 
                (self.__class__.__name__, value, type(value).__name__)
            )
        self._request_handler = value

因此,创建实例时需要使用request_handler,原因与需要使用daemon的原因大致相同:

    def __init__(self, 
        daemon:(BaseDaemon,), 
        request_handler:(BaseRequestHandler,),
    ):
"""
Object initialization.

self .............. (BaseResponseFormatter instance, required) The 
                    instance to execute against
daemon ............ (BaseDaemon instance, required) The daemon that the 
                    request to be handled originated with.
request_handler ... (BaseRequesthandler instance, required) The request-handler object associated with the instance.
"""
        # - Set default instance property-values using _del_... methods
        self._del_daemon()
        self._del_request_handler()
        # - Set instance property-values from arguments using 
        #   _set_... methods
        self._set_daemon(daemon)
        self._set_request_handler(request_handler)

最后,与BaseRequestHandler一样,我们需要一个__call__方法由任何派生类实现:

    @abc.abstractmethod
    def __call__(self, response:(dict,)) -> (str,):
        """
Makes the instance callable, providing a mechanism for formatting a 
standard response-dictionary data-structure.

self .............. (BaseRequestHandler instance, required) The 
                    instance to execute against
response .......... (dict, required) The response to be formatted
"""
        pass

一般来说,这样简单的类(特别是如果它们是具体的类)(只有一个方法,加上它们的初始值设定项,__init__)并不是最好的实现方法。具有单个方法的类通常可以作为单个函数处理,即使该函数具有更复杂的参数集。随着具体实现的进展,格式化程序类很可能最终属于这一类。如果他们这样做了,将它们重构成(希望是简单的)函数,但现在,BaseResponseFormatter将保持不变,正如它所写的那样。

BaseRequestHandlerABC 不太关心这个计数。与不同后端数据对象交互的请求可以分组到这些对象类型的处理程序中;例如,一个ArtisanHandler用于工匠,一个ProductHandler用于产品。当__call__方法处理请求时,预计这些处理程序中的每一个都至少会有用于调用各种 CRUD 操作的方法,这不是一个很大的延伸,但在特定用例和服务上下文中会出现其他需求,如下所示:

  • 在 web 服务上下文中,可能有多达五种附加方法需要实现——分别用于HEADCONNECTOPTIONSTRACEPATCHHTTP 方法
  • 在没有像 web 服务的 HTTP 方法这样严格定义的操作集的服务上下文中,甚至有更多的可能需要额外的方法,甚至每个业务流程都需要支持请求

即使有这些复杂程度,实现处理请求/响应周期的功能也是可行的。它们只是更大、更复杂的功能,具有很强的潜力,更难以长期更改或维护。

将服务与操作系统集成

在进入具体功能之前,服务实现难题的最后一个重要部分是获取一个用 Python 编写的服务程序,以便在操作系统级别作为服务实际执行。毫不奇怪,该过程的细节在不同的操作系统中有所不同(甚至在某种程度上,在某些操作系统的不同版本中有所不同,尤其是 Linux),但有一些常见操作必须全面解决,如下所示:

  • 服务需要在其运行的机器启动时启动
  • 当服务运行的机器断电或重新启动时,服务需要正常停止
  • 服务需要能够重新启动(这通常只是一个先停止再启动的过程)

一些服务模型还可能受益于能够在不中断服务访问的情况下重新加载其数据和/或配置,特别是在重新启动后发生的等效重新加载过程非常耗时的情况下。对于特定场景,可能还有其他有用的操作。

对这些机制的探索将使用前面显示的testdaemon类。

使用 systemctl(Linux)运行服务

Linux 发行版正在从旧的 SystemV 风格的启动过程转移到更新的机制,systemd守护进程及其相关的systemctl命令行工具。由systemd/systemctl管理的服务至少需要一个定义启动和关闭流程的配置文件、一个控制操作系统如何处理这些流程的类型定义,以及启动或停止服务流程所需的任何可执行文件。一个简单的testdaemon.service配置文件可以如下所示:

[Unit]
Description=testdaemon: a simple service example written in Python

[Service]
Type=forking
ExecStart=/usr/bin/python /usr/local/bin/testdaemon.py
ExecStop=/usr/bin/pkill -f testdaemon.py

在上述代码中,以下内容适用:

  • Unit/Description条目只是对服务的简短描述,通常只不过是一个名称。

  • Service/Type定义systemd守护进程将如何处理启动过程。在这种情况下,执行将被分叉,这样无论调用它的进程如何都不再与之关联,并且可以在不停止服务本身的情况下终止。

  • Service/ExecStart定义了启动服务的流程,在本例中,通过将testdaemon.py文件作为 Python 脚本执行。

  • Service/ExecStop定义了一个用于停止服务的进程,在本例中,通过以testdaemon.py为名称终止所有进程。

假设实际的testdaemon类可以从某个已安装的包中导入,那么启动服务的testdaemon.py脚本可以简单如下:

#!/usr/bin/env python

# - Import the service-class
    from some_package import testdaemon
# - The location of the config-file
    config_file = '/path/to/config.yaml'
# - Create an instance of the service class
    d = testdaemon(config_file)
# - Start it.
    d.start()

在这两个文件都已就绪的情况下,从命令行启动、重新启动和停止服务的命令分别如下所示:

systemctl start testdaemon.service

systemctl restart testdaemon.service

systemctl stop testdaemon.service

必须启用systemd管理的服务才能在引导时启动,如下所示:

systemctl enable testdaemon.service

前面的命令要求将安装规范添加到相应的systemd``.service文件中,如下所示:

...
ExecStop=/usr/bin/pkill -f testdaemon.py

[Install]
WantedBy=multi-user.target

systemd服务配置还有许多其他选项可用,但这些基本设置允许使用标准命令行工具自动启动和管理服务。

使用 NSSM(Windows)运行服务

在 Windows 机器上安装 Python 编写的服务最简单的方法是使用非吸吮式服务管理器NSSM)。NSSM 提供了一种简单的方法来包装特定的可执行文件(在本例中为主python.exe文件)以及参数(即testdaemon.py脚本),并使其作为 Windows 服务可用。使用nssm install启动 NSSM 将提供一个窗口,其中包含基本服务设置所需的所有字段,如下所示:

单击“安装服务”按钮后,该服务在 Windows 服务管理器中可用,如果需要,可以在其中更改其启动类型以及所有其他标准 Windows 服务设置和属性:

也可以通过运行nssm install <service-name>对 NSSM 创建的服务属性进行更改,它显示了用于创建服务条目的相同 UI。

If an NSSM-packaged service fails to start, it will log useful information to the standard Windows Event Log; debugging startup problems should start there. Odds are good that if there are any issues, they will be permissions-related, such as the service's account not having access to the script file, a configuration file, and so on.

macOS、launchd 和 launchctl

Macintosh 操作系统macOS)实际上是一种 Unix 变体,因此在许多方面,与 Linux 和 Windows 服务安装相比,问题或差异会更少。macOS 分别提供了与systemdsystemctl近似等价的launchdlaunchctl程序。它们至少提供了相同类型的服务启动和关闭控制功能,并根据各种系统事件提供了许多进程服务流程的附加选项。

Disclaimer: While writing this book, no macOS machine was available to test with, so, while this section should be complete and usable as it stands, there may be issues that weren't identified before publication

launchd基本兼容的服务配置文件需要包含服务标签、服务启动时执行的程序以及程序需要的任何参数:正是systemd需要的,尽管launchd托管服务的配置文件是 XML 文件。一个基本的起始点配置,使用testdaemon.py作为脚本来启动实际的服务对象,并提供 run at load 和 keep alive 控件,如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
    <dict>
        <key>Label</key>
        <string>testdaemon</string>
        <key>Program</key>
        <string>/path/to/python</string>
        <key>ProgramArguments</key>
        <string>/path/to/testdaemon.py</string>
        <key>RunAtLoad</key>
        <true/>
        <!-- 
            A very basic keep-alive directive. There may be better options:
            See "SuccessfulExit" and "Crashed" subkeys
        -->
        <key>KeepAlive</key>
        <true/>
    </dict>
</plist>

该配置一旦位于launchd文件的一个标准位置,就可以分别启动、重新启动和停止服务,如下所示:

launchctl start testdaemon.service

launchctl restart testdaemon.service

launchctl stop testdaemon.service

管理其他系统上的服务

如前所述,尽管 Linux 系统中管理服务进程的当前趋势是朝着systemd/systemctl方向发展,但可能有一些操作系统仍然使用 System V 风格的初始化脚本。此类脚本的基本起点如下所示:

#!/bin/sh

# - The action we're concerned with appears as $1 in a standard 
#   bash-script
    case $1 in
        start)
            echo "Starting $0"
            /usr/bin/python /usr/local/bin/testdaemon.py
            ;;
        stop)
            echo "Stopping $0"
            /usr/bin/pkill -f testdaemon.py
            ;;
        restart)
            echo "Restarting $0"
            /usr/bin/pkill -f testdaemon.py
            /usr/bin/python /usr/local/bin/testdaemon.py
            ;;
    esac

在 SystemV 管理的上下文中,服务本身必须负责确保它与任何称之为它的进程(终端会话或操作系统本身的启动进程)分离。否则,服务流程可能只是启动,然后在实际执行任何操作之前终止。

Since this scenario should be less and less common as time goes on, but is still possible, there is a class in the daemons module, BaseDaemonizable, that handles daemonizing a service class instance, including writing the process ID (PID) to a file in a known location, in case that's needed for some part of a service process. Deriving a service class from that, instead of BaseDaemon, should take care of the majority of the different needs, while still preserving the BaseDaemon structure.

总结

本章中创建的服务基础应该为几乎所有服务提供一个坚实、通用的起点,尽管特定用例可能需要调整结构或覆盖现有功能。有了基础,就可以在hms_sys中实际创建 Artisan 网关服务,这将在下一章连接 Artisan 和中央办公室数据流。*