Skip to content

Commit b973256

Browse files
committed
AsyncStreams: Return true from AsyncReadableStream::push if the caller can continue pushing
1 parent b206217 commit b973256

File tree

5 files changed

+39
-24
lines changed

5 files changed

+39
-24
lines changed

Libraries/AsyncStreams/AsyncRequestStreams.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,16 +102,20 @@ void SC::AsyncRequestReadableStream<AsyncReadRequest>::afterRead(typename AsyncR
102102
}
103103
else
104104
{
105-
AsyncReadableStream::push(bufferID, data.sizeInBytes());
105+
const bool continuePushing = AsyncReadableStream::push(bufferID, data.sizeInBytes());
106106
SC_ASSERT_RELEASE(result.getAsync().isFree());
107107
getBuffersPool().unrefBuffer(bufferID);
108-
if (getBufferOrPause(0, bufferID, result.getAsync().buffer))
108+
// Check if we're still pushing (so not, paused, destroyed or errored etc.)
109+
if (continuePushing)
109110
{
110-
request.callback = [this, bufferID](typename AsyncReadRequest::Result& result)
111-
{ afterRead(result, bufferID); };
112-
result.reactivateRequest(true);
113-
// Stream is in AsyncPushing mode and SC::AsyncResult::reactivateRequest(true) will cause more
114-
// data to be delivered here, so it's not necessary calling AsyncReadableStream::reactivate(true).
111+
if (getBufferOrPause(0, bufferID, result.getAsync().buffer))
112+
{
113+
request.callback = [this, bufferID](typename AsyncReadRequest::Result& result)
114+
{ afterRead(result, bufferID); };
115+
result.reactivateRequest(true);
116+
// Stream is in AsyncPushing mode and SC::AsyncResult::reactivateRequest(true) will cause more
117+
// data to be delivered here, so it's not necessary calling AsyncReadableStream::reactivate(true).
118+
}
115119
}
116120
}
117121
}

Libraries/AsyncStreams/AsyncStreams.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,12 @@ void AsyncReadableStream::emitOnData()
189189
}
190190
}
191191

192-
void AsyncReadableStream::push(AsyncBufferView::ID bufferID, size_t newSize)
192+
bool AsyncReadableStream::push(AsyncBufferView::ID bufferID, size_t newSize)
193193
{
194194
if (newSize == 0)
195195
{
196196
emitError(Result::Error("AsyncReadableStream::push zero sized buffer is not allowed"));
197-
return;
197+
return false;
198198
}
199199
// Push buffer to the queue
200200
buffers->setNewBufferSize(bufferID, newSize);
@@ -204,7 +204,7 @@ void AsyncReadableStream::push(AsyncBufferView::ID bufferID, size_t newSize)
204204
{
205205
state = State::Errored;
206206
emitError(Result::Error("AsyncReadableStream::push dropping buffer"));
207-
return;
207+
return false;
208208
}
209209
buffers->refBuffer(bufferID); // 1a. unrefBuffer in emitOnData()
210210

@@ -219,7 +219,10 @@ void AsyncReadableStream::push(AsyncBufferView::ID bufferID, size_t newSize)
219219
case State::AsyncPushing:
220220
case State::AsyncReading: {
221221
emitOnData();
222-
state = State::AsyncPushing;
222+
if (state == State::AsyncReading)
223+
{
224+
state = State::AsyncPushing;
225+
}
223226
}
224227
break;
225228
case State::Pausing: {
@@ -232,6 +235,7 @@ void AsyncReadableStream::push(AsyncBufferView::ID bufferID, size_t newSize)
232235
}
233236
break;
234237
}
238+
return state == State::AsyncPushing or state == State::SyncPushing;
235239
}
236240

237241
void AsyncReadableStream::reactivate(bool doReactivate)
@@ -652,7 +656,8 @@ void AsyncTransformStream::afterProcess(Span<const char> inputAfter, Span<char>
652656
const size_t consumedOutput = outputData.sizeInBytes() - outputAfter.sizeInBytes();
653657
if (consumedOutput > 0)
654658
{
655-
AsyncReadableStream::push(outputBufferID, consumedOutput);
659+
// Ignore whatever push returns because later on the stream is either finalizing or pausing either way
660+
(void)AsyncReadableStream::push(outputBufferID, consumedOutput);
656661
}
657662
AsyncReadableStream::getBuffersPool().unrefBuffer(outputBufferID);
658663
if (inputAfter.empty())
@@ -681,7 +686,8 @@ void AsyncTransformStream::afterFinalize(Span<char> outputAfter, bool streamEnde
681686
const size_t consumedOutput = outputData.sizeInBytes() - outputAfter.sizeInBytes();
682687
if (consumedOutput > 0)
683688
{
684-
AsyncReadableStream::push(outputBufferID, consumedOutput);
689+
// Ignore whatever push returns because later on the stream is either finalizing or pausing either way
690+
(void)AsyncReadableStream::push(outputBufferID, consumedOutput);
685691
}
686692
AsyncReadableStream::getBuffersPool().unrefBuffer(outputBufferID);
687693
if (streamEnded)

Libraries/AsyncStreams/AsyncStreams.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,8 @@ struct AsyncReadableStream
208208
AsyncBuffersPool& getBuffersPool();
209209

210210
/// @brief Use push from inside AsyncReadableStream::asyncRead function to queue received data
211-
void push(AsyncBufferView::ID bufferID, size_t newSize);
211+
/// @return `true` if the caller can continue pushing
212+
[[nodiscard]] bool push(AsyncBufferView::ID bufferID, size_t newSize);
212213

213214
/// @brief Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end
214215
void pushEnd();

Libraries/AsyncStreams/ZLibTransformStreams.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ SC::Result SC::SyncZLibTransformStream::transform(AsyncBufferView::ID bufferID,
2323
SC_TRY(sourceData.sliceStart(consumedInputBytes, inputData));
2424
while (not inputData.empty())
2525
{
26-
Span<char> outputData;
2726
AsyncBufferView::ID outputBufferID;
28-
if (getBufferOrPause(0, outputBufferID, outputData))
27+
28+
Span<char> outputData;
29+
bool continuePushing = true;
30+
if (getBufferOrPause(0, outputBufferID, outputData) and continuePushing)
2931
{
3032
const size_t outputBefore = outputData.sizeInBytes();
3133
const size_t inputBefore = inputData.sizeInBytes();
@@ -40,7 +42,7 @@ SC::Result SC::SyncZLibTransformStream::transform(AsyncBufferView::ID bufferID,
4042
consumedInputBytes += consumedInput;
4143
if (consumedOutput > 0)
4244
{
43-
AsyncReadableStream::push(outputBufferID, consumedOutput);
45+
continuePushing = AsyncReadableStream::push(outputBufferID, consumedOutput);
4446
}
4547
AsyncReadableStream::getBuffersPool().unrefBuffer(outputBufferID);
4648
}
@@ -65,8 +67,10 @@ bool SC::SyncZLibTransformStream::canEndTransform()
6567
// we need to hold the "Ending" state of the state machine, to finish
6668
// writing this last trail of transformed data.
6769
AsyncBufferView::ID outputBufferID;
68-
Span<char> outputBefore;
69-
while (getBufferOrPause(0, outputBufferID, outputBefore))
70+
71+
Span<char> outputBefore;
72+
bool continuePushing = true;
73+
while (getBufferOrPause(0, outputBufferID, outputBefore) and continuePushing)
7074
{
7175
Span<char> outputData = outputBefore;
7276

@@ -80,7 +84,7 @@ bool SC::SyncZLibTransformStream::canEndTransform()
8084
const size_t outputBytes = outputBefore.sizeInBytes() - outputData.sizeInBytes();
8185
if (outputBytes > 0)
8286
{
83-
AsyncReadableStream::push(outputBufferID, outputBytes);
87+
continuePushing = AsyncReadableStream::push(outputBufferID, outputBytes);
8488
}
8589
AsyncReadableStream::getBuffersPool().unrefBuffer(outputBufferID);
8690
if (streamEnded)

Tests/Libraries/AsyncStreams/AsyncStreamsTest.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ void SC::AsyncStreamsTest::readableSyncStream()
154154
} context = {readable, 0, 100, {}};
155155

156156
(void)readable.eventError.addListener([this](Result res) { SC_TEST_EXPECT(res); });
157-
readable.asyncRead = [&context]() -> Result
157+
readable.asyncRead = [this, &context]() -> Result
158158
{
159159
if (context.idx < context.max)
160160
{
@@ -163,7 +163,7 @@ void SC::AsyncStreamsTest::readableSyncStream()
163163
if (context.readable.getBufferOrPause(sizeof(context.idx), bufferID, data))
164164
{
165165
memcpy(data.data(), &context.idx, sizeof(context.idx));
166-
context.readable.push(bufferID, sizeof(context.idx));
166+
SC_TEST_EXPECT(context.readable.push(bufferID, sizeof(context.idx)));
167167
context.readable.getBuffersPool().unrefBuffer(bufferID);
168168
context.idx += 1;
169169
context.readable.reactivate(true);
@@ -231,14 +231,14 @@ void SC::AsyncStreamsTest::readableAsyncStream()
231231

232232
SC_TEST_EXPECT(loop.create());
233233
AsyncLoopTimeout timeout;
234-
timeout.callback = [&context](AsyncLoopTimeout::Result&)
234+
timeout.callback = [this, &context](AsyncLoopTimeout::Result&)
235235
{
236236
AsyncBufferView::ID bufferID;
237237
Span<char> data;
238238
if (context.readable.getBufferOrPause(sizeof(context.idx), bufferID, data))
239239
{
240240
memcpy(data.data(), &context.idx, sizeof(context.idx));
241-
context.readable.push(bufferID, sizeof(context.idx));
241+
SC_TEST_EXPECT(context.readable.push(bufferID, sizeof(context.idx)));
242242
context.readable.getBuffersPool().unrefBuffer(bufferID);
243243
context.idx += 1;
244244
context.readable.reactivate(true);

0 commit comments

Comments
 (0)