Skip to content

Latest commit

 

History

History
718 lines (553 loc) · 41.8 KB

File metadata and controls

718 lines (553 loc) · 41.8 KB

七、流——理解流和非阻塞 I/O

我们几乎涉及了所有帮助我们用 JavaScript 为服务器编写高性能代码的主题。 最后两个应该讨论的主题是流和数据格式。 虽然这两个主题可以携手并进(因为大多数数据格式都是通过读/写流实现的),但我们将在本章重点讨论流。

流使我们能够编写能够处理数据而不占用大量工作内存和不阻塞事件队列的系统。 对于那些按顺序阅读本书的读者来说,这听起来可能与生成器的概念很熟悉,而且这是正确的。 我们将重点关注 Node.js 提供的四种不同类型的流,并且我们可以轻松地扩展它们。 从这里,我们将了解如何使用内置生成器概念组合流和生成器来处理数据。

本章涵盖以下主题:

  • 流基础知识
  • 可读的流
  • 可写的流
  • 双相流
  • 变换流
  • Aside -生成器和流

技术要求

以下是本章的先决条件:

开始使用流

流是在无限数据集上工作的行为。 这并不意味着它是,但它意味着我们有可能拥有无限的数据源。 如果我们在处理数据的传统环境中思考,我们通常会经历三个主要步骤:

  1. 打开/获得对数据源的访问。
  2. 数据源完全加载后再进行处理。
  3. 将计算数据输出到另一个位置。

我们可以把这看作是输入和输出(I/O)的基础。 我们大多数的 I/O 概念都涉及批处理或处理所有或几乎所有的数据。 这意味着我们提前知道这些数据的极限。 我们可以确保有足够的内存、存储空间、计算能力等等来处理这个进程。 一旦我们完成了这个过程,我们就终止程序或者排队等待下一批数据。

下面是一个简单的例子,我们计算文件的行数:

import { readFileSync } from 'fs'
const count = readFileSync('./input.txt', {encoding : 'utf8'})
 .split(/\n|\r\n/g).length;
console.log('number of lines in our file is: ', count);

我们从fs模块中引入readFileSync方法,然后读取input.txt文件。 从这里开始,我们在\n\r\n上进行分割,这将为我们提供一个包含文件所有行的数组。 从那里,我们得到长度,并把它放在标准输出通道。 这看起来相当简单,而且似乎工作得很好。 对于小到中等长度的文件,这很好,但当文件变得异常大时,会发生什么呢? 让我们来看看。 转到https://loremipsum.io,输入 100 段。 复制这个,并将其粘贴到input.txt文件几次。 现在,当我们运行这个程序时,我们可以在任务管理器中看到内存使用量的激增。

我们将一个大约 3 MB 的文件加载到内存中,计算换行符的数量,然后将其打印出来。 这仍然是相当快的,但我们现在开始利用一个好的内存块。 让我们对这个文件做一些更复杂的事情。 我们将计算单词lorem在课文中出现的次数。 我们可以用下面的代码来实现:

import { readFileSync } from 'fs'
const file = readFileSync('./input.txt', {encoding : 'utf8'});
const re = /\slorem\s/gi;
const matches = file.match(re);

console.log('the number of matches is: ', matches.length);

同样,这个过程应该很快,但是在处理过程中应该有一些延迟。 虽然在这里使用正则表达式可能会给我们一些误报,但它确实表明我们对该文件进行批处理。 在许多情况下,当我们在高速环境中工作时,我们处理的文件可能接近或超过 1gb。 当我们进入这些类型的文件时,我们不希望将它们全部加载到内存中。 这就是流媒体发挥作用的地方。

Many systems that are considered big data are working with terabytes of data. While there are some in-memory applications that will store large amounts of data in memory, a good chunk of this type of data processing uses a mix of both streaming with files and using in-memory data sources to work with the data.

让我们举第一个例子。 我们正在从一个文件中读取数据,并试图计算文件中的行数。 我们可以寻找表示换行符的字符,而不是把行数作为一个整体来考虑。 我们在正则表达式中寻找的字符是换行符(\n)或回车加换行符(\r\n)。 考虑到这一点,我们应该能够构建一个流应用,它可以读取文件并计算行数,而不需要将文件完全加载到内存中。

This example presents the API for utilizing a stream. We will go over what each Stream API gives us and how we can utilize it for our purposes. For now, take the code examples and run them to see how these types of applications work.

这可以在下面的代码片段中看到:

import { createReadStream } from 'fs';

const newLine = 0x0A;
const readStream = createReadStream('./input.txt');
let counter = 1;
readStream.on('data', (chunk) => {
    for(const byte of chunk) {
        if( newLine === byte ) counter += 1;
    }
}).on('end', () => {
    console.log('number of line in our file is: ', counter);
});

我们从fs模块中获取Readable流并创建一个。 我们还为十六进制格式的换行符创建一个常量。 然后,我们侦听数据事件,以便在数据传入时进行处理。 然后,我们处理每个字节,看看它是否与换行符相同。 如果是,就换行,否则就继续搜索。 我们不需要显式地查找回车,因为我们知道它后面应该有一个换行符。

虽然这比将整个文件加载到内存中要慢,但它确实在我们处理数据时节省了相当多的内存。 这个方法的另一个优点是,这些都是事件。 在完整的处理示例中,我们将占用整个事件循环,直到完成处理。 在流中,我们有数据传入时的事件。 这意味着我们可以在同一个线程上同时运行多个流,而不必太担心阻塞(只要我们不在数据块的处理上花费太多时间)。

通过前面的例子,我们可以看到如何以流形式编写反例。 为了让大家明白这一点,我们就这样做吧。 它看起来应该如下所示:

const stream = createReadStream('./input.txt');
const buf = Buffer.from('lorem');
let found = 0;
let count = 0;
stream.on('data', (chunk) => {
    for(const byte of chunk) {
        if( byte === buf[found] ) {
            found += 1;
        } else {
            found = 0;
        }
        if( found === buf.byteLength ) {
            count += 1;
            found = 0;
        }
    }
}).on('end', () => {
    console.log('the number of matches is: ', count)
});

首先,像之前一样创建一个 readstream。 接下来,我们创建我们正在寻找的关键字的Buffer形式(处理原始字节比尝试将流转换为文本要快,即使 API 允许我们这样做)。 接下来,我们维护一个found计数和一个actual计数。 found计数将让我们知道我们是否找到了这个词; 另一个计数跟踪我们找到了多少lorem实例。 接下来,当一个数据块进入数据事件时,我们处理每个字节。 如果我们发现下一个字节不是我们正在寻找的字符,我们自动返回found计数到0(我们没有找到这个特定的文本字符串)。 在这个检查之后,我们将看到是否找到了完整的字节长度。 如果我们这样做,我们可以增加计数,并将found移回0。 我们将found计数器放在数据事件之外,因为我们以块的形式接收数据。 由于它是分块的,lorem的一部分可能出现在一个分块的末尾,lorem的另一部分可能出现在下一个分块的开头。 流结束后,输出计数。

现在,如果我们运行两个版本,我们会发现第一个版本实际上捕获了更多的lorem。 我们为正则表达式添加了大小写不敏感标志。 如果我们通过移除末尾的i来关闭这个功能,并且我们移除字符序列本身(字符序列周围的\s)的需要,我们将会看到我们得到相同的结果。 这个示例展示了写流如何比批处理版本更复杂一些,但它通常会导致更低的内存使用,有时代码更快。

虽然利用内置的流,如流内的zlibfs模块将使我们走得相当远,我们将看到我们如何可以成为我们自己的定制流的生产者。 我们将采用每一个并编写一个扩展流类型来处理我们在前一章中所做的数据帧。

For those that have forgotten or skipped to this chapter, we were framing all of our messages over a socket with the !!!BEGIN!!! and !!!END!!! tags to let us know when the full data had been streamed to us.

构建一个自定义的可读流

一个Readable流做它所声明的,它从流源读取。 它根据某些标准输出数据。 我们的示例是在 Node.js 文档中显示的简单示例的基础上进行的。

我们将以计算文本文件中lorem的数量为例,但我们将输出我们在文件中找到的lorem的位置:

  1. 从各自的模块中导入Readable类和createReadStream方法:
import { Readable } from 'stream'
import { createReadStream } from 'fs'
  1. 创建一个扩展Readable类的类,并设置一些私有变量来跟踪内部状态:
class LoremFinder extends Readable {
    #lorem = Buffer.from('lorem');
    #found = 0;
    #totalCount = 0;
    #startByteLoc = -1;
    #file = null;
}
  1. 添加一个构造函数,将我们的#file变量初始化为一个Readable流:
// inside our LoremFinder class
constructor(opts) {
    super(opts); 
    if(!opts.stream ) { 
        throw new Error("This stream needs a stream to be 
         provided!");
    }
    this.#file = opts.stream;
    this.#file.on('data', this.#data.bind(this)); // will add #data 
     method next
    this.#file.on('end', () => this.push(null)); 
}
  1. 基于构造函数,我们将使用一个#data私有变量,它将是一个函数。 我们将使用它从我们的#file流中读取,并检查lorem的位置:
// inside of the LoremFinder class
#data = function(chunk) {
    for(let i = 0; i < chunk.byteLength; i++) {
        const byte = chunk[i];
        if( byte === this.#lorem[this.#found] ) {
            if(!this.#found ) {
                this.#startByteLoc = this.#totalCount + i; 
            }
            this.#found += 1;
        } else {
            this.#found = 0;
        }
        if( this.#found === this.#lorem.byteLength ) {
            const buf = Buffer.alloc(4);
            buf.writeUInt32BE(this.#startByteLoc);
            this.push(buf);
            this.#found = 0;
        }
    }
    this.#totalCount += chunk.byteLength;
}

我们遍历每个字节并检查当前是否有我们在lorem单词中寻找的字节。 如果我们这样做,它是单词的l,那么我们设置位置#startByteLoc变量。 如果我们找到了整个单词,我们输出#startByteLoc,否则,我们重置查找变量并继续循环。 一旦我们完成了循环,我们将读取的字节数加到#totalCount中,并等待#data函数再次被调用。 为了结束我们的流并让其他人知道我们已经完全消耗了资源,我们输出一个null值。

  1. 我们添加的最后一个部分是_read方法。

这将通过Readable.read方法或连接一个数据事件来调用。 这就是我们如何确保像FileStream这样的原语流被消耗的方法:

// inside of the LoremFinder class
_read(size) {
    this.#file.resume();
}
  1. 现在我们可以添加一些测试代码来确保这个流正常工作:
const locs = new Set();
const loremFinder = new LoremFinder({
    stream : createReadStream('./input.txt')
});
loremFinder.on('data', (chunk) => {
    const num = chunk.readUInt32BE();
    locs.add(num);
});
loremFinder.on('end', () => {
    console.log('here are all of the locations:');
    for(const val of locs) {
        console.log('location: ', val);
    }
    console.log('number of lorems found is', locs.size);
});

通过所有这些概念,我们可以看到我们如何能够消费原始流,并能够用超集流包装它们。 现在我们有了这个流,我们可以使用管道接口并将它管道到一个Writable流中。 让我们把索引写到一个文件中。 为了做到这一点,我们可以做一些简单的事情,如loremFinder.pipe(writeable)

如果我们打开文件,我们会看到它只是一串随机数据。 这样做的原因是我们将所有的索引编码到 32 位缓冲区中。 如果我们想看到它们,我们可以稍微重写一下我们的流实现。 这个修改看起来像这样:this.push(this.#startByteLoc.toString() + "\r\n");

通过这个修改,我们现在可以查看output.txt文件并查看所有的索引。 我们应该开始意识到,编写流和通过管道将一个流传递到另一个流是多么的强大,如果我们只是通过管道将它们通过不同的阶段,代码将变得多么易读。

理解可读流接口

Readable流有一些我们可用的属性。 Node.js 文档中对所有这些都有解释,但我们主要感兴趣的是highWaterMarkobjectMode

highWaterMark允许我们在流声明不能再接收任何数据之前声明内部缓冲区应该容纳多少数据。 我们的实现的一个问题是,我们不处理暂停。 如果到达这个highWaterMark,流可以暂停。 虽然我们大多数时候并不担心它,但它可能会导致问题,并且通常是流实现者遇到问题的地方。 通过设置更高的highWaterMark,我们可以预防这些问题。 另一种处理方法是检查运行this.push的结果。 如果它返回true,那么我们就能够写入更多的数据到流,否则,我们应该暂停流,然后当我们从另一个流得到信号时恢复流。 流的默认highWaterMark大约是 16kb。

objectMode允许我们构建非Buffer基础的流。 当我们想要遍历对象列表时,这非常有用。 不使用for循环或map函数,我们可以设置一个管道系统,它可以通过流移动对象并在其上执行某种类型的操作。 我们也不局限于普通的旧对象,而是几乎所有数据类型,除了Buffer。 关于objectMode需要注意的一件事是它改变了highWaterMark的重要性。 它不是计算要在内部缓冲区中存储多少数据,而是计算将要存储的对象的数量,直到它暂停流为止。 默认为16,但如果需要,我们可以随时更改它。

在解释了这两个属性之后,我们应该讨论各种可用的内部方法。 对于每个流类型,有一个我们需要实现的方法,也有一个我们可以实现的方法。

对于Readable流,我们只需要实现_read方法。 这个方法给我们一个size参数,它是要从底层数据源读取的字节数。 我们并不总是需要注意这个数字,但是如果我们想用它来实现我们的流,它是可用的。

除了_read方法外,我们还需要使用push方法。 这就是将数据推入内部缓冲区并帮助发射数据事件的方法,就像我们前面看到的那样。 如前所述,push方法返回一个布尔值。 如果这个值是true,我们可以继续使用push,否则,我们应该停止推送数据,直到再次调用_read实现。

As stated previously, when first implementing a Readable stream, the return value can be ignored. However, if we notice that data is not flowing or data is getting lost, the usual culprit is that the push method returned false and we continued to try pushing data on our stream. Once this happens, we should implement pausing by stopping the use of the push method until _read gets called again.

可读接口的另外两个部分是_destroy方法和如何使我们的流出错,如果有什么东西通过,我们不能处理。 如果有任何低层次的资源需要我们放弃,就应该实施_destroy方法。

这可以是用fs.open命令打开的文件句柄,也可以是用net模块创建的套接字。 如果发生错误,我们也应该使用它来触发错误事件。

为了处理流中可能出现的错误,我们应该通过this.emit系统发出错误。 如果我们抛出一个错误,根据文档,它可能会导致意外的结果。 通过发出错误,我们让流的用户处理错误,并按照他们认为合适的方式处理它。

实现可读流

根据我们在这里学到的知识,让我们实现我们之前讨论过的框架系统。 从我们前面的示例中,我们应该很明显地可以处理这个问题。 我们将持有底层资源,在本例中是套接字。 从那里,我们将找到!!!BEGIN!!!缓冲,让它过去。 然后开始存储所保存的数据。 一旦我们到达!!!END!!!缓冲区,我们将推出数据块。

在这个例子中,我们保留了相当多的数据,但它展示了我们可能如何处理帧。 双工流将展示我们如何处理一个简单的协议。 示例如下:

  1. 导入Readable流并创建一个名为ReadMessagePassStream的类:
import { Readable } from 'stream';

class ReadMessagePassStream extends Readable {
}
  1. 添加一些私有变量来保存流的内部状态:
// inside of the ReadMessagePassStream class
#socket = null;
#bufBegin = Buffer.from("!!!START!!!");
#bufEnd = Buffer.from("!!!END!!!");
#internalBuffer = [];
#size = 0;
  1. 创建一个类似于前面的#data方法。 现在我们将寻找我们之前设置的#bufBegin#bufEnd的开始帧和结束帧缓冲区:
#data = function(chunk) {
    let i = -1 
    if((i = chunk.indexOf(this.#bufBegin)) !== -1) {
        const tempBuf = chunk.slice(i + this.#bufBegin.byteLength);
        this.#size += tempBuf.byteLength;            
        this.#internalBuffer.push(tempBuf);
    }
    else if((i = chunk.indexOf(this.#bufEnd)) !== -1) {
        const tempBuf = chunk.slice(0, i);
        this.#size += tempBuf.byteLength;
        this.#internalBuffer.push(tempBuf);
        const final = Buffer.concat(this.#internalBuffer);            
        this.#internalBuffer = [];
        if(!this.push(final)) { 
            this.#socket.pause();
        }
    } else {
        this.#size += chunk.byteLength;
        this.#internalBuffer.push(chunk);
    }
}
  1. 创建类的构造函数来初始化我们的私有变量:
constructor(options) {
    if( options.objectMode ) {
        options.objectMode = false //we don't want it on
    }
    super(options);
    if(!options.socket ) {
        throw "Need a socket to attach to!"
    }
    this.#socket = options.socket;
    this.#socket.on('data', this.#data.bind(this));
    this.#socket.on('end', () => this.push(null));
}

一个新的信息是objectMode属性,它可以被传递到我们的流中。 这允许我们的流读取对象而不是原始缓冲区。 在我们的情况下,我们不希望这种情况发生; 我们想利用原始数据。

  1. 添加_read方法,以确保我们的流将启动:
// inside the ReadMessagePassStream
_read(size) {
    this.#socket.resume();
}

有了这段代码,我们现在有了一种处理套接字的方法,而不必在主代码中侦听数据事件; 它现在被包裹在这个Readable流中。 在此之上,我们现在有能力将这个流输送到另一个流。 下面的测试代码显示了这一点:

import { createWriteStream } from 'fs';

const socket = createConnection(3333);
const write = createWriteStream('./output.txt');
const messageStream = new ReadMessagePassStream({ socket });
messageStream.pipe(write);

我们有一个服务器托管在本地主机上的端口3333。 我们创建一个write流并将任何数据从ReadMessagePassStream管道输送到该文件。 如果我们在测试工具中将它连接到服务器上,我们会注意到一个输出文件被创建,并且只保存我们通过它发送的数据,而不是帧代码。

The framing technique that we are utilizing is not always going to work. Just as it has been shown in the lorem example that our data can get chunked at any point, we could have our !!!START!!! and !!!END!!! end up on one of the chunk boundaries. If this happened, our streams would fail. There is additional code that we would need to handle these cases, but these examples should provide all the necessary ideas to implement the streaming code.

接下来,我们将看看Writable流接口。

构建可写流

Writable流是我们写入数据的流,它可以通过管道进入ReadableDuplexTransform流。 我们可以使用这些流以分块的方式写入数据,这样消费流就可以分块处理数据,而不是一次处理所有数据。 可写流的 API 非常类似于Readable流,除了我们可以使用的方法。

理解可写流接口

可写流给我们提供了几乎与Readable流相同的选项,所以我们不会深入研究它。 相反,我们将看一下可供我们使用的四种方法:一种是我们必须实现的方法,另一种是我们可以实现的方法:

  • _write方法允许我们执行任何类型的转换或我们需要的数据操作,并为我们提供使用回调的能力。 这个回调是写流能够接收更多数据的信号。

While not inherently true, it is what pops data off the internal buffer if there is any. However, for our purposes, it is best to think of the callback as a way to process more data.

我们可以利用它来包装一个更基本的流,并在主数据块之前或之后添加我们自己的数据。 我们将看到这与我们的Readable流的实际对应物。

  • _final方法允许我们在可写流关闭之前执行任何需要的操作。 这可能是对资源的清理或发送我们可能一直保存的任何数据。 我们通常不会实现这个方法,除非我们坚持文件描述符之类的东西。
  • _destroy方法与Readable流相同,应该类似于_final方法进行处理,除非我们可能在该方法上得到错误。
  • _writev方法为我们提供了同时处理多个块的能力。 我们可以实现这个,如果我们有一些块的排序系统或者我们不关心块进来的顺序。 虽然现在还不明显,但我们将在下次实现双工流时实现此方法。 用例可能会受到一些限制,但它仍然是有益的。

实现可写流

下面的Writable流实现展示了我们的帧方法,以及我们如何使用它将!!!START!!!!!!END!!!帧放在我们的数据上。 虽然很简单,但它确实展示了围绕原始流构建更复杂流的能力:

  1. 从流模块中导入Writable类,并为WriteMessagePassStream创建外壳。 将此设置为该文件的默认导出:
import { Writable } from 'stream';

export default class WriteMessagePassStream extends Writable {
}
  1. 添加私有状态变量和构造函数。 确保不允许objectMode通过,因为我们想要处理原始数据:
// inside the WriteMessagePassStream
#socket = null;
#writing = false;
constructor(options) {
  if( options.objectMode ) { 
        options.objectMode = false;
    }
    if(!options.socket ) {
        throw new Error("A socket is required to construct this 
         stream!");
    }
    super(options);
    this.#socket = options.socket;
}
  1. _write方法添加到我们的类中。 其解释如下:
_write(chunk, encoding, callback) { 
    if(!this.#writing ) {
        this.#writing = true;
        this.#socket.write("!!!START!!!");
    }
    let i = -1;
    let prevI = 0;
    let numCount = 0;
    while((i = chunk.indexOf([0x00], i)) !== -1) {
        const buf = chunk.slice(prevI, i);
        this.#socket.write(buf);
        this.#socket.write("!!!END!!!");
        if( i !== chunk.byteLength - 1 ) {
            this.#socket.write("!!!START!!!");
        } else {
            return callback();
        }
        numCount += 1;
    }
    if(!numCount ) {
        this.#socket.write(chunk);
    }
    return callback();
}

通过这段代码,我们可以看到一些与处理可读端类似的地方。 一些值得注意的例外包括以下项目:

  • 我们实现了_write方法。 我们再次忽略了这个函数的编码参数,但是我们应该检查它,以防我们得到一个我们不期望的编码。 数据块是正在写入的数据,而回调是当我们完成这个数据块的写入时调用的。
  • 由于我们正在包装套接字,而且我们不想在发送完数据后杀死它,所以我们需要向流发送某种类型的停止信号。 在我们的例子中,我们使用简单的0x00字节。 在一个更健壮的实现中,我们会使用其他东西,但现在应该可以工作。
  • 无论如何,我们要么使用帧,要么直接写入底层套接字。
  • 一旦我们完成了处理,就调用回调函数。 在我们的例子中,如果我们设置了writing标志,这意味着我们仍然在帧中,我们想要提前返回,否则,我们想要将流设置为写入模式,然后写出!!!START!!!和 chunk。 同样,如果我们从未使用回调函数,流将被无限暂停。 回调函数告诉内部机制从内部缓冲区提取更多数据供我们使用。

有了这段代码,我们现在可以看看测试工具,以及我们如何利用它来创建一个服务器,并处理传入的Readable流来实现帧上下文:

import { createServer } from 'net'
import WrappedWritableStream from '../writable/main.js'
const server = createServer((con) => {
 console.log('client connected. sending test data');
 const wrapped = new WrappedWritableStream({ socket : con });
 for(let i = 0; i < 100000; i++) {
 wrapped.write(`data${i}\r\n`);
 }
 wrapped.write(Buffer.from([0x00]));
 wrapped.end();
 console.log('finished sending test data');
});
server.listen(3333);

我们创建一个服务器,并在端口3333上监听本地主机。 每当我们收到一个连接,我们用我们的Writable流包装它。 然后我们发送一串测试数据,一旦完成,我们写出0x00信号,告诉流这一帧已经完成,然后我们调用end方法,说我们完成了这个套接字。 如果我们在第一次测试之后添加另一次测试,我们就可以看到框架系统是如何工作的。 让我们继续这样做。 在wrapped.write(Buffer.from([0x00]))后面添加如下代码:

for(let i = 0; i < 100000; i++) {
    wrapped.write(`more_data${i}\r\n`);
}
wrapped.write(Buffer.from([0x00]));

If we ever hit the highWaterMark of our stream, the write stream will pause until the read stream has started to consume from it.

如果我们现在使用以前的Readable流运行测试工具,我们将看到我们正在处理所有这些数据并将其写入到文件中,而没有通过任何帧。 有了这两个流实现,我们现在可以通过自定义帧选项在套接字之间传输数据了。 现在我们可以使用这个系统来实现前一章中的数据传递系统。 然而,我们将实现一个Duplex流,它将改进这个系统,并允许我们处理多个可写块,这是我们将在下一节看到的。

实现双工流

双工流就是这样,一个双向工作的流。 它将一个ReadableWritable流组合成一个单一的接口。 有了这种类型的流,我们现在可以从套接字管道到我们的自定义流,而不是像我们以前那样包装流(尽管我们仍然将它实现为包装流)。

除了一个将新来者绊倒到流类型的事实外,没有更多的关于Duplex流的讨论。 有两个单独的缓冲区:一个用于Readable,一个用于Writable。 我们需要确保将它们视为单独的实例。 这意味着我们在_read方法中使用的变量,不应该被用于_write_writev方法的实现,否则,我们可能会遇到错误。

如前所述,下面的代码实现了一个带有计数机制的Duplex流,这样我们就可以利用_writev方法。 正如在理解可写流接口小节中提到的,_writev方法允许我们一次处理多个数据块:

  1. stream模块中导入Duplex类,并为MessageTranslator类添加外壳。 出口这类:
import { Duplex } from 'stream';

export default class MessageTranslator extends Duplex {
}
  1. 添加所有的内部状态变量。 下文将对每一项进行解释:
// inside the MessageTranslator class
#socket = null;
#internalWriteBuf = new Map();
#internalReadHoldBuf = [];
#internalPacketNum = 0;
#readSize = 0;
#writeCounter = 0;
  1. 添加类的构造函数。 我们将在这个构造函数内部处理#socket的数据事件,而不是像过去那样创建另一个方法:
// inside the MessageTranslator class
constructor(opts) {
    if(!opts.socket ) {
        throw new Error("MessageTranslator stream needs a 
         socket!");
    }
    super(opts);
    this.#socket = opts.socket;
    // we are assuming a single message for each chunk
    this.#socket.on('data', (chunk) => {
        if(!this.#readSize ) {
            this.#internalPacketNum = chunk.readInt32BE();
            this.#readSize = chunk.readInt32BE(4);
            this.#internalReadHoldBuf.push(chunk.slice(8));
            this.#readSize -= chunk.byteLength - 8
        } else {
            this.#internalReadHoldBuf.push(chunk);
            this.#readSize -= chunk.byteLength;
        }
        // reached end of message
        if(!this.#readSize ) {
            this.push(Buffer.concat(this.#internalReadHoldBuf));
            this.#internalReadHoldBuf = [];
        }
    });
}

我们会自动假定每个区块只有一条消息。 这使得处理更加容易。 当我们获得数据时,我们将读入包号,它应该是数据的前四个字节。 然后我们读取消息的大小,这是下一个4字节的数据。 最后,我们将其余的数据压入内部缓冲区。 一旦我们读完了整个消息,我们将把所有内部块放在一起并把它们推出去。 最后,我们将重置内部缓冲区。

  1. _writev_write方法添加到我们的类中。 记住,_writev方法用于多个块,所以我们必须循环遍历它们,并将每个块写出来:
// inside the MessageTranslator class
_writev(chunks, cb) { 
    for(const chunk of chunks) {
        this.#processChunkHelper(chunk); //shown next
    }
    this.#writeHelper(cb); //shown next
}
_write(chunk, encoding, cb) {
    this.#processChunkHelper(chunk); //shown next
    this.#writeHelper(cb); //shown next
}
  1. 添加帮助器方法来处理块并实际将它们写出来。 我们将利用数字-1作为一个4字节的消息来表示我们已经完成了这个消息:
// inside the MessageTranslator class
#processChunkHelper = function(chunk) {
    if(chunk.readInt32BE() === -1) { 
        this.#internalWriteBuf.get(this.#writeCounter).done = true;
        this.#writeCounter += 1;
        this.#internalWriteBuf.set(this.#writeCounter, {buf : [], 
         done : false});
    } else {
        if(!this.#internalWriteBuf.has(this.#writeCounter)) {
            this.#internalWriteBuf.set(this.#writeCounter, {buf : 
             [], done : false}); }
            this.#internalWriteBuf.get(this.#writeCounter)
             .buf.push(chunk);
        }
    }
}
#writeHelper = function(cb) {
    const writeOut = [];
    for(const [key, val] of this.#internalWriteBuf) { 
        if( val.done ) {
            const cBuf = Buffer.allocUnsafe(4);
            const valBuf = Buffer.concat(val.buf);
            const sizeBuf = Buffer.allocUnsafe(4);
            cBuf.writeInt32BE(valBuf.readInt32BE());
            sizeBuf.writeInt32BE(valBuf.byteLength - 4);
            writeOut.push(Buffer.concat([cBuf, sizeBuf, 
             valBuf.slice(4)]));
            val.buf = [];
        }
    }
    if( writeOut.length ) {
        this.#socket.write(Buffer.concat(writeOut));
    }
    cb();
}

我们的#processChunkHelper方法检查我们是否击中了神奇的-1``4字节消息,以表示我们已经完成了消息的编写。 如果不这样做,则继续向内部缓冲区(数组)添加内容。 一旦我们到达终点,我们将把所有的数据放在一起,然后移动到下一个数据包。

我们的#writeHelper方法将遍历所有这些包,并检查是否有任何包已经完成。 如果是,它将获得包号、缓冲区的大小、数据本身,并将它们连接在一起。 一旦它这样做了,它将重置内部缓冲区,以确保我们没有内存泄漏。 我们将把所有这些数据写入套接字,然后调用回调函数以表示写操作已经完成。

  1. 完成Duplex流,实现我们之前的_read方法。 _final方法应该只调用回调函数,因为没有处理剩余:
// inside the MessageTranslator class
_read() {
    this.#socket.resume();
}
_final(cb) {
    cb(); // nothing to do since it all should be consumed at this 
          // point
}

_writev应该在顺序无关紧要的情况下使用,我们只是在处理数据,并可能将其转换成其他东西。 这可能是一个哈希算法或类似的东西。 在几乎所有的情况下,应该使用_write方法。

虽然这个实现有相当多的缺陷(一个是,我们不寻找可能的其他包,如果我们达到-1数字),它确实展示了我们如何构建一个Duplex流,以及另一种处理消息的方式。 我们不建议提出自己的跨套接字移动数据的方案(我们将在下一章看到),但如果出现了新的规范,我们总是可以利用Duplex套接字为其编写。

如果我们用测试工具测试这个实现,我们应该得到一个名为output.txt的文件,该文件具有写了 100,000 次的双工加上数字消息,加上一个末尾的行结束字符。 同样,一个Duplex流只是一个单独的ReadableWritable流放在一起,应该在实现数据传输协议时使用。

我们要看的最后一个流是Transform流。

实现转换流

在四个流中,这可能是组中最有用的,也可能是最常用的流。 流连接流的可读和可写部分,并允许我们操作遇到它的数据。 这听起来类似于Duplex。 嗯,一个Transform流是一种特殊的Duplex流!

Transform流的内置实现包括zlib模块中实现的任何流。 其基本思想是,我们不只是试图将信息从一端传递到另一端; 我们试图操纵这些数据,把它变成其他东西。 这就是zlib信息流给我们的。 他们压缩和解压数据。 流将数据转换成另一种形式。 这也意味着我们可以将转换流设置为单向转换; 从转换流输出的任何内容都不能撤消。 我们将在这里创建其中一个Transform流,特别是创建字符串的散列。

首先,让我们浏览一下Transform流的接口。

理解转换流接口

我们可以访问两个我们总是想要实现的方法。 一种是让我们访问底层数据块,并允许我们在其上执行转换。 我们用_transform方法实现这个。 它有三个参数:我们正在处理的数据块、编码和让底层系统知道我们准备好处理更多信息的回调。

一种特殊的回调函数,不同于_write``Writable流回调是我们可以将数据传递给它发出Transform的数据可读一边流,或者我们可以通过什么信号,我们需要处理更多的数据。 这允许我们只在需要时发送数据事件,而不是总是需要发送它们。

另一种方法是_flush方法。 这使我们能够完成对我们可能仍然持有的任何数据的任何处理。 或者,它允许我们在所有数据被发送到流之后输出。 这就是我们要用字符串哈希函数实现的。

实现转换流

我们的Transform流将接收字符串数据,并继续运行哈希算法。 一旦完成,它将输出计算出来的最终散列。 哈希函数是指我们以某种形式输入和输出一段唯一的数据。 这个唯一的数据(在我们的例子中是一个数字)不应该容易发生碰撞。 碰撞是指两个不同的值可以得到完全相同的散列值。 在我们的例子中,我们正在用 JavaScript 将字符串转换为 32 位整数,因此冲突的几率很小,但并非不可能。

示例如下:

// implemented in stream form from 
// https://stackoverflow.com/questions/7616461/generate-a-hash-from-string-in-javascript
export default class StreamHashCreator extends Transform {
    #currHash = 0; 
    constructor(options={}) {
        if( options.objectMode ) {
            throw new Error("This stream does not support object mode!");
        }
        options.decodeStrings = true;
        super(options);
    }
    _transform(chunk, encoding, callback) {
        if( Buffer.isBuffer(chunk) ) { 
            const str = chunk.toString('utf8');
            for(let i = 0; i < str.length; i++) {
                const char = str.charCodeAt(i);
                this.#currHash = ((this.#currHash << 5) - this.#currHash ) 
                 + char;
                this.#currHash |= 0;
            }
        }
        callback(); 
    }
    _flush(callback) {
        const buf = Buffer.alloc(4);
        buf.writeInt32BE(this.#currHash);
        this.push(buf); 
        callback(null);
    }
}

前一个流的每个函数都在下面解释:

  1. 在流被销毁之前,我们唯一需要持久保存的是当前哈希码。 这将允许哈希函数跟踪我们已经传递给它的内容,并在每次写入后清除数据。

  2. 我们在这里检查接收到的数据块是否为Buffer。 由于我们确保打开了decodeStrings选项,这意味着我们应该总是获得缓冲区,但它仍然有助于检查。

  3. 虽然哈希函数的内容可以在 URL 中看到,但我们唯一需要担心的主要事情是调用我们的回调,就像我们在实现我们的Writable流时必须做的那样。

  4. 一旦我们准备好生成数据,我们利用push方法,就像我们对Readable流所做的那样。 请记住,Transform流只是特殊的Duplex流,它允许我们操作正在输入的数据,并将其更改为输出数据。 我们也可以改变最后两行代码,只做callback(null, buf); 这只是我们之前看到的简写。

现在,如果我们在之前的代码上运行一些测试用例,我们会看到我们确实为我们输入的每个唯一字符串得到一个唯一的哈希码,但是当我们输入完全相同的东西时,我们得到相同的哈希码。 这意味着我们的哈希函数是好的,我们可以将它连接到流应用。

对流使用生成器

到目前为止,我们看到的所有内容都展示了如何利用 Node.js 中的所有内置系统来创建流应用。 然而,对于那些在书中顺序跟随的人,我们已经讨论了生成器。 那些热衷于思考它们的人会注意到流和生成器之间的强烈相关性。 事实就是这样! 我们可以利用生成器挂钩到流 API。

有了这个概念,我们可以构建既能在浏览器中工作,又能在 Node.js 中工作的生成器,而不会有太多的开销。 我们甚至已经在第 6 章、消息传递-学习不同类型中看到,我们如何获取底层的 Fetch API 流。 现在,我们可以编写一个可以同时使用这两个子系统的生成器。

现在,让我们看看一个async生成器的例子,以及我们如何将它们连接到 Node.js 流系统中。 这个例子将会看到我们如何将生成器作为一个Readable流的输入:

  1. 我们将设置一个Readable流来读出英语字母表中的 26 个小写字母。 我们可以通过编写以下生成器来轻松做到这一点:
function* handleData() {
    let _char = 97;
    while(_char < 123 ) { //char code of 'z'
        yield String.fromCharCode(_char++);
    }
}
  1. 当字符代码低于123时,我们继续发送数据。 然后我们可以将其封装在一个Readable流中,像这样:
const readable = Readable.from(handleData());
readable.on('data', (chunk) => {
    console.log(chunk);
});

如果我们现在运行这段代码,我们将看到字符az出现在控制台中。 Readable流知道它已经结束了,因为生成器生成了一个具有两个键的对象。 value字段给出来自yield表达式的值,done告诉我们生成器是否已经完成运行。

这让可读接口知道什么时候发送data事件(通过我们生成一个值),什么时候关闭流(通过将done键设置为true)。 我们还可以通过管道将可读系统的输出导入可写系统的输出,从而将进程链接起来。 这可以很容易地通过下面的代码看到:

(async() => {
    const readable2 = Readable.from(grabData());
    const tempFile = createWriteStream('./temp.txt');
    readable2.pipe(tempFile);
    await once(tempFile, 'finish');
    console.log('all done');
})();

Implementing streams through generators and async/await may seem like a good idea, but we should only utilize this if we are trying to put an already async/await piece of code with a stream. Always try to go for readability; utilizing the generator or async/await method will most likely lead to something unreadable.

在前面的例子中,我们结合了生成器中的可读代码,并利用管道机制将其发送到文件中。 随着async/await和生成器成为 JavaScript 语言中的构造,流成为一流概念的时间不会太长了。

总结

流是编写高性能 Node.js 代码的支柱之一。 它允许我们不阻塞主线程,同时仍然能够处理数据。 流 API 允许我们根据自己的目的编写不同类型的流。 虽然这些流中的大多数将以转换流的形式出现,但我们很高兴看到如何实现其他三个流。

下一章的最后一个主题是数据格式。 处理除了 JSON 之外的不同数据格式将允许我们与许多大数据提供商进行交互,并能够处理他们喜欢使用的数据格式。 我们将看到他们如何利用流来实现所有的格式规范。