This repository has been archived by the owner on Nov 17, 2023. It is now read-only.
/
grpcService.js
224 lines (195 loc) · 7.28 KB
/
grpcService.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import { join } from 'path'
import EventEmitter from 'events'
import intersection from 'lodash.intersection'
import { credentials, loadPackageDefinition, status } from '@grpc/grpc-js'
import { load } from '@grpc/proto-loader'
import lndgrpc from 'lnd-grpc'
import StateMachine from 'javascript-state-machine'
import { grpcLog } from '@zap/utils/log'
import promisifiedCall from '@zap/utils/promisifiedCall'
import waitForFile from '@zap/utils/waitForFile'
import grpcOptions from '@zap/utils/grpcOptions'
import getDeadline from '@zap/utils/getDeadline'
import createSslCreds from '@zap/utils/createSslCreds'
import createMacaroonCreds from '@zap/utils/createMacaroonCreds'
/**
 * Base class for lnd gRPC services.
 *
 * Owns a single gRPC client (`this.service`), a small state machine that gates
 * connect/disconnect (`ready` <-> `connected`) and a registry of streaming
 * subscriptions (`this.subscriptions`) that can be cancelled as a group.
 * @extends EventEmitter
 */
class GrpcService extends EventEmitter {
  /**
   * @param {string} serviceName name of the service constructor to look up on `rpc.lnrpc`
   * @param {object} lndConfig connection settings; this class reads `host`, `cert`,
   * `macaroon`, `protoPath`, `id` and `type` from it
   */
  constructor(serviceName, lndConfig) {
    super()
    this.serviceName = serviceName

    this.fsm = new StateMachine({
      init: 'ready',
      transitions: [
        { name: 'connect', from: 'ready', to: 'connected' },
        { name: 'disconnect', from: 'connected', to: 'ready' },
      ],
      methods: {
        onBeforeConnect: this.onBeforeConnect.bind(this),
        onAfterConnect: this.onAfterConnect.bind(this),
        onBeforeDisconnect: this.onBeforeDisconnect.bind(this),
      },
    })

    // Used as a keyed registry (subscription name -> stream call). It is kept as an
    // array instance so that any outside code relying on its type keeps working.
    this.subscriptions = []
    this.useMacaroon = true
    this.service = null
    this.lndConfig = lndConfig
  }

  // ------------------------------------
  // FSM Proxies
  // ------------------------------------

  /**
   * Trigger the `connect` transition.
   * @param {...*} args forwarded individually to the state machine
   * @returns {*} whatever the state machine's `connect` returns
   */
  connect(...args) {
    return this.fsm.connect(...args)
  }

  /**
   * Trigger the `disconnect` transition.
   * @param {...*} args forwarded individually to the state machine
   * @returns {*} whatever the state machine's `disconnect` returns
   */
  disconnect(...args) {
    return this.fsm.disconnect(...args)
  }

  /**
   * Proxy for the state machine's `is` check.
   * @param {...*} args forwarded individually to the state machine
   * @returns {*} whatever the state machine's `is` returns
   */
  is(...args) {
    return this.fsm.is(...args)
  }

  /**
   * Proxy for the state machine's `can` check.
   * @param {...*} args forwarded individually to the state machine
   * @returns {*} whatever the state machine's `can` returns
   */
  can(...args) {
    return this.fsm.can(...args)
  }

  // ------------------------------------
  // FSM Callbacks
  // ------------------------------------

  /**
   * Connect to the gRPC interface.
   */
  async onBeforeConnect() {
    grpcLog.info(`Connecting to ${this.serviceName} gRPC service`)
    // Establish a connection.
    const { useMacaroon, waitForMacaroon } = this._getConnectionSettings()
    await this.establishConnection({ useMacaroon, waitForMacaroon })
  }

  /**
   * Default `onAfterConnect` hook registered with the state machine.
   * Defined here so that the constructor can always bind it, even when a subclass
   * does not provide its own implementation. Subclasses may override it.
   */
  onAfterConnect() {
    grpcLog.info(`Connected to ${this.serviceName} gRPC service`)
  }

  /**
   * Disconnect from the gRPC service.
   * Cancels every registered subscription before closing the client.
   */
  async onBeforeDisconnect() {
    grpcLog.info(`Disconnecting from ${this.serviceName} gRPC service`)
    await this.unsubscribe()
    if (this.service) {
      this.service.close()
    }
  }

  // ------------------------------------
  // Helpers
  // ------------------------------------

  /**
   * Establish a connection to the gRPC interface and store the client on `this.service`.
   * @param {object} [options] connection options
   * @param {string} [options.version] proto version to load; when omitted the latest
   * version reported by lnd-grpc for `protoPath` is used
   * @param {boolean} [options.useMacaroon] combine macaroon credentials with the ssl credentials
   * @param {boolean} [options.waitForMacaroon] wait for the macaroon file to exist before reading it
   * @returns {Promise<*>} resolves with the result of waiting for the client to become ready
   * @throws rethrows the failure from waiting for readiness (the client is closed first)
   */
  async establishConnection(options = {}) {
    const { version, useMacaroon, waitForMacaroon } = options
    const { host, cert, macaroon, protoPath } = this.lndConfig

    // Find the most recent rpc.proto file
    const versionToUse = version || (await lndgrpc.getLatestProtoVersion({ path: protoPath }))
    const filepath = join(protoPath, `${versionToUse}.proto`)
    grpcLog.info(`Establishing gRPC connection to ${this.serviceName} with proto file %s`, filepath)

    // Load gRPC package definition as a gRPC object hierarchy.
    const packageDefinition = await load(filepath, grpcOptions)
    const rpc = loadPackageDefinition(packageDefinition)

    // Create ssl credentials to use with the gRPC client.
    let creds = await createSslCreds(cert)

    // Add macaroon to credentials if service requires macaroons.
    if (useMacaroon) {
      // If we are trying to connect to the internal lnd, wait up to 20 seconds for the macaroon to be generated.
      if (waitForMacaroon) {
        await waitForFile(macaroon, 20000)
      }
      const macaroonCreds = await createMacaroonCreds(macaroon)
      creds = credentials.combineChannelCredentials(creds, macaroonCreds)
    }

    // Create a new gRPC client instance.
    this.service = new rpc.lnrpc[this.serviceName](host, creds)

    try {
      // Wait up to 10 seconds for the gRPC connection to be established.
      return await promisifiedCall(this.service, this.service.waitForReady, getDeadline(10))
    } catch (e) {
      grpcLog.warn(`Unable to connect to ${this.serviceName} service`, e)
      this.service.close()
      throw e
    }
  }

  /**
   * Subscribe to streams.
   * Subclasses should implement this, add their subscription streams to
   * `this.subscriptions` and then call `super.subscribe()`, e.g.
   *   this.subscriptions['something'] = this.service.subscribeToSomething()
   *   super.subscribe()
   *
   * This base implementation only attaches cleanup handlers that drop a stream
   * from `this.subscriptions` once it ends or reports a CANCELLED status.
   * @param {...string} services optional list of services to subscribe to. if omitted, uses all services
   * @services must be a subset of `this.subscriptions`
   * @memberof GrpcService
   */
  subscribe(...services) {
    const allSubKeys = Object.keys(this.subscriptions)
    // make sure we are subscribing to known services if a specific list is provided
    const activeSubKeys = services.length ? intersection(allSubKeys, services) : allSubKeys

    // Close and clear subscriptions when they emit an end event.
    activeSubKeys.forEach(key => {
      const call = this.subscriptions[key]
      if (!call) {
        return
      }

      // Only act while this exact call is still the registered one. That keeps a late
      // event from an old stream from evicting a newer stream stored under the same
      // key, and keeps the message from being logged once per event.
      const release = () => {
        if (this.subscriptions[key] !== call) {
          return
        }
        delete this.subscriptions[key]
        grpcLog.info(`gRPC subscription "${this.serviceName}.${key}" ended.`)
      }

      call.on('end', release)
      call.on('status', callStatus => {
        if (callStatus.code === status.CANCELLED) {
          release()
        }
      })
    })
  }

  /**
   * Unsubscribe from all streams.
   * @param {...string} services optional list of services to unsubscribe from. if omitted, uses all services
   * @services must be a subset of `this.subscriptions`
   * @returns {Promise<void>} resolves once every selected stream has confirmed cancellation
   */
  async unsubscribe(...services) {
    const allSubKeys = Object.keys(this.subscriptions)
    // make sure we are unsubscribing from known services if a specific list is provided
    const activeSubKeys = services.length ? intersection(allSubKeys, services) : allSubKeys
    grpcLog.info(`Unsubscribing from all ${this.serviceName} gRPC streams: %o`, activeSubKeys)
    const cancellations = activeSubKeys.map(key => this._cancelSubscription(key))
    await Promise.all(cancellations)
  }

  /**
   * Unsubscribe from a single stream.
   * @param {string} key name of the subscription in `this.subscriptions`
   * @returns {Promise<void>} resolves once the stream ends or reports a CANCELLED status;
   * resolves immediately when no stream is registered under `key`
   */
  async _cancelSubscription(key) {
    const call = this.subscriptions[key]
    // Nothing registered under this key: there is nothing to cancel or wait for.
    if (!call) {
      return undefined
    }

    grpcLog.info(`Unsubscribing from ${this.serviceName}.${key} gRPC stream`)

    // Cancellation status callback handler.
    const result = new Promise(resolve => {
      let isDone = false
      // Both 'status' (CANCELLED) and 'end' can fire for one cancellation; finish once.
      const finish = () => {
        if (isDone) {
          return
        }
        isDone = true
        if (this.subscriptions[key] === call) {
          delete this.subscriptions[key]
        }
        grpcLog.info(`Unsubscribed from ${this.serviceName}.${key} gRPC stream`)
        resolve()
      }

      call.on('status', callStatus => {
        if (callStatus.code === status.CANCELLED) {
          finish()
        }
      })
      call.on('end', finish)
    })

    // Initiate cancellation request.
    call.cancel()

    // Resolve once we receive confirmation of the call's cancellation.
    return result
  }

  /**
   * Derive connection flags from `this.lndConfig` and `this.useMacaroon`.
   * @returns {{waitForMacaroon: boolean, useMacaroon: boolean}} flags passed to `establishConnection`
   */
  _getConnectionSettings() {
    const { id, type } = this.lndConfig
    // Don't use macaroons when connecting to the local tmp instance.
    const useMacaroon = this.useMacaroon && id !== 'tmp'
    // If connecting to a local instance, wait for the macaroon file to exist.
    const waitForMacaroon = type === 'local'
    return { waitForMacaroon, useMacaroon }
  }
}
// Expose the base class as this module's default export.
export default GrpcService