From c813f765f5c268d35e9e38138655b69ff6c084a3 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 8 Apr 2020 17:23:16 -0500 Subject: [PATCH 1/6] Move message writing to typescript lib --- .../src/{BufferReader.ts => buffer-reader.ts} | 4 +- .../pg-packet-stream/src/buffer-writer.ts | 120 +++++++ packages/pg-packet-stream/src/index.ts | 327 +----------------- .../src/outbound-serializer.test.ts | 252 ++++++++++++++ packages/pg-packet-stream/src/parser.ts | 322 +++++++++++++++++ packages/pg-packet-stream/src/serializer.ts | 226 ++++++++++++ packages/pg/lib/connection-fast.js | 165 ++------- 7 files changed, 949 insertions(+), 467 deletions(-) rename packages/pg-packet-stream/src/{BufferReader.ts => buffer-reader.ts} (96%) create mode 100644 packages/pg-packet-stream/src/buffer-writer.ts create mode 100644 packages/pg-packet-stream/src/outbound-serializer.test.ts create mode 100644 packages/pg-packet-stream/src/parser.ts create mode 100644 packages/pg-packet-stream/src/serializer.ts diff --git a/packages/pg-packet-stream/src/BufferReader.ts b/packages/pg-packet-stream/src/buffer-reader.ts similarity index 96% rename from packages/pg-packet-stream/src/BufferReader.ts rename to packages/pg-packet-stream/src/buffer-reader.ts index 9729d919f..68dc89cae 100644 --- a/packages/pg-packet-stream/src/BufferReader.ts +++ b/packages/pg-packet-stream/src/buffer-reader.ts @@ -2,8 +2,10 @@ const emptyBuffer = Buffer.allocUnsafe(0); export class BufferReader { private buffer: Buffer = emptyBuffer; - // TODO(bmc): support non-utf8 encoding + + // TODO(bmc): support non-utf8 encoding? private encoding: string = 'utf-8'; + constructor(private offset: number = 0) { } public setBuffer(offset: number, buffer: Buffer): void { diff --git a/packages/pg-packet-stream/src/buffer-writer.ts b/packages/pg-packet-stream/src/buffer-writer.ts new file mode 100644 index 000000000..f8b6dd2a5 --- /dev/null +++ b/packages/pg-packet-stream/src/buffer-writer.ts @@ -0,0 +1,120 @@ +//binary data writer tuned for creating +//postgres message packets as effeciently as possible by reusing the +//same buffer to avoid memcpy and limit memory allocations + +export class Writer { + private buffer: Buffer; + private offset: number = 5; + private headerPosition: number = 0; + private readonly encoding = 'utf-8'; + constructor(size: number = 1024) { + this.buffer = Buffer.alloc(size + 5) + } + + private _ensure(size: number): void { + var remaining = this.buffer.length - this.offset; + if (remaining < size) { + var oldBuffer = this.buffer; + // exponential growth factor of around ~ 1.5 + // https://stackoverflow.com/questions/2269063/buffer-growth-strategy + var newSize = oldBuffer.length + (oldBuffer.length >> 1) + size; + this.buffer = Buffer.alloc(newSize); + oldBuffer.copy(this.buffer); + } + } + + public addInt32(num: number): Writer { + this._ensure(4); + this.buffer[this.offset++] = (num >>> 24 & 0xFF); + this.buffer[this.offset++] = (num >>> 16 & 0xFF); + this.buffer[this.offset++] = (num >>> 8 & 0xFF); + this.buffer[this.offset++] = (num >>> 0 & 0xFF); + return this; + } + + public addInt16(num: number): Writer { + this._ensure(2); + this.buffer[this.offset++] = (num >>> 8 & 0xFF); + this.buffer[this.offset++] = (num >>> 0 & 0xFF); + return this; + } + + public addCString(string: string): Writer { + //just write a 0 for empty or null strings + if (!string) { + this._ensure(1); + } else { + var len = Buffer.byteLength(string); + this._ensure(len + 1); //+1 for null terminator + this.buffer.write(string, this.offset, this.encoding) + this.offset += len; + } + + this.buffer[this.offset++] = 0; // null terminator + return this; + } + + // note: this assumes character is 1 byte - used for writing protocol charcodes + public addChar(c: string): Writer { + this._ensure(1); + this.buffer.write(c, this.offset); + this.offset++; + return this; + } + + public addString(string: string = ""): Writer { + var len = Buffer.byteLength(string); + this._ensure(len); + this.buffer.write(string, this.offset); + this.offset += len; + return this; + } + + public getByteLength(): number { + return this.offset - 5; + } + + public add(otherBuffer: Buffer): Writer { + this._ensure(otherBuffer.length); + otherBuffer.copy(this.buffer, this.offset); + this.offset += otherBuffer.length; + return this; + } + + public clear(): void { + this.offset = 5; + this.headerPosition = 0; + } + + //appends a header block to all the written data since the last + //subsequent header or to the beginning if there is only one data block + public addHeader(code: number, last: boolean = false) { + var origOffset = this.offset; + this.offset = this.headerPosition; + this.buffer[this.offset++] = code; + //length is everything in this packet minus the code + this.addInt32(origOffset - (this.headerPosition + 1)); + //set next header position + this.headerPosition = origOffset; + //make space for next header + this.offset = origOffset; + if (!last) { + this._ensure(5); + this.offset += 5; + } + } + + public join(code?: number): Buffer { + if (code) { + this.addHeader(code, true); + } + return this.buffer.slice(code ? 0 : 5, this.offset); + } + + public flush(code?: number): Buffer { + var result = this.join(code); + this.clear(); + return result; + } +} + diff --git a/packages/pg-packet-stream/src/index.ts b/packages/pg-packet-stream/src/index.ts index 3ebe5e847..f4ade0173 100644 --- a/packages/pg-packet-stream/src/index.ts +++ b/packages/pg-packet-stream/src/index.ts @@ -1,328 +1,11 @@ -import { TransformOptions } from 'stream'; -import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage, MessageName, AuthenticationMD5Password, NoticeMessage } from './messages'; -import { BufferReader } from './BufferReader'; -import assert from 'assert' - -// every message is prefixed with a single bye -const CODE_LENGTH = 1; -// every message has an int32 length which includes itself but does -// NOT include the code in the length -const LEN_LENGTH = 4; - -const HEADER_LENGTH = CODE_LENGTH + LEN_LENGTH; - -export type Packet = { - code: number; - packet: Buffer; -} - -const emptyBuffer = Buffer.allocUnsafe(0); - -type StreamOptions = TransformOptions & { - mode: Mode -} - -const enum MessageCodes { - DataRow = 0x44, // D - ParseComplete = 0x31, // 1 - BindComplete = 0x32, // 2 - CloseComplete = 0x33, // 3 - CommandComplete = 0x43, // C - ReadyForQuery = 0x5a, // Z - NoData = 0x6e, // n - NotificationResponse = 0x41, // A - AuthenticationResponse = 0x52, // R - ParameterStatus = 0x53, // S - BackendKeyData = 0x4b, // K - ErrorMessage = 0x45, // E - NoticeMessage = 0x4e, // N - RowDescriptionMessage = 0x54, // T - PortalSuspended = 0x73, // s - ReplicationStart = 0x57, // W - EmptyQuery = 0x49, // I - CopyIn = 0x47, // G - CopyOut = 0x48, // H - CopyDone = 0x63, // c - CopyData = 0x64, // d -} - -type MessageCallback = (msg: BackendMessage) => void; +import { BackendMessage } from './messages'; +import { serialize } from './serializer'; +import { Parser, MessageCallback } from './parser' export function parse(stream: NodeJS.ReadableStream, callback: MessageCallback): Promise { - const parser = new PgPacketParser() + const parser = new Parser() stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback)) return new Promise((resolve) => stream.on('end', () => resolve())) } -class PgPacketParser { - private remainingBuffer: Buffer = emptyBuffer; - private reader = new BufferReader(); - private mode: Mode; - - constructor(opts?: StreamOptions) { - if (opts?.mode === 'binary') { - throw new Error('Binary mode not supported yet') - } - this.mode = opts?.mode || 'text'; - } - - public parse(buffer: Buffer, callback: MessageCallback) { - let combinedBuffer = buffer; - if (this.remainingBuffer.byteLength) { - combinedBuffer = Buffer.allocUnsafe(this.remainingBuffer.byteLength + buffer.byteLength); - this.remainingBuffer.copy(combinedBuffer) - buffer.copy(combinedBuffer, this.remainingBuffer.byteLength) - } - let offset = 0; - while ((offset + HEADER_LENGTH) <= combinedBuffer.byteLength) { - // code is 1 byte long - it identifies the message type - const code = combinedBuffer[offset]; - - // length is 1 Uint32BE - it is the length of the message EXCLUDING the code - const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH); - - const fullMessageLength = CODE_LENGTH + length; - - if (fullMessageLength + offset <= combinedBuffer.byteLength) { - const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer); - callback(message) - offset += fullMessageLength; - } else { - break; - } - } - - if (offset === combinedBuffer.byteLength) { - this.remainingBuffer = emptyBuffer; - } else { - this.remainingBuffer = combinedBuffer.slice(offset) - } - - } - - private handlePacket(offset: number, code: number, length: number, bytes: Buffer): BackendMessage { - switch (code) { - case MessageCodes.BindComplete: - return bindComplete; - case MessageCodes.ParseComplete: - return parseComplete; - case MessageCodes.CloseComplete: - return closeComplete; - case MessageCodes.NoData: - return noData; - case MessageCodes.PortalSuspended: - return portalSuspended; - case MessageCodes.CopyDone: - return copyDone; - case MessageCodes.ReplicationStart: - return replicationStart; - case MessageCodes.EmptyQuery: - return emptyQuery; - case MessageCodes.DataRow: - return this.parseDataRowMessage(offset, length, bytes); - case MessageCodes.CommandComplete: - return this.parseCommandCompleteMessage(offset, length, bytes); - case MessageCodes.ReadyForQuery: - return this.parseReadyForQueryMessage(offset, length, bytes); - case MessageCodes.NotificationResponse: - return this.parseNotificationMessage(offset, length, bytes); - case MessageCodes.AuthenticationResponse: - return this.parseAuthenticationResponse(offset, length, bytes); - case MessageCodes.ParameterStatus: - return this.parseParameterStatusMessage(offset, length, bytes); - case MessageCodes.BackendKeyData: - return this.parseBackendKeyData(offset, length, bytes); - case MessageCodes.ErrorMessage: - return this.parseErrorMessage(offset, length, bytes, MessageName.error); - case MessageCodes.NoticeMessage: - return this.parseErrorMessage(offset, length, bytes, MessageName.notice); - case MessageCodes.RowDescriptionMessage: - return this.parseRowDescriptionMessage(offset, length, bytes); - case MessageCodes.CopyIn: - return this.parseCopyInMessage(offset, length, bytes); - case MessageCodes.CopyOut: - return this.parseCopyOutMessage(offset, length, bytes); - case MessageCodes.CopyData: - return this.parseCopyData(offset, length, bytes); - default: - assert.fail(`unknown message code: ${code.toString(16)}`) - } - } - - private parseReadyForQueryMessage(offset: number, length: number, bytes: Buffer) { - this.reader.setBuffer(offset, bytes); - const status = this.reader.string(1); - return new ReadyForQueryMessage(length, status) - } - - private parseCommandCompleteMessage(offset: number, length: number, bytes: Buffer) { - this.reader.setBuffer(offset, bytes); - const text = this.reader.cstring(); - return new CommandCompleteMessage(length, text); - } - - private parseCopyData(offset: number, length: number, bytes: Buffer) { - const chunk = bytes.slice(offset, offset + (length - 4)); - return new CopyDataMessage(length, chunk); - } - - private parseCopyInMessage(offset: number, length: number, bytes: Buffer) { - return this.parseCopyMessage(offset, length, bytes, MessageName.copyInResponse) - } - - private parseCopyOutMessage(offset: number, length: number, bytes: Buffer) { - return this.parseCopyMessage(offset, length, bytes, MessageName.copyOutResponse) - } - - private parseCopyMessage(offset: number, length: number, bytes: Buffer, messageName: MessageName) { - this.reader.setBuffer(offset, bytes); - const isBinary = this.reader.byte() !== 0; - const columnCount = this.reader.int16() - const message = new CopyResponse(length, messageName, isBinary, columnCount); - for (let i = 0; i < columnCount; i++) { - message.columnTypes[i] = this.reader.int16(); - } - return message; - } - - private parseNotificationMessage(offset: number, length: number, bytes: Buffer) { - this.reader.setBuffer(offset, bytes); - const processId = this.reader.int32(); - const channel = this.reader.cstring(); - const payload = this.reader.cstring(); - return new NotificationResponseMessage(length, processId, channel, payload); - } - - private parseRowDescriptionMessage(offset: number, length: number, bytes: Buffer) { - this.reader.setBuffer(offset, bytes); - const fieldCount = this.reader.int16() - const message = new RowDescriptionMessage(length, fieldCount); - for (let i = 0; i < fieldCount; i++) { - message.fields[i] = this.parseField() - } - return message; - } - - private parseField(): Field { - const name = this.reader.cstring() - const tableID = this.reader.int32() - const columnID = this.reader.int16() - const dataTypeID = this.reader.int32() - const dataTypeSize = this.reader.int16() - const dataTypeModifier = this.reader.int32() - const mode = this.reader.int16() === 0 ? 'text' : 'binary'; - return new Field(name, tableID, columnID, dataTypeID, dataTypeSize, dataTypeModifier, mode) - } - - private parseDataRowMessage(offset: number, length: number, bytes: Buffer) { - this.reader.setBuffer(offset, bytes); - const fieldCount = this.reader.int16(); - const fields: any[] = new Array(fieldCount); - for (let i = 0; i < fieldCount; i++) { - const len = this.reader.int32(); - if (len === -1) { - fields[i] = null - } else if (this.mode === 'text') { - fields[i] = this.reader.string(len) - } - } - return new DataRowMessage(length, fields); - } - - private parseParameterStatusMessage(offset: number, length: number, bytes: Buffer) { - this.reader.setBuffer(offset, bytes); - const name = this.reader.cstring(); - const value = this.reader.cstring() - return new ParameterStatusMessage(length, name, value) - } - - private parseBackendKeyData(offset: number, length: number, bytes: Buffer) { - this.reader.setBuffer(offset, bytes); - const processID = this.reader.int32() - const secretKey = this.reader.int32() - return new BackendKeyDataMessage(length, processID, secretKey) - } - - - public parseAuthenticationResponse(offset: number, length: number, bytes: Buffer) { - this.reader.setBuffer(offset, bytes); - const code = this.reader.int32() - // TODO(bmc): maybe better types here - const message: BackendMessage & any = { - name: MessageName.authenticationOk, - length, - }; - - switch (code) { - case 0: // AuthenticationOk - break; - case 3: // AuthenticationCleartextPassword - if (message.length === 8) { - message.name = MessageName.authenticationCleartextPassword - } - break - case 5: // AuthenticationMD5Password - if (message.length === 12) { - message.name = MessageName.authenticationMD5Password - const salt = this.reader.bytes(4); - return new AuthenticationMD5Password(length, salt); - } - break - case 10: // AuthenticationSASL - message.name = MessageName.authenticationSASL - message.mechanisms = [] - let mechanism: string; - do { - mechanism = this.reader.cstring() - - if (mechanism) { - message.mechanisms.push(mechanism) - } - } while (mechanism) - break; - case 11: // AuthenticationSASLContinue - message.name = MessageName.authenticationSASLContinue - message.data = this.reader.string(length - 4) - break; - case 12: // AuthenticationSASLFinal - message.name = MessageName.authenticationSASLFinal - message.data = this.reader.string(length - 4) - break; - default: - throw new Error('Unknown authenticationOk message type ' + code) - } - return message; - } - - private parseErrorMessage(offset: number, length: number, bytes: Buffer, name: MessageName) { - this.reader.setBuffer(offset, bytes); - var fields: Record = {} - var fieldType = this.reader.string(1) - while (fieldType !== '\0') { - fields[fieldType] = this.reader.cstring() - fieldType = this.reader.string(1) - } - - const messageValue = fields.M - - const message = name === MessageName.notice ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name) - - message.severity = fields.S - message.code = fields.C - message.detail = fields.D - message.hint = fields.H - message.position = fields.P - message.internalPosition = fields.p - message.internalQuery = fields.q - message.where = fields.W - message.schema = fields.s - message.table = fields.t - message.column = fields.c - message.dataType = fields.d - message.constraint = fields.n - message.file = fields.F - message.line = fields.L - message.routine = fields.R - return message; - } -} +export { serialize }; diff --git a/packages/pg-packet-stream/src/outbound-serializer.test.ts b/packages/pg-packet-stream/src/outbound-serializer.test.ts new file mode 100644 index 000000000..246c5fe9d --- /dev/null +++ b/packages/pg-packet-stream/src/outbound-serializer.test.ts @@ -0,0 +1,252 @@ +import assert from 'assert' +import { serialize } from './serializer' +import BufferList from './testing/buffer-list' + +describe('serializer', () => { + it('builds startup message', function () { + const actual = serialize.startup({ + user: 'brian', + database: 'bang' + }) + assert.deepEqual(actual, new BufferList() + .addInt16(3) + .addInt16(0) + .addCString('user') + .addCString('brian') + .addCString('database') + .addCString('bang') + .addCString('client_encoding') + .addCString("'utf-8'") + .addCString('').join(true)) + }) + + it('builds password message', function () { + const actual = serialize.password('!') + assert.deepEqual(actual, new BufferList().addCString('!').join(true, 'p')) + }) + + it('builds request ssl message', function () { + const actual = serialize.requestSsl() + const expected = new BufferList().addInt32(80877103).join(true) + assert.deepEqual(actual, expected); + }) + + it('builds SASLInitialResponseMessage message', function () { + const actual = serialize.sendSASLInitialResponseMessage('mech', 'data') + assert.deepEqual(actual, new BufferList().addCString('mech').addInt32(4).addString('data').join(true, 'p')) + }) + + + it('builds SCRAMClientFinalMessage message', function () { + const actual = serialize.sendSCRAMClientFinalMessage('data') + assert.deepEqual(actual, new BufferList().addString('data').join(true, 'p')) + }) + + + it('builds query message', function () { + var txt = 'select * from boom' + const actual = serialize.query(txt) + assert.deepEqual(actual, new BufferList().addCString(txt).join(true, 'Q')) + }) + + + describe('parse message', () => { + + it('builds parse message', function () { + const actual = serialize.parse({ text: '!' }) + var expected = new BufferList() + .addCString('') + .addCString('!') + .addInt16(0).join(true, 'P') + assert.deepEqual(actual, expected) + }) + + it('builds parse message with named query', function () { + const actual = serialize.parse({ + name: 'boom', + text: 'select * from boom', + types: [] + }) + var expected = new BufferList() + .addCString('boom') + .addCString('select * from boom') + .addInt16(0).join(true, 'P') + assert.deepEqual(actual, expected) + }) + + it('with multiple parameters', function () { + const actual = serialize.parse({ + name: 'force', + text: 'select * from bang where name = $1', + types: [1, 2, 3, 4] + }) + var expected = new BufferList() + .addCString('force') + .addCString('select * from bang where name = $1') + .addInt16(4) + .addInt32(1) + .addInt32(2) + .addInt32(3) + .addInt32(4).join(true, 'P') + assert.deepEqual(actual, expected) + }) + + }) + + + describe('bind messages', function () { + it('with no values', function () { + const actual = serialize.bind() + + var expectedBuffer = new BufferList() + .addCString('') + .addCString('') + .addInt16(0) + .addInt16(0) + .addInt16(0) + .join(true, 'B') + assert.deepEqual(actual, expectedBuffer) + }) + + it('with named statement, portal, and values', function () { + const actual = serialize.bind({ + portal: 'bang', + statement: 'woo', + values: ['1', 'hi', null, 'zing'] + }) + var expectedBuffer = new BufferList() + .addCString('bang') // portal name + .addCString('woo') // statement name + .addInt16(0) + .addInt16(4) + .addInt32(1) + .add(Buffer.from('1')) + .addInt32(2) + .add(Buffer.from('hi')) + .addInt32(-1) + .addInt32(4) + .add(Buffer.from('zing')) + .addInt16(0) + .join(true, 'B') + assert.deepEqual(actual, expectedBuffer) + }) + }) + + it('with named statement, portal, and buffer value', function () { + const actual = serialize.bind({ + portal: 'bang', + statement: 'woo', + values: ['1', 'hi', null, Buffer.from('zing', 'utf8')] + }) + var expectedBuffer = new BufferList() + .addCString('bang') // portal name + .addCString('woo') // statement name + .addInt16(4)// value count + .addInt16(0)// string + .addInt16(0)// string + .addInt16(0)// string + .addInt16(1)// binary + .addInt16(4) + .addInt32(1) + .add(Buffer.from('1')) + .addInt32(2) + .add(Buffer.from('hi')) + .addInt32(-1) + .addInt32(4) + .add(Buffer.from('zing', 'utf-8')) + .addInt16(0) + .join(true, 'B') + assert.deepEqual(actual, expectedBuffer) + }) + + describe('builds execute message', function () { + it('for unamed portal with no row limit', function () { + const actual = serialize.execute() + var expectedBuffer = new BufferList() + .addCString('') + .addInt32(0) + .join(true, 'E') + assert.deepEqual(actual, expectedBuffer) + }) + + it('for named portal with row limit', function () { + const actual = serialize.execute({ + portal: 'my favorite portal', + rows: 100 + }) + var expectedBuffer = new BufferList() + .addCString('my favorite portal') + .addInt32(100) + .join(true, 'E') + assert.deepEqual(actual, expectedBuffer) + }) + }) + + it('builds flush command', function () { + const actual = serialize.flush() + var expected = new BufferList().join(true, 'H') + assert.deepEqual(actual, expected) + }) + + it('builds sync command', function () { + const actual = serialize.sync() + var expected = new BufferList().join(true, 'S') + assert.deepEqual(actual, expected) + }) + + it('builds end command', function () { + const actual = serialize.end() + var expected = Buffer.from([0x58, 0, 0, 0, 4]) + assert.deepEqual(actual, expected) + }) + + describe('builds describe command', function () { + it('describe statement', function () { + const actual = serialize.describe({ type: 'S', name: 'bang' }) + var expected = new BufferList().addChar('S').addCString('bang').join(true, 'D') + assert.deepEqual(actual, expected) + }) + + it('describe unnamed portal', function () { + const actual = serialize.describe({ type: 'P' }) + var expected = new BufferList().addChar('P').addCString('').join(true, 'D') + assert.deepEqual(actual, expected) + }) + }) + + describe('builds close command', function () { + it('describe statement', function () { + const actual = serialize.close({ type: 'S', name: 'bang' }) + var expected = new BufferList().addChar('S').addCString('bang').join(true, 'C') + assert.deepEqual(actual, expected) + }) + + it('describe unnamed portal', function () { + const actual = serialize.close({ type: 'P' }) + var expected = new BufferList().addChar('P').addCString('').join(true, 'C') + assert.deepEqual(actual, expected) + }) + }) + + describe('copy messages', function () { + it('builds copyFromChunk', () => { + const actual = serialize.copyData(Buffer.from([1, 2, 3])) + const expected = new BufferList().add(Buffer.from([1, 2,3 ])).join(true, 'd') + assert.deepEqual(actual, expected) + }) + + it('builds copy fail', () => { + const actual = serialize.copyFail('err!') + const expected = new BufferList().addCString('err!').join(true, 'f') + assert.deepEqual(actual, expected) + }) + + it('builds copy done', () => { + const actual = serialize.copyDone() + const expected = new BufferList().join(true, 'c') + assert.deepEqual(actual, expected) + + }) + }) + +}) diff --git a/packages/pg-packet-stream/src/parser.ts b/packages/pg-packet-stream/src/parser.ts new file mode 100644 index 000000000..69a9c28b2 --- /dev/null +++ b/packages/pg-packet-stream/src/parser.ts @@ -0,0 +1,322 @@ +import { TransformOptions } from 'stream'; +import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage, MessageName, AuthenticationMD5Password, NoticeMessage } from './messages'; +import { BufferReader } from './buffer-reader'; +import assert from 'assert' + +// every message is prefixed with a single bye +const CODE_LENGTH = 1; +// every message has an int32 length which includes itself but does +// NOT include the code in the length +const LEN_LENGTH = 4; + +const HEADER_LENGTH = CODE_LENGTH + LEN_LENGTH; + +export type Packet = { + code: number; + packet: Buffer; +} + +const emptyBuffer = Buffer.allocUnsafe(0); + +type StreamOptions = TransformOptions & { + mode: Mode +} + +const enum MessageCodes { + DataRow = 0x44, // D + ParseComplete = 0x31, // 1 + BindComplete = 0x32, // 2 + CloseComplete = 0x33, // 3 + CommandComplete = 0x43, // C + ReadyForQuery = 0x5a, // Z + NoData = 0x6e, // n + NotificationResponse = 0x41, // A + AuthenticationResponse = 0x52, // R + ParameterStatus = 0x53, // S + BackendKeyData = 0x4b, // K + ErrorMessage = 0x45, // E + NoticeMessage = 0x4e, // N + RowDescriptionMessage = 0x54, // T + PortalSuspended = 0x73, // s + ReplicationStart = 0x57, // W + EmptyQuery = 0x49, // I + CopyIn = 0x47, // G + CopyOut = 0x48, // H + CopyDone = 0x63, // c + CopyData = 0x64, // d +} + +export type MessageCallback = (msg: BackendMessage) => void; + +export class Parser { + private remainingBuffer: Buffer = emptyBuffer; + private reader = new BufferReader(); + private mode: Mode; + + constructor(opts?: StreamOptions) { + if (opts?.mode === 'binary') { + throw new Error('Binary mode not supported yet') + } + this.mode = opts?.mode || 'text'; + } + + public parse(buffer: Buffer, callback: MessageCallback) { + let combinedBuffer = buffer; + if (this.remainingBuffer.byteLength) { + combinedBuffer = Buffer.allocUnsafe(this.remainingBuffer.byteLength + buffer.byteLength); + this.remainingBuffer.copy(combinedBuffer) + buffer.copy(combinedBuffer, this.remainingBuffer.byteLength) + } + let offset = 0; + while ((offset + HEADER_LENGTH) <= combinedBuffer.byteLength) { + // code is 1 byte long - it identifies the message type + const code = combinedBuffer[offset]; + + // length is 1 Uint32BE - it is the length of the message EXCLUDING the code + const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH); + + const fullMessageLength = CODE_LENGTH + length; + + if (fullMessageLength + offset <= combinedBuffer.byteLength) { + const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer); + callback(message) + offset += fullMessageLength; + } else { + break; + } + } + + if (offset === combinedBuffer.byteLength) { + this.remainingBuffer = emptyBuffer; + } else { + this.remainingBuffer = combinedBuffer.slice(offset) + } + + } + + private handlePacket(offset: number, code: number, length: number, bytes: Buffer): BackendMessage { + switch (code) { + case MessageCodes.BindComplete: + return bindComplete; + case MessageCodes.ParseComplete: + return parseComplete; + case MessageCodes.CloseComplete: + return closeComplete; + case MessageCodes.NoData: + return noData; + case MessageCodes.PortalSuspended: + return portalSuspended; + case MessageCodes.CopyDone: + return copyDone; + case MessageCodes.ReplicationStart: + return replicationStart; + case MessageCodes.EmptyQuery: + return emptyQuery; + case MessageCodes.DataRow: + return this.parseDataRowMessage(offset, length, bytes); + case MessageCodes.CommandComplete: + return this.parseCommandCompleteMessage(offset, length, bytes); + case MessageCodes.ReadyForQuery: + return this.parseReadyForQueryMessage(offset, length, bytes); + case MessageCodes.NotificationResponse: + return this.parseNotificationMessage(offset, length, bytes); + case MessageCodes.AuthenticationResponse: + return this.parseAuthenticationResponse(offset, length, bytes); + case MessageCodes.ParameterStatus: + return this.parseParameterStatusMessage(offset, length, bytes); + case MessageCodes.BackendKeyData: + return this.parseBackendKeyData(offset, length, bytes); + case MessageCodes.ErrorMessage: + return this.parseErrorMessage(offset, length, bytes, MessageName.error); + case MessageCodes.NoticeMessage: + return this.parseErrorMessage(offset, length, bytes, MessageName.notice); + case MessageCodes.RowDescriptionMessage: + return this.parseRowDescriptionMessage(offset, length, bytes); + case MessageCodes.CopyIn: + return this.parseCopyInMessage(offset, length, bytes); + case MessageCodes.CopyOut: + return this.parseCopyOutMessage(offset, length, bytes); + case MessageCodes.CopyData: + return this.parseCopyData(offset, length, bytes); + default: + assert.fail(`unknown message code: ${code.toString(16)}`) + } + } + + private parseReadyForQueryMessage(offset: number, length: number, bytes: Buffer) { + this.reader.setBuffer(offset, bytes); + const status = this.reader.string(1); + return new ReadyForQueryMessage(length, status) + } + + private parseCommandCompleteMessage(offset: number, length: number, bytes: Buffer) { + this.reader.setBuffer(offset, bytes); + const text = this.reader.cstring(); + return new CommandCompleteMessage(length, text); + } + + private parseCopyData(offset: number, length: number, bytes: Buffer) { + const chunk = bytes.slice(offset, offset + (length - 4)); + return new CopyDataMessage(length, chunk); + } + + private parseCopyInMessage(offset: number, length: number, bytes: Buffer) { + return this.parseCopyMessage(offset, length, bytes, MessageName.copyInResponse) + } + + private parseCopyOutMessage(offset: number, length: number, bytes: Buffer) { + return this.parseCopyMessage(offset, length, bytes, MessageName.copyOutResponse) + } + + private parseCopyMessage(offset: number, length: number, bytes: Buffer, messageName: MessageName) { + this.reader.setBuffer(offset, bytes); + const isBinary = this.reader.byte() !== 0; + const columnCount = this.reader.int16() + const message = new CopyResponse(length, messageName, isBinary, columnCount); + for (let i = 0; i < columnCount; i++) { + message.columnTypes[i] = this.reader.int16(); + } + return message; + } + + private parseNotificationMessage(offset: number, length: number, bytes: Buffer) { + this.reader.setBuffer(offset, bytes); + const processId = this.reader.int32(); + const channel = this.reader.cstring(); + const payload = this.reader.cstring(); + return new NotificationResponseMessage(length, processId, channel, payload); + } + + private parseRowDescriptionMessage(offset: number, length: number, bytes: Buffer) { + this.reader.setBuffer(offset, bytes); + const fieldCount = this.reader.int16() + const message = new RowDescriptionMessage(length, fieldCount); + for (let i = 0; i < fieldCount; i++) { + message.fields[i] = this.parseField() + } + return message; + } + + private parseField(): Field { + const name = this.reader.cstring() + const tableID = this.reader.int32() + const columnID = this.reader.int16() + const dataTypeID = this.reader.int32() + const dataTypeSize = this.reader.int16() + const dataTypeModifier = this.reader.int32() + const mode = this.reader.int16() === 0 ? 'text' : 'binary'; + return new Field(name, tableID, columnID, dataTypeID, dataTypeSize, dataTypeModifier, mode) + } + + private parseDataRowMessage(offset: number, length: number, bytes: Buffer) { + this.reader.setBuffer(offset, bytes); + const fieldCount = this.reader.int16(); + const fields: any[] = new Array(fieldCount); + for (let i = 0; i < fieldCount; i++) { + const len = this.reader.int32(); + if (len === -1) { + fields[i] = null + } else if (this.mode === 'text') { + fields[i] = this.reader.string(len) + } + } + return new DataRowMessage(length, fields); + } + + private parseParameterStatusMessage(offset: number, length: number, bytes: Buffer) { + this.reader.setBuffer(offset, bytes); + const name = this.reader.cstring(); + const value = this.reader.cstring() + return new ParameterStatusMessage(length, name, value) + } + + private parseBackendKeyData(offset: number, length: number, bytes: Buffer) { + this.reader.setBuffer(offset, bytes); + const processID = this.reader.int32() + const secretKey = this.reader.int32() + return new BackendKeyDataMessage(length, processID, secretKey) + } + + + public parseAuthenticationResponse(offset: number, length: number, bytes: Buffer) { + this.reader.setBuffer(offset, bytes); + const code = this.reader.int32() + // TODO(bmc): maybe better types here + const message: BackendMessage & any = { + name: MessageName.authenticationOk, + length, + }; + + switch (code) { + case 0: // AuthenticationOk + break; + case 3: // AuthenticationCleartextPassword + if (message.length === 8) { + message.name = MessageName.authenticationCleartextPassword + } + break + case 5: // AuthenticationMD5Password + if (message.length === 12) { + message.name = MessageName.authenticationMD5Password + const salt = this.reader.bytes(4); + return new AuthenticationMD5Password(length, salt); + } + break + case 10: // AuthenticationSASL + message.name = MessageName.authenticationSASL + message.mechanisms = [] + let mechanism: string; + do { + mechanism = this.reader.cstring() + + if (mechanism) { + message.mechanisms.push(mechanism) + } + } while (mechanism) + break; + case 11: // AuthenticationSASLContinue + message.name = MessageName.authenticationSASLContinue + message.data = this.reader.string(length - 4) + break; + case 12: // AuthenticationSASLFinal + message.name = MessageName.authenticationSASLFinal + message.data = this.reader.string(length - 4) + break; + default: + throw new Error('Unknown authenticationOk message type ' + code) + } + return message; + } + + private parseErrorMessage(offset: number, length: number, bytes: Buffer, name: MessageName) { + this.reader.setBuffer(offset, bytes); + var fields: Record = {} + var fieldType = this.reader.string(1) + while (fieldType !== '\0') { + fields[fieldType] = this.reader.cstring() + fieldType = this.reader.string(1) + } + + const messageValue = fields.M + + const message = name === MessageName.notice ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name) + + message.severity = fields.S + message.code = fields.C + message.detail = fields.D + message.hint = fields.H + message.position = fields.P + message.internalPosition = fields.p + message.internalQuery = fields.q + message.where = fields.W + message.schema = fields.s + message.table = fields.t + message.column = fields.c + message.dataType = fields.d + message.constraint = fields.n + message.file = fields.F + message.line = fields.L + message.routine = fields.R + return message; + } +} diff --git a/packages/pg-packet-stream/src/serializer.ts b/packages/pg-packet-stream/src/serializer.ts new file mode 100644 index 000000000..feeb0cec8 --- /dev/null +++ b/packages/pg-packet-stream/src/serializer.ts @@ -0,0 +1,226 @@ +import { Writer } from './buffer-writer' + +const enum code { + startup = 0x70, + query = 0x51, + parse = 0x50, + bind = 0x42, + execute = 0x45, + flush = 0x48, + sync = 0x53, + end = 0x58, + close = 0x43, + describe = 0x44, + copyFromChunk = 0x64, + copyDone = 0x63, + copyFail = 0x66 +} + +const writer = new Writer() + +const startup = (opts: Record): Buffer => { + // protocol version + writer.addInt16(3).addInt16(0) + for (const key of Object.keys(opts)) { + writer.addCString(key).addCString(opts[key]) + } + + writer.addCString('client_encoding').addCString("'utf-8'") + + var bodyBuffer = writer.addCString('').flush() + // this message is sent without a code + + var length = bodyBuffer.length + 4 + + return new Writer() + .addInt32(length) + .add(bodyBuffer) + .flush() +} + +const requestSsl = (): Buffer => { + const response = Buffer.allocUnsafe(8) + response.writeInt32BE(8, 0); + response.writeInt32BE(80877103, 4) + return response +} + +const password = (password: string): Buffer => { + return writer.addCString(password).flush(code.startup) +} + +const sendSASLInitialResponseMessage = function (mechanism: string, initialResponse: string): Buffer { + // 0x70 = 'p' + writer + .addCString(mechanism) + .addInt32(Buffer.byteLength(initialResponse)) + .addString(initialResponse) + + return writer.flush(code.startup) +} + +const sendSCRAMClientFinalMessage = function (additionalData: string): Buffer { + return writer.addString(additionalData).flush(code.startup) +} + +const query = (text: string): Buffer => { + return writer.addCString(text).flush(code.query) +} + +type ParseOpts = { + name?: string; + types?: number[]; + text: string; +} + +const parse = (query: ParseOpts): Buffer => { + // expect something like this: + // { name: 'queryName', + // text: 'select * from blah', + // types: ['int8', 'bool'] } + + // normalize missing query names to allow for null + query.name = query.name || '' + if (query.name.length > 63) { + /* eslint-disable no-console */ + console.error('Warning! Postgres only supports 63 characters for query names.') + console.error('You supplied %s (%s)', query.name, query.name.length) + console.error('This can cause conflicts and silent errors executing queries') + /* eslint-enable no-console */ + } + // normalize null type array + query.types = query.types || [] + var len = query.types.length + var buffer = writer + .addCString(query.name) // name of query + .addCString(query.text) // actual query text + .addInt16(len) + for (var i = 0; i < len; i++) { + buffer.addInt32(query.types[i]) + } + + return writer.flush(code.parse) +} + +type BindOpts = { + portal?: string; + binary?: boolean; + statement?: string; + values?: any[]; +} + +const bind = (config: BindOpts = {}): Buffer => { + // normalize config + const portal = config.portal || '' + const statement = config.statement || '' + const binary = config.binary || false + var values = config.values || [] + var len = values.length + + var useBinary = false + // TODO(bmc): all the loops in here aren't nice, we can do better + for (var j = 0; j < len; j++) { + useBinary = useBinary || values[j] instanceof Buffer + } + + var buffer = writer + .addCString(portal) + .addCString(statement) + if (!useBinary) { buffer.addInt16(0) } else { + buffer.addInt16(len) + for (j = 0; j < len; j++) { + buffer.addInt16(values[j] instanceof Buffer ? 1 : 0) + } + } + buffer.addInt16(len) + for (var i = 0; i < len; i++) { + var val = values[i] + if (val === null || typeof val === 'undefined') { + buffer.addInt32(-1) + } else if (val instanceof Buffer) { + buffer.addInt32(val.length) + buffer.add(val) + } else { + buffer.addInt32(Buffer.byteLength(val)) + buffer.addString(val) + } + } + + if (binary) { + buffer.addInt16(1) // format codes to use binary + buffer.addInt16(1) + } else { + buffer.addInt16(0) // format codes to use text + } + return writer.flush(code.bind) +} + +type ExecOpts = { + portal?: string; + rows?: number; +} + +const execute = (config: ExecOpts = {}): Buffer => { + const portal = config.portal || '' + const rows = config.rows || 0 + return writer + .addCString(portal) + .addInt32(rows) + .flush(code.execute) +} + +type PortalOpts = { + type: 'S' | 'P', + name?: string; +} + +const cstringMessage = (code: code, string: string): Buffer => { + return writer.addCString(string).flush(code) +} + +const describe = (msg: PortalOpts): Buffer => { + const text = `${msg.type}${msg.name || ''}` + return cstringMessage(code.describe, text) +} + +const close = (msg: PortalOpts): Buffer => { + const text = `${msg.type}${msg.name || ''}` + return cstringMessage(code.close, text) +} + +const copyData = (chunk: Buffer): Buffer => { + return writer.add(chunk).flush(code.copyFromChunk) +} + +const copyFail = (message: string): Buffer => { + return cstringMessage(code.copyFail, message); +} + +const codeOnlyBuffer = (code: code): Buffer => Buffer.from([code, 0x00, 0x00, 0x00, 0x04]) + +const flushBuffer = codeOnlyBuffer(code.flush) +const syncBuffer = codeOnlyBuffer(code.sync) +const endBuffer = codeOnlyBuffer(code.end) +const copyDoneBuffer = codeOnlyBuffer(code.copyDone) + +const serialize = { + startup, + password, + requestSsl, + sendSASLInitialResponseMessage, + sendSCRAMClientFinalMessage, + query, + parse, + bind, + execute, + describe, + close, + flush: () => flushBuffer, + sync: () => syncBuffer, + end: () => endBuffer, + copyData, + copyDone: () => copyDoneBuffer, + copyFail +} + +export { serialize } diff --git a/packages/pg/lib/connection-fast.js b/packages/pg/lib/connection-fast.js index ecbb362c9..2018e05bd 100644 --- a/packages/pg/lib/connection-fast.js +++ b/packages/pg/lib/connection-fast.js @@ -13,7 +13,7 @@ var util = require('util') var Writer = require('buffer-writer') // eslint-disable-next-line -const { parse } = require('pg-packet-stream') +const { parse, serialize } = require('pg-packet-stream') var TEXT_MODE = 0 @@ -122,40 +122,11 @@ Connection.prototype.attachListeners = function (stream) { } Connection.prototype.requestSsl = function () { - var bodyBuffer = this.writer - .addInt16(0x04d2) - .addInt16(0x162f) - .flush() - - var length = bodyBuffer.length + 4 - - var buffer = new Writer() - .addInt32(length) - .add(bodyBuffer) - .join() - this.stream.write(buffer) + this.stream.write(serialize.requestSsl()) } Connection.prototype.startup = function (config) { - var writer = this.writer.addInt16(3).addInt16(0) - - Object.keys(config).forEach(function (key) { - var val = config[key] - writer.addCString(key).addCString(val) - }) - - writer.addCString('client_encoding').addCString("'utf-8'") - - var bodyBuffer = writer.addCString('').flush() - // this message is sent without a code - - var length = bodyBuffer.length + 4 - - var buffer = new Writer() - .addInt32(length) - .add(bodyBuffer) - .join() - this.stream.write(buffer) + this.stream.write(serialize.startup(config)) } Connection.prototype.cancel = function (processID, secretKey) { @@ -176,142 +147,53 @@ Connection.prototype.cancel = function (processID, secretKey) { } Connection.prototype.password = function (password) { - // 0x70 = 'p' - this._send(0x70, this.writer.addCString(password)) + this._send(serialize.password(password)) } Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) { - // 0x70 = 'p' - this.writer - .addCString(mechanism) - .addInt32(Buffer.byteLength(initialResponse)) - .addString(initialResponse) - - this._send(0x70) + this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)) } Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) { - // 0x70 = 'p' - this.writer.addString(additionalData) - - this._send(0x70) + this._send(serialize.sendSCRAMClientFinalMessage(additionalData)) } -Connection.prototype._send = function (code, more) { +Connection.prototype._send = function (buffer) { if (!this.stream.writable) { return false } - return this.stream.write(this.writer.flush(code)) + return this.stream.write(buffer) } Connection.prototype.query = function (text) { - // 0x51 = Q - this.stream.write(this.writer.addCString(text).flush(0x51)) + this._send(serialize.query(text)) } // send parse message Connection.prototype.parse = function (query) { - // expect something like this: - // { name: 'queryName', - // text: 'select * from blah', - // types: ['int8', 'bool'] } - - // normalize missing query names to allow for null - query.name = query.name || '' - if (query.name.length > 63) { - /* eslint-disable no-console */ - console.error('Warning! Postgres only supports 63 characters for query names.') - console.error('You supplied %s (%s)', query.name, query.name.length) - console.error('This can cause conflicts and silent errors executing queries') - /* eslint-enable no-console */ - } - // normalize null type array - query.types = query.types || [] - var len = query.types.length - var buffer = this.writer - .addCString(query.name) // name of query - .addCString(query.text) // actual query text - .addInt16(len) - for (var i = 0; i < len; i++) { - buffer.addInt32(query.types[i]) - } - - var code = 0x50 - this._send(code) - this.flush() + this._send(serialize.parse(query)) } // send bind message // "more" === true to buffer the message until flush() is called Connection.prototype.bind = function (config) { - // normalize config - config = config || {} - config.portal = config.portal || '' - config.statement = config.statement || '' - config.binary = config.binary || false - var values = config.values || [] - var len = values.length - var useBinary = false - for (var j = 0; j < len; j++) { - useBinary |= values[j] instanceof Buffer - } - var buffer = this.writer.addCString(config.portal).addCString(config.statement) - if (!useBinary) { - buffer.addInt16(0) - } else { - buffer.addInt16(len) - for (j = 0; j < len; j++) { - buffer.addInt16(values[j] instanceof Buffer) - } - } - buffer.addInt16(len) - for (var i = 0; i < len; i++) { - var val = values[i] - if (val === null || typeof val === 'undefined') { - buffer.addInt32(-1) - } else if (val instanceof Buffer) { - buffer.addInt32(val.length) - buffer.add(val) - } else { - buffer.addInt32(Buffer.byteLength(val)) - buffer.addString(val) - } - } - - if (config.binary) { - buffer.addInt16(1) // format codes to use binary - buffer.addInt16(1) - } else { - buffer.addInt16(0) // format codes to use text - } - // 0x42 = 'B' - this._send(0x42) - this.flush() + this._send(serialize.bind(config)) } // send execute message // "more" === true to buffer the message until flush() is called Connection.prototype.execute = function (config) { - config = config || {} - config.portal = config.portal || '' - config.rows = config.rows || '' - this.writer.addCString(config.portal).addInt32(config.rows) - - // 0x45 = 'E' - this._send(0x45) - this.flush() + this._send(serialize.execute(config)) } -var emptyBuffer = Buffer.alloc(0) - -const flushBuffer = Buffer.from([0x48, 0x00, 0x00, 0x00, 0x04]) +const flushBuffer = serialize.flush() Connection.prototype.flush = function () { if (this.stream.writable) { this.stream.write(flushBuffer) } } -const syncBuffer = Buffer.from([0x53, 0x00, 0x00, 0x00, 0x04]) +const syncBuffer = serialize.sync() Connection.prototype.sync = function () { this._ending = true // clear out any pending data in the writer @@ -322,7 +204,7 @@ Connection.prototype.sync = function () { } } -const END_BUFFER = Buffer.from([0x58, 0x00, 0x00, 0x00, 0x04]) +const endBuffer = serialize.end() Connection.prototype.end = function () { // 0x58 = 'X' @@ -332,34 +214,29 @@ Connection.prototype.end = function () { this.stream.end() return } - return this.stream.write(END_BUFFER, () => { + return this.stream.write(endBuffer, () => { this.stream.end() }) } Connection.prototype.close = function (msg) { - this.writer.addCString(msg.type + (msg.name || '')) - this._send(0x43) + this._send(serialize.close(msg)) } Connection.prototype.describe = function (msg) { - this.writer.addCString(msg.type + (msg.name || '')) - this._send(0x44) - this.flush() + this._send(serialize.describe(msg)) } Connection.prototype.sendCopyFromChunk = function (chunk) { - this.stream.write(this.writer.add(chunk).flush(0x64)) + this._send(serialize.copyData(chunk)) } Connection.prototype.endCopyFrom = function () { - this.stream.write(this.writer.add(emptyBuffer).flush(0x63)) + this._send(serialize.copyDone()) } Connection.prototype.sendCopyFail = function (msg) { - // this.stream.write(this.writer.add(emptyBuffer).flush(0x66)); - this.writer.addCString(msg) - this._send(0x66) + this._send(serialize.copyFail(msg)) } module.exports = Connection From 917fce103818e7577dd29fe27ba2c05db9aa606f Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 8 Apr 2020 18:29:55 -0500 Subject: [PATCH 2/6] Write more tests, cleanup code to some extent --- .../pg-packet-stream/src/buffer-writer.ts | 34 +++++-------------- .../src/outbound-serializer.test.ts | 6 +++- packages/pg-packet-stream/src/serializer.ts | 13 ++++++- packages/pg/lib/connection-fast.js | 33 ++---------------- 4 files changed, 28 insertions(+), 58 deletions(-) diff --git a/packages/pg-packet-stream/src/buffer-writer.ts b/packages/pg-packet-stream/src/buffer-writer.ts index f8b6dd2a5..0bbcd5f55 100644 --- a/packages/pg-packet-stream/src/buffer-writer.ts +++ b/packages/pg-packet-stream/src/buffer-writer.ts @@ -39,6 +39,7 @@ export class Writer { return this; } + public addCString(string: string): Writer { //just write a 0 for empty or null strings if (!string) { @@ -81,39 +82,20 @@ export class Writer { return this; } - public clear(): void { - this.offset = 5; - this.headerPosition = 0; - } - - //appends a header block to all the written data since the last - //subsequent header or to the beginning if there is only one data block - public addHeader(code: number, last: boolean = false) { - var origOffset = this.offset; - this.offset = this.headerPosition; - this.buffer[this.offset++] = code; - //length is everything in this packet minus the code - this.addInt32(origOffset - (this.headerPosition + 1)); - //set next header position - this.headerPosition = origOffset; - //make space for next header - this.offset = origOffset; - if (!last) { - this._ensure(5); - this.offset += 5; - } - } - - public join(code?: number): Buffer { + private join(code?: number): Buffer { if (code) { - this.addHeader(code, true); + this.buffer[this.headerPosition] = code; + //length is everything in this packet minus the code + const length = this.offset - (this.headerPosition + 1) + this.buffer.writeInt32BE(length, this.headerPosition + 1) } return this.buffer.slice(code ? 0 : 5, this.offset); } public flush(code?: number): Buffer { var result = this.join(code); - this.clear(); + this.offset = 5; + this.headerPosition = 0; return result; } } diff --git a/packages/pg-packet-stream/src/outbound-serializer.test.ts b/packages/pg-packet-stream/src/outbound-serializer.test.ts index 246c5fe9d..110b932ce 100644 --- a/packages/pg-packet-stream/src/outbound-serializer.test.ts +++ b/packages/pg-packet-stream/src/outbound-serializer.test.ts @@ -245,8 +245,12 @@ describe('serializer', () => { const actual = serialize.copyDone() const expected = new BufferList().join(true, 'c') assert.deepEqual(actual, expected) - }) }) + it('builds cancel message', () => { + const actual = serialize.cancel(3, 4) + const expected = new BufferList().addInt16(1234).addInt16(5678).addInt32(3).addInt32(4).join(true) + assert.deepEqual(actual, expected) + }) }) diff --git a/packages/pg-packet-stream/src/serializer.ts b/packages/pg-packet-stream/src/serializer.ts index feeb0cec8..8dc8e0c33 100644 --- a/packages/pg-packet-stream/src/serializer.ts +++ b/packages/pg-packet-stream/src/serializer.ts @@ -169,6 +169,16 @@ const execute = (config: ExecOpts = {}): Buffer => { .flush(code.execute) } +const cancel = (processID: number, secretKey: number): Buffer => { + const buffer = Buffer.allocUnsafe(16) + buffer.writeInt32BE(16, 0) + buffer.writeInt16BE(1234, 4) + buffer.writeInt16BE(5678, 6) + buffer.writeInt32BE(processID, 8) + buffer.writeInt32BE(secretKey, 12) + return buffer; +} + type PortalOpts = { type: 'S' | 'P', name?: string; @@ -220,7 +230,8 @@ const serialize = { end: () => endBuffer, copyData, copyDone: () => copyDoneBuffer, - copyFail + copyFail, + cancel } export { serialize } diff --git a/packages/pg/lib/connection-fast.js b/packages/pg/lib/connection-fast.js index 2018e05bd..3111269b3 100644 --- a/packages/pg/lib/connection-fast.js +++ b/packages/pg/lib/connection-fast.js @@ -11,12 +11,9 @@ var net = require('net') var EventEmitter = require('events').EventEmitter var util = require('util') -var Writer = require('buffer-writer') // eslint-disable-next-line const { parse, serialize } = require('pg-packet-stream') -var TEXT_MODE = 0 - // TODO(bmc) support binary mode here // var BINARY_MODE = 1 console.log('***using faster connection***') @@ -28,15 +25,9 @@ var Connection = function (config) { this._keepAlive = config.keepAlive this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis this.lastBuffer = false - this.lastOffset = 0 - this.buffer = null - this.offset = null - this.encoding = config.encoding || 'utf8' this.parsedStatements = {} - this.writer = new Writer() this.ssl = config.ssl || false this._ending = false - this._mode = TEXT_MODE this._emitMessage = false var self = this this.on('newListener', function (eventName) { @@ -130,20 +121,7 @@ Connection.prototype.startup = function (config) { } Connection.prototype.cancel = function (processID, secretKey) { - var bodyBuffer = this.writer - .addInt16(1234) - .addInt16(5678) - .addInt32(processID) - .addInt32(secretKey) - .flush() - - var length = bodyBuffer.length + 4 - - var buffer = new Writer() - .addInt32(length) - .add(bodyBuffer) - .join() - this.stream.write(buffer) + this._send(serialize.cancel(processID, secretKey)) } Connection.prototype.password = function (password) { @@ -196,19 +174,14 @@ Connection.prototype.flush = function () { const syncBuffer = serialize.sync() Connection.prototype.sync = function () { this._ending = true - // clear out any pending data in the writer - this.writer.clear() - if (this.stream.writable) { - this.stream.write(syncBuffer) - this.stream.write(flushBuffer) - } + this._send(syncBuffer) + this._send(flushBuffer) } const endBuffer = serialize.end() Connection.prototype.end = function () { // 0x58 = 'X' - this.writer.clear() this._ending = true if (!this.stream.writable) { this.stream.end() From 5c256ffde211f9e3c09854493a946a3834d6d7a5 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 8 Apr 2020 18:36:20 -0500 Subject: [PATCH 3/6] Rename package to something more representing its name --- package.json | 2 +- packages/{pg-packet-stream => pg-protocol}/package.json | 3 ++- .../{pg-packet-stream => pg-protocol}/src/buffer-reader.ts | 0 .../{pg-packet-stream => pg-protocol}/src/buffer-writer.ts | 0 .../src/inbound-parser.test.ts | 2 +- packages/{pg-packet-stream => pg-protocol}/src/index.ts | 0 packages/{pg-packet-stream => pg-protocol}/src/messages.ts | 0 .../src/outbound-serializer.test.ts | 0 packages/{pg-packet-stream => pg-protocol}/src/parser.ts | 0 packages/{pg-packet-stream => pg-protocol}/src/serializer.ts | 0 .../src/testing/buffer-list.ts | 0 .../src/testing/test-buffers.ts | 0 .../{pg-packet-stream => pg-protocol}/src/types/chunky.d.ts | 0 packages/{pg-packet-stream => pg-protocol}/tsconfig.json | 0 packages/pg/lib/connection-fast.js | 2 +- packages/pg/package.json | 2 +- 16 files changed, 6 insertions(+), 5 deletions(-) rename packages/{pg-packet-stream => pg-protocol}/package.json (81%) rename packages/{pg-packet-stream => pg-protocol}/src/buffer-reader.ts (100%) rename packages/{pg-packet-stream => pg-protocol}/src/buffer-writer.ts (100%) rename packages/{pg-packet-stream => pg-protocol}/src/inbound-parser.test.ts (99%) rename packages/{pg-packet-stream => pg-protocol}/src/index.ts (100%) rename packages/{pg-packet-stream => pg-protocol}/src/messages.ts (100%) rename packages/{pg-packet-stream => pg-protocol}/src/outbound-serializer.test.ts (100%) rename packages/{pg-packet-stream => pg-protocol}/src/parser.ts (100%) rename packages/{pg-packet-stream => pg-protocol}/src/serializer.ts (100%) rename packages/{pg-packet-stream => pg-protocol}/src/testing/buffer-list.ts (100%) rename packages/{pg-packet-stream => pg-protocol}/src/testing/test-buffers.ts (100%) rename packages/{pg-packet-stream => pg-protocol}/src/types/chunky.d.ts (100%) rename packages/{pg-packet-stream => pg-protocol}/tsconfig.json (100%) diff --git a/package.json b/package.json index 03e3827e1..160180777 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ ], "scripts": { "test": "yarn lerna exec yarn test", - "build": "yarn lerna exec --scope pg-packet-stream yarn build", + "build": "yarn lerna exec --scope pg-protocol yarn build", "pretest": "yarn build", "lint": "yarn lerna exec --parallel yarn lint" }, diff --git a/packages/pg-packet-stream/package.json b/packages/pg-protocol/package.json similarity index 81% rename from packages/pg-packet-stream/package.json rename to packages/pg-protocol/package.json index bf9c13e84..e3e5640cd 100644 --- a/packages/pg-packet-stream/package.json +++ b/packages/pg-protocol/package.json @@ -1,6 +1,7 @@ { - "name": "pg-packet-stream", + "name": "pg-protocol", "version": "1.1.0", + "description": "The postgres client/server binary protocol, implemented in TypeScript", "main": "dist/index.js", "types": "dist/index.d.ts", "license": "MIT", diff --git a/packages/pg-packet-stream/src/buffer-reader.ts b/packages/pg-protocol/src/buffer-reader.ts similarity index 100% rename from packages/pg-packet-stream/src/buffer-reader.ts rename to packages/pg-protocol/src/buffer-reader.ts diff --git a/packages/pg-packet-stream/src/buffer-writer.ts b/packages/pg-protocol/src/buffer-writer.ts similarity index 100% rename from packages/pg-packet-stream/src/buffer-writer.ts rename to packages/pg-protocol/src/buffer-writer.ts diff --git a/packages/pg-packet-stream/src/inbound-parser.test.ts b/packages/pg-protocol/src/inbound-parser.test.ts similarity index 99% rename from packages/pg-packet-stream/src/inbound-parser.test.ts rename to packages/pg-protocol/src/inbound-parser.test.ts index e8619bf83..461ab2628 100644 --- a/packages/pg-packet-stream/src/inbound-parser.test.ts +++ b/packages/pg-protocol/src/inbound-parser.test.ts @@ -1,6 +1,6 @@ import buffers from './testing/test-buffers' import BufferList from './testing/buffer-list' -import { parse } from './' +import { parse } from '.' import assert from 'assert' import { PassThrough } from 'stream' import { BackendMessage } from './messages' diff --git a/packages/pg-packet-stream/src/index.ts b/packages/pg-protocol/src/index.ts similarity index 100% rename from packages/pg-packet-stream/src/index.ts rename to packages/pg-protocol/src/index.ts diff --git a/packages/pg-packet-stream/src/messages.ts b/packages/pg-protocol/src/messages.ts similarity index 100% rename from packages/pg-packet-stream/src/messages.ts rename to packages/pg-protocol/src/messages.ts diff --git a/packages/pg-packet-stream/src/outbound-serializer.test.ts b/packages/pg-protocol/src/outbound-serializer.test.ts similarity index 100% rename from packages/pg-packet-stream/src/outbound-serializer.test.ts rename to packages/pg-protocol/src/outbound-serializer.test.ts diff --git a/packages/pg-packet-stream/src/parser.ts b/packages/pg-protocol/src/parser.ts similarity index 100% rename from packages/pg-packet-stream/src/parser.ts rename to packages/pg-protocol/src/parser.ts diff --git a/packages/pg-packet-stream/src/serializer.ts b/packages/pg-protocol/src/serializer.ts similarity index 100% rename from packages/pg-packet-stream/src/serializer.ts rename to packages/pg-protocol/src/serializer.ts diff --git a/packages/pg-packet-stream/src/testing/buffer-list.ts b/packages/pg-protocol/src/testing/buffer-list.ts similarity index 100% rename from packages/pg-packet-stream/src/testing/buffer-list.ts rename to packages/pg-protocol/src/testing/buffer-list.ts diff --git a/packages/pg-packet-stream/src/testing/test-buffers.ts b/packages/pg-protocol/src/testing/test-buffers.ts similarity index 100% rename from packages/pg-packet-stream/src/testing/test-buffers.ts rename to packages/pg-protocol/src/testing/test-buffers.ts diff --git a/packages/pg-packet-stream/src/types/chunky.d.ts b/packages/pg-protocol/src/types/chunky.d.ts similarity index 100% rename from packages/pg-packet-stream/src/types/chunky.d.ts rename to packages/pg-protocol/src/types/chunky.d.ts diff --git a/packages/pg-packet-stream/tsconfig.json b/packages/pg-protocol/tsconfig.json similarity index 100% rename from packages/pg-packet-stream/tsconfig.json rename to packages/pg-protocol/tsconfig.json diff --git a/packages/pg/lib/connection-fast.js b/packages/pg/lib/connection-fast.js index 3111269b3..71ef63ba6 100644 --- a/packages/pg/lib/connection-fast.js +++ b/packages/pg/lib/connection-fast.js @@ -12,7 +12,7 @@ var EventEmitter = require('events').EventEmitter var util = require('util') // eslint-disable-next-line -const { parse, serialize } = require('pg-packet-stream') +const { parse, serialize } = require('../../pg-protocol/dist') // TODO(bmc) support binary mode here // var BINARY_MODE = 1 diff --git a/packages/pg/package.json b/packages/pg/package.json index edd24337b..b0bd735f5 100644 --- a/packages/pg/package.json +++ b/packages/pg/package.json @@ -22,7 +22,7 @@ "buffer-writer": "2.0.0", "packet-reader": "1.0.0", "pg-connection-string": "0.1.3", - "pg-packet-stream": "^1.1.0", + "pg-protocol": "^1.1.0", "pg-pool": "^3.0.0", "pg-types": "^2.1.0", "pgpass": "1.x", From b6325b646bd4abcef2454602341009f76f808c9e Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Thu, 9 Apr 2020 11:05:26 -0500 Subject: [PATCH 4/6] Remove unused code --- packages/pg-protocol/src/buffer-writer.ts | 23 +++++------------------ packages/pg-protocol/src/serializer.ts | 4 +++- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/packages/pg-protocol/src/buffer-writer.ts b/packages/pg-protocol/src/buffer-writer.ts index 0bbcd5f55..1fc17e9dc 100644 --- a/packages/pg-protocol/src/buffer-writer.ts +++ b/packages/pg-protocol/src/buffer-writer.ts @@ -6,9 +6,8 @@ export class Writer { private buffer: Buffer; private offset: number = 5; private headerPosition: number = 0; - private readonly encoding = 'utf-8'; - constructor(size: number = 1024) { - this.buffer = Buffer.alloc(size + 5) + constructor(private size = 256) { + this.buffer = Buffer.alloc(size) } private _ensure(size: number): void { @@ -41,13 +40,12 @@ export class Writer { public addCString(string: string): Writer { - //just write a 0 for empty or null strings if (!string) { this._ensure(1); } else { var len = Buffer.byteLength(string); - this._ensure(len + 1); //+1 for null terminator - this.buffer.write(string, this.offset, this.encoding) + this._ensure(len + 1); // +1 for null terminator + this.buffer.write(string, this.offset, 'utf-8') this.offset += len; } @@ -55,14 +53,6 @@ export class Writer { return this; } - // note: this assumes character is 1 byte - used for writing protocol charcodes - public addChar(c: string): Writer { - this._ensure(1); - this.buffer.write(c, this.offset); - this.offset++; - return this; - } - public addString(string: string = ""): Writer { var len = Buffer.byteLength(string); this._ensure(len); @@ -71,10 +61,6 @@ export class Writer { return this; } - public getByteLength(): number { - return this.offset - 5; - } - public add(otherBuffer: Buffer): Writer { this._ensure(otherBuffer.length); otherBuffer.copy(this.buffer, this.offset); @@ -96,6 +82,7 @@ export class Writer { var result = this.join(code); this.offset = 5; this.headerPosition = 0; + this.buffer = Buffer.allocUnsafe(this.size) return result; } } diff --git a/packages/pg-protocol/src/serializer.ts b/packages/pg-protocol/src/serializer.ts index 8dc8e0c33..2909c6486 100644 --- a/packages/pg-protocol/src/serializer.ts +++ b/packages/pg-protocol/src/serializer.ts @@ -126,7 +126,9 @@ const bind = (config: BindOpts = {}): Buffer => { var buffer = writer .addCString(portal) .addCString(statement) - if (!useBinary) { buffer.addInt16(0) } else { + if (!useBinary) { + buffer.addInt16(0) + } else { buffer.addInt16(len) for (j = 0; j < len; j++) { buffer.addInt16(values[j] instanceof Buffer ? 1 : 0) From c4ed703824e1173ff9d9b3de763c39c4bae049f5 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Thu, 9 Apr 2020 11:44:47 -0500 Subject: [PATCH 5/6] Small tweaks based on microbenchmarks --- packages/pg-protocol/src/b.ts | 24 ++++++++ packages/pg-protocol/src/buffer-writer.ts | 4 +- packages/pg-protocol/src/serializer.ts | 67 +++++++++++++++++------ 3 files changed, 75 insertions(+), 20 deletions(-) create mode 100644 packages/pg-protocol/src/b.ts diff --git a/packages/pg-protocol/src/b.ts b/packages/pg-protocol/src/b.ts new file mode 100644 index 000000000..267d211c4 --- /dev/null +++ b/packages/pg-protocol/src/b.ts @@ -0,0 +1,24 @@ +// file for microbenchmarking + +import { Writer } from './buffer-writer' +import { serialize } from './index' + +const LOOPS = 1000 +let count = 0 +let start = Date.now() +const writer = new Writer() + +const run = () => { + if (count > LOOPS) { + console.log(Date.now() - start) + return; + } + count++ + for(let i = 0; i < LOOPS; i++) { + serialize.describe({ type: 'P'}) + serialize.describe({ type: 'S'}) + } + setImmediate(run) +} + +run() diff --git a/packages/pg-protocol/src/buffer-writer.ts b/packages/pg-protocol/src/buffer-writer.ts index 1fc17e9dc..102b3413a 100644 --- a/packages/pg-protocol/src/buffer-writer.ts +++ b/packages/pg-protocol/src/buffer-writer.ts @@ -1,6 +1,4 @@ -//binary data writer tuned for creating -//postgres message packets as effeciently as possible by reusing the -//same buffer to avoid memcpy and limit memory allocations +//binary data writer tuned for encoding binary specific to the postgres binary protocol export class Writer { private buffer: Buffer; diff --git a/packages/pg-protocol/src/serializer.ts b/packages/pg-protocol/src/serializer.ts index 2909c6486..71ac3c878 100644 --- a/packages/pg-protocol/src/serializer.ts +++ b/packages/pg-protocol/src/serializer.ts @@ -73,6 +73,8 @@ type ParseOpts = { text: string; } +const emptyArray: any[] = [] + const parse = (query: ParseOpts): Buffer => { // expect something like this: // { name: 'queryName', @@ -80,23 +82,26 @@ const parse = (query: ParseOpts): Buffer => { // types: ['int8', 'bool'] } // normalize missing query names to allow for null - query.name = query.name || '' - if (query.name.length > 63) { + const name = query.name || '' + if (name.length > 63) { /* eslint-disable no-console */ console.error('Warning! Postgres only supports 63 characters for query names.') - console.error('You supplied %s (%s)', query.name, query.name.length) + console.error('You supplied %s (%s)', name, name.length) console.error('This can cause conflicts and silent errors executing queries') /* eslint-enable no-console */ } - // normalize null type array - query.types = query.types || [] - var len = query.types.length + + const types = query.types || emptyArray + + var len = types.length + var buffer = writer - .addCString(query.name) // name of query + .addCString(name) // name of query .addCString(query.text) // actual query text .addInt16(len) + for (var i = 0; i < len; i++) { - buffer.addInt32(query.types[i]) + buffer.addInt32(types[i]) } return writer.flush(code.parse) @@ -114,7 +119,7 @@ const bind = (config: BindOpts = {}): Buffer => { const portal = config.portal || '' const statement = config.statement || '' const binary = config.binary || false - var values = config.values || [] + var values = config.values || emptyArray var len = values.length var useBinary = false @@ -162,13 +167,27 @@ type ExecOpts = { rows?: number; } -const execute = (config: ExecOpts = {}): Buffer => { +const emptyExecute = Buffer.from([code.execute, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00]) + +const execute = (config?: ExecOpts): Buffer => { + // this is the happy path for most queries + if (!config || !config.portal && !config.rows) { + return emptyExecute; + } + const portal = config.portal || '' const rows = config.rows || 0 - return writer - .addCString(portal) - .addInt32(rows) - .flush(code.execute) + + const portalLength = Buffer.byteLength(portal) + const len = 4 + portalLength + 1 + 4 + // one extra bit for code + const buff = Buffer.allocUnsafe(1 + len) + buff[0] = code.execute + buff.writeInt32BE(len, 1) + buff.write(portal, 5, 'utf-8') + buff[portalLength + 5] = 0; // null terminate portal cString + buff.writeUInt32BE(rows, buff.length - 4) + return buff; } const cancel = (processID: number, secretKey: number): Buffer => { @@ -187,12 +206,26 @@ type PortalOpts = { } const cstringMessage = (code: code, string: string): Buffer => { - return writer.addCString(string).flush(code) + const stringLen = Buffer.byteLength(string) + const len = 4 + stringLen + 1 + // one extra bit for code + const buffer = Buffer.allocUnsafe(1 + len) + buffer[0] = code + buffer.writeInt32BE(len, 1) + buffer.write(string, 5, 'utf-8') + buffer[len] = 0 // null terminate cString + return buffer } +const emptyDescribePortal = writer.addCString('P').flush(code.describe) +const emptyDescribeStatement = writer.addCString('S').flush(code.describe) + const describe = (msg: PortalOpts): Buffer => { - const text = `${msg.type}${msg.name || ''}` - return cstringMessage(code.describe, text) + return msg.name ? + cstringMessage(code.describe,`${msg.type}${msg.name || ''}`) : + msg.type === 'P' ? + emptyDescribePortal : + emptyDescribeStatement; } const close = (msg: PortalOpts): Buffer => { From 5668b2821e0549ea925e0e5ec2a00b44c23d26cd Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Thu, 9 Apr 2020 11:49:29 -0500 Subject: [PATCH 6/6] Rename w/o underscore --- packages/pg-protocol/src/buffer-writer.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/pg-protocol/src/buffer-writer.ts b/packages/pg-protocol/src/buffer-writer.ts index 102b3413a..2299070d1 100644 --- a/packages/pg-protocol/src/buffer-writer.ts +++ b/packages/pg-protocol/src/buffer-writer.ts @@ -8,7 +8,7 @@ export class Writer { this.buffer = Buffer.alloc(size) } - private _ensure(size: number): void { + private ensure(size: number): void { var remaining = this.buffer.length - this.offset; if (remaining < size) { var oldBuffer = this.buffer; @@ -21,7 +21,7 @@ export class Writer { } public addInt32(num: number): Writer { - this._ensure(4); + this.ensure(4); this.buffer[this.offset++] = (num >>> 24 & 0xFF); this.buffer[this.offset++] = (num >>> 16 & 0xFF); this.buffer[this.offset++] = (num >>> 8 & 0xFF); @@ -30,7 +30,7 @@ export class Writer { } public addInt16(num: number): Writer { - this._ensure(2); + this.ensure(2); this.buffer[this.offset++] = (num >>> 8 & 0xFF); this.buffer[this.offset++] = (num >>> 0 & 0xFF); return this; @@ -39,10 +39,10 @@ export class Writer { public addCString(string: string): Writer { if (!string) { - this._ensure(1); + this.ensure(1); } else { var len = Buffer.byteLength(string); - this._ensure(len + 1); // +1 for null terminator + this.ensure(len + 1); // +1 for null terminator this.buffer.write(string, this.offset, 'utf-8') this.offset += len; } @@ -53,14 +53,14 @@ export class Writer { public addString(string: string = ""): Writer { var len = Buffer.byteLength(string); - this._ensure(len); + this.ensure(len); this.buffer.write(string, this.offset); this.offset += len; return this; } public add(otherBuffer: Buffer): Writer { - this._ensure(otherBuffer.length); + this.ensure(otherBuffer.length); otherBuffer.copy(this.buffer, this.offset); this.offset += otherBuffer.length; return this;