Skip to content

Commit eb77592

Browse files
committed
Merge branch 'queued-query-errors' of https://github.com/charmander/node-postgres
2 parents 87dd65f + 57bd144 commit eb77592

File tree

5 files changed

+120
-21
lines changed

5 files changed

+120
-21
lines changed

lib/client.js

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ var Client = function (config) {
3636
this._connecting = false
3737
this._connected = false
3838
this._connectionError = false
39+
this._queryable = true
3940

4041
this.connection = c.connection || new Connection({
4142
stream: c.stream,
@@ -126,15 +127,39 @@ Client.prototype.connect = function (callback) {
126127
}
127128

128129
const connectedErrorHandler = (err) => {
130+
this._queryable = false
131+
132+
const enqueueError = (query) => {
133+
process.nextTick(() => {
134+
query.handleError(err, con)
135+
})
136+
}
137+
129138
if (this.activeQuery) {
130-
var activeQuery = self.activeQuery
139+
enqueueError(this.activeQuery)
131140
this.activeQuery = null
132-
return activeQuery.handleError(err, con)
133141
}
142+
143+
this.queryQueue.forEach(enqueueError)
144+
this.queryQueue = []
145+
134146
this.emit('error', err)
135147
}
136148

149+
const connectedErrorMessageHandler = (msg) => {
150+
const activeQuery = this.activeQuery
151+
152+
if (!activeQuery) {
153+
connectedErrorHandler(msg)
154+
return
155+
}
156+
157+
this.activeQuery = null
158+
activeQuery.handleError(msg, con)
159+
}
160+
137161
con.on('error', connectingErrorHandler)
162+
con.on('errorMessage', connectingErrorHandler)
138163

139164
// hook up query handling events to connection
140165
// after the connection initially becomes ready for queries
@@ -143,7 +168,9 @@ Client.prototype.connect = function (callback) {
143168
self._connected = true
144169
self._attachListeners(con)
145170
con.removeListener('error', connectingErrorHandler)
171+
con.removeListener('errorMessage', connectingErrorHandler)
146172
con.on('error', connectedErrorHandler)
173+
con.on('errorMessage', connectedErrorMessageHandler)
147174

148175
// process possible callback argument to Client#connect
149176
if (callback) {
@@ -181,10 +208,10 @@ Client.prototype.connect = function (callback) {
181208
if (callback) {
182209
callback(error)
183210
} else {
184-
this.emit('error', error)
211+
connectedErrorHandler(error)
185212
}
186213
} else if (!this._connectionError) {
187-
this.emit('error', error)
214+
connectedErrorHandler(error)
188215
}
189216
}
190217
this.emit('end')
@@ -353,7 +380,15 @@ Client.prototype._pulseQueryQueue = function () {
353380
if (this.activeQuery) {
354381
this.readyForQuery = false
355382
this.hasExecuted = true
356-
this.activeQuery.submit(this.connection)
383+
384+
const queryError = this.activeQuery.submit(this.connection)
385+
if (queryError) {
386+
process.nextTick(() => {
387+
this.activeQuery.handleError(queryError, this.connection)
388+
this.readyForQuery = true
389+
this._pulseQueryQueue()
390+
})
391+
}
357392
} else if (this.hasExecuted) {
358393
this.activeQuery = null
359394
this.emit('drain')
@@ -389,25 +424,40 @@ Client.prototype.query = function (config, values, callback) {
389424
query._result._getTypeParser = this._types.getTypeParser.bind(this._types)
390425
}
391426

427+
if (!this._queryable) {
428+
process.nextTick(() => {
429+
query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection)
430+
})
431+
return result
432+
}
433+
434+
if (this._ending) {
435+
process.nextTick(() => {
436+
query.handleError(new Error('Client was closed and is not queryable'), this.connection)
437+
})
438+
return result
439+
}
440+
392441
this.queryQueue.push(query)
393442
this._pulseQueryQueue()
394443
return result
395444
}
396445

397446
Client.prototype.end = function (cb) {
398447
this._ending = true
448+
399449
if (this.activeQuery) {
400450
// if we have an active query we need to force a disconnect
401451
// on the socket - otherwise a hung query could block end forever
402-
this.connection.stream.destroy(new Error('Connection terminated by user'))
403-
return cb ? cb() : Promise.resolve()
452+
this.connection.stream.destroy()
453+
} else {
454+
this.connection.end()
404455
}
456+
405457
if (cb) {
406-
this.connection.end()
407458
this.connection.once('end', cb)
408459
} else {
409-
return new global.Promise((resolve, reject) => {
410-
this.connection.end()
460+
return new Promise((resolve) => {
411461
this.connection.once('end', resolve)
412462
})
413463
}

lib/connection.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,11 @@ Connection.prototype.attachListeners = function (stream) {
111111
var packet = self._reader.read()
112112
while (packet) {
113113
var msg = self.parseMessage(packet)
114+
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
114115
if (self._emitMessage) {
115116
self.emit('message', msg)
116117
}
117-
self.emit(msg.name, msg)
118+
self.emit(eventName, msg)
118119
packet = self._reader.read()
119120
}
120121
})

lib/query.js

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -147,22 +147,17 @@ Query.prototype.handleError = function (err, connection) {
147147

148148
Query.prototype.submit = function (connection) {
149149
if (typeof this.text !== 'string' && typeof this.name !== 'string') {
150-
const err = new Error('A query must have either text or a name. Supplying neither is unsupported.')
151-
connection.emit('error', err)
152-
connection.emit('readyForQuery')
153-
return
150+
return new Error('A query must have either text or a name. Supplying neither is unsupported.')
154151
}
155152
if (this.values && !Array.isArray(this.values)) {
156-
const err = new Error('Query values must be an array')
157-
connection.emit('error', err)
158-
connection.emit('readyForQuery')
159-
return
153+
return new Error('Query values must be an array')
160154
}
161155
if (this.requiresPreparation()) {
162156
this.prepare(connection)
163157
} else {
164158
connection.query(this.text)
165159
}
160+
return null
166161
}
167162

168163
Query.prototype.hasBeenParsed = function (connection) {

test/integration/client/error-handling-tests.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,18 @@ suite.test('re-using connections results in promise rejection', (done) => {
5050
})
5151
})
5252

53+
suite.test('using a client after closing it results in error', (done) => {
54+
const client = new Client()
55+
client.connect(() => {
56+
client.end(assert.calls(() => {
57+
client.query('SELECT 1', assert.calls((err) => {
58+
assert.equal(err.message, 'Client was closed and is not queryable')
59+
done()
60+
}))
61+
}))
62+
})
63+
})
64+
5365
suite.test('query receives error on client shutdown', function (done) {
5466
var client = new Client()
5567
client.connect(assert.success(function () {

test/integration/connection-pool/error-tests.js

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,14 @@ const pg = helper.pg
55
// first make pool hold 2 clients
66
pg.defaults.poolSize = 2
77

8-
const pool = new pg.Pool()
9-
108
const suite = new helper.Suite()
119
suite.test('connecting to invalid port', (cb) => {
1210
const pool = new pg.Pool({ port: 13801 })
1311
pool.connect().catch(e => cb())
1412
})
1513

1614
suite.test('errors emitted on pool', (cb) => {
15+
const pool = new pg.Pool()
1716
// get first client
1817
pool.connect(assert.success(function (client, done) {
1918
client.id = 1
@@ -48,3 +47,45 @@ suite.test('errors emitted on pool', (cb) => {
4847
})
4948
}))
5049
})
50+
51+
suite.test('connection-level errors cause queued queries to fail', (cb) => {
52+
const pool = new pg.Pool()
53+
pool.connect(assert.success((client, done) => {
54+
client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => {
55+
assert.equal(err.code, '57P01')
56+
}))
57+
58+
pool.once('error', assert.calls((err, brokenClient) => {
59+
assert.equal(client, brokenClient)
60+
}))
61+
62+
client.query('SELECT 1', assert.calls((err) => {
63+
assert.equal(err.message, 'Connection terminated unexpectedly')
64+
65+
done()
66+
pool.end()
67+
cb()
68+
}))
69+
}))
70+
})
71+
72+
suite.test('connection-level errors cause future queries to fail', (cb) => {
73+
const pool = new pg.Pool()
74+
pool.connect(assert.success((client, done) => {
75+
client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => {
76+
assert.equal(err.code, '57P01')
77+
}))
78+
79+
pool.once('error', assert.calls((err, brokenClient) => {
80+
assert.equal(client, brokenClient)
81+
82+
client.query('SELECT 1', assert.calls((err) => {
83+
assert.equal(err.message, 'Client has encountered a connection error and is not queryable')
84+
85+
done()
86+
pool.end()
87+
cb()
88+
}))
89+
}))
90+
}))
91+
})

0 commit comments

Comments
 (0)