-
Notifications
You must be signed in to change notification settings - Fork 101
/
ChainFollower.ts
125 lines (119 loc) · 4.23 KB
/
ChainFollower.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import {
ChainSyncClient,
createChainSyncClient,
isMaryBlock,
Schema
} from '@cardano-ogmios/client'
import pRetry from 'p-retry'
import { assetFingerprint } from './assetFingerprint'
import { Config } from './Config'
import util, { errors, RunnableModuleState } from '@cardano-graphql/util'
import { HasuraClient } from './HasuraClient'
import PgBoss from 'pg-boss'
import { dummyLogger, Logger } from 'ts-log'
const MODULE_NAME = 'ChainFollower'
export class ChainFollower {
private chainSyncClient: ChainSyncClient
private queue: PgBoss
private state: RunnableModuleState
constructor (
readonly hasuraClient: HasuraClient,
private logger: Logger = dummyLogger,
queueConfig: Config['db']
) {
this.state = null
this.queue = new PgBoss({
application_name: 'cardano-graphql',
...queueConfig
})
}
public async initialize (ogmiosConfig: Config['ogmios']) {
if (this.state !== null) return
this.state = 'initializing'
this.logger.info({ module: MODULE_NAME }, 'Initializing')
await pRetry(async () => {
this.chainSyncClient = await createChainSyncClient({
rollBackward: async ({ point, tip }, requestNext) => {
if (point !== 'origin') {
this.logger.info(
{ module: MODULE_NAME, tip, rollbackPoint: point }, 'Rolling back'
)
const deleteResult = await this.hasuraClient.deleteAssetsAfterSlot(point.slot)
this.logger.info({ module: MODULE_NAME }, `Deleted ${deleteResult} assets`)
} else {
this.logger.info({ module: MODULE_NAME }, 'Rolling back to genesis')
const deleteResult = await this.hasuraClient.deleteAssetsAfterSlot(0)
this.logger.info({ module: MODULE_NAME }, `Deleted ${deleteResult} assets`)
}
requestNext()
},
rollForward: async ({ block }, requestNext) => {
let b: Schema.BlockMary
if (isMaryBlock(block)) {
b = block.mary as Schema.BlockMary
}
if (b !== undefined) {
for (const tx of b.body) {
for (const entry of Object.entries(tx.body.mint.assets)) {
const [policyId, assetName] = entry[0].split('.')
const assetId = `${policyId}${assetName !== undefined ? assetName : ''}`
if (!(await this.hasuraClient.hasAsset(assetId))) {
const asset = {
assetId,
assetName,
firstAppearedInSlot: b.header.slot,
fingerprint: assetFingerprint(policyId, assetName),
policyId
}
await this.hasuraClient.insertAssets([asset])
const SIX_HOURS = 21600
const THREE_MONTHS = 365
await this.queue.publish('asset-metadata-fetch-initial', { assetId }, {
retryDelay: SIX_HOURS,
retryLimit: THREE_MONTHS
})
}
}
}
}
requestNext()
}
},
this.logger.error,
(code, reason) => {
this.logger.error({ module: MODULE_NAME, code }, reason)
},
{ connection: ogmiosConfig })
}, {
factor: 1.2,
retries: 100,
onFailedAttempt: util.onFailedAttemptFor(
'Establishing connection to cardano-node chain-sync',
this.logger
)
})
this.state = 'initialized'
this.logger.info({ module: MODULE_NAME }, 'Initialized')
}
public async start (points: Schema.Point[]) {
if (this.state !== 'initialized') {
throw new errors.ModuleIsNotInitialized(MODULE_NAME, 'start')
}
this.logger.info({ module: MODULE_NAME }, 'Starting')
await this.queue.start()
await this.chainSyncClient.startSync(points)
this.logger.info({ module: MODULE_NAME }, 'Started')
}
public async shutdown () {
if (this.state !== 'running') {
throw new errors.ModuleIsNotInitialized(MODULE_NAME, 'shutdown')
}
this.logger.info({ module: MODULE_NAME }, 'Shutting down')
await this.chainSyncClient.shutdown()
await this.queue.stop()
this.state = 'initialized'
this.logger.info(
{ module: MODULE_NAME },
'Shutdown complete')
}
}