-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: authenticated file upload #463
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still need to review the tests. There are a lot of things to change in the implementation as it is.
from aleph.web.controllers.p2p import ( | ||
_mq_read_one_message, | ||
_processing_status_to_http_status, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Python, functions starting with an underscore should not be imported. What you need to do is move these functions to a common module (ex: controllers/utils.py
) and rename them to remove the underscore (=making them public).
src/aleph/web/controllers/storage.py
Outdated
|
||
except Exception as e: | ||
verified = False | ||
return verified |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is very clearly duplicated from the code in ChainService
. The right thing to do here is to add an instance of ChainService
to each API in api_entrypoint.py
:
- Create an
APP_CHAIN_SERVICE
constant and a corresponding getter inapp_state_getters.py
- Instantiate a chain service in
api_entrypoint.py
- Add it to the app state (
app[APP_CHAIN_SERVICE] = chain_service
) - Use the getter in this file to verify the signature.
chain_service = get_chain_service_from_request(request)
await chain_service.verify_signature(message)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the moment it is half implemented (must be added to ccn_api_client to be able to use it)
src/aleph/web/controllers/storage.py
Outdated
elif actual_item_hash != c_item_hash: | ||
output = {"status": "Unprocessable Content"} | ||
return web.json_response(output, status=422) | ||
elif len(content) > 25_000 and not message: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Define constants like this 25_000 at the top of the file. Also, out of consistency now that we decided to use MiB everywhere, define it as 25 * MiB
.
src/aleph/web/controllers/storage.py
Outdated
post_data: MultiDictProxy[Union[str, bytes, FileField]] | ||
) -> Tuple[dict, int]: | ||
message_bytearray = post_data.get("message", b"") | ||
value = post_data.get("size") or 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay so I actually expected you to put the message and metadata in the body of the request and not in the multipart form. Why did you put it in the form?
Also, use better variable names than value
. You're retrieving a size here, so size
or file_size
are definitely clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, we probably need a maximum size somewhere. Put 1 GiB at the moment and return a 413 (Payload too large) if the file is larger.
src/aleph/web/controllers/storage.py
Outdated
if isinstance(message_bytearray, bytearray): | ||
message_string = message_bytearray.decode("utf-8") | ||
message_dict = json.loads(message_string) | ||
message_dict["time"] = float(message_dict["time"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A lot of validation is missing. You need to use the BasePendingMessage
to parse the message dictionary, catch errors and raise a 422 if the message is not correct. See how it's done in POST /messages
(_validate_message_dict
). You can directly use PendingStoreMessage.parse_obj(message_dict)
as we only want to process STORE messages and ignore the rest.
src/aleph/web/controllers/storage.py
Outdated
content = file_io.read(size) | ||
item_content = json.loads(message["item_content"]) | ||
actual_item_hash = sha256(content).hexdigest() | ||
c_item_hash = item_content["item_hash"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get why you prefixed the variable with c_
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
c == client on my head.
It's the hash give by the "client"
src/aleph/web/controllers/storage.py
Outdated
routing_key=f"*.{pending_message_db.item_hash}", | ||
) | ||
|
||
is_valid_message = await verify_and_handle_request( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Poor variable name. is_valid_name
implies a boolean value (True if the message is valid, false otherwise). To improve this, you can either define your custom exception types or use aiohttp exceptions in verify_and_handle_request
.
Example:
# In verify_and_handle_request
if balance < required_balance:
raise web.HTTPPaymentRequired()
# Here
try:
verify_and_handle_request(...)
except web.HTTPClientError as e:
return web.json_response({"status": "rejected"}, status=e.status_code)
src/aleph/web/controllers/storage.py
Outdated
message_dict=message, reception_time=dt.datetime.now(), fetched=True | ||
) | ||
mq_channel = await get_mq_channel_from_request(request, logger=logger) | ||
mq_queue = await mq_make_aleph_message_topic_queue( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The MQ stuff only needs to be done if a) there is a message attached to the request and b) the user specified sync=True
in the request. The sync
parameter is nowhere to be found in your PR, add it.
src/aleph/web/controllers/storage.py
Outdated
async def storage_add_file(request: web.Request): | ||
post = await request.post() | ||
if post.get("message", b"") is not None and post.get("size") is not None: | ||
return await storage_add_file_with_message(request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This split is not ideal because you duplicate the part where you read the file and add it in the DB. Also, there is a major bug in your new function because you never actually write the file to disk nor add the corresponding entry in the files
table. I'd rather have one function where you call other functions, example (fleshed out, just to give an example):
if message and size:
pending_message = await verify_message(message)
await verify_balance(pending_message.content.address, size)
expected_file_hash = pending_message.content.item_hash
else:
expected_file_hash = None
file_io = multidict_proxy_to_io(post)
file_content, item_hash = await read_and_check_file(file_io, expected_file_hash)
# Note that this requires overloading the `StorageService.add_file` method to support passing bytes instead of an IO object
await storage_service.add_file(session=session, file=file_content, engine=ItemType.storage)
src/aleph/web/controllers/p2p.py
Outdated
@@ -26,7 +26,8 @@ | |||
get_p2p_client_from_request, | |||
get_mq_channel_from_request, | |||
) | |||
from aleph.web.controllers.utils import mq_make_aleph_message_topic_queue | |||
from aleph.web.controllers.utils import mq_make_aleph_message_topic_queue, processing_status_to_http_status, \ | |||
mq_read_one_message, validate_message_dict |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Run the black formatter on your PR.
src/aleph/api_entrypoint.py
Outdated
@@ -67,6 +74,7 @@ async def configure_aiohttp_app( | |||
app[APP_STATE_NODE_CACHE] = node_cache | |||
app[APP_STATE_STORAGE_SERVICE] = storage_service | |||
app[APP_STATE_SESSION_FACTORY] = session_factory | |||
# app[APP_STATE_CHAIN_SERVICE] = chain_service |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this line commented?
src/aleph/web/controllers/storage.py
Outdated
|
||
|
||
async def _verify_user_file(message: PendingStoreMessage, size: int, file_io) -> None: | ||
file_io.seek(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need to seek? Are you reading the file twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
file_content = fileobject.read()
The functions add_file on Storage Service class wasn't doing seek when after read (he was doing it for ipf, I replace the seek here and not on _verify_user_file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We really shouldn't read the file twice, that's an expensive operation.
src/aleph/web/controllers/storage.py
Outdated
|
||
session.add(pending_message_db) | ||
session.commit() | ||
if storage_metadata.sync and mq_queue: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if storage_metadata.sync and mq_queue: | |
if mq_queue: |
is enough.
src/aleph/web/controllers/storage.py
Outdated
|
||
if mq_message is None: | ||
raise web.HTTPAccepted() | ||
if mq_message.routing_key is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if routing_key
is None?
Note that since you subscribe using a routing key, this case should never happen.
src/aleph/web/controllers/storage.py
Outdated
|
||
# No need to pin it here anymore. | ||
# TODO: find a way to specify linked ipfs hashes in posts/aggr. | ||
# TODO : Add chainservice to ccn_api_client to be able to call get_chainservice_from_request |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic of the endpoint is still not what I'd expect. You read the file multiple times, add it to the storage before checks, etc.
src/aleph/storage.py
Outdated
@@ -271,7 +273,7 @@ async def add_file( | |||
elif engine == ItemType.storage: | |||
file_content = fileobject.read() | |||
file_hash = sha256(file_content).hexdigest() | |||
|
|||
fileobject.seek(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As previously discussed, we should never have to read a file twice. If the signature of the function doesn't allow what you need, change it/add a new function to support your use case.
current_balance = get_total_balance( | ||
session=session, address=pending_message_db.sender | ||
) or Decimal(0) | ||
required_balance = (size / MiB) / 3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the computation in the view is incorrect, as it still uses MB instead of MiB and counts 3 Aleph/MB instead of 3 MiB/Aleph. Could you fix this (in a different PR if possible)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1048576 = (1024 * 1024) so i thinks the view is correct ?
src/aleph/web/controllers/storage.py
Outdated
status_str, _item_hash = mq_message.routing_key.split(".") | ||
processing_status = MessageProcessingStatus(status_str) | ||
status_code = processing_status_to_http_status(processing_status) | ||
return web.json_response(status=status_code, text=_item_hash) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're not returning a response anymore in the case if not mq_queue
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/aleph/web/controllers/storage.py
Outdated
|
||
|
||
async def storage_add_file_with_message( | ||
request: web.Request, session: DbSession, chain_service, storage_metadata, file_io |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing some typing:
request: web.Request, session: DbSession, chain_service, storage_metadata, file_io | |
request: web.Request, session: DbSession, chain_service: ChainService, storage_metadata: StorageMetadata, file_io: BinaryIO |
src/aleph/web/controllers/storage.py
Outdated
config = get_config_from_request(request) | ||
mq_queue = None | ||
|
||
pending_store_message = PendingStoreMessage.parse_obj(storage_metadata.message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This call to parse_obj
is not required anymore as storage_metadata
must contain a valid PendingStoreMessage
object already.
src/aleph/web/controllers/storage.py
Outdated
item_content = json.loads(message.item_content) | ||
actual_item_hash = sha256(content).hexdigest() | ||
client_item_hash = item_content["item_hash"] | ||
if len(content) > (1000 * MiB): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You read the file before checking the size. Doing the opposite would make a lot more sense, ex if size > (1000 * MiB)
at the start of the function. You can also perform this check earlier to avoid checking the signature + user balance if the file is too large anyway.
src/aleph/web/controllers/storage.py
Outdated
storage_metadata = StorageMetadata.parse_raw(metadata_content) | ||
else: | ||
storage_metadata = StorageMetadata.parse_raw(metadata) | ||
except Exception as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid wide except Exception
clauses. What are you trying to intercept?
src/aleph/web/controllers/storage.py
Outdated
if isinstance(metadata, FileField): | ||
metadata_content = metadata.file.read() | ||
storage_metadata = StorageMetadata.parse_raw(metadata_content) | ||
else: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty sure this else
breaks the endpoint when the metadata is not specified. You default to b""
so you will get a Pydantic exception in this branch.
if len(file_io.read()) > (25 * MiB): | ||
raise web.HTTPUnauthorized() | ||
file_io.seek(0) | ||
|
||
with session_factory() as session: | ||
file_hash = await storage_service.add_file( | ||
session=session, fileobject=file_io, engine=ItemType.storage | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keeping this block for all cases means that you add the file directly in all cases before checking the signature, balance etc.
src/aleph/web/controllers/utils.py
Outdated
try: | ||
return parse_message(message_dict) | ||
except InvalidMessageException as e: | ||
raise web.HTTPUnprocessableEntity(body=str(e)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: configure your IDE to add an empty line on save.
Co-authored-by: Olivier Desenfans <desenfans.olivier@gmail.com>
This reverts commit f5c8e60.
Problem: file upload is performed in two steps, one to push the file and one to push the associated STORE message.
Solution: improve
/api/v0/storage/add_file
to allow the user to send a STORE message along with his file, making upload an atomic operation.