Skip to content

Commit

Permalink
Removes duplicate drop implementations.
Browse files Browse the repository at this point in the history
  • Loading branch information
pritchardn committed Aug 18, 2022
1 parent d824bd5 commit a230189
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 159 deletions.
23 changes: 21 additions & 2 deletions daliuge-engine/dlg/data/directorycontainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
import logging
import os
import shutil

Expand All @@ -28,7 +29,24 @@
from dlg.exceptions import InvalidDropException, InvalidRelationshipException
from dlg.meta import dlg_bool_param

logger = logging.getLogger(__name__)

##
# TODO: This needs some more work
# @brief Directory
# @details A ContainerDROP that represents a filesystem directory. It only allows
# FileDROPs and DirectoryContainers to be added as children. Children
# can only be added if they are placed directly within the directory
# represented by this DirectoryContainer.
# @par EAGLE_START
# @param category Directory
# @param tag future
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param check_exists Check path exists/True/Boolean/ComponentParameter/readwrite//False/False/Perform a check to make sure the file path exists before proceeding with the application
# @param dirname Directory name//String/ComponentParameter/readwrite//False/False/"Directory name/path"
# @param dummy dummy//String/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
class DirectoryContainer(PathBasedDrop, ContainerDROP):
"""
A ContainerDROP that represents a filesystem directory. It only allows
Expand All @@ -49,7 +67,8 @@ def initialize(self, **kwargs):

directory = kwargs["dirname"]

if self.check_exists is True:
logger.debug("Checking existence of %s %s", directory, self.check_exists)
if "check_exists" in kwargs and kwargs["check_exists"] is True:
if not os.path.isdir(directory):
raise InvalidDropException(self, "%s is not a directory" % (directory))

Expand All @@ -71,4 +90,4 @@ def delete(self):
shutil.rmtree(self._path)

def exists(self):
return os.path.isdir(self._path)
return os.path.isdir(self._path)
157 changes: 0 additions & 157 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -1561,163 +1561,6 @@ def exists(self):
return any([c.exists() for c in self._children])
return True

##
# TODO: This needs some more work
# @brief Directory
# @details A ContainerDROP that represents a filesystem directory. It only allows
# FileDROPs and DirectoryContainers to be added as children. Children
# can only be added if they are placed directly within the directory
# represented by this DirectoryContainer.
# @par EAGLE_START
# @param category Directory
# @param tag future
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param check_exists Check path exists/True/Boolean/ComponentParameter/readwrite//False/False/Perform a check to make sure the file path exists before proceeding with the application
# @param dirname Directory name//String/ComponentParameter/readwrite//False/False/"Directory name/path"
# @param dummy dummy//String/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END

class DirectoryContainer(PathBasedDrop, ContainerDROP):
    """
    Container DROP backed by a directory on the filesystem.

    Only FileDROPs and other DirectoryContainers are accepted as children,
    and only when the child's path sits directly inside the directory this
    container represents.
    """

    check_exists = dlg_bool_param("check_exists", True)

    def initialize(self, **kwargs):
        """
        Set up the container from its keyword arguments.

        A "dirname" keyword is mandatory. The directory is verified with
        os.path.isdir only when "check_exists" is passed explicitly as True.

        :raises InvalidDropException: if "dirname" is missing, or if the
            existence check is requested and the path is not a directory.
        """
        ContainerDROP.initialize(self, **kwargs)

        if "dirname" not in kwargs:
            raise InvalidDropException(
                self, 'DirectoryContainer needs a "dirname" parameter'
            )

        target = kwargs["dirname"]

        logger.debug("Checking existence of %s %s", target, self.check_exists)
        # The decision is driven by the explicit keyword argument, not by the
        # class-level attribute (which is only reported in the log line above).
        must_exist = kwargs.get("check_exists") is True
        if must_exist and not os.path.isdir(target):
            raise InvalidDropException(self, "%s is not a directory" % (target))

        self._path = self.get_dir(target)

    def addChild(self, child):
        """
        Register ``child`` as a child of this container.

        :raises TypeError: if ``child`` is neither a FileDROP nor a
            DirectoryContainer.
        :raises InvalidRelationshipException: if the parent directory of the
            child's path differs from this container's path.
        """
        if not isinstance(child, (FileDROP, DirectoryContainer)):
            raise TypeError("Child DROP is not of type FileDROP or DirectoryContainer")

        parent_dir = os.path.dirname(child.path)
        if parent_dir != self.path:
            raise InvalidRelationshipException(
                DROPRel(child, DROPLinkType.CHILD, self),
                "Child DROP is not under %s" % (self.path),
            )

        ContainerDROP.addChild(self, child)

    def delete(self):
        """Remove the backing directory tree from the filesystem."""
        shutil.rmtree(self._path)

    def exists(self):
        """Return True when the backing path is currently a directory."""
        return os.path.isdir(self._path)


##
# @brief Plasma
# @details An object in an Apache Arrow Plasma in-memory object store
# @par EAGLE_START
# @param category Plasma
# @param tag daliuge
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param plasma_path Plasma Path//String/ComponentParameter/readwrite//False/False/Path to the local plasma store
# @param object_id Object Id//String/ComponentParameter/readwrite//False/False/PlasmaId of the object for all compute nodes
# @param use_staging Use Staging/False/Boolean/ComponentParameter/readwrite//False/False/Enables writing to a dynamically resizeable staging buffer
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
class PlasmaDROP(DataDROP):
    """
    Data DROP whose payload is an object held in a Plasma store.
    """

    object_id: bytes = dlg_string_param("object_id", None)
    plasma_path: str = dlg_string_param("plasma_path", "/tmp/plasma")
    use_staging: bool = dlg_bool_param("use_staging", False)

    def initialize(self, **kwargs):
        """
        Expand environment variables in the store path and settle the
        object id as ``bytes``.
        """
        super().initialize(**kwargs)
        self.plasma_path = os.path.expandvars(self.plasma_path)

        if self.object_id is None:
            # Reuse the uid as the id only when it is exactly 20 characters
            # long; otherwise fall back to 20 random bytes.
            if len(self.uid) == 20:
                self.object_id = self.uid.encode("ascii")
            else:
                self.object_id = np.random.bytes(20)
        elif isinstance(self.object_id, str):
            self.object_id = self.object_id.encode("ascii")

    def getIO(self):
        """Build the PlasmaIO object used to read and write this DROP's data."""
        oid = plasma.ObjectID(self.object_id)
        return PlasmaIO(
            oid,
            self.plasma_path,
            expected_size=self._expectedSize,
            use_staging=self.use_staging,
        )

    @property
    def dataURL(self) -> str:
        """URL of the form ``plasma://<hex-encoded object id>``."""
        hex_id = binascii.hexlify(self.object_id).decode("ascii")
        return "plasma://%s" % (hex_id)


##
# @brief PlasmaFlight
# @details An Apache Arrow Flight server providing distributed access
# to a Plasma in-memory object store
# @par EAGLE_START
# @param category PlasmaFlight
# @param tag daliuge
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param plasma_path Plasma Path//String/ComponentParameter/readwrite//False/False/Path to the local plasma store
# @param object_id Object Id//String/ComponentParameter/readwrite//False/False/PlasmaId of the object for all compute nodes
# @param flight_path Flight Path//String/ComponentParameter/readwrite//False/False/IP and flight port of the drop owner
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
class PlasmaFlightDROP(DataDROP):
    """
    Data DROP whose payload is an object held in a Plasma store and is
    accessed through a PlasmaFlightIO object.
    """

    object_id: bytes = dlg_string_param("object_id", None)
    plasma_path: str = dlg_string_param("plasma_path", "/tmp/plasma")
    flight_path: str = dlg_string_param("flight_path", None)
    use_staging: bool = dlg_bool_param("use_staging", False)

    def initialize(self, **kwargs):
        """
        Expand environment variables in the store path and settle the
        object id as ``bytes``.
        """
        super().initialize(**kwargs)
        self.plasma_path = os.path.expandvars(self.plasma_path)

        if self.object_id is None:
            # Reuse the uid as the id only when it is exactly 20 characters
            # long; otherwise fall back to 20 random bytes.
            if len(self.uid) == 20:
                self.object_id = self.uid.encode("ascii")
            else:
                self.object_id = np.random.bytes(20)
        elif isinstance(self.object_id, str):
            self.object_id = self.object_id.encode("ascii")

    def getIO(self):
        """Build the PlasmaFlightIO object used to read and write this DROP's data."""
        oid = plasma.ObjectID(self.object_id)
        return PlasmaFlightIO(
            oid,
            self.plasma_path,
            flight_path=self.flight_path,
            expected_size=self._expectedSize,
            use_staging=self.use_staging,
        )

    @property
    def dataURL(self) -> str:
        """URL of the form ``plasmaflight://<hex-encoded object id>``."""
        hex_id = binascii.hexlify(self.object_id).decode("ascii")
        return "plasmaflight://%s" % (hex_id)


# ===============================================================================
# AppDROP classes follow
Expand Down

0 comments on commit a230189

Please sign in to comment.