Skip to content

Commit

Permalink
both stream tests running
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Jun 1, 2022
1 parent 53c457a commit d4d631e
Showing 1 changed file with 68 additions and 0 deletions.
68 changes: 68 additions & 0 deletions daliuge-engine/test/test_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,74 @@ def _test_graphExecutionDriver(self, mode):
# b is already done
self.assertEqual(c.status, DROPStates.COMPLETED)

def test_objectAsNormalAndStreamingInput(self):
"""
A test that checks that a DROP can act as normal and streaming input of
different AppDROPs at the same time. We use the following graph:
A --|--> B --> D
|--> C --> E
Here B uses A as a streaming input, while C uses it as a normal
input
"""

class LastCharWriterApp(AppDROP):
def initialize(self, **kwargs):
super(LastCharWriterApp, self).initialize(**kwargs)
self._lastByte = None

def dataWritten(self, uid, data):
self.execStatus = AppDROPStates.RUNNING
outputDrop = self.outputs[0]
self._lastByte = data[-1:]
outputDrop.write(self._lastByte)

def dropCompleted(self, uid, status):
self.execStatus = AppDROPStates.FINISHED
self._notifyAppIsFinished()

a = InMemoryDROP("a", "a")
b = LastCharWriterApp("b", "b")
c = SumupContainerChecksum("c", "c")
d = InMemoryDROP("d", "d")
e = InMemoryDROP("e", "e")
a.addStreamingConsumer(b)
a.addConsumer(c)
b.addOutput(d)
c.addOutput(e)

# Consumer cannot be normal and streaming at the same time
self.assertRaises(Exception, a.addConsumer, b)
self.assertRaises(Exception, a.addStreamingConsumer, c)

# Write a little, then check the consumers
def checkDropStates(aStatus, dStatus, eStatus, lastByte):
self.assertEqual(aStatus, a.status)
self.assertEqual(dStatus, d.status)
self.assertEqual(eStatus, e.status)
if lastByte is not None:
self.assertEqual(lastByte, b._lastByte)

checkDropStates(
DROPStates.INITIALIZED, DROPStates.INITIALIZED, DROPStates.INITIALIZED, None
)
a.write(b"abcde")
checkDropStates(
DROPStates.WRITING, DROPStates.WRITING, DROPStates.INITIALIZED, b"e"
)
a.write(b"fghij")
checkDropStates(
DROPStates.WRITING, DROPStates.WRITING, DROPStates.INITIALIZED, b"j"
)
a.write(b"k")
with DROPWaiterCtx(self, [d, e]):
a.setCompleted()
checkDropStates(
DROPStates.COMPLETED, DROPStates.COMPLETED, DROPStates.COMPLETED, b"k"
)
self.assertEqual(b"ejk", droputils.allDropContents(d))

def test_objectAsNormalAndAsyncStreamingInput(self):
"""
A test that checks that a DROP can act as normal and async streaming input
Expand Down

0 comments on commit d4d631e

Please sign in to comment.