Skip to content

Commit

Permalink
mp and threaded streaming test
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Jun 3, 2022
1 parent d6c9fd9 commit 78bd896
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 50 deletions.
12 changes: 4 additions & 8 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -2508,14 +2508,10 @@ def execute(self, _send_notifications=True):
proc = DlgProcess(target=self.run, daemon=True)
# see YAN-975 for why this is happening
lock = self._dlg_proc_lock
logger.debug("aquiring lock..")
#with lock:
logger.debug("got lock")
proc.start()
#with lock:
logger.debug("joining...")
proc.join()
logger.debug("proc done!")
with lock:
proc.start()
with lock:
proc.join()
proc.close()
if proc.exception:
raise proc.exception
Expand Down
72 changes: 30 additions & 42 deletions daliuge-engine/test/test_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ def isContainer(drop):
return False


def enabled_multiprocessing(apps, dataDrops):
from psutil import cpu_count
session_id = 1
max_threads = cpu_count(logical=False)
threadpool = ThreadPool(processes=max_threads)
memory_manager = DlgSharedMemoryManager()
for app in apps:
app.__setattr__("_tp", threadpool)
for dataDrop in dataDrops:
dataDrop.__setattr__("_tp", threadpool)
dataDrop.__setattr__("_sessID", session_id)
memory_manager.register_drop(dataDrop.uid, session_id)


class SumupContainerChecksum(BarrierAppDROP):
"""
A dummy BarrierAppDROP that recursively sums up the checksums of
Expand Down Expand Up @@ -761,7 +775,13 @@ def checkDropStates(aStatus, dStatus, eStatus, lastByte):
)
self.assertEqual(b"ejk", droputils.allDropContents(d))

def test_objectAsNormalAndAsyncStreamingInput(self):
def test_objectAsNormalAndThreadedAsyncStreamingInput(self):
self._test_objectAsNormalAndAsyncStreamingInput(False)

def test_objectAsNormalAndMPAsyncStreamingInput(self):
self._test_objectAsNormalAndAsyncStreamingInput(True)

def _test_objectAsNormalAndAsyncStreamingInput(self, use_multiprocessing):
"""
A test that checks that a DROP can act as normal and async streaming input
of different AppDROPs at the same time. We use the following graph:
Expand All @@ -773,24 +793,6 @@ def test_objectAsNormalAndAsyncStreamingInput(self):
input
"""

class DequeAsyncStream(AsyncIterator):
_d = Deque()
_end = False
async def __anext__(self):
while True:
if len(self._d) > 0:
return self._d.popleft()
elif self._end:
raise StopAsyncIteration
else:
await asyncio.sleep(0)

def append(self, data):
self._d.append(data)

def end(self):
self._end = True

class QueueAsyncStream(AsyncIterator):
_q = multiprocessing.Queue()
_end = multiprocessing.Event()
Expand All @@ -815,7 +817,7 @@ def end(self):
self._end.set()

class LastCharWriterApp(AppDROP):
#_lastByte = None # Note: cannot share with daliuge thread
# Note: cannot share string member with test thread
_lastByte = multiprocessing.Value(ctypes.c_char, b" ")
_stream = QueueAsyncStream()

Expand All @@ -826,7 +828,6 @@ async def arun(self):
outputDrop = self.outputs[0]
async for data in self._stream:
self._lastByte.value = data[-1:]
#outputDrop.status = DROPStates.WRITING
outputDrop.write(self._lastByte.value)

def dataWritten(self, uid, data):
Expand All @@ -845,24 +846,8 @@ def dropCompleted(self, uid, drop_state):
b.addOutput(d)
c.addOutput(e)

use_multiprocess = True
if use_multiprocess:
from psutil import cpu_count
session_id = 1
max_threads = cpu_count(logical=False)
threadpool = ThreadPool(processes=max_threads)
memory_manager = DlgSharedMemoryManager()
a.__setattr__("_tp", threadpool)
b.__setattr__("_tp", threadpool)
c.__setattr__("_tp", threadpool)
d.__setattr__("_tp", threadpool)
e.__setattr__("_tp", threadpool)
a.__setattr__("_sessID", session_id)
d.__setattr__("_sessID", session_id)
e.__setattr__("_sessID", session_id)
memory_manager.register_drop(a.uid, session_id)
memory_manager.register_drop(d.uid, session_id)
memory_manager.register_drop(e.uid, session_id)
if use_multiprocessing:
enabled_multiprocessing([b,c], [a,d,e])

# Consumer cannot be normal and streaming at the same time
self.assertRaises(Exception, a.addConsumer, b)
Expand All @@ -876,24 +861,27 @@ def checkDropStates(aStatus, dStatus, eStatus, lastByte):
if lastByte is not None:
self.assertEqual(lastByte, b._lastByte.value)

with DROPWaiterCtx(self, [d, e], timeout = 5):
with DROPWaiterCtx(self, [d, e]):
b.async_execute()
time.sleep(0.1)

# TODO: data state doesn't update to writing with multiprocessing
data_state = DROPStates.INITIALIZED if use_multiprocessing else DROPStates.WRITING

checkDropStates(
DROPStates.INITIALIZED, DROPStates.INITIALIZED, DROPStates.INITIALIZED, b" "
)
a.write(b"abcde")
time.sleep(0.1)

checkDropStates(
DROPStates.WRITING, DROPStates.INITIALIZED, DROPStates.INITIALIZED, b"e"
DROPStates.WRITING, data_state, DROPStates.INITIALIZED, b"e"
)
a.write(b"fghij")
time.sleep(0.1)

checkDropStates(
DROPStates.WRITING, DROPStates.INITIALIZED, DROPStates.INITIALIZED, b"j"
DROPStates.WRITING, data_state, DROPStates.INITIALIZED, b"j"
)
a.write(b"k")
time.sleep(0.1)
Expand Down

0 comments on commit 78bd896

Please sign in to comment.