From e500479382c12b661605b2e7f246e2474701e821 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Thu, 19 Dec 2019 14:41:05 -0600 Subject: [PATCH] Add streaming parser --- .gitignore | 1 + packages/pg-packet-stream/src/index.ts | 15 ++-------- packages/pg/lib/connection-fast.js | 41 +++++++++++++------------- 3 files changed, 25 insertions(+), 32 deletions(-) diff --git a/.gitignore b/.gitignore index 56eba3953..df95fda07 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ node_modules/ package-lock.json *.swp dist +.DS_Store diff --git a/packages/pg-packet-stream/src/index.ts b/packages/pg-packet-stream/src/index.ts index 4c03d9874..adc158d6d 100644 --- a/packages/pg-packet-stream/src/index.ts +++ b/packages/pg-packet-stream/src/index.ts @@ -1,11 +1,9 @@ import { Transform, TransformCallback, TransformOptions } from 'stream'; -import assert from 'assert' -export const hello = () => 'Hello world!' - -// this is a single byte +// every message is prefixed with a single bye const CODE_LENGTH = 1; -// this is a Uint32 +// every message has an int32 length which includes itself but does +// NOT include the code in the length const LEN_LENGTH = 4; export type Packet = { @@ -415,9 +413,7 @@ class DatabaseError extends Error { class Field { constructor(public readonly name: string, public readonly tableID: number, public readonly columnID: number, public readonly dataTypeID: number, public readonly dataTypeSize: number, public readonly dataTypeModifier: number, public readonly format: FieldFormat) { - } - } class RowDescriptionMessage { @@ -438,32 +434,27 @@ class ParameterStatusMessage { class BackendKeyDataMessage { public readonly name: string = 'backendKeyData'; constructor(public readonly length: number, public readonly processID: number, public readonly secretKey: number) { - } } class NotificationResponseMessage { public readonly name: string = 'notification'; constructor(public readonly length: number, public readonly processId: number, public readonly channel: string, public readonly payload: string) { - } } class ReadyForQueryMessage { public readonly name: string = 'readyForQuery'; constructor(public readonly length: number, public readonly status: string) { - } } class CommandCompleteMessage { public readonly name: string = 'commandComplete' constructor(public readonly length: number, public readonly text: string) { - } } - class DataRowMessage { public readonly fieldCount: number; public readonly name: string = 'dataRow' diff --git a/packages/pg/lib/connection-fast.js b/packages/pg/lib/connection-fast.js index aea9eacd4..58e63dac4 100644 --- a/packages/pg/lib/connection-fast.js +++ b/packages/pg/lib/connection-fast.js @@ -138,26 +138,27 @@ Connection.prototype.attachListeners = function (stream) { }) } -// Connection.prototype.attachListeners = function (stream) { -// var self = this -// const mode = this._mode === TEXT_MODE ? 'text' : 'binary'; -// const packetStream = new PacketStream.PgPacketStream({ mode }) -// packetStream.on('message', (msg) => { -// self.emit(msg.name, msg) -// }) -// stream.pipe(packetStream).on('data', (packet) => { -// // console.log('buff', packet) -// var msg = self.parseMessage(packet) -// var eventName = msg.name === 'error' ? 'errorMessage' : msg.name -// if (self._emitMessage) { -// self.emit('message', msg) -// } -// self.emit(eventName, msg) -// }) -// stream.on('end', function () { -// self.emit('end') -// }) -// } +Connection.prototype.attachListeners = function (stream) { + var self = this + const mode = this._mode === TEXT_MODE ? 'text' : 'binary'; + const packetStream = new PacketStream.PgPacketStream({ mode }) + packetStream.on('message', (msg) => { + var eventName = msg.name === 'error' ? 'errorMessage' : msg.name + self.emit(eventName, msg) + }) + stream.pipe(packetStream).on('data', (packet) => { + // console.log('buff', packet) + var msg = self.parseMessage(packet) + var eventName = msg.name === 'error' ? 'errorMessage' : msg.name + if (self._emitMessage) { + self.emit('message', msg) + } + self.emit(eventName, msg) + }) + stream.on('end', function () { + self.emit('end') + }) +} Connection.prototype.requestSsl = function () { var bodyBuffer = this.writer