Draft: YAN-913 Async Drop Example #169
base: master
Conversation
```python
class LastCharWriterApp(AppDROP):
    # Note: cannot share string member with test thread
    _lastByte = multiprocessing.Value(ctypes.c_char, b" ")
    _stream = QueueAsyncStream()

    def run(self):
        asyncio.run(self.arun())

    async def arun(self):
        outputDrop = self.outputs[0]
        async for data in self._stream:
            self._lastByte.value = data[-1:]
            outputDrop.write(self._lastByte.value)

    def dataWritten(self, uid, data):
        self._stream.append(data)

    def dropCompleted(self, uid, drop_state):
        self._stream.end()
```
Here's another approach to stream objects using the existing streaming interface. This works with Python threading and multiprocessing, but has a few catches regarding memory sharing with test code.
The reason for using a stream object is that the dataWritten callback runs on a different app's subprocess and should return quickly so as not to block it. The run method here is invoked in its own subprocess and is better suited to performing any heavy data processing.
The particular advantage of using an async stream here is that it can scale to two or more streams on a single subprocess using something like asyncio.gather, which switches between streams depending on which one has data ready.
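To illustrate the asyncio.gather point, here is a minimal single-process sketch. ListAsyncStream and drain are hypothetical stand-ins (not part of this PR) for QueueAsyncStream and the per-stream consumer; the point is that one event loop services both streams, resuming whichever coroutine has data ready.

```python
import asyncio

class ListAsyncStream:
    # Hypothetical in-memory stand-in for QueueAsyncStream,
    # for illustration only (single process, no multiprocessing).
    def __init__(self, chunks):
        self._chunks = list(chunks)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._chunks:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # yield control so gather can interleave streams
        return self._chunks.pop(0)

async def drain(stream, sink):
    # Per-stream consumer: copy every chunk into sink.
    async for data in stream:
        sink.append(data)

async def main():
    a = ListAsyncStream([b"ab", b"cd"])
    b = ListAsyncStream([b"ef"])
    out_a, out_b = [], []
    # A single event loop drains both streams concurrently.
    await asyncio.gather(drain(a, out_a), drain(b, out_b))
    return out_a, out_b

out_a, out_b = asyncio.run(main())
print(out_a, out_b)
```

Each `await asyncio.sleep(0)` is where control passes back to the loop, so neither stream starves the other even though everything runs on one thread.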
```python
class QueueAsyncStream(AsyncIterator):
    _q = multiprocessing.Queue()
    _end = multiprocessing.Event()

    async def __anext__(self):
        while True:
            if self._q.qsize() > 0:
                return self._q.get()
            elif self._end.is_set():
                raise StopAsyncIteration
            else:
                await asyncio.sleep(0)

    def append(self, data):
        while not self._q.full():
            try:
                self._q.put(data, block=True, timeout=1)
                break
            except queue.Full:
                pass

    def end(self):
        self._end.set()
```
Previously I used a deque here, but multiprocess-safety issues led me to trial a queue. I've also changed the stream base to AsyncIterator instead of AsyncIterable; the primary difference is that an iterator can only be iterated once. This greatly simplifies the lifecycle around when to pop data and sidesteps shared ownership of stream data.
This implementation assumes that putting data into the stream in one process and getting it out in another has minimal overhead.
Force-pushed 78bd896 to dff60e8.