From 06ecdf5ad3808dfab85b572753515bdc830a2ef5 Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Fri, 12 Apr 2024 09:42:10 -0400 Subject: [PATCH] fsbucket support --- python/dendro/api_helpers/core/settings.py | 5 +- .../api_helpers/services/gui/create_job.py | 4 +- .../_get_fsbucket_signed_upload_url.py | 23 ++++++++++ .../services/processor/get_upload_url.py | 46 +++++++++++++------ .../services/processor/update_job_status.py | 2 +- 5 files changed, 60 insertions(+), 20 deletions(-) create mode 100644 python/dendro/api_helpers/services/processor/_get_fsbucket_signed_upload_url.py diff --git a/python/dendro/api_helpers/core/settings.py b/python/dendro/api_helpers/core/settings.py index f40096e2..93c9338b 100644 --- a/python/dendro/api_helpers/core/settings.py +++ b/python/dendro/api_helpers/core/settings.py @@ -18,9 +18,10 @@ def __init__(self) -> None: self.DEFAULT_COMPUTE_RESOURCE_ID: Optional[str] = os.environ.get("VITE_DEFAULT_COMPUTE_RESOURCE_ID") - self.OUTPUT_BUCKET_URI: Optional[str] = os.environ.get("OUTPUT_BUCKET_URI") - self.OUTPUT_BUCKET_CREDENTIALS: Optional[str] = os.environ.get("OUTPUT_BUCKET_CREDENTIALS") self.OUTPUT_BUCKET_BASE_URL: Optional[str] = os.environ.get("OUTPUT_BUCKET_BASE_URL") + self.OUTPUT_BUCKET_URI: Optional[str] = os.environ.get("OUTPUT_BUCKET_URI", None) + self.OUTPUT_BUCKET_CREDENTIALS: Optional[str] = os.environ.get("OUTPUT_BUCKET_CREDENTIALS", None) + self.FSBUCKET_SECRET_KEY: Optional[str] = os.environ.get("FSBUCKET_SECRET_KEY", None) def get_settings(): return Settings() diff --git a/python/dendro/api_helpers/services/gui/create_job.py b/python/dendro/api_helpers/services/gui/create_job.py index 6b49828c..74783c57 100644 --- a/python/dendro/api_helpers/services/gui/create_job.py +++ b/python/dendro/api_helpers/services/gui/create_job.py @@ -163,8 +163,8 @@ def filter_output_file_name(file_name): processorSpec=processor_spec, batchId=batch_id, dandiApiKey=dandi_api_key, - consoleOutputUrl=f"{output_bucket_base_url}/dendro-outputs/{job_id}/_console_output", - resourceUtilizationLogUrl=f"{output_bucket_base_url}/dendro-outputs/{job_id}/_resource_utilization_log", + consoleOutputUrl=f"{output_bucket_base_url}/dendro-outputs/{project_id}/{job_id}/_console_output", + resourceUtilizationLogUrl=f"{output_bucket_base_url}/dendro-outputs/{project_id}/{job_id}/_resource_utilization_log", requiredResources=required_resources, runMethod=run_method, pendingApproval=pending_approval diff --git a/python/dendro/api_helpers/services/processor/_get_fsbucket_signed_upload_url.py b/python/dendro/api_helpers/services/processor/_get_fsbucket_signed_upload_url.py new file mode 100644 index 00000000..44e98970 --- /dev/null +++ b/python/dendro/api_helpers/services/processor/_get_fsbucket_signed_upload_url.py @@ -0,0 +1,23 @@ +from typing import Optional +import time +import hashlib + + +def _get_fsbucket_signed_upload_url( + *, fsbucket_api_url, secret_key: str, object_key: str, size: Optional[int] = None +): + # expires in one hour + expires = int(time.time()) + 3600 + signature = _create_signature( + secret_key=secret_key, object_key=object_key, expires=expires, method="PUT" + ) + url = f"{fsbucket_api_url}/{object_key}?signature={signature}&expires={expires}" + return url + + +def _create_signature(*, secret_key: str, object_key: str, expires: int, method: str): + path = f'/{object_key}' + string_to_sign = f"{method}\n{path}\n{expires}\n{secret_key}" + hash = hashlib.sha256() + hash.update(string_to_sign.encode("utf-8")) + return hash.hexdigest() diff --git a/python/dendro/api_helpers/services/processor/get_upload_url.py b/python/dendro/api_helpers/services/processor/get_upload_url.py index 9b0d93a5..21e9f146 100644 --- a/python/dendro/api_helpers/services/processor/get_upload_url.py +++ b/python/dendro/api_helpers/services/processor/get_upload_url.py @@ -2,6 +2,7 @@ from dendro.mock import using_mock from ...core.settings import get_settings from ._get_signed_upload_url import _get_signed_upload_url +from ._get_fsbucket_signed_upload_url import _get_fsbucket_signed_upload_url # note that output_name of "_console_output" and "_resource_utilization_log" are special cases @@ -17,13 +18,16 @@ async def get_upload_url(job_id: str, output_name: str): # if len(aa) == 0: # raise Exception(f"No output with name {output_name} **") - object_key = f"dendro-outputs/{job_id}/{output_name}" + project_id = job_id.split('.')[0] + + object_key = f"dendro-outputs/{project_id}/{job_id}/{output_name}" upload_url, download_url = await _get_upload_url_for_object_key(object_key) return upload_url async def get_upload_url_for_folder_file(job_id: str, output_folder_name: str, output_folder_file_name: str): - object_key = f"dendro-outputs/{job_id}/{output_folder_name}/{output_folder_file_name}" + project_id = job_id.split('.')[0] + object_key = f"dendro-outputs/{project_id}/{job_id}/{output_folder_name}/{output_folder_file_name}" upload_url, download_url = await _get_upload_url_for_object_key(object_key) return upload_url @@ -32,7 +36,8 @@ async def get_additional_upload_url(*, job_id: str, sha1: str): if not _is_valid_sha1(sha1): raise Exception('Invalid sha1 string') - object_key = f"dendro-outputs/{job_id}/sha1/{sha1}" + project_id = job_id.split('.')[0] + object_key = f"dendro-outputs/{project_id}/{job_id}/sha1/{sha1}" upload_url, download_url = await _get_upload_url_for_object_key(object_key) return upload_url, download_url @@ -43,12 +48,6 @@ async def _get_upload_url_for_object_key(object_key: str, size: Optional[int] = if using_mock(): return f"https://mock-bucket.s3.amazonaws.com/{object_key}?mock-signature", f"https://mock-bucket/{object_key}" - OUTPUT_BUCKET_URI = settings.OUTPUT_BUCKET_URI - if OUTPUT_BUCKET_URI is None: - raise Exception('Environment variable not set: OUTPUT_BUCKET_URI') - OUTPUT_BUCKET_CREDENTIALS = settings.OUTPUT_BUCKET_CREDENTIALS - if OUTPUT_BUCKET_CREDENTIALS is None: - raise Exception('Environment variable not set: OUTPUT_BUCKET_CREDENTIALS') OUTPUT_BUCKET_BASE_URL = settings.OUTPUT_BUCKET_BASE_URL if OUTPUT_BUCKET_BASE_URL is None: raise Exception('Environment variable not set: OUTPUT_BUCKET_BASE_URL') @@ -57,17 +56,34 @@ async def _get_upload_url_for_object_key(object_key: str, size: Optional[int] = else: output_bucket_base_url = OUTPUT_BUCKET_BASE_URL - signed_upload_url = await _get_signed_upload_url( - bucket_uri=OUTPUT_BUCKET_URI, - bucket_credentials=OUTPUT_BUCKET_CREDENTIALS, - object_key=object_key, - size=size - ) + FSBUCKET_SECRET_KEY = settings.FSBUCKET_SECRET_KEY + if FSBUCKET_SECRET_KEY is not None: + signed_upload_url = _get_fsbucket_signed_upload_url( + fsbucket_api_url=OUTPUT_BUCKET_BASE_URL, + secret_key=FSBUCKET_SECRET_KEY, + object_key=object_key, + size=size + ) + else: + OUTPUT_BUCKET_URI = settings.OUTPUT_BUCKET_URI + if OUTPUT_BUCKET_URI is None: + raise Exception('Environment variable not set: OUTPUT_BUCKET_URI') + OUTPUT_BUCKET_CREDENTIALS = settings.OUTPUT_BUCKET_CREDENTIALS + if OUTPUT_BUCKET_CREDENTIALS is None: + raise Exception('Environment variable not set: OUTPUT_BUCKET_CREDENTIALS') + + signed_upload_url = await _get_signed_upload_url( + bucket_uri=OUTPUT_BUCKET_URI, + bucket_credentials=OUTPUT_BUCKET_CREDENTIALS, + object_key=object_key, + size=size + ) download_url = f'{output_bucket_base_url}/{object_key}' return signed_upload_url, download_url + def _is_valid_sha1(sha1: str): if len(sha1) != 40: return False diff --git a/python/dendro/api_helpers/services/processor/update_job_status.py b/python/dendro/api_helpers/services/processor/update_job_status.py index 5f73ea61..ab2778db 100644 --- a/python/dendro/api_helpers/services/processor/update_job_status.py +++ b/python/dendro/api_helpers/services/processor/update_job_status.py @@ -50,7 +50,7 @@ async def update_job_status( output_file_ids = [] for output_file in job.outputFiles: if not output_file.skipCloudUpload: - output_file_url = f"{output_bucket_base_url}/dendro-outputs/{job.jobId}/{output_file.name}" + output_file_url = f"{output_bucket_base_url}/dendro-outputs/{job.projectId}/{job.jobId}/{output_file.name}" else: # file_id will be filled in output_file_url = f'dendro:?project={job.projectId}&file_id=$file_id$&label={output_file.fileName}&compute_resource={job.computeResourceId}'