76 changes: 35 additions & 41 deletions src/aleph/chains/common.py
@@ -5,7 +5,6 @@
from enum import IntEnum
from typing import Dict, Optional, Tuple, List

from aleph_message.models import MessageType
from bson import ObjectId
from pymongo import UpdateOne

@@ -23,11 +22,11 @@
from aleph.model.filepin import PermanentPin
from aleph.model.messages import CappedMessage, Message
from aleph.model.pending import PendingMessage, PendingTX
from aleph.network import check_message as check_message_fn
from aleph.network import verify_signature
from aleph.permissions import check_sender_authorization
from aleph.storage import get_json, pin_hash, add_json, get_message_content
from .tx_context import TxContext
from ..schemas.pending_messages import BasePendingMessage
from ..schemas.pending_messages import BasePendingMessage, PendingForgetMessage, PendingStoreMessage

LOGGER = logging.getLogger("chains.common")

@@ -53,12 +52,17 @@ async def mark_confirmed_data(chain_name, tx_hash, height):
}


async def delayed_incoming(message, chain_name=None, tx_hash=None, height=None):
async def delayed_incoming(
message: BasePendingMessage,
chain_name: Optional[str] = None,
tx_hash: Optional[str] = None,
height: Optional[int] = None,
):
if message is None:
return
await PendingMessage.collection.insert_one(
{
"message": message,
"message": message.dict(exclude={"content"}),
"source": dict(
chain_name=chain_name,
tx_hash=tx_hash,
@@ -76,18 +80,20 @@ class IncomingStatus(IntEnum):


async def mark_message_for_retry(
message: Dict,
message: BasePendingMessage,
chain_name: Optional[str],
tx_hash: Optional[str],
height: Optional[int],
check_message: bool,
retrying: bool,
existing_id,
):
message_dict = message.dict(exclude={"content"})

if not retrying:
await PendingMessage.collection.insert_one(
{
"message": message,
"message": message_dict,
"source": dict(
chain_name=chain_name,
tx_hash=tx_hash,
@@ -105,7 +111,7 @@ async def mark_message_for_retry(


async def incoming(
message: Dict,
message: BasePendingMessage,
chain_name: Optional[str] = None,
tx_hash: Optional[str] = None,
height: Optional[int] = None,
@@ -120,8 +126,8 @@
if existing in database, created if not.
"""

item_hash = message["item_hash"]
sender = message["sender"]
item_hash = message.item_hash
sender = message.sender
ids_key = (item_hash, sender, chain_name)

if chain_name and tx_hash and height and seen_ids is not None:
@@ -131,61 +137,49 @@

filters = {
"item_hash": item_hash,
"chain": message["chain"],
"sender": message["sender"],
"type": message["type"],
"chain": message.chain,
"sender": message.sender,
"type": message.type,
}
existing = await Message.collection.find_one(
filters,
projection={"confirmed": 1, "confirmations": 1, "time": 1, "signature": 1},
)

if check_message:
if existing is None or (existing["signature"] != message["signature"]):
if existing is None or (existing["signature"] != message.signature):
# check/sanitize the message if needed
try:
message = await check_message_fn(
message, from_chain=(chain_name is not None)
)
await verify_signature(message)
except InvalidMessageError:
return IncomingStatus.FAILED_PERMANENTLY, []

if message is None:
return IncomingStatus.MESSAGE_HANDLED, []

if retrying:
LOGGER.debug("(Re)trying %s." % item_hash)
else:
LOGGER.info("Incoming %s." % item_hash)

# we set the incoming chain as default for signature
message["chain"] = message.get("chain", chain_name)

# if existing is None:
# # TODO: verify if search key is ok. do we need an unique key for messages?
# existing = await Message.collection.find_one(
# filters, projection={'confirmed': 1, 'confirmations': 1, 'time': 1})
updates: Dict[str, Dict]

if chain_name and tx_hash and height:
# We are getting a confirmation here
new_values = await mark_confirmed_data(chain_name, tx_hash, height)

updates = {
"$set": {
"confirmed": True,
},
"$min": {"time": message["time"]},
"$min": {"time": message.time},
"$addToSet": {"confirmations": new_values["confirmations"][0]},
}
else:
updates = {
"$max": {
"confirmed": False,
},
"$min": {"time": message["time"]},
"$min": {"time": message.time},
"$set": {},
}

# new_values = {'confirmed': False} # this should be our default.
should_commit = False
if existing:
if seen_ids is not None and height is not None:
@@ -237,19 +231,19 @@

json_content = content.value
if json_content.get("address", None) is None:
json_content["address"] = message["sender"]
json_content["address"] = message.sender

if json_content.get("time", None) is None:
json_content["time"] = message["time"]
json_content["time"] = message.time

# warning: those handlers can modify message and content in place
# and return a status. None has to be retried, -1 is discarded, True is
# handled and kept.
# TODO: change this, it's messy.
try:
if message["type"] == MessageType.store:
if isinstance(message, PendingStoreMessage):
handling_result = await handle_new_storage(message, json_content)
elif message["type"] == MessageType.forget:
elif isinstance(message, PendingForgetMessage):
# Handling it here means that we ensure that the message
# has been forgotten before it is saved on the node.
# We may want the opposite instead: ensure that the message has
@@ -298,14 +292,14 @@ async def incoming(
seen_ids[ids_key] = height

LOGGER.debug("New message to store for %s." % item_hash)
# message.update(new_values)

updates["$set"] = {
"content": json_content,
"size": len(content.raw_value),
"item_content": message.get("item_content"),
"item_type": message.get("item_type"),
"channel": message.get("channel"),
"signature": message.get("signature"),
"item_content": message.item_content,
"item_type": message.item_type.value,
"channel": message.channel,
"signature": message.signature,
**updates.get("$set", {}),
}
should_commit = True
@@ -324,7 +318,7 @@ async def incoming(
return IncomingStatus.MESSAGE_HANDLED, []


async def process_one_message(message: Dict, *args, **kwargs):
async def process_one_message(message: BasePendingMessage, *args, **kwargs):
"""
Helper function to process a message on the spot.
"""
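
Taken together, the changes in this file swap raw dicts for parsed Pydantic models: incoming and its helpers now use attribute access and isinstance dispatch instead of dict lookups and MessageType comparisons, and persist with .dict(exclude={"content"}). A minimal, self-contained sketch of the pattern, with toy classes standing in for the real ones in aleph.schemas.pending_messages:

    from typing import Optional
    from pydantic import BaseModel

    class BasePendingMessage(BaseModel):
        # Toy stand-in for aleph.schemas.pending_messages.BasePendingMessage
        item_hash: str
        sender: str
        chain: str
        signature: str
        content: Optional[dict] = None

    class PendingStoreMessage(BasePendingMessage):
        pass

    class PendingForgetMessage(BasePendingMessage):
        pass

    def dispatch(message: BasePendingMessage) -> str:
        # isinstance checks replace `message["type"] == MessageType.store`
        if isinstance(message, PendingStoreMessage):
            return "store"
        if isinstance(message, PendingForgetMessage):
            return "forget"
        return "other"

    msg = PendingStoreMessage(
        item_hash="abc", sender="0xSender", chain="ETH", signature="0xSig"
    )
    assert dispatch(msg) == "store"
    # Mongo persistence excludes the still-unvalidated content field:
    assert "content" not in msg.dict(exclude={"content"})
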
13 changes: 6 additions & 7 deletions src/aleph/handlers/forget.py
@@ -6,12 +6,12 @@

from aioipfs.api import RepoAPI
from aioipfs.exceptions import NotPinnedError
from aleph_message.models import ForgetMessage, MessageType
from aleph_message.models import ItemType
from aleph_message.models import ItemType, MessageType, ForgetContent

from aleph.model.filepin import PermanentPin
from aleph.model.hashes import delete_value
from aleph.model.messages import Message
from aleph.schemas.pending_messages import PendingForgetMessage
from aleph.services.ipfs.common import get_ipfs_api
from aleph.utils import item_type_from_hash

@@ -118,7 +118,7 @@ async def garbage_collect(storage_hash: str, storage_type: ItemType):


async def is_allowed_to_forget(
target_info: TargetMessageInfo, by: ForgetMessage
target_info: TargetMessageInfo, by: PendingForgetMessage
) -> bool:
"""Check if a forget message is allowed to 'forget' the target message given its hash."""
# Both senders are identical:
@@ -136,7 +136,7 @@ async def is_allowed_to_forget(


async def forget_if_allowed(
target_info: TargetMessageInfo, forget_message: ForgetMessage
target_info: TargetMessageInfo, forget_message: PendingForgetMessage
) -> None:
"""Forget a message.

@@ -210,10 +210,9 @@ async def get_target_message_info(target_hash: str) -> Optional[TargetMessageInf
return TargetMessageInfo.from_db_object(message_dict)


async def handle_forget_message(message: Dict, content: Dict):
async def handle_forget_message(forget_message: PendingForgetMessage, content: Dict):
# Parsing and validation
forget_message = ForgetMessage(**message, content=content)
logger.debug(f"Handling forget message {forget_message.item_hash}")
forget_message.content = ForgetContent(**content)

for target_hash in forget_message.content.hashes:
target_info = await get_target_message_info(target_hash)
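
Since the envelope now arrives already validated as a PendingForgetMessage, only the content dict still needs parsing, which the new forget_message.content = ForgetContent(**content) line does. A rough sketch of that step in isolation; the payload values are placeholders, and stricter versions of aleph_message may reject the toy hash:

    from aleph_message.models import ForgetContent

    content = {
        "address": "0xSender",        # placeholder sender address
        "time": 1_650_000_000.0,      # placeholder timestamp
        "hashes": ["abc123"],         # placeholder target hash
    }
    parsed = ForgetContent(**content)
    for target_hash in parsed.hashes:
        # look up target info, check is_allowed_to_forget, then forget
        ...
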
5 changes: 3 additions & 2 deletions src/aleph/handlers/storage.py
@@ -19,22 +19,23 @@

from aleph.config import get_config
from aleph.exceptions import AlephStorageException, UnknownHashError
from aleph.schemas.pending_messages import PendingStoreMessage
from aleph.services.ipfs.common import get_ipfs_api
from aleph.storage import get_hash_content
from aleph.utils import item_type_from_hash

LOGGER = logging.getLogger("HANDLERS.STORAGE")


async def handle_new_storage(message: Dict, content: Dict):
async def handle_new_storage(message: PendingStoreMessage, content: Dict):
config = get_config()
if not config.storage.store_files.value:
return True # Ignore

# TODO: ideally the content should be transformed earlier, but this requires more clean up
# (ex: no more in place modification of content, simplification of the flow)
try:
store_message = StoreMessage(**message, content=content)
store_message = StoreMessage(**message.dict(exclude={"content"}), content=content)
except ValidationError as e:
print(e)
return -1 # Invalid store message, discard
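
The one functional change here is the reconstruction step: the pending model is dumped without its still-raw content, and the content dict is validated as part of building the full StoreMessage. The same step in isolation, as a sketch (pending is any parsed PendingStoreMessage):

    from aleph_message.models import StoreMessage
    from pydantic import ValidationError

    def build_store_message(pending, content: dict):
        # Envelope fields come from the parsed pending model; the content
        # dict is only validated here, as part of the full message.
        try:
            return StoreMessage(
                **pending.dict(exclude={"content"}), content=content
            )
        except ValidationError:
            return None  # the handler discards such messages (returns -1)
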
5 changes: 4 additions & 1 deletion src/aleph/jobs/process_pending_messages.py
@@ -19,6 +19,7 @@
from aleph.model.pending import PendingMessage
from aleph.services.p2p import singleton
from .job_utils import prepare_loop, process_job_results
from ..schemas.pending_messages import parse_message

LOGGER = getLogger("jobs.pending_messages")

@@ -48,9 +49,11 @@ async def handle_pending_message(
seen_ids: Dict[Tuple, int],
) -> List[DbBulkOperation]:

message = parse_message(pending["message"])

async with sem:
status, operations = await incoming(
pending["message"],
message=message,
chain_name=pending["source"].get("chain_name"),
tx_hash=pending["source"].get("tx_hash"),
height=pending["source"].get("height"),
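
This closes the loop with chains/common.py: the dict stored by delayed_incoming via message.dict(exclude={"content"}) is parsed back into the right Pending*Message class before processing resumes. A sketch of the round trip, omitting the retry bookkeeping arguments (pending is a hypothetical document from the PendingMessage collection):

    from typing import Dict
    from aleph.chains.common import incoming
    from aleph.schemas.pending_messages import parse_message

    async def replay_pending(pending: Dict):
        # Re-parse the stored dict into a typed model, then hand it to
        # incoming() together with the original chain context.
        message = parse_message(pending["message"])
        source = pending["source"]
        return await incoming(
            message=message,
            chain_name=source.get("chain_name"),
            tx_hash=source.get("tx_hash"),
            height=source.get("height"),
        )
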
14 changes: 7 additions & 7 deletions src/aleph/jobs/process_pending_txs.py
@@ -18,9 +18,9 @@
from aleph.logging import setup_logging
from aleph.model.db_bulk_operation import DbBulkOperation
from aleph.model.pending import PendingMessage, PendingTX
from aleph.network import check_message
from aleph.services.p2p import singleton
from .job_utils import prepare_loop, process_job_results
from ..schemas.pending_messages import parse_message

LOGGER = logging.getLogger("jobs.pending_txs")

@@ -37,24 +37,24 @@ async def handle_pending_tx(
pending_tx["content"], tx_context, seen_ids=seen_ids
)
if messages:
for i, message in enumerate(messages):
message["time"] = tx_context.time + (i / 1000) # force order
for i, message_dict in enumerate(messages):

try:
message = await check_message(
message, trusted=True
) # we don't check signatures yet.
# we don't check signatures yet.
message = parse_message(message_dict)
except InvalidMessageError as error:
LOGGER.warning(error)
continue

message.time = tx_context.time + (i / 1000) # force order

# we add it to the message queue... bad idea? should we process it asap?
db_operations.append(
DbBulkOperation(
collection=PendingMessage,
operation=InsertOne(
{
"message": message,
"message": message.dict(exclude={"content"}),
"source": dict(
chain_name=tx_context.chain_name,
tx_hash=tx_context.tx_hash,
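
One subtlety survives the refactor: messages unpacked from a single tx inherit its timestamp plus a millisecond-scale offset from their index (message.time = tx_context.time + (i / 1000)), so a later sort by time reproduces the on-chain order. A toy check of the invariant:

    tx_time = 1_650_000_000.0  # hypothetical tx_context.time
    # One offset per unpacked message; index order survives a time sort.
    times = [tx_time + (i / 1000) for i in range(3)]
    assert times == sorted(times)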