From fcbc52f6dabb0591ef9b46cedb1591e573808a0f Mon Sep 17 00:00:00 2001 From: Jinzhe Zeng Date: Wed, 26 May 2021 07:40:45 -0400 Subject: [PATCH] refactor removing directory and provide asynchronous option The original implementation method removes files one by one using sftp. If the latency of the remote server is high, it is very slow. Thus, it's better to use system's `rm` to remove a directory, which may save a lot of time. Also, in some supercomputers, it's very slow to remove large numbers of files (e.g. directory containing trajectory) due to bad I/O performance. So an asynchronous option is provided. Implement deepmodeling/dpgen#385. Close deepmodeling/dpgen#385. --- dpdispatcher/ssh_context.py | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/dpdispatcher/ssh_context.py b/dpdispatcher/ssh_context.py index 3f4515c4..520a41ca 100644 --- a/dpdispatcher/ssh_context.py +++ b/dpdispatcher/ssh_context.py @@ -159,10 +159,12 @@ class SSHContext (object): def __init__ (self, local_root, ssh_session, + clean_asynchronously=False, ): assert(type(local_root) == str) self.temp_local_root = os.path.abspath(local_root) self.job_uuid = None + self.clean_asynchronously = clean_asynchronously # self.job_uuid = job_uuid # if job_uuid: # self.job_uuid=job_uuid @@ -195,7 +197,8 @@ def from_jdata(cls, jdata): ssh_session = SSHSession(**input) ssh_context = SSHContext( local_root=local_root, - ssh_session=ssh_session + ssh_session=ssh_session, + clean_asynchronously=jdata.get('clean_asynchronously', False), ) return ssh_context @@ -284,8 +287,11 @@ def download(self, def block_checkcall(self, cmd, + asynchronously=False, retry=0) : self.ssh_session.ensure_alive() + if asynchronously: + cmd = "nohup %s >/dev/null &" % cmd stdin, stdout, stderr = self.ssh_session.exec_command(('cd %s ;' % self.remote_root) + cmd) exit_status = stdout.channel.recv_exit_status() if exit_status != 0: @@ -295,7 +301,7 @@ def block_checkcall(self, 
(exit_status, cmd, self.job_uuid, stderr.read().decode('utf-8'))) dlog.warning("Sleep 60 s and retry the command...") time.sleep(60) - return self.block_checkcall(cmd, retry=retry+1) + return self.block_checkcall(cmd, asynchronously=asynchronously, retry=retry+1) print('debug:self.remote_root, cmd', self.remote_root, cmd) raise RuntimeError("Get error code %d in calling %s through ssh with job: %s . message: %s" % (exit_status, cmd, self.job_uuid, stderr.read().decode('utf-8'))) @@ -310,9 +316,7 @@ def block_call(self, def clean(self) : self.ssh_session.ensure_alive() - sftp = self.ssh.open_sftp() - self._rmtree(sftp, self.remote_root) - sftp.close() + self._rmtree(self.remote_root) def write_file(self, fname, write_str): self.ssh_session.ensure_alive() @@ -363,17 +367,18 @@ def kill(self, cmd_pipes) : self.block_checkcall('kill -15 %s' % cmd_pipes['pid']) - def _rmtree(self, sftp, remotepath, level=0, verbose = False): - for f in sftp.listdir_attr(remotepath): - rpath = os.path.join(remotepath, f.filename) - if stat.S_ISDIR(f.st_mode): - self._rmtree(sftp, rpath, level=(level + 1)) - else: - rpath = os.path.join(remotepath, f.filename) - if verbose: dlog.info('removing %s%s' % (' ' * level, rpath)) - sftp.remove(rpath) - if verbose: dlog.info('removing %s%s' % (' ' * level, remotepath)) - sftp.rmdir(remotepath) + def _rmtree(self, remotepath, verbose = False): + """Remove the remote path.""" + # The original implementation method removes files one by one using sftp. + # If the latency of the remote server is high, it is very slow. + # Thus, it's better to use system's `rm` to remove a directory, which may + # save a lot of time. + if verbose: + dlog.info('removing %s' % remotepath) + # In some supercomputers, it's very slow to remove large numbers of files + # (e.g. directory containing trajectory) due to bad I/O performance. + # So an asynchronously option is provided. 
+ self.block_checkcall('rm -rf %s' % remotepath, asynchronously=self.clean_asynchronously) def _put_files(self, files,