Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balance rdy #406

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 46 additions & 69 deletions lib/readerrdy.js
Original file line number Diff line number Diff line change
Expand Up @@ -418,24 +418,6 @@ class ReaderRdy extends NodeState {
return this.maxInFlight < this.connections.length
}

/**
* Message success handler.
*
* @param {ConnectionRdy} connectionRdy
*/
onMessageSuccess(connectionRdy) {
if (!this.isPaused()) {
if (this.isLowRdy()) {
// Balance the RDY count amoung existing connections given the
// low RDY condition.
this.balance()
} else {
// Restore RDY count for connection to the connection max.
connectionRdy.bump()
}
}
}

/**
* Add a new connection to the pool.
*
Expand All @@ -446,7 +428,6 @@ class ReaderRdy extends NodeState {

conn.on(NSQDConnection.CLOSED, () => {
this.removeConnection(connectionRdy)
this.balance()
})

conn.on(NSQDConnection.FINISHED, () => this.raise('success', connectionRdy))
Expand All @@ -472,15 +453,17 @@ class ReaderRdy extends NodeState {
this.connections.push(connectionRdy)
this.roundRobinConnections.add(connectionRdy)

this.balance()
if (this.current_state_name === 'ZERO') {
this.goto('MAX')
} else if (['TRY_ONE', 'MAX'].includes(this.current_state_name)) {
connectionRdy.bump()
} else {
// rebalance
this.raise('balance')
}
})
}



/**
* Remove a connection from the pool.
*
Expand All @@ -489,6 +472,7 @@ class ReaderRdy extends NodeState {
removeConnection(conn) {
this.connections.splice(this.connections.indexOf(conn), 1)
this.roundRobinConnections.remove(conn)
this.raise('balance')

if (this.connections.length === 0) {
this.goto('ZERO')
Expand All @@ -504,18 +488,11 @@ class ReaderRdy extends NodeState {
return this.connections.map((conn) => conn.bump())
}

/**
* Try to balance the connection pool.
*/
try() {
this.balance()
}

/**
* Raise a `BACKOFF` event for each connection in the pool.
*/
backoff() {
this.connections.forEach((conn) => conn.backoff())
this.backoffAllConn()

if (this.backoffId) {
clearTimeout(this.backoffId)
Expand All @@ -541,22 +518,6 @@ class ReaderRdy extends NodeState {
return this.connections.reduce(add, 0)
}

/**
* The max connections readily available.
*
* @return {Number}
*/
maxConnectionsRdy() {
switch (this.current_state_name) {
case 'TRY_ONE':
return 1
case 'PAUSE':
return 0
default:
return this.maxInFlight
}
}

/**
* Evenly or fairly distributes RDY count based on the maxInFlight across
* all nsqd connections.
Expand All @@ -569,38 +530,33 @@ class ReaderRdy extends NodeState {
* the RDY count is distributed to the next waiting connection. If
* the connection does nothing with it's RDY count, then it should
* timeout and give it's RDY count to another connection.
* @param {Number} maxConnectionsRdy
*/
balance() {
balance(maxConnectionsRdy) {
this.log('balance')
clearTimeout(this.balanceId)
this.balanceId = null
if (!this.connections.length) return

if (this.balanceId != null) {
clearTimeout(this.balanceId)
this.balanceId = null
}

const max = this.maxConnectionsRdy()
const perConnectionMax = Math.floor(max / this.connections.length)
const perConnectionMax = Math.floor(maxConnectionsRdy / this.connections.length)

// Low RDY and try conditions
if (perConnectionMax === 0) {
/**
* Backoff on all connections. In-flight messages from
* connections will still be processed.
*/
this.connections.forEach((conn) => conn.backoff())
this.backoffAllConn()

const nextConnCount = Math.max(maxConnectionsRdy - this.inFlight(), 0);
// Distribute available RDY count to the connections next in line.
this.roundRobinConnections.next(max - this.inFlight()).forEach((conn) => {
this.roundRobinConnections.next(nextConnCount).forEach((conn) => {
conn.setConnectionRdyMax(1)
conn.bump()
})

// Rebalance periodically. Needed when no messages are received.
this.balanceId = setTimeout(() => {
this.balance()
this.raise('balance')
}, this.lowRdyTimeout)
} else {
let rdyRemainder = this.maxInFlight % this.connections.length
let rdyRemainder = maxConnectionsRdy % this.connections.length
this.connections.forEach((c) => {
let connMax = perConnectionMax

Expand All @@ -618,6 +574,20 @@ class ReaderRdy extends NodeState {
})
}
}

backoffAllConn() {
/**
* for ZERO state
*/
if (!this.connections) {
return;
}
/**
* Backoff on all connections. In-flight messages from
* connections will still be processed.
*/
this.connections.forEach((conn) => conn.backoff())
}
}

/**
Expand All @@ -638,6 +608,7 @@ ReaderRdy.prototype.states = {
backoff() {}, // No-op
success() {}, // No-op
try() {}, // No-op
balance() {}, // No-op
pause() {
// No-op
return this.goto('PAUSE')
Expand All @@ -647,11 +618,12 @@ ReaderRdy.prototype.states = {

PAUSE: {
Enter() {
return this.connections.map((conn) => conn.backoff())
this.backoffAllConn()
},
backoff() {}, // No-op
success() {}, // No-op
try() {}, // No-op
balance() {}, // No-op
pause() {}, // No-op
unpause() {
return this.goto('TRY_ONE')
Expand All @@ -660,17 +632,19 @@ ReaderRdy.prototype.states = {

TRY_ONE: {
Enter() {
return this.try()
return this.raise('balance')
},
backoff() {
return this.goto('BACKOFF')
},
success(connectionRdy) {
success() {
this.backoffTimer.success()
this.onMessageSuccess(connectionRdy)
return this.goto('MAX')
},
try() {}, // No-op
balance() {
this.balance(1);
},
pause() {
return this.goto('PAUSE')
},
Expand All @@ -679,17 +653,19 @@ ReaderRdy.prototype.states = {

MAX: {
Enter() {
this.balance()
return this.bump()
this.raise('balance')
},
backoff() {
return this.goto('BACKOFF')
},
success(connectionRdy) {
this.backoffTimer.success()
return this.onMessageSuccess(connectionRdy)
connectionRdy.bump()
},
try() {}, // No-op
balance() {
this.balance(this.maxInFlight);
},
pause() {
return this.goto('PAUSE')
},
Expand All @@ -706,6 +682,7 @@ ReaderRdy.prototype.states = {
try() {
return this.goto('TRY_ONE')
},
balance() {}, // No-op
pause() {
return this.goto('PAUSE')
},
Expand Down
Loading