Skip to content

Commit

Permalink
Merge 5f9ab05 into 66dd9be
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Aug 15, 2022
2 parents 66dd9be + 5f9ab05 commit 1950ae8
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 15 deletions.
6 changes: 6 additions & 0 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import inspect
import json
import logging
import os
import pickle

from typing import Callable
Expand Down Expand Up @@ -272,6 +273,11 @@ def initialize(self, **kwargs):
"""
BarrierAppDROP.initialize(self, **kwargs)

env = os.environ.copy()
env.update({"DLG_UID": self._uid})
if self._dlg_session:
env.update({"DLG_SESSION_ID": self._dlg_session.sessionId})

self._applicationArgs = self._getArg(kwargs, "applicationArgs", {})

self.func_code = self._getArg(kwargs, "func_code", None)
Expand Down
21 changes: 19 additions & 2 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -1182,13 +1182,13 @@ def get_dir(self, dirname):
parts = []
if self._dlg_session:
parts.append(".")
parts.append(self._dlg_session.sessionId)
else:
parts.append("/tmp/daliuge_tfiles")
if dirname:
parts.append(dirname)

the_dir = os.path.abspath(os.path.normpath(os.path.join(*parts)))
logger.debug("Path used for drop: %s", the_dir)
createDirIfMissing(the_dir)
return the_dir

Expand Down Expand Up @@ -2122,6 +2122,22 @@ def exists(self):
return any([c.exists() for c in self._children])
return True

##
# TODO: This needs some more work
# @brief Directory
# @details A ContainerDROP that represents a filesystem directory. It only allows
# FileDROPs and DirectoryContainers to be added as children. Children
# can only be added if they are placed directly within the directory
# represented by this DirectoryContainer.
# @par EAGLE_START
# @param category Directory
# @param tag future
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param check_exists Check path exists/True/Boolean/ComponentParameter/readwrite//False/False/Perform a check to make sure the file path exists before proceeding with the application
# @param dirname Directory name//String/ComponentParameter/readwrite//False/False/"Directory name/path"
# @param dummy dummy//String/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END

class DirectoryContainer(PathBasedDrop, ContainerDROP):
"""
Expand All @@ -2143,7 +2159,8 @@ def initialize(self, **kwargs):

directory = kwargs["dirname"]

if self.check_exists is True:
logger.debug("Checking existence of %s %s", directory, self.check_exists)
if "check_exists" in kwargs and kwargs["check_exists"] is True:
if not os.path.isdir(directory):
raise InvalidDropException(self, "%s is not a directory" % (directory))

Expand Down
24 changes: 18 additions & 6 deletions daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ def __init__(self, sessionId, nm=None):
self._graphreprodata = None
self._reprofinished = False

# create the session directory and change CWD
self._sessionDir = f"{utils.getDlgWorkDir()}/{sessionId}"
logger.debug("Creating session directory: %s", self._sessionDir)
createDirIfMissing(self._sessionDir)
os.chdir(self._sessionDir)

logger.debug("Updating ENV with SESSION_ID: %s", sessionId)
os.environ.update({"DLG_SESSION_ID": sessionId})

class SessionFilter(logging.Filter):
def __init__(self, sessionId):
self.sessionId = sessionId
Expand All @@ -177,10 +186,10 @@ def filter(self, record):
fmt = logging.Formatter(fmt)
fmt.converter = time.gmtime

logdir = utils.getDlgLogsDir()
if self._nm is not None:
logdir = self._nm.logdir
logfile = generateLogFileName(logdir, self.sessionId)
# logdir = utils.getDlgLogsDir()
# if self._nm is not None:
# logdir = self._nm.logdir
logfile = generateLogFileName(self._sessionDir, self.sessionId)
try:
self.file_handler = logging.FileHandler(logfile)
self.file_handler.setFormatter(fmt)
Expand All @@ -195,6 +204,10 @@ def filter(self, record):
def sessionId(self):
return self._sessionId

@property
def sessionDir(self):
return self._sessionDir

@property
def status(self):
with self._statusLock:
Expand Down Expand Up @@ -227,8 +240,7 @@ def reprostatus(self, status):
self._reprofinished = status

def write_reprodata(self):
parts = [utils.getDlgLogsDir(), self._sessionId]
the_dir = os.path.abspath(os.path.normpath(os.path.join(*parts)))
the_dir = self._sessionDir
createDirIfMissing(the_dir)
the_path = os.path.join(the_dir, "reprodata.out")
with open(the_path, "w+", encoding="utf-8") as file:
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/test/lifecycle/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import os
import sqlite3
import unittest
import tempfile

from dlg.drop import InMemoryDROP
from dlg.lifecycle.registry import RDBMSRegistry


DBFILE = "testing_dlm.db"

DBFILE = tempfile.mktemp()

class TestRDBMSRegistry(unittest.TestCase):
def setUp(self):
Expand Down
11 changes: 6 additions & 5 deletions tools/xml2palette/xml2palette.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,9 @@ def process_compounddef_default(compounddef, language):
if child.tag == "detaileddescription" and len(child) > 0 and casa_mode:
# for child in ggchild:
dStr = child[0][0].text
descDict = parseCasaDocs(dStr)
descDict, comp_description = parseCasaDocs(dStr)
member["params"].append({"key": "description", "direction": None, "value": comp_description})

pkeys = {p["key"]:i for i,p in enumerate(member["params"])}
for p in descDict.keys():
if p in pkeys:
Expand Down Expand Up @@ -1012,8 +1014,6 @@ def cleanString(text: str) -> str:
def parseCasaDocs(dStr:str) -> dict:
"""
Parse the special docstring for casatasks
"""
"""
Extract the parameters from the casatask doc string.
:param task: The casatask to derive the parameters from.
Expand All @@ -1027,7 +1027,7 @@ def parseCasaDocs(dStr:str) -> dict:
end_ind = [idx for idx, s in enumerate(dList) if '-- example' in s][0]
except IndexError:
logging.debug('Problems finding start or end index for task: {task}')
return {}
return {}, ""
paramsList = dList[start_ind:end_ind]
paramsSidx = [idx+1 for idx, p in enumerate(paramsList) if len(p) > 0 and p[0] != ' ']
paramsEidx = paramsSidx[1:] + [len(paramsList) - 1]
Expand All @@ -1040,7 +1040,8 @@ def parseCasaDocs(dStr:str) -> dict:
pl = [p.strip() for p in paramsList[paramsSidx[i]:paramsEidx[i]-1] if len(p.strip()) > 0]
paramDocs[i] = paramDocs[i] + ' '+' '.join(pl)
params = dict(zip(paramNames, paramDocs))
return params
comp_description = "\n".join(dList[:start_ind-1]) # return main description as well
return params, comp_description


if __name__ == "__main__":
Expand Down

0 comments on commit 1950ae8

Please sign in to comment.