Skip to content
Permalink
Browse files

fix(grpc): wait for stream cancellations on disconnect

  • Loading branch information...
mrfelton committed Apr 19, 2019
1 parent 6f6ab82 commit 8b1f8ec6c3c089432dbaa2c34da9fcfc1fd7e088
Showing with 30 additions and 8 deletions.
  1. +30 −8 services/grpc/grpcService.js
@@ -1,6 +1,6 @@
import { join } from 'path'
import EventEmitter from 'events'
import { credentials, loadPackageDefinition } from '@grpc/grpc-js'
import { credentials, loadPackageDefinition, status } from '@grpc/grpc-js'
import { load } from '@grpc/proto-loader'
import lndgrpc from 'lnd-grpc'
import StateMachine from 'javascript-state-machine'
@@ -161,14 +161,36 @@ class GrpcService extends EventEmitter {
/**
* Unsubscribe from all streams.
*/
unsubscribe() {
grpcLog.info(`Unsubscribing from ${this.serviceName} gRPC streams`)
Object.keys(this.subscriptions).forEach(subscription => {
if (this.subscriptions[subscription]) {
grpcLog.info(` > Unsubscribing from ${subscription} stream`)
this.subscriptions[subscription].cancel()
}
async unsubscribe() {
const activeSubKeys = Object.keys(this.subscriptions)
if (activeSubKeys.length) {
grpcLog.info(`Unsubscribing from all ${this.serviceName} gRPC streams: %o`, activeSubKeys)
const cancellations = activeSubKeys.map(key => this._cancelSubscription(key))
return Promise.all(cancellations)
}
}

/**
* Unsubscribe from a single stream.
*/
async _cancelSubscription(key) {
grpcLog.info(`Unsubscribing from ${key} gRPC stream`)
const call = this.subscriptions[key]

// Cancellation status callback handler.
const result = new Promise(resolve => {
call.on('status', callStatus => {
if (callStatus.code === status.CANCELLED) {
grpcLog.info(`Unsubscribed from ${key} gRPC stream`)
resolve()
}
})
})

// Initiate cancellation request.
call.cancel()
// Resolve once we recieve confirmation of the call's cancellation.
return result
}
}

0 comments on commit 8b1f8ec

Please sign in to comment.
You can’t perform that action at this time.