Skip to content

Commit

Permalink
Add streaming parser
Browse files Browse the repository at this point in the history
  • Loading branch information
brianc committed Dec 19, 2019
1 parent fa44905 commit e500479
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 32 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -6,3 +6,4 @@ node_modules/
package-lock.json
*.swp
dist
.DS_Store
15 changes: 3 additions & 12 deletions packages/pg-packet-stream/src/index.ts
@@ -1,11 +1,9 @@
import { Transform, TransformCallback, TransformOptions } from 'stream';
import assert from 'assert'

export const hello = () => 'Hello world!'

// this is a single byte
// every message is prefixed with a single bye
const CODE_LENGTH = 1;
// this is a Uint32
// every message has an int32 length which includes itself but does
// NOT include the code in the length
const LEN_LENGTH = 4;

export type Packet = {
Expand Down Expand Up @@ -415,9 +413,7 @@ class DatabaseError extends Error {

class Field {
constructor(public readonly name: string, public readonly tableID: number, public readonly columnID: number, public readonly dataTypeID: number, public readonly dataTypeSize: number, public readonly dataTypeModifier: number, public readonly format: FieldFormat) {

}

}

class RowDescriptionMessage {
Expand All @@ -438,32 +434,27 @@ class ParameterStatusMessage {
class BackendKeyDataMessage {
public readonly name: string = 'backendKeyData';
constructor(public readonly length: number, public readonly processID: number, public readonly secretKey: number) {

}
}

class NotificationResponseMessage {
public readonly name: string = 'notification';
constructor(public readonly length: number, public readonly processId: number, public readonly channel: string, public readonly payload: string) {

}
}

class ReadyForQueryMessage {
public readonly name: string = 'readyForQuery';
constructor(public readonly length: number, public readonly status: string) {

}
}

class CommandCompleteMessage {
public readonly name: string = 'commandComplete'
constructor(public readonly length: number, public readonly text: string) {

}
}


class DataRowMessage {
public readonly fieldCount: number;
public readonly name: string = 'dataRow'
Expand Down
41 changes: 21 additions & 20 deletions packages/pg/lib/connection-fast.js
Expand Up @@ -138,26 +138,27 @@ Connection.prototype.attachListeners = function (stream) {
})
}

// Connection.prototype.attachListeners = function (stream) {
// var self = this
// const mode = this._mode === TEXT_MODE ? 'text' : 'binary';
// const packetStream = new PacketStream.PgPacketStream({ mode })
// packetStream.on('message', (msg) => {
// self.emit(msg.name, msg)
// })
// stream.pipe(packetStream).on('data', (packet) => {
// // console.log('buff', packet)
// var msg = self.parseMessage(packet)
// var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
// if (self._emitMessage) {
// self.emit('message', msg)
// }
// self.emit(eventName, msg)
// })
// stream.on('end', function () {
// self.emit('end')
// })
// }
Connection.prototype.attachListeners = function (stream) {
var self = this
const mode = this._mode === TEXT_MODE ? 'text' : 'binary';
const packetStream = new PacketStream.PgPacketStream({ mode })
packetStream.on('message', (msg) => {
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
self.emit(eventName, msg)
})
stream.pipe(packetStream).on('data', (packet) => {
// console.log('buff', packet)
var msg = self.parseMessage(packet)
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
if (self._emitMessage) {
self.emit('message', msg)
}
self.emit(eventName, msg)
})
stream.on('end', function () {
self.emit('end')
})
}

Connection.prototype.requestSsl = function () {
var bodyBuffer = this.writer
Expand Down

0 comments on commit e500479

Please sign in to comment.