Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Commit

Permalink
fix(grpc): subscribe for topology update after sync is done
Browse files Browse the repository at this point in the history
fix #2128
  • Loading branch information
korhaliv committed May 1, 2019
1 parent 62bb654 commit 2feb869
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 17 deletions.
35 changes: 27 additions & 8 deletions services/grpc/grpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,22 @@ class Grpc extends EventEmitter {
async onBeforeActivateLightning() {
const { Lightning } = this.services
await Lightning.connect()
// creates listener that re-emits specified event
const forwardEvent = event => data => this.emit(event, data)
Lightning.on('subscribeInvoices.data', forwardEvent('subscribeInvoices.data'))
Lightning.on('subscribeTransactions.data', forwardEvent('subscribeTransactions.data'))
Lightning.on('subscribeChannelGraph.data', forwardEvent('subscribeChannelGraph.data'))

// setups listener that re-emits specified event
const forwardEvent = event => {
Lightning.on(event, data => this.emit(event, data))
}

// forwards `data` and `error` events of the specified `base` subscription
const forwardAll = baseEvent => {
forwardEvent(`${baseEvent}.data`)
forwardEvent(`${baseEvent}.error`)
}

forwardAll('subscribeInvoices')
forwardAll('subscribeChannelGraph')
forwardAll('subscribeTransactions')
forwardAll('subscribeGetInfo')
}

async onAfterActivateLightning() {
Expand All @@ -157,9 +168,17 @@ class Grpc extends EventEmitter {

onLeaveActive() {
const { Lightning } = this.services
Lightning.removeAllListeners('subscribeInvoices.data')
Lightning.removeAllListeners('subscribeTransactions.data')
Lightning.removeAllListeners('subscribeChannelGraph.data')

// removes `data` and `error` events listeners of the specified `base` subscription
const removeAll = baseEvent => {
Lightning.removeAllListeners(`${baseEvent}.data`)
Lightning.removeAllListeners(`${baseEvent}.error`)
}

removeAll('subscribeInvoices')
removeAll('subscribeChannelGraph')
removeAll('subscribeTransactions')
removeAll('subscribeGetInfo')
}

// ------------------------------------
Expand Down
9 changes: 8 additions & 1 deletion services/grpc/grpcService.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ class GrpcService extends EventEmitter {
grpcLog.info(`gRPC subscription "${this.serviceName}.${key}" ended.`)
delete this.subscriptions[key]
})

call.on('status', callStatus => {
if (callStatus.code === status.CANCELLED) {
delete this.subscriptions[key]
grpcLog.info(`gRPC subscription "${this.serviceName}.${key}" ended.`)
}
})
})
}

Expand Down Expand Up @@ -199,7 +206,7 @@ class GrpcService extends EventEmitter {

// Initiate cancellation request.
call.cancel()
// Resolve once we recieve confirmation of the call's cancellation.
// Resolve once we receive confirmation of the call's cancellation.
return result
}

Expand Down
22 changes: 14 additions & 8 deletions services/grpc/lightning/lightning.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,24 @@ class Lightning extends GrpcService {
}

onAfterConnect() {
this.subscriptions['channelGraph'] = this.subscribeChannelGraph()
this.subscriptions['invoices'] = this.subscribeInvoices()
this.subscriptions['transactions'] = this.subscribeTransactions()
this.subscriptions['getinfo'] = this.subscribeGetInfo()
super.subscribe('invoices', 'transactions', 'getinfo')
grpcLog.info(`Connected to ${this.serviceName} gRPC service`)
}
super.subscribe()

async subscribeChannelGraph() {
grpcLog.info('resubscribeChannelGraph')
await this.unsubscribe('channelGraph')
return await this.subscribe('channelGraph')
// subscribe to graph updates only after sync is complete
// this is needed because LND chanRouter waits for chain sync
// to complete before accepting subscriptions
this.on('subscribeGetInfo.data', data => {
const { synced_to_chain } = data
if (synced_to_chain && !this.subscriptions['channelGraph']) {
grpcLog.info('subscribeChannelGraph')
this.subscriptions['channelGraph'] = this.subscribeChannelGraph()
super.subscribe('channelGraph')
}
})

grpcLog.info(`Connected to ${this.serviceName} gRPC service`)
}
}

Expand Down

0 comments on commit 2feb869

Please sign in to comment.