Skip to content
Permalink
Browse files

fix(grpc): subscribe for topology update after sync is done

fix #2128
  • Loading branch information...
korhaliv committed Apr 30, 2019
1 parent 62bb654 commit 2feb869d2b0a66f9b3b9e09263564f983c8263d2
Showing with 49 additions and 17 deletions.
  1. +27 −8 services/grpc/grpc.js
  2. +8 −1 services/grpc/grpcService.js
  3. +14 −8 services/grpc/lightning/lightning.js
@@ -144,11 +144,22 @@ class Grpc extends EventEmitter {
async onBeforeActivateLightning() {
const { Lightning } = this.services
await Lightning.connect()
// creates listener that re-emits specified event
const forwardEvent = event => data => this.emit(event, data)
Lightning.on('subscribeInvoices.data', forwardEvent('subscribeInvoices.data'))
Lightning.on('subscribeTransactions.data', forwardEvent('subscribeTransactions.data'))
Lightning.on('subscribeChannelGraph.data', forwardEvent('subscribeChannelGraph.data'))

// setups listener that re-emits specified event
const forwardEvent = event => {
Lightning.on(event, data => this.emit(event, data))
}

// forwards `data` and `error` events of the specified `base` subscription
const forwardAll = baseEvent => {
forwardEvent(`${baseEvent}.data`)
forwardEvent(`${baseEvent}.error`)
}

forwardAll('subscribeInvoices')
forwardAll('subscribeChannelGraph')
forwardAll('subscribeTransactions')
forwardAll('subscribeGetInfo')
}

async onAfterActivateLightning() {
@@ -157,9 +168,17 @@ class Grpc extends EventEmitter {

onLeaveActive() {
const { Lightning } = this.services
Lightning.removeAllListeners('subscribeInvoices.data')
Lightning.removeAllListeners('subscribeTransactions.data')
Lightning.removeAllListeners('subscribeChannelGraph.data')

// removes `data` and `error` events listeners of the specified `base` subscription
const removeAll = baseEvent => {
Lightning.removeAllListeners(`${baseEvent}.data`)
Lightning.removeAllListeners(`${baseEvent}.error`)
}

removeAll('subscribeInvoices')
removeAll('subscribeChannelGraph')
removeAll('subscribeTransactions')
removeAll('subscribeGetInfo')
}

// ------------------------------------
@@ -155,6 +155,13 @@ class GrpcService extends EventEmitter {
grpcLog.info(`gRPC subscription "${this.serviceName}.${key}" ended.`)
delete this.subscriptions[key]
})

call.on('status', callStatus => {
if (callStatus.code === status.CANCELLED) {
delete this.subscriptions[key]
grpcLog.info(`gRPC subscription "${this.serviceName}.${key}" ended.`)
}
})
})
}

@@ -199,7 +206,7 @@ class GrpcService extends EventEmitter {

// Initiate cancellation request.
call.cancel()
// Resolve once we recieve confirmation of the call's cancellation.
// Resolve once we receive confirmation of the call's cancellation.
return result
}

@@ -50,18 +50,24 @@ class Lightning extends GrpcService {
}

onAfterConnect() {
this.subscriptions['channelGraph'] = this.subscribeChannelGraph()
this.subscriptions['invoices'] = this.subscribeInvoices()
this.subscriptions['transactions'] = this.subscribeTransactions()
this.subscriptions['getinfo'] = this.subscribeGetInfo()
super.subscribe('invoices', 'transactions', 'getinfo')
grpcLog.info(`Connected to ${this.serviceName} gRPC service`)
}
super.subscribe()

async subscribeChannelGraph() {
grpcLog.info('resubscribeChannelGraph')
await this.unsubscribe('channelGraph')
return await this.subscribe('channelGraph')
// subscribe to graph updates only after sync is complete
// this is needed because LND chanRouter waits for chain sync
// to complete before accepting subscriptions
this.on('subscribeGetInfo.data', data => {
const { synced_to_chain } = data
if (synced_to_chain && !this.subscriptions['channelGraph']) {
grpcLog.info('subscribeChannelGraph')
this.subscriptions['channelGraph'] = this.subscribeChannelGraph()
super.subscribe('channelGraph')
}
})

grpcLog.info(`Connected to ${this.serviceName} gRPC service`)
}
}

0 comments on commit 2feb869

Please sign in to comment.
You can’t perform that action at this time.