Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 2 additions & 56 deletions src/aleph/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@
from typing import Coroutine, Dict, List
from urllib.parse import unquote

from aleph_message.models import ItemType
from p2pclient import Client as P2PClient

from aleph.exceptions import InvalidMessageError
from aleph.register_chain import VERIFIER_REGISTER
from aleph.schemas.pending_messages import parse_message
from aleph.services.ipfs.pubsub import incoming_channel as incoming_ipfs_channel
from aleph.utils import get_sha256
from aleph.utils import item_type_from_hash

LOGGER = logging.getLogger("NETWORK")

MAX_INLINE_SIZE = 200000 # 200kb max inline content size.

INCOMING_MESSAGE_AUTHORIZED_FIELDS = [
"item_hash",
Expand All @@ -29,8 +26,6 @@
"signature",
]

HOST = None


async def incoming_check(ipfs_pubsub_message: Dict) -> Dict:
"""Verifies an incoming message is sane, protecting from spam in the
Expand Down Expand Up @@ -71,57 +66,8 @@ async def check_message(

TODO: Implement it fully! Dangerous!
"""
if not isinstance(message, dict):
raise InvalidMessageError("Message must be a dict")

if not message:
raise InvalidMessageError("Message must not be empty")

for field in INCOMING_MESSAGE_AUTHORIZED_FIELDS:
if field not in message:
raise InvalidMessageError(
f"Missing field '{field}' in message {message['item_hash']}"
)

if not isinstance(message["item_hash"], str):
raise InvalidMessageError("Unknown hash %s" % message["item_hash"])

if not isinstance(message["chain"], str):
raise InvalidMessageError("Unknown chain %s" % message["chain"])

if message.get("channel", None) is not None:
if not isinstance(message.get("channel", None), str):
raise InvalidMessageError("Unknown channel %s" % message["channel"])

if not isinstance(message["sender"], str):
raise InvalidMessageError("Unknown sender %s" % message["sender"])

if not isinstance(message["signature"], str):
raise InvalidMessageError("Unknown signature %s" % message["signature"])

if message.get("item_content", None) is not None:
if len(message["item_content"]) > MAX_INLINE_SIZE:
raise InvalidMessageError("Message too long")
await asyncio.sleep(0)

if message.get("hash_type", "sha256") == "sha256": # leave the door open.
if not trusted:
item_hash = get_sha256(message["item_content"])
# item_hash = await loop.run_in_executor(None, get_sha256, message['item_content'])
# item_hash = sha256(message['item_content'].encode('utf-8')).hexdigest()

if message["item_hash"] != item_hash:
raise InvalidMessageError("Bad hash")
else:
raise InvalidMessageError("Unknown hash type %s" % message["hash_type"])

message["item_type"] = ItemType.inline.value

else:
try:
message["item_type"] = item_type_from_hash(message["item_hash"]).value
except ValueError as error:
LOGGER.warning(error)
_ = parse_message(message)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a fan of _ =

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Me neither, but it's to show that I'm not ignoring the return value until this line goes away in #277.


if trusted:
# only in the case of a message programmatically built here
Expand Down
Empty file added src/aleph/schemas/__init__.py
Empty file.
176 changes: 176 additions & 0 deletions src/aleph/schemas/pending_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""
Schemas to process raw messages coming from users to the Aleph network.
These schemas are used to parse messages coming from the network into
more practical Python objects.

While extremely similar to the functionalities of the aleph message module
(of which we reuse some classes), this implementation differs in several
ways:
1. We do not expect the `content` key to be provided. At best, we get
an `item_content` field for inline type messages. Otherwise,
the content has to be fetched (and validated) later from the network.
2. We do not care for confirmations, as the message we are checking is
not even integrated yet.

TODO: this module should reasonably be part of aleph message, if only
to make the schemas available for the validation of client data
in aleph-client.
"""


import json
from hashlib import sha256
from typing import Any, Literal, Optional

from aleph_message.models import (
AggregateContent,
BaseContent,
BaseMessage,
Chain,
ForgetContent,
PostContent,
ProgramContent,
StoreContent,
)
from aleph_message.models import MessageType, ItemType
from pydantic import BaseModel, root_validator, validator

from aleph.exceptions import InvalidMessageError
from aleph.utils import item_type_from_hash

MAX_INLINE_SIZE = 200000 # 200kb max inline content size.


class BasePendingMessage(BaseModel):
"""
A raw Aleph message, as sent by users to the Aleph network.
"""

sender: str
chain: Chain
signature: str
type: MessageType
item_content: Optional[str]
item_type: ItemType
item_hash: str
time: float
channel: Optional[str] = None
content: Optional[BaseContent] = None

@root_validator(pre=True)
def load_content(cls, values):
"""
Preload inline content. We let the CCN populate this field later
on for ipfs and storage item types.
"""

item_type = values["item_type"]
item_content = values.get("item_content")
if item_type == ItemType.inline:
if item_content is None:
raise ValueError("Item content not specified for inline item type")

if len(item_content) > MAX_INLINE_SIZE:
raise ValueError("Message too long")
try:
values["content"] = json.loads(item_content)
except json.JSONDecodeError as e:
raise ValueError("Message content is not valid JSON data") from e
else:
if item_content is not None:
raise ValueError(f"{item_type} messages cannot define item_content")

return values

@root_validator()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be a root validator instead of a simple validator ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if only because we're accessing other fields. The main problem is that check_item_hash uses item_type and check_item_type uses item_hash, so at least one of the functions has to be a root validator.

def check_item_type(cls, values):
"""
Checks that the item hash of the message matches the one inferred from the hash.
Only applicable to storage/ipfs item types.
"""
item_type = ItemType(values["item_type"])
if item_type == ItemType.inline:
return values

expected_item_type = item_type_from_hash(values["item_hash"])
if item_type != expected_item_type:
raise ValueError(
f"Expected {expected_item_type} based on hash but item type is {item_type}."
)
return values

@validator("item_hash")
def check_item_hash(cls, v, values):
"""
For inline item types, check that the item hash is equal to
the hash of the item content.
"""

item_type = values["item_type"]
if item_type == ItemType.inline:
item_content: str = values["item_content"]

computed_hash: str = sha256(item_content.encode()).hexdigest()
if v != computed_hash:
raise ValueError(
"'item_hash' do not match 'sha256(item_content)'"
f", expecting {computed_hash}"
)
elif item_type == ItemType.ipfs:
# TODO: CHeck that the hash looks like an IPFS multihash
pass
else:
assert item_type == ItemType.storage
return v
Comment on lines +102 to +124
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is identical to the one in aleph-message. Can we do something like this, or do the decorators get in the way ?

Suggested change
@validator("item_hash")
def check_item_hash(cls, v, values):
"""
For inline item types, check that the item hash is equal to
the hash of the item content.
"""
item_type = values["item_type"]
if item_type == ItemType.inline:
item_content: str = values["item_content"]
computed_hash: str = sha256(item_content.encode()).hexdigest()
if v != computed_hash:
raise ValueError(
"'item_hash' do not match 'sha256(item_content)'"
f", expecting {computed_hash}"
)
elif item_type == ItemType.ipfs:
# TODO: CHeck that the hash looks like an IPFS multihash
pass
else:
assert item_type == ItemType.storage
return v
check_item_hash = aleph_message.models.BaseMessage.check_item_hash
Suggested change
@validator("item_hash")
def check_item_hash(cls, v, values):
"""
For inline item types, check that the item hash is equal to
the hash of the item content.
"""
item_type = values["item_type"]
if item_type == ItemType.inline:
item_content: str = values["item_content"]
computed_hash: str = sha256(item_content.encode()).hexdigest()
if v != computed_hash:
raise ValueError(
"'item_hash' do not match 'sha256(item_content)'"
f", expecting {computed_hash}"
)
elif item_type == ItemType.ipfs:
# TODO: CHeck that the hash looks like an IPFS multihash
pass
else:
assert item_type == ItemType.storage
return v
@validator("item_hash")
def check_item_hash(cls, v, values):
return aleph_message.models.BaseMessage.check_item_hash(v, values)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just tried it, the validator does not get in the way but I removed the hash_type member of the message model, if only because we will probably never use it (better to use multihash/multicodec instead). If that's okay for you, I think I'll leave it like that for the moment, and we'll do a large update of aleph_message once we're fixed on the models.

I basically want to have a class tree like this:

  • PendingMessage: base class, = the data sent by aleph-client or similar
  • DbMessage: inherits PendingMessage (or a common base class), = the schema of the data stored in MongoDB.
  • ApiMessage: inherits DbMessage (or a common base class), = the schema of the API response. Similar to the current aleph_message Message.



class PendingAggregateMessage(BasePendingMessage):
type: Literal[MessageType.aggregate] # type: ignore
content: Optional[AggregateContent] = None


class PendingForgetMessage(BasePendingMessage):
type: Literal[MessageType.forget] # type: ignore
content: Optional[ForgetContent] = None


class PendingPostMessage(BasePendingMessage):
type: Literal[MessageType.post] # type: ignore
content: Optional[PostContent] = None


class PendingProgramMessage(BasePendingMessage):
type: Literal[MessageType.program] # type: ignore
content: Optional[ProgramContent] = None


class PendingStoreMessage(BasePendingMessage):
type: Literal[MessageType.store] # type: ignore
content: Optional[StoreContent] = None


MESSAGE_TYPE_TO_CLASS = {
MessageType.aggregate: PendingAggregateMessage,
MessageType.forget: PendingForgetMessage,
MessageType.post: PendingPostMessage,
MessageType.program: PendingProgramMessage,
MessageType.store: PendingStoreMessage,
}


def parse_message(message_dict: Any) -> BasePendingMessage:
if not isinstance(message_dict, dict):
raise InvalidMessageError("Message is not a dictionary")

raw_message_type = message_dict.get("type")
try:
message_type = MessageType(raw_message_type)
except ValueError as e:
raise InvalidMessageError(f"Invalid message_type: '{raw_message_type}'") from e

msg_cls = MESSAGE_TYPE_TO_CLASS[message_type]

try:
return msg_cls(**message_dict)
except ValueError as e:
raise InvalidMessageError("Could not parse message") from e
Loading