Skip to content

Latest commit

 

History

History
1207 lines (823 loc) · 50.7 KB

File metadata and controls

1207 lines (823 loc) · 50.7 KB

八、Python——架构模式

架构模式是软件模式万神殿中最高级别的模式。架构模式允许架构师指定应用的基本结构。为给定的软件问题选择的架构模式控制其其余活动,例如所涉及的系统设计、系统不同部分之间的通信等。

根据手头的问题,有许多架构模式可供选择。不同的模式解决不同类别或系列的问题,创建自己的风格或架构类别。例如,某类模式解决了客户机/服务器系统的架构,其他一些模式帮助构建分布式系统,第三类模式帮助设计高度解耦的对等系统。

在本章中,我们将讨论并重点讨论 Python 世界中经常遇到的几种架构模式。我们在本章中讨论的模式将是采用一种著名的架构模式,并探索一个或两个实现该模式的流行软件应用或框架,或其变体。

在本章中,我们将不讨论很多代码。代码的使用将仅限于那些使用程序进行说明是绝对必要的模式。另一方面,大部分讨论将集中在架构细节、参与子系统、所选应用/框架实现的架构的变化等方面。

我们可以查看任意数量的架构模式。在本章中,我们将重点介绍 MVC 及其相关模式、事件驱动编程架构、微服务架构以及管道和过滤器。

本章将介绍以下主题:

  • 介绍 MVC:

    • 模型视图模板–Django
    • 烧瓶微框架
  • 事件驱动编程:

    • 使用 select 聊天服务器和客户端

    • 事件驱动与并发编程

    • Twisted

      Twisted 聊天服务器和客户端

    • Eventlet

      Eventlet 聊天服务器

    • Greenlets and gevent

      Gevent 聊天服务器

  • 微服务架构:

    • Python 中的微服务框架
    • 微服务示例
    • 微服务优势
  • 管道和过滤器架构:

    • Python 中的管道和过滤器–示例

引入 MVC

Model ViewController 或 MVC 是构建交互式应用的著名且流行的架构模式。MVC 将应用分为三个组件:模型、视图和控制器。

Introducing MVC

模型-视图-控制器(MVC)架构

三个组件履行以下职责:

  • 模型:该模型包含应用的核心数据和逻辑。
  • 视图:视图构成应用对用户的输出。它们向用户显示信息。相同数据的多个视图是可能的。
  • 控制器:控制器接收并处理用户输入,如键盘点击或鼠标点击/移动,并将其转换为模型或视图的更改请求。

使用这三个组件分离关注点可以避免应用的数据与其表示之间的紧密耦合。它允许同一数据(模型)的多个表示(视图),可以根据通过控制器接收的用户输入计算和显示。

MVC 模式允许以下交互:

  1. 模型可以根据从控制器接收的输入更改其数据。
  2. 更改的数据反映在视图上,这些视图订阅了模型中的更改。
  3. 控制器可以发送命令来更新模型的状态,例如在对文档进行更改时。控制器还可以发送命令来修改视图的表示形式,而无需对模型进行任何更改,例如放大图形或图表。
  4. MVC 模式隐含地包括变更传播机制,用于通知每个组件其他依赖组件的变更。
  5. Python 世界中的许多 web 应用实现 MVC 或其变体。在接下来的章节中,我们将介绍其中的两个,即 Django 和 Flask。

模型模板视图(MTV)–Django

Django 项目是 Python 世界中最流行的 web 应用框架之一。Django 实现了类似于 MVC 模式的东西,但有一些细微的区别。

Django(核心)组件架构如下图所示:

Model Template View (MTV) – Django

Django 核心组件架构

Django 框架的核心组件如下:

  • 一个对象关系映射器ORM),作为数据模型(Python)和数据库(RDBMS)之间的中介—这可以被认为是模型层。
  • Python 中的一组回调函数,将数据呈现给特定 URL 的用户界面,这可以被认为是视图层。视图的重点是构建和转换内容,而不是它的实际表示。
  • 用于在不同演示文稿中呈现内容的一组 HTML 模板。视图将委托给特定的模板,该模板负责数据的显示方式。
  • 基于正则表达式的URL DISPATCHER,它将服务器上的相对路径连接到特定视图及其变量参数。这可以看作是一个基本的控制器
  • 在 Django 中,由于演示由模板层执行,而内容映射仅由视图层完成,Django 通常被描述为实现模型模板视图MTV框架。
  • Django 中的控制器没有很好的定义—可以将其视为整个框架本身—或者仅限于URL DISPATCHER层。

Django admin–以模型为中心的自动化视图

Django 框架最强大的组件之一是其自动管理系统,它从 Django 模型读取元数据,并生成以模型为中心的快速管理视图,系统管理员可以通过简单的 HTML 表单查看和编辑数据模型。

为了便于说明,以下是 Django 模型的示例,该模型描述了作为glossary术语添加到网站的术语(词汇表是描述与特定主题、文本或方言相关的单词含义的单词列表或索引):

from django.db import models

class GlossaryTerm(models.Model):
    """ Model for describing a glossary word (term) """

    term = models.CharField(max_length=1024)
    meaning = models.CharField(max_length=1024)
    meaning_html = models.CharField('Meaning with HTML markup',
                    max_length=4096, null=True, blank=True)
    example = models.CharField(max_length=4096, null=True, blank=True)

    # can be a ManyToManyField?
    domains = models.CharField(max_length=128, null=True, blank=True)

    notes = models.CharField(max_length=2048, null=True, blank=True)
    url = models.CharField('URL', max_length=2048, null=True, blank=True)
    name = models.ForeignKey('GlossarySource', verbose_name='Source', blank=True)

    def __unicode__(self):
        return self.term

    class Meta:
        unique_together = ('term', 'meaning', 'url')

这是与管理系统相结合,该系统为自动管理视图注册模型:

from django.contrib import admin

admin.site.register(GlossaryTerm)
admin.site.register(GlossarySource)

以下是通过 Django 管理界面添加术语表术语的自动管理视图(HTML 表单)的图像:

Django admin – automated model-centric views

Django 自动管理视图(HTML 表单),用于添加术语表术语

通过快速观察,Django 管理员可以为模型中的不同数据字段生成正确的字段类型,并生成用于添加数据的表单。这是 Django 中的一个强大模式,它允许用户生成自动管理视图,以添加/编辑模型,几乎不需要任何编码工作。

现在让我们看看另一个流行的 Python web 应用框架,即 Flask。

柔性微框架——烧瓶

Flask 是一个微网络框架,它使用一种极简主义的理念来构建 web 应用。Flask 只依赖于两个库:Werkzeug(http://werkzeug.pocoo.org/ )WSGI 工具包和 Jinja2 模板框架。

Flask 通过 decorators 提供了简单的 URL 路由。烧瓶中的micro字表示框架的核心很小。Python 社区围绕 Flask 构建的多个扩展提供了对数据库、表单等的支持。

因此,core Flask 可以被认为是 MTV 框架减去 M(视图模板),因为 core 不实现对模型的支持。

以下是烧瓶组件架构的近似示意图:

Flexible Microframework – Flask

烧瓶组件示意图

使用模板的简单 Flask 应用如下所示:

from flask import Flask
app = Flask(__name__)

@app.route('/')
def index():
    data = 'some data'
    return render_template('index.html', **locals())

我们可以在这里找到 MVC 模式的几个组件:

  • @app.route装饰器将请求从浏览器路由到index函数。应用路由器可以看作是控制器。
  • index函数返回数据,并使用模板呈现数据。index功能可被视为生成视图或视图组件。
  • Flask 使用类似 Django 的模板将内容与演示分离。这可以看作是模板组件。
  • 烧瓶芯中没有特定的模型组件。然而,这可以通过附加插件的帮助进行添加。
  • Flask 使用插件架构来支持其他功能。例如,可以通过使用 Flask SQLAlchemy、使用 Flask RESTful 的 RESTful API 支持、使用 Flask marshmallow 的序列化以及其他方法来添加模型。

事件驱动编程

事件驱动编程是一种系统架构范例,其中程序内的逻辑流由事件驱动,如用户操作、来自其他程序的消息或硬件(传感器)输入。

在事件驱动的架构中,通常有一个主事件循环,它侦听事件,然后在检测到事件时用特定参数触发回调函数。

在 Linux 等现代操作系统中,对输入文件描述符(如套接字或打开的文件)上的事件的支持是通过系统调用实现的,如selectpollepoll

Python 通过其select模块为这些系统调用提供包装。使用 Python 中的select模块编写简单的事件驱动程序并不困难。

下面的一组程序使用 select 模块的强大功能在 Python 中共同实现了一个基本的聊天服务器和客户端。

聊天服务器和客户端使用选择模块的 I/O 多路复用

我们的聊天服务器通过select模块使用select系统调用来创建通道,客户可以在这些通道中相互连接和交谈。它处理输入就绪的事件(套接字)——如果事件是连接到服务器的客户端,则连接并执行握手;如果事件是要从标准输入读取的数据,则服务器读取数据,或者将从一个客户端接收的数据传递给其他客户端。

这是我们的聊天服务器:

由于聊天服务器的代码很大,我们只包括主要功能,即这里的 serve 功能,它显示了服务器如何使用基于选择的 I/O 多路复用。serve函数中的许多代码也被修剪,以保持打印的代码较小。

完整的源代码可以从本书的网站上下载。

# chatserver.py

import socket
import select
import signal
import sys
from communication import send, receive

class ChatServer(object):
    """ Simple chat server using select """

    def serve(self):
        inputs = [self.server,sys.stdin]
        self.outputs = []

        while True:

                inputready,outputready,exceptready = select.select(inputs, self.outputs, [])

            for s in inputready:

                if s == self.server:
                    # handle the server socket
                    client, address = self.server.accept()

                    # Read the login name
                    cname = receive(client).split('NAME: ')[1]

                    # Compute client name and send back
                    self.clients += 1
                    send(client, 'CLIENT: ' + str(address[0]))
                    inputs.append(client)

                    self.clientmap[client] = (address, cname)
                    self.outputs.append(client)

                elif s == sys.stdin:
                    # handle standard input – the server exits 
                    junk = sys.stdin.readline()
		  break
                else:
                    # handle all other sockets
                    try:
                        data = receive(s)
                        if data:
                            # Send as new client's message...
                            msg = '\n#[' + self.get_name(s) + ']>> ' + data
                            # Send data to all except ourselves
                            for o in self.outputs:
                                if o != s:
                                    send(o, msg)
                        else:
                            print('chatserver: %d hung up' % s.fileno())
                            self.clients -= 1
                            s.close()
                            inputs.remove(s)
                            self.outputs.remove(s)

                    except socket.error as e:
                        # Remove
                        inputs.remove(s)
                        self.outputs.remove(s)

        self.server.close()

if __name__ == "__main__":
    ChatServer().serve()

聊天服务器可以通过发送一行空输入来停止。

聊天客户端也使用select系统调用。它使用套接字连接到服务器,然后等待套接字上的事件和标准输入。如果事件来自标准输入,则读取数据。否则,它会通过套接字将数据发送到服务器:

# chatclient.py
import socket
import select
import sys
from communication import send, receive

class ChatClient(object):
    """ A simple command line chat client using select """

    def __init__(self, name, host='127.0.0.1', port=3490):
        self.name = name
        # Quit flag
        self.flag = False
        self.port = int(port)
        self.host = host
        # Initial prompt
        self.prompt='[' + '@'.join((name, socket.gethostname().split('.')[0])) + ']> '
        # Connect to server at port
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((host, self.port))
            print('Connected to chat server@%d' % self.port)
            # Send my name...
            send(self.sock,'NAME: ' + self.name) 
            data = receive(self.sock)
            # Contains client address, set it
            addr = data.split('CLIENT: ')[1]
            self.prompt = '[' + '@'.join((self.name, addr)) + ']> '
        except socket.error as e:
            print('Could not connect to chat server @%d' % self.port)
            sys.exit(1)

    def chat(self):
        """ Main chat method """

        while not self.flag:
            try:
                sys.stdout.write(self.prompt)
                sys.stdout.flush()

                # Wait for input from stdin & socket
                inputready, outputready,exceptrdy = select.select([0, self.sock], [],[])

                for i in inputready:
                    if i == 0:
                        data = sys.stdin.readline().strip()
                        if data: send(self.sock, data)
                    elif i == self.sock:
                        data = receive(self.sock)
                        if not data:
                            print('Shutting down.')
                            self.flag = True
                            break
                        else:
                            sys.stdout.write(data + '\n')
                            sys.stdout.flush()

            except KeyboardInterrupt:
                print('Interrupted.')
                self.sock.close()
                break

if __name__ == "__main__":
    if len(sys.argv)<3:
        sys.exit('Usage: %s chatid host portno' % sys.argv[0])

    client = ChatClient(sys.argv[1],sys.argv[2], int(sys.argv[3]))
    client.chat()

在终端上按Ctrl+C可以停止聊天客户端。

为了通过套接字来回发送数据,这两个脚本都使用名为communication的第三个模块,该模块具有sendreceive功能。此模块使用 pickle 分别对sendreceive函数中的数据进行序列化和反序列化:

# communication.py
import pickle
import socket
import struct

def send(channel, *args):
    """ Send a message to a channel """

    buf = pickle.dumps(args)
    value = socket.htonl(len(buf))
    size = struct.pack("L",value)
    channel.send(size)
    channel.send(buf)

def receive(channel):
    """ Receive a message from a channel """

    size = struct.calcsize("L")
    size = channel.recv(size)
    try:
        size = socket.ntohl(struct.unpack("L", size)[0])
    except struct.error as e:
        return ''

    buf = ""

    while len(buf) < size:
        buf = channel.recv(size - len(buf))

    return pickle.loads(buf)[0]

以下是正在运行的服务器和通过聊天服务器相互连接的两个客户端的一些图像:

下面是名为andy的客户端 1 连接到聊天服务器的图像:

Chat server and client using I/O multiplexing with the select module

聊天客户端#1 的聊天会话(客户端名称:andy)

类似地,这里的是一个名为betty的客户端,它连接到聊天服务器并与andy通话:

Chat server and client using I/O multiplexing with the select module

聊天客户端#2 的聊天会话(客户端名称:betty)

以下列出了一些节目的有趣之处:

  • 查看客户端如何能够看到彼此的消息。这是因为服务器将一个客户端发送的数据发送给所有其他连接的客户端。我们的聊天服务器在消息的前面加了一个散列#,表示该消息来自另一个客户端。

  • 查看服务器如何向所有其他客户端发送客户端的连接和断开连接信息。当另一个客户端连接到会话或从会话断开连接时,会通知客户端。

  • The server echoes messages when a client disconnects saying that the client hung up:

    前面的聊天服务器和客户端示例是作者自己的 Python 配方的一个小变化,该配方位于的 ASPN 食谱中 https://code.activestate.com/recipes/531824

Twisted、Eventlet 和 Gevent 等库将简单的基于选择的多路复用带到下一个级别,以便构建向程序员提供高级基于事件的编程例程的系统,通常基于与聊天服务器示例中的循环非常类似的核心事件循环。

我们将在以下章节中讨论这些框架的架构。

事件驱动编程与并发编程

我们在上一节中看到的示例使用了异步事件技术,正如我们在一章关于并发性中看到的那样。这与真正的并发或并行编程不同。

事件编程库也使用异步事件技术。只有一个执行线程,其中任务根据接收到的事件一个接一个地交错。

在下面的示例中,考虑由三个线程或进程真正并行执行三个任务:

Event-driven programming versus Concurrent programming

使用三个线程并行执行三个任务

将与通过事件驱动编程执行任务时发生的情况进行对比,如下图所示:

Event-driven programming versus Concurrent programming

在一个线程中异步执行三个任务

在异步模型中,只有一个执行线程,任务以交叉方式执行。在异步处理服务器的事件循环中,每个任务都有自己的处理时间槽,但在给定时间只执行一个任务。任务将控制权交还给循环,以便它可以在下一个时间片中调度与当前正在执行的任务不同的任务。正如我们在第 5 章中所看到的,编写可扩展的应用,这是一种协作多任务处理。

扭曲

Twisted 是一个事件驱动的网络引擎,支持多种协议,如 DNS、SMTP、POP3、IMAP 等。它还支持编写 SSH 客户端和服务器,以及构建消息传递和 IRC 客户端和服务器。

Twisted 还提供了一组模式(样式)来编写通用服务器和客户端,如 Web 服务器/客户端(HTTP)、发布/订阅模式、消息传递客户端和服务器(SOAP/XML-RPC)等。

它使用 Reactor 设计模式,该模式在单个线程中将来自多个源的事件多路复用并分派到它们的事件处理程序。

它接收来自多个并发客户端的消息、请求和连接,并使用事件处理程序顺序处理这些帖子,而不需要并发线程或进程。

反应堆伪代码大致如下所示:

while True:
    timeout = time_until_next_timed_event()
    events = wait_for_events(timeout)
    events += timed_events_until(now())
    for event in events:
        event.process()

Twisted 使用回调在事件发生时调用事件处理程序。要处理特定事件,将为该事件注册回调。回调可用于常规处理,也可用于管理异常(errback)。

asyncio模块类似,Twisted 使用 futures 等对象包装任务执行的结果,但实际结果仍然不可用。在 Twisted 中,这些对象称为延迟

延迟对象有一对回调链:一个用于处理结果(回调),另一个用于管理错误(errback)。当获得执行结果时,将创建一个延迟对象,并按照添加它们的顺序调用其回调和/或 errback。

下面是 Twisted 的架构图,显示了高级组件:

Twisted

扭曲-核心组件

Twisted–一个简单的 web 客户端

下面是一个简单的 web HTTP 客户端示例,它使用 Twisted 获取给定 URL 并将其内容保存到特定文件名:

# twisted_fetch_url.py
from twisted.internet import reactor
from twisted.web.client import getPage
import sys

def save_page(page, filename='content.html'):
    print type(page)
    open(filename,'w').write(page)
    print 'Length of data',len(page)
    print 'Data saved to',filename

def handle_error(error):
    print error

def finish_processing(value):
    print "Shutting down..."
    reactor.stop()

if __name__ == "__main__":
    url = sys.argv[1]
    deferred = getPage(url) 
    deferred.addCallbacks(save_page, handle_error)
    deferred.addBoth(finish_processing)

    reactor.run()

正如您在前面的代码中所看到的,getPage方法返回一个延迟的 URL,而不是 URL 的数据。对于延迟的调用,我们添加了两个回调:一个用于处理数据(T1 函数),另一个用于处理错误(T2 函数)。deferred 的addBoth方法添加了一个函数作为回调和 errback。

事件处理通过运行反应器启动。在最后调用的finish_processing回调中,反应堆停止。由于事件处理程序是按添加顺序调用的,因此此函数将仅在最后调用。

当反应堆运行时,会发生以下事件:

  • 获取页面并创建延迟页面。

  • 回调按顺序在延迟服务器上调用。首先调用save_page函数,将页面内容保存到content.html文件中。然后调用一个handle_error事件处理程序,它将打印任何错误字符串。

  • Finally, finish_processing is called, which stops the reactor and the event processing ends, exiting the program.

    在编写本文时,Twisted 还不能用于 Python3,因此前面的代码是为 Python2 编写的。

  • 当您运行代码时,您将看到生成以下输出:

    $ python2 twisted_fetch_url.py http://www.google.com
    Length of data 13280
    Data saved to content.html
    Shutting down...

使用 Twisted 的聊天服务器

现在让我们来看一下如何使用 Twisted-on 线路编写一个简单的聊天服务器,类似于使用select模块的聊天服务器。

在 Twisted 中,服务器是通过实现协议和协议工厂来构建的。协议类通常从 TwistedProtocol类继承。

工厂只不过是充当协议对象工厂模式的类。

下面是我们使用 Twisted 的聊天服务器:

from twisted.internet import protocol, reactor

class Chat(protocol.Protocol):
    """ Chat protocol """

    transports = {}
    peers = {}

    def connectionMade(self):
        self._peer = self.transport.getPeer()
        print 'Connected',self._peer

    def connectionLost(self, reason):
        self._peer = self.transport.getPeer()
        # Find out and inform other clients
        user = self.peers.get((self._peer.host, self._peer.port))
        if user != None:
            self.broadcast('(User %s disconnected)\n' % user, user)
            print 'User %s disconnected from %s' % (user, self._peer)

    def broadcast(self, msg, user):
        """ Broadcast chat message to all connected users except 'user' """

        for key in self.transports.keys():
            if key != user:
                if msg != "<handshake>":
                    self.transports[key].write('#[' + user + "]>>> " + msg)
                else:
                    # Inform other clients of connection
                    self.transports[key].write('(User %s connected from %s)\n' % (user, self._peer))                

    def dataReceived(self, data):
        """ Callback when data is ready to be read from the socket """

        user, msg = data.split(":")
        print "Got data=>",msg,"from",user
        self.transports[user] = self.transport
        # Make an entry in the peers dictionary
        self.peers[(self._peer.host, self._peer.port)] = user
        self.broadcast(msg, user)

class ChatFactory(protocol.Factory):
    """ Chat protocol factory """

    def buildProtocol(self, addr):
        return Chat()

if __name__ == "__main__":
    reactor.listenTCP(3490, ChatFactory())
    reactor.run()

我们的聊天服务器比以前的要复杂一些,因为它执行以下附加步骤:

  1. 它有一个单独的握手协议,使用特殊的<handshake>消息。

  2. 当一个客户端连接时,它会广播给其他客户端,通知他们客户端的名称和连接详细信息。

  3. When a client disconnects, other clients are informed about this.

    聊天客户端也使用 Twisted,并使用两种协议–即用于与服务器通信的ChatClientProtocol和用于从标准输入读取数据并将从服务器接收的数据回显到标准输出的StdioClientProtocol

    后一种协议还将前一种协议连接到其输入端,以便在标准输入端接收的任何数据都作为聊天信息发送到服务器。

请看下面的代码:

import sys
import socket
from twisted.internet import stdio, reactor, protocol

class ChatProtocol(protocol.Protocol):
    """ Base protocol for chat """

    def __init__(self, client):
        self.output = None
        # Client name: E.g: andy
        self.client = client
        self.prompt='[' + '@'.join((self.client, socket.gethostname().split('.')[0])) + ']> '             

    def input_prompt(self):
        """ The input prefix for client """
        sys.stdout.write(self.prompt)
        sys.stdout.flush()

    def dataReceived(self, data):
        self.processData(data)

class ChatClientProtocol(ChatProtocol):
    """ Chat client protocol """

    def connectionMade(self):
        print 'Connection made'
        self.output.write(self.client + ":<handshake>")

    def processData(self, data):
        """ Process data received """

        if not len(data.strip()):
            return

        self.input_prompt()

        if self.output:
            # Send data in this form to server
            self.output.write(self.client + ":" + data)

class StdioClientProtocol(ChatProtocol):
    """ Protocol which reads data from input and echoes
    data to standard output """

    def connectionMade(self):
        # Create chat client protocol
        chat = ChatClientProtocol(client=sys.argv[1])
        chat.output = self.transport

        # Create stdio wrapper
        stdio_wrapper = stdio.StandardIO(chat)
        # Connect to output
        self.output = stdio_wrapper
        print "Connected to server"
        self.input_prompt()

    def input_prompt(self):
        # Since the output is directly connected
        # to stdout, use that to write.
        self.output.write(self.prompt)

    def processData(self, data):
        """ Process data received """

        if self.output:
            self.output.write('\n' + data)
            self.input_prompt()

class StdioClientFactory(protocol.ClientFactory):

    def buildProtocol(self, addr):
        return StdioClientProtocol(sys.argv[1])

def main():
    reactor.connectTCP("localhost", 3490, StdioClientFactory())
    reactor.run()

if __name__ == '__main__':
    main()

这里是两个客户端andybetty使用该聊天服务器和客户端进行通信的一些屏幕截图:

Chat Server using Twisted

使用 Twisted 聊天服务器的聊天客户端–客户端#1(andy)会话

以下是客户贝蒂的第二节课:

Chat Server using Twisted

使用 Twisted 聊天服务器的聊天客户端–客户端 2 会话(betty)

您可以通过交替查看屏幕截图来遵循对话的流程。

注意当用户 betty 连接和用户 andy 断开连接时服务器发送的连接和断开消息。

小事件

Eventlet 是 Python 世界中另一个著名的网络库,它允许人们使用异步执行的相同概念编写事件驱动的程序。

Eventlet 在一组所谓的绿色线程的帮助下使用协同例程,这些线程是执行协同多任务的轻量级用户空间线程。

Eventlet 在一组绿色线程Greenpool类上使用抽象来执行其任务。

Greenpool类运行一组预定义的Greenpool线程(默认为1000),并提供以不同方式将函数和可调用项映射到线程的方法。

以下是使用 Eventlet 重写的多用户聊天服务器:

# eventlet_chat.py

import eventlet
from eventlet.green import socket

participants = set()

def new_chat_channel(conn):
    """ New chat channel for a given connection """

    data = conn.recv(1024)
    user = ''

    while data:
        print("Chat:", data.strip())
        for p in participants:
            try:
                if p is not conn:
                    data = data.decode('utf-8')
                    user, msg = data.split(':')
                    if msg != '<handshake>':
                        data_s = '\n#[' + user + ']>>> says ' + msg
                    else:
                        data_s = '(User %s connected)\n' % user

                    p.send(bytearray(data_s, 'utf-8'))
            except socket.error as e:
                # ignore broken pipes, they just mean the participant
                # closed its connection already
                if e[0] != 32:
                    raise
        data = conn.recv(1024)

    participants.remove(conn)
    print("Participant %s left chat." % user)

if __name__ == "__main__":
    port = 3490
    try:
        print("ChatServer starting up on port", port)
        server = eventlet.listen(('0.0.0.0', port))

        while True:
            new_connection, address = server.accept()
            print("Participant joined chat.")
            participants.add(new_connection)
            print(eventlet.spawn(new_chat_channel,
                                 new_connection))

    except (KeyboardInterrupt, SystemExit):
        print("ChatServer exiting.")

该服务器可以与我们在前面的示例中看到的 Twisted 聊天客户端一起使用,其行为方式完全相同。因此,我们将不显示此服务器的运行示例。

Eventlet 库内部使用greenlets,这是一个包,在 Python 运行时提供绿色线程。在下一节中,我们将看到 greenlet 和一个相关的库 Gevent。

格林莱特和格文特

Greenlet 是一个包,它在 Python 解释器的顶部提供了一个绿色或微线程版本。它的灵感来自 Stackless,一个支持称为 stacklets 的微线程的 CPython 版本。但是,greenlet 能够在标准的 CPython 运行时上运行。

Gevent 是一个 Python 网络库,在libev之上提供高级同步 API,该事件库是用 C 编写的。

Gevent 的灵感来源于 Gevent,但它具有更一致的 API 和更好的性能。

与 Eventlet 一样,gevent 在系统库上进行了大量的 monkey 修补,以支持协作多任务处理。例如,gevent 自带了自己的套接字,就像 Eventlet 一样。

与 Eventlet 不同,gevent 还需要程序员完成显式的 monkey 补丁。它提供了在模块本身上执行此操作的方法。

不用多说,让我们看看使用 gevent 的多用户聊天服务器的样子:

# gevent_chat_server.py

import gevent
from gevent import monkey
from gevent import socket
from gevent.server import StreamServer

monkey.patch_all()

participants = set()

def new_chat_channel(conn, address):
    """ New chat channel for a given connection """

    participants.add(conn)
    data = conn.recv(1024)
    user = ''

    while data:
        print("Chat:", data.strip())
        for p in participants:
            try:
                if p is not conn:
                    data = data.decode('utf-8')
                    user, msg = data.split(':')
                    if msg != '<handshake>':
                        data_s = '\n#[' + user + ']>>> says ' + msg
                    else:
                        data_s = '(User %s connected)\n' % user

                    p.send(bytearray(data_s, 'utf-8'))                  
            except socket.error as e:
                # ignore broken pipes, they just mean the participant
                # closed its connection already
                if e[0] != 32:
                    raise
        data = conn.recv(1024)

    participants.remove(conn)
    print("Participant %s left chat." % user)

if __name__ == "__main__":
    port = 3490
    try:
        print("ChatServer starting up on port", port)
        server = StreamServer(('0.0.0.0', port), new_chat_channel)
        server.serve_forever()
    except (KeyboardInterrupt, SystemExit):
        print("ChatServer exiting.")

基于 gevent 的聊天服务器的代码与使用 Eventlet 的几乎相同。这样做的原因是,当建立新连接时,通过处理对回调函数的控制,两者的工作方式非常相似。在这两种情况下,回调函数名为new_chat_channel,它具有相同的功能,因此代码非常相似。

两者的差异如下:

  • gevent 提供了自己的 TCP 服务器类StreamingServer——因此我们使用该类而不是直接监听模块
  • 在 gevent 服务器中,对于每个连接,new_chat_channel处理程序都被调用,因此参与者集在那里被管理
  • 由于 gevent 服务器有自己的事件循环,因此不需要像我们使用 Eventlet 那样创建 while 循环来侦听传入连接

此示例与前面的示例完全相同,并且适用于 Twisted 聊天客户端。

微服务架构

微服务架构是一种架构风格,将单个应用开发为一套小型独立服务,每个服务运行在自己的进程中,并通过轻量级机制(通常使用 HTTP 协议)进行通信。

微服务是可独立部署的组件,通常具有零或最低限度的中央管理或配置。

微服务可以被认为是面向服务的架构SOA的一种特定实现风格,其中,应用不是自上而下构建一个整体式应用,而是构建为一个相互交互、独立服务的动态组。

传统上,企业应用是以单片模式构建的,通常由以下三层组成:

  1. 由 HTML 和 JavaScript 组成的客户端用户界面(UI)层。
  2. 由业务逻辑组成的服务器端应用。
  3. 数据库和数据访问层,用于保存业务数据。

另一方面,微服务架构将该层拆分为多个服务。例如,业务逻辑将被拆分为多个组件服务,而不是单个应用中的业务逻辑,这些组件服务的交互定义了应用中的逻辑流。这些服务可以查询单个数据库或独立的本地数据库,后者的配置更常见。

微服务架构中的数据通常以文档对象的形式处理和返回——通常以 JSON 编码。

以下示意图说明了单片架构与微服务架构的区别:

Microservice architecture

单片(左)与微服务(右)架构

Python 中的微服务框架

由于微服务更多地是一种哲学或架构风格,因此没有一种不同的软件框架类别可以说是适合它们的。然而,我们仍然可以对框架应该具有的属性进行一些有根据的预测,因为框架是用 Python 为 web 应用构建微服务架构的好选择。这些属性包括以下内容:

  • 组件架构应该是灵活的。该框架不应在其规定的使系统的不同部分工作的组件选择中僵化。
  • 框架的核心应该是轻量级的。这是有道理的,因为如果我们一开始就对微服务框架本身有很多依赖,那么软件一开始就感觉很沉重。这可能会导致部署、测试等方面的问题。
  • 该框架应支持零配置或最低配置。微服务架构通常是自动配置的(零配置),或者使用一个位置上可用的最小配置输入集。通常,配置本身作为一种微服务提供给其他服务,以便于查询和共享配置,从而实现简单、一致和可扩展性。
  • 它应该可以很容易地将现有的业务逻辑片段(比如编码为类或函数)转换为 HTTP 或 RCP 服务。这允许重用和智能重构代码。

如果您使用这些原则并在 Python 软件生态系统中四处查看,您将发现有几个 web 应用框架符合要求,而有几个则不符合要求。

例如,Flask 和它的单文件对应物 Battle 由于其最小的占地面积、较小的核心和简单的配置,是微服务框架的良好候选。

金字塔(Pyramid)这样的框架也可以用于微服务架构,因为它提高了组件选择的灵活性,避免了紧密集成。

Django 等更为复杂的 web 框架对于 microservices 框架来说是一个糟糕的选择,原因恰恰相反——组件的紧密垂直集成、在选择组件时缺乏灵活性、配置复杂等等。

另一个专门为在 Python 中实现微服务而编写的框架是 Nameko。Nameko 致力于应用的可测试性,它支持不同的通信协议,如 HTTP、RPC(通过 AMQP)-发布子系统和计时器服务。

我们将不讨论这些框架的细节。另一方面,我们将研究使用微服务构建和设计 web 应用的真实示例。

微服务示例-餐厅预订

让我们以一个真实的 Python web 应用为例,尝试将其设计为一组微服务。

我们的应用是一个餐厅预订应用,帮助用户在特定时间在靠近其当前位置的餐厅为一定数量的人进行预订。假设预订只在同一天完成。

应用需要执行以下操作:

  1. 返回在用户想要预订时开放营业的餐厅列表。
  2. 对于给定的餐厅,返回足够的元信息,如菜肴选择、评级、定价等,并允许用户根据他们的标准筛选酒店。
  3. 一旦用户做出选择,允许他们在给定的时间内为一定数量的人预订所选餐厅。

这些需求中的每一个都足够细化,可以拥有自己的微服务。

因此,我们的应用将使用以下一组微服务进行设计:

  • 一种使用用户位置的服务,返回开放营业的餐厅列表,并支持在线预订 API。
  • 第二个服务检索给定酒店、给定餐厅 ID 的元数据。应用可以使用此元数据与用户的标准进行比较,以查看是否匹配。
  • 第三种服务,在给定餐厅 ID、用户信息、所需座位数和预订时间的情况下,使用预订 API 预订座位,并返回状态。

应用逻辑的核心部分现在适合这三种微服务。一旦实现了这些服务,调用这些服务和执行保留的管道将直接发生在应用逻辑中。

我们将不展示此应用的任何代码,因为它本身就是一个项目,但我们将向读者展示微服务在 API 和返回数据方面的外观。

Microservices example – restaurant reservation

使用微服务的餐厅预订应用架构

微服务通常以 JSON 的形式返回数据。例如,我们的第一个返回餐厅列表的服务将返回一个类似于以下内容的 JSON:

GET /restaurants?geohash=tdr1y1g1zgzc

{
    "8f95e6ad-17a7-48a9-9f82-07972d2bc660": {
        "name": "Tandoor",
        "address": "Centenary building, #28, MG Road b-01"
        "hours": "12.00 – 23.30"
	},
  "4307a4b1-6f35-481b-915b-c57d2d625e93": {
        "name": "Karavalli",
        "address": "The Gateway Hotel, 66, Ground Floor"
        "hours": "12.30 – 01:00"
	},
   ...
} 

第二个服务返回餐厅元数据,主要返回 JSON,如下所示:

GET /restaurants/8f95e6ad-17a7-48a9-9f82-07972d2bc660

{

   "name": "Tandoor",
   "address": "Centenary building, #28, MG Road b-01"
   "hours": "12.00 – 23.30",
   "rating": 4.5,
   "cuisine": "north indian",
   "lunch buffet": "no",
   "dinner buffet": "no",
   "price": 800

} 

下面是第三个的交互,它根据餐厅 ID 进行预订:

由于此服务需要用户提供预订信息,因此需要一个包含预订详细信息的 JSON 负载。因此,这最好作为 HTTP POST 调用来完成。

POST  /restaurants/reserve

在这种情况下,服务将使用以下给定有效负载作为 POST 数据:

{
   "name": "Anand B Pillai",
   "phone": 9880078014,
   "time": "2017-04-14 20:40:00",
   "seats": 3,
   "id": "8f95e6ad-17a7-48a9-9f82-07972d2bc660"
} 

它将返回一个 JSON 作为响应,如下所示:

{
   "status": "confirmed",
   "code": "WJ7D2B",
   "time": "2017-04-14 20:40:00",
   "seats": 3
}

有了这种设计,在您选择的框架中实现应用并不困难,无论是 Flask、Bottle、Nameko 还是其他任何东西。

微服务——优势

那么,与单片应用相比,使用微服务有什么优势呢?让我们来看看一些重要的问题:

  • 微服务通过将应用逻辑拆分为多个服务来增强关注点的分离。这提高了内聚性,减少了耦合。由于业务逻辑不在一个地方,因此不需要对系统进行自顶向下、预先的设计。相反,架构师可以专注于微服务和应用之间的相互作用和通信,并让微服务本身的设计和架构通过重构迭代地出现。
  • 微服务提高了可测试性,因为现在逻辑的每个部分都可以作为一个单独的服务进行独立测试,因此很容易与其他部分隔离,并进行测试。
  • 团队可以围绕业务能力而不是应用层或技术层来组织。由于每个微服务都包括逻辑、数据和部署,因此使用微服务的公司鼓励跨职能角色。这有助于建立一个更加敏捷的组织。
  • 微服务鼓励分散数据。通常,每个服务都有自己的本地数据库或数据存储,而不是单一应用首选的中央数据库。
  • 微服务促进了持续交付和集成以及快速部署。由于对业务逻辑的更改通常只需要对一个或几个服务进行小的更改,因此测试和重新部署通常可以在紧凑的周期内完成,并且在大多数情况下可以完全自动化。

管道和过滤器结构

管道和过滤器是一种简单的架构样式,它连接许多处理数据流的组件,每个组件通过管道连接到处理管道中的下一个组件。

管道和过滤器架构的灵感来自 Unix 技术,即通过外壳上的管道将应用的输出连接到另一个应用的输入。

管道和过滤器架构由一个或多个数据源组成。数据源通过管道连接到数据过滤器。过滤器处理接收到的数据,并将其传递给管道中的其他过滤器。最终数据在数据接收器接收:

Pipe and Filter architectures

管道和过滤器架构

管道和过滤器通常用于执行大量数据处理(如数据分析、数据转换、元数据提取等)的应用。

过滤器可以在同一台机器上运行,它们使用实际的 Unix 管道或共享内存进行通信。然而,在大型系统中,它们通常在单独的机器上运行,管道不需要是实际的管道,而是任何类型的数据通道,如套接字、共享内存、队列等。

可以将多个过滤器管道连接在一起,以执行复杂的数据处理和数据暂存。

使用这种架构的 Linux 应用的一个很好的例子是gstreamer——一个多媒体处理库,它可以对多媒体视频和音频执行许多任务,包括播放、录制、编辑和流媒体。

Python 中的管道和过滤器

在 Python 中,我们在多处理模块中遇到了最纯粹的管道。多处理模块提供管道作为从一个进程到另一个进程的通信方式。

管道将创建为一对父连接和子连接。连接一侧写的内容可以在另一侧读取,反之亦然。

这使我们能够构建非常简单的数据处理管道。

例如,在 Linux 上,可以通过以下一系列命令计算文件中的字数:

$ cat filename | wc -w

我们将编写一个简单的程序,使用多处理模块模拟此管道:

# pipe_words.py
from multiprocessing import Process, Pipe
import sys

def read(filename, conn):
    """ Read data from a file and send it to a pipe """

    conn.send(open(filename).read())

def words(conn):
    """ Read data from a connection and print number of words """

    data = conn.recv()
    print('Words',len(data.split()))

if __name__ == "__main__":
    parent, child = Pipe()
    p1 = Process(target=read, args=(sys.argv[1], child))
    p1.start()
    p2 = Process(target=words, args=(parent,))
    p2.start()
    p1.join();p2.join()

以下是对工作流程的分析:

  1. 将创建一个管道,并获得两个连接。
  2. read函数作为一个进程执行,传递管道(子)的一端和要读取的文件名。
  3. 此进程读取文件,并将数据写入连接。
  4. words函数作为第二个进程执行,将管道的另一端传递给它。
  5. 当此函数作为进程执行时,它从连接读取数据,并打印字数。

以下屏幕截图显示了同一文件上 shell 命令和前面程序的输出:

Pipe and Filter in Python

使用管道及其等效 Python 程序输出 shell 命令

创建管道不需要使用看起来像实际管道的对象。另一方面,Python 中的生成器提供了一种极好的方法来创建一组可调用项,这些可调用项彼此调用,并消费和处理彼此的数据,从而生成一条数据处理管道。

这是与上一个相同的示例,重写为使用生成器,这一次处理文件夹中与特定模式匹配的所有文件:

# pipe_words_gen.py

# A simple data processing pipeline using generators
# to print count of words in files matching a pattern.
import os

def read(filenames):
    """ Generator that yields data from filenames as (filename, data) tuple """

    for filename in filenames:
        yield filename, open(filename).read()

def words(input):
    """ Generator that calculates words in its input """

    for filename, data in input:
        yield filename, len(data.split())

def filter(input, pattern):
    """ Filter input stream according to a pattern """

    for item in input:
        if item.endswith(pattern):
            yield item

if __name__ == "__main__":
    # Source
    stream1 = filter(os.listdir('.'), '.py')
    # Piped to next filter
    stream2 = read(stream1)
    # Piped to last filter (sink)
    stream3 = words(stream2)

    for item in stream3:
        print(item)

以下是输出的屏幕截图:

Pipe and Filter in Python

使用生成器的管道输出,该生成器打印 Python 程序的字数

可以使用以下命令验证前一个程序的输出:

$ wc -w *.py

下面是另一个程序,它使用另外两个数据过滤生成器构建一个程序,该程序监视与特定模式匹配的文件,并打印有关最新文件的信息—类似于 Linux 上的监视程序所执行的操作:

# pipe_recent_gen.py
# Using generators, print details of the most recently modified file
# matching a pattern.

import glob
import os
from time import sleep

def watch(pattern):
    """ Watch a folder for modified files matching a pattern """

    while True:
        files = glob.glob(pattern)
        # sort by modified time
        files = sorted(files, key=os.path.getmtime)
        recent = files[-1]
        yield recent        
        # Sleep a bit
        sleep(1)

def get(input):
    """ For a given file input, print its meta data """
    for item in input:
        data = os.popen("ls -lh " + item).read()
        # Clear screen
        os.system("clear")
        yield data

if __name__ == "__main__":
    import sys

    # Source + Filter #1
    stream1 = watch('*.' + sys.argv[1])

    while True:
        # Filter #2 + sink
        stream2 = get(stream1)
        print(stream2.__next__())
        sleep(2)

这最后一个程序的细节应该对读者不言自明。

下面是我们的程序在控制台上的输出,监视 Python 源文件:

Pipe and Filter in Python

监视最近修改的 Python 源文件的程序的输出

如果我们创建一个空 Python 源文件,比如example.py,那么输出会在两秒钟内发生变化:

Pipe and Filter in Python

监视程序的输出更改,始终显示最近修改的文件

使用生成器(co 例程)构建此类管道的基本技术是将一个生成器的输出连接到下一个生成器的输入。通过将许多这样的生成器串联起来,可以构建复杂程度从简单到复杂的数据处理管道。

当然,除此之外,还可以使用多种技术来建造管道。一些常见的选择是使用队列连接的生产者-消费者任务,队列可以使用线程或进程。在关于可伸缩性的一章中,我们已经看到了这方面的例子。

微服务还可以通过将一个微服务的输入连接到另一个微服务的输出来构建简单的处理管道。

在 Python 第三方软件生态系统中,有许多模块和框架允许您构建复杂的数据管道。芹菜,虽然是一个任务队列,但可以用于构建简单的批处理工作流,但管道支持有限。流水线不是芹菜的强大特性,但它对可用于此目的的链接任务的支持有限。

Luigi 是另一个健壮的框架,它是为需要管道和过滤器架构的复杂、长期运行的批处理作业编写的。Luigi 内置了对 Hadoop 作业的支持,因此它是构建数据分析管道的好选择。

总结

在本章中,我们介绍了构建软件的一些常见架构模式。我们从模型-视图-控制器架构开始,并查看了 Django 和 Flask 中的示例。您了解了 MVC 架构的组件,并了解到 Django 使用模板实现了 MVC 的一个变体。

我们将 Flask 视为一个微框架的示例,该框架通过使用带有附加服务的插件架构来实现 web 应用的最小占用空间。

我们接着讨论了事件驱动编程架构,它是一种使用共同例程和事件的异步编程。我们从一个使用 Python 中的select模块的多用户聊天示例开始。从那以后,我们继续讨论更大的框架和库。

我们讨论了 Twisted 的架构及其组件。我们还讨论了 Eventlet 及其近亲 gevent。对于这些框架中的每一个,我们都看到了多用户聊天服务器的实现。

接下来,我们将微服务作为一种架构,它通过在多个服务之间拆分核心业务逻辑来构建可伸缩的服务和部署。我们设计了一个使用微服务的餐厅预订应用的示例,并简要介绍了可用于构建微服务的 Python web 框架。

在本章末尾,我们看到了使用管道和过滤器进行串行和可伸缩数据处理的架构。我们使用 Python 中的多处理模块构建了一个实际管道的简单示例,该模块模仿 Unix 管道命令。然后,我们研究了使用发电机建造管道的技术,并看到了几个例子。我们总结了在 Python 第三方软件生态系统中构建管道和框架的技术。

这就把我们带到了应用架构一章的末尾。在下一章中,我们将介绍可部署性,即将软件部署到生产系统等环境的方面。