Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload large blob fixes #8717

Merged
merged 6 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 8 additions & 4 deletions packages/syft/src/syft/service/action/action_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import inspect
from io import BytesIO
from pathlib import Path
import sys
import threading
import time
import traceback
Expand Down Expand Up @@ -769,7 +770,12 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | None:
)
data.upload_to_blobstorage_from_api(api)
else:
storage_entry = CreateBlobStorageEntry.from_obj(data)
serialized = serialize(data, to_bytes=True)
size = sys.getsizeof(serialized)
storage_entry = CreateBlobStorageEntry.from_obj(data, file_size=size)

if not TraceResultRegistry.current_thread_is_tracing():
self.syft_action_data_cache = self.as_empty_data()
if self.syft_blob_storage_entry_id is not None:
# TODO: check if it already exists
storage_entry.id = self.syft_blob_storage_entry_id
Expand All @@ -784,9 +790,7 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | None:
if isinstance(blob_deposit_object, SyftError):
return blob_deposit_object

result = blob_deposit_object.write(
BytesIO(serialize(data, to_bytes=True))
)
result = blob_deposit_object.write(BytesIO(serialized))
if isinstance(result, SyftError):
return result
self.syft_blob_storage_entry_id = (
Expand Down
7 changes: 4 additions & 3 deletions packages/syft/src/syft/store/blob_storage/seaweedfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@
from ...types.syft_object import SYFT_OBJECT_VERSION_3
from ...util.constants import DEFAULT_TIMEOUT

MAX_QUEUE_SIZE = 100
WRITE_EXPIRATION_TIME = 900 # seconds
DEFAULT_FILE_PART_SIZE = (1024**3) * 5 # 5GB
DEFAULT_UPLOAD_CHUNK_SIZE = 819200
DEFAULT_FILE_PART_SIZE = 1024**3 / 100 # 10MB
DEFAULT_UPLOAD_CHUNK_SIZE = 1024 * 800 # 800KB


@serializable()
Expand Down Expand Up @@ -94,7 +95,7 @@ def __init__(self) -> None:
def async_generator(
self, chunk_size: int = DEFAULT_UPLOAD_CHUNK_SIZE
) -> Generator:
item_queue: Queue = Queue()
item_queue: Queue = Queue(maxsize=MAX_QUEUE_SIZE)
threading.Thread(
target=self.add_chunks_to_queue,
kwargs={"queue": item_queue, "chunk_size": chunk_size},
Expand Down
5 changes: 3 additions & 2 deletions packages/syft/src/syft/types/blob_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,9 @@ class CreateBlobStorageEntry(SyftObject):
extensions: list[str] = []

@classmethod
def from_obj(cls, obj: SyftObject) -> Self:
file_size = sys.getsizeof(serialize._serialize(obj=obj, to_bytes=True))
def from_obj(cls, obj: SyftObject, file_size: int | None = None) -> Self:
if file_size is None:
file_size = sys.getsizeof(serialize._serialize(obj=obj, to_bytes=True))
return cls(file_size=file_size, type_=type(obj))

@classmethod
Expand Down