Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 94 additions & 34 deletions appwrite/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import platform
import sys
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from .input_file import InputFile
from .exception import AppwriteException
from .encoders.value_class_encoder import ValueClassEncoder
Expand Down Expand Up @@ -186,14 +188,15 @@ def chunked_upload(

if input_file.source_type == 'path':
size = os.stat(input_file.path).st_size
input = open(input_file.path, 'rb')
input = None
elif input_file.source_type == 'bytes':
size = len(input_file.data)
input = input_file.data

if size < self._chunk_size:
if input_file.source_type == 'path':
input_file.data = input.read()
with open(input_file.path, 'rb') as input:
input_file.data = input.read()

params[param_name] = input_file
return self.call(
Expand All @@ -214,46 +217,103 @@ def chunked_upload(

if counter > 0:
offset = counter * self._chunk_size
input.seek(offset)

total_chunks = (size + self._chunk_size - 1) // self._chunk_size
chunks = []
while offset < size:
if input_file.source_type == 'path':
input_file.data = input.read(self._chunk_size) or input.read(size - offset)
elif input_file.source_type == 'bytes':
if offset + self._chunk_size < size:
end = offset + self._chunk_size
else:
end = size
input_file.data = input[offset:end]
end = min(offset + self._chunk_size, size)
chunks.append({
'index': counter,
'start': offset,
'end': end,
})
offset = end
counter = counter + 1

params[param_name] = input_file
headers["content-range"] = f'bytes {offset}-{min((offset + self._chunk_size) - 1, size - 1)}/{size}'
if not chunks:
return result

def read_chunk(start, end):
if input_file.source_type == 'path':
with open(input_file.path, 'rb') as chunk_file:
chunk_file.seek(start)
return chunk_file.read(end - start)
return input[start:end]

upload_id_header = upload_id
completed_count = chunks[0]['index']
uploaded_size = chunks[0]['start']
progress_lock = Lock()
last_result = None
final_result = None

def is_upload_complete(chunk_result):
chunks_uploaded = chunk_result.get('chunksUploaded')
if chunks_uploaded is None:
return False
chunks_total = chunk_result.get('chunksTotal', total_chunks)
return int(chunks_uploaded) >= int(chunks_total)

def upload_chunk(chunk, current_upload_id):
chunk_input = InputFile.from_bytes(
read_chunk(chunk['start'], chunk['end']),
input_file.filename,
getattr(input_file, 'mime_type', None)
)
chunk_params = {**params, param_name: chunk_input}
chunk_headers = {**headers}
chunk_headers["content-range"] = f"bytes {chunk['start']}-{chunk['end'] - 1}/{size}"
if current_upload_id:
chunk_headers["x-appwrite-id"] = current_upload_id

result = self.call(
return self.call(
'post',
path,
headers,
params,
chunk_headers,
chunk_params,
)

offset = offset + self._chunk_size

if "$id" in result:
headers["x-appwrite-id"] = result["$id"]

if on_progress is not None:
end = min((((counter * self._chunk_size) + self._chunk_size) - 1), size - 1)
on_progress({
"$id": result["$id"],
"progress": min(offset, size)/size * 100,
"sizeUploaded": end+1,
"chunksTotal": result["chunksTotal"],
"chunksUploaded": result["chunksUploaded"],
})

counter = counter + 1

return result
result = upload_chunk(chunks[0], upload_id_header)
last_result = result
if "$id" in result:
upload_id_header = result["$id"]

completed_count = chunks[0]['index'] + 1
uploaded_size = chunks[0]['end']

if on_progress is not None:
on_progress({
"$id": result.get("$id"),
"progress": uploaded_size / size * 100,
"sizeUploaded": uploaded_size,
"chunksTotal": total_chunks,
"chunksUploaded": completed_count,
})
Comment thread
greptile-apps[bot] marked this conversation as resolved.

def upload_remaining_chunk(chunk):
nonlocal completed_count, uploaded_size, last_result, final_result
chunk_result = upload_chunk(chunk, upload_id_header)
with progress_lock:
completed_count = completed_count + 1
uploaded_size = uploaded_size + (chunk['end'] - chunk['start'])
last_result = chunk_result
if is_upload_complete(chunk_result):
final_result = chunk_result
if on_progress is not None:
on_progress({
"$id": upload_id_header,
"progress": uploaded_size / size * 100,
"sizeUploaded": uploaded_size,
"chunksTotal": total_chunks,
"chunksUploaded": completed_count,
})

with ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(upload_remaining_chunk, chunk) for chunk in chunks[1:]]
for future in as_completed(futures):
future.result()

return final_result or last_result

def flatten(self, data, prefix='', stringify=False):
output = {}
Expand Down