Skip to content

Commit

Permalink
Merge 78bd896 into 56a60cf
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Jun 3, 2022
2 parents 56a60cf + 78bd896 commit 5b4a529
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 82 deletions.
151 changes: 73 additions & 78 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"""
Module containing the core DROP classes.
"""
import multiprocessing
from sqlite3 import OperationalError
import string
from abc import ABCMeta, abstractmethod, abstractproperty
Expand Down Expand Up @@ -2297,6 +2298,7 @@ class AppDROP(ContainerDROP):
after all its inputs have moved to COMPLETED (implying that none of them is
a streaming input); for these cases see the `BarrierAppDROP`.
"""
n_tries = dlg_int_param("Number of tries", 1)

def initialize(self, **kwargs):

Expand Down Expand Up @@ -2470,6 +2472,77 @@ def skip(self):
self._fire(
"producerFinished", status=self.status, execStatus=self.execStatus
)

def async_execute(self):
    """
    Schedule `execute` to run in the background and return immediately.

    If a pool has been attached to this drop as `_tp`, the call is handed to
    it through `apply_async`; otherwise a dedicated daemon thread is started.
    """
    if hasattr(self, "_tp"):
        self._tp.apply_async(self.execute)
        return

    # Daemon thread: a still-running app must not keep the interpreter alive.
    threading.Thread(target=self.execute, daemon=True).start()

_dlg_proc_lock = multiprocessing.Lock()

@track_current_drop
def execute(self, _send_notifications=True):
    """
    Manually trigger the execution of this application.

    This method is normally invoked internally when the application detects
    all its inputs are COMPLETED.

    `run` is attempted up to `self.n_tries` times. When a pool has been
    attached as `_tp`, each attempt runs in a separate `DlgProcess` and any
    exception it reports is re-raised here; otherwise `run` is called
    directly. If no attempt succeeds (which includes a non-positive
    `n_tries`) the app ends in `AppDROPStates.ERROR` and the drop in
    `DROPStates.ERROR`. If the app is found CANCELLED after an attempt, the
    method returns without setting the drop status or sending notifications.

    `_send_notifications` controls whether `_notifyAppIsFinished` is called
    once the final status has been set.
    """

    # TODO: It needs to be defined more clearly how the state is set in
    # applications; for the time being they follow their execStatus.

    logger.debug("Executing %r", self)
    n_tries = self.n_tries
    succeeded = False
    self.execStatus = AppDROPStates.RUNNING

    # Run at most n_tries times if there are errors during the execution
    for attempt in range(1, n_tries + 1):
        try:
            if hasattr(self, "_tp"):
                proc = DlgProcess(target=self.run, daemon=True)
                # see YAN-975 for why this is happening
                lock = self._dlg_proc_lock
                with lock:
                    proc.start()
                # NOTE(review): the nesting of close() relative to the lock
                # was reconstructed from a flattened listing; it is kept
                # under the lock as the conservative choice -- confirm.
                with lock:
                    proc.join()
                    proc.close()
                if proc.exception:
                    raise proc.exception
            else:
                self.run()
            if self.execStatus == AppDROPStates.CANCELLED:
                return
            self.execStatus = AppDROPStates.FINISHED
            succeeded = True
            break
        except BaseException:
            # Same catch-all scope as the previous bare `except:`.
            # NOTE(review): narrow to Exception if interrupts should
            # propagate instead of counting as a failed try.
            if self.execStatus == AppDROPStates.CANCELLED:
                return
            logger.exception(
                "Error while executing %r (try %d/%d)", self, attempt, n_tries
            )

    if succeeded:
        drop_state = DROPStates.COMPLETED
    else:
        # We gave up running the application (or were never allowed to try
        # because n_tries is not positive), go to error
        self.execStatus = AppDROPStates.ERROR
        drop_state = DROPStates.ERROR

    self.status = drop_state
    if _send_notifications:
        self._notifyAppIsFinished()

def run(self):
    """
    Run this application.

    By the time this is called all the required inputs can be assumed to be
    COMPLETED. This implementation has no body of its own and returns None.
    """


class InputFiredAppDROP(AppDROP):
Expand Down Expand Up @@ -2505,7 +2578,6 @@ class InputFiredAppDROP(AppDROP):

input_error_threshold = dlg_int_param("Input error threshold (0 and 100)", 0)
n_effective_inputs = dlg_int_param("Number of effective inputs", -1)
n_tries = dlg_int_param("Number of tries", 1)

def initialize(self, **kwargs):
super(InputFiredAppDROP, self).initialize(**kwargs)
Expand Down Expand Up @@ -2536,12 +2608,6 @@ def initialize(self, **kwargs):
self, "Invalid n_tries, must be a positive number"
)

def addStreamingInput(self, streamingInputDrop, back=True):
    """
    Refuse to attach `streamingInputDrop` as a streaming input.

    Always raises `InvalidRelationshipException` carrying the rejected
    STREAMING_INPUT relationship; `back` is accepted but not used.
    """
    rejected_link = DROPRel(
        streamingInputDrop, DROPLinkType.STREAMING_INPUT, self
    )
    raise InvalidRelationshipException(
        rejected_link,
        "InputFiredAppDROPs don't accept streaming inputs",
    )

def dropCompleted(self, uid, drop_state):
super(InputFiredAppDROP, self).dropCompleted(uid, drop_state)

Expand Down Expand Up @@ -2605,77 +2671,6 @@ def dropCompleted(self, uid, drop_state):
else:
self.async_execute()

def async_execute(self):
    """
    Schedule `execute` to run in the background and return immediately.

    If a pool has been attached to this drop as `_tp`, the call is handed to
    it through `apply_async`; otherwise a dedicated daemon thread is started.
    """
    if hasattr(self, "_tp"):
        self._tp.apply_async(self.execute)
        return

    # Daemon thread: a still-running app must not keep the interpreter alive.
    threading.Thread(target=self.execute, daemon=True).start()

_dlg_proc_lock = threading.Lock()

@track_current_drop
def execute(self, _send_notifications=True):
    """
    Manually trigger the execution of this application.

    This method is normally invoked internally when the application detects
    all its inputs are COMPLETED.

    `run` is attempted up to `self.n_tries` times. When a pool has been
    attached as `_tp`, each attempt runs in a separate `DlgProcess` and any
    exception it reports is re-raised here; otherwise `run` is called
    directly. If no attempt succeeds (which includes a non-positive
    `n_tries`) the app ends in `AppDROPStates.ERROR` and the drop in
    `DROPStates.ERROR`. If the app is found CANCELLED after an attempt, the
    method returns without setting the drop status or sending notifications.

    `_send_notifications` controls whether `_notifyAppIsFinished` is called
    once the final status has been set.
    """

    # TODO: It needs to be defined more clearly how the state is set in
    # applications; for the time being they follow their execStatus.

    logger.debug("Executing %r", self)
    n_tries = self.n_tries
    succeeded = False
    self.execStatus = AppDROPStates.RUNNING

    # Run at most n_tries times if there are errors during the execution
    for attempt in range(1, n_tries + 1):
        try:
            if hasattr(self, "_tp"):
                proc = DlgProcess(target=self.run, daemon=True)
                # see YAN-975 for why this is happening
                lock = InputFiredAppDROP._dlg_proc_lock
                with lock:
                    proc.start()
                # NOTE(review): the nesting of close() relative to the lock
                # was reconstructed from a flattened listing; it is kept
                # under the lock as the conservative choice -- confirm.
                with lock:
                    proc.join()
                    proc.close()
                if proc.exception:
                    raise proc.exception
            else:
                self.run()
            if self.execStatus == AppDROPStates.CANCELLED:
                return
            self.execStatus = AppDROPStates.FINISHED
            succeeded = True
            break
        except BaseException:
            # Same catch-all scope as the previous bare `except:`.
            # NOTE(review): narrow to Exception if interrupts should
            # propagate instead of counting as a failed try.
            if self.execStatus == AppDROPStates.CANCELLED:
                return
            logger.exception(
                "Error while executing %r (try %d/%d)", self, attempt, n_tries
            )

    if succeeded:
        drop_state = DROPStates.COMPLETED
    else:
        # We gave up running the application (or were never allowed to try
        # because n_tries is not positive), go to error
        self.execStatus = AppDROPStates.ERROR
        drop_state = DROPStates.ERROR

    self.status = drop_state
    if _send_notifications:
        self._notifyAppIsFinished()

def run(self):
    """
    Run this application.

    By the time this is called all the required inputs can be assumed to be
    COMPLETED. This implementation has no body of its own and returns None.
    """

# TODO: another thing we need to check
def exists(self):
    """Report that this drop exists; this implementation always returns True."""
    return True
Expand Down
8 changes: 4 additions & 4 deletions daliuge-engine/test/apps/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,10 @@ def test_multi_listappendthrashing(self, size=1000, parallel=True):
mdrops = [InMemoryDROP(chr(65 + x), chr(65 + x)) for x in range(max_threads)]
if parallel:
# a bit of magic to get the app drops using the processes
_ = [drop.__setattr__("_tp", threadpool) for drop in drops]
_ = [drop.__setattr__("_tp", threadpool) for drop in mdrops]
_ = [drop.__setattr__("_sessID", session_id) for drop in mdrops]
_ = [memory_manager.register_drop(drop.uid, session_id) for drop in mdrops]
for drop in drops: drop.__setattr__("_tp", threadpool)
for drop in mdrops: drop.__setattr__("_tp", threadpool)
for drop in mdrops: drop.__setattr__("_sessID", session_id)
for drop in mdrops: memory_manager.register_drop(drop.uid, session_id)
X.__setattr__("_tp", threadpool)
Z.__setattr__("_tp", threadpool)
Z.__setattr__("_sessID", session_id)
Expand Down
143 changes: 143 additions & 0 deletions daliuge-engine/test/test_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@
# MA 02111-1307 USA
#

import asyncio
import contextlib
import ctypes
import io
import multiprocessing
from multiprocessing.pool import ThreadPool
import os, unittest
import queue
from typing import AsyncIterable, AsyncIterator, Deque
import time
import random
import shutil
import sqlite3
Expand Down Expand Up @@ -52,6 +59,11 @@
from dlg.exceptions import InvalidDropException
from dlg.apps.simple import NullBarrierApp, SimpleBranch, SleepAndCopyApp

import logging

from dlg.manager.shared_memory_manager import DlgSharedMemoryManager
logger = logging.getLogger(__name__)

try:
from crc32c import crc32c
except:
Expand All @@ -75,6 +87,20 @@ def isContainer(drop):
return False


def enabled_multiprocessing(apps, dataDrops):
    """
    Attach a shared `ThreadPool` to the given drops as their `_tp` attribute.

    Every drop in `apps` and in `dataDrops` receives the same pool. Each data
    drop is additionally given `_sessID` and is registered with a freshly
    created `DlgSharedMemoryManager` under that session id.

    Returns the `(threadpool, memory_manager)` pair so that callers who want
    to can release them once the test is over; existing callers may keep
    ignoring the return value.
    """
    from psutil import cpu_count

    session_id = 1
    # NOTE(review): psutil documents that cpu_count() may return None; in
    # that case ThreadPool falls back to its own default worker count.
    max_threads = cpu_count(logical=False)
    threadpool = ThreadPool(processes=max_threads)
    memory_manager = DlgSharedMemoryManager()

    for app in apps:
        app._tp = threadpool

    for data_drop in dataDrops:
        data_drop._tp = threadpool
        data_drop._sessID = session_id
        memory_manager.register_drop(data_drop.uid, session_id)

    return threadpool, memory_manager


class SumupContainerChecksum(BarrierAppDROP):
"""
A dummy BarrierAppDROP that recursively sums up the checksums of
Expand Down Expand Up @@ -747,8 +773,125 @@ def checkDropStates(aStatus, dStatus, eStatus, lastByte):
checkDropStates(
DROPStates.COMPLETED, DROPStates.COMPLETED, DROPStates.COMPLETED, b"k"
)
self.assertEqual(b"ejk", droputils.allDropContents(d))

def test_objectAsNormalAndThreadedAsyncStreamingInput(self):
    """Run the normal-plus-async-streaming-input scenario without multiprocessing."""
    self._test_objectAsNormalAndAsyncStreamingInput(use_multiprocessing=False)

def test_objectAsNormalAndMPAsyncStreamingInput(self):
    """Run the normal-plus-async-streaming-input scenario with multiprocessing enabled."""
    self._test_objectAsNormalAndAsyncStreamingInput(use_multiprocessing=True)

# Shared body of the threaded and multiprocessing variants of this test;
# `use_multiprocessing` decides whether enabled_multiprocessing() is applied
# to the drops before the data is written.
def _test_objectAsNormalAndAsyncStreamingInput(self, use_multiprocessing):
"""
A test that checks that a DROP can act as normal and async streaming input
of different AppDROPs at the same time. We use the following graph:
A --|--> B --> D
|--> C --> E
Here B uses A as a streaming input, while C uses it as a normal
input
"""

# Async iterator over a multiprocessing.Queue: yields queued items and stops
# once the queue is empty and end() has been called.
class QueueAsyncStream(AsyncIterator):
# NOTE(review): _q and _end are class attributes, so every instance of this
# class shares the same queue and the same end-of-stream event.
_q = multiprocessing.Queue()
_end = multiprocessing.Event()
async def __anext__(self):
while True:
# NOTE(review): multiprocessing.Queue.qsize() is documented to raise
# NotImplementedError on platforms such as macOS -- confirm the platforms
# this test is expected to run on.
if self._q.qsize() > 0:
return self._q.get()
elif self._end.is_set():
raise StopAsyncIteration
else:
# Nothing queued yet and the stream is not finished: yield to the event
# loop and poll again.
await asyncio.sleep(0)

def append(self, data):
# NOTE(review): if full() is true on entry the loop body never runs and
# `data` is dropped silently; the queue is created without a maxsize, so
# this presumably never triggers in practice -- confirm.
while not self._q.full():
try:
self._q.put(data, block=True, timeout=1)
break
except queue.Full:
pass

def end(self):
# Marks the stream as finished; __anext__ stops once the queue is drained.
self._end.set()

# App that consumes the stream and writes the last byte of every chunk it
# receives to its first output.
class LastCharWriterApp(AppDROP):
# Note: cannot share string member with test thread
_lastByte = multiprocessing.Value(ctypes.c_char, b" ")
_stream = QueueAsyncStream()

def run(self):
asyncio.run(self.arun())

async def arun(self):
outputDrop = self.outputs[0]
async for data in self._stream:
# data[-1:] keeps the result a one-byte bytes object
self._lastByte.value = data[-1:]
outputDrop.write(self._lastByte.value)

def dataWritten(self, uid, data):
# Every chunk written to the streaming input is queued for arun()
self._stream.append(data)

def dropCompleted(self, uid, drop_state):
# Completion of the streaming input ends the stream
self._stream.end()

a = InMemoryDROP("a", "a")
b = LastCharWriterApp("b", "b")
c = SumupContainerChecksum("c", "c")
d = InMemoryDROP("d", "d")
e = InMemoryDROP("e", "e")
a.addStreamingConsumer(b)
a.addConsumer(c)
b.addOutput(d)
c.addOutput(e)

if use_multiprocessing:
enabled_multiprocessing([b,c], [a,d,e])

# Consumer cannot be normal and streaming at the same time
self.assertRaises(Exception, a.addConsumer, b)
self.assertRaises(Exception, a.addStreamingConsumer, c)

# Write a little, then check the consumers
def checkDropStates(aStatus, dStatus, eStatus, lastByte):
self.assertEqual(aStatus, a.status)
self.assertEqual(dStatus, d.status)
self.assertEqual(eStatus, e.status)
if lastByte is not None:
self.assertEqual(lastByte, b._lastByte.value)

with DROPWaiterCtx(self, [d, e]):
# b is started explicitly so that it is already consuming the stream when
# data is written to a.
b.async_execute()
# NOTE(review): the fixed sleeps below presumably give the consumer time to
# process each write before the state checks -- timing-dependent.
time.sleep(0.1)

# TODO: data state doesn't update to writing with multiprocessing
data_state = DROPStates.INITIALIZED if use_multiprocessing else DROPStates.WRITING

checkDropStates(
DROPStates.INITIALIZED, DROPStates.INITIALIZED, DROPStates.INITIALIZED, b" "
)
a.write(b"abcde")
time.sleep(0.1)

checkDropStates(
DROPStates.WRITING, data_state, DROPStates.INITIALIZED, b"e"
)
a.write(b"fghij")
time.sleep(0.1)

checkDropStates(
DROPStates.WRITING, data_state, DROPStates.INITIALIZED, b"j"
)
a.write(b"k")
time.sleep(0.1)
a.setCompleted()

checkDropStates(
DROPStates.COMPLETED, DROPStates.COMPLETED, DROPStates.COMPLETED, b"k"
)
# d holds the last byte of each of the three writes to a
self.assertEqual(b"ejk", droputils.allDropContents(d))
self.assertEqual(b"1325211590", droputils.allDropContents(e))

def test_fileDROP_delete_parent_dir(self):
"""
Expand Down

0 comments on commit 5b4a529

Please sign in to comment.