"It is a very sad thing that nowadays there is so little useless information." – Oscar Wilde
对于那些目睹在越来越多的应用中生成的数据量迅速增加的人来说,I/O 效率的重要性并没有消失。用户生成的内容(博客、视频、tweet 和帖子)正在成为互联网内容的主要类型,而这一趋势与社交软件的兴起同步发展,在社交软件中,映射内容之间的交叉点会在另一个层面上产生指数级的数据增长。
许多数据仓库,如谷歌、Facebook 和其他数百家,通过 API 向公众公开其数据,通常是免费的。这些网络都从用户那里收集了大量令人震惊的内容、观点、关系等,通过市场调查和各种类型的流量和使用分析,数据得到了进一步的补充。这些 API 大多是双向的,收集和存储成员上传的数据,并为这些数据提供服务。
Node 已在此数据扩展期间到达。在本章中,我们将研究 Node 如何满足排序、合并、搜索和处理大量数据的需求。在构建快速、可扩展的网络应用时,对软件进行微调,使其能够安全、廉价地处理大量数据至关重要。
我们将在下一章中讨论具体的缩放问题。在本章中,我们将研究设计多个 Node 进程一起处理大量数据的系统时的一些最佳实践。
作为讨论的一部分,我们将研究构建数据密集型应用时的并行策略,重点关注如何利用多 CPU 环境、使用多个工作人员以及利用操作系统本身来实现并行效率。将通过示例演示使用这些包含的高效处理单元组装应用的过程。
如第 5 章所述,管理多个并发客户端连接,并发性与并行性不同。并发的目标是为程序提供良好的结构,从而简化了处理多个并发进程所固有的复杂性建模。并行性的目标是通过在多个工作人员之间共享任务或计算的一部分来提高应用性能。回想一下Clinger 的愿景是非常有用的,“……数十个、数百个甚至数千个独立的微处理器,每个都有自己的本地内存和通信处理器,通过高性能通信网络进行通信。”
我们已经讨论了 Node 如何帮助我们推理非确定性控制流。让我们回忆一下 Node 的设计者是如何遵循模块化的规则的,这鼓励我们编写通过干净接口连接的简单部件。这一规则导致人们倾向于使用通用协议进行简单的网络化进程之间的通信。相关规则为简单性规则**,说明如下:**
As https://en.wikipedia.org/wiki/Unix_philosophy says, "developers should design for simplicity by looking for ways to break up program systems into small, straightforward cooperating pieces. This rule aims to discourage developers' affection for writing "intricate and beautiful complexities" that are bug prone programs in reality."
在我们继续阅读本章时,最好记住这条规则。为了控制不断扩大的数据量,我们可以构建巨大、复杂和强大的巨石,希望它们仍然足够大和强大。或者,我们可以构建小型且有用的处理单元,这些单元可以组合成任何规模的单一处理团队,这与用数千或数百万廉价商品处理器构建超级计算机的方式没有什么不同。
在阅读本章时,流程查看器将非常有用。Unix 系统的一个好工具是htop,可以从下载 http://hisham.hm/htop/ 。该工具提供了 CPU 和内存使用情况的视图;在这里,我们可以看到负载是如何分布在所有八个核心上的:
让我们从研究线程和进程开始。
从整体上看,Node 环境有效地展示了多线程并行性的效率和适用于具有高并发性的应用的表达语法。使用 Node 不会约束开发人员、开发人员对系统资源的访问权限或开发人员可能要构建的应用类型。
然而,令人惊讶的是,对 Node 的持续批评正是基于这种误解。正如我们将看到的,认为 Node 不是多线程的,因此速度慢,或者还没有准备好进入黄金时段的想法,完全没有抓住要点。JavaScript 是单线程的;Node 堆栈不可用。JavaScript 代表了用来协调多个多线程 C++进程执行的语言,甚至是开发者开发的定制 C++附加组件。Node 提供 JavaScript,通过 V8 运行,主要作为并发建模工具。此外,可以只使用 JavaScript 编写整个应用,这是该平台的另一个好处。如果你选择的话,你就不能用 JavaScript 来粘贴你的 C++应用的大部分内容。
在本章中,我们将试图消除这些误解,为 Node 的乐观发展扫清道路。特别是,我们将研究在内核、进程和线程之间分散工作的技术。现在,本节将试图阐明单个线程的能力(提示:通常这就是您所需要的)。
您将很难找到大量从事企业级软件工作的专业软件工程师,他们愿意否认多线程软件开发是痛苦的。然而,为什么要做好工作如此困难?
多线程编程本身并不困难,困难在于线程同步的复杂性。使用线程模型构建高并发性非常困难,尤其是状态共享的模型。一旦应用扩展到最基本的形状之外,预测一个线程中执行的操作可能会影响所有其他线程几乎是不可能的。纠缠和碰撞会迅速增加,有时会破坏共享内存,有时会产生几乎无法追踪的 bug。
Node 的设计人员选择承认线程的速度和并行化优势,而不要求开发人员也这样做。特别是,Node 的设计人员希望避免开发人员管理线程化系统带来的困难:
- 共享内存和锁定行为导致系统变得越来越复杂,很难进行推理。
- 任务之间的通信需要实现广泛的同步原语,如互斥量和信号量、条件变量等。一个已经具有挑战性的环境需要高度复杂的工具,扩展完成相对简单的系统所需的专业知识水平。
- 竞争条件和死锁是这类系统中常见的陷阱。共享程序空间内的同时读写操作会导致排序问题,其中两个线程可能处于不可预测的竞赛中,以获得影响状态、事件或其他关键系统特性的权利。
- 由于维护线程及其状态之间的可靠边界非常困难,因此确保一个库(对于 Node 来说是一个模块)是线程安全的将耗费大量开发人员时间。我可以知道这个库不会破坏我的应用的某些部分吗?保证线程安全需要库开发人员非常努力,这些保证可能是有条件的;例如,库在读取时可能是线程安全的,但在写入时可能不是。
单线程的主要论点是,在并发环境中,控制流是困难的,尤其是当内存访问或代码执行顺序不可预测时:
- 开发人员可以专注于构建顺序可预测的执行链,而不用担心任意锁定和其他冲突。
- 由于并行化是通过使用多个进程来实现的,每个进程都有一个单独和不同的内存空间,因此进程之间的通信通过简单性规则保持简单,我们不仅实现了简单和无错误的组件,而且还实现了更容易的互操作性。
- 因为状态不是(任意)在单个 Node 进程之间共享的;单个进程自动受到保护,不受其他致力于内存重新分配或资源垄断的进程的意外访问。通信是通过使用基本协议的清晰通道进行的,所有这些都使得编写跨进程进行不可预测更改的程序变得非常困难。
- 线程安全是开发人员不必浪费时间担心的一个问题。由于单线程并发避免了多线程并发中存在的冲突,因此开发可以在更可靠的基础上更快地进行。在下图中,我们在左侧看到,跨线程共享状态需要勤勉的管理来防止冲突,而在右侧,“不共享”体系结构可以避免冲突和阻塞操作:
由事件循环有效管理的单个线程为 Node 程序带来了稳定性、可维护性、可读性和恢复能力。大新闻是 Node 继续向其开发人员提供多线程处理的速度和能力 Node 设计的卓越使这种能力变得透明,这反映了 Node 的既定目标的一部分,即以最小的难度为大多数人带来最大的能力。
下图显示了两个单线程模型和多线程模型之间的差异:
例如,阻止从文件读取操作总是需要一些时间,这是无法避免的。单线程同步模型强制每个任务在开始之前等待其他任务完成,这会消耗更多的时间。可以使用线程并行启动多个任务,即使是在不同的时间,其中总执行时间不超过运行时间最长的线程所花费的时间。使用线程时,开发人员将负责使用锁或其他调度工具同步每个线程的活动。当线程数量增加时,这会变得非常复杂,在这种复杂的情况下,存在着非常微妙和难以发现的 bug。
Node 本身管理 I/O 线程,而不是让开发人员与这种复杂性作斗争。您不需要微观管理 I/O 线程;我们只需设计一个应用来建立数据可用点(回调)以及一旦所述数据可用就要执行的指令。线程在幕后提供了同样的效率,但是它们的管理通过一个易于理解的界面向开发人员公开。
Node 的 I/O 线程池在操作系统范围内执行,其工作分布在多个核心上(就像操作系统调度的任何其他作业都会类似地分布一样)。当您运行 Node 时,您已经在利用它的多线程执行。
在即将到来的子进程和集群模块的讨论中,我们将看到多个并行进程的这种并行方式。我们将看到 Node 如何不被拒绝操作系统的全部功能。
正如我们前面所看到的,在讨论 Node 的核心架构时,执行 JavaScript 程序的 V8 线程绑定到libuv
,它充当主系统级 I/O 事件调度器。在此容量下,libuv
处理计时器、文件系统调用、网络调用以及相关 JavaScript 进程或模块命令(如fs.readFile
和http.createServer
请求的其他 I/O 操作。因此,最好将主 V8 事件循环理解为一个控制流编程接口,由高效、多线程的系统委托libuv
支持和支持。
Bert Belder是 Node 的核心贡献者之一,也是libuv
的核心贡献者之一。事实上,Node 的开发促进了libuv
开发的同时增加,这是一个反馈回路,它只提高了两个项目的速度和稳定性。它合并并替换了构成 Node 堆栈原始核心的libeo
和libev
库。
考虑雷蒙德规则中的另一个规则,即分离规则:“分离策略与机制;分离接口与引擎”。该引擎的接口是 V8 的 JavaScript 运行时。继续 Raymond,看看这个:
"One way to effect that separation is, for example, to write your application as a library of C service routines that are driven by an embedded scripting language, with the application flow of control written in the scripting language rather than C."
在单个可预测线程的抽象中编排超高效并行操作系统进程的能力是通过设计实现的,而不是作为让步。
它总结了如何改进应用开发过程的实用分析,当然这不是对可能的限制。
A detailed unpacking of libuv can be found at: https://github.com/nikhilm/uvbook. Burt Belder also gives an in-depth talk on how libuv, and Node, works under the hood at: https://www.youtube.com/watch?v=PNa9OMajw9w.
软件开发不再是单一程序的领域。在网络上运行的应用不能放弃互操作性。现代应用是分布式和解耦的。我们现在构建的应用将用户与分布在 internet 上的资源连接起来。许多用户同时访问共享资源。如果将整个系统理解为解决一个或几个明确定义的相关问题的程序接口的集合,则更容易理解复杂系统。在这样一个系统中,期望(并且希望)进程不会处于空闲状态。
早期对 Node 的批评是它没有多核意识,也就是说,如果一台 Node 服务器运行在一台有多个核的机器上,它将无法利用这一额外的马力。在这一看似合理的批评中,隐藏着一种基于稻草人的不合理偏见:一个无法显式分配内存和执行线程以实现并行化的程序无法处理企业级问题。
这种批评是持续不断的。这也不是真的。
当单 Node 进程在单核上运行时,通过使用child_process
模块可以向上旋转任意数量的 Node 进程。这个模块的基本用法很简单:我们获取一个ChildProcess
对象并监听数据事件。此示例将调用ls
Unix 命令,列出当前目录:
const spawn = require('child_process').spawn;
let ls = spawn('ls', ['-lh', '.']);
ls.stdout.on('readable', function() {
let d = this.read();
d && console.log(d.toString());
});
ls.on('close', code => {
console.log(`child process exited with code: ${code}`);
});
在这里,我们生成ls
进程(列表目录),并从生成的readable
流中读取,接收如下内容:
-rw-r--r-- 1 root root 43 Jul 9 19:44 index.html
-rw-rw-r-- 1 root root 278 Jul 15 16:36 child_example.js
-rw-r--r-- 1 root root 1.2K Jul 14 19:08 server.js
child process exited with code 0
可以通过这种方式生成任意数量的子进程。这里需要注意的是,当子进程产生或以其他方式创建时,操作系统本身会将该进程的责任分配给给定的 CPU。Node 不负责操作系统如何分配资源。结果是,在一台有八个内核的机器上,产生八个进程很可能会导致每个进程分配给独立的处理器。换句话说,操作系统会自动将子进程分布到各个 CPU 上,这就掩盖了 Node 无法充分利用多核环境的说法。
Each new Node process (child) is allocated 10 MB of memory, and represents a new V8 instance that will take at least 30 milliseconds to start up. While it is unlikely that you will be spawning many thousands of these processes, understanding how to query and set OS limits on user-created processes is beneficial; htop or top will report the number of processes currently running, or you can use ps aux | wc –l
from the command line. The ulimit
Unix command (https://ss64.com/bash/ulimit.html) provides important information on user limits on an OS. Passing ulimit
, the –u argument will show the maximum number of user processes that can be spawned. Changing the limit is accomplished by passing it as an argument: ulimit –u 8192
.
child_process
模块表示一个公开了四种主要方法的类:spawn
、fork
、exec
和execFile
。这些方法返回一个扩展了EventEmitter
的ChildProcess
对象,将一个接口公开给子事件和一些有助于管理子进程的函数。我们将看看它的主要方法,然后讨论通用的ChildProcess
接口。
这个强大的命令允许 Node 程序启动并与通过系统命令生成的进程交互。在前面的示例中,我们使用 spawn 调用本机 OS 进程ls
,将lh
和.
参数传递给该命令。这样,任何进程都可以像通过命令行启动一样启动。该方法有三个参数:
- 命令:操作系统外壳要执行的命令
- 参数(可选):这些是命令行参数,作为数组发送
- 选项:一个可选的 spawn 设置地图
spawn
的选项允许仔细定制其行为:
cwd
(字符串):默认情况下,命令会理解其当前工作目录与调用 spawn 的 Node 进程的工作目录相同。使用此指令更改该设置。env
(对象):用于将环境变量传递给子进程。例如,考虑生成具有环境对象的子,例如:
{
name: "Sandro",
role: "admin"
}
子进程环境将有权访问以下值:
detached
(布尔):当父进程产生子进程时,两个进程都形成一个组,父进程通常是该组的领导者。要使孩子成为组长,请使用“分离”。这将允许子级即使在父级退出后仍继续运行。这是因为默认情况下,父级将等待子级退出。如果父循环child.unref()
不存在,则可以告诉其他子循环【无引用】退出。uid
(编号):根据标准系统权限设置子进程的uid
(用户标识)指令,例如对子进程具有执行权限的 UID。gid
(编号):根据标准系统权限设置子进程的gid
(组标识)指令,例如对子进程具有执行权限的 GID。stdio
(字符串或数组):子进程有文件描述符,前三个依次为process.stdin
、process.stdout
和process.stderr
标准 I/O 描述符(fds=0,1,2)。该指令允许重新定义、继承这些描述符,等等。
考虑以下子进程程序的输出:
process.stdout.write(Buffer.from("Hello!"));
在这里,父母会收听child.stdout
。相反,如果我们希望一个子项继承其父项的stdio
,这样当该子项写入process.stdout
时,发出的内容通过管道传输到父项的process.stdout
,我们会将相关的父文件描述符传递给该子项,覆盖其自身的描述符:
spawn("node", ['./reader.js', './afile.txt'], {
stdio: [process.stdin, process.stdout, process.stderr]
});
在这种情况下,子级的输出将直接通过管道传输到父级的标准输出通道。另外,有关这种模式的更多信息,请参见 fork,如下所示。
三个(或更多)文件描述符中的每一个都可以取六个值中的一个:
- 管道:这将在子级和父级之间创建管道。由于前三个子文件描述符已经公开给父级(
child.stdin
、child.stdout
和child.stderr
),因此只有在更复杂的子实现中才需要这样做。 - ipc:这将创建一个 ipc 通道,用于在子级和父级之间传递消息。子进程最多可以有一个 IPC 文件描述符。一旦建立了这种连接,家长可以通过
child.send
与孩子通信。如果孩子通过此文件描述符发送 JSON 消息,则可以使用child.on("message")
捕获这些发射。如果作为子级运行 Node 程序,则使用内置此消息传递通道的ChildProcess.fork
可能是更好的选择。 - 忽略:文件描述符 0-2 将附加
/dev/null
。对于其他文件,将不会在子文件上设置引用的文件描述符。 - 流对象:允许父对象与子对象共享流。出于演示目的,如果一个孩子将向任何提供的
WritableStream
写入相同的内容,我们可以这样做:
let writer = fs.createWriteStream('./a.out');
writer.on('open', () => {
let cp = spawn("node", ['./reader.js'], {
stdio: [null, writer, null]
});
});
子级现在将获取其内容并将其传输到已发送的任何输出流:
fs.createReadStream('cached.data').pipe(process.stdout);
- 整数:文件描述符 ID。
- 空且未定义:为默认值。对于文件描述符 0-2(
stdin
、stdout
、stderr
,创建管道;其他默认为ignore
。
除了将stdio
设置作为数组传递外,还可以通过传递以下快捷字符串值之一来实现某些常见分组
:
'ignore' = ['ignore', 'ignore', 'ignore']
'pipe' = ['pipe', 'pipe', 'pipe']
'inherit' = [process.stdin, process.stdout, process.stderr]
[0,1,2]
We have shown some examples of using spawn
to run Node programs as child processes. While this is a perfectly valid usage (and a good way to try out the API options), spawn
is primarily for running system commands. Refer to the discussion of fork
, as follows, for more information on running Node processes as children.
应该注意的是,生成任何系统进程的能力意味着可以使用 Node 运行安装在操作系统上的其他应用环境。如果安装了流行的 PHP 语言,则可以执行以下操作:
const spawn = require('child_process').spawn;
let php = spawn("php", ['-r', 'print "Hello from PHP!";']);
php.stdout.on('readable', () => {
let d;
while (d = this.read()) {
console.log(d.toString());
}
});
// Hello from PHP!
运行一个更有趣、更大的程序也同样容易。
除了使用这种技术可以轻松地通过 Node 异步运行 Java、Ruby 或其他程序之外,我们还可以很好地回答对 Node 的持续批评:JavaScript 在处理数字或执行其他 CPU 繁重任务方面不如其他语言快。这是真的,因为 Node 主要针对 I/O 效率进行优化,并帮助管理高并发性应用,而 JavaScript 是一种解释性语言,不太关注繁重的计算。
然而,使用spawn
,人们可以很容易地将分析引擎或计算引擎上的大量计算和长时间运行的例程转移到其他环境中的独立进程中。Node 的简单事件循环将确保在这些操作完成时通知主应用,从而无缝集成结果数据。同时,主应用可以自由地继续为客户端服务。
与spawn
类似,fork
启动子进程,但设计用于运行 Node 程序,具有内置通信通道的额外好处。与其将系统命令作为第一个参数传递给fork
,不如将路径传递给 Node 程序。与spawn
一样,命令行选项可以作为第二个参数发送,可通过分叉子进程中的process.argv
访问。
可选选项对象可以通过以下参数作为其第三个参数传递:
cwd
(字符串):默认情况下,命令会理解其当前工作目录与调用fork
的 Node 进程的工作目录相同。使用此指令更改该设置。env
(对象):用于将环境变量传递给子进程。参考繁殖。encoding
(字符串):设置通信信道的编码。execPath
(字符串):用于创建子进程的可执行文件。silent
(布尔值):默认情况下,分叉子对象的stdio
将与父对象的stdio
相关联(例如child.stdout
与parent.stdout
相同)。将此选项设置为 true 将禁用此行为。
fork
和spawn
之间的一个重要区别是前者的子进程在完成时不会自动退出。这样的孩子在完成时必须明确退出,通过process.exit()
可以轻松完成。
在下面的示例中,我们创建一个子系统,它每十分之一秒发出一个递增的数字,然后它的父系统将其转储到系统控制台。首先,让我们看一下儿童计划:
let cnt = 0;
setInterval(() => {
process.stdout.write(" -> " + cnt++);
}, 100);
同样,这将只是写一个稳步增长的数字。记住,使用fork
,子进程将继承其父进程的stdio
,我们只需创建子进程即可在运行父进程的终端中获得输出:
var fork = require('child_process').fork;
fork('./emitter.js');
// -> 0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8 -> 9 -> 10 ...
The silent option can be demonstrated here; fork('./emitter.js', [], { silent: true });
turns off any output to the Terminal.
创建多个并行进程很容易。让我们乘以创建的子对象数:
fork('./emitter.js');
fork('./emitter.js');
fork('./emitter.js');
// 0 -> 0 -> 0 -> 1 -> 1 -> 1 -> 2 -> 2 -> 2 -> 3 -> 3 -> 3 -> 4 ...
在这一点上应该很清楚,通过使用fork
,我们正在创建许多并行执行上下文,分布在所有机器核心上。
这很简单,但fork
内置的沟通渠道使与分叉儿童的沟通更加容易和干净。考虑下面的文件,它生成子进程并与之通信:
// parent.js
const fork = require('child_process').fork;
let cp = fork('./child.js');
cp.on('message', msgobj => {
console.log(`Parent got message: ${msgobj.text}`);
});
cp.send({
text: 'I love you'
});
我们看到现在有一个可用的通信通道,父进程可以通过该通道发送消息,也可以从子进程接收消息,如下所示:
// child.js
process.on('message', msgobj => {
console.log('Child got message:', msgobj.text);
process.send({
text: `${msgobj.text} too`
});
});
通过执行父脚本,我们将在控制台中看到以下内容:
Child got message: I love you
Parent got message: I love you too
稍后,我们将深入探讨跨流程通信的这一重要概念。
如果子进程的完整缓冲输出足够,不需要通过事件管理数据,child_process
提供exec
方法。该方法有三个参数:
- **命令:**一个命令行字符串。与通过数组将参数传递给命令的
spawn
和fork
不同,第一个参数接受完整的命令字符串,如ps aux | grep node
。 - **选项:**这是一个可选参数:
cwd
(字符串):设置命令流程的工作目录。env
(对象):这是将公开给子进程的键值对的映射。encoding
(字符串):这是孩子数据流的编码。默认值为'utf8'
。timeout
(数字):指定等待进程完成的毫秒数,此时子进程将被发送killSignal.maxBuffer
值。killSignal.maxBuffer
(数字):这是stdout
或stderr
上允许的最大字节数。当超过此数字时,进程将被终止。默认值为 200 KB。killSignal
(字符串):子进程在超时后收到此信号。此默认值为SIGTERM
。
- 回调:接收三个参数:
Error
对象,如果有,stdout
(包含结果的Buffer
对象),stderr
(包含错误数据的Buffer
对象,如果有)。如果进程被终止,Error.signal
将包含终止信号。
当您想要exec
的缓冲行为,但目标是 Node 文件时,请使用execFile
。重要的是,execFile
不会产生新的子 shell,这使得它的运行成本稍微降低。
ChildProcess
对象的所有实例都扩展了EventEmitter
,公开了对管理子数据连接有用的事件。此外,ChildProcess
对象还公开了一些与儿童直接互动的有用方法。现在让我们从属性和方法开始讨论这些问题:
child.connected
:当子系统通过child.disconnect()
与父系统断开连接时,该标志设置为false
。child.stdin
:这是一个WritableStream
对应于中的儿童标准。child.stdout
:这是一个ReadableStream
对应孩子的标准输出。child.stderr
:这是对应于孩子标准错误的ReadableStream
。child.pid
:这是一个整数,表示分配给子进程的进程 ID(PID)。child.kill
:尝试终止子进程,向其发送可选信号。如果未指定信号,则默认为SIGTERM
(有关信号的更多信息,请访问:https://en.wikipedia.org/wiki/Signal_(IPC)。虽然方法名听起来像是终端,但不能保证终止进程,它只向进程发送一个信号。危险的是,如果在已经退出的进程上尝试kill
,则新分配了死进程 PID 的另一进程可能会收到该信号,其后果无法确定。此方法应触发close
事件,该事件是用于关闭进程的信号。child.disconnect()
:此命令切断子级和父级之间的 IPC 连接。孩子将优雅地死去,因为它没有 IPC 通道让它活着。您也可以从孩子本身内部调用process.disconnect()
。一旦子 Node 断开连接,该子 Node 引用上的connected
标志将设置为false
。
正如我们在fork
的讨论中所看到的,当在spawn
上使用ipc
选项时,子进程可以通过child.send
发送消息,消息作为第一个参数传递。TCP 服务器或套接字句柄可以作为第二个参数随消息一起传递。通过这种方式,TCP 服务器可以跨多个子进程传播请求。例如,下面的服务器将套接字处理分配到多个子进程,这些子进程等于可用 CPU 的总数。每个分叉子级都有一个唯一的 ID,它在启动时报告该 ID。每当 TCP 服务器收到套接字时,该套接字将作为句柄传递给随机子进程:
// tcpparent.js
const fork = require('child_process').fork;
const net = require('net');
let children = [];
require('os').cpus().forEach((f, idx) => {
children.push(fork('./tcpchild.js', [idx]));
});
net.createServer((socket) => {
let rand = Math.floor(Math.random() * children.length);
children[rand].send(null, socket);
}).listen(8080)
然后,该子进程发送一个唯一的响应,表明套接字处理正在分发:
// tcpchild.js
let id = process.argv[2];
process.on('message', (n, socket) => {
socket.write(`child ${id} was your server today.\r\n`);
socket.end();
});
在终端窗口中启动父服务器。在另一个窗口中,运行telnet 127.0.0.1 8080
。您应该会看到类似于以下输出的内容,每个连接上都会显示一个随机的子 ID(假设存在多个核心):
Trying 127.0.0.1...
…
child 3 was your server today.
Connection closed by foreign host.
再打那个端点几次。您应该看到您的请求由不同的子进程提供服务。
许多开发人员将承担的任务之一是构建日志文件处理器。日志文件可能非常大,并且有很多兆字节长。在一个非常大的文件上运行的任何一个程序都很容易遇到内存问题,或者运行速度太慢。将一个大文件分块处理是有意义的。我们将构建一个简单的日志处理器,它将一个大文件分解为多个片段,并将一个片段分配给多个童工,并行运行它们。
本例的完整代码可以在代码包的logproc
文件夹中找到。我们将重点关注以下主要程序:
- 确定日志文件中的行数
- 把它们分成相等的块
- 为每个块创建一个子块并向其传递解析指令
- 集合并显示结果
为了得到我们文件的字数,我们使用带child.exec
的wc
命令,如下代码所示:
child.exec(`wc -l ${filename}`, function(e, fL) {
fileLength = parseInt(fL.replace(filename, ""));
let fileRanges = [];
let oStart = 1;
let oEnd = fileChunkLength;
while(oStart < fileLength) {
fileRanges.push({
offsetStart: oStart,
offsetEnd: oEnd
})
oStart = oEnd + 1;
oEnd = Math.min(oStart + fileChunkLength, fileLength);
}
...
}
假设我们使用了 500000 行的fileChunkLength
。这意味着将创建四个子进程,每个子进程将被告知处理文件中的 500000 行,例如 1 到 500000 行:
let w = child.fork('bin/worker');
w.send({
file: filename,
offsetStart: range.offsetStart,
offsetEnd: range.offsetEnd
});
w.on('message', chunkData => {
// pass results data on to a reducer.
});
这些工作者中的每一个人都将使用一个子进程来获取分配给他们的数据块,使用 Unix 本机流编辑器sed
:
process.on('message', (m) => {
let filename = m.file;
let sed = `sed -n '${m.offsetStart},${m.offsetEnd}p' ${filename}`;
let reader = require('child_process').exec(sed, {maxBuffer: 1024e6}, (err, data, stderr) => {
// Split the file chunk into lines and process it.
//
data = data.split("\n");
...
})
})
这里,我们正在执行sed –n '500001,1000001p' logfile.txt
命令,该命令提取给定范围的行并返回它们进行处理。一旦我们完成了对数据列的处理(相加等),该子级将其数据返回到主级(如前所述),数据结果将写入文件,或者进行其他操作,或者发送到stdout
,如以下输出所示:
本例的完整文件要长得多,但所有这些额外的代码仅仅是格式化和其他细节——我们所描述的 Node 子进程管理足以创建一个用于数字处理的并行系统,它将在几秒钟内处理数百万行代码。通过使用分布在更多内核上的更多进程,可以进一步降低日志解析速度。
View the README.MD
file in the /logproc
folder in your code bundle to experiment with this example.
正如我们在处理大型日志文件时看到的,许多子进程的主-父控制器模式正好适合在 Node 中进行垂直缩放。为此,Node API 增加了一个cluster
模块,该模块将此模式形式化,并有助于简化其实现。继续 Node 的核心目的,帮助更容易地构建可伸缩的网络软件,cluster 的特殊目标是促进多个孩子之间共享网络端口。
例如,以下代码创建了共享相同 HTTP 连接的工作进程的cluster
:
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if(cluster.isMaster) {
for(let i = 0; i < numCPUs; i++) {
cluster.fork();
}
}
if(cluster.isWorker) {
http.createServer((req, res) => {
res.writeHead(200);
res.end(`Hello from ${cluster.worker.id}`);
}).listen(8080);
}
我们将很快深入了解细节。现在,请注意,cluster.fork
采用了零参数。没有命令或文件参数的fork
做什么?在cluster
中,默认操作是对当前程序进行fork
。我们看到在cluster.isMaster
期间,操作是针对fork
子级(每个可用 CPU 一个)。当该程序在分叉上下文中重新执行时,cluster.isWorker
将被true
并启动在共享端口上运行的新 HTTP 服务器*。多个进程共享单个服务器的负载。*
启动并使用浏览器连接到此服务器。您将看到类似于Hello from 8
的内容,该整数对应于负责处理您的请求的工作者的唯一cluster.worker.id
值。所有工作人员之间的平衡将自动处理,因此刷新浏览器几次将导致显示不同的工作人员 ID。
稍后,我们将介绍一个跨集群共享套接字服务器的示例。现在,我们将展示集群 API,它分为两部分:集群主 Node 可用的方法、属性和事件,以及子 Node 可用的方法、属性和事件。由于此上下文中的工人是使用 fork 定义的,因此child_process
方法的文档也可以应用于此:
cluster.isMaster
:表示该流程是否为主流程的布尔值。cluster.isWorker
:这是一个布尔值,指示进程是否从主进程派生。cluster.worker
:这将包含对当前工作者对象的引用,仅对子进程可用。cluster.workers
:这是一个哈希,包含对所有活动工作对象的引用,由工作对象 ID 键入。使用此哈希循环所有工作对象。这只存在于主进程中。cluster.setupMaster([settings])
:这是传递默认参数映射的一种方便方法,在分支子级时使用。如果所有子级都将分叉同一个文件(通常情况下),则可以通过在此处设置来节省时间。可用的默认值如下所示:exec
(字符串):流程文件的文件路径,默认为__filename
。args
(数组):包含作为参数发送给子进程的字符串。默认情况下,获取带有process.argv.slice(2)
的参数。silent
(布尔):指定是否向主机的 stdio 发送输出,默认为 false。
cluster.fork([env])
:这将创建一个新的工作进程。只有主进程可以调用此方法。要向子进程环境公开键值对的映射,请向env
发送一个对象。cluster.disconnect([callback])
:用于终止集群中的所有工人。一旦所有工作进程都体面地死去,集群进程本身将终止,如果它没有更多的事件等待。要在所有儿童都已过期时收到通知,请通过callback
。
cluster 对象发出多个事件,如下所示:
fork
:当主人试图用叉子叉一个新孩子时,会触发此命令。这与online
不同。它接收一个worker
对象。online
:当主 Node 收到子 Node 已完全绑定的通知时触发。这与fork
事件不同,接收worker
对象。listening
:当工作进程执行需要listen()
调用的操作(如启动 HTTP 服务器)时,主进程将触发此事件。事件发出两个参数:worker
对象和包含连接的address
、port
和addressType
值的 address 对象。disconnect
:每当子进程断开连接时,就会调用它,这可以通过进程退出事件发生,也可以在调用child.kill()
后发生。这将在exit
事件之前触发,它们不一样。它接收一个worker
对象。exit
:每当一个孩子死亡,这个事件就会被触发。事件接收三个参数:worker
对象、退出代码编号和信号字符串,如SIGNUP
,这导致进程被终止。setup
:在cluster.setupMaster
执行后调用。
工人具有以下属性和方法:
worker.id
:分配给工人的唯一 ID,也代表工人在cluster.workers
索引中的密钥。worker.process
:指定引用工作者的ChildProcess
对象。worker.suicide
:最近拜访过kill
或disconnect
的员工的suicide
属性设置为true
。worker.send(message, [sendHandle])
:参见前面提到的child_process.fork()
。worker.kill([signal])
:这会杀死一名工人。船长可以检查该工人的自杀财产,以确定死亡是故意的还是意外的。发送的默认信号值为SIGTERM
。worker.disconnect()
:指示工作人员断开连接。重要的是,与工作者的现有连接不会立即终止(与kill
一样),而是允许在工作者完全断开之前正常退出。这是因为现有连接可能存在很长时间。定期检查工作人员是否实际断开了连接是一种很好的模式,可能是使用超时。
工作进程还发出事件,如以下列表中提到的事件:
message
:参见child_process.fork
online
:与cluster.online
相同,只是检查只针对指定的工人listening
:与cluster.listening
相同,只是检查只针对指定的工人disconnect
:与cluster.disconnect
相同,只是检查只针对指定的工人exit
:参见child_process
的exit
事件setup
:在cluster.setupMaster
执行后调用
现在,利用我们现在对cluster
模块的了解,让我们实现一个实时工具,用于分析多个用户同时与应用交互时发出的数据流。
PM2 旨在成为企业级流程管理器。正如其他地方所讨论的,Node 在 Unix 进程中运行,其子进程和集群模块用于生成进一步的进程,通常是在跨多个核心扩展应用时。PM2 可用于通过命令行和编程方式对 Node 进程的部署和监视进行仪表化。PM2 为开发人员节省了配置集群样板文件的复杂性,自动处理重新启动,并提供了现成的高级日志记录和监视工具。
全球安装 PM2:npm install pm2 -g
使用 PM2 最直接的方法是作为一个简单的流程运行程序。以下程序将每秒递增并记录一个值:
// script.js
let count = 1;
function loop() {
console.log(count++);
setTimeout(loop, 1000);
}
loop();
在这里,我们从script.js
派生一个新流程,在后台永远运行它,直到我们停止它。这是一种运行后台进程的好方法:
pm2 start script.js
// [PM2] Process script.js launched
一旦脚本启动,您应该会在终端中看到如下内容:
大多数值的含义应该很清楚,例如进程正在使用的内存量、是否联机、启动时间等(模式和监视字段将很快解释)。进程将继续运行,直到停止或删除。
要在启动流程时为其设置自定义名称,请将--name
参数传递给 PM2:pm2 start script.js --name 'myProcessName'
。
可通过命令pm2 list
随时显示所有正在运行的 PM2 过程的概览。
PM2 还提供了其他简单的命令:
pm2 stop <app_name | id | all>
:按名称、id 停止流程或停止所有流程。已停止的进程保留在进程列表中,以后可以重新启动。pm2 restart <app_name | id | all>
:重启流程。进程重新启动的次数显示在所有进程列表的“重新启动”下。要在进程达到某个最大内存限制(例如 15 米)时自动重新启动进程,请使用命令pm2 start script.js --max-memory-restart 15M
。pm2 delete <app_name | id | all>
:删除一个进程。无法重新启动此进程。pm2 全部删除删除所有 pm2 流程。pm2 info <app_name | id >
:提供流程的详细信息。
您将经常使用pm2 info <processname>
。使用PM2 list
确保script.js
作为 PM2 流程运行,然后使用pm2 info script
检查该流程信息:
请注意为错误和其他日志提供的路径。请记住,我们的脚本每秒递增一个整数,并记录该计数。如果您cat /path/to/script/out/log
您的终端将显示写入输出日志的内容,输出日志应为递增数字列表。错误同样会写入日志。此外,您还可以使用pm2 logs
实时流化输出日志:
要清除所有日志,请使用pm2 flush
。
您还可以通过编程方式使用 PM2。要复制我们使用 PM2 运行scripts.js
所采取的步骤,首先创建以下脚本programmatic.js
:
const pm2 = require('pm2');
pm2.connect(err => {
pm2.start('script.js', {
name: 'programmed script runner',
scriptArgs: [
'first',
'second',
'third'
],
execMode : 'fork_mode'
}, (err, proc) => {
if(err) {
throw new Error(err);
}
});
});
此脚本将使用 pm2 模块作为进程运行script.js
。继续并使用node programmatic.js
运行它。执行pm2 list
应显示已编程脚本运行程序处于活动状态:
为了确保这一点,请尝试pm2 logs
——您应该会看到数字正在递增,就像之前一样。您可以在此处阅读完整的编程选项集:http://pm2.keymetrics.io/docs/usage/pm2-api/ 。
PM2 使过程监控变得容易。要查看进程 CPU 和内存使用的实时统计信息,只需输入命令pm2 monit
:
很好,对吧?在通过 PM2 管理 Node 应用的生产服务器上,您可以使用此界面快速查看应用的状态,包括内存使用情况和运行日志。
PM2 还可以轻松创建基于 web 的监控界面——只需运行pm2 web
即可。此命令将启动监听端口 9615 的受监控进程——运行pm2 list
将列出名为pm2-http-interface
的进程。运行 web 命令,然后在浏览器中导航到localhost:9615
。您将看到作为 JSON 对象的进程、操作系统等的详细快照:
...
"monit": {
"loadavg": [ 1.89892578125, 1.91162109375, 1.896484375 ],
"total_mem": 17179869184, "free_mem": 8377733120,
...
{
"pid": 13352,
"name": "programmed script runner",
"pm2_env": {
"instance_var": "NODE_APP_INSTANCE",
"exec_mode": "fork_mode",
...
"pm_id": 8, // our script.js process "monit": {
"memory": 19619840, "cpu": 0
...
创建一个基于 web 的 UI,每隔几秒钟轮询一次服务器,获取流程信息,然后绘制图表。由于 PM2 的这一内置功能,它变得更加简单。PM2 还有一个选项,可以在所有托管脚本上设置一个观察者,这样,对被观察脚本的任何更改都将导致进程自动重启。这在开发时非常有用。
作为演示,让我们创建一个简单的 HTTP 服务器并通过 PM2 运行它:
// server.js
const http = require('http');
http.createServer((req, resp) => {
if(req.url === "/") {
resp.writeHead(200, {
'content-type' : 'text/plain'
});
return resp.end("Hello World");
}
resp.end();
}).listen(8080);
每当点击localhost:8080
时,此服务器将回显“Hello World”。现在,让我们使用 PM2 进程文件来进行更复杂的配置。
继续,用 PM2 delete all 终止所有正在运行的 PM2 进程。然后,创建以下process.json
文件:
// process.json
{
"apps" : [{
"name" : "server",
"script" : "./server.js",
"watch" : true,
"env": {
"NODE_ENV": "development"
},
"instances" : 4,
"exec_mode" : "cluster"
}]
}
我们将使用此部署定义在 PM2 上启动应用。请注意,应用是一个数组,这意味着您可以列出具有不同配置的多个不同应用,并同时启动它们。稍后我们将解释这些字段,但现在,请使用pm2 start process.json
执行此清单。您应该看到如下内容:
部署多进程(集群)应用就是这么简单。PM2 将自动平衡实例间的负载,通过instances
属性在清单中设置为 4 个 CPU,其中exec_mode
为集群(默认模式为“fork”)。在生产中,您可能希望平衡最大数量的内核,只需将instances
设置为0
即可对其进行标记。此外,您可以看到我们已经通过env:
设置了环境变量,您可以在这里为您的服务器创建dev和prod(甚至可能是stage)配置,设置 API 密钥和密码,以及其他环境变量。
打开浏览器并访问localhost:8080
查看服务器是否正在运行。注意,我们在 JSON 清单中将watch
设置为true
。这会告诉 PM2 在存储库中的任何文件发生更改时,跨所有核心自动重新启动应用。通过将服务器上的“Hello”消息更改为其他消息来测试它。如果您随后重新加载localhost:8080
,您将看到新消息,指示服务器已重新启动。如果列出正在运行的 PM2 进程,您将看到重新启动的次数:
试几次。重启是稳定、快速和自动的。
您还可以针对观察者的特定文件:
{
"apps" : [{
...
"watch": [
"tests/*.test",
"app"
],
"ignore_watch": [
"**/*.log"
],
"watch_options": {
"followSymlinks": false
},
...
}]
}
这里,我们告诉 PM2 只监视/test
中的.test
文件和/app
目录,忽略任何.log 文件中的更改。发动机罩下 PM2 使用 Chokidar(https://github.com/paulmillr/chokidar#api )监视文件更改,因此您可以通过在watch_options
上设置 Chokidar 选项来进一步配置监视程序。请注意,您可以在这些设置中使用全局表达式(和正则表达式)。
您可以在此处阅读 PM2 工艺文件的完整选项列表:http://pm2.keymetrics.io/docs/usage/application-declaration/ 。
需要注意的是:
max_restarts
:PM2 完全停止前允许的不稳定重启次数。min_uptime
:应用在被认为不稳定并触发重启之前的最短启动时间。autorestart
:崩溃时是否重新启动。node_args
:将命令行参数传递给 Node 进程本身。例如:node_args: "--harmony"
相当于node --harmony server.js
。max_memory_restart
:当内存使用量超过此阈值时,会重新启动。restart_delay
:特别是在watch
场景中,您可能希望延迟文件更改的重新启动,等待进一步的编辑后再做出反应。
多亏了 PM2,服务器应用的实时开发变得更加容易
利用我们所学到的知识,我们将构建一个多进程系统来跟踪示例网页的所有访问者的行为。这将由两个主要部分组成:一个 WebSocket 支持的客户端库,它将在用户每次移动鼠标时广播;一个管理界面,显示用户交互以及用户连接和断开与系统的连接时。我们的目标是展示如何设计一个更复杂的系统(比如跟踪和绘制用户每次点击、滑动或其他交互的图形)。
最终的管理界面将显示多个用户的活动图,如下所示:
由于该系统将跟踪所有用户每次鼠标移动的 X 和 Y 位置,我们将使用cluster
在所有可用的机器内核中传播该连续数据流,集群中的每个工作人员分担将大量套接字数据送入单个共享端口的负担。继续浏览本章的代码包,并按照/watcher
文件夹中的README.MD
说明进行操作
一个好的起点是设计模拟客户端页面,该页面只负责捕获所有鼠标移动事件,并通过WebSocket
将它们广播到集群套接字服务器。我们正在使用本机的WebSocket
实现;您可能希望使用库来处理较旧的浏览器(如Socket.IO
):
<head>
<script>
let connection = new WebSocket('ws://127.0.0.1:8081', ['json']);
connection.onopen = () => {
let userId = 'user' + Math.floor(Math.random()*10e10);
document.onmousemove = e => {
connection.send(JSON.stringify({id: userId, x: e.x, y: e.y}));
}
};
</script>
</head>
在这里,我们需要简单地打开基本的mousemove
跟踪,它将在每次移动时向我们的套接字广播用户鼠标的位置。此外,我们还发送了一个唯一的用户 ID,因为跟踪客户身份对我们以后来说很重要。请注意,在生产环境中,您可能希望实现更智能的唯一 ID 生成器,很可能是通过服务器端身份验证模块实现的。
为了让这些信息到达其他客户端,必须设置一个集中式套接字服务器。如前所述,我们希望此套接字服务器集群化。集群子进程(以下程序的每个副本)将处理客户端发送的鼠标数据:
const SServer = require('ws').Server;
let socketServer = new SServer({port: 8081});
socketServer.on('connection', socket => {
let lastMessage = null;
function kill() => {
if (lastMessage) {
process.send({kill: lastMessage.id});
}
}
socket.on('message', message => {
lastMessage = JSON.parse(message);
process.send(lastMessage);
});
socket.on('close', kill);
socket.on('error', kill);
});
In this demonstration, we are using Einar Otto Stangvik's very fast and well-designed socket server library, ws
, which is hosted on GitHub at: https://github.com/websockets/ws
谢天谢地,我们的代码仍然非常简单。我们有一个套接字服务器监听消息(请记住,客户端正在发送一个带有鼠标X和Y以及用户 ID 的对象)。最后,当接收到数据时(message
事件),我们将接收到的 JSON 解析为一个对象,并通过process.send
将其传递回集群主机。
还要注意,我们是如何存储最后一条消息(lastMessage
)的,这是出于记账的原因而完成的,例如,当连接终止时,我们需要将此连接上看到的最后一个用户 ID 传递给管理员。
捕捉客户端数据广播的片段现在已经设置好。收到此数据后,如何将其传递到前面所示的管理界面?
我们在设计这个系统时考虑到了可伸缩性,我们希望将数据收集与广播数据的系统分离。我们的 socket 服务器集群可以接受来自数千个客户端的恒定数据流,并且应该为此进行优化。换句话说,集群应该将鼠标活动数据广播的责任委托给另一个系统,甚至是其他服务器。
在下一章中,我们将介绍更高级的扩展和消息传递工具,如消息队列和 UDP 广播。出于本文的目的,我们只需创建一个 HTTP 服务器,负责管理来自管理员的连接并向他们广播鼠标活动更新。为此,我们将使用 SSE,因为从服务器到客户端的数据流只需要是单向的。
HTTP 服务器将为管理员登录实现一个非常基本的验证系统,以允许套接字集群向所有用户广播鼠标活动更新的方式保持成功的连接。它还将作为一个基本的静态文件服务器,在请求时发送客户端和管理 HTML,尽管我们将只关注它如何处理两条路由:admin/adminname;
和/receive/adminname
。一旦理解了服务器,我们将讨论套接字集群如何连接到它。
第一条路径/admin/adminname
-主要负责验证管理员登录,并确保这不是重复登录。一旦建立了该标识,我们就可以将 HTML 页面发送回管理界面。这里不讨论用于绘制前面所示图形的特定客户端代码。我们真正需要的是与服务器的 SSE 连接,这样界面的图形工具就可以接收鼠标活动的实时更新。返回的管理员页面上的某些 JavaScript 建立了这样的连接:
let ev = new EventSource('/receive/adminname');
ev.addEventListener("open", () => {
console.log("Connection opened");
});
ev.addEventListener("message", data => {
// Do something with mouse data, like graph it.
}
在我们的服务器上,我们实现了/receive/adminname
路由:
if (method === "receive") {
// Unknown admin; reject
if (!admins[adminId]) {
return response.end();
}
response.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive"
});
response.write(":" + Array(2049).join(" ") + "\n");
response.write("retry: 2000\n");
response.on("close", () => {
admins[adminId] = {};
});
setInterval(() => {
response.write("data: PING\n\n");
}, 15000);
admins[adminId].socket = response;
return;
}
此路由的主要目的是建立 SSE 连接并存储管理员的连接,以便我们以后可以向其广播。
现在,我们将添加将鼠标活动数据传递到可视化界面的片段。下一步是使用集群模块跨核心扩展此子系统。集群主机现在只需等待来自其服务子代的套接字的鼠标数据,如前所述。
我们将使用前面关于集群的讨论中提出的相同思想,只需在所有可用的 CPU 上分叉前面的套接字服务器代码:
if (cluster.isMaster) {
let i;
for (i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
})
// Set up socket worker listeners
Object.keys(cluster.workers).forEach(id => {
cluster.workers[id].on('message', msg => {
let a;
for (a in admins) {
if (admins[a].socket) {
admins[a].socket.write(`data: ${JSON.stringify(msg)}\n\n`);
}
}
});
});
鼠标活动数据通过套接字传输到集群工作者,并通过process.send
广播到前面描述的集群主机。在每个 worker 消息上,我们都会运行所有连接的管理员,并使用 SSE 将鼠标数据发送到他们的可视化界面。管理员现在可以监视客户机的到达和退出以及他们各自的活动级别。
要测试系统,请首先使用http://localhost:2112/admin/adminname
以默认管理员身份登录。您应该会看到一个蓝绿色的背景,因为没有连接的客户端,所以暂时为空。接下来,通过打开一个或多个浏览器窗口并导航到http://localhost:2112
,您将看到一个空白屏幕,从而创建一些客户端。在屏幕上随意移动鼠标。如果您返回到管理界面,您将看到您的鼠标移动(一个或多个客户端)正在被跟踪和绘制。
这是我们真正开始测试 Node 可伸缩性目标的第一章。在考虑了支持和反对关于并发性和并行性的不同思考方式的各种论点之后,我们理解了 Node 如何成功地保持线程和并行处理的优势,同时将所有这些复杂性包装在一个易于推理且健壮的并发模型中。
在深入研究了进程如何工作,特别是子进程如何相互通信,甚至产生更多的子进程之后,我们看了一些用例。通过一个如何将本机 Unix 命令进程与自定义 Node 进程无缝结合的示例,我们找到了一种处理大型文件的高效、直观的技术。然后,集群模块被应用于如何在多个工作者之间分担处理繁忙套接字的责任的问题,这种在进程之间共享套接字句柄的能力展示了 Node 设计的一个强大方面。我们还了解了生产级流程管理器 PM2,以及它如何使管理单个流程和集群变得更容易。
在了解了如何垂直扩展 Node 应用之后,我们现在可以研究跨许多系统和服务器的水平扩展。在下一章中,我们将学习如何将 Node 与第三方服务(如 Amazon 和 Twilio)连接,在代理后设置多个 Node 服务器,等等。