-
Notifications
You must be signed in to change notification settings - Fork 7
/
generic_worker.py
564 lines (509 loc) · 22 KB
/
generic_worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
import asyncio
import json
import multiprocessing
import resource
import time
from functools import partial
from signal import SIGINT
from signal import signal
from signal import SIGQUIT
from signal import SIGTERM
from typing import Dict
from typing import Union
from uuid import uuid4
import httpx
import tenacity
from aio_pika import IncomingMessage
from aio_pika import Message
from aio_pika.pool import Pool
from eth_utils import keccak
from httpx import AsyncClient
from httpx import AsyncHTTPTransport
from httpx import Limits
from httpx import Timeout
from ipfs_client.dag import IPFSAsyncClientError
from ipfs_client.main import AsyncIPFSClient
from pydantic import BaseModel
from redis import asyncio as aioredis
from tenacity import retry
from tenacity import stop_after_attempt
from tenacity import wait_random_exponential
from web3 import Web3
from snapshotter.settings.config import settings
from snapshotter.utils.callback_helpers import get_rabbitmq_channel
from snapshotter.utils.callback_helpers import get_rabbitmq_robust_connection_async
from snapshotter.utils.callback_helpers import send_failure_notifications_async
from snapshotter.utils.data_utils import get_source_chain_id
from snapshotter.utils.default_logger import logger
from snapshotter.utils.file_utils import read_json_file
from snapshotter.utils.models.data_models import SnapshotterIssue
from snapshotter.utils.models.data_models import SnapshotterReportState
from snapshotter.utils.models.data_models import SnapshotterStates
from snapshotter.utils.models.data_models import SnapshotterStateUpdate
from snapshotter.utils.models.data_models import UnfinalizedSnapshot
from snapshotter.utils.models.message_models import AggregateBase
from snapshotter.utils.models.message_models import PayloadCommitMessage
from snapshotter.utils.models.message_models import PowerloomCalculateAggregateMessage
from snapshotter.utils.models.message_models import PowerloomSnapshotProcessMessage
from snapshotter.utils.models.message_models import PowerloomSnapshotSubmittedMessage
from snapshotter.utils.redis.redis_conn import RedisPoolCache
from snapshotter.utils.redis.redis_keys import epoch_id_project_to_state_mapping
from snapshotter.utils.redis.redis_keys import submitted_unfinalized_snapshot_cids
from snapshotter.utils.rpc import RpcHelper
def web3_storage_retry_state_callback(retry_state: tenacity.RetryCallState):
    """
    Log a warning for every failed web3 storage upload retry attempt.

    Args:
        retry_state (tenacity.RetryCallState): State of the current retry attempt.

    Returns:
        None
    """
    # Nothing to report unless this attempt actually ended in a failure.
    if not retry_state or not retry_state.outcome.failed:
        return
    logger.warning(
        f'Encountered web3 storage upload exception: {retry_state.outcome.exception()} | args: {retry_state.args}, kwargs:{retry_state.kwargs}',
    )
def ipfs_upload_retry_state_callback(retry_state: tenacity.RetryCallState):
    """
    Log a warning for every failed IPFS upload retry attempt.

    Args:
        retry_state (tenacity.RetryCallState): State of the current retry attempt.

    Returns:
        None
    """
    # Nothing to report unless this attempt actually ended in a failure.
    if not retry_state or not retry_state.outcome.failed:
        return
    logger.warning(
        f'Encountered ipfs upload exception: {retry_state.outcome.exception()} | args: {retry_state.args}, kwargs:{retry_state.kwargs}',
    )
class GenericAsyncWorker(multiprocessing.Process):
    """
    Base class for asynchronous RabbitMQ-driven snapshotter worker processes.

    A subclass overrides `_on_rabbitmq_message` to process incoming messages.
    This base class owns the shared plumbing: Redis pool, RPC helpers, HTTPX
    clients, RabbitMQ connection/channel pools, and the snapshot commit
    pipeline (IPFS upload, unfinalized-snapshot tracking, relayer dispatch
    and optional web3.storage upload).
    """

    _async_transport: AsyncHTTPTransport
    _rmq_connection_pool: Pool
    _rmq_channel_pool: Pool
    _aioredis_pool: RedisPoolCache
    _redis_conn: aioredis.Redis
    _rpc_helper: RpcHelper
    _anchor_rpc_helper: RpcHelper
    _httpx_client: AsyncClient
    _web3_storage_upload_transport: AsyncHTTPTransport
    _web3_storage_upload_client: AsyncClient

    def __init__(self, name, **kwargs):
        """
        Initializes a GenericAsyncWorker instance.

        Args:
            name (str): The name of the worker.
            **kwargs: Additional keyword arguments to pass to the superclass constructor.
        """
        self._core_rmq_consumer: asyncio.Task
        self._exchange_name = f'{settings.rabbitmq.setup.callbacks.exchange}:{settings.namespace}'
        # Short random suffix so multiple workers of the same name are distinguishable in logs.
        self._unique_id = f'{name}-' + keccak(text=str(uuid4())).hex()[:8]
        self._running_callback_tasks: Dict[str, asyncio.Task] = dict()
        super().__init__(name=name, **kwargs)
        self._protocol_state_contract = None
        self._qos = 1
        self._rate_limiting_lua_scripts = None
        self.protocol_state_contract_address = settings.protocol_state.address
        self._commit_payload_exchange = (
            f'{settings.rabbitmq.setup.commit_payload.exchange}:{settings.namespace}'
        )
        self._event_detector_exchange = f'{settings.rabbitmq.setup.event_detector.exchange}:{settings.namespace}'
        self._event_detector_routing_key_prefix = f'powerloom-event-detector:{settings.namespace}:{settings.instance_id}.'
        self._commit_payload_routing_key = (
            f'powerloom-backend-commit-payload:{settings.namespace}:{settings.instance_id}.Data'
        )
        self._initialized = False

    def _signal_handler(self, signum, frame):
        """
        Signal handler function that cancels the core RMQ consumer when a SIGINT, SIGTERM or SIGQUIT signal is received.

        Args:
            signum (int): The signal number.
            frame (frame): The current stack frame at the time the signal was received.
        """
        if signum in [SIGINT, SIGTERM, SIGQUIT]:
            self._core_rmq_consumer.cancel()

    @retry(
        wait=wait_random_exponential(multiplier=1, max=10),
        stop=stop_after_attempt(5),
        retry=tenacity.retry_if_not_exception_type(httpx.HTTPStatusError),
        after=web3_storage_retry_state_callback,
    )
    async def _upload_web3_storage(self, snapshot: bytes):
        """
        Uploads the given snapshot to web3 storage.

        Args:
            snapshot (bytes): The snapshot to upload.

        Returns:
            None

        Raises:
            HTTPError: If the upload fails.
        """
        web3_storage_settings = settings.web3storage
        # if no api token is provided, skip
        if not web3_storage_settings.api_token:
            return
        files = {'file': snapshot}
        r = await self._web3_storage_upload_client.post(
            url=f'{web3_storage_settings.url}{web3_storage_settings.upload_url_suffix}',
            files=files,
        )
        r.raise_for_status()
        resp = r.json()
        self._logger.info('Uploaded snapshot to web3 storage: {} | Response: {}', snapshot, resp)

    @retry(
        wait=wait_random_exponential(multiplier=1, max=10),
        stop=stop_after_attempt(5),
        retry=tenacity.retry_if_not_exception_type(IPFSAsyncClientError),
        after=ipfs_upload_retry_state_callback,
    )
    async def _upload_to_ipfs(self, snapshot: bytes, _ipfs_writer_client: AsyncIPFSClient):
        """
        Uploads a snapshot to IPFS using the provided AsyncIPFSClient.

        Args:
            snapshot (bytes): The snapshot to upload.
            _ipfs_writer_client (AsyncIPFSClient): The IPFS client to use for uploading.

        Returns:
            str: The CID of the uploaded snapshot.
        """
        snapshot_cid = await _ipfs_writer_client.add_bytes(snapshot)
        return snapshot_cid

    async def _commit_payload(
        self,
        task_type: str,
        _ipfs_writer_client: AsyncIPFSClient,
        project_id: str,
        epoch: Union[
            PowerloomSnapshotProcessMessage,
            PowerloomSnapshotSubmittedMessage,
            PowerloomCalculateAggregateMessage,
        ],
        snapshot: Union[BaseModel, AggregateBase],
        storage_flag: bool,
    ):
        """
        Commits the given snapshot to IPFS and web3 storage (if enabled), and sends messages to the event detector and relayer
        dispatch queues.

        Args:
            task_type (str): The type of task being committed.
            _ipfs_writer_client (AsyncIPFSClient): The IPFS client to use for uploading the snapshot.
            project_id (str): The ID of the project the snapshot belongs to.
            epoch (Union[PowerloomSnapshotProcessMessage, PowerloomSnapshotSubmittedMessage, PowerloomCalculateAggregateMessage]): The epoch the snapshot belongs to.
            snapshot (Union[BaseModel, AggregateBase]): The snapshot to commit.
            storage_flag (bool): Whether to upload the snapshot to web3 storage.

        Returns:
            None
        """
        # payload commit sequence begins
        # upload to IPFS; canonical serialization (sorted keys, compact separators)
        # so the same snapshot always produces the same CID
        snapshot_json = json.dumps(snapshot.dict(by_alias=True), sort_keys=True, separators=(',', ':'))
        snapshot_bytes = snapshot_json.encode('utf-8')
        try:
            snapshot_cid = await self._upload_to_ipfs(snapshot_bytes, _ipfs_writer_client)
        except Exception as e:
            self._logger.opt(exception=True).error(
                'Exception uploading snapshot to IPFS for epoch {}: {}, Error: {},'
                'sending failure notifications', epoch, snapshot, e,
            )
            notification_message = SnapshotterIssue(
                instanceID=settings.instance_id,
                issueType=SnapshotterReportState.MISSED_SNAPSHOT.value,
                projectID=project_id,
                epochId=str(epoch.epochId),
                timeOfReporting=str(time.time()),
                extra=json.dumps({'issueDetails': f'Error : {e}'}),
            )
            await send_failure_notifications_async(
                client=self._client, message=notification_message,
            )
        else:
            # add to zset of unfinalized snapshot CIDs, scored by epoch ID
            unfinalized_entry = UnfinalizedSnapshot(
                snapshotCid=snapshot_cid,
                snapshot=snapshot.dict(by_alias=True),
            )
            await self._redis_conn.zadd(
                name=submitted_unfinalized_snapshot_cids(project_id),
                mapping={unfinalized_entry.json(sort_keys=True): epoch.epochId},
            )
            # publish snapshot submitted event to event detector queue
            snapshot_submitted_message = PowerloomSnapshotSubmittedMessage(
                snapshotCid=snapshot_cid,
                epochId=epoch.epochId,
                projectId=project_id,
                timestamp=int(time.time()),
            )
            try:
                async with self._rmq_connection_pool.acquire() as connection:
                    async with self._rmq_channel_pool.acquire() as channel:
                        # Prepare a message to send
                        commit_payload_exchange = await channel.get_exchange(
                            name=self._event_detector_exchange,
                        )
                        message_data = snapshot_submitted_message.json().encode()
                        # Prepare a message to send
                        message = Message(message_data)
                        await commit_payload_exchange.publish(
                            message=message,
                            routing_key=self._event_detector_routing_key_prefix + 'SnapshotSubmitted',
                        )
                        self._logger.debug(
                            'Sent snapshot submitted message to event detector queue | '
                            'Project: {} | Epoch: {} | Snapshot CID: {}',
                            project_id, epoch.epochId, snapshot_cid,
                        )
            except Exception as e:
                self._logger.opt(exception=True).error(
                    'Exception sending snapshot submitted message to event detector queue: {} | Project: {} | Epoch: {} | Snapshot CID: {}',
                    e, project_id, epoch.epochId, snapshot_cid,
                )
            # best-effort pruning of stale unfinalized entries older than
            # (epochId - 32); presumably 32 epochs is the finalization
            # window — TODO confirm. Failure here must never block the commit flow,
            # but only ordinary errors are swallowed (narrowed from a bare `except:`
            # so CancelledError/KeyboardInterrupt still propagate).
            try:
                await self._redis_conn.zremrangebyscore(
                    name=submitted_unfinalized_snapshot_cids(project_id),
                    min='-inf',
                    max=epoch.epochId - 32,
                )
            except Exception:
                pass
            # send to relayer dispatch queue
            await self._send_payload_commit_service_queue(
                task_type=task_type,
                project_id=project_id,
                epoch=epoch,
                snapshot_cid=snapshot_cid,
            )
        # upload to web3 storage; fire-and-forget so the commit path is not blocked
        if storage_flag:
            asyncio.ensure_future(self._upload_web3_storage(snapshot_bytes))

    async def _rabbitmq_consumer(self, loop):
        """
        Consume messages from a RabbitMQ queue.

        Args:
            loop (asyncio.AbstractEventLoop): The event loop to use for the consumer.

        Returns:
            None
        """
        self._rmq_connection_pool = Pool(get_rabbitmq_robust_connection_async, max_size=5, loop=loop)
        self._rmq_channel_pool = Pool(
            partial(get_rabbitmq_channel, self._rmq_connection_pool), max_size=20,
            loop=loop,
        )
        async with self._rmq_channel_pool.acquire() as channel:
            await channel.set_qos(self._qos)
            exchange = await channel.get_exchange(
                name=self._exchange_name,
            )
            # NOTE: self._q and self._rmq_routing are expected to be set by the subclass
            q_obj = await channel.get_queue(
                name=self._q,
                ensure=False,
            )
            self._logger.debug(
                f'Consuming queue {self._q} with routing key {self._rmq_routing}...',
            )
            await q_obj.bind(exchange, routing_key=self._rmq_routing)
            await q_obj.consume(self._on_rabbitmq_message)

    async def _send_payload_commit_service_queue(
        self,
        task_type: str,
        project_id: str,
        epoch: Union[
            PowerloomSnapshotProcessMessage,
            PowerloomSnapshotSubmittedMessage,
            PowerloomCalculateAggregateMessage,
        ],
        snapshot_cid: str,
    ):
        """
        Sends a commit payload message to the commit payload queue via RabbitMQ.

        Args:
            task_type (str): The type of task being performed.
            project_id (str): The ID of the project.
            epoch (Union[PowerloomSnapshotProcessMessage, PowerloomSnapshotSubmittedMessage, PowerloomCalculateAggregateMessage]): The epoch object.
            snapshot_cid (str): The CID of the snapshot.

        Raises:
            Exception: If there is an error getting the source chain ID or sending the message to the commit payload queue.

        Returns:
            None
        """
        try:
            source_chain_details = await get_source_chain_id(
                redis_conn=self._redis_conn,
                rpc_helper=self._anchor_rpc_helper,
                state_contract_obj=self._protocol_state_contract,
            )
        except Exception as e:
            self._logger.opt(exception=True).error(
                'Exception getting source chain id: {}', e,
            )
            raise e
        commit_payload = PayloadCommitMessage(
            sourceChainId=source_chain_details,
            projectId=project_id,
            epochId=epoch.epochId,
            snapshotCID=snapshot_cid,
        )
        # send through rabbitmq
        try:
            async with self._rmq_connection_pool.acquire() as connection:
                async with self._rmq_channel_pool.acquire() as channel:
                    # Prepare a message to send
                    commit_payload_exchange = await channel.get_exchange(
                        name=self._commit_payload_exchange,
                    )
                    message_data = commit_payload.json().encode()
                    # Prepare a message to send
                    message = Message(message_data)
                    await commit_payload_exchange.publish(
                        message=message,
                        routing_key=self._commit_payload_routing_key,
                    )
                    self._logger.info(
                        'Sent message to commit payload queue: {}', commit_payload,
                    )
        except Exception as e:
            # FIX: the message has three placeholders (CID, error, dump) but only
            # two arguments were previously supplied — the payload dump was never
            # logged and loguru's formatting of this record failed.
            self._logger.opt(exception=True).error(
                (
                    'Exception committing snapshot CID {} to commit payload queue:'
                    ' {} | dump: {}'
                ),
                snapshot_cid,
                e,
                commit_payload,
            )
            await self._redis_conn.hset(
                name=epoch_id_project_to_state_mapping(
                    epoch.epochId, SnapshotterStates.SNAPSHOT_SUBMIT_PAYLOAD_COMMIT.value,
                ),
                mapping={
                    project_id: SnapshotterStateUpdate(
                        status='failed', error=str(e), timestamp=int(time.time()),
                    ).json(),
                },
            )
        else:
            await self._redis_conn.hset(
                name=epoch_id_project_to_state_mapping(
                    epoch.epochId, SnapshotterStates.SNAPSHOT_SUBMIT_PAYLOAD_COMMIT.value,
                ),
                mapping={
                    project_id: SnapshotterStateUpdate(
                        status='success', timestamp=int(time.time()),
                    ).json(),
                },
            )

    async def _on_rabbitmq_message(self, message: IncomingMessage):
        """
        Callback function that is called when a message is received from RabbitMQ.

        Subclasses override this to implement their message handling.

        :param message: The incoming message from RabbitMQ.
        """
        pass

    async def _init_redis_pool(self):
        """
        Initializes the Redis connection pool and sets the `_redis_conn` attribute to the created connection pool.
        """
        self._aioredis_pool = RedisPoolCache()
        await self._aioredis_pool.populate()
        self._redis_conn = self._aioredis_pool._aioredis_pool

    async def _init_rpc_helper(self):
        """
        Initializes the RpcHelper objects for the worker and anchor chain, and sets up the protocol state contract.
        """
        self._rpc_helper = RpcHelper(rpc_settings=settings.rpc)
        self._anchor_rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc)
        self._protocol_state_contract = self._anchor_rpc_helper.get_current_node()['web3_client'].eth.contract(
            address=Web3.toChecksumAddress(
                self.protocol_state_contract_address,
            ),
            abi=read_json_file(
                settings.protocol_state.abi,
                self._logger,
            ),
        )

    async def _init_httpx_client(self):
        """
        Initializes the HTTPX client and transport objects for making HTTP requests.
        """
        self._async_transport = AsyncHTTPTransport(
            limits=Limits(
                max_connections=200,
                max_keepalive_connections=50,
                keepalive_expiry=None,
            ),
        )
        self._client = AsyncClient(
            timeout=Timeout(timeout=5.0),
            follow_redirects=False,
            transport=self._async_transport,
        )
        self._web3_storage_upload_transport = AsyncHTTPTransport(
            limits=Limits(
                max_connections=200,
                max_keepalive_connections=settings.web3storage.max_idle_conns,
                keepalive_expiry=settings.web3storage.idle_conn_timeout,
            ),
        )
        self._web3_storage_upload_client = AsyncClient(
            timeout=Timeout(timeout=settings.web3storage.timeout),
            follow_redirects=False,
            transport=self._web3_storage_upload_transport,
            headers={'Authorization': 'Bearer ' + settings.web3storage.api_token},
        )

    async def _init_protocol_meta(self):
        """
        Queries the protocol state contract for source chain block time and epoch
        size, caching them on the instance. Failures are logged and the cached
        attribute is simply left unset.
        """
        # TODO: combine these into a single call
        try:
            source_block_time = await self._anchor_rpc_helper.web3_call(
                [self._protocol_state_contract.functions.SOURCE_CHAIN_BLOCK_TIME()],
                redis_conn=self._redis_conn,
            )
        except Exception as e:
            self._logger.exception(
                'Exception in querying protocol state for source chain block time: {}',
                e,
            )
        else:
            source_block_time = source_block_time[0]
            # contract stores block time scaled by 1e4 — convert back to seconds
            self._source_chain_block_time = source_block_time / 10 ** 4
            self._logger.debug('Set source chain block time to {}', self._source_chain_block_time)
        try:
            epoch_size = await self._anchor_rpc_helper.web3_call(
                [self._protocol_state_contract.functions.EPOCH_SIZE()],
                redis_conn=self._redis_conn,
            )
        except Exception as e:
            self._logger.exception(
                'Exception in querying protocol state for epoch size: {}',
                e,
            )
        else:
            self._epoch_size = epoch_size[0]
            self._logger.debug('Set epoch size to {}', self._epoch_size)

    async def init(self):
        """
        Initializes the worker by initializing the Redis pool, HTTPX client, and RPC helper.
        """
        if not self._initialized:
            await self._init_redis_pool()
            await self._init_httpx_client()
            await self._init_rpc_helper()
            await self._init_protocol_meta()
        self._initialized = True

    def run(self) -> None:
        """
        Runs the worker by setting resource limits, registering signal handlers, starting the RabbitMQ consumer, and
        running the event loop until it is stopped.
        """
        self._logger = logger.bind(module=self.name)
        # raise the soft file-descriptor limit to the configured value (many
        # concurrent sockets are expected); the hard limit is left untouched
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        resource.setrlimit(
            resource.RLIMIT_NOFILE,
            (settings.rlimit.file_descriptors, hard),
        )
        for signame in [SIGINT, SIGTERM, SIGQUIT]:
            signal(signame, self._signal_handler)
        ev_loop = asyncio.get_event_loop()
        self._logger.debug(
            f'Starting asynchronous callback worker {self._unique_id}...',
        )
        self._core_rmq_consumer = asyncio.ensure_future(
            self._rabbitmq_consumer(ev_loop),
        )
        try:
            ev_loop.run_forever()
        finally:
            ev_loop.close()