Merge pull request #219 from ICRAR/liu-338
Liu 338
awicenec committed Feb 20, 2023
2 parents 22c8c6f + ebf564e commit 5bf5a47
Showing 8 changed files with 81 additions and 106 deletions.
98 changes: 23 additions & 75 deletions daliuge-engine/dlg/data/drops/file.py
@@ -42,8 +42,7 @@
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param delete_parent_directory Delete parent directory/False/Boolean/ComponentParameter/readwrite//False/False/Also delete the parent directory of this file when deleting the file itself
# @param check_filepath_exists Check file path exists/False/Boolean/ComponentParameter/readwrite//False/False/Perform a check to make sure the file path exists before proceeding with the application
# @param filepath File Path//String/ComponentParameter/readwrite//False/False/Path to the file for this node
# @param dirname Directory name//String/ComponentParameter/readwrite//False/False/Path to the file for this node
# @param filepath File Path//String/ComponentParameter/readwrite//False/False/File path for this file. If it has a '/' at the end it will be treated as a directory name and the filename will be generated. If it does not end with '/', the last part will be treated as a filename. If `filepath` does not start with '/' (relative path) then the session directory will be prepended to make the path absolute.
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/True/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
@@ -53,41 +52,14 @@ class FileDROP(DataDROP, PathBasedDrop):
"""
A DROP that points to data stored in a mounted filesystem.
Users can (but usually don't need to) specify both a `filepath` and a
`dirname` parameter for each FileDrop. The combination of these two parameters
will determine the final location of the file backed up by this drop on the
underlying filesystem. When no ``filepath`` is provided, the drop's UID will be
used as a filename. When a relative filepath is provided, it is relative to
``dirname``. When an absolute ``filepath`` is given, it is used as-is.
When a relative ``dirname`` is provided, it is relative to the base directory
of the currently running session (i.e., a directory with the session ID as a
name, placed within the currently working directory of the Node Manager
hosting that session). If ``dirname`` is absolute, it is used as-is.
In some cases drops are created **outside** the context of a session, most
notably during unit tests. In these cases the base directory is a fixed
location under ``/tmp``.
The following table summarizes the calculation of the final path used by
the ``FileDrop`` class depending on its parameters:
============ ===================== ===================== ==========
. filepath
------------ ------------------------------------------------------
dirname empty relative absolute
============ ===================== ===================== ==========
**empty** /``$B``/``$u`` /``$B``/``$f`` /``$f``
**relative** /``$B``/``$d``/``$u`` /``$B``/``$d``/``$f`` **ERROR**
**absolute** /``$d``/``$u`` /``$d``/``$f`` **ERROR**
============ ===================== ===================== ==========
In the table, ``$f`` is the value of ``filepath``, ``$d`` is the value of
``dirname``, ``$u`` is the drop's UID and ``$B`` is the base directory for
this drop's session, namely ``/the/cwd/$session_id``.
Users can fix both the path and the name of a FileDrop using the `filepath`
parameter for each FileDrop. We distinguish three cases and their combinations.
If it has a '/' at the end it will be treated as a directory name and the
filename will be generated. If it does not end with '/', the last part will be
treated as a filename. If `filepath` does not start with '/' (relative path)
then the session directory will be prepended to make the path absolute.
"""

# filepath = dlg_string_param("filepath", None)
# dirname = dlg_string_param("dirname", None)
delete_parent_directory = dlg_bool_param("delete_parent_directory", False)
check_filepath_exists = dlg_bool_param("check_filepath_exists", False)

@@ -100,63 +72,39 @@ def __init__(self, *args, **kwargs):
kwargs["expireAfterUse"] = False
super().__init__(*args, **kwargs)

def sanitize_paths(self, filepath, dirname):
def sanitize_paths(self, filepath):

# first replace any ENV_VARS on the names
if filepath:
filepath = os.path.expandvars(filepath)
if dirname:
dirname = os.path.expandvars(dirname)
# No filepath has been given, there's nothing to sanitize
if not filepath:
return filepath, dirname

# All is good, return unchanged
filepath_b = os.path.basename(filepath)
if filepath_b == filepath:
return filepath, dirname

# Extract the dirname from filepath and append it to dirname
filepath_d = os.path.dirname(filepath)
if not isabs(filepath_d) and dirname:
filepath_d = os.path.join(dirname, filepath_d)
return filepath_b, filepath_d
# replace any ENV_VARS on the names
return os.path.expandvars(filepath) if filepath else None

non_fname_chars = re.compile(r":|%s" % os.sep)

def initialize(self, **kwargs):
"""
FileDROP-specific initialization.
"""
# filepath, dirpath the two pieces of information we offer users to tweak
# These are very intermingled but are not exactly the same, see below
self.filepath = self.parameters.get("filepath", None)
self.dirname = self.parameters.get("dirname", None)
# Duh!
if isabs(self.filepath) and self.dirname:
raise InvalidDropException(
self,
"An absolute filepath does not allow a dirname to be specified",
)

# Sanitize filepath/dirname into proper directories-only and
# filename-only components (e.g., dirname='lala' and filename='1/2'
# results in dirname='lala/1' and filename='2'
filepath, dirname = self.sanitize_paths(self.filepath, self.dirname)
filepath = self.sanitize_paths(self.filepath)
filename = os.path.basename(filepath) if filepath else None
dirname = os.path.dirname(filepath) if filepath else None
# We later check if the file exists, but only if the user has specified
# an absolute dirname/filepath (otherwise it doesn't make sense, since
# we create our own filenames/dirnames dynamically as necessary
check = False
if isabs(dirname) and filepath:
if isabs(dirname):
check = self.check_filepath_exists

# Default filepath to drop UID and dirname to per-session directory
if not filepath:
filepath = self.non_fname_chars.sub("_", self.uid)
dirname = self.get_dir(dirname)
# Default filename to drop UID
if not filename:
filename = self.non_fname_chars.sub("_", self.uid)
self.filename = filename
self.dirname = self.get_dir(dirname)

self._root = dirname
self._path = os.path.join(dirname, filepath)
self._root = self.dirname
self._path = os.path.join(
self.dirname, self.filename
) # put it back together again
logger.debug(f"Set path of drop {self._uid}: {self._path}")
if check and not os.path.isfile(self._path):
raise InvalidDropException(
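The path rules described in the new docstring can be sketched as a stand-alone function. This is only an illustration under stated assumptions, not the engine's code: `resolve_drop_path` and its `session_dir` argument are hypothetical names, the real `FileDROP.initialize` delegates directory handling to `self.get_dir()`, and POSIX path separators are assumed.

```python
import os
import re

# Mirrors FileDROP.non_fname_chars from the diff; re.escape is added here
# so the sketch also compiles where os.sep is a backslash.
NON_FNAME_CHARS = re.compile(r":|%s" % re.escape(os.sep))


def resolve_drop_path(filepath, uid, session_dir):
    """Hypothetical helper: return (dirname, filename) for a drop.

    `session_dir` stands in for the per-session base directory.
    """
    # Replace any environment variables first, as sanitize_paths does.
    filepath = os.path.expandvars(filepath) if filepath else None
    filename = os.path.basename(filepath) if filepath else None
    dirname = os.path.dirname(filepath) if filepath else None

    # A trailing '/' (or no filepath at all) leaves the filename empty,
    # so fall back to a name derived from the drop's UID.
    if not filename:
        filename = NON_FNAME_CHARS.sub("_", uid)

    # Relative or missing directories are placed under the session directory.
    if not dirname:
        dirname = session_dir
    elif not os.path.isabs(dirname):
        dirname = os.path.join(session_dir, dirname)
    return dirname, filename
```

For example, `resolve_drop_path("results/image.fits", "a", "/cwd/sess")` yields the directory `/cwd/sess/results` and the filename `image.fits`, while a `filepath` of `/data/out/` keeps `/data/out` as the directory and derives the filename from the UID.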
10 changes: 5 additions & 5 deletions daliuge-engine/test/test_drop.py
@@ -781,7 +781,7 @@ def assertFiles(
a = FileDROP(
"a",
"a",
dirname=tempDir,
filepath=tempDir + "/",
delete_parent_directory=delete_parent_directory,
)
a.write(b" ")
@@ -819,10 +819,10 @@ def test_directoryContainer(self):
os.makedirs(dirname2)

# DROPs involved
a = FileDROP("a", "a", dirname=dirname)
b = FileDROP("b", "b", dirname=dirname)
c = FileDROP("c", "c", dirname=dirname2)
d = FileDROP("d", "d", dirname=dirname2)
a = FileDROP("a", "a", filepath=f"{dirname}/")
b = FileDROP("b", "b", filepath=f"{dirname}/")
c = FileDROP("c", "c", filepath=f"{dirname2}/")
d = FileDROP("d", "d", filepath=f"{dirname2}/")
cont1 = DirectoryContainer("e", "e", dirname=dirname)
cont2 = DirectoryContainer("f", "f", dirname=dirname2)

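The test changes above follow a simple pattern: a former `dirname` becomes a `filepath` with a trailing `/`. For callers that still hold separate values, the mapping could be sketched as below. `to_single_filepath` is a hypothetical helper that is not part of this commit, and the `ValueError` merely stands in for the `InvalidDropException` that the removed code raised.

```python
import os


def to_single_filepath(dirname=None, filepath=None):
    """Hypothetical helper: fold the old dirname/filepath pair into one filepath."""
    if dirname and filepath:
        # The removed code rejected this combination as well.
        if os.path.isabs(filepath):
            raise ValueError(
                "An absolute filepath does not allow a dirname to be specified"
            )
        return os.path.join(dirname, filepath)
    if dirname:
        # Joining with an empty string appends the trailing separator that
        # marks the value as a directory under the new semantics.
        return os.path.join(dirname, "")
    return filepath
```

With this sketch, `FileDROP("a", "a", dirname=tempDir)` would correspond to `FileDROP("a", "a", filepath=to_single_filepath(dirname=tempDir))`.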
63 changes: 46 additions & 17 deletions daliuge-engine/test/test_environmentvars.py
@@ -57,7 +57,9 @@ def test_get(self):
self.assertEqual(3, env_drop.get("int_var"))
self.assertEqual(False, env_drop.get("bool_var"))
self.assertEqual(0.5, env_drop.get("float_var"))
self.assertEqual({"first": 1, "second": "sec"}, env_drop.get("dict_var"))
self.assertEqual(
{"first": 1, "second": "sec"}, env_drop.get("dict_var")
)
self.assertEqual([1, 2.0, "3"], env_drop.get("list_var"))
self.assertIsNone(env_drop.get("non_var"))
self.assertIsNone(env_drop.get("uid"))
@@ -113,23 +115,30 @@ def test_drop_get_single(self):
self.assertEqual(
"/HOME/", test_drop.get_environment_variable("$env_vars.dir_var")
)
self.assertEqual(3, test_drop.get_environment_variable("$env_vars.int_var"))
self.assertEqual(
3, test_drop.get_environment_variable("$env_vars.int_var")
)
self.assertEqual(
False, test_drop.get_environment_variable("$env_vars.bool_var")
)
self.assertEqual(0.5, test_drop.get_environment_variable("$env_vars.float_var"))
self.assertEqual(
0.5, test_drop.get_environment_variable("$env_vars.float_var")
)
self.assertEqual(
{"first": 1, "second": "sec"},
test_drop.get_environment_variable("$env_vars.dict_var"),
)
self.assertEqual(
[1, 2.0, "3"], test_drop.get_environment_variable("$env_vars.list_var")
[1, 2.0, "3"],
test_drop.get_environment_variable("$env_vars.list_var"),
)
self.assertEqual(
"$env_vars.non_var", test_drop.get_environment_variable("$env_vars.non_var")
"$env_vars.non_var",
test_drop.get_environment_variable("$env_vars.non_var"),
)
self.assertEqual(
"$env_vars.uid", test_drop.get_environment_variable("$env_vars.uid")
"$env_vars.uid",
test_drop.get_environment_variable("$env_vars.uid"),
)

def test_drop_get_multiple(self):
@@ -168,7 +177,9 @@ def test_drop_get_multiple(self):
extra_keys = ["dir_var", "$non_store.non_var"]
query_keys.extend(extra_keys)
expected_vars.extend(extra_keys)
self.assertEqual(expected_vars, test_drop.get_environment_variables(query_keys))
self.assertEqual(
expected_vars, test_drop.get_environment_variables(query_keys)
)

def test_drop_get_empty(self):
"""
@@ -195,24 +206,39 @@ def test_drop_get_multiEnv(self):
test_drop.addProducer(env1_drop)
test_drop.addProducer(env2_drop)
self.assertEqual(
"/HOME/", test_drop.get_environment_variable(f"${env1_name}.dir_var")
"/HOME/",
test_drop.get_environment_variable(f"${env1_name}.dir_var"),
)
self.assertEqual(
"/DIFFERENT/",
test_drop.get_environment_variable(f"${env2_name}.dir_var"),
)
self.assertEqual(
3, test_drop.get_environment_variable(f"${env1_name}.int_var")
)
self.assertEqual(
"/DIFFERENT/", test_drop.get_environment_variable(f"${env2_name}.dir_var")
4, test_drop.get_environment_variable(f"${env2_name}.int_var")
)
self.assertEqual(3, test_drop.get_environment_variable(f"${env1_name}.int_var"))
self.assertEqual(4, test_drop.get_environment_variable(f"${env2_name}.int_var"))
self.assertEqual(
f"{env1_name}.int_var",
test_drop.get_environment_variable(f"{env1_name}.int_var"),
)
self.assertEqual(f".int_var", test_drop.get_environment_variable(f".int_var"))
self.assertEqual(
f".int_var", test_drop.get_environment_variable(f".int_var")
)
self.assertEqual(
f"$third_env.int_var",
test_drop.get_environment_variable(f"$third_env.int_var"),
)
self.assertEqual(
["/HOME/", "/DIFFERENT/", 3, 4, f"${env1_name}.non_var", "$fake.var"],
[
"/HOME/",
"/DIFFERENT/",
3,
4,
f"${env1_name}.non_var",
"$fake.var",
],
test_drop.get_environment_variables(
[
f"${env1_name}.dir_var",
@@ -252,19 +278,22 @@ def test_get_dlg_vars(self):
)
test_drop.autofill_environment_variables()
self.assertEqual(getDlgDir(), test_drop.parameters["dlg_root"])
self.assertEqual(getDlgDir(), test_drop.get_environment_variable("$DLG_ROOT"))
self.assertEqual(
getDlgDir(), test_drop.get_environment_variable("$DLG_ROOT")
)
self.assertEqual("$DLG_NONEXISTS", test_drop.parameters["non_dlg_var"])
self.assertEqual(
"$DLG_NONEXISTS", test_drop.get_environment_variable("$DLG_NONEXISTS")
"$DLG_NONEXISTS",
test_drop.get_environment_variable("$DLG_NONEXISTS"),
)

def test_filename_integration(self):
with tempfile.TemporaryDirectory() as tmp_dir:
os.environ["DLG_ROOT"] = tmp_dir
os.environ["DLG_FILE"] = "test_file"
test_drop = FileDROP(
oid="a", uid="a", filepath="$DLG_FILE", dirname="$DLG_ROOT"
oid="a", uid="a", filepath="$DLG_ROOT/$DLG_FILE"
)
test_drop.write(b"1234")
self.assertEqual(tmp_dir, test_drop.dirname)
self.assertEqual("test_file", test_drop.filepath)
self.assertEqual("test_file", test_drop.filename)
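What `test_filename_integration` relies on can be reproduced with the standard library alone: the environment variables in the single `filepath` are expanded first, and the result is then split into a directory and a filename. The helper name `expand_and_split` is an assumption made for this sketch.

```python
import os


def expand_and_split(filepath):
    """Hypothetical helper: expand environment variables, then split the path."""
    expanded = os.path.expandvars(filepath)
    # Same split as os.path.dirname / os.path.basename in initialize().
    return os.path.dirname(expanded), os.path.basename(expanded)
```

With `DLG_ROOT` set to a temporary directory and `DLG_FILE` set to `test_file`, `expand_and_split("$DLG_ROOT/$DLG_FILE")` returns that directory and `test_file`, which is what the updated assertions check on `test_drop.dirname` and `test_drop.filename`.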
2 changes: 1 addition & 1 deletion docs/architecture/drops.rst
@@ -77,4 +77,4 @@ The |daliuge| framework uses Docker containers as its primary interface to 3rd p

The application programmer can make use of the :ref:`DockerApp <api.dlg.apps.dockerapp>` which is the interface between a Docker container and the Drop framework. Refer to the documentation for details.

Other applications not based on Docker containers can be written as well. Any application must derive at least from ``AppDrop``, but an easier-to-use base class is the ``BarrierAppDrop``, which simply requires a ``run`` method to be written by the developer (see :ref:`api.dlg.Drop` for details). |daliuge| ships with a set of pre-existing applications to perform common operations, like a TCP socket listener and a bash command executor, among others. See :ref:`api.dlg.apps` for more examples.
Other applications not based on Docker containers can be written as well. Any application must derive at least from ``AppDrop``, but an easier-to-use base class is the ``BarrierAppDrop``, which simply requires a ``run`` method to be written by the developer (see :ref:`api.dlg.Drop` for details). |daliuge| ships with a set of pre-existing applications to perform common operations, like a TCP socket listener and a bash command executor, among others. See :ref:`api.dlg.apps` for more examples. In addition, we have developed a stand-alone tool (`dlg_paletteGen <https://icrar.github.io/dlg_paletteGen/>`_), which enables the automatic generation of |daliuge| compatible component descriptions from existing code. This makes it possible to use large existing public or proprietary libraries of algorithms, such as `Astropy <https://astropy.org>`_, within the |daliuge| ecosystem.
8 changes: 3 additions & 5 deletions docs/deployment.rst
@@ -142,13 +142,11 @@ Docker components

(1) Memory Data Components can't be used directly as input or output of Docker components. However, it is possible to use Plasma/Flight as a shared memory mechanism.

(2) Parameters are not accessible to the application code inside the container. They are accessible to the component code and can be fed through, but that requires some dedicated code inside the container. That also includes command line parameters, although users can specify the complete command line in the dedicated configuration field. We are working on a solution to make this more generic.
(2) Care has to be taken when using files to exchange data between docker components and other components. In particular any usage of absolute path names is quite tricky to get working and requires cross-mounting of additional volumes. Although this is possible it is not recommended. The |daliuge| workspace directory is mounted by default in the container components as well.

(3) Care has to be taken when using files to exchange data between docker components and other components. In particular any usage of absolute path names is quite tricky to get working and requires cross-mounting of additional volumes. Although this is possible it is not recommended. The |daliuge| workspace directory is mounted by default in the container components as well.
(3) By default the docker containers are started with the same user/group ids as the user running the engine.

(4) Specifying a user as well as specifying a command in the configuration requires that /bin/bash is available in the container. In particular for minimized containers this is very often not the case. Again, we are working on a solution for that.

Note that it is not recommended to pack big external packages together with |daliuge| in a single image. The internal launch mechanism of docker components is under revision and we will keep this document up-to-date accordingly.
Note that it is not recommended to pack big external packages together with |daliuge| in a single image. We are using the `slimtoolkit <https://github.com/slimtoolkit/slim>`_ to minimize the size of our docker images and recommend doing this for component images as well.

Python components
~~~~~~~~~~~~~~~~~
@@ -13,7 +13,7 @@ with a filesystem path.
File Drop
---------

:class:`FileDROP <dlg.drop.FileDROP>` is a highly compatibile data drop type that can be easily used as persistent volume I/O
:class:`FileDROP <dlg.drop.FileDROP>` is a highly compatible data drop type that can be easily used as persistent volume I/O
and inspection of individual app component I/O. The downside of using file drops is reduced I/O performance compared to
alternative memory based drops that can instead utilize buffer protocol.
