Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions src/aleph/web/controllers/p2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,21 @@ def _validate_request_data(config: Config, request_data: Dict) -> None:

topic = request_data.get("topic")

# Currently, we only check validate messages
# Only accept publishing on the message topic.
message_topic = config.aleph.queue_topic.value
if topic == message_topic:
if topic != message_topic:
raise web.HTTPForbidden(reason=f"Unauthorized P2P topic: {topic}. Use {message_topic}.")

data = request_data.get("data")
if not isinstance(data, str):
raise web.HTTPUnprocessableEntity(reason="'data': expected a serialized JSON string.")

try:
message_dict = json.loads(cast(str, request_data.get("data")))
_validate_message_dict(message_dict)
except ValueError:
raise web.HTTPUnprocessableEntity(reason="'data': must be deserializable as JSON.")

_validate_message_dict(message_dict)


async def _pub_on_p2p_topics(
Expand Down
65 changes: 65 additions & 0 deletions tests/api/test_p2p.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import copy
import json

import pytest
from configmanager import Config

P2P_PUB_URI = "/api/v0/p2p/pubsub/pub"

MESSAGE_DICT = {
"chain": "NULS2",
"item_hash": "4bbcfe7c4775492c2e602d322d68f558891468927b5e0d6cb89ff880134f323e",
"sender": "NULSd6Hgbhr42Dm5nEgf6foEUT5bgwHesZQJB",
"type": "STORE",
"channel": "MYALEPH",
"item_content": '{"address":"NULSd6Hgbhr42Dm5nEgf6foEUT5bgwHesZQJB","item_type":"ipfs","item_hash":"QmUDS8mpQmpPyptyUEedHxHMkxo7ueRRiAvrpgvJMpjXwW","time":1577325086.513}',
"item_type": "inline",
"signature": "G7/xlWoMjjOr1NBN4SiZ8USYYVM9Q3JHXChR9hPw9/YSItfAplshWysqYDkvmBZiwbICG0IVB3ilMPJ/ZVgPNlk=",
"time": 1608297193.717,
}


@pytest.mark.asyncio
async def test_pubsub_pub_valid_message(ccn_api_client, mock_config: Config):
message_topic = mock_config.aleph.queue_topic.value

response = await ccn_api_client.post(
P2P_PUB_URI, json={"topic": message_topic, "data": json.dumps(MESSAGE_DICT)}
)
assert response.status == 200, await response.text()
response_json = await response.json()

assert response_json["status"] == "success"


@pytest.mark.asyncio
async def test_pubsub_pub_errors(ccn_api_client, mock_config: Config):
# Invalid topic
serialized_message_dict = json.dumps(MESSAGE_DICT)
response = await ccn_api_client.post(
P2P_PUB_URI, json={"topic": "random-topic", "data": serialized_message_dict}
)
assert response.status == 403, await response.text()

message_topic = mock_config.aleph.queue_topic.value

# Do not serialize the message
response = await ccn_api_client.post(
P2P_PUB_URI, json={"topic": message_topic, "data": MESSAGE_DICT}
)
assert response.status == 422, await response.text()

# Invalid JSON
response = await ccn_api_client.post(
P2P_PUB_URI, json={"topic": message_topic, "data": serialized_message_dict[:-2]}
)
assert response.status == 422, await response.text()

# Invalid message
message_dict = copy.deepcopy(MESSAGE_DICT)
del message_dict["item_content"]

response = await ccn_api_client.post(
P2P_PUB_URI, json={"topic": message_topic, "data": json.dumps(message_dict)}
)
assert response.status == 422, await response.text()