Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pyright.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ jobs:
- run: uv pip install --system -e .[cloudserver,gui]
- uses: jakebailey/pyright-action@v2
with:
version: 1.1.308
version: 1.1.404
4 changes: 3 additions & 1 deletion dpdispatcher/contexts/dp_cloud_server_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ def upload(self, submission):
# return oss_task_zip
# api.upload(self.oss_task_dir, zip_task_file)

def download(self, submission):
def download(
self, submission, check_exists=False, mark_failure=True, back_error=False
):
jobs = submission.belonging_jobs
job_hashs = {}
job_infos = {}
Expand Down
4 changes: 2 additions & 2 deletions dpdispatcher/contexts/lazy_local_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ def get_job_root(self):

def upload(
self,
jobs,
submission,
# local_up_files,
dereference=True,
):
pass

def download(
self,
jobs,
submission,
# remote_down_files,
check_exists=False,
mark_failure=True,
Expand Down
10 changes: 6 additions & 4 deletions dpdispatcher/contexts/openapi_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ def __init__(
raise ValueError(
"remote_profile must contain 'project_id' or set environment variable 'BOHRIUM_PROJECT_ID'"
)
self.client = Bohrium(
self.client = Bohrium( # type: ignore[reportPossiblyUnboundVariable]
access_key=access_key, project_id=project_id, app_key=app_key
)
self.storage = Tiefblue()
self.job = Job(client=self.client)
self.storage = Tiefblue() # type: ignore[reportPossiblyUnboundVariable]
self.job = Job(client=self.client) # type: ignore[reportPossiblyUnboundVariable]
self.jgid = None
os.makedirs(DP_CLOUD_SERVER_HOME_DIR, exist_ok=True)

Expand Down Expand Up @@ -206,7 +206,9 @@ def upload(self, submission):
# return oss_task_zip
# api.upload(self.oss_task_dir, zip_task_file)

def download(self, submission):
def download(
self, submission, check_exists=False, mark_failure=True, back_error=False
):
jobs = submission.belonging_jobs
job_hashs = {}
job_infos = {}
Expand Down
12 changes: 6 additions & 6 deletions dpdispatcher/contexts/ssh_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,8 @@ def call(self, cmd):
# print(pid)
return {"stdin": stdin, "stdout": stdout, "stderr": stderr}

def check_finish(self, cmd_pipes):
return cmd_pipes["stdout"].channel.exit_status_ready()
def check_finish(self, proc):
return proc["stdout"].channel.exit_status_ready()

def get_return(self, cmd_pipes):
if not self.check_finish(cmd_pipes):
Expand Down Expand Up @@ -888,11 +888,11 @@ def _put_files(
# local tar
if os.path.isfile(os.path.join(self.local_root, of)):
os.remove(os.path.join(self.local_root, of))
with tarfile.open(
with tarfile.open( # type: ignore[reportCallIssue, reportArgumentType]
os.path.join(self.local_root, of),
tarfile_mode,
dereference=dereference,
**kwargs,
mode=tarfile_mode, # type: ignore[reportArgumentType]
dereference=dereference, # type: ignore[reportArgumentType]
**kwargs, # type: ignore[reportArgumentType]
) as tar:
# avoid compressing duplicated files or directories
for ii in set(files):
Expand Down
2 changes: 1 addition & 1 deletion dpdispatcher/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def check_if_recover(self, submission):
return if_recover

@abstractmethod
def check_finish_tag(self, **kwargs):
def check_finish_tag(self, job):
raise NotImplementedError(
"abstract method check_finish_tag should be implemented by derived class"
)
Expand Down
3 changes: 0 additions & 3 deletions dpdispatcher/machines/JH_UniScheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ def do_submit(self, job):
self.context.write_file(job_id_name, job_id)
return job_id

def default_resources(self, resources):
pass

@retry()
def check_status(self, job):
try:
Expand Down
3 changes: 0 additions & 3 deletions dpdispatcher/machines/fugaku.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ def do_submit(self, job):
self.context.write_file(job_id_name, job_id)
return job_id

def default_resources(self, resources):
pass

def check_status(self, job):
job_id = job.job_id
if job_id == "":
Expand Down
3 changes: 0 additions & 3 deletions dpdispatcher/machines/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ def do_submit(self, job):
return job_id

# TODO: derive abstract methods
def default_resources(self, resources):
pass

def sub_script_cmd(self, res):
pass

Expand Down
6 changes: 3 additions & 3 deletions dpdispatcher/machines/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ def __init__(self, context):
raise ValueError(
"remote_profile must contain 'project_id' or set environment variable 'BOHRIUM_PROJECT_ID'"
)
self.client = Bohrium(
self.client = Bohrium( # type: ignore[reportPossiblyUnboundVariable]
access_key=access_key, project_id=project_id, app_key=app_key
)
self.storage = Tiefblue()
self.job = Job(client=self.client)
self.storage = Tiefblue() # type: ignore[reportPossiblyUnboundVariable]
self.job = Job(client=self.client) # type: ignore[reportPossiblyUnboundVariable]
self.group_id = None

def gen_script(self, job):
Expand Down
6 changes: 0 additions & 6 deletions dpdispatcher/machines/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ def do_submit(self, job):
self.context.write_file(job_id_name, job_id)
return job_id

def default_resources(self, resources):
pass

def check_status(self, job):
job_id = job.job_id
if job_id == "":
Expand Down Expand Up @@ -255,9 +252,6 @@ def do_submit(self, job):
self.context.write_file(job_id_name, job_id)
return job_id

def default_resources(self, resources):
pass

def check_status(self, job):
### https://softpanorama.org/HPC/Grid_engine/Queues/queue_states.shtml
job_id = job.job_id
Expand Down
3 changes: 0 additions & 3 deletions dpdispatcher/machines/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ def do_submit(self, job):
# self.context.write_file(job_id_name, job_id)
# return job_id

def default_resources(self, resources):
pass

def check_status(self, job):
job_id = job.job_id
# print('shell.check_status.job_id', job_id)
Expand Down
3 changes: 0 additions & 3 deletions dpdispatcher/machines/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ def do_submit(self, job):
self.context.write_file(job_id_name, job_id)
return job_id

def default_resources(self, resources):
pass

@retry()
def check_status(self, job):
job_id = job.job_id
Expand Down
10 changes: 5 additions & 5 deletions dpdispatcher/utils/dpcloudserver/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ def _get_oss_bucket(self, endpoint, bucket_name):
res = self.get("/data/get_sts_token", {})
# print('debug>>>>>>>>>>>>>', res)
dlog.debug(f"debug: _get_oss_bucket: res:{res}")
auth = oss2.StsAuth(
auth = oss2.StsAuth( # type: ignore[reportPossiblyUnboundVariable]
res["AccessKeyId"], res["AccessKeySecret"], res["SecurityToken"]
)
return oss2.Bucket(auth, endpoint, bucket_name)
return oss2.Bucket(auth, endpoint, bucket_name) # type: ignore[reportPossiblyUnboundVariable]

def download(self, oss_file, save_file, endpoint, bucket_name):
bucket = self._get_oss_bucket(endpoint, bucket_name)
Expand Down Expand Up @@ -184,7 +184,7 @@ def upload(self, oss_task_zip, zip_task_file, endpoint, bucket_name):
)
bucket = self._get_oss_bucket(endpoint, bucket_name)
total_size = os.path.getsize(zip_task_file)
part_size = determine_part_size(total_size, preferred_size=1000 * 1024)
part_size = determine_part_size(total_size, preferred_size=1000 * 1024) # type: ignore[reportPossiblyUnboundVariable]
upload_id = bucket.init_multipart_upload(oss_task_zip).upload_id
parts = []
with open(zip_task_file, "rb") as fileobj:
Expand All @@ -196,9 +196,9 @@ def upload(self, oss_task_zip, zip_task_file, endpoint, bucket_name):
oss_task_zip,
upload_id,
part_number,
SizedFileAdapter(fileobj, num_to_upload),
SizedFileAdapter(fileobj, num_to_upload), # type: ignore[reportPossiblyUnboundVariable]
)
parts.append(PartInfo(part_number, result.etag))
parts.append(PartInfo(part_number, result.etag)) # type: ignore[reportPossiblyUnboundVariable]
offset += num_to_upload
part_number += 1
# result = bucket.complete_multipart_upload(oss_task_zip, upload_id, parts)
Expand Down
Loading