diff --git a/impactlab_tools/utils/paralog.py b/impactlab_tools/utils/paralog.py index 9087b0e..b0e742c 100644 --- a/impactlab_tools/utils/paralog.py +++ b/impactlab_tools/utils/paralog.py @@ -46,7 +46,7 @@ def __init__(self, jobname, jobtitle, logdir, timeout, exclusive_jobnames=[]): self.jobtitle = jobtitle self.timeout = timeout self.exclusive_jobnames = exclusive_jobnames - + # Decide on the name of the log file if not os.path.exists(logdir): os.makedirs(logdir) @@ -67,9 +67,11 @@ def __del__(self): def claim(self, dirpath): """Claim a directory.""" - if self.is_claimed(dirpath): + if not os.path.exists(dirname): + os.makedirs(os.path.abspath(dirname)) + elif self.is_claimed(dirpath): return False - + status_path = StatusManager.claiming_filepath(dirpath, self.jobname) with open(status_path, 'w') as fp: fp.write("%d %s: %s\n" % (os.getpid(), self.jobtitle, self.logpath)) @@ -81,7 +83,7 @@ def update(self, dirpath, status): status_path = StatusManager.claiming_filepath(dirpath, self.jobname) with open(status_path, 'a') as fp: fp.write(status + '\n') - + def release(self, dirpath, status): """Release the claim on this directory.""" os.remove(StatusManager.claiming_filepath(dirpath, self.jobname)) @@ -91,6 +93,9 @@ def release(self, dirpath, status): def is_claimed(self, dirname): """Check if a directory has claims from any of our jobs.""" + if not os.path.exists(dirname): + return False + for jobname in [self.jobname] + self.exclusive_jobnames: filepath = StatusManager.claiming_filepath(dirname, jobname) if os.path.exists(filepath): @@ -104,7 +109,7 @@ def is_claimed(self, dirname): def globalstatus_filepath(dirpath): """The path to the global status for the directory.""" return StatusManager.claiming_filepath(dirpath, 'global') - + @staticmethod def claiming_filepath(dirpath, jobname): """Return the path to the status file used for claiming a directory.""" @@ -117,7 +122,7 @@ def kill_active(dirpath, jobname): if not os.path.exists(filepath): return - + with open(filepath, 'r') as fp: status = fp.read() pid = int(status.split()[0]) @@ -137,37 +142,9 @@ def write(self, message): def close(self): self.log.close() - + def flush(self): #this flush method is needed for python 3 compatibility. #this handles the flush command by doing nothing. #you might want to specify some extra behavior here. pass - -if __name__ == '__main__': - statman1 = StatusManager('test', 'Testing process', '/shares/gcp/temp', 60*60) - print statman1.logpath - assert statman1.claim("/shares/gcp/temp"), "Cannot claim directory!" - - statman2 = StatusManager('test', 'Testing process', '/shares/gcp/temp', 60*60) - print statman2.logpath - assert not statman2.claim("/shares/gcp/temp"), "Accidentally claimed directory!" - - statman1.update("/shares/gcp/temp", "New status.") - statman1.release("/shares/gcp/temp", "First pass complete.") - - assert statman2.claim("/shares/gcp/temp"), "Cannot claim directory afterwards!" - - statman2.release("/shares/gcp/temp", "Second pass complete.") - - logpath1 = statman1.logpath - logpath2 = statman2.logpath - - del statman2 # need to delete in opposite order for our test - del statman1 - - print logpath1 - print logpath2 - - with open(StatusManager.globalstatus_filepath("/shares/gcp/temp"), 'r') as fp: - print fp.read() diff --git a/tests/utils/test_paralog.py b/tests/utils/test_paralog.py new file mode 100644 index 0000000..49a0c37 --- /dev/null +++ b/tests/utils/test_paralog.py @@ -0,0 +1,33 @@ +import pytest +import shutil +from impactlab_tools.utils import paralog + +def test_claiming(): + statman1 = StatusManager('test', 'Testing process', 'testing-paralog', 60*60) + print statman1.logpath + assert statman1.claim("testing-paralog"), "Cannot claim directory!" + + statman2 = StatusManager('test', 'Testing process', 'testing-paralog', 60*60) + print statman2.logpath + assert not statman2.claim("testing-paralog"), "Accidentally claimed directory!" + + statman1.update("testing-paralog", "New status.") + statman1.release("testing-paralog", "First pass complete.") + + assert statman2.claim("testing-paralog"), "Cannot claim directory afterwards!" + + statman2.release("testing-paralog", "Second pass complete.") + + logpath1 = statman1.logpath + logpath2 = statman2.logpath + + del statman2 # need to delete in opposite order for our test + del statman1 + + print logpath1 + print logpath2 + + with open(StatusManager.globalstatus_filepath("testing-paralog"), 'r') as fp: + print fp.read() + + shutil.rmtree('testing-paralog')