Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Commit

Permalink
fix(grpc): clean up disconnected streams
Browse files Browse the repository at this point in the history
  • Loading branch information
mrfelton committed Apr 24, 2019
1 parent 5cd1a5b commit 8c67eff
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
24 changes: 17 additions & 7 deletions services/grpc/grpcService.js
Expand Up @@ -145,32 +145,42 @@ class GrpcService extends EventEmitter {
*/
subscribe() {
// this.subscriptions['something'] = this.service.subscribeToSomething()
// super.subscribe()

// Close and clear subscriptions when they emit an end event.
const activeSubKeys = Object.keys(this.subscriptions)
activeSubKeys.forEach(key => {
const call = this.subscriptions[key]
call.on('end', () => {
grpcLog.info(`gRPC subscription "${this.serviceName}.${key}" ended.`)
delete this.subscriptions[key]
})
})
}

/**
* Unsubscribe from all streams.
*/
async unsubscribe() {
const activeSubKeys = Object.keys(this.subscriptions)
if (activeSubKeys.length) {
grpcLog.info(`Unsubscribing from all ${this.serviceName} gRPC streams: %o`, activeSubKeys)
const cancellations = activeSubKeys.map(key => this._cancelSubscription(key))
return Promise.all(cancellations)
}
grpcLog.info(`Unsubscribing from all ${this.serviceName} gRPC streams: %o`, activeSubKeys)
const cancellations = activeSubKeys.map(key => this._cancelSubscription(key))
await Promise.all(cancellations)
}

/**
* Unsubscribe from a single stream.
*/
async _cancelSubscription(key) {
grpcLog.info(`Unsubscribing from ${key} gRPC stream`)
grpcLog.info(`Unsubscribing from ${this.serviceName}.${key} gRPC stream`)
const call = this.subscriptions[key]

// Cancellation status callback handler.
const result = new Promise(resolve => {
call.on('status', callStatus => {
if (callStatus.code === status.CANCELLED) {
grpcLog.info(`Unsubscribed from ${key} gRPC stream`)
delete this.subscriptions[key]
grpcLog.info(`Unsubscribed from ${this.serviceName}.${key} gRPC stream`)
resolve()
}
})
Expand Down
1 change: 1 addition & 0 deletions services/grpc/lightning/lightning.js
Expand Up @@ -61,6 +61,7 @@ class Lightning extends LndGrpcService {
this.subscriptions['channelGraph'] = this.subscribeChannelGraph()
this.subscriptions['invoices'] = this.subscribeInvoices()
this.subscriptions['transactions'] = this.subscribeTransactions()
super.subscribe()
}
}

Expand Down

0 comments on commit 8c67eff

Please sign in to comment.