Skip to content

Commit 207570c

Browse files
committed
AsyncStreams: Stop socket and file requests when close or finish events happens
1 parent a320715 commit 207570c

File tree

3 files changed

+39
-45
lines changed

3 files changed

+39
-45
lines changed

Libraries/AsyncStreams/AsyncRequestStreams.cpp

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -40,29 +40,24 @@ template <typename AsyncReadRequest>
4040
SC::AsyncRequestReadableStream<AsyncReadRequest>::AsyncRequestReadableStream()
4141
{
4242
AsyncReadableStream::asyncRead.bind<AsyncRequestReadableStream, &AsyncRequestReadableStream::read>(*this);
43+
(void)AsyncReadableStream::eventClose
44+
.addListener<AsyncRequestReadableStream, &AsyncRequestReadableStream::onCloseStopRequest>(*this);
4345
}
4446

4547
template <typename AsyncReadRequest>
46-
void SC::AsyncRequestReadableStream<AsyncReadRequest>::onEndCloseDescriptor()
48+
void SC::AsyncRequestReadableStream<AsyncReadRequest>::onCloseStopRequest()
4749
{
48-
Result res = Internal::closeDescriptor(request);
49-
if (not res)
50+
if (not request.isFree())
5051
{
51-
emitError(res);
52+
request.stop(*eventLoop);
5253
}
53-
}
54-
55-
template <typename AsyncReadRequest>
56-
SC::Result SC::AsyncRequestReadableStream<AsyncReadRequest>::registerAutoCloseDescriptor(bool value)
57-
{
58-
using Self = AsyncRequestReadableStream;
59-
if (value)
60-
{
61-
return Result(eventEnd.addListener<Self, &Self::onEndCloseDescriptor>(*this));
62-
}
63-
else
54+
if (autoCloseDescriptor)
6455
{
65-
return Result(eventEnd.removeListener<Self, &Self::onEndCloseDescriptor>(*this));
56+
Result res = Internal::closeDescriptor(request);
57+
if (not res)
58+
{
59+
emitError(res);
60+
}
6661
}
6762
}
6863

@@ -89,10 +84,10 @@ template <typename AsyncReadRequest>
8984
void SC::AsyncRequestReadableStream<AsyncReadRequest>::afterRead(typename AsyncReadRequest::Result& result,
9085
AsyncBufferView::ID bufferID)
9186
{
92-
SC_ASSERT_RELEASE(request.isFree());
9387
Span<char> data;
9488
if (result.get(data))
9589
{
90+
SC_ASSERT_RELEASE(request.isFree());
9691
if (Internal::isEnded(result))
9792
{
9893
getBuffersPool().unrefBuffer(bufferID);
@@ -157,29 +152,24 @@ template <typename AsyncWriteRequest>
157152
SC::AsyncRequestWritableStream<AsyncWriteRequest>::AsyncRequestWritableStream()
158153
{
159154
AsyncWritableStream::asyncWrite.bind<AsyncRequestWritableStream, &AsyncRequestWritableStream::write>(*this);
155+
(void)AsyncWritableStream::eventFinish
156+
.addListener<AsyncRequestWritableStream, &AsyncRequestWritableStream::onFinishStopRequest>(*this);
160157
}
161158

162159
template <typename AsyncWriteRequest>
163-
void SC::AsyncRequestWritableStream<AsyncWriteRequest>::onEndCloseDescriptor()
160+
void SC::AsyncRequestWritableStream<AsyncWriteRequest>::onFinishStopRequest()
164161
{
165-
Result res = Internal::closeDescriptor(request);
166-
if (not res)
162+
if (not request.isFree())
167163
{
168-
emitError(res);
164+
request.stop(*eventLoop);
169165
}
170-
}
171-
172-
template <typename AsyncWriteRequest>
173-
SC::Result SC::AsyncRequestWritableStream<AsyncWriteRequest>::registerAutoCloseDescriptor(bool value)
174-
{
175-
using Self = AsyncRequestWritableStream;
176-
if (value)
166+
if (autoCloseDescriptor)
177167
{
178-
return Result(eventFinish.addListener<Self, &Self::onEndCloseDescriptor>(*this));
179-
}
180-
else
181-
{
182-
return Result(eventFinish.removeListener<Self, &Self::onEndCloseDescriptor>(*this));
168+
Result res = Internal::closeDescriptor(request);
169+
if (not res)
170+
{
171+
emitError(res);
172+
}
183173
}
184174
}
185175

Libraries/AsyncStreams/AsyncRequestStreams.h

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,20 @@ struct AsyncRequestReadableStream : public AsyncReadableStream
1313
{
1414
AsyncRequestReadableStream();
1515

16-
/// @brief Registers or unregisters a listener to AsyncReadableStream::eventEnd to close descriptor
17-
Result registerAutoCloseDescriptor(bool value);
16+
/// @brief Automatically closes descriptor during read stream close event
17+
void setAutoCloseDescriptor(bool value) { autoCloseDescriptor = value; }
1818

1919
AsyncRequestType request; /// AsyncFileRead / AsyncFileWrite / AsyncSocketReceive / AsyncSocketSend
2020
protected:
21-
AsyncEventLoop* eventLoop = nullptr;
2221
struct Internal;
22+
AsyncEventLoop* eventLoop = nullptr;
23+
24+
bool autoCloseDescriptor = false;
2325

2426
Result read();
2527

2628
void afterRead(typename AsyncRequestType::Result& result, AsyncBufferView::ID bufferID);
27-
void onEndCloseDescriptor();
29+
void onCloseStopRequest();
2830
};
2931

3032
template <typename AsyncRequestType>
@@ -36,20 +38,22 @@ struct AsyncRequestWritableStream : public AsyncWritableStream
3638
Result init(AsyncBuffersPool& buffersPool, Span<Request> requests, AsyncEventLoop& eventLoop,
3739
const DescriptorType& descriptor);
3840

39-
/// @brief Registers or unregisters a listener to AsyncWritableStream::eventFinish to close descriptor
40-
Result registerAutoCloseDescriptor(bool value);
41+
/// @brief Automatically closes descriptor during write stream finish event
42+
void setAutoCloseDescriptor(bool value) { autoCloseDescriptor = value; }
4143

4244
AsyncRequestType request; /// AsyncFileRead / AsyncFileWrite / AsyncSocketReceive / AsyncSocketSend
4345

4446
protected:
45-
AsyncEventLoop* eventLoop = nullptr;
4647
struct Internal;
48+
AsyncEventLoop* eventLoop = nullptr;
49+
50+
bool autoCloseDescriptor = false;
4751

4852
Function<void(AsyncBufferView::ID)> callback;
4953

5054
Result write(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb);
5155

52-
void onEndCloseDescriptor();
56+
void onFinishStopRequest();
5357
};
5458

5559
/// @brief Uses an SC::AsyncFileRead to stream data from a file

Tests/Libraries/AsyncStreams/AsyncRequestStreamsTest.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -379,8 +379,8 @@ void SC::AsyncRequestStreamsTest::fileCompressRemote(AsyncEventLoop& eventLoop,
379379
{
380380
SC_TEST_EXPECT(writeSideStream.request.executeOn(writeStreamTask, streamPool));
381381
}
382-
SC_TEST_EXPECT(writeSideStream.registerAutoCloseDescriptor(true));
383-
writeSide.detach(); // Taken care by registerAutoCloseDescriptor(true)
382+
writeSideStream.setAutoCloseDescriptor(true);
383+
writeSide.detach(); // Taken care by setAutoCloseDescriptor(true)
384384

385385
// Create Readable Socket Stream
386386
READABLE_TYPE readSideStream;
@@ -393,8 +393,8 @@ void SC::AsyncRequestStreamsTest::fileCompressRemote(AsyncEventLoop& eventLoop,
393393
{
394394
SC_TEST_EXPECT(readSideStream.request.executeOn(readStreamTask, streamPool));
395395
}
396-
SC_TEST_EXPECT(readSideStream.registerAutoCloseDescriptor(true));
397-
readSide.detach(); // Taken care by registerAutoCloseDescriptor(true)
396+
readSideStream.setAutoCloseDescriptor(true);
397+
readSide.detach(); // Taken care by setAutoCloseDescriptor(true)
398398
(void)readSideStream.eventError.addListener([this](Result res) { SC_TEST_EXPECT(res); });
399399

400400
AsyncWritableStream::Request writeFileRequests[numberOfBuffers2 + 1];

0 commit comments

Comments
 (0)