Skip to content

Added new QUIC_STREAM_EVENT: QUIC_STREAM_EVENT_COPIED_TO_FRAME #5094

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/api/QUIC_STREAM_EVENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ typedef enum QUIC_STREAM_EVENT_TYPE {
QUIC_STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE = 8,
QUIC_STREAM_EVENT_PEER_ACCEPTED = 9,
QUIC_STREAM_EVENT_CANCEL_ON_LOSS = 10,
QUIC_STREAM_EVENT_COPIED_TO_FRAME = 11,
} QUIC_STREAM_EVENT_TYPE;
```

Expand Down Expand Up @@ -69,6 +70,13 @@ typedef struct QUIC_STREAM_EVENT {
struct {
/* out */ QUIC_UINT62 ErrorCode;
} CANCEL_ON_LOSS;
struct {
uint64_t BytesCopied;
/* in */ uint64_t *BytesCopiedBeforeNextEvent; // Minimum number of send data bytes
// need to be copied before the
// next event is signalled
void* ClientSendContext; // Identical to ClientContext in SEND_COMPLETE
} COPIED_TO_FRAME;
};
} QUIC_STREAM_EVENT;
```
Expand Down Expand Up @@ -261,6 +269,21 @@ The application can supply an error code in this struct to be sent to the peer.

The application can set this 62 bit error code to communicate to the peer about the stream shutdown, which is received by the peer as a `QUIC_STREAM_EVENT_PEER_SEND_ABORTED` event on its stream object.

## QUIC_STREAM_EVENT_COPIED_TO_FRAME

This event is raised when a stream writes bytes from a send request to a data frame. The event is always triggered on first write to frame, the application decides when (or if)
it should trigger the next time by setting `BytesCopiedBeforeNextEvent` in the callback.

`BytesCopied`

The number of bytes written

`BytesCopiedBeforeNextEvent`

Number of bytes copied from the send request before the next `QUIC_STREAM_EVENT_COPIED_TO_FRAME` is raised, defaults to a very high value and consequently the next event is never
triggered.


# See Also

[Streams](../Streams.md)<br>
Expand Down
1 change: 1 addition & 0 deletions docs/api/StreamSend.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ Value | Meaning
**QUIC_SEND_FLAG_DELAY_SEND**<br>16 | Provides a hint to MsQuic to indicate the data does not need to be sent immediately, likely because more is soon to follow.
**QUIC_SEND_FLAG_CANCEL_ON_LOSS**<br>32 | Informs MsQuic to irreversibly mark the associated stream to be canceled when packet loss has been detected on it. I.e., all sends on a given stream are subject to this behavior from the moment the flag has been supplied for the first time.
**QUIC_SEND_FLAG_CANCEL_ON_BLOCKED**<br>64 | **Unused and ignored** for `StreamSend` for now
**QUIC_SEND_FLAG_EVENT_ON_FIRST_COPY_TO_FRAME**<br>128 | Informs MsQuic that the first `QUIC_STREAM_EVENT_COPIED_TO_FRAME` should be signalled to the application

`ClientSendContext`

Expand Down
10 changes: 10 additions & 0 deletions src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,11 @@
SendRequest->Flags = Flags & ~QUIC_SEND_FLAGS_INTERNAL;
SendRequest->TotalLength = TotalLength;
SendRequest->ClientContext = ClientSendContext;
if(Flags & QUIC_SEND_FLAG_EVENT_ON_FIRST_COPY_TO_FRAME) {
SendRequest->BytesToBeCopiedBeforeNextCopiedToFrameEvent = 0;
} else {

Check warning on line 1111 in src/core/api.c

View check run for this annotation

Codecov / codecov/patch

src/core/api.c#L1110-L1111

Added lines #L1110 - L1111 were not covered by tests
SendRequest->BytesToBeCopiedBeforeNextCopiedToFrameEvent = UINT64_MAX;
}

#pragma warning(push)
#pragma warning(disable:6240) // CXPLAT_AT_DISPATCH only really does anything for kernel mode
Expand Down Expand Up @@ -1859,6 +1864,11 @@
SendRequest->Flags = Flags;
SendRequest->TotalLength = TotalLength;
SendRequest->ClientContext = ClientSendContext;
if(Flags & QUIC_SEND_FLAG_EVENT_ON_FIRST_COPY_TO_FRAME) {
SendRequest->BytesToBeCopiedBeforeNextCopiedToFrameEvent = 0;
} else {

Check warning on line 1869 in src/core/api.c

View check run for this annotation

Codecov / codecov/patch

src/core/api.c#L1868-L1869

Added lines #L1868 - L1869 were not covered by tests
SendRequest->BytesToBeCopiedBeforeNextCopiedToFrameEvent = UINT64_MAX;
}

Status = QuicDatagramQueueSend(&Connection->Datagram, SendRequest);

Expand Down
6 changes: 6 additions & 0 deletions src/core/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ typedef struct QUIC_SEND_REQUEST {
//
void* ClientContext;

//
// The number of bytes to be copied into a frame before signalling the next
// QUIC_STREAM_EVENT_COPIED_TO_FRAME event.
//
uint64_t BytesToBeCopiedBeforeNextCopiedToFrameEvent;

} QUIC_SEND_REQUEST;

//
Expand Down
15 changes: 15 additions & 0 deletions src/core/stream_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,21 @@
uint16_t CopyLength = Len < BufferLeft ? Len : (uint16_t)BufferLeft;
CXPLAT_DBG_ASSERT(CopyLength > 0);
CxPlatCopyMemory(Buf, Req->Buffers[CurIndex].Buffer + CurOffset, CopyLength);

if (Req->BytesToBeCopiedBeforeNextCopiedToFrameEvent <= CopyLength) {
QUIC_STREAM_EVENT Event;
uint64_t BytesCopiedBeforeNextEvent = UINT64_MAX; // Default to max, so if application does not modify it in

Check warning on line 750 in src/core/stream_send.c

View check run for this annotation

Codecov / codecov/patch

src/core/stream_send.c#L750

Added line #L750 was not covered by tests
// callback the event is never signalled again
Event.Type = QUIC_STREAM_EVENT_COPIED_TO_FRAME;
Event.COPIED_TO_FRAME.BytesCopied = CopyLength;
Event.COPIED_TO_FRAME.BytesCopiedBeforeNextEvent = &BytesCopiedBeforeNextEvent;
Event.COPIED_TO_FRAME.ClientSendContext = Req->ClientContext;
QuicStreamIndicateEvent(Stream, &Event);
Req->BytesToBeCopiedBeforeNextCopiedToFrameEvent = BytesCopiedBeforeNextEvent;
} else {

Check warning on line 758 in src/core/stream_send.c

View check run for this annotation

Codecov / codecov/patch

src/core/stream_send.c#L752-L758

Added lines #L752 - L758 were not covered by tests
Req->BytesToBeCopiedBeforeNextCopiedToFrameEvent -= CopyLength;
}

Len -= CopyLength;
Buf += CopyLength;

Expand Down
27 changes: 18 additions & 9 deletions src/inc/msquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,16 @@ typedef enum QUIC_RECEIVE_FLAGS {
DEFINE_ENUM_FLAG_OPERATORS(QUIC_RECEIVE_FLAGS)

typedef enum QUIC_SEND_FLAGS {
QUIC_SEND_FLAG_NONE = 0x0000,
QUIC_SEND_FLAG_ALLOW_0_RTT = 0x0001, // Allows the use of encrypting with 0-RTT key.
QUIC_SEND_FLAG_START = 0x0002, // Asynchronously starts the stream with the sent data.
QUIC_SEND_FLAG_FIN = 0x0004, // Indicates the request is the one last sent on the stream.
QUIC_SEND_FLAG_DGRAM_PRIORITY = 0x0008, // Indicates the datagram is higher priority than others.
QUIC_SEND_FLAG_DELAY_SEND = 0x0010, // Indicates the send should be delayed because more will be queued soon.
QUIC_SEND_FLAG_CANCEL_ON_LOSS = 0x0020, // Indicates that a stream is to be cancelled when packet loss is detected.
QUIC_SEND_FLAG_PRIORITY_WORK = 0x0040, // Higher priority than other connection work.
QUIC_SEND_FLAG_CANCEL_ON_BLOCKED = 0x0080, // Indicates that a frame should be dropped when it can't be sent immediately.
QUIC_SEND_FLAG_NONE = 0x0000,
QUIC_SEND_FLAG_ALLOW_0_RTT = 0x0001, // Allows the use of encrypting with 0-RTT key.
QUIC_SEND_FLAG_START = 0x0002, // Asynchronously starts the stream with the sent data.
QUIC_SEND_FLAG_FIN = 0x0004, // Indicates the request is the one last sent on the stream.
QUIC_SEND_FLAG_DGRAM_PRIORITY = 0x0008, // Indicates the datagram is higher priority than others.
QUIC_SEND_FLAG_DELAY_SEND = 0x0010, // Indicates the send should be delayed because more will be queued soon.
QUIC_SEND_FLAG_CANCEL_ON_LOSS = 0x0020, // Indicates that a stream is to be cancelled when packet loss is detected.
QUIC_SEND_FLAG_PRIORITY_WORK = 0x0040, // Higher priority than other connection work.
QUIC_SEND_FLAG_CANCEL_ON_BLOCKED = 0x0080, // Indicates that a frame should be dropped when it can't be sent immediately.
QUIC_SEND_FLAG_EVENT_ON_FIRST_COPY_TO_FRAME = 0x0100, // Event is signaled when data from the send request is copied to a frame for the first time
} QUIC_SEND_FLAGS;

DEFINE_ENUM_FLAG_OPERATORS(QUIC_SEND_FLAGS)
Expand Down Expand Up @@ -1521,6 +1522,7 @@ typedef enum QUIC_STREAM_EVENT_TYPE {
QUIC_STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE = 8,
QUIC_STREAM_EVENT_PEER_ACCEPTED = 9,
QUIC_STREAM_EVENT_CANCEL_ON_LOSS = 10,
QUIC_STREAM_EVENT_COPIED_TO_FRAME = 11,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we were to go forward with this model, and I'm not saying we would, this new event MUST be opt-in. We cannot deliver it to old clients that have no idea about it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you suggest opt in at compile time or runtime?

I was thinking of having a new STREAM_SEND flag and delivery QUIC_STREAM_EVENT_COPIED_TO_FRAME events only if that flag is set

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put this under the #PREVIEW_FEATURES macro.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand the point of the PREVIEW_FEATURES macro, but will do it in a final commit once I get all the CI tests to pass

} QUIC_STREAM_EVENT_TYPE;

typedef struct QUIC_STREAM_EVENT {
Expand Down Expand Up @@ -1569,6 +1571,13 @@ typedef struct QUIC_STREAM_EVENT {
struct {
/* out */ QUIC_UINT62 ErrorCode;
} CANCEL_ON_LOSS;
struct {
uint64_t BytesCopied;
/* in */ uint64_t *BytesCopiedBeforeNextEvent; // Minimum number of send data bytes
// need to be copied before the
// next event is signalled
void* ClientSendContext; // Identical to ClientContext in SEND_COMPLETE
} COPIED_TO_FRAME;
};
} QUIC_STREAM_EVENT;

Expand Down
142 changes: 141 additions & 1 deletion src/test/lib/EventTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1804,6 +1804,145 @@ QuicTestValidateStreamEvents9(
#endif // QUIC_PARAM_STREAM_RELIABLE_OFFSET
}

void
QuicTestValidateStreamEvents10(
_In_ MsQuicRegistration& Registration,
_In_ MsQuicListener& Listener,
_In_ QuicAddr& ServerLocalAddr
)
{
// Tests for the `QUIC_STREAM_EVENT_COPIED_TO_FRAME` event
// This test is a duplicate of the previous one,
// except for StreamSend has the `QUIC_SEND_FLAG_EVENT_ON_FIRST_COPY_TO_FRAME` flag
TestScopeLogger ScopeLogger(__FUNCTION__);

#ifdef QUIC_PARAM_STREAM_RELIABLE_OFFSET
MsQuicSettings Settings;
Settings.SetPeerBidiStreamCount(1).SetMinimumMtu(1280).SetMaximumMtu(1280);
Settings.SetReliableResetEnabled(true);
MsQuicConfiguration ServerConfiguration(Registration, "MsQuicTest", Settings, ServerSelfSignedCredConfig);
TEST_TRUE(ServerConfiguration.IsValid());

MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", Settings, MsQuicCredentialConfig());
TEST_TRUE(ClientConfiguration.IsValid());

{ // Connections scope
ConnValidator Client, Server(ServerConfiguration);

Listener.Context = &Server;

TEST_QUIC_SUCCEEDED(
MsQuic->ConnectionOpen(
Registration,
ConnValidatorCallback,
&Client,
&Client.Handle));

{ // Stream scope

StreamValidator ClientStream(
new(std::nothrow) StreamEventValidator* [6] {
new(std::nothrow) StreamStartCompleteEventValidator(),
new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_COPIED_TO_FRAME, 0, true),
new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_SEND_COMPLETE, 0, true),
new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE),
new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE, QUIC_EVENT_ACTION_SHUTDOWN_CONNECTION),
nullptr
});
StreamValidator ServerStream(
new(std::nothrow) StreamEventValidator* [6] {
new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_RECEIVE),
new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_PEER_SEND_ABORTED),
new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED),
new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE),
new(std::nothrow) StreamEventValidator(QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE),
nullptr
});

Client.SetExpectedEvents(
new(std::nothrow) ConnEventValidator* [8] {
new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_RELIABLE_RESET_NEGOTIATED),
new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_STREAMS_AVAILABLE),
new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_DATAGRAM_STATE_CHANGED),
new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_CONNECTED),
new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_STREAMS_AVAILABLE, 0, true),
new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_RESUMPTION_TICKET_RECEIVED, 0, true), // TODO - Schannel does resumption regardless
new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE),
nullptr
});
Server.SetExpectedEvents(
new(std::nothrow) ConnEventValidator* [6] {
new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_RELIABLE_RESET_NEGOTIATED),
new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_CONNECTED),
new(std::nothrow) NewStreamEventValidator(&ServerStream),
new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER),
new(std::nothrow) ConnEventValidator(QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE),
nullptr
});

TEST_QUIC_SUCCEEDED(
MsQuic->StreamOpen(
Client.Handle,
QUIC_STREAM_OPEN_FLAG_NONE,
StreamValidatorCallback,
&ClientStream,
&ClientStream.Handle));
TEST_QUIC_SUCCEEDED(
MsQuic->StreamStart(
ClientStream.Handle,
QUIC_STREAM_START_FLAG_NONE));

TEST_QUIC_SUCCEEDED(
MsQuic->ConnectionStart(
Client.Handle,
ClientConfiguration,
QuicAddrGetFamily(&ServerLocalAddr.SockAddr),
QUIC_TEST_LOOPBACK_FOR_AF(
QuicAddrGetFamily(&ServerLocalAddr.SockAddr)),
ServerLocalAddr.GetPort()));

TEST_TRUE(Client.HandshakeComplete.WaitTimeout(1000));
CxPlatSleep(200);

uint8_t Buffer[1] = {0x1};
QUIC_BUFFER SendBuffer = { sizeof(Buffer), (uint8_t*) Buffer };
TEST_QUIC_SUCCEEDED(
MsQuic->StreamSend(
ClientStream.Handle,
&SendBuffer,
1,
QUIC_SEND_FLAG_EVENT_ON_FIRST_COPY_TO_FRAME,
nullptr));

uint64_t ReliableOffset = 1;
TEST_QUIC_SUCCEEDED(
MsQuic->SetParam(
ClientStream.Handle,
QUIC_PARAM_STREAM_RELIABLE_OFFSET,
sizeof(ReliableOffset),
&ReliableOffset
)
);

CxPlatSleep(100); // Wait for the sends to be processed

TEST_QUIC_SUCCEEDED(
MsQuic->StreamShutdown(
ClientStream.Handle,
QUIC_STREAM_SHUTDOWN_FLAG_ABORT,
0));

TEST_TRUE(Client.Complete.WaitTimeout(2000));
TEST_TRUE(Server.Complete.WaitTimeout(1000));
} // Stream scope
} // Connections scope
#else // QUIC_PARAM_STREAM_RELIABLE_OFFSET
UNREFERENCED_PARAMETER(Registration);
UNREFERENCED_PARAMETER(Listener);
UNREFERENCED_PARAMETER(ServerLocalAddr);
#endif // QUIC_PARAM_STREAM_RELIABLE_OFFSET
}

void QuicTestValidateStreamEvents(uint32_t Test)
{
MsQuicRegistration Registration(true);
Expand All @@ -1830,7 +1969,8 @@ void QuicTestValidateStreamEvents(uint32_t Test)
QuicTestValidateStreamEvents6,
QuicTestValidateStreamEvents7,
QuicTestValidateStreamEvents8,
QuicTestValidateStreamEvents9
QuicTestValidateStreamEvents9,
QuicTestValidateStreamEvents10
};

Tests[Test](Registration, Listener, ServerLocalAddr);
Expand Down
Loading