diff --git a/src/aleph/handlers/forget.py b/src/aleph/handlers/forget.py index 12264835f..34be579cd 100644 --- a/src/aleph/handlers/forget.py +++ b/src/aleph/handlers/forget.py @@ -1,17 +1,16 @@ import logging -from typing import Dict, Optional +from typing import Dict from aioipfs.api import RepoAPI from aioipfs.exceptions import NotPinnedError -from aleph_message.models import ForgetMessage, MessageType +from aleph_message.models import ForgetMessage, ItemType, MessageType, StoreContent from aleph.model.filepin import PermanentPin -from aleph_message.models import StoreContent from aleph.model.hashes import delete_value from aleph.model.messages import Message from aleph.services.ipfs.common import get_ipfs_api from aleph.storage import get_message_content -from aleph.types import ItemType +from aleph.utils import item_type_from_hash logger = logging.getLogger(__name__) @@ -47,13 +46,13 @@ async def garbage_collect(storage_hash: str, storage_type: ItemType): return # Unpin the file from IPFS or remove it from local storage - storage_detected: ItemType = ItemType.from_hash(storage_hash) + storage_detected: ItemType = item_type_from_hash(storage_hash) if storage_type != storage_detected: raise ValueError(f"Inconsistent ItemType {storage_type} != {storage_detected} " f"for hash '{storage_hash}'") - if storage_type == ItemType.IPFS: + if storage_type == ItemType.ipfs: api = await get_ipfs_api(timeout=5) logger.debug(f"Removing from IPFS: {storage_hash}") try: @@ -67,7 +66,7 @@ async def garbage_collect(storage_hash: str, storage_type: ItemType): except NotPinnedError: logger.debug("File not pinned") logger.debug(f"Removed from IPFS: {storage_hash}") - elif storage_type == ItemType.Storage: + elif storage_type == ItemType.storage: logger.debug(f"Removing from Gridfs: {storage_hash}") await delete_value(storage_hash) logger.debug(f"Removed from Gridfs: {storage_hash}") diff --git a/src/aleph/handlers/storage.py b/src/aleph/handlers/storage.py index b6fe55d2f..89f938aa7 100644 --- a/src/aleph/handlers/storage.py +++ b/src/aleph/handlers/storage.py @@ -14,13 +14,14 @@ import aioipfs from aioipfs import InvalidCIDError +from aleph_message.models import ItemType, StoreMessage +from pydantic import ValidationError + from aleph.config import get_config from aleph.exceptions import AlephStorageException, UnknownHashError from aleph.services.ipfs.common import get_ipfs_api from aleph.storage import get_hash_content -from aleph.types import ItemType -from aleph_message.models import StoreMessage -from pydantic import ValidationError +from aleph.utils import item_type_from_hash LOGGER = logging.getLogger("HANDLERS.STORAGE") @@ -52,8 +53,8 @@ async def handle_new_storage(message: Dict, content: Dict): do_standard_lookup = True size = 0 - if engine == ItemType.IPFS and ipfs_enabled: - if ItemType.from_hash(item_hash) != ItemType.IPFS: + if engine == ItemType.ipfs and ipfs_enabled: + if item_type_from_hash(item_hash) != ItemType.ipfs: LOGGER.warning("Invalid IPFS hash: '%s'", item_hash) raise UnknownHashError(f"Invalid IPFS hash: '{item_hash}'") diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index 3314b5c5f..8ce5f64f6 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -8,7 +8,7 @@ from typing import List, Dict, Tuple import sentry_sdk -from aleph_message.models import MessageType +from aleph_message.models import ItemType, MessageType from pymongo import DeleteOne, DeleteMany, ASCENDING from setproctitle import setproctitle @@ -17,7 +17,6 @@ from aleph.model.db_bulk_operation import DbBulkOperation from aleph.model.pending import PendingMessage from aleph.services.p2p import singleton -from aleph.types import ItemType from .job_utils import prepare_loop, gather_and_perform_db_operations LOGGER = getLogger("jobs.pending_messages") @@ -94,7 +93,7 @@ async def process_pending_messages(shared_stats: Dict): ) if ( - pending["message"]["item_type"] == ItemType.IPFS + pending["message"]["item_type"] == ItemType.ipfs or pending["message"]["type"] == MessageType.store ): i += 15 diff --git a/src/aleph/network.py b/src/aleph/network.py index 89729def9..fce2b79a9 100644 --- a/src/aleph/network.py +++ b/src/aleph/network.py @@ -1,16 +1,17 @@ import asyncio import json import logging -from typing import Coroutine, Dict, List, Optional +from typing import Coroutine, Dict, List from urllib.parse import unquote +from aleph_message.models import ItemType from p2pclient import Client as P2PClient +from aleph.exceptions import InvalidMessageError from aleph.register_chain import VERIFIER_REGISTER from aleph.services.ipfs.pubsub import incoming_channel as incoming_ipfs_channel -from aleph.types import ItemType -from aleph.exceptions import InvalidMessageError from aleph.utils import get_sha256 +from aleph.utils import item_type_from_hash LOGGER = logging.getLogger("NETWORK") @@ -116,11 +117,11 @@ async def check_message( else: raise InvalidMessageError("Unknown hash type %s" % message["hash_type"]) - message["item_type"] = ItemType.Inline.value + message["item_type"] = ItemType.inline.value else: try: - message["item_type"] = ItemType.from_hash(message["item_hash"]).value + message["item_type"] = item_type_from_hash(message["item_hash"]).value except ValueError as error: LOGGER.warning(error) diff --git a/src/aleph/storage.py b/src/aleph/storage.py index 4340c8ba2..fa096c8fc 100644 --- a/src/aleph/storage.py +++ b/src/aleph/storage.py @@ -9,18 +9,19 @@ from hashlib import sha256 from typing import Any, AnyStr, Dict, IO, Optional +from aleph_message.models import ItemType + +from aleph.config import get_config from aleph.exceptions import InvalidContent, ContentCurrentlyUnavailable from aleph.services.filestore import get_value, set_value +from aleph.services.ipfs.common import get_cid_version from aleph.services.ipfs.storage import add_bytes as add_ipfs_bytes from aleph.services.ipfs.storage import add_file as ipfs_add_file from aleph.services.ipfs.storage import get_ipfs_content from aleph.services.ipfs.storage import pin_add as ipfs_pin_add from aleph.services.p2p.http import request_hash as p2p_http_request_hash from aleph.services.p2p.singleton import get_streamer -from aleph.types import ItemType -from aleph.utils import run_in_executor, get_sha256 -from aleph.services.ipfs.common import get_cid_version -from aleph.config import get_config +from aleph.utils import get_sha256, run_in_executor LOGGER = logging.getLogger("STORAGE") @@ -66,12 +67,12 @@ async def json_async_loads(s: AnyStr): async def get_message_content(message: Dict) -> MessageContent: - item_type: str = message.get("item_type", ItemType.IPFS) + item_type: str = message.get("item_type", ItemType.ipfs) item_hash = message["item_hash"] - if item_type in (ItemType.IPFS, ItemType.Storage): + if item_type in (ItemType.ipfs, ItemType.storage): return await get_json(item_hash, engine=ItemType(item_type)) - elif item_type == ItemType.Inline: + elif item_type == ItemType.inline: if "item_content" not in message: error_msg = f"No item_content in message {message.get('item_hash')}" LOGGER.warning(error_msg) @@ -149,13 +150,13 @@ async def verify_content_hash( config = get_config() ipfs_enabled = config.ipfs.enabled.value - if engine == ItemType.IPFS and ipfs_enabled: + if engine == ItemType.ipfs and ipfs_enabled: try: cid_version = get_cid_version(expected_hash) except ValueError as e: raise InvalidContent(e) from e compute_hash_task = compute_content_hash_ipfs(content, cid_version) - elif engine == ItemType.Storage: + elif engine == ItemType.storage: compute_hash_task = compute_content_hash_sha256(content) else: raise ValueError(f"Invalid storage engine: '{engine}'.") @@ -175,7 +176,7 @@ async def verify_content_hash( async def get_hash_content( content_hash: str, - engine: ItemType = ItemType.IPFS, + engine: ItemType = ItemType.ipfs, timeout: int = 2, tries: int = 1, use_network: bool = True, @@ -198,7 +199,7 @@ async def get_hash_content( source = ContentSource.P2P if content is None: - if ipfs_enabled and engine == ItemType.IPFS and use_ipfs: + if ipfs_enabled and engine == ItemType.ipfs and use_ipfs: content = await get_ipfs_content(content_hash, timeout=timeout, tries=tries) source = ContentSource.IPFS @@ -218,7 +219,7 @@ async def get_hash_content( async def get_json( - content_hash: str, engine=ItemType.IPFS, timeout: int = 2, tries: int = 1 + content_hash: str, engine=ItemType.ipfs, timeout: int = 2, tries: int = 1 ) -> MessageContent: content = await get_hash_content( content_hash, engine=engine, timeout=timeout, tries=tries @@ -242,13 +243,13 @@ async def pin_hash(chash: str, timeout: int = 2, tries: int = 1): return await ipfs_pin_add(chash, timeout=timeout, tries=tries) -async def add_json(value: Any, engine: ItemType = ItemType.IPFS) -> str: +async def add_json(value: Any, engine: ItemType = ItemType.ipfs) -> str: # TODO: determine which storage engine to use content = await run_in_executor(None, json.dumps, value) content = content.encode("utf-8") - if engine == ItemType.IPFS: + if engine == ItemType.ipfs: chash = await add_ipfs_bytes(content) - elif engine == ItemType.Storage: + elif engine == ItemType.storage: if isinstance(content, str): content = content.encode("utf-8") chash = sha256(content).hexdigest() @@ -259,15 +260,15 @@ async def add_json(value: Any, engine: ItemType = ItemType.IPFS) -> str: return chash -async def add_file(fileobject: IO, engine: ItemType = ItemType.IPFS) -> str: +async def add_file(fileobject: IO, engine: ItemType = ItemType.ipfs) -> str: - if engine == ItemType.IPFS: + if engine == ItemType.ipfs: output = await ipfs_add_file(fileobject) file_hash = output["Hash"] fileobject.seek(0) file_content = fileobject.read() - elif engine == ItemType.Storage: + elif engine == ItemType.storage: file_content = fileobject.read() file_hash = sha256(file_content).hexdigest() diff --git a/src/aleph/types.py b/src/aleph/types.py index 898bc193f..faad5d58e 100644 --- a/src/aleph/types.py +++ b/src/aleph/types.py @@ -1,27 +1,6 @@ from __future__ import annotations -from enum import Enum - -from aleph.exceptions import UnknownHashError - -class ItemType(str, Enum): - """Item storage options""" - Inline = "inline" - IPFS = "ipfs" - Storage = "storage" - - @classmethod - def from_hash(cls, hash: str) -> ItemType: - assert isinstance(hash, str) - # https://docs.ipfs.io/concepts/content-addressing/#identifier-formats - if hash.startswith("Qm") and 44 <= len(hash) <= 46: # CIDv0 - return cls.IPFS - elif hash.startswith("bafy") and len(hash) == 59: # CIDv1 - return cls.IPFS - elif len(hash) == 64: - return cls.Storage - else: - raise UnknownHashError(f"Unknown hash {len(hash)} {hash}") +from enum import Enum class Protocol(str, Enum): diff --git a/src/aleph/utils.py b/src/aleph/utils.py index 3afe1acfb..e5eb4c956 100644 --- a/src/aleph/utils.py +++ b/src/aleph/utils.py @@ -2,6 +2,9 @@ from hashlib import sha256 from typing import Union +from aleph_message.models import ItemType + +from aleph.exceptions import UnknownHashError from aleph.settings import settings @@ -13,6 +16,18 @@ async def run_in_executor(executor, func, *args): return func(*args) +def item_type_from_hash(item_hash: str) -> ItemType: + # https://docs.ipfs.io/concepts/content-addressing/#identifier-formats + if item_hash.startswith("Qm") and 44 <= len(item_hash) <= 46: # CIDv0 + return ItemType.ipfs + elif item_hash.startswith("bafy") and len(item_hash) == 59: # CIDv1 + return ItemType.ipfs + elif len(item_hash) == 64: + return ItemType.storage + else: + raise UnknownHashError(f"Unknown hash {len(item_hash)} {item_hash}") + + def get_sha256(content: Union[str, bytes]) -> str: if isinstance(content, str): content = content.encode("utf-8") diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index 59536fb65..4d1b23e18 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -6,8 +6,8 @@ from aleph.exceptions import AlephStorageException, UnknownHashError from aleph.handlers.forget import count_file_references from aleph.storage import add_json, get_hash_content, add_file -from aleph.types import ItemType -from aleph.utils import run_in_executor +from aleph_message.models import ItemType +from aleph.utils import run_in_executor, item_type_from_hash logger = logging.getLogger(__name__) @@ -16,7 +16,7 @@ async def add_ipfs_json_controller(request): """Forward the json content to IPFS server and return an hash""" data = await request.json() - output = {"status": "success", "hash": await add_json(data, engine=ItemType.IPFS)} + output = {"status": "success", "hash": await add_json(data, engine=ItemType.ipfs)} return web.json_response(output) @@ -24,7 +24,7 @@ async def add_storage_json_controller(request): """Forward the json content to IPFS server and return an hash""" data = await request.json() - output = {"status": "success", "hash": await add_json(data, engine=ItemType.Storage)} + output = {"status": "success", "hash": await add_json(data, engine=ItemType.storage)} return web.json_response(output) @@ -32,7 +32,7 @@ async def storage_add_file(request): # No need to pin it here anymore. # TODO: find a way to specify linked ipfs hashes in posts/aggr. post = await request.post() - file_hash = await add_file(post["file"].file, engine=ItemType.Storage) + file_hash = await add_file(post["file"].file, engine=ItemType.storage) output = {"status": "success", "hash": file_hash} return web.json_response(output) @@ -47,7 +47,7 @@ async def get_hash(request): if item_hash is None: return web.HTTPBadRequest(text="No hash provided") try: - engine = ItemType.from_hash(item_hash) + engine = item_type_from_hash(item_hash) except UnknownHashError as e: logger.warning(e.args[0]) return web.HTTPBadRequest(text="Invalid hash provided") @@ -83,7 +83,7 @@ async def get_raw_hash(request): raise web.HTTPBadRequest(text="No hash provided") try: - engine = ItemType.from_hash(item_hash) + engine = item_type_from_hash(item_hash) except UnknownHashError: raise web.HTTPBadRequest(text="Invalid hash") diff --git a/tests/storage/test_get_content.py b/tests/storage/test_get_content.py index 1a78a9661..18678cec4 100644 --- a/tests/storage/test_get_content.py +++ b/tests/storage/test_get_content.py @@ -4,7 +4,7 @@ from aleph.exceptions import InvalidContent, ContentCurrentlyUnavailable from aleph.storage import ContentSource, get_hash_content, get_json, get_message_content -from aleph.types import ItemType +from aleph_message.models import ItemType @pytest.mark.asyncio @@ -139,7 +139,7 @@ async def test_get_inline_content(mock_config): ] json_bytes = json.dumps(json_content).encode("utf-8") message = { - "item_type": ItemType.Inline.value, + "item_type": ItemType.inline.value, "item_hash": content_hash, "item_content": json_bytes, } @@ -185,7 +185,7 @@ async def test_get_stored_message_content(mocker, mock_config): mocker.patch("aleph.storage.get_value", return_value=json_bytes) message = { - "item_type": ItemType.IPFS.value, + "item_type": ItemType.ipfs.value, "item_hash": content_hash, }