Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/aleph/handlers/forget.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import logging
from typing import Dict, Optional
from typing import Dict

from aioipfs.api import RepoAPI
from aioipfs.exceptions import NotPinnedError
from aleph_message.models import ForgetMessage, MessageType
from aleph_message.models import ForgetMessage, ItemType, MessageType, StoreContent

from aleph.model.filepin import PermanentPin
from aleph_message.models import StoreContent
from aleph.model.hashes import delete_value
from aleph.model.messages import Message
from aleph.services.ipfs.common import get_ipfs_api
from aleph.storage import get_message_content
from aleph.types import ItemType
from aleph.utils import item_type_from_hash

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -47,13 +46,13 @@ async def garbage_collect(storage_hash: str, storage_type: ItemType):
return

# Unpin the file from IPFS or remove it from local storage
storage_detected: ItemType = ItemType.from_hash(storage_hash)
storage_detected: ItemType = item_type_from_hash(storage_hash)

if storage_type != storage_detected:
raise ValueError(f"Inconsistent ItemType {storage_type} != {storage_detected} "
f"for hash '{storage_hash}'")

if storage_type == ItemType.IPFS:
if storage_type == ItemType.ipfs:
api = await get_ipfs_api(timeout=5)
logger.debug(f"Removing from IPFS: {storage_hash}")
try:
Expand All @@ -67,7 +66,7 @@ async def garbage_collect(storage_hash: str, storage_type: ItemType):
except NotPinnedError:
logger.debug("File not pinned")
logger.debug(f"Removed from IPFS: {storage_hash}")
elif storage_type == ItemType.Storage:
elif storage_type == ItemType.storage:
logger.debug(f"Removing from Gridfs: {storage_hash}")
await delete_value(storage_hash)
logger.debug(f"Removed from Gridfs: {storage_hash}")
Expand Down
11 changes: 6 additions & 5 deletions src/aleph/handlers/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@

import aioipfs
from aioipfs import InvalidCIDError
from aleph_message.models import ItemType, StoreMessage
from pydantic import ValidationError

from aleph.config import get_config
from aleph.exceptions import AlephStorageException, UnknownHashError
from aleph.services.ipfs.common import get_ipfs_api
from aleph.storage import get_hash_content
from aleph.types import ItemType
from aleph_message.models import StoreMessage
from pydantic import ValidationError
from aleph.utils import item_type_from_hash

LOGGER = logging.getLogger("HANDLERS.STORAGE")

Expand Down Expand Up @@ -52,8 +53,8 @@ async def handle_new_storage(message: Dict, content: Dict):
do_standard_lookup = True
size = 0

if engine == ItemType.IPFS and ipfs_enabled:
if ItemType.from_hash(item_hash) != ItemType.IPFS:
if engine == ItemType.ipfs and ipfs_enabled:
if item_type_from_hash(item_hash) != ItemType.ipfs:
LOGGER.warning("Invalid IPFS hash: '%s'", item_hash)
raise UnknownHashError(f"Invalid IPFS hash: '{item_hash}'")

Expand Down
5 changes: 2 additions & 3 deletions src/aleph/jobs/process_pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import List, Dict, Tuple

import sentry_sdk
from aleph_message.models import MessageType
from aleph_message.models import ItemType, MessageType
from pymongo import DeleteOne, DeleteMany, ASCENDING
from setproctitle import setproctitle

Expand All @@ -17,7 +17,6 @@
from aleph.model.db_bulk_operation import DbBulkOperation
from aleph.model.pending import PendingMessage
from aleph.services.p2p import singleton
from aleph.types import ItemType
from .job_utils import prepare_loop, gather_and_perform_db_operations

LOGGER = getLogger("jobs.pending_messages")
Expand Down Expand Up @@ -94,7 +93,7 @@ async def process_pending_messages(shared_stats: Dict):
)

if (
pending["message"]["item_type"] == ItemType.IPFS
pending["message"]["item_type"] == ItemType.ipfs
or pending["message"]["type"] == MessageType.store
):
i += 15
Expand Down
11 changes: 6 additions & 5 deletions src/aleph/network.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import asyncio
import json
import logging
from typing import Coroutine, Dict, List, Optional
from typing import Coroutine, Dict, List
from urllib.parse import unquote

from aleph_message.models import ItemType
from p2pclient import Client as P2PClient

from aleph.exceptions import InvalidMessageError
from aleph.register_chain import VERIFIER_REGISTER
from aleph.services.ipfs.pubsub import incoming_channel as incoming_ipfs_channel
from aleph.types import ItemType
from aleph.exceptions import InvalidMessageError
from aleph.utils import get_sha256
from aleph.utils import item_type_from_hash

LOGGER = logging.getLogger("NETWORK")

Expand Down Expand Up @@ -116,11 +117,11 @@ async def check_message(
else:
raise InvalidMessageError("Unknown hash type %s" % message["hash_type"])

message["item_type"] = ItemType.Inline.value
message["item_type"] = ItemType.inline.value

else:
try:
message["item_type"] = ItemType.from_hash(message["item_hash"]).value
message["item_type"] = item_type_from_hash(message["item_hash"]).value
except ValueError as error:
LOGGER.warning(error)

Expand Down
37 changes: 19 additions & 18 deletions src/aleph/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@
from hashlib import sha256
from typing import Any, AnyStr, Dict, IO, Optional

from aleph_message.models import ItemType

from aleph.config import get_config
from aleph.exceptions import InvalidContent, ContentCurrentlyUnavailable
from aleph.services.filestore import get_value, set_value
from aleph.services.ipfs.common import get_cid_version
from aleph.services.ipfs.storage import add_bytes as add_ipfs_bytes
from aleph.services.ipfs.storage import add_file as ipfs_add_file
from aleph.services.ipfs.storage import get_ipfs_content
from aleph.services.ipfs.storage import pin_add as ipfs_pin_add
from aleph.services.p2p.http import request_hash as p2p_http_request_hash
from aleph.services.p2p.singleton import get_streamer
from aleph.types import ItemType
from aleph.utils import run_in_executor, get_sha256
from aleph.services.ipfs.common import get_cid_version
from aleph.config import get_config
from aleph.utils import get_sha256, run_in_executor

LOGGER = logging.getLogger("STORAGE")

Expand Down Expand Up @@ -66,12 +67,12 @@ async def json_async_loads(s: AnyStr):


async def get_message_content(message: Dict) -> MessageContent:
item_type: str = message.get("item_type", ItemType.IPFS)
item_type: str = message.get("item_type", ItemType.ipfs)
item_hash = message["item_hash"]

if item_type in (ItemType.IPFS, ItemType.Storage):
if item_type in (ItemType.ipfs, ItemType.storage):
return await get_json(item_hash, engine=ItemType(item_type))
elif item_type == ItemType.Inline:
elif item_type == ItemType.inline:
if "item_content" not in message:
error_msg = f"No item_content in message {message.get('item_hash')}"
LOGGER.warning(error_msg)
Expand Down Expand Up @@ -149,13 +150,13 @@ async def verify_content_hash(
config = get_config()
ipfs_enabled = config.ipfs.enabled.value

if engine == ItemType.IPFS and ipfs_enabled:
if engine == ItemType.ipfs and ipfs_enabled:
try:
cid_version = get_cid_version(expected_hash)
except ValueError as e:
raise InvalidContent(e) from e
compute_hash_task = compute_content_hash_ipfs(content, cid_version)
elif engine == ItemType.Storage:
elif engine == ItemType.storage:
compute_hash_task = compute_content_hash_sha256(content)
else:
raise ValueError(f"Invalid storage engine: '{engine}'.")
Expand All @@ -175,7 +176,7 @@ async def verify_content_hash(

async def get_hash_content(
content_hash: str,
engine: ItemType = ItemType.IPFS,
engine: ItemType = ItemType.ipfs,
timeout: int = 2,
tries: int = 1,
use_network: bool = True,
Expand All @@ -198,7 +199,7 @@ async def get_hash_content(
source = ContentSource.P2P

if content is None:
if ipfs_enabled and engine == ItemType.IPFS and use_ipfs:
if ipfs_enabled and engine == ItemType.ipfs and use_ipfs:
content = await get_ipfs_content(content_hash, timeout=timeout, tries=tries)
source = ContentSource.IPFS

Expand All @@ -218,7 +219,7 @@ async def get_hash_content(


async def get_json(
content_hash: str, engine=ItemType.IPFS, timeout: int = 2, tries: int = 1
content_hash: str, engine=ItemType.ipfs, timeout: int = 2, tries: int = 1
) -> MessageContent:
content = await get_hash_content(
content_hash, engine=engine, timeout=timeout, tries=tries
Expand All @@ -242,13 +243,13 @@ async def pin_hash(chash: str, timeout: int = 2, tries: int = 1):
return await ipfs_pin_add(chash, timeout=timeout, tries=tries)


async def add_json(value: Any, engine: ItemType = ItemType.IPFS) -> str:
async def add_json(value: Any, engine: ItemType = ItemType.ipfs) -> str:
# TODO: determine which storage engine to use
content = await run_in_executor(None, json.dumps, value)
content = content.encode("utf-8")
if engine == ItemType.IPFS:
if engine == ItemType.ipfs:
chash = await add_ipfs_bytes(content)
elif engine == ItemType.Storage:
elif engine == ItemType.storage:
if isinstance(content, str):
content = content.encode("utf-8")
chash = sha256(content).hexdigest()
Expand All @@ -259,15 +260,15 @@ async def add_json(value: Any, engine: ItemType = ItemType.IPFS) -> str:
return chash


async def add_file(fileobject: IO, engine: ItemType = ItemType.IPFS) -> str:
async def add_file(fileobject: IO, engine: ItemType = ItemType.ipfs) -> str:

if engine == ItemType.IPFS:
if engine == ItemType.ipfs:
output = await ipfs_add_file(fileobject)
file_hash = output["Hash"]
fileobject.seek(0)
file_content = fileobject.read()

elif engine == ItemType.Storage:
elif engine == ItemType.storage:
file_content = fileobject.read()
file_hash = sha256(file_content).hexdigest()

Expand Down
23 changes: 1 addition & 22 deletions src/aleph/types.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,6 @@
from __future__ import annotations
from enum import Enum

from aleph.exceptions import UnknownHashError


class ItemType(str, Enum):
"""Item storage options"""
Inline = "inline"
IPFS = "ipfs"
Storage = "storage"

@classmethod
def from_hash(cls, hash: str) -> ItemType:
assert isinstance(hash, str)
# https://docs.ipfs.io/concepts/content-addressing/#identifier-formats
if hash.startswith("Qm") and 44 <= len(hash) <= 46: # CIDv0
return cls.IPFS
elif hash.startswith("bafy") and len(hash) == 59: # CIDv1
return cls.IPFS
elif len(hash) == 64:
return cls.Storage
else:
raise UnknownHashError(f"Unknown hash {len(hash)} {hash}")
from enum import Enum


class Protocol(str, Enum):
Expand Down
15 changes: 15 additions & 0 deletions src/aleph/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
from hashlib import sha256
from typing import Union

from aleph_message.models import ItemType

from aleph.exceptions import UnknownHashError
from aleph.settings import settings


Expand All @@ -13,6 +16,18 @@ async def run_in_executor(executor, func, *args):
return func(*args)


def item_type_from_hash(item_hash: str) -> ItemType:
# https://docs.ipfs.io/concepts/content-addressing/#identifier-formats
if item_hash.startswith("Qm") and 44 <= len(item_hash) <= 46: # CIDv0
return ItemType.ipfs
elif item_hash.startswith("bafy") and len(item_hash) == 59: # CIDv1
return ItemType.ipfs
elif len(item_hash) == 64:
return ItemType.storage
else:
raise UnknownHashError(f"Unknown hash {len(item_hash)} {item_hash}")


def get_sha256(content: Union[str, bytes]) -> str:
if isinstance(content, str):
content = content.encode("utf-8")
Expand Down
14 changes: 7 additions & 7 deletions src/aleph/web/controllers/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from aleph.exceptions import AlephStorageException, UnknownHashError
from aleph.handlers.forget import count_file_references
from aleph.storage import add_json, get_hash_content, add_file
from aleph.types import ItemType
from aleph.utils import run_in_executor
from aleph_message.models import ItemType
from aleph.utils import run_in_executor, item_type_from_hash

logger = logging.getLogger(__name__)

Expand All @@ -16,23 +16,23 @@ async def add_ipfs_json_controller(request):
"""Forward the json content to IPFS server and return an hash"""
data = await request.json()

output = {"status": "success", "hash": await add_json(data, engine=ItemType.IPFS)}
output = {"status": "success", "hash": await add_json(data, engine=ItemType.ipfs)}
return web.json_response(output)


async def add_storage_json_controller(request):
"""Forward the json content to IPFS server and return an hash"""
data = await request.json()

output = {"status": "success", "hash": await add_json(data, engine=ItemType.Storage)}
output = {"status": "success", "hash": await add_json(data, engine=ItemType.storage)}
return web.json_response(output)


async def storage_add_file(request):
# No need to pin it here anymore.
# TODO: find a way to specify linked ipfs hashes in posts/aggr.
post = await request.post()
file_hash = await add_file(post["file"].file, engine=ItemType.Storage)
file_hash = await add_file(post["file"].file, engine=ItemType.storage)

output = {"status": "success", "hash": file_hash}
return web.json_response(output)
Expand All @@ -47,7 +47,7 @@ async def get_hash(request):
if item_hash is None:
return web.HTTPBadRequest(text="No hash provided")
try:
engine = ItemType.from_hash(item_hash)
engine = item_type_from_hash(item_hash)
except UnknownHashError as e:
logger.warning(e.args[0])
return web.HTTPBadRequest(text="Invalid hash provided")
Expand Down Expand Up @@ -83,7 +83,7 @@ async def get_raw_hash(request):
raise web.HTTPBadRequest(text="No hash provided")

try:
engine = ItemType.from_hash(item_hash)
engine = item_type_from_hash(item_hash)
except UnknownHashError:
raise web.HTTPBadRequest(text="Invalid hash")

Expand Down
6 changes: 3 additions & 3 deletions tests/storage/test_get_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from aleph.exceptions import InvalidContent, ContentCurrentlyUnavailable
from aleph.storage import ContentSource, get_hash_content, get_json, get_message_content
from aleph.types import ItemType
from aleph_message.models import ItemType


@pytest.mark.asyncio
Expand Down Expand Up @@ -139,7 +139,7 @@ async def test_get_inline_content(mock_config):
]
json_bytes = json.dumps(json_content).encode("utf-8")
message = {
"item_type": ItemType.Inline.value,
"item_type": ItemType.inline.value,
"item_hash": content_hash,
"item_content": json_bytes,
}
Expand Down Expand Up @@ -185,7 +185,7 @@ async def test_get_stored_message_content(mocker, mock_config):
mocker.patch("aleph.storage.get_value", return_value=json_bytes)

message = {
"item_type": ItemType.IPFS.value,
"item_type": ItemType.ipfs.value,
"item_hash": content_hash,
}

Expand Down