DRAFT: YAN-913 Async Drop Streaming #134
…ed by xml2palette
applicationParams forwarding
…esting environment
# Conflicts: # daliuge-engine/build_engine.sh
YAN-913 remove casacore dependencies
…sting # Conflicts: # daliuge-engine/build_engine.sh # daliuge-engine/docker/Dockerfile.devall
def _open(self, **kwargs):
    if self._mode == OpenMode.OPEN_WRITE:
        return self._buf
    elif self._mode == OpenMode.OPEN_READ:
        # TODO: potentially wasteful copy
        return io.BytesIO(self._buf.getbuffer())
        br = MemoryIO.BytesIOReader(self._buf)
One issue I've noticed here is that numerous read functions call close() on the io object after reading. Once multiple readers all point to the same _buf object, a reader closing also closes the BytesIO object, causing all other drop reads to error.
Limiting the reader's close() to the BytesIOReader itself should allow DataDrop to keep handling the closing of the underlying BytesIO object correctly.
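For illustration, here is a minimal sketch of the idea, not the PR's actual BytesIOReader. The class name SharedBytesReader and its attributes are hypothetical; it assumes each reader only needs its own read position over the shared buffer and that close() should affect the reader alone.

```python
import io


class SharedBytesReader:
    """Hypothetical reader over a shared io.BytesIO (illustrative only)."""

    def __init__(self, buf: io.BytesIO):
        self._buf = buf      # shared buffer, owned by someone else
        self._pos = 0        # this reader's private read position
        self.closed = False

    def read(self, size: int = -1) -> bytes:
        if self.closed:
            raise ValueError("I/O operation on closed reader")
        # Release the memoryview before returning so that later writes
        # can still resize the underlying BytesIO.
        with self._buf.getbuffer() as view:
            start = min(self._pos, len(view))
            if size is None or size < 0:
                end = len(view)
            else:
                end = min(len(view), start + size)
            data = view[start:end].tobytes()
        self._pos = end
        return data

    def close(self) -> None:
        # Only this reader is marked closed; the shared buffer stays open.
        self.closed = True


shared = io.BytesIO()
shared.write(b"hello world")

first = SharedBytesReader(shared)
second = SharedBytesReader(shared)

first.read(5)
first.close()                  # does not close `shared`
remaining = second.read()      # the second reader is unaffected
```

With this shape, whoever owns the shared buffer remains the only party that closes it, which matches the intent described in the comment above.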
@@ -82,7 +82,7 @@ def initialize(self, **kwargs):
        self.config_data = self.serialize_parameters(
            self.filter_parameters(self.parameters, self.mode), self.mode).encode('utf-8')

    def getIO(self):
    def _getIO(self):
A rather large refactor, but I haven't found an existing case where the DataDROP interface isn't sufficient for an app and needs to use DataIO directly.
To add drop streaming, I've simply added dataDrop.writeStream() and dataDrop.readStream().
eb80e22 to 06cd2c3
Rebased in #165
This change adds an alternative approach to stream buffering that uses dataDrops to hold the stream backbuffer rather than passing data directly to the next appdrop via callback. The motivation for buffering is to let subprocesses process streams asynchronously instead of immediately invoking a chain of callbacks, all of which would be executed by the process of the first streaming drop.
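The difference between a callback chain and a buffered stream can be sketched roughly as follows. This is not the PR's implementation: StreamBuffer, its methods, and run_pipeline are hypothetical names, and a thread with queue.Queue stands in for the subprocess and the data drop backbuffer.

```python
import queue
import threading

_END_OF_STREAM = object()  # sentinel marking the end of the stream


class StreamBuffer:
    """Hypothetical backbuffer sitting between a producer and a consumer."""

    def __init__(self, maxsize: int = 0):
        self._chunks = queue.Queue(maxsize)

    def write(self, chunk: bytes) -> None:
        # The producer returns as soon as the chunk is queued; it does
        # not run the consumer's processing itself.
        self._chunks.put(chunk)

    def end(self) -> None:
        self._chunks.put(_END_OF_STREAM)

    def read_stream(self):
        # The consumer pulls chunks at its own pace until the sentinel.
        while True:
            chunk = self._chunks.get()
            if chunk is _END_OF_STREAM:
                return
            yield chunk


def run_pipeline(chunks):
    buffer = StreamBuffer()
    results = []

    def consume():
        for chunk in buffer.read_stream():
            results.append(chunk.upper())  # stand-in for real processing

    worker = threading.Thread(target=consume)
    worker.start()
    for chunk in chunks:
        buffer.write(chunk)
    buffer.end()
    worker.join()
    return results
```

In the callback design, the producer's call would not return until every downstream stage had handled the chunk; here the producer only enqueues, and the downstream work runs in the consumer's own execution context.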