diff --git a/.github/workflows/pyright.yml b/.github/workflows/pyright.yml index 2412b3c4..c641d65e 100644 --- a/.github/workflows/pyright.yml +++ b/.github/workflows/pyright.yml @@ -15,4 +15,4 @@ jobs: - run: uv pip install --system -e .[cloudserver,gui] - uses: jakebailey/pyright-action@v2 with: - version: 1.1.308 + version: 1.1.404 diff --git a/dpdispatcher/contexts/dp_cloud_server_context.py b/dpdispatcher/contexts/dp_cloud_server_context.py index 4b92f1c9..4d827a80 100644 --- a/dpdispatcher/contexts/dp_cloud_server_context.py +++ b/dpdispatcher/contexts/dp_cloud_server_context.py @@ -161,7 +161,9 @@ def upload(self, submission): # return oss_task_zip # api.upload(self.oss_task_dir, zip_task_file) - def download(self, submission): + def download( + self, submission, check_exists=False, mark_failure=True, back_error=False + ): jobs = submission.belonging_jobs job_hashs = {} job_infos = {} diff --git a/dpdispatcher/contexts/lazy_local_context.py b/dpdispatcher/contexts/lazy_local_context.py index b6b35101..5eaa9264 100644 --- a/dpdispatcher/contexts/lazy_local_context.py +++ b/dpdispatcher/contexts/lazy_local_context.py @@ -83,7 +83,7 @@ def get_job_root(self): def upload( self, - jobs, + submission, # local_up_files, dereference=True, ): @@ -91,7 +91,7 @@ def upload( def download( self, - jobs, + submission, # remote_down_files, check_exists=False, mark_failure=True, diff --git a/dpdispatcher/contexts/openapi_context.py b/dpdispatcher/contexts/openapi_context.py index ccae90d5..94b25ba8 100644 --- a/dpdispatcher/contexts/openapi_context.py +++ b/dpdispatcher/contexts/openapi_context.py @@ -95,11 +95,11 @@ def __init__( raise ValueError( "remote_profile must contain 'project_id' or set environment variable 'BOHRIUM_PROJECT_ID'" ) - self.client = Bohrium( + self.client = Bohrium( # type: ignore[reportPossiblyUnboundVariable] access_key=access_key, project_id=project_id, app_key=app_key ) - self.storage = Tiefblue() - self.job = Job(client=self.client) + self.storage = Tiefblue() # type: ignore[reportPossiblyUnboundVariable] + self.job = Job(client=self.client) # type: ignore[reportPossiblyUnboundVariable] self.jgid = None os.makedirs(DP_CLOUD_SERVER_HOME_DIR, exist_ok=True) @@ -206,7 +206,9 @@ def upload(self, submission): # return oss_task_zip # api.upload(self.oss_task_dir, zip_task_file) - def download(self, submission): + def download( + self, submission, check_exists=False, mark_failure=True, back_error=False + ): jobs = submission.belonging_jobs job_hashs = {} job_infos = {} diff --git a/dpdispatcher/contexts/ssh_context.py b/dpdispatcher/contexts/ssh_context.py index 000a550c..2f662f46 100644 --- a/dpdispatcher/contexts/ssh_context.py +++ b/dpdispatcher/contexts/ssh_context.py @@ -825,8 +825,8 @@ def call(self, cmd): # print(pid) return {"stdin": stdin, "stdout": stdout, "stderr": stderr} - def check_finish(self, cmd_pipes): - return cmd_pipes["stdout"].channel.exit_status_ready() + def check_finish(self, proc): + return proc["stdout"].channel.exit_status_ready() def get_return(self, cmd_pipes): if not self.check_finish(cmd_pipes): @@ -888,11 +888,11 @@ def _put_files( # local tar if os.path.isfile(os.path.join(self.local_root, of)): os.remove(os.path.join(self.local_root, of)) - with tarfile.open( + with tarfile.open( # type: ignore[reportCallIssue, reportArgumentType] os.path.join(self.local_root, of), - tarfile_mode, - dereference=dereference, - **kwargs, + mode=tarfile_mode, # type: ignore[reportArgumentType] + dereference=dereference, # type: ignore[reportArgumentType] + **kwargs, # type: ignore[reportArgumentType] ) as tar: # avoid compressing duplicated files or directories for ii in set(files): diff --git a/dpdispatcher/machine.py b/dpdispatcher/machine.py index a78b61f5..01a51557 100644 --- a/dpdispatcher/machine.py +++ b/dpdispatcher/machine.py @@ -227,7 +227,7 @@ def check_if_recover(self, submission): return if_recover @abstractmethod - def check_finish_tag(self, **kwargs): + def check_finish_tag(self, job): raise NotImplementedError( "abstract method check_finish_tag should be implemented by derived class" ) diff --git a/dpdispatcher/machines/JH_UniScheduler.py b/dpdispatcher/machines/JH_UniScheduler.py index e0fa5e1b..a9f5a421 100644 --- a/dpdispatcher/machines/JH_UniScheduler.py +++ b/dpdispatcher/machines/JH_UniScheduler.py @@ -84,9 +84,6 @@ def do_submit(self, job): self.context.write_file(job_id_name, job_id) return job_id - def default_resources(self, resources): - pass - @retry() def check_status(self, job): try: diff --git a/dpdispatcher/machines/fugaku.py b/dpdispatcher/machines/fugaku.py index 5dba7391..d4a38a4b 100644 --- a/dpdispatcher/machines/fugaku.py +++ b/dpdispatcher/machines/fugaku.py @@ -67,9 +67,6 @@ def do_submit(self, job): self.context.write_file(job_id_name, job_id) return job_id - def default_resources(self, resources): - pass - def check_status(self, job): job_id = job.job_id if job_id == "": diff --git a/dpdispatcher/machines/lsf.py b/dpdispatcher/machines/lsf.py index eb0e0549..035ceace 100644 --- a/dpdispatcher/machines/lsf.py +++ b/dpdispatcher/machines/lsf.py @@ -102,9 +102,6 @@ def do_submit(self, job): return job_id # TODO: derive abstract methods - def default_resources(self, resources): - pass - def sub_script_cmd(self, res): pass diff --git a/dpdispatcher/machines/openapi.py b/dpdispatcher/machines/openapi.py index 3073c2f9..e5514dce 100644 --- a/dpdispatcher/machines/openapi.py +++ b/dpdispatcher/machines/openapi.py @@ -64,11 +64,11 @@ def __init__(self, context): raise ValueError( "remote_profile must contain 'project_id' or set environment variable 'BOHRIUM_PROJECT_ID'" ) - self.client = Bohrium( + self.client = Bohrium( # type: ignore[reportPossiblyUnboundVariable] access_key=access_key, project_id=project_id, app_key=app_key ) - self.storage = Tiefblue() - self.job = Job(client=self.client) + self.storage = Tiefblue() # type: ignore[reportPossiblyUnboundVariable] + self.job = Job(client=self.client) # type: ignore[reportPossiblyUnboundVariable] self.group_id = None def gen_script(self, job): diff --git a/dpdispatcher/machines/pbs.py b/dpdispatcher/machines/pbs.py index a055c50c..35ef4c44 100644 --- a/dpdispatcher/machines/pbs.py +++ b/dpdispatcher/machines/pbs.py @@ -69,9 +69,6 @@ def do_submit(self, job): self.context.write_file(job_id_name, job_id) return job_id - def default_resources(self, resources): - pass - def check_status(self, job): job_id = job.job_id if job_id == "": @@ -255,9 +252,6 @@ def do_submit(self, job): self.context.write_file(job_id_name, job_id) return job_id - def default_resources(self, resources): - pass - def check_status(self, job): ### https://softpanorama.org/HPC/Grid_engine/Queues/queue_states.shtml job_id = job.job_id diff --git a/dpdispatcher/machines/shell.py b/dpdispatcher/machines/shell.py index babb1971..2205e333 100644 --- a/dpdispatcher/machines/shell.py +++ b/dpdispatcher/machines/shell.py @@ -60,9 +60,6 @@ def do_submit(self, job): # self.context.write_file(job_id_name, job_id) # return job_id - def default_resources(self, resources): - pass - def check_status(self, job): job_id = job.job_id # print('shell.check_status.job_id', job_id) diff --git a/dpdispatcher/machines/slurm.py b/dpdispatcher/machines/slurm.py index f8ae4744..d4f2b328 100644 --- a/dpdispatcher/machines/slurm.py +++ b/dpdispatcher/machines/slurm.py @@ -118,9 +118,6 @@ def do_submit(self, job): self.context.write_file(job_id_name, job_id) return job_id - def default_resources(self, resources): - pass - @retry() def check_status(self, job): job_id = job.job_id diff --git a/dpdispatcher/utils/dpcloudserver/client.py b/dpdispatcher/utils/dpcloudserver/client.py index a4897bfe..23eab8a0 100644 --- a/dpdispatcher/utils/dpcloudserver/client.py +++ b/dpdispatcher/utils/dpcloudserver/client.py @@ -142,10 +142,10 @@ def _get_oss_bucket(self, endpoint, bucket_name): res = self.get("/data/get_sts_token", {}) # print('debug>>>>>>>>>>>>>', res) dlog.debug(f"debug: _get_oss_bucket: res:{res}") - auth = oss2.StsAuth( + auth = oss2.StsAuth( # type: ignore[reportPossiblyUnboundVariable] res["AccessKeyId"], res["AccessKeySecret"], res["SecurityToken"] ) - return oss2.Bucket(auth, endpoint, bucket_name) + return oss2.Bucket(auth, endpoint, bucket_name) # type: ignore[reportPossiblyUnboundVariable] def download(self, oss_file, save_file, endpoint, bucket_name): bucket = self._get_oss_bucket(endpoint, bucket_name) @@ -184,7 +184,7 @@ def upload(self, oss_task_zip, zip_task_file, endpoint, bucket_name): ) bucket = self._get_oss_bucket(endpoint, bucket_name) total_size = os.path.getsize(zip_task_file) - part_size = determine_part_size(total_size, preferred_size=1000 * 1024) + part_size = determine_part_size(total_size, preferred_size=1000 * 1024) # type: ignore[reportPossiblyUnboundVariable] upload_id = bucket.init_multipart_upload(oss_task_zip).upload_id parts = [] with open(zip_task_file, "rb") as fileobj: @@ -196,9 +196,9 @@ def upload(self, oss_task_zip, zip_task_file, endpoint, bucket_name): oss_task_zip, upload_id, part_number, - SizedFileAdapter(fileobj, num_to_upload), + SizedFileAdapter(fileobj, num_to_upload), # type: ignore[reportPossiblyUnboundVariable] ) - parts.append(PartInfo(part_number, result.etag)) + parts.append(PartInfo(part_number, result.etag)) # type: ignore[reportPossiblyUnboundVariable] offset += num_to_upload part_number += 1 # result = bucket.complete_multipart_upload(oss_task_zip, upload_id, parts)