diff --git a/.travis.yml b/.travis.yml index b00d6e695..579ad5ac9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,10 +2,13 @@ language: node_js dist: bionic before_script: | + yarn build node packages/pg/script/create-test-tables.js postgresql:/// env: - CC=clang CXX=clang++ npm_config_clang=1 PGUSER=postgres PGDATABASE=postgres + # test w/ new faster parsing code + - CC=clang CXX=clang++ npm_config_clang=1 PGUSER=postgres PGDATABASE=postgres PG_FAST_CONNECTION=true node_js: - lts/dubnium @@ -30,6 +33,7 @@ matrix: -e '/^host/ s/trust$/md5/' \ /etc/postgresql/10/main/pg_hba.conf sudo -u postgres psql -c "ALTER ROLE postgres PASSWORD 'test-password'; SELECT pg_reload_conf()" + yarn build node packages/pg/script/create-test-tables.js postgresql:/// - node_js: lts/carbon diff --git a/package.json b/package.json index ce7f9f3b8..03e3827e1 100644 --- a/package.json +++ b/package.json @@ -11,6 +11,8 @@ ], "scripts": { "test": "yarn lerna exec yarn test", + "build": "yarn lerna exec --scope pg-packet-stream yarn build", + "pretest": "yarn build", "lint": "yarn lerna exec --parallel yarn lint" }, "devDependencies": { diff --git a/packages/pg-packet-stream/package.json b/packages/pg-packet-stream/package.json index 9cc325274..bf9c13e84 100644 --- a/packages/pg-packet-stream/package.json +++ b/packages/pg-packet-stream/package.json @@ -16,7 +16,9 @@ }, "scripts": { "test": "mocha dist/**/*.test.js", - "prepublish": "tsc", - "pretest": "tsc" + "build": "tsc", + "build:watch": "tsc --watch", + "prepublish": "yarn build", + "pretest": "yarn build" } } diff --git a/packages/pg-packet-stream/src/inbound-parser.test.ts b/packages/pg-packet-stream/src/inbound-parser.test.ts index 098f41242..e8619bf83 100644 --- a/packages/pg-packet-stream/src/inbound-parser.test.ts +++ b/packages/pg-packet-stream/src/inbound-parser.test.ts @@ -1,8 +1,9 @@ import buffers from './testing/test-buffers' import BufferList from './testing/buffer-list' -import { PgPacketStream } from './' +import { parse } from './' import assert from 'assert' -import { Readable } from 'stream' +import { PassThrough } from 'stream' +import { BackendMessage } from './messages' var authOkBuffer = buffers.authenticationOk() var paramStatusBuffer = buffers.parameterStatus('client_encoding', 'UTF8') @@ -137,25 +138,14 @@ var expectedTwoRowMessage = { }] } -const concat = (stream: Readable): Promise => { - return new Promise((resolve) => { - const results: any[] = [] - stream.on('data', item => results.push(item)) - stream.on('end', () => resolve(results)) - }) -} - var testForMessage = function (buffer: Buffer, expectedMessage: any) { it('recieves and parses ' + expectedMessage.name, async () => { - const parser = new PgPacketStream(); - parser.write(buffer); - parser.end(); - const [lastMessage] = await concat(parser); + const messages = await parseBuffers([buffer]) + const [lastMessage] = messages; for (const key in expectedMessage) { - assert.deepEqual(lastMessage[key], expectedMessage[key]) + assert.deepEqual((lastMessage as any)[key], expectedMessage[key]) } - }) } @@ -197,6 +187,19 @@ var expectedNotificationResponseMessage = { payload: 'boom' } + + +const parseBuffers = async (buffers: Buffer[]): Promise => { + const stream = new PassThrough(); + for (const buffer of buffers) { + stream.write(buffer); + } + stream.end() + const msgs: BackendMessage[] = [] + await parse(stream, (msg) => msgs.push(msg)) + return msgs +} + describe('PgPacketStream', function () { testForMessage(authOkBuffer, expectedAuthenticationOkayMessage) testForMessage(plainPasswordBuffer, expectedPlainPasswordMessage) @@ -391,18 +394,9 @@ describe('PgPacketStream', function () { describe('split buffer, single message parsing', function () { var fullBuffer = buffers.dataRow([null, 'bang', 'zug zug', null, '!']) - const parse = async (buffers: Buffer[]): Promise => { - const parser = new PgPacketStream(); - for (const buffer of buffers) { - parser.write(buffer); - } - parser.end() - const [msg] = await concat(parser) - return msg; - } - it('parses when full buffer comes in', async function () { - const message = await parse([fullBuffer]); + const messages = await parseBuffers([fullBuffer]); + const message = messages[0] as any assert.equal(message.fields.length, 5) assert.equal(message.fields[0], null) assert.equal(message.fields[1], 'bang') @@ -416,7 +410,8 @@ describe('PgPacketStream', function () { var secondBuffer = Buffer.alloc(fullBuffer.length - firstBuffer.length) fullBuffer.copy(firstBuffer, 0, 0) fullBuffer.copy(secondBuffer, 0, firstBuffer.length) - const message = await parse([firstBuffer, secondBuffer]); + const messages = await parseBuffers([fullBuffer]); + const message = messages[0] as any assert.equal(message.fields.length, 5) assert.equal(message.fields[0], null) assert.equal(message.fields[1], 'bang') @@ -447,15 +442,6 @@ describe('PgPacketStream', function () { dataRowBuffer.copy(fullBuffer, 0, 0) readyForQueryBuffer.copy(fullBuffer, dataRowBuffer.length, 0) - const parse = (buffers: Buffer[]): Promise => { - const parser = new PgPacketStream(); - for (const buffer of buffers) { - parser.write(buffer); - } - parser.end() - return concat(parser) - } - var verifyMessages = function (messages: any[]) { assert.strictEqual(messages.length, 2) assert.deepEqual(messages[0], { @@ -473,7 +459,7 @@ describe('PgPacketStream', function () { } // sanity check it('recieves both messages when packet is not split', async function () { - const messages = await parse([fullBuffer]) + const messages = await parseBuffers([fullBuffer]) verifyMessages(messages) }) @@ -482,7 +468,7 @@ describe('PgPacketStream', function () { var secondBuffer = Buffer.alloc(fullBuffer.length - firstBuffer.length) fullBuffer.copy(firstBuffer, 0, 0) fullBuffer.copy(secondBuffer, 0, firstBuffer.length) - const messages = await parse([firstBuffer, secondBuffer]) + const messages = await parseBuffers([firstBuffer, secondBuffer]) verifyMessages(messages) } diff --git a/packages/pg-packet-stream/src/index.ts b/packages/pg-packet-stream/src/index.ts index 2bd2da69c..3ebe5e847 100644 --- a/packages/pg-packet-stream/src/index.ts +++ b/packages/pg-packet-stream/src/index.ts @@ -1,5 +1,5 @@ -import { Transform, TransformCallback, TransformOptions } from 'stream'; -import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage, MessageName, AuthenticationMD5Password } from './messages'; +import { TransformOptions } from 'stream'; +import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage, MessageName, AuthenticationMD5Password, NoticeMessage } from './messages'; import { BufferReader } from './BufferReader'; import assert from 'assert' @@ -46,23 +46,27 @@ const enum MessageCodes { CopyData = 0x64, // d } -export class PgPacketStream extends Transform { +type MessageCallback = (msg: BackendMessage) => void; + +export function parse(stream: NodeJS.ReadableStream, callback: MessageCallback): Promise { + const parser = new PgPacketParser() + stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback)) + return new Promise((resolve) => stream.on('end', () => resolve())) +} + +class PgPacketParser { private remainingBuffer: Buffer = emptyBuffer; private reader = new BufferReader(); private mode: Mode; constructor(opts?: StreamOptions) { - super({ - ...opts, - readableObjectMode: true - }) if (opts?.mode === 'binary') { throw new Error('Binary mode not supported yet') } this.mode = opts?.mode || 'text'; } - public _transform(buffer: Buffer, encoding: string, callback: TransformCallback) { + public parse(buffer: Buffer, callback: MessageCallback) { let combinedBuffer = buffer; if (this.remainingBuffer.byteLength) { combinedBuffer = Buffer.allocUnsafe(this.remainingBuffer.byteLength + buffer.byteLength); @@ -81,7 +85,7 @@ export class PgPacketStream extends Transform { if (fullMessageLength + offset <= combinedBuffer.byteLength) { const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer); - this.push(message) + callback(message) offset += fullMessageLength; } else { break; @@ -94,7 +98,6 @@ export class PgPacketStream extends Transform { this.remainingBuffer = combinedBuffer.slice(offset) } - callback(null); } private handlePacket(offset: number, code: number, length: number, bytes: Buffer): BackendMessage { @@ -146,10 +149,6 @@ export class PgPacketStream extends Transform { } } - public _flush(callback: TransformCallback) { - this._transform(Buffer.alloc(0), 'utf-8', callback) - } - private parseReadyForQueryMessage(offset: number, length: number, bytes: Buffer) { this.reader.setBuffer(offset, bytes); const status = this.reader.string(1); @@ -304,8 +303,9 @@ export class PgPacketStream extends Transform { fieldType = this.reader.string(1) } - // the msg is an Error instance - var message = new DatabaseError(fields.M, length, name) + const messageValue = fields.M + + const message = name === MessageName.notice ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name) message.severity = fields.S message.code = fields.C diff --git a/packages/pg-packet-stream/src/messages.ts b/packages/pg-packet-stream/src/messages.ts index 160eb3ffb..222a24902 100644 --- a/packages/pg-packet-stream/src/messages.ts +++ b/packages/pg-packet-stream/src/messages.ts @@ -74,7 +74,27 @@ export const copyDone: BackendMessage = { length: 4, } -export class DatabaseError extends Error { +interface NoticeOrError { + message: string | undefined; + severity: string | undefined; + code: string | undefined; + detail: string | undefined; + hint: string | undefined; + position: string | undefined; + internalPosition: string | undefined; + internalQuery: string | undefined; + where: string | undefined; + schema: string | undefined; + table: string | undefined; + column: string | undefined; + dataType: string | undefined; + constraint: string | undefined; + file: string | undefined; + line: string | undefined; + routine: string | undefined; +} + +export class DatabaseError extends Error implements NoticeOrError { public severity: string | undefined; public code: string | undefined; public detail: string | undefined; @@ -167,3 +187,24 @@ export class DataRowMessage { this.fieldCount = fields.length; } } + +export class NoticeMessage implements BackendMessage, NoticeOrError { + constructor(public readonly length: number, public readonly message: string | undefined) {} + public readonly name = MessageName.notice; + public severity: string | undefined; + public code: string | undefined; + public detail: string | undefined; + public hint: string | undefined; + public position: string | undefined; + public internalPosition: string | undefined; + public internalQuery: string | undefined; + public where: string | undefined; + public schema: string | undefined; + public table: string | undefined; + public column: string | undefined; + public dataType: string | undefined; + public constraint: string | undefined; + public file: string | undefined; + public line: string | undefined; + public routine: string | undefined; +} diff --git a/packages/pg/bench.js b/packages/pg/bench.js index 3c12fa683..b5707db73 100644 --- a/packages/pg/bench.js +++ b/packages/pg/bench.js @@ -54,13 +54,14 @@ const run = async () => { queries = await bench(client, seq, seconds * 1000); console.log("sequence queries:", queries); console.log("qps", queries / seconds); - console.log("on my laptop best so far seen 1192 qps") + console.log("on my laptop best so far seen 1209 qps") console.log('') queries = await bench(client, insert, seconds * 1000); console.log("insert queries:", queries); console.log("qps", queries / seconds); - console.log("on my laptop best so far seen 5600 qps") + console.log("on my laptop best so far seen 5799 qps") + console.log() await client.end(); await client.end(); }; diff --git a/packages/pg/lib/connection-fast.js b/packages/pg/lib/connection-fast.js index 631ea3b0e..ecbb362c9 100644 --- a/packages/pg/lib/connection-fast.js +++ b/packages/pg/lib/connection-fast.js @@ -13,13 +13,13 @@ var util = require('util') var Writer = require('buffer-writer') // eslint-disable-next-line -var PacketStream = require('pg-packet-stream') +const { parse } = require('pg-packet-stream') var TEXT_MODE = 0 // TODO(bmc) support binary mode here // var BINARY_MODE = 1 -console.log('using faster connection') +console.log('***using faster connection***') var Connection = function (config) { EventEmitter.call(this) config = config || {} @@ -84,12 +84,13 @@ Connection.prototype.connect = function (port, host) { this.stream.once('data', function (buffer) { var responseCode = buffer.toString('utf8') switch (responseCode) { - case 'N': // Server does not support SSL connections - return self.emit('error', new Error('The server does not support SSL connections')) case 'S': // Server supports SSL connections, continue with a secure connection break - default: - // Any other response byte, including 'E' (ErrorResponse) indicating a server error + case 'N': // Server does not support SSL connections + self.stream.end() + return self.emit('error', new Error('The server does not support SSL connections')) + default: // Any other response byte, including 'E' (ErrorResponse) indicating a server error + self.stream.end() return self.emit('error', new Error('There was an error establishing an SSL connection')) } var tls = require('tls') @@ -108,19 +109,15 @@ Connection.prototype.connect = function (port, host) { } Connection.prototype.attachListeners = function (stream) { - var self = this - const mode = this._mode === TEXT_MODE ? 'text' : 'binary' - const packetStream = new PacketStream.PgPacketStream({ mode }) - this.stream.pipe(packetStream) - packetStream.on('data', (msg) => { + stream.on('end', () => { + this.emit('end') + }) + parse(stream, (msg) => { var eventName = msg.name === 'error' ? 'errorMessage' : msg.name - if (self._emitMessage) { - self.emit('message', msg) + if (this._emitMessage) { + this.emit('message', msg) } - self.emit(eventName, msg) - }) - stream.on('end', function () { - self.emit('end') + this.emit(eventName, msg) }) } @@ -331,6 +328,10 @@ Connection.prototype.end = function () { // 0x58 = 'X' this.writer.clear() this._ending = true + if (!this.stream.writable) { + this.stream.end() + return + } return this.stream.write(END_BUFFER, () => { this.stream.end() })