diff --git a/appwrite/client.py b/appwrite/client.py index 35a3116..374286a 100644 --- a/appwrite/client.py +++ b/appwrite/client.py @@ -4,6 +4,8 @@ import platform import sys import requests +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Lock from .input_file import InputFile from .exception import AppwriteException from .encoders.value_class_encoder import ValueClassEncoder @@ -186,14 +188,15 @@ def chunked_upload( if input_file.source_type == 'path': size = os.stat(input_file.path).st_size - input = open(input_file.path, 'rb') + input = None elif input_file.source_type == 'bytes': size = len(input_file.data) input = input_file.data if size < self._chunk_size: if input_file.source_type == 'path': - input_file.data = input.read() + with open(input_file.path, 'rb') as input: + input_file.data = input.read() params[param_name] = input_file return self.call( @@ -214,46 +217,103 @@ def chunked_upload( if counter > 0: offset = counter * self._chunk_size - input.seek(offset) + total_chunks = (size + self._chunk_size - 1) // self._chunk_size + chunks = [] while offset < size: - if input_file.source_type == 'path': - input_file.data = input.read(self._chunk_size) or input.read(size - offset) - elif input_file.source_type == 'bytes': - if offset + self._chunk_size < size: - end = offset + self._chunk_size - else: - end = size - input_file.data = input[offset:end] + end = min(offset + self._chunk_size, size) + chunks.append({ + 'index': counter, + 'start': offset, + 'end': end, + }) + offset = end + counter = counter + 1 - params[param_name] = input_file - headers["content-range"] = f'bytes {offset}-{min((offset + self._chunk_size) - 1, size - 1)}/{size}' + if not chunks: + return result + + def read_chunk(start, end): + if input_file.source_type == 'path': + with open(input_file.path, 'rb') as chunk_file: + chunk_file.seek(start) + return chunk_file.read(end - start) + return input[start:end] + + upload_id_header = upload_id + completed_count = chunks[0]['index'] + uploaded_size = chunks[0]['start'] + progress_lock = Lock() + last_result = None + final_result = None + + def is_upload_complete(chunk_result): + chunks_uploaded = chunk_result.get('chunksUploaded') + if chunks_uploaded is None: + return False + chunks_total = chunk_result.get('chunksTotal', total_chunks) + return int(chunks_uploaded) >= int(chunks_total) + + def upload_chunk(chunk, current_upload_id): + chunk_input = InputFile.from_bytes( + read_chunk(chunk['start'], chunk['end']), + input_file.filename, + getattr(input_file, 'mime_type', None) + ) + chunk_params = {**params, param_name: chunk_input} + chunk_headers = {**headers} + chunk_headers["content-range"] = f"bytes {chunk['start']}-{chunk['end'] - 1}/{size}" + if current_upload_id: + chunk_headers["x-appwrite-id"] = current_upload_id - result = self.call( + return self.call( 'post', path, - headers, - params, + chunk_headers, + chunk_params, ) - offset = offset + self._chunk_size - - if "$id" in result: - headers["x-appwrite-id"] = result["$id"] - - if on_progress is not None: - end = min((((counter * self._chunk_size) + self._chunk_size) - 1), size - 1) - on_progress({ - "$id": result["$id"], - "progress": min(offset, size)/size * 100, - "sizeUploaded": end+1, - "chunksTotal": result["chunksTotal"], - "chunksUploaded": result["chunksUploaded"], - }) - - counter = counter + 1 - - return result + result = upload_chunk(chunks[0], upload_id_header) + last_result = result + if "$id" in result: + upload_id_header = result["$id"] + + completed_count = chunks[0]['index'] + 1 + uploaded_size = chunks[0]['end'] + + if on_progress is not None: + on_progress({ + "$id": result.get("$id"), + "progress": uploaded_size / size * 100, + "sizeUploaded": uploaded_size, + "chunksTotal": total_chunks, + "chunksUploaded": completed_count, + }) + + def upload_remaining_chunk(chunk): + nonlocal completed_count, uploaded_size, last_result, final_result + chunk_result = upload_chunk(chunk, upload_id_header) + with progress_lock: + completed_count = completed_count + 1 + uploaded_size = uploaded_size + (chunk['end'] - chunk['start']) + last_result = chunk_result + if is_upload_complete(chunk_result): + final_result = chunk_result + if on_progress is not None: + on_progress({ + "$id": upload_id_header, + "progress": uploaded_size / size * 100, + "sizeUploaded": uploaded_size, + "chunksTotal": total_chunks, + "chunksUploaded": completed_count, + }) + + with ThreadPoolExecutor(max_workers=8) as executor: + futures = [executor.submit(upload_remaining_chunk, chunk) for chunk in chunks[1:]] + for future in as_completed(futures): + future.result() + + return final_result or last_result def flatten(self, data, prefix='', stringify=False): output = {}