Skip to content

Commit

Permalink
Merge branch 'yan-1048-remove-memory-leaks'
Browse files Browse the repository at this point in the history
Signed-off-by: Rodrigo Tobar <rtobar@icrar.org>
  • Loading branch information
rtobar committed Jun 29, 2022
2 parents 1576295 + 37c3f27 commit c762528
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 24 deletions.
34 changes: 16 additions & 18 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,6 @@ def optionalEval(x):
outputs[uid] = all_contents(drop) if self.output_parser is DropParser.PATH else None


self.funcargs = {}

# Keyword arguments are made up of the default values plus the inputs
# that match one of the keyword argument names
# if defaults dict has not been specified at all we'll go ahead anyway
Expand All @@ -435,7 +433,7 @@ def optionalEval(x):
if name in self.func_defaults or name not in argnames
}
logger.debug(f"updating funcargs with {kwargs}")
self.funcargs = kwargs
funcargs = kwargs

# Fill arguments with rest of inputs
logger.debug(f"available inputs: {inputs}")
Expand All @@ -445,7 +443,7 @@ def optionalEval(x):
logger.debug(f"Parameters found: {self.parameters}")
posargs = self.arguments.args[:self.fn_npos]
kwargs = {}
self.pargs = []
pargs = []
# Initialize pargs dictionary and update with provided argument values
pargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs)))
if "applicationArgs" in self.parameters:
Expand Down Expand Up @@ -481,7 +479,7 @@ def optionalEval(x):
kwargs.update({self.arguments.args[i]: list(inputs.values())[i]})

logger.debug(f"Updating funcargs with input ports {kwargs}")
self.funcargs.update(kwargs)
funcargs.update(kwargs)

if ('outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict)):
out_names = [list(i.values())[0] for i in self.parameters['outputs']]
Expand Down Expand Up @@ -512,7 +510,7 @@ def optionalEval(x):
logger.debug("Identified keyword arguments removed: %s",
[i['text'] for i in _dum])
for pa in posargs:
if pa != 'self' and pa not in self.funcargs:
if pa != 'self' and pa not in funcargs:
if pa in appArgs:
arg = appArgs.pop(pa)
value = arg['value']
Expand All @@ -536,13 +534,13 @@ def optionalEval(x):
logger.debug("Identified positional arguments removed: %s",
[i['text'] for i in _dum])
logger.debug(f"updating posargs with {list(pargsDict.keys())}")
self.pargs.extend(list(pargsDict.values()))
pargs.extend(list(pargsDict.values()))

# Try to get values for still missing kwargs arguments from Application kws
kwargs = {}
kws = self.arguments.args[self.fn_npos:]
for ka in kws:
if ka not in self.funcargs:
if ka not in funcargs:
if ka in appArgs:
arg = appArgs.pop(ka)
value = arg['value']
Expand All @@ -559,13 +557,13 @@ def optionalEval(x):
else:
logger.warning(f"Keyword argument '{ka}' not found!")
logger.debug(f"updating funcargs with {kwargs}")
self.funcargs.update(kwargs)
funcargs.update(kwargs)

# deal with kwonlyargs
kwargs = {}
kws = self.arguments.kwonlyargs
for ka in kws:
if ka not in self.funcargs:
if ka not in funcargs:
if ka in appArgs:
arg = appArgs.pop(ka)
value = arg['value']
Expand All @@ -582,7 +580,7 @@ def optionalEval(x):
else:
logger.warning(f"Keyword only argument '{ka}' not found!")
logger.debug(f"updating funcargs with kwonlyargs: {kwargs}")
self.funcargs.update(kwargs)
funcargs.update(kwargs)

# any remaining application arguments will be used for vargs and vkwargs
vparg = []
Expand All @@ -599,25 +597,25 @@ def optionalEval(x):
vkarg.update({arg:value})

if self.arguments.varargs:
self.pargs.extend(vparg)
pargs.extend(vparg)
if self.arguments.varkw:
self.funcargs.update(vkarg)
funcargs.update(vkarg)

# Fill rest with default arguments if there are any more
kwargs = {}
for kw in self.func_defaults.keys():
value = self.func_defaults[kw]
if kw not in self.funcargs:
if kw not in funcargs:
kwargs.update({kw: value})
logger.debug(f"updating funcargs with {kwargs}")
self.funcargs.update(kwargs)
self._recompute_data["args"] = self.funcargs.copy()
logger.debug(f"Running {self.func_name} with *{self.pargs} **{self.funcargs}")
funcargs.update(kwargs)
self._recompute_data["args"] = funcargs.copy()
logger.debug(f"Running {self.func_name} with *{pargs} **{funcargs}")

# we capture and log whatever is produced on STDOUT
capture = StringIO()
with redirect_stdout(capture):
result = self.f(*self.pargs, **self.funcargs)
result = self.f(*pargs, **funcargs)
logger.info(f"Captured output from function app '{self.func_name}': {capture.getvalue()}")
logger.debug(f"Finished execution of {self.func_name}.")

Expand Down
13 changes: 8 additions & 5 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,19 +269,22 @@ class RandomArrayApp(BarrierAppDROP):
size = dlg_int_param("size", 100)
marray = []

def initialize(self, **kwargs):
def initialize(self, keep_array=False, **kwargs):
super(RandomArrayApp, self).initialize(**kwargs)
self._keep_array = keep_array

def run(self):
# At least one output should have been added
outs = self.outputs
if len(outs) < 1:
raise Exception("At least one output should have been added to %r" % self)
self.generateRandomArray()
marray = self.generateRandomArray()
if self._keep_array:
self.marray = marray
for o in outs:
d = pickle.dumps(self.marray)
d = pickle.dumps(marray)
o.len = len(d)
o.write(pickle.dumps(self.marray))
o.write(d)

def generateRandomArray(self):
if self.integer:
Expand All @@ -292,7 +295,7 @@ def generateRandomArray(self):
# generate an array of self.size floats with numbers between
# self.low and self.high
marray = (np.random.random(size=self.size) + self.low) * self.high
self.marray = marray
return marray

def _getArray(self):
return self.marray
Expand Down
10 changes: 10 additions & 0 deletions daliuge-engine/dlg/lifecycle/dlm.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,16 @@ def addDrop(self, drop):
# perform this task, like using threading.Timers (probably not) or
# any other that doesn't mean looping over all DROPs

def remove_drops(self, drop_oids):
"""
Remove drops from DLM's monitoring
"""
self._drops = {
oid: drop
for oid, drop in self._drops.items()
if oid not in drop_oids
}

def handleOpenedDrop(self, oid, uid):
drop = self._drops[uid]
if drop.status == DROPStates.COMPLETED:
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ def destroySession(self, sessionId):
session = self._sessions.pop(sessionId)
if hasattr(self, "_memoryManager"):
self._memoryManager.shutdown_session(sessionId)
self._dlm.remove_drops(session.drops)
session.destroy()

def getSessionIds(self):
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/test/apps/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def test_sleepandcopyapp(self):

def test_randomarrayapp(self):
i = NullDROP("i", "i")
c = RandomArrayApp("c", "c")
c = RandomArrayApp("c", "c", keep_array=True)
o = InMemoryDROP("o", "o")
c.addInput(i)
c.addOutput(o)
Expand Down

0 comments on commit c762528

Please sign in to comment.