6 changes: 6 additions & 0 deletions CHANGELOG.rst
@@ -2,6 +2,12 @@
Changelog
=========

Next release
============

- Breaking change: the "pyaleph_processing_pending_messages_action_total" and
"pyaleph_processing_pending_messages_action_total" values are no longer part of the metrics.

Version 0.2.1
=============

94 changes: 44 additions & 50 deletions src/aleph/chains/common.py
@@ -2,16 +2,17 @@
import json
import logging
from enum import IntEnum
from typing import Dict, Optional, Union, Tuple, List
from typing import Dict, Optional, Tuple, List

from aleph_message.models import MessageType
from bson import ObjectId
from pymongo import UpdateOne

from aleph.handlers.forget import handle_forget_message
from aleph.handlers.storage import handle_new_storage
from aleph.model.db_bulk_operation import DbBulkOperation
from aleph.model.filepin import PermanentPin
from aleph.model.messages import Message, CappedMessage
from aleph.model.messages import CappedMessage, Message
from aleph.model.pending import PendingMessage, PendingTX
from aleph.network import check_message as check_message_fn
from aleph.permissions import check_sender_authorization
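A note on the new import: `DbBulkOperation` is the building block for the deferred-write return value introduced below. Its definition lives in `aleph.model.db_bulk_operation` and is not shown in this diff; a minimal sketch of what it presumably looks like, assuming a dataclass pairing a model class with a pymongo operation:

    from dataclasses import dataclass
    from typing import Any

    from pymongo import UpdateOne


    @dataclass
    class DbBulkOperation:
        """Hedged sketch only; the real class in aleph.model.db_bulk_operation may differ."""
        collection: Any       # model class (e.g. Message) exposing a .collection attribute
        operation: UpdateOne  # pymongo write operation to apply to that collection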
@@ -26,7 +27,7 @@
LOGGER = logging.getLogger("chains.common")


async def get_verification_buffer(message):
async def get_verification_buffer(message: Dict) -> bytes:
"""Returns a serialized string to verify the message integrity
(this is what was signed)
"""
@@ -98,33 +99,32 @@ async def mark_message_for_retry(


async def incoming(
message,
message: Dict,
chain_name: Optional[str] = None,
tx_hash: Optional[str] = None,
height: Optional[int] = None,
seen_ids: Optional[Dict[Tuple, int]] = None,
check_message: bool = False,
retrying: bool = False,
bulk_operation: bool = False,
existing_id: Optional[ObjectId] = None,
) -> Union[IncomingStatus, UpdateOne]:
) -> Tuple[IncomingStatus, List[DbBulkOperation]]:
"""New incoming message from underlying chain.

A regular message will be marked as confirmed
if it already exists in the database, and created if not.
"""

hash = message["item_hash"]
item_hash = message["item_hash"]
sender = message["sender"]
ids_key = (hash, sender, chain_name)
ids_key = (item_hash, sender, chain_name)

if chain_name and tx_hash and height and seen_ids is not None:
if ids_key in seen_ids.keys():
if height > seen_ids[ids_key]:
return IncomingStatus.MESSAGE_HANDLED
return IncomingStatus.MESSAGE_HANDLED, []

filters = {
"item_hash": hash,
"item_hash": item_hash,
"chain": message["chain"],
"sender": message["sender"],
"type": message["type"],
@@ -142,12 +142,12 @@ async def incoming(
)

if message is None:
return IncomingStatus.MESSAGE_HANDLED
return IncomingStatus.MESSAGE_HANDLED, []

if retrying:
LOGGER.debug("(Re)trying %s." % hash)
LOGGER.debug("(Re)trying %s." % item_hash)
else:
LOGGER.info("Incoming %s." % hash)
LOGGER.info("Incoming %s." % item_hash)

# we set the incoming chain as default for signature
message["chain"] = message.get("chain", chain_name)
@@ -182,7 +182,7 @@ async def incoming(
if seen_ids is not None and height is not None:
if ids_key in seen_ids.keys():
if height > seen_ids[ids_key]:
return IncomingStatus.MESSAGE_HANDLED
return IncomingStatus.MESSAGE_HANDLED, []
else:
seen_ids[ids_key] = height
else:
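The `seen_ids` bookkeeping keys on `(item_hash, sender, chain_name)` and keeps the lowest block height seen so far; a sighting at a greater height is treated as already handled, while a lower one overwrites the record. A small illustration with hypothetical values:

    from typing import Dict, Tuple

    seen_ids: Dict[Tuple, int] = {}
    ids_key = ("<item_hash>", "<sender>", "ETH")  # hypothetical values

    seen_ids[ids_key] = 100  # first sighting, recorded at height 100
    # A later sighting at height 120 returns MESSAGE_HANDLED (120 > 100).
    # A sighting at height 90 falls through and overwrites the entry with 90.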
@@ -194,7 +194,7 @@ async def incoming(
# chain_name in [c['chain'] for c in existing['confirmations']]):
# return

LOGGER.debug("Updating %s." % hash)
LOGGER.debug("Updating %s." % item_hash)

if chain_name and tx_hash and height:
# we need to update messages adding the confirmation
@@ -209,12 +212,12 @@ async def incoming(
content = await get_message_content(message)

except InvalidContent:
LOGGER.warning("Can't get content of object %r, won't retry." % hash)
return IncomingStatus.FAILED_PERMANENTLY
LOGGER.warning("Can't get content of object %r, won't retry." % item_hash)
return IncomingStatus.FAILED_PERMANENTLY, []

except (ContentCurrentlyUnavailable, Exception) as e:
if not isinstance(e, ContentCurrentlyUnavailable):
LOGGER.exception("Can't get content of object %r" % hash)
LOGGER.exception("Can't get content of object %r" % item_hash)
await mark_message_for_retry(
message=message,
chain_name=chain_name,
Expand All @@ -224,7 +224,7 @@ async def incoming(
retrying=retrying,
existing_id=existing_id,
)
return IncomingStatus.RETRYING_LATER
return IncomingStatus.RETRYING_LATER, []

json_content = content.value
if json_content.get("address", None) is None:
@@ -249,8 +249,8 @@ async def incoming(
else:
handling_result = True
except UnknownHashError:
LOGGER.warning(f"Invalid IPFS hash for message {hash}, won't retry.")
return IncomingStatus.FAILED_PERMANENTLY
LOGGER.warning(f"Invalid IPFS hash for message {item_hash}, won't retry.")
return IncomingStatus.FAILED_PERMANENTLY, []
except Exception:
LOGGER.exception("Error using the message type handler")
handling_result = None
@@ -266,29 +266,29 @@ async def incoming(
retrying=retrying,
existing_id=existing_id,
)
return IncomingStatus.RETRYING_LATER
return IncomingStatus.RETRYING_LATER, []

if not handling_result:
LOGGER.warning(
"Message type handler has failed permanently for "
"%r, won't retry." % hash
"%r, won't retry." % item_hash
)
return IncomingStatus.FAILED_PERMANENTLY
return IncomingStatus.FAILED_PERMANENTLY, []

if not await check_sender_authorization(message, json_content):
LOGGER.warning("Invalid sender for %s" % hash)
return IncomingStatus.MESSAGE_HANDLED
LOGGER.warning("Invalid sender for %s" % item_hash)
return IncomingStatus.MESSAGE_HANDLED, []

if seen_ids is not None and height is not None:
if ids_key in seen_ids.keys():
if height > seen_ids[ids_key]:
return IncomingStatus.MESSAGE_HANDLED
return IncomingStatus.MESSAGE_HANDLED, []
else:
seen_ids[ids_key] = height
else:
seen_ids[ids_key] = height

LOGGER.debug("New message to store for %s." % hash)
LOGGER.debug("New message to store for %s." % item_hash)
# message.update(new_values)
updates["$set"] = {
"content": json_content,
@@ -308,20 +308,23 @@ async def incoming(
# await pin_hash(hash)

    if should_commit:
        action = UpdateOne(filters, updates, upsert=True)
        if not bulk_operation:
            await Message.collection.bulk_write([action])
            await CappedMessage.collection.bulk_write([action])
        else:
            return action
    return IncomingStatus.MESSAGE_HANDLED
        update_op = UpdateOne(filters, updates, upsert=True)
        bulk_ops = [
            DbBulkOperation(CappedMessage, update_op),
            DbBulkOperation(Message, update_op),
        ]
        return IncomingStatus.MESSAGE_HANDLED, bulk_ops

    return IncomingStatus.MESSAGE_HANDLED, []


async def invalidate(chain_name, block_height):
    """Invalidates a particular block height from an underlying chain
    (in case of forks)
    """
    pass


async def process_one_message(message: Dict):
    """
    Helper function to process a message on the spot.
    """
    status, ops = await incoming(message)
    for op in ops:
        await op.collection.collection.bulk_write([op.operation])


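One subtlety in the new `process_one_message` above: the double `.collection` access. The first is the `DbBulkOperation` field holding the model class; the second is, presumably, that model's attribute exposing the underlying Mongo collection. A hedged sketch of the same step, spelled out:

    async def apply_op_sketch(op: DbBulkOperation) -> None:
        model_class = op.collection                # e.g. the Message model class
        mongo_collection = model_class.collection  # assumed: the model's Mongo collection
        await mongo_collection.bulk_write([op.operation])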
async def get_chaindata(messages, bulk_threshold=2000):
@@ -409,12 +412,3 @@ async def incoming_chaindata(content, context):
For now we only add it to the database; it will be processed later.
"""
await PendingTX.collection.insert_one({"content": content, "context": context})


async def join_tasks(tasks, seen_ids):
try:
await asyncio.gather(*tasks)
except Exception:
LOGGER.exception("error in incoming task")
# seen_ids.clear()
tasks.clear()