Skip to content

Commit

Permalink
Merge d824bd5 into fea07cc
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Aug 18, 2022
2 parents fea07cc + d824bd5 commit f60f9c2
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 11 deletions.
6 changes: 6 additions & 0 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import inspect
import json
import logging
import os
import pickle

from typing import Callable
Expand Down Expand Up @@ -272,6 +273,11 @@ def initialize(self, **kwargs):
"""
BarrierAppDROP.initialize(self, **kwargs)

env = os.environ.copy()
env.update({"DLG_UID": self._uid})
if self._dlg_session:
env.update({"DLG_SESSION_ID": self._dlg_session.sessionId})

self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})

self.func_code = self._popArg(kwargs, "func_code", None)
Expand Down
160 changes: 159 additions & 1 deletion daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -1162,13 +1162,13 @@ def get_dir(self, dirname):
parts = []
if self._dlg_session:
parts.append(".")
parts.append(self._dlg_session.sessionId)
else:
parts.append("/tmp/daliuge_tfiles")
if dirname:
parts.append(dirname)

the_dir = os.path.abspath(os.path.normpath(os.path.join(*parts)))
logger.debug("Path used for drop: %s", the_dir)
createDirIfMissing(the_dir)
return the_dir

Expand Down Expand Up @@ -1561,6 +1561,164 @@ def exists(self):
return any([c.exists() for c in self._children])
return True

##
# TODO: This needs some more work
# @brief Directory
# @details A ContainerDROP that represents a filesystem directory. It only allows
# FileDROPs and DirectoryContainers to be added as children. Children
# can only be added if they are placed directly within the directory
# represented by this DirectoryContainer.
# @par EAGLE_START
# @param category Directory
# @param tag future
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param check_exists Check path exists/True/Boolean/ComponentParameter/readwrite//False/False/Perform a check to make sure the file path exists before proceeding with the application
# @param dirname Directory name//String/ComponentParameter/readwrite//False/False/"Directory name/path"
# @param dummy dummy//String/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END

class DirectoryContainer(PathBasedDrop, ContainerDROP):
"""
A ContainerDROP that represents a filesystem directory. It only allows
FileDROPs and DirectoryContainers to be added as children. Children
can only be added if they are placed directly within the directory
represented by this DirectoryContainer.
"""

check_exists = dlg_bool_param("check_exists", True)

def initialize(self, **kwargs):
ContainerDROP.initialize(self, **kwargs)

if "dirname" not in kwargs:
raise InvalidDropException(
self, 'DirectoryContainer needs a "dirname" parameter'
)

directory = kwargs["dirname"]

logger.debug("Checking existence of %s %s", directory, self.check_exists)
if "check_exists" in kwargs and kwargs["check_exists"] is True:
if not os.path.isdir(directory):
raise InvalidDropException(self, "%s is not a directory" % (directory))

self._path = self.get_dir(directory)

def addChild(self, child):
if isinstance(child, (FileDROP, DirectoryContainer)):
path = child.path
if os.path.dirname(path) != self.path:
raise InvalidRelationshipException(
DROPRel(child, DROPLinkType.CHILD, self),
"Child DROP is not under %s" % (self.path),
)
ContainerDROP.addChild(self, child)
else:
raise TypeError("Child DROP is not of type FileDROP or DirectoryContainer")

def delete(self):
shutil.rmtree(self._path)

def exists(self):
return os.path.isdir(self._path)


##
# @brief Plasma
# @details An object in a Apache Arrow Plasma in-memory object store
# @par EAGLE_START
# @param category Plasma
# @param tag daliuge
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param plasma_path Plasma Path//String/ComponentParameter/readwrite//False/False/Path to the local plasma store
# @param object_id Object Id//String/ComponentParameter/readwrite//False/False/PlasmaId of the object for all compute nodes
# @param use_staging Use Staging/False/Boolean/ComponentParameter/readwrite//False/False/Enables writing to a dynamically resizeable staging buffer
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
class PlasmaDROP(DataDROP):
"""
A DROP that points to data stored in a Plasma Store
"""

object_id: bytes = dlg_string_param("object_id", None)
plasma_path: str = dlg_string_param("plasma_path", "/tmp/plasma")
use_staging: bool = dlg_bool_param("use_staging", False)

def initialize(self, **kwargs):
super().initialize(**kwargs)
self.plasma_path = os.path.expandvars(self.plasma_path)
if self.object_id is None:
self.object_id = (
np.random.bytes(20) if len(self.uid) != 20 else self.uid.encode("ascii")
)
elif isinstance(self.object_id, str):
self.object_id = self.object_id.encode("ascii")

def getIO(self):
return PlasmaIO(
plasma.ObjectID(self.object_id),
self.plasma_path,
expected_size=self._expectedSize,
use_staging=self.use_staging,
)

@property
def dataURL(self) -> str:
return "plasma://%s" % (binascii.hexlify(self.object_id).decode("ascii"))


##
# @brief PlasmaFlight
# @details An Apache Arrow Flight server providing distributed access
# to a Plasma in-memory object store
# @par EAGLE_START
# @param category PlasmaFlight
# @param tag daliuge
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param plasma_path Plasma Path//String/ComponentParameter/readwrite//False/False/Path to the local plasma store
# @param object_id Object Id//String/ComponentParameter/readwrite//False/False/PlasmaId of the object for all compute nodes
# @param flight_path Flight Path//String/ComponentParameter/readwrite//False/False/IP and flight port of the drop owner
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
class PlasmaFlightDROP(DataDROP):
"""
A DROP that points to data stored in a Plasma Store
"""

object_id: bytes = dlg_string_param("object_id", None)
plasma_path: str = dlg_string_param("plasma_path", "/tmp/plasma")
flight_path: str = dlg_string_param("flight_path", None)
use_staging: bool = dlg_bool_param("use_staging", False)

def initialize(self, **kwargs):
super().initialize(**kwargs)
self.plasma_path = os.path.expandvars(self.plasma_path)
if self.object_id is None:
self.object_id = (
np.random.bytes(20) if len(self.uid) != 20 else self.uid.encode("ascii")
)
elif isinstance(self.object_id, str):
self.object_id = self.object_id.encode("ascii")

def getIO(self):
return PlasmaFlightIO(
plasma.ObjectID(self.object_id),
self.plasma_path,
flight_path=self.flight_path,
expected_size=self._expectedSize,
use_staging=self.use_staging,
)

@property
def dataURL(self) -> str:
return "plasmaflight://%s" % (binascii.hexlify(self.object_id).decode("ascii"))


# ===============================================================================
# AppDROP classes follow
# ===============================================================================
Expand Down
24 changes: 18 additions & 6 deletions daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ def __init__(self, sessionId, nm=None):
self._graphreprodata = None
self._reprofinished = False

# create the session directory and change CWD
self._sessionDir = f"{utils.getDlgWorkDir()}/{sessionId}"
logger.debug("Creating session directory: %s", self._sessionDir)
createDirIfMissing(self._sessionDir)
os.chdir(self._sessionDir)

logger.debug("Updating ENV with SESSION_ID: %s", sessionId)
os.environ.update({"DLG_SESSION_ID": sessionId})

class SessionFilter(logging.Filter):
def __init__(self, sessionId):
self.sessionId = sessionId
Expand All @@ -177,10 +186,10 @@ def filter(self, record):
fmt = logging.Formatter(fmt)
fmt.converter = time.gmtime

logdir = utils.getDlgLogsDir()
if self._nm is not None:
logdir = self._nm.logdir
logfile = generateLogFileName(logdir, self.sessionId)
# logdir = utils.getDlgLogsDir()
# if self._nm is not None:
# logdir = self._nm.logdir
logfile = generateLogFileName(self._sessionDir, self.sessionId)
try:
self.file_handler = logging.FileHandler(logfile)
self.file_handler.setFormatter(fmt)
Expand All @@ -195,6 +204,10 @@ def filter(self, record):
def sessionId(self):
return self._sessionId

@property
def sessionDir(self):
return self._sessionDir

@property
def status(self):
with self._statusLock:
Expand Down Expand Up @@ -227,8 +240,7 @@ def reprostatus(self, status):
self._reprofinished = status

def write_reprodata(self):
parts = [utils.getDlgLogsDir(), self._sessionId]
the_dir = os.path.abspath(os.path.normpath(os.path.join(*parts)))
the_dir = self._sessionDir
createDirIfMissing(the_dir)
the_path = os.path.join(the_dir, "reprodata.out")
with open(the_path, "w+", encoding="utf-8") as file:
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/test/lifecycle/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import os
import sqlite3
import unittest
import tempfile

from dlg.data.memory import InMemoryDROP
from dlg.lifecycle.registry import RDBMSRegistry


DBFILE = "testing_dlm.db"

DBFILE = tempfile.mktemp()

class TestRDBMSRegistry(unittest.TestCase):
def setUp(self):
Expand Down
2 changes: 0 additions & 2 deletions tools/xml2palette/xml2palette.py
Original file line number Diff line number Diff line change
Expand Up @@ -1014,8 +1014,6 @@ def cleanString(text: str) -> str:
def parseCasaDocs(dStr:str) -> dict:
"""
Parse the special docstring for casatasks
"""
"""
Extract the parameters from the casatask doc string.
:param task: The casatask to derive the parameters from.
Expand Down

0 comments on commit f60f9c2

Please sign in to comment.