Skip to content

Commit

Permalink
Merge 26f3453 into 82fa3e8
Browse files Browse the repository at this point in the history
  • Loading branch information
rtobar committed Jun 1, 2022
2 parents 82fa3e8 + 26f3453 commit 5aa60de
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 78 deletions.
13 changes: 10 additions & 3 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import sys
import inspect
import binascii
from typing import List, Optional, Union
from typing import List, Union

import numpy as np
import pyarrow.plasma as plasma
Expand Down Expand Up @@ -2615,6 +2615,8 @@ def async_execute(self):
t.daemon = 1
t.start()

_dlg_proc_lock = threading.Lock()

@track_current_drop
def execute(self, _send_notifications=True):
"""
Expand All @@ -2636,8 +2638,13 @@ def execute(self, _send_notifications=True):
try:
if hasattr(self, "_tp"):
proc = DlgProcess(target=self.run, daemon=True)
proc.start()
proc.join()
# see YAN-975 for why this is happening
lock = InputFiredAppDROP._dlg_proc_lock
with lock:
proc.start()
with lock:
proc.join()
proc.close()
if proc.exception:
raise proc.exception
else:
Expand Down
13 changes: 3 additions & 10 deletions daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import threading
import time

from glob import glob
from . import constants
from .drop_manager import DROPManager
from .session import Session
Expand Down Expand Up @@ -190,11 +189,10 @@ def start(self):
Starts any background task required by this Node Manager
"""

@abc.abstractmethod
def shutdown(self):
"""
Stops any pending background task run by this Node Manager
"""
if self._threadpool:
self._threadpool.close()
self._threadpool.join()

@abc.abstractmethod
def subscribe(self, host, port):
Expand Down Expand Up @@ -374,11 +372,6 @@ def call_drop(self, sessionId, uid, method, *args):
self._check_session_id(sessionId)
return self._sessions[sessionId].call_drop(uid, method, *args)

def shutdown(self):
if hasattr(self, "_threadpool") and self._threadpool is not None:
self._threadpool.close()
self._threadpool.join()


class ZMQPubSubMixIn(object):
"""
Expand Down
6 changes: 3 additions & 3 deletions daliuge-engine/test/apps/test_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _test_socket_listener(self, **kwargs):
A --> B --> C --> D
"""

host = "localhost"
host = "127.0.0.1"
port = 9933
data = os.urandom(1025)

Expand Down Expand Up @@ -85,11 +85,11 @@ def test_socket_listener_integer_with_bufsize(self):
def test_invalid(self):

# Shouldn't allow inputs
a = SocketListenerApp("a", "a", port=1)
a = SocketListenerApp("a", "a", port=64 * 1024)
a.addOutput(InMemoryDROP("c", "c"))
self.assertRaises(Exception, a.addInput, InMemoryDROP("b", "b"))
self.assertRaises(Exception, a.addStreamingInput, InMemoryDROP("b", "b"))

# Shouldn't be able to open ports <= 1024
# Shouldn't be able to open ports > 64k - 1
a.execute()
self.assertEqual(a.status, DROPStates.ERROR)
81 changes: 19 additions & 62 deletions daliuge-engine/test/manager/test_dm.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,19 +191,14 @@ class NodeManagerTestsBase(NMTestsMixIn):
def _deploy_error_graph(self, **kwargs):
sessionId = f"s{random.randint(0, 1000)}"
g = [
{"oid": "A", "type": "plain", "storage": Categories.MEMORY},
memory("A"),
{
"oid": "B",
"type": "app",
"app": "test.manager.test_dm.ErroneousApp",
"inputs": ["A"],
},
{
"oid": "C",
"type": "plain",
"storage": Categories.MEMORY,
"producers": ["B"],
},
memory("C", producers=["B"]),
]
add_test_reprodata(g)
dm = self._start_dm(threads=self.nm_threads, **kwargs)
Expand Down Expand Up @@ -243,15 +238,10 @@ def handleEvent(self, _evt):
self.assertTrue(evt.wait(10), "Didn't receive events on time")

def _test_runGraphOneDOPerDOM(self, repeats=1):
g1 = [{"oid": "A", "type": "plain", "storage": Categories.MEMORY}]
g1 = [memory("A")]
g2 = [
{"oid": "B", "type": "app", "app": "dlg.apps.crc.CRCApp"},
{
"oid": "C",
"type": "plain",
"storage": Categories.MEMORY,
"producers": ["B"],
},
memory("C", producers=["B"]),
]
rels = [DROPRel("B", DROPLinkType.CONSUMER, "A")]
a_data = os.urandom(32)
Expand Down Expand Up @@ -310,29 +300,14 @@ def test_runGraphSeveralDropsPerDM(self):

sessionId = "s1"
g1 = [
{
"oid": "A",
"type": "plain",
"storage": Categories.MEMORY,
"consumers": ["C"],
},
{"oid": "B", "type": "plain", "storage": Categories.MEMORY},
memory("A", consumers=["C"]),
memory("B"),
{"oid": "C", "type": "app", "app": "dlg.apps.crc.CRCApp"},
{
"oid": "D",
"type": "plain",
"storage": Categories.MEMORY,
"producers": ["C"],
},
memory("D", producers=["C"]),
]
g2 = [
{"oid": "E", "type": "app", "app": "test.test_drop.SumupContainerChecksum"},
{
"oid": "F",
"type": "plain",
"storage": Categories.MEMORY,
"producers": ["E"],
},
memory("F", producers=["E"]),
]
add_test_reprodata(g1)
add_test_reprodata(g2)
Expand Down Expand Up @@ -485,21 +460,13 @@ def test_many_relationships(self):

sessionId = f"s{random.randint(0, 1000)}"
N = 100
g1 = [{"oid": "A", "type": "plain", "storage": Categories.MEMORY}]
g2 = [{"oid": "C", "type": "plain", "storage": Categories.MEMORY}]
g1 = [memory("A")]
g2 = [memory("C")]
rels = []
for i in range(N):
b_oid = "B%d" % (i,)
# SleepAndCopyApp effectively opens the input drop
g2.append(
{
"oid": b_oid,
"type": "app",
"app": "dlg.apps.simple.SleepAndCopyApp",
"outputs": ["C"],
"sleepTime": 0,
}
)
g2.append(sleepAndCopy(b_oid, outputs=["C"], sleepTime=0))
rels.append(DROPRel("A", DROPLinkType.INPUT, b_oid))
add_test_reprodata(g1)
add_test_reprodata(g2)
Expand Down Expand Up @@ -544,24 +511,14 @@ def test_runGraphSeveralDropsPerDM_with_get_consumer_nodes(self):

sessionId = f"s{random.randint(0, 1000)}"
g1 = [
{
"oid": "A",
"type": "plain",
"storage": Categories.MEMORY,
"consumers": ["C"],
},
memory("A", consumers=["C"]),
{
"oid": "C",
"type": "app",
"app": "dlg.apps.crc.CRCApp",
"consumers": ["D"],
},
{
"oid": "D",
"type": "plain",
"storage": Categories.MEMORY,
"producers": ["C"],
},
memory("D", producers=["C"]),
]
g2 = [
{
Expand Down Expand Up @@ -612,15 +569,15 @@ def test_run_streaming_consumer_remotely(self):
"""

g1 = [
{"oid": "A", "type": "plain", "storage": Categories.MEMORY},
memory("A"),
{
"oid": "B",
"type": "app",
"app": "dlg.apps.simple.CopyApp",
"inputs": ["A"],
"outputs": ["C"],
},
{"oid": "C", "type": "plain", "storage": Categories.MEMORY},
memory("C"),
]
g2 = [
{
Expand All @@ -629,7 +586,7 @@ def test_run_streaming_consumer_remotely(self):
"app": "dlg.apps.crc.CRCStreamApp",
"outputs": ["E"],
},
{"oid": "E", "type": "plain", "storage": Categories.MEMORY},
memory("E"),
]
rels = [DROPRel("C", DROPLinkType.STREAMING_INPUT, "D")]
a_data = os.urandom(32)
Expand All @@ -642,7 +599,7 @@ def test_run_streaming_consumer_remotely2(self):
"""

g1 = [
{"oid": "A", "type": "plain", "storage": Categories.MEMORY},
memory("A"),
{
"oid": "B",
"type": "app",
Expand All @@ -651,15 +608,15 @@ def test_run_streaming_consumer_remotely2(self):
},
]
g2 = [
{"oid": "C", "type": "plain", "storage": Categories.MEMORY},
memory("C"),
{
"oid": "D",
"type": "app",
"app": "dlg.apps.crc.CRCStreamApp",
"streamingInputs": ["C"],
"outputs": ["E"],
},
{"oid": "E", "type": "plain", "storage": Categories.MEMORY},
memory("E"),
]
rels = [DROPRel("C", DROPLinkType.OUTPUT, "B")]
a_data = os.urandom(32)
Expand Down

0 comments on commit 5aa60de

Please sign in to comment.