Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions lib/pool/ConnectionPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,24 @@ class ConnectionPool extends BaseConnectionPool {
)
}

/**
* Checks if the connection is present in this.connections
*
* @param {number} connectionId
*
* @returns {boolean} True if the connection is found, otherwise False
*/
hasConnection (connectionId) {
return this.connections.find(conn => conn.id === connectionId)
}

removeDead (connectionId) {
const index = this.dead.indexOf(connectionId)
if (index > -1) {
this.dead.splice(index, 1)
}
}

/**
* Marks a connection as 'alive'.
* If needed removes the connection from the dead list
Expand All @@ -40,9 +58,14 @@ class ConnectionPool extends BaseConnectionPool {
*/
markAlive (connection) {
const { id } = connection
debug(`Marking as 'alive' connection '${id}'`)
const index = this.dead.indexOf(id)
if (index > -1) this.dead.splice(index, 1)
if (!this.hasConnection(id)) {
debug(`markAlive: Cannot mark connection '${id}' as alive (connection not found)`)
this.removeDead(id)
return
}

debug(`markAlive: Marking as 'alive' connection '${id}'`)
this.removeDead(id)
connection.status = Connection.statuses.ALIVE
connection.deadCount = 0
connection.resurrectTimeout = 0
Expand All @@ -58,7 +81,12 @@ class ConnectionPool extends BaseConnectionPool {
*/
markDead (connection) {
const { id } = connection
debug(`Marking as 'dead' connection '${id}'`)
if (!this.hasConnection(id)) {
debug(`markDead: Cannot mark connection '${id}' as dead (connection not found)`)
return
}

debug(`markDead: Marking as 'dead' connection '${id}'`)
if (this.dead.indexOf(id) === -1) {
this.dead.push(id)
}
Expand Down
6 changes: 2 additions & 4 deletions test/behavior/resurrect.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ test('Should execute the recurrect API with the ping strategy', t => {
})

q.add((q, done) => {
cluster.kill('node0')
setTimeout(done, 100)
cluster.kill('node0', done)
})

q.add((q, done) => {
Expand Down Expand Up @@ -173,8 +172,7 @@ test('Should execute the recurrect API with the optimistic strategy', t => {
})

q.add((q, done) => {
cluster.kill('node0')
setTimeout(done, 100)
cluster.kill('node0', done)
})

q.add((q, done) => {
Expand Down
59 changes: 40 additions & 19 deletions test/behavior/sniff.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,38 +111,59 @@ test('Should handle hostnames in publish_address', t => {
})
})

test('Sniff interval', { skip: 'Flaky on CI' }, t => {
t.plan(10)
test('Sniff interval', t => {
t.plan(13)

buildCluster(({ nodes, shutdown, kill }) => {
const client = new Client({
node: nodes[Object.keys(nodes)[0]].url,
sniffInterval: 50
})

// this event will be triggered by api calls
client.on(events.SNIFF, (err, request) => {
// Should be 1 because SNIFF wasn't run yet
t.strictEqual(client.connectionPool.size, 1)

// SNIFF is triggered by API calls
// See Transport.js => makeRequest() => getConnection() => sniff()
let run = 0
let expectedSize
client.on(events.SNIFF, (err, result) => {
run += 1
if (run > 3) { return }

t.error(err)
const { hosts, reason } = request.meta.sniff
t.strictEqual(
client.connectionPool.size,
hosts.length
)
const { reason, hosts } = result.meta.sniff
t.strictEqual(reason, Transport.sniffReasons.SNIFF_INTERVAL)
})

t.strictEqual(client.connectionPool.size, 1)
setTimeout(() => client.info(t.error), 60)
// Test assumptions about connectionPool and hosts
t.strictEqual(client.connectionPool.size, expectedSize)
t.strictEqual(hosts.length, expectedSize)

setTimeout(() => {
// let's kill a node
kill('node1')
client.info(t.error)
}, 150)
if (run === 3) {
return // at this point, 'node1' and 'node2' should be killed
}

// Should kill the node and run SNIFF again
// Should get here 2x, to kill 'node1' and 'node2'
kill(`node${run}`, () => {
setTimeout(() => {
expectedSize = 4 - run // from 4 to 3, later 2
client.info()
}, 60) // wait > sniffInterval
})
})

// SNIFF should be run only when:
// 1) delay is greater than sniffInterval
// 2) delay is greater than time of last sniff + sniffInterval
setTimeout(() => client.info(), 20)
setTimeout(() => client.info(), 30)
setTimeout(() => {
t.strictEqual(client.connectionPool.size, 3)
}, 200)
expectedSize = 4
client.info()
}, 60) // meets 1) and 2)
setTimeout(() => client.info(), 70)
setTimeout(() => client.info(), 80)

t.teardown(shutdown)
})
Expand Down
42 changes: 26 additions & 16 deletions test/utils/buildCluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,41 @@ function buildCluster (options, callback) {
}

function shutdown () {
debug(`Shutting down cluster '${clusterId}'`)
Object.keys(nodes).forEach(kill)
}
const nodeIds = Object.keys(nodes)
debug(`Shutting down cluster '${clusterId}' with ${nodeIds.length} nodes`)

function kill (id) {
debug(`Shutting down cluster node '${id}' (cluster id: '${clusterId}')`)
nodes[id].server.stop()
delete nodes[id]
delete sniffResult.nodes[id]
let shut = 0
nodeIds.forEach(id => kill(id, (shutNodeId) => {
debug(`Shut down node '${shutNodeId}' (cluster '${clusterId}')`)

shut += 1
if (shut === nodeIds.length) {
debug(`Shutting down cluster '${clusterId}' DONE!`)
}
}))
}

function spawn (id, callback) {
debug(`Spawning cluster node '${id}' (cluster id: '${clusterId}')`)
q.add(bootNode, { id })
q.add((q, done) => {
callback()
done()
function kill (id, callback) {
if (!nodes[id]) {
callback(null)
return
}

debug(`Shutting down cluster node '${id}' (cluster id: '${clusterId}')`)
nodes[id].server.stop((err) => {
if (err) {
throw err // Failed to stop
}
delete nodes[id]
delete sniffResult.nodes[id]
callback(id)
})
}

const cluster = {
nodes,
shutdown,
kill,
spawn
kill
}

q.drain(done => {
Expand Down