Skip to content

Commit

Permalink
Merge pull request #111 from ICRAR/yan-891
Browse files Browse the repository at this point in the history
Yan 891 - Support for default ENV variable store
  • Loading branch information
pritchardn committed Feb 22, 2022
2 parents 39fcb57 + 1fbf990 commit ba8820d
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 38 deletions.
26 changes: 14 additions & 12 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ def __init__(self, oid, uid, **kwargs):

# Useful to have access to all EAGLE parameters without a priori knowledge
self._parameters = dict(kwargs)

self.autofill_environment_variables()
kwargs.update(self._parameters)
# Sub-class initialization; mark ourselves as INITIALIZED after that
self.initialize(**kwargs)
self._status = (
Expand Down Expand Up @@ -624,17 +625,20 @@ def get_environment_variable(self, key: str):
return getDlgVariable(key)
if len(key) < 2 or key[0] != '$':
# Reject malformed entries
return None
return key
key_edit = key[1:]
env_var_ref, env_var_key = key_edit.split('.')[0], '.'.join(key_edit.split('.')[1:])
env_var_drop = None
for producer in self.producers:
for producer in self._producers:
if producer.name == env_var_ref:
env_var_drop = producer
if env_var_drop is not None: # TODO: Check for KeyValueDROP interface support
return env_var_drop.get(env_var_key)
ret_val = env_var_drop.get(env_var_key)
if ret_val is None:
return key
return ret_val
else:
return None
return key

def get_environment_variables(self, keys: list):
"""
Expand Down Expand Up @@ -1564,11 +1568,10 @@ class FileDROP(DataDROP, PathBasedDrop):
In the table, ``$f`` is the value of ``filepath``, ``$d`` is the value of
``dirname``, ``$u`` is the drop's UID and ``$B`` is the base directory for
this drop's session, namelly ``/the/cwd/$session_id``.
this drop's session, namely ``/the/cwd/$session_id``.
"""

filepath = dlg_string_param("filepath", None)
dirname = dlg_string_param("dirname", None)
# filepath = dlg_string_param("filepath", None)
# dirname = dlg_string_param("dirname", None)
delete_parent_directory = dlg_bool_param("delete_parent_directory", False)
check_filepath_exists = dlg_bool_param("check_filepath_exists", False)

Expand Down Expand Up @@ -1597,7 +1600,8 @@ def initialize(self, **kwargs):
"""
# filepath, dirpath the two pieces of information we offer users to tweak
# These are very intermingled but are not exactly the same, see below

self.filepath = self.parameters.get('filepath', None)
self.dirname = self.parameters.get('dirname', None)
# Duh!
if isabs(self.filepath) and self.dirname:
raise InvalidDropException(
Expand All @@ -1608,7 +1612,6 @@ def initialize(self, **kwargs):
# filename-only components (e.g., dirname='lala' and filename='1/2'
# results in dirname='lala/1' and filename='2'
filepath, dirname = self.sanitize_paths(self.filepath, self.dirname)

# We later check if the file exists, but only if the user has specified
# an absolute dirname/filepath (otherwise it doesn't make sense, since
# we create our own filenames/dirnames dynamically as necessary
Expand Down Expand Up @@ -1868,7 +1871,6 @@ def initialize(self, **kwargs):
self._buf = io.BytesIO(*args)

def getIO(self):
print(sys.version_info)
if sys.version_info >= (3, 8):
if hasattr(self, '_sessID'):
return SharedMemoryIO(self.oid, self._sessID)
Expand Down
10 changes: 8 additions & 2 deletions daliuge-engine/dlg/environmentvar_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from dlg.drop import AbstractDROP, DEFAULT_INTERNAL_PARAMETERS
from dlg.io import MemoryIO


class KeyValueDROP:

@abc.abstractmethod
Expand Down Expand Up @@ -82,7 +81,14 @@ def getIO(self):
return MemoryIO(io.BytesIO(json.dumps(self._variables).encode('utf-8')))

def get(self, key):
return self._variables.get(key)
"""
Fetches key from internal store if present.
If not present, attempts to fetch variable from environment
"""
value = self._variables.get(key)
if value is None:
value = os.environ.get(key)
return value

def get_multiple(self, keys: list):
return_vars = []
Expand Down
4 changes: 3 additions & 1 deletion daliuge-engine/dlg/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
PlasmaDROP,
PlasmaFlightDROP
)
from .environmentvar_drop import EnvironmentVarDROP
from dlg.parset_drop import ParameterSetDROP
from .exceptions import InvalidGraphException
from .json_drop import JsonDROP
Expand All @@ -60,7 +61,8 @@
Categories.JSON: JsonDROP,
Categories.PLASMA: PlasmaDROP,
Categories.PLASMAFLIGHT: PlasmaFlightDROP,
Categories.PARSET: ParameterSetDROP
Categories.PARSET: ParameterSetDROP,
Categories.ENVIRONMENTVARS: EnvironmentVarDROP
}

try:
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ def deploySession(self, sessionId, completedDrops=[]):
self._memoryManager.register_session(sessionId)

def foreach(drop):
drop.autofill_environment_variables()
if self._threadpool is not None:
drop._tp = self._threadpool
if isinstance(drop, InMemoryDROP):
Expand Down
20 changes: 14 additions & 6 deletions daliuge-engine/dlg/manager/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,15 @@ def filter(self, record):
if self._nm is not None:
logdir = self._nm.logdir
logfile = generateLogFileName(logdir, self.sessionId)
self.file_handler = logging.FileHandler(logfile)
self.file_handler.setFormatter(fmt)
self.file_handler.addFilter(SessionFilter(self.sessionId))
logging.root.addHandler(self.file_handler)
try:
self.file_handler = logging.FileHandler(logfile)
self.file_handler.setFormatter(fmt)
self.file_handler.addFilter(SessionFilter(self.sessionId))
logging.root.addHandler(self.file_handler)
except AttributeError as e:
print(e)
except FileNotFoundError as f:
print(f)

@property
def sessionId(self):
Expand Down Expand Up @@ -524,8 +529,11 @@ def getGraph(self):
return dict(self._graph)

def destroy(self):
self.file_handler.close()
logging.root.removeHandler(self.file_handler)
try:
self.file_handler.close()
logging.root.removeHandler(self.file_handler)
except AttributeError as e:
print(e)

__del__ = destroy

Expand Down
5 changes: 4 additions & 1 deletion daliuge-engine/dlg/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,10 @@ def getDlgVariable(key: str):
"""
if key == "$DLG_ROOT":
return getDlgDir()
return os.environ.get(key[1:])
value = os.environ.get(key[1:])
if value is None:
return key
return value


def createDirIfMissing(path):
Expand Down
47 changes: 31 additions & 16 deletions daliuge-engine/test/test_environmentvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#

import os
import tempfile
import unittest

from dlg.drop import AbstractDROP, FileDROP
from dlg.environmentvar_drop import EnvironmentVarDROP
from dlg.drop import AbstractDROP
from dlg.utils import getDlgDir


Expand Down Expand Up @@ -92,8 +94,9 @@ def test_drop_get_single(self):
self.assertEqual({'first': 1, 'second': 'sec'},
test_drop.get_environment_variable('$env_vars.dict_var'))
self.assertEqual([1, 2.0, '3'], test_drop.get_environment_variable('$env_vars.list_var'))
self.assertIsNone(test_drop.get_environment_variable('$env_vars.non_var'))
self.assertIsNone(test_drop.get_environment_variable('$env_vars.uid'))
self.assertEqual('$env_vars.non_var',
test_drop.get_environment_variable('$env_vars.non_var'))
self.assertEqual('$env_vars.uid', test_drop.get_environment_variable('$env_vars.uid'))

def test_drop_get_multiple(self):
"""
Expand All @@ -104,14 +107,15 @@ def test_drop_get_multiple(self):
env_drop = create_std_env_vars(name=env_name)
test_drop = AbstractDROP(uid='b', oid='b')
test_drop.addProducer(env_drop)
expected_vars = [None, '/HOME/', 3, False, 0.5, {'first': 1, 'second': 'sec'},
[1, 2.0, '3'], None]
expected_vars = [f'${env_name}.uid', '/HOME/', 3, False, 0.5, {'first': 1, 'second': 'sec'},
[1, 2.0, '3'], f'${env_name}.non_var']
query_keys = ['uid', 'dir_var', 'int_var', 'bool_var', 'float_var', 'dict_var', 'list_var',
'non_var']
query_keys = [f'${env_name}.{x}' for x in query_keys] # Build queries of the correct form
# Add some purposefully malformed vars
query_keys.extend(['dir_var', '$non_store.non_var'])
expected_vars.extend([None, None])
extra_keys = ['dir_var', '$non_store.non_var']
query_keys.extend(extra_keys)
expected_vars.extend(extra_keys)
self.assertEqual(expected_vars, test_drop.get_environment_variables(query_keys))

def test_drop_get_empty(self):
Expand All @@ -122,8 +126,8 @@ def test_drop_get_empty(self):
env_drop = create_empty_env_vars(name=env_name)
test_drop = AbstractDROP(uid='c', oid='c')
test_drop.addProducer(env_drop)
self.assertEqual(None, test_drop.get_environment_variable(''))
self.assertEqual(None, test_drop.get_environment_variable('$'))
self.assertEqual('', test_drop.get_environment_variable(''))
self.assertEqual('$', test_drop.get_environment_variable('$'))

def test_drop_get_multiEnv(self):
"""
Expand All @@ -142,10 +146,12 @@ def test_drop_get_multiEnv(self):
test_drop.get_environment_variable(f"${env2_name}.dir_var"))
self.assertEqual(3, test_drop.get_environment_variable(f"${env1_name}.int_var"))
self.assertEqual(4, test_drop.get_environment_variable(f"${env2_name}.int_var"))
self.assertIsNone(test_drop.get_environment_variable(f'{env1_name}.int_var'))
self.assertIsNone(test_drop.get_environment_variable(f'.int_var'))
self.assertIsNone(test_drop.get_environment_variable(f'$third_env.int_var'))
self.assertEqual(['/HOME/', '/DIFFERENT/', 3, 4, None, None],
self.assertEqual(f'{env1_name}.int_var',
test_drop.get_environment_variable(f'{env1_name}.int_var'))
self.assertEqual(f'.int_var', test_drop.get_environment_variable(f'.int_var'))
self.assertEqual(f'$third_env.int_var',
test_drop.get_environment_variable(f'$third_env.int_var'))
self.assertEqual(['/HOME/', '/DIFFERENT/', 3, 4, f'${env1_name}.non_var', '$fake.var'],
test_drop.get_environment_variables(
[f'${env1_name}.dir_var', f'${env2_name}.dir_var',
f'${env1_name}.int_var', f'${env2_name}.int_var',
Expand All @@ -170,5 +176,14 @@ def test_get_dlg_vars(self):
test_drop.autofill_environment_variables()
self.assertEqual(getDlgDir(), test_drop.parameters['dlg_root'])
self.assertEqual(getDlgDir(), test_drop.get_environment_variable('$DLG_ROOT'))
self.assertEqual(None, test_drop.parameters['non_dlg_var'])
self.assertEqual(None, test_drop.get_environment_variable('$DLG_NONEXISTS'))
self.assertEqual('$DLG_NONEXISTS', test_drop.parameters['non_dlg_var'])
self.assertEqual('$DLG_NONEXISTS', test_drop.get_environment_variable('$DLG_NONEXISTS'))

def test_filename_integration(self):
with tempfile.TemporaryDirectory() as tmp_dir:
os.environ['DLG_ROOT'] = tmp_dir
os.environ['DLG_FILE'] = 'test_file'
test_drop = FileDROP(oid='a', uid='a', filepath="$DLG_FILE", dirname="$DLG_ROOT")
test_drop.write(b"1234")
self.assertEqual(tmp_dir, test_drop.dirname)
self.assertEqual('test_file', test_drop.filepath)

0 comments on commit ba8820d

Please sign in to comment.