From d4d631ed2e1e841a233a9766ccbbc87d287931ef Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Wed, 1 Jun 2022 17:17:27 +0800 Subject: [PATCH] both stream tests running --- daliuge-engine/test/test_drop.py | 68 ++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/daliuge-engine/test/test_drop.py b/daliuge-engine/test/test_drop.py index 459d97e13..cc7e1212f 100644 --- a/daliuge-engine/test/test_drop.py +++ b/daliuge-engine/test/test_drop.py @@ -687,6 +687,74 @@ def _test_graphExecutionDriver(self, mode): # b is already done self.assertEqual(c.status, DROPStates.COMPLETED) + def test_objectAsNormalAndStreamingInput(self): + """ + A test that checks that a DROP can act as normal and streaming input of + different AppDROPs at the same time. We use the following graph: + + A --|--> B --> D + |--> C --> E + + Here B uses A as a streaming input, while C uses it as a normal + input + """ + + class LastCharWriterApp(AppDROP): + def initialize(self, **kwargs): + super(LastCharWriterApp, self).initialize(**kwargs) + self._lastByte = None + + def dataWritten(self, uid, data): + self.execStatus = AppDROPStates.RUNNING + outputDrop = self.outputs[0] + self._lastByte = data[-1:] + outputDrop.write(self._lastByte) + + def dropCompleted(self, uid, status): + self.execStatus = AppDROPStates.FINISHED + self._notifyAppIsFinished() + + a = InMemoryDROP("a", "a") + b = LastCharWriterApp("b", "b") + c = SumupContainerChecksum("c", "c") + d = InMemoryDROP("d", "d") + e = InMemoryDROP("e", "e") + a.addStreamingConsumer(b) + a.addConsumer(c) + b.addOutput(d) + c.addOutput(e) + + # Consumer cannot be normal and streaming at the same time + self.assertRaises(Exception, a.addConsumer, b) + self.assertRaises(Exception, a.addStreamingConsumer, c) + + # Write a little, then check the consumers + def checkDropStates(aStatus, dStatus, eStatus, lastByte): + self.assertEqual(aStatus, a.status) + self.assertEqual(dStatus, d.status) + self.assertEqual(eStatus, e.status) + if lastByte is not None: + self.assertEqual(lastByte, b._lastByte) + + checkDropStates( + DROPStates.INITIALIZED, DROPStates.INITIALIZED, DROPStates.INITIALIZED, None + ) + a.write(b"abcde") + checkDropStates( + DROPStates.WRITING, DROPStates.WRITING, DROPStates.INITIALIZED, b"e" + ) + a.write(b"fghij") + checkDropStates( + DROPStates.WRITING, DROPStates.WRITING, DROPStates.INITIALIZED, b"j" + ) + a.write(b"k") + with DROPWaiterCtx(self, [d, e]): + a.setCompleted() + checkDropStates( + DROPStates.COMPLETED, DROPStates.COMPLETED, DROPStates.COMPLETED, b"k" + ) + self.assertEqual(b"ejk", droputils.allDropContents(d)) + def test_objectAsNormalAndAsyncStreamingInput(self): """ A test that checks that a DROP can act as normal and async streaming input