-
Notifications
You must be signed in to change notification settings - Fork 42
Expand file tree
/
Copy patherc20token.py
More file actions
109 lines (90 loc) · 5 KB
/
erc20token.py
File metadata and controls
109 lines (90 loc) · 5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import List
from common.utils.web3_utils import ZERO_ADDRESS
# Dependency dataclass
from indexer.domain.log import Log
from indexer.domain.token_transfer import TokenTransfer
from indexer.jobs.base_job import Collector, FilterTransactionDataJob
# Custom dataclass
from indexer.modules.custom.demo_job.domain.erc20_token_transfer import ERC20TokenTransfer
from indexer.specification.specification import TopicSpecification, TransactionFilterByLogs
# Utility
from indexer.utils.abi_setting import ERC20_TRANSFER_EVENT
def _filter_erc20_transfer_event(logs: List[Log]) -> List[TokenTransfer]:
    """Build ERC20TokenTransfer records from ERC20 Transfer event logs.

    A log is kept only when its ``topic0`` equals the signature of
    ``ERC20_TRANSFER_EVENT`` and its decoded ``from`` field differs from
    ``ZERO_ADDRESS``. Every other log is skipped.

    Args:
        logs: Logs to scan.

    Returns:
        One ERC20TokenTransfer per retained log, in input order. ``address``
        is the decoded ``to`` field, ``token_address`` is the address of the
        log itself, and the block/transaction fields are copied from the log.
    """
    transfers = []
    for entry in logs:
        # Anything that is not an ERC20 Transfer event is ignored.
        if entry.topic0 != ERC20_TRANSFER_EVENT.get_signature():
            continue

        decoded = ERC20_TRANSFER_EVENT.decode_log(entry)

        # Transfers whose sender is the zero address are left out of the result.
        if decoded["from"] == ZERO_ADDRESS:
            continue

        transfer = ERC20TokenTransfer(
            address=decoded["to"],
            token_address=entry.address,
            value=decoded["value"],
            block_timestamp=entry.block_timestamp,
            block_number=entry.block_number,
            transaction_hash=entry.transaction_hash,
            log_index=entry.log_index,
        )
        transfers.append(transfer)
    return transfers
class DemoJob(FilterTransactionDataJob):
    """Demo job that collects ERC20TokenTransfer records for configured contracts.

    The contract addresses come from the ``contract_address`` entry of the
    user-defined config and are used to build the log filter for this job.
    """

    # NOTE(review): presumably marks the job as safe to re-run on chain reorgs;
    # confirm against the base job implementation.
    able_to_reorg = True

    def __init__(self, **kwargs):
        """Initialise the base job, then read and log the contract addresses."""
        super().__init__(**kwargs)
        configured_contracts = self.user_defined_config.get("contract_address")
        self._contract_list = configured_contracts
        self.logger.info("Contracts to process %s", configured_contracts)

    def get_filter(self):
        """Return a log filter for ERC20 Transfer events on the configured contracts."""
        transfer_spec = TopicSpecification(
            addresses=self._contract_list,
            topics=[ERC20_TRANSFER_EVENT.get_signature()],
        )
        return TransactionFilterByLogs([transfer_spec])

    def _udf(self, logs: List[Log], output: Collector[ERC20TokenTransfer]):
        """Convert the incoming logs to ERC20TokenTransfer records and collect them.

        The signature of ``_udf`` is what the indexer reads to wire jobs together.

        Args:
            *args: Any number of input parameters. Each one must be a list of
                Domain subclasses and declares an existing dataclass this job
                needs. The indexer automatically runs the other jobs that
                prepare those dataclasses.
                Example: List[Block], List[Transaction]
            output: A Collector-typed output parameter. Its generic type must
                declare the Domain types this job collects, i.e. the dataclasses
                the job outputs. The indexer uses this to decide whether the job
                has to run when another job depends on those dataclasses.
                Example: Collector[DomainA] or Collector[Union[DomainA, DomainB]].

        Note:
            - Every input parameter must be a list of Domain subclasses.
            - The output parameter is required.
            - The generic type of the output parameter must accurately declare
              the collected data types.
            - The indexer establishes task dependencies and scheduling from the
              function signature.
            - Type annotations are strictly enforced by the indexer. Python itself
              does not enforce type hints, but only ``_udf`` definitions that
              follow these typing rules can be executed properly.

        Example:
            >>> def _udf(self, blocks: List[Block], output: Collector[Transaction]):
            ...     # Process block data, collect transaction data
            ...     for block in blocks:
            ...         output.collects(block.transactions)

            >>> def _udf(
            ...     self,
            ...     erc20_token_transfers: List[ERC20TokenTransfer],
            ...     erc721_token_transfers: List[ERC721TokenTransfer],
            ...     erc1155_token_transfers: List[ERC1155TokenTransfer],
            ...     output: Collector[Union[TokenBalance, CurrentTokenBalance]]
            ... ):
            ...     # Process ERC20TokenTransfer, ERC721TokenTransfer and ERC1155TokenTransfer
            ...     # collect TokenBalance, CurrentTokenBalance
            ...     token_transfers = erc20_token_transfers + erc721_token_transfers + erc1155_token_transfers
            ...
            ...     token_balances = []
            ...     for token_transfer in token_transfers:
            ...         token_balance = deal_with(token_transfer)
            ...         output.collect(token_balance)
            ...         token_balances.append(token_balance)
            ...     current_token_balances = distinct(token_balances)
            ...     output.collects(current_token_balances)
        """
        # Core logic of the UDF: filter the logs, then push the resulting
        # dataclasses into the unified buffer. collect_domains is one of several
        # collecting functions; the others live in indexer/jobs/base_job.py.
        output.collect_domains(_filter_erc20_transfer_event(logs))