From 0b7a157dec85990a9d918141d8bb6e83983913df Mon Sep 17 00:00:00 2001 From: Alexander Piskun Date: Mon, 25 Sep 2023 20:11:41 +0300 Subject: [PATCH] implemented chunked file upload v2 Signed-off-by: Alexander Piskun --- CHANGELOG.md | 11 +++++++ nc_py_api/_session.py | 2 ++ nc_py_api/files/files.py | 47 +++++++++++++++++++----------- nc_py_api/options.py | 7 +++++ tests/actual_tests/files_test.py | 9 ++++++ tests/actual_tests/options_test.py | 18 ++++++++++++ 6 files changed, 77 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 55542a9b..14b3cc3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,17 @@ All notable changes to this project will be documented in this file. +## [0.2.2 - 2023-09-2x] + +### Added + +- FilesAPI: [Chunked v2 upload](https://docs.nextcloud.com/server/latest/developer_manual/client_apis/WebDAV/chunking.html#chunked-upload-v2) support, enabled by default. +- New option to disable `chunked v2 upload` if there is need for that: `CHUNKED_UPLOAD_V2` + +### Changed + +- Default `chunk_size` argument is now 5Mb instead of 4Mb. 
+ ## [0.2.1 - 2023-09-14] ### Added diff --git a/nc_py_api/_session.py b/nc_py_api/_session.py index 4cf8d48f..8ab790eb 100644 --- a/nc_py_api/_session.py +++ b/nc_py_api/_session.py @@ -55,12 +55,14 @@ class RuntimeOptions: timeout: Optional[int] timeout_dav: Optional[int] _nc_cert: Union[str, bool] + upload_chunk_v2: bool def __init__(self, **kwargs): self.xdebug_session = kwargs.get("xdebug_session", options.XDEBUG_SESSION) self.timeout = kwargs.get("npa_timeout", options.NPA_TIMEOUT) self.timeout_dav = kwargs.get("npa_timeout_dav", options.NPA_TIMEOUT_DAV) self._nc_cert = kwargs.get("npa_nc_cert", options.NPA_NC_CERT) + self.upload_chunk_v2 = kwargs.get("chunked_upload_v2", options.CHUNKED_UPLOAD_V2) @property def nc_cert(self) -> Union[str, bool]: diff --git a/nc_py_api/files/files.py b/nc_py_api/files/files.py index ee3dc6b8..ac3067a3 100644 --- a/nc_py_api/files/files.py +++ b/nc_py_api/files/files.py @@ -146,7 +146,7 @@ def download2stream(self, path: Union[str, FsNode], fp, **kwargs) -> None: :param path: path to download file. :param fp: filename (string), pathlib.Path object or a file object. The object must implement the ``file.write`` method and be able to write binary data. - :param kwargs: **chunk_size** an int value specifying chunk size to write. Default = **4Mb** + :param kwargs: **chunk_size** an int value specifying chunk size to write. Default = **5Mb** """ path = path.user_path if isinstance(path, FsNode) else path if isinstance(fp, (str, Path)): @@ -179,7 +179,7 @@ def download_directory_as_zip( result_path, "wb", ) as fp: - for data_chunk in response.iter_raw(chunk_size=kwargs.get("chunk_size", 4 * 1024 * 1024)): + for data_chunk in response.iter_raw(chunk_size=kwargs.get("chunk_size", 5 * 1024 * 1024)): fp.write(data_chunk) return Path(result_path) @@ -201,14 +201,15 @@ def upload_stream(self, path: Union[str, FsNode], fp, **kwargs) -> FsNode: :param path: file's upload path. 
:param fp: filename (string), pathlib.Path object or a file object. The object must implement the ``file.read`` method providing data with str or bytes type. - :param kwargs: **chunk_size** an int value specifying chunk size to read. Default = **4Mb** + :param kwargs: **chunk_size** an int value specifying chunk size to read. Default = **5Mb** """ path = path.user_path if isinstance(path, FsNode) else path + chunk_size = kwargs.get("chunk_size", 5 * 1024 * 1024) if isinstance(fp, (str, Path)): with builtins.open(fp, "rb") as f: - return self.__upload_stream(path, f, **kwargs) + return self.__upload_stream(path, f, chunk_size) elif hasattr(fp, "read"): - return self.__upload_stream(path, fp, **kwargs) + return self.__upload_stream(path, fp, chunk_size) else: raise TypeError("`fp` must be a path to file or an object with `read` method.") @@ -688,36 +689,48 @@ def __download2stream(self, path: str, fp, **kwargs) -> None: ) as response: # type: ignore self._session.response_headers = response.headers check_error(response.status_code, f"download_stream: user={self._session.user}, path={path}") - for data_chunk in response.iter_raw(chunk_size=kwargs.get("chunk_size", 4 * 1024 * 1024)): + for data_chunk in response.iter_raw(chunk_size=kwargs.get("chunk_size", 5 * 1024 * 1024)): fp.write(data_chunk) - def __upload_stream(self, path: str, fp, **kwargs) -> FsNode: - _dav_path = self._dav_get_obj_path(self._session.user, random_string(64), root_path="/uploads") - response = self._session.dav("MKCOL", _dav_path) + def __upload_stream(self, path: str, fp, chunk_size: int) -> FsNode: + _dav_path = self._dav_get_obj_path(self._session.user, "nc-py-api-" + random_string(56), root_path="/uploads") + _v2 = bool(self._session.cfg.options.upload_chunk_v2 and chunk_size >= 5 * 1024 * 1024) + full_path = self._dav_get_obj_path(self._session.user, path) + headers = {"Destination": self._session.cfg.dav_endpoint + full_path} + if _v2: + response = self._session.dav("MKCOL", _dav_path, 
headers=headers) + else: + response = self._session.dav("MKCOL", _dav_path) check_error(response.status_code) try: - chunk_size = kwargs.get("chunk_size", 4 * 1024 * 1024) - start_bytes = end_bytes = 0 + start_bytes = end_bytes = chunk_number = 0 while True: piece = fp.read(chunk_size) if not piece: break end_bytes = start_bytes + len(piece) - _filename = str(start_bytes).rjust(15, "0") + "-" + str(end_bytes).rjust(15, "0") - response = self._session.dav("PUT", _dav_path + "/" + _filename, data=piece) + if _v2: + response = self._session.dav( + "PUT", _dav_path + "/" + str(chunk_number), data=piece, headers=headers + ) + else: + _filename = str(start_bytes).rjust(15, "0") + "-" + str(end_bytes).rjust(15, "0") + response = self._session.dav("PUT", _dav_path + "/" + _filename, data=piece) check_error( - response.status_code, f"upload_stream: user={self._session.user}, path={path}, cur_size={end_bytes}" + response.status_code, + f"upload_stream(v={_v2}): user={self._session.user}, path={path}, cur_size={end_bytes}", ) start_bytes = end_bytes - full_path = self._dav_get_obj_path(self._session.user, path) - headers = {"Destination": self._session.cfg.dav_endpoint + full_path} + chunk_number += 1 + response = self._session.dav( "MOVE", _dav_path + "/.file", headers=headers, ) check_error( - response.status_code, f"upload_stream: user={self._session.user}, path={path}, total_size={end_bytes}" + response.status_code, + f"upload_stream(v={_v2}): user={self._session.user}, path={path}, total_size={end_bytes}", ) return FsNode(full_path.strip("/"), **self.__get_etag_fileid_from_response(response)) finally: diff --git a/nc_py_api/options.py b/nc_py_api/options.py index 931be3ff..fd786237 100644 --- a/nc_py_api/options.py +++ b/nc_py_api/options.py @@ -38,3 +38,10 @@ NPA_NC_CERT = False elif str_val.lower() not in ("true", "1"): NPA_NC_CERT = str_val + +CHUNKED_UPLOAD_V2 = True +"""Option to enable/disable **version 2** chunked upload(better Object Storages support). 
+ +Additional information can be found in Nextcloud documentation: +`Chunked file upload V2 +<https://docs.nextcloud.com/server/latest/developer_manual/client_apis/WebDAV/chunking.html#chunked-upload-v2>`_""" diff --git a/tests/actual_tests/files_test.py b/tests/actual_tests/files_test.py index 46f464b4..9d988493 100644 --- a/tests/actual_tests/files_test.py +++ b/tests/actual_tests/files_test.py @@ -201,6 +201,15 @@ def test_file_upload_file(nc_any): assert nc_any.files.download("test_dir_tmp/test_file_upload_file") == content +def test_file_upload_chunked_v2(nc_any): + with NamedTemporaryFile() as tmp_file: + tmp_file.seek(7 * 1024 * 1024) + tmp_file.write(b"\0") + tmp_file.flush() + nc_any.files.upload_stream("test_dir_tmp/test_file_upload_chunked_v2", tmp_file.name) + assert len(nc_any.files.download("test_dir_tmp/test_file_upload_chunked_v2")) == 7 * 1024 * 1024 + 1 + + @pytest.mark.parametrize("file_name", ("chunked_zero", "chunked_zero/", "chunked_zero//")) def test_file_upload_chunked_zero_size(nc_any, file_name): nc_any.files.delete("/test_dir_tmp/test_file_upload_del", not_fail=True) diff --git a/tests/actual_tests/options_test.py b/tests/actual_tests/options_test.py index 8e650e0d..8e86be86 100644 --- a/tests/actual_tests/options_test.py +++ b/tests/actual_tests/options_test.py @@ -1,6 +1,7 @@ import os import sys from subprocess import PIPE, run +from unittest import mock import nc_py_api @@ -51,3 +52,20 @@ def test_xdebug_session(nc_any): nc_py_api.options.XDEBUG_SESSION = "12345" new_nc = nc_py_api.Nextcloud() if isinstance(nc_any, nc_py_api.Nextcloud) else nc_py_api.NextcloudApp() assert new_nc._session.adapter.cookies["XDEBUG_SESSION"] == "12345" + + +@mock.patch("nc_py_api.options.CHUNKED_UPLOAD_V2", False) +def test_chunked_upload(nc_any): + new_nc = nc_py_api.Nextcloud() if isinstance(nc_any, nc_py_api.Nextcloud) else nc_py_api.NextcloudApp() + assert new_nc._session.cfg.options.upload_chunk_v2 is False + + +def test_chunked_upload2(nc_any): + new_nc = ( + nc_py_api.Nextcloud(chunked_upload_v2=False) + if isinstance(nc_any, nc_py_api.Nextcloud) + else 
nc_py_api.NextcloudApp(chunked_upload_v2=False) + ) + assert new_nc._session.cfg.options.upload_chunk_v2 is False + new_nc = nc_py_api.Nextcloud() if isinstance(nc_any, nc_py_api.Nextcloud) else nc_py_api.NextcloudApp() + assert new_nc._session.cfg.options.upload_chunk_v2 is True