Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "rda_python_common"
version = "2.0.18"
version = "2.0.19"
authors = [
{ name="Zaihua Ji", email="zji@ucar.edu" },
]
Expand Down
32 changes: 16 additions & 16 deletions src/rda_python_common/pg_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def local_copy_object(self, tofile, fromfile, bucket = None, meta = None, logact
tinfo = self.check_object_file(tofile, bucket, 0, logact)
if tinfo and tinfo['data_size'] > 0:
return self.pglog("{}-{}-{}: file exists already".format(self.OHOST, bucket, tofile), logact)
ocmd = self.valid_command(self.OBJCTCMD, logact)
ocmd = self.OBJCTCMD
cmd = "{} ul -lf {} -b {} -k {} -md '{}'".format(ocmd, fromfile, bucket, tofile, uinfo)
for loop in range(2):
buf = self.pgsystem(cmd, logact, self.CMDBTH)
Expand Down Expand Up @@ -295,7 +295,7 @@ def quasar_multiple_trasnfer(self, tofiles, fromfiles, topoint, frompoint, logac
destination_endpoint = topoint
label = f"{self.ENDPOINTS[frompoint]} to {self.ENDPOINTS[topoint]} {action}"
verify_checksum = True
bcmd = self.valid_command(self.BACKCMD, logact)
bcmd = self.BACKCMD
cmd = f'{bcmd} {action} -se {source_endpoint} -de {destination_endpoint} --label "{label}"'
if verify_checksum:
cmd += ' -vc'
Expand Down Expand Up @@ -325,7 +325,7 @@ def endpoint_copy_endpoint(self, tofile, fromfile, topoint, frompoint, logact =
if tinfo and tinfo['data_size'] > 0:
return self.pglog("{}-{}: file exists already".format(topoint, tofile), logact)
action = 'transfer'
bcmd = self.valid_command(self.BACKCMD, logact)
bcmd = self.BACKCMD
cmd = f'{bcmd} {action} -se {frompoint} -de {topoint} -sf {fromfile} -df {tofile} -vc'
task = self.submit_globus_task(cmd, topoint, logact)
if task['stat'] == 'S':
Expand Down Expand Up @@ -373,7 +373,7 @@ def check_globus_status(self, taskid, endpoint = None, logact = 0):
if not taskid: return ret
if not endpoint: endpoint = self.PGLOG['BACKUPEP']
mp = r'Status:\s+({})'.format('|'.join(self.QSTATS.values()))
bcmd = self.valid_command(self.BACKCMD, logact)
bcmd = self.BACKCMD
cmd = f"{bcmd} get-task {taskid}"
astats = ['OK', 'Queued']
for loop in range(2):
Expand Down Expand Up @@ -502,7 +502,7 @@ def object_copy_local(self, tofile, fromfile, bucket = None, logact = 0):
if not finfo:
if finfo != None: return ret
return self.lmsg(fromfile, "{}-{} to copy to {}".format(self.OHOST, self.PGLOG['MISSFILE'], tofile), logact)
ocmd = self.valid_command(self.OBJCTCMD, logact)
ocmd = self.OBJCTCMD
cmd = "{} go -k {} -b {}".format(ocmd, fromfile, bucket)
fromname = op.basename(fromfile)
toname = op.basename(tofile)
Expand Down Expand Up @@ -608,7 +608,7 @@ def delete_remote_file(self, file, host, logact = 0):
# Delete a file on object store
def delete_object_file(self, file, bucket = None, logact = 0):
if not bucket: bucket = self.PGLOG['OBJCTBKT']
ocmd = self.valid_command(self.OBJCTCMD, logact)
ocmd = self.OBJCTCMD
for loop in range(2):
list = self.object_glob(file, bucket, 0, logact)
if not list: return self.FAILURE
Expand All @@ -628,7 +628,7 @@ def delete_backup_file(self, file, endpoint = None, logact = 0):
if not endpoint: endpoint = self.PGLOG['BACKUPEP']
info = self.check_backup_file(file, endpoint, 0, logact)
if not info: return self.FAILURE
bcmd = self.valid_command(self.BACKCMD, logact)
bcmd = self.BACKCMD
cmd = f"{bcmd} delete -ep {endpoint} -tf {file}"
task = self.submit_globus_task(cmd, endpoint, logact)
if task['stat'] == 'S':
Expand Down Expand Up @@ -782,7 +782,7 @@ def move_object_file(self, tofile, fromfile, tobucket, frombucket, logact = 0):
return self.errlog("{}-{}: Object File exists, cannot move {}-{} to it".format(tobucket, tofile, frombucket, fromfile), 'R', 1, logact)
elif tinfo != None:
return self.FAILURE
ocmd = self.valid_command(self.OBJCTCMD, logact)
ocmd = self.OBJCTCMD
cmd = "{} mv -b {} -db {} -k {} -dk {}".format(ocmd, frombucket, tobucket, fromfile, tofile)
ucmd = "{} gm -k {} -b {}".format(ocmd, fromfile, frombucket)
ubuf = self.pgsystem(ucmd, self.LOGWRN, self.CMDRET)
Expand Down Expand Up @@ -816,7 +816,7 @@ def move_object_path(self, topath, frompath, tobucket, frombucket, logact = 0):
return self.SUCCESS
else:
return self.errlog("{}-{}: {} to move".format(frombucket, frompath, self.PGLOG['MISSFILE']), 'R', 1, logact)
ocmd = self.valid_command(self.OBJCTCMD, logact)
ocmd = self.OBJCTCMD
cmd = "{} mv -b {} -db {} -k {} -dk {}".format(ocmd, frombucket, tobucket, frompath, topath)
for loop in range(2):
buf = self.pgsystem(cmd, logact, self.CMDBTH)
Expand Down Expand Up @@ -846,7 +846,7 @@ def move_backup_file(self, tofile, fromfile, endpoint = None, logact = 0):
return self.errlog("{}: File exists, cannot move {} to it".format(tofile, fromfile), 'B', 1, logact)
elif tinfo != None:
return ret
bcmd = self.valid_command(self.BACKCMD, logact)
bcmd = self.BACKCMD
cmd = f"{bcmd} rename -ep {endpoint} --old-path {fromfile} --new-path {tofile}"
loop = 0
while loop < 2:
Expand Down Expand Up @@ -945,7 +945,7 @@ def make_one_backup_directory(self, dir, odir, endpoint = None, logact = 0):
return self.FAILURE
if not odir: odir = dir
if not self.make_one_backup_directory(op.dirname(dir), odir, endpoint, logact): return self.FAILURE
bcmd = self.valid_command(self.BACKCMD, logact)
bcmd = self.BACKCMD
cmd = f"{bcmd} mkdir -ep {endpoint} -p {dir}"
for loop in range(2):
buf = self.pgsystem(cmd, logact, self.CMDRET)
Expand Down Expand Up @@ -1421,7 +1421,7 @@ def check_object_file(self, file, bucket = None, opt = 0, logact = 0):
if not file: return ret
ms = re.match(r'^(.+)/$', file)
if ms: file = ms.group(1) # remove ending '/' in case
ocmd = self.valid_command(self.OBJCTCMD, logact)
ocmd = self.OBJCTCMD
cmd = "{} lo {} -b {}".format(ocmd, file, bucket)
ucmd = "{} gm -k {} -b {}".format(ocmd, file, bucket) if opt&14 else None
loop = 0
Expand Down Expand Up @@ -1466,7 +1466,7 @@ def check_object_path(self, path, bucket = None, logact = 0):
if not bucket: bucket = self.PGLOG['OBJCTBKT']
ret = None
if not path: return ret
ocmd = self.valid_command(self.OBJCTCMD, logact)
ocmd = self.OBJCTCMD
cmd = "{} lo {} -ls -b {}".format(ocmd, path, bucket)
loop = 0
while loop < 2:
Expand Down Expand Up @@ -1520,7 +1520,7 @@ def check_backup_file(self, file, endpoint = None, opt = 0, logact = 0):
if not endpoint: endpoint = self.PGLOG['BACKUPEP']
bdir = op.dirname(file)
bfile = op.basename(file)
bcmd = self.valid_command(self.BACKCMD, logact)
bcmd = self.BACKCMD
cmd = f"{bcmd} ls -ep {endpoint} -p {bdir} --filter {bfile}"
ccnt = loop = 0
while loop < 2:
Expand Down Expand Up @@ -1781,7 +1781,7 @@ def object_glob(self, dir, bucket = None, opt = 0, logact = 0):
if not bucket: bucket = self.PGLOG['OBJCTBKT']
ms = re.match(r'^(.+)/$', dir)
if ms: dir = ms.group(1)
ocmd = self.valid_command(self.OBJCTCMD, logact)
ocmd = self.OBJCTCMD
cmd = "{} lo {} -b {}".format(ocmd, dir, bucket)
ary = err = None
buf = self.pgsystem(cmd, self.LOGWRN, self.CMDRET)
Expand Down Expand Up @@ -1820,7 +1820,7 @@ def object_glob(self, dir, bucket = None, opt = 0, logact = 0):
def backup_glob(self, dir, endpoint = None, opt = 0, logact = 0):
if not dir: return None
if not endpoint: endpoint = self.PGLOG['BACKUPEP']
bcmd = self.valid_command(self.BACKCMD, logact)
bcmd = self.BACKCMD
cmd = f"{bcmd} ls -ep {endpoint} -p {dir}"
flist = {}
for loop in range(2):
Expand Down
17 changes: 14 additions & 3 deletions src/rda_python_common/pg_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,10 +670,11 @@ def pgsystem(self, pgcmd, logact = None, cmdopt = 5, instr = None, seconds = 0):
end = time.time()
last = end - last
if error:
cmdpstr = self.command_path(cmdstr)
if ret == self.FAILURE:
error = "Error Execute: {}\n{}".format(cmdstr, error)
error = "Error Execute: {}\n{}".format(cmdpstr, error)
else:
error = "Error From: {}\n{}".format(cmdstr, error)
error = "Error From: {}\n{}".format(cmdpstr, error)
if loop > 1: error = "Retry "
if cmdopt&256: self.PGLOG['SYSERR'] += error
if cmdopt&4:
Expand All @@ -691,7 +692,7 @@ def pgsystem(self, pgcmd, logact = None, cmdopt = 5, instr = None, seconds = 0):
retbuf = ''
return (retbuf if cmdopt&16 else ret)

# strip carrage return '\r', but keep ending newline '\n'
# strip carriage return '\r', but keep ending newline '\n'
@staticmethod
def strip_output_line(line):
ms = re.search(r'\r([^\r]+)\r*$', line)
Expand Down Expand Up @@ -865,6 +866,16 @@ def valid_command(self, cmd, logact = 0):
self.COMMANDS[cmd] = buf
return self.COMMANDS[cmd]

# get full command path if possible
def command_path(self, cmdstr):
if not cmdstr: return ''
ary = cmdstr.split(' ', 1)
cmd = ary[0]
if re.search(r'[\\/]', cmd): return cmdstr
optstr = (' ' + ary[1]) if len(ary) > 1 else ''
pcmd = shutil.which(cmd)
return (pcmd+optstr) if pcmd else cmdstr

# add carbon copies to self.PGLOG['CCDADDR']
def add_carbon_copy(self, cc = None, isstr = None, exclude = 0, specialist = None):
if not cc:
Expand Down