-
Notifications
You must be signed in to change notification settings - Fork 133
/
filters.py
177 lines (140 loc) · 4.88 KB
/
filters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from eth_utils import (
to_tuple,
is_bytes,
is_address,
is_same_address,
is_integer,
)
from queue import (
Queue,
Empty,
)
class Filter(object):
    """Collect values accepted by an optional predicate.

    Every accepted value is stored twice: in ``values`` (the full history,
    returned by ``get_all``) and in ``queue`` (values not yet handed out by
    ``get_changes``).
    """

    # Class-level defaults; each instance overwrites them in ``__init__``.
    filter_params = None
    filter_fn = None
    values = None
    queue = None

    def __init__(self, filter_params, filter_fn=None):
        """Create an empty filter.

        :param filter_params: stored as-is on the instance; not interpreted
            by this class.
        :param filter_fn: optional predicate; when given, ``add`` keeps only
            the items for which it returns a truthy value.
        """
        self.filter_params = filter_params
        self.filter_fn = filter_fn
        self.values = []
        self.queue = Queue()

    def get_changes(self):
        """Drain the queue and return its contents as a tuple, oldest first.

        A second call made right after returns an empty tuple unless new
        values were added in between.
        """
        changes = []
        while True:
            try:
                changes.append(self.queue.get_nowait())
            except Empty:
                return tuple(changes)

    def get_all(self):
        """Return every stored value as a tuple, in insertion order."""
        return tuple(self.values)

    def add(self, *values):
        """Store and enqueue each value that passes ``filter_fn`` (if set)."""
        for item in values:
            if self.filter_fn is not None and not self.filter_fn(item):
                continue
            self.values.append(item)
            self.queue.put_nowait(item)

    def remove(self, *values):
        """Drop the given values from both the history and the pending queue.

        Values are matched by equality. Queued values that are not removed
        keep their relative order.
        """
        try:
            values_to_remove = set(values)
        except TypeError:
            # log filters are dicts which are not hashable
            values_to_remove = values

        def should_remove(value):
            try:
                return value in values_to_remove
            except TypeError:
                # ``value`` itself is unhashable, so it cannot be probed
                # against a set; fall back to an equality scan.
                return value in values

        # Filter the history before draining the queue so that an unexpected
        # failure here cannot discard pending changes.
        kept_values = [
            value for value in self.values if not should_remove(value)
        ]
        kept_changes = [
            value for value in self.get_changes() if not should_remove(value)
        ]

        self.values = kept_values
        for value in kept_changes:
            self.queue.put_nowait(value)
def is_tuple(value):
    """Tell whether ``value`` is a ``tuple`` instance (subclasses included)."""
    result = isinstance(value, tuple)
    return result
def is_topic_string(value):
    """Tell whether ``value`` is a bytes value (per ``is_bytes``) of length 32."""
    bytes_like = is_bytes(value)
    # Only measure the length once we know the value is bytes-like.
    return bytes_like and len(value) == 32
def is_topic(value):
    """Tell whether ``value`` is a single topic: ``None`` or a topic string."""
    if value is None:
        return True
    return is_topic_string(value)
def is_flat_topic_array(value):
    """Tell whether ``value`` is a tuple whose items are all single topics.

    An empty tuple qualifies, since there is no item to reject.
    """
    if not is_tuple(value):
        return False
    for item in value:
        if not is_topic(item):
            return False
    return True
def is_nested_topic_array(value):
    """Tell whether ``value`` is a non-empty tuple of topic arrays.

    Falsy values (including the empty tuple) are rejected before any other
    check is made.
    """
    if not value:
        return False
    if not is_tuple(value):
        return False
    for item in value:
        if not is_topic_array(item):
            return False
    return True
def is_topic_array(value):
    """Tell whether ``value`` is a flat or a nested topic array."""
    if is_flat_topic_array(value):
        return True
    return is_nested_topic_array(value)
def check_single_topic_match(log_topic, filter_topic):
    """Tell whether one log topic satisfies one filter topic.

    A ``None`` filter topic matches anything. Otherwise the two values must
    compare equal *and* be of exactly the same type.
    """
    if filter_topic is None:
        return True

    values_equal = filter_topic == log_topic
    # python2 thinks string and bytes values can be equal, hence the
    # additional exact-type comparison.
    return values_equal and type(log_topic) is type(filter_topic)
def check_if_from_block_match(block_number, _type, from_block):
    """Tell whether a log satisfies the filter's ``from_block`` bound.

    :param block_number: block number of the log entry.
    :param _type: type of the log entry; compared against ``"mined"`` and
        ``"pending"``.
    :param from_block: ``None``, ``"latest"``, ``"earliest"``, ``"pending"``
        or an integer block number.
    :raises ValueError: if ``from_block`` is none of the above.
    """
    if from_block is None or from_block == "latest":
        return _type == "mined"

    if from_block in {"earliest", "pending"}:
        return _type == "pending"

    if not is_integer(from_block):
        raise ValueError("Unrecognized from_block format: {0}".format(from_block))

    # A non-integer block number never satisfies a numeric lower bound.
    return is_integer(block_number) and block_number >= from_block
def check_if_to_block_match(block_number, _type, to_block):
    """Tell whether a log satisfies the filter's ``to_block`` bound.

    :param block_number: block number of the log entry.
    :param _type: type of the log entry; compared against ``"mined"`` and
        ``"pending"``.
    :param to_block: ``None``, ``"latest"``, ``"earliest"``, ``"pending"``
        or an integer block number.
    :raises ValueError: if ``to_block`` is none of the above.
    """
    if to_block is None or to_block == "latest":
        return _type == "mined"

    if to_block in {"earliest", "pending"}:
        return _type == "pending"

    if not is_integer(to_block):
        raise ValueError("Unrecognized to_block format: {0}".format(to_block))

    # A non-integer block number never satisfies a numeric upper bound.
    return is_integer(block_number) and block_number <= to_block
def check_if_log_matches_flat_topics(log_topics, filter_topics):
    """Tell whether a log's topics satisfy a flat sequence of filter topics.

    An empty (or otherwise falsy) filter matches every log. A log with fewer
    topics than the filter never matches. Otherwise topics are compared
    position by position; log topics beyond the filter's length are ignored.
    """
    if not filter_topics:
        return True

    if len(log_topics) < len(filter_topics):
        return False

    for log_topic, filter_topic in zip(log_topics, filter_topics):
        if not check_single_topic_match(log_topic, filter_topic):
            return False
    return True
def check_if_topics_match(log_topics, filter_topics):
    """Tell whether a log's topics satisfy the filter's topics.

    :param log_topics: topics of the log entry.
    :param filter_topics: ``None`` (match everything), a flat topic array,
        or a nested topic array whose items are tried as alternatives (the
        log matches if any one of them matches).
    :raises ValueError: if ``filter_topics`` is none of the above.
    """
    if filter_topics is None:
        return True

    if is_flat_topic_array(filter_topics):
        return check_if_log_matches_flat_topics(log_topics, filter_topics)

    if not is_nested_topic_array(filter_topics):
        raise ValueError("Unrecognized topics format: {0}".format(filter_topics))

    for sub_filter_topics in filter_topics:
        if check_if_log_matches_flat_topics(log_topics, sub_filter_topics):
            return True
    return False
def check_if_address_match(address, addresses):
    """Tell whether ``address`` is selected by the filter's ``addresses``.

    :param address: address of the log entry.
    :param addresses: ``None`` (match everything), a tuple of addresses
        (match if any is the same address), or a single address.
    :raises ValueError: if ``addresses`` is none of the above.
    """
    if addresses is None:
        return True

    if is_tuple(addresses):
        for candidate in addresses:
            if is_same_address(address, candidate):
                return True
        return False

    if not is_address(addresses):
        raise ValueError("Unrecognized address format: {0}".format(addresses))

    return is_same_address(addresses, address)
def check_if_log_matches(log_entry,
                         from_block,
                         to_block,
                         addresses,
                         topics):
    """Tell whether a log entry passes every criterion of a log filter.

    The entry is read through the keys ``'block_number'``, ``'type'``,
    ``'address'`` and ``'topics'``. Checks run in the order from-block,
    to-block, address, topics, and stop at the first one that fails.
    """
    block_number = log_entry['block_number']
    entry_type = log_entry['type']

    # ``and`` short-circuits, so later keys are only read when the earlier
    # checks have passed.
    return bool(
        check_if_from_block_match(block_number, entry_type, from_block)
        and check_if_to_block_match(block_number, entry_type, to_block)
        and check_if_address_match(log_entry['address'], addresses)
        and check_if_topics_match(log_entry['topics'], topics)
    )