-
Notifications
You must be signed in to change notification settings - Fork 3
/
index.mjs
139 lines (112 loc) · 3.69 KB
/
index.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import { XrplClient } from 'xrpl-client'
import { createDirectory } from './lib/createDirectory.mjs'
import { onValidation } from './lib/onValidation.mjs'
import { onLedger } from './lib/onLedger.mjs'
import 'dotenv/config'
import assert from 'assert'
import wtf from 'wtfnode'
import { _health } from './bin/webserver.mjs'
const noLedgerTimeoutSec = Number(process.env.LEDGERTIMEOUTSEC || 15) || 15
let sigintEventHandler = false
let quitting = false
let aliveInterval
assert(process.env?.NODES, 'ENV var missing: NODES, containing: a comma separated list of websocket endpoints')
await createDirectory('store')
await createDirectory('store/xpop')
const connections = []
const connect = () => {
console.log('<<<<< CONNECTING >>>>>')
_health.reconnectCount++;
connections.map(c => {
console.log('# # # CLOSING', c.getState()?.server?.uri)
c.removeAllListeners('validation')
c.removeAllListeners('ledger')
c.removeAllListeners('online')
c.removeAllListeners('state')
c.removeAllListeners('error')
c.close()
})
connections.length = 0
process.env.NODES.split(',').map(h => h.trim())
.map(h => new XrplClient(h, {
assumeOfflineAfterSeconds: 10,
connectAttemptTimeoutSeconds: 10,
maxConnectionAttempts: null,
}))
.forEach(c => {
console.log('* * * CONNECTING', c.getState()?.server?.uri)
connections.push(c)
})
connections
.map(async c => {
const subscribe = async () => {
// await c.ready()
/**
* TODO: Auto disconnect if no messages for X
* TODO: Generate xPOPs for matching transactions
*/
try {
c.send({ command: "subscribe", streams: [ "validations" ] })
c.send({ command: "subscribe", streams: [ "ledger" ] })
// No transactions, to make it easier for clients transactions are
// processed in order (sorted on sequence) and emitted in order
// to clients to prevent async tx sequence problems.
} catch (e) {
console.log(e.message)
}
}
c.on("validation", validation => onValidation({
connectionUrl: c.getState()?.server?.uri,
networkId: c.getState()?.server?.networkId,
validation,
}))
c.on("ledger", ledger => {
clearTimeout(aliveInterval)
aliveInterval = setTimeout(() => {
console.log('Reconnecting, no recently closed ledger after sec.', noLedgerTimeoutSec)
connect()
}, noLedgerTimeoutSec * 1000)
return onLedger({
connectionUrl: c.getState()?.server?.uri,
networkId: c.getState()?.server?.networkId,
ledger,
connection: c,
})
})
c.on('online', subscribe)
c.on('state', subscribe)
// c.on('retry', () => subscribe())
// c.on('round', () => subscribe())
c.on('error', e => console.error(e?.message || e))
})
}
// Play nice with Docker etc.
if (!sigintEventHandler) {
sigintEventHandler = true
const quit = () => {
if (!quitting) {
clearTimeout(aliveInterval)
quitting = true
// Allow for re-quit shortly after
setTimeout(() => {
quitting = false
}, 1000)
console.log('Closing (interrupting) connections', connections.length)
connections
.map(async c => {
console.info('Interrupted', c.getState()?.server?.uri)
c.close()
})
if (process.env?.DEBUG) {
// Display open handles
console.log('-------------------')
wtf.dump()
console.log('-------------------' + `\n`)
}
}
}
process.on('SIGINT', quit) // Node
process.on('SIGTERM', quit) // Docker
}
// Here we go
connect()