Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions examples/async_upload_download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import threading
import time

from flow360.component.case import CaseList
from flow360 import VolumeMesh, Flow360MeshParams, ProgressCallbackInterface
from flow360.component.case import CaseDownloadable

from testcases import OM6test

OM6test.get_files()


my_cases = CaseList()
case1 = my_cases[0].to_case()
case2 = my_cases[1].to_case()

print(case1)
print(case2)


class ProgressCallback(ProgressCallbackInterface):
def __init__(self, name):
self.total = 0
self.bytes_transferred = 0
self.name = name

def total(self, total: int):
self.total = total

def __call__(self, bytes_chunk_transferred):
self.bytes_transferred += bytes_chunk_transferred
print(f"progress {self.name}: {self.bytes_transferred / self.total * 100} %")


def thread_download1():
thread = threading.Thread(
target=case1.results.download_file,
args=[CaseDownloadable.VOLUME],
kwargs={"progress_callback": ProgressCallback("volume case1")},
)
thread.start()
return thread


def thread_download2():
thread = threading.Thread(
target=case2.results.download_file,
args=[CaseDownloadable.VOLUME],
kwargs={"progress_callback": ProgressCallback("volume case2")},
)
thread.start()
return thread


def thread_upload():
meshParams = Flow360MeshParams.from_file(OM6test.mesh_json)
thread = threading.Thread(
target=VolumeMesh.from_file,
args=[OM6test.mesh_filename, meshParams, "OM6wing-mesh"],
kwargs={"progress_callback": ProgressCallback("mesh upload")},
)
thread.start()
return thread


t1 = thread_download1()
t2 = thread_download2()
t3 = thread_upload()


for _ in range(10):
print("This thread continues while upload/download progress")
time.sleep(1)


# wait for thread to finish, this is not necessary if your main thread doesn't stop
t1.join()
t2.join()
t3.join()
1 change: 1 addition & 0 deletions flow360/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
from .component.volume_mesh import VolumeMesh
from .component.case import Case
from .component.flow360_solver_params import Flow360MeshParams, MeshBoundary, Flow360Params
from .cloud.s3_utils import ProgressCallbackInterface
3 changes: 2 additions & 1 deletion flow360/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def configure(apikey):
:param apikey:
:return:
"""

if not os.path.exists(f"{home}/.flow360"):
os.makedirs(f"{home}/.flow360")
with open(f"{home}/.flow360/config.toml", "w+", encoding="utf-8") as config_file:
toml_config = toml.loads(config_file.read())
toml_config.update({"default": {"apikey": apikey}})
Expand Down
92 changes: 68 additions & 24 deletions flow360/cloud/s3_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import urllib
from datetime import datetime
from enum import Enum
from abc import ABC, abstractmethod

import boto3
from boto3.s3.transfer import TransferConfig
Expand All @@ -24,6 +25,23 @@
from ..environment import Env
from .http_util import http


class ProgressCallbackInterface(ABC):
"""
Progress callback abstract class
"""

@abstractmethod
def total(self, total: int):
"""
total bytes to transfer
"""

@abstractmethod
def __call__(self, bytes_chunk_transferred):
pass


_s3_config = TransferConfig(
multipart_threshold=1024 * 25,
max_concurrency=50,
Expand Down Expand Up @@ -161,7 +179,9 @@ def _get_grant_url(self, resource_id, file_name: str) -> str:

return None

def upload_file(self, resource_id: str, remote_file_name: str, file_name: str):
def upload_file(
self, resource_id: str, remote_file_name: str, file_name: str, progress_callback=None
):
"""
Upload a file to s3.
:param resource_id:
Expand All @@ -170,24 +190,37 @@ def upload_file(self, resource_id: str, remote_file_name: str, file_name: str):
:return:
"""
if not os.path.exists(file_name):
print(f"mesh file {file_name} does not Exist!")
print(f"file {file_name} does not Exist!")
raise FileNotFoundError()
with _get_progress(_S3Action.UPLOADING) as progress:
task_id = progress.add_task(
"upload", filename=os.path.basename(file_name), total=os.path.getsize(file_name)
)

def _call_back(bytes_in_chunk):
progress.update(task_id, advance=bytes_in_chunk)

token = self._get_s3_sts_token(resource_id, remote_file_name)
token.get_client().upload_file(
token = self._get_s3_sts_token(resource_id, remote_file_name)
client = token.get_client()
if progress_callback:
progress_callback.total = float(os.path.getsize(file_name))
client.upload_file(
Bucket=token.get_bucket(),
Filename=file_name,
Key=token.get_s3_key(),
Callback=_call_back,
Callback=progress_callback,
Config=_s3_config,
)
else:
with _get_progress(_S3Action.UPLOADING) as progress:
task_id = progress.add_task(
"upload", filename=os.path.basename(file_name), total=os.path.getsize(file_name)
)

def _call_back(bytes_in_chunk):
progress.update(task_id, advance=bytes_in_chunk)

token = self._get_s3_sts_token(resource_id, remote_file_name)
client.upload_file(
Bucket=token.get_bucket(),
Filename=file_name,
Key=token.get_s3_key(),
Callback=_call_back,
Config=_s3_config,
)

# pylint: disable=too-many-arguments
def download_file(
Expand All @@ -197,6 +230,7 @@ def download_file(
to_file: str,
keep_folder: bool = True,
overwrite: bool = True,
progress_callback=None,
):
"""
Download a file from s3.
Expand All @@ -206,6 +240,7 @@ def download_file(
:param keep_folder: If true, the downloaded file will be put
in the same folder as the file on cloud. Only works when to_file is a folder name.
:param overwrite: if True overwrite if file exists, otherwise don't download
:param progress_callback: provide custom callback for progress
:return:
"""
to_file = create_base_folder(resource_id, remote_file_name, to_file, keep_folder)
Expand All @@ -215,23 +250,32 @@ def download_file(
token = self._get_s3_sts_token(resource_id, remote_file_name)
client = token.get_client()
meta_data = client.head_object(Bucket=token.get_bucket(), Key=token.get_s3_key())
with _get_progress(_S3Action.DOWNLOADING) as progress:
progress.start()
task_id = progress.add_task(
"download",
filename=os.path.basename(remote_file_name),
total=meta_data.get("ContentLength", 0),
)

def _call_back(bytes_in_chunk):
progress.update(task_id, advance=bytes_in_chunk)

if progress_callback:
progress_callback.total = meta_data.get("ContentLength", 0)
client.download_file(
Bucket=token.get_bucket(),
Filename=to_file,
Key=token.get_s3_key(),
Callback=_call_back,
Callback=progress_callback,
)
else:
with _get_progress(_S3Action.DOWNLOADING) as progress:
progress.start()
task_id = progress.add_task(
"download",
filename=os.path.basename(remote_file_name),
total=meta_data.get("ContentLength", 0),
)

def _call_back(bytes_in_chunk):
progress.update(task_id, advance=bytes_in_chunk)

client.download_file(
Bucket=token.get_bucket(),
Filename=to_file,
Key=token.get_s3_key(),
Callback=_call_back,
)

def _get_s3_sts_token(self, resource_id: str, file_name: str) -> _S3STSToken:
session_key = f"{resource_id}:{self.value}:{file_name}"
Expand Down
6 changes: 4 additions & 2 deletions flow360/component/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,13 +590,15 @@ def plot(self):
"""
return self._plotter

def download_file(self, downloadable: CaseDownloadable, overwrite: bool = True):
def download_file(self, downloadable: CaseDownloadable, overwrite: bool = True, **kwargs):
"""
download specific file by filename
:param downloadable: filename to download
:param overwrite: when True, overwrites existing file, otherwise skip
"""
return self._case.download_file(f"results/{downloadable.value}", overwrite=overwrite)
return self._case.download_file(
f"results/{downloadable.value}", overwrite=overwrite, **kwargs
)

def download_volumetric(self):
"""
Expand Down
23 changes: 18 additions & 5 deletions flow360/component/flow360_base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,36 @@ def solver_version(self):
"""
return self.info.solver_version

# pylint: disable=too-many-arguments
@on_cloud_resource_only
def download_file(
self, file_name, to_file=".", keep_folder: bool = True, overwrite: bool = True
self,
file_name,
to_file=".",
keep_folder: bool = True,
overwrite: bool = True,
progress_callback=None,
):
"""
general download functionality
"""
self.s3_transfer_method.download_file(
self.id, file_name, to_file, keep_folder, overwrite=overwrite
return self.s3_transfer_method.download_file(
self.id,
file_name,
to_file,
keep_folder,
overwrite=overwrite,
progress_callback=progress_callback,
)

@on_cloud_resource_only
def upload_file(self, remote_file_name: str, file_name: str):
def upload_file(self, remote_file_name: str, file_name: str, progress_callback=None):
"""
general upload functionality
"""
self.s3_transfer_method.upload_file(self.id, remote_file_name, file_name)
self.s3_transfer_method.upload_file(
self.id, remote_file_name, file_name, progress_callback=progress_callback
)


def is_object_cloud_resource(resource: Flow360Resource):
Expand Down
10 changes: 8 additions & 2 deletions flow360/component/surface_mesh.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ class SurfaceMesh(Flow360BaseModel, extra=Extra.allow):
config: Optional[str]
user_upload_file_name: Optional[str]

def download(self, file_name: str, to_file=".", keep_folder: bool = True):
def download(
self, file_name: str, to_file=".", keep_folder: bool = True, progress_callback=None
):
"""
Download file from surface mesh
:param file_name:
Expand All @@ -32,7 +34,11 @@ def download(self, file_name: str, to_file=".", keep_folder: bool = True):
"""
assert self.surface_mesh_id
S3TransferType.SURFACE_MESH.download_file(
self.surface_mesh_id, file_name, to_file, keep_folder
self.surface_mesh_id,
file_name,
to_file,
keep_folder,
progress_callback=progress_callback,
)

def download_log(self, to_file=".", keep_folder: bool = True):
Expand Down
13 changes: 11 additions & 2 deletions flow360/component/volume_mesh.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,15 @@ def all_boundaries(self):
"""
return self.info.boundaries

# pylint: disable=too-many-arguments
@on_cloud_resource_only
def download_file(
self,
file_name: Union[str, VolumeMeshDownloadable],
to_file=".",
keep_folder: bool = True,
overwrite: bool = True,
progress_callback=None,
):
"""
Download file from surface mesh
Expand All @@ -399,7 +401,13 @@ def download_file(
"""
if isinstance(file_name, VolumeMeshDownloadable):
file_name = file_name.value
super().download_file(file_name, to_file, keep_folder, overwrite=overwrite)
return super().download_file(
file_name,
to_file,
keep_folder=keep_folder,
overwrite=overwrite,
progress_callback=progress_callback,
)

@on_cloud_resource_only
def download(self, to_file=".", keep_folder: bool = True):
Expand Down Expand Up @@ -505,6 +513,7 @@ def from_file(
name: str = None,
tags: [str] = None,
solver_version=None,
progress_callback=None,
):
"""
Create volume mesh from ugrid file
Expand Down Expand Up @@ -546,7 +555,7 @@ def from_file(
mesh._params = Flow360MeshParams(**mesh._info.mesh_params.dict())
mesh.init_id(mesh._info.id)
remote_file_name = mesh._remote_file_name(mesh_format, compression, endianness)
mesh.upload_file(remote_file_name, file_name)
mesh.upload_file(remote_file_name, file_name, progress_callback=progress_callback)
mesh._complete_upload(remote_file_name)
return mesh

Expand Down
Loading