-
Notifications
You must be signed in to change notification settings - Fork 0
/
logFetcher.ts
68 lines (65 loc) · 1.95 KB
/
logFetcher.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import {
BaseEntityFetcher,
Blockchain,
IndexableEntityType,
PendingEntityStorage,
} from '@aleph-indexer/framework'
import { ServiceBroker } from 'moleculer'
import { EthereumClient } from '../../../../sdk/client.js'
import { EthereumRawLog } from '../../../../types.js'
import { EthereumRawLogStorage } from './dal/rawLog.js'
/**
* The main class of the fetcher service.
*/
export class EthereumLogFetcher extends BaseEntityFetcher<EthereumRawLog> {
constructor(
protected ethereumClient: EthereumClient,
protected broker: ServiceBroker,
protected pendingLogDAL: PendingEntityStorage,
protected pendingLogCacheDAL: PendingEntityStorage,
protected pendingLogFetchDAL: PendingEntityStorage,
protected rawLogDAL: EthereumRawLogStorage,
protected blockchainId: Blockchain = Blockchain.Ethereum,
) {
super(
IndexableEntityType.Log,
blockchainId,
broker,
pendingLogDAL,
pendingLogCacheDAL,
pendingLogFetchDAL,
rawLogDAL,
)
}
protected filterEntityId(id: string): boolean {
// @todo: Filter valid ethereum signatures
return id.toLocaleLowerCase() === id
}
protected async remoteFetchIds(
ids: string[],
isRetry: boolean,
): Promise<(EthereumRawLog | null | undefined)[]> {
// @note: Look for them on the cache the first time.
// In case they are not there do a fallback request to rpc
if (!isRetry) {
// @note: Right now we are caching all logs in level
// @note: This is not necessary (just a performance hack)
return Promise.all(
ids.map(async (id) => {
try {
const log = await this.rawLogDAL.get([id])
return log || null
} catch (e) {
this.log('remoteFetchIds error', e)
return null
}
}),
)
} else {
this.log('fetching logs from RPC')
return this.ethereumClient.getLogs(ids, {
swallowErrors: true,
})
}
}
}