Skip to content

Commit

Permalink
Merge on insert to loss buffer
Browse files Browse the repository at this point in the history
Summary: Previously we were treating the loss buffer the same way we were the retransmission buffer. That is, each entry represented a single stream frame that was lost. There's actually no reason to do this as we treat it more like the write buffer when retransmitting. It can also lead to pathological cases where we write consecutive byte ranges as separate stream frames.

Reviewed By: yangchi

Differential Revision: D19240352

fbshipit-source-id: 94ffb397fa287688d986a4a91d531d85050d9f98
  • Loading branch information
mjoras authored and facebook-github-bot committed Jan 11, 2020
1 parent 967053b commit ec9d1cc
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 18 deletions.
20 changes: 2 additions & 18 deletions quic/loss/QuicLossFunctions.cpp
Expand Up @@ -109,15 +109,7 @@ void markPacketLoss(
*stream, frame, bufferItr->second)) {
break;
}
stream->lossBuffer.insert(
std::upper_bound(
stream->lossBuffer.begin(),
stream->lossBuffer.end(),
bufferItr->second.offset,
[](const auto& offset, const auto& buffer) {
return offset < buffer.offset;
}),
std::move(bufferItr->second));
stream->insertIntoLossBuffer(std::move(bufferItr->second));
stream->retransmissionBuffer.erase(bufferItr);
conn.streamManager->updateLossStreams(*stream);
break;
Expand All @@ -138,15 +130,7 @@ void markPacketLoss(
break;
}
DCHECK_EQ(bufferItr->second.offset, frame.offset);
cryptoStream->lossBuffer.insert(
std::upper_bound(
cryptoStream->lossBuffer.begin(),
cryptoStream->lossBuffer.end(),
bufferItr->second.offset,
[](const auto& offset, const auto& buffer) {
return offset < buffer.offset;
}),
std::move(bufferItr->second));
cryptoStream->insertIntoLossBuffer(std::move(bufferItr->second));
cryptoStream->retransmissionBuffer.erase(bufferItr);
break;
}
Expand Down
127 changes: 127 additions & 0 deletions quic/loss/test/QuicLossFunctionsTest.cpp
Expand Up @@ -347,6 +347,133 @@ TEST_F(QuicLossFunctionsTest, TestMarkPacketLoss) {
EXPECT_TRUE(eq(buf, buffer.data.move()));
}

TEST_F(QuicLossFunctionsTest, TestMarkPacketLossMerge) {
folly::EventBase evb;
MockAsyncUDPSocket socket(&evb);
auto conn = createConn();
EXPECT_CALL(*transportInfoCb_, onNewQuicStream()).Times(1);
auto stream1Id =
conn->streamManager->createNextBidirectionalStream().value()->id;
auto stream1 = conn->streamManager->findStream(stream1Id);

auto buf1 = buildRandomInputData(20);
writeDataToQuicStream(*stream1, buf1->clone(), false);
writeQuicDataToSocket(
socket,
*conn,
*conn->clientConnectionId,
*conn->serverConnectionId,
*aead,
*headerCipher,
*conn->version,
conn->transportSettings.writeConnectionDataPacketsLimit);
EXPECT_EQ(1, conn->outstandingPackets.size());

auto buf2 = buildRandomInputData(20);
writeDataToQuicStream(*stream1, buf2->clone(), false);
writeQuicDataToSocket(
socket,
*conn,
*conn->clientConnectionId,
*conn->serverConnectionId,
*aead,
*headerCipher,
*conn->version,
conn->transportSettings.writeConnectionDataPacketsLimit);
EXPECT_EQ(2, conn->outstandingPackets.size());

auto& packet1 =
getFirstOutstandingPacket(*conn, PacketNumberSpace::AppData)->packet;
auto packetNum = packet1.header.getPacketSequenceNum();
markPacketLoss(*conn, packet1, false, packetNum);
EXPECT_EQ(stream1->retransmissionBuffer.size(), 1);
EXPECT_EQ(stream1->lossBuffer.size(), 1);
auto& packet2 =
getLastOutstandingPacket(*conn, PacketNumberSpace::AppData)->packet;
packetNum = packet2.header.getPacketSequenceNum();
markPacketLoss(*conn, packet2, false, packetNum);
EXPECT_EQ(stream1->retransmissionBuffer.size(), 0);
EXPECT_EQ(stream1->lossBuffer.size(), 1);

auto combined = buf1->clone();
combined->prependChain(buf2->clone());
auto& buffer = stream1->lossBuffer.front();
EXPECT_EQ(buffer.offset, 0);
IOBufEqualTo eq;
EXPECT_TRUE(eq(combined, buffer.data.move()));
}

TEST_F(QuicLossFunctionsTest, TestMarkPacketLossNoMerge) {
folly::EventBase evb;
MockAsyncUDPSocket socket(&evb);
auto conn = createConn();
EXPECT_CALL(*transportInfoCb_, onNewQuicStream()).Times(1);
auto stream1Id =
conn->streamManager->createNextBidirectionalStream().value()->id;
auto stream1 = conn->streamManager->findStream(stream1Id);

auto buf1 = buildRandomInputData(20);
writeDataToQuicStream(*stream1, buf1->clone(), false);
writeQuicDataToSocket(
socket,
*conn,
*conn->clientConnectionId,
*conn->serverConnectionId,
*aead,
*headerCipher,
*conn->version,
conn->transportSettings.writeConnectionDataPacketsLimit);
EXPECT_EQ(1, conn->outstandingPackets.size());

auto buf2 = buildRandomInputData(20);
writeDataToQuicStream(*stream1, buf2->clone(), false);
writeQuicDataToSocket(
socket,
*conn,
*conn->clientConnectionId,
*conn->serverConnectionId,
*aead,
*headerCipher,
*conn->version,
conn->transportSettings.writeConnectionDataPacketsLimit);
EXPECT_EQ(2, conn->outstandingPackets.size());

auto buf3 = buildRandomInputData(20);
writeDataToQuicStream(*stream1, buf3->clone(), false);
writeQuicDataToSocket(
socket,
*conn,
*conn->clientConnectionId,
*conn->serverConnectionId,
*aead,
*headerCipher,
*conn->version,
conn->transportSettings.writeConnectionDataPacketsLimit);
EXPECT_EQ(3, conn->outstandingPackets.size());

auto& packet1 =
getFirstOutstandingPacket(*conn, PacketNumberSpace::AppData)->packet;
auto packetNum = packet1.header.getPacketSequenceNum();
markPacketLoss(*conn, packet1, false, packetNum);
EXPECT_EQ(stream1->retransmissionBuffer.size(), 2);
EXPECT_EQ(stream1->lossBuffer.size(), 1);
auto& packet3 =
getLastOutstandingPacket(*conn, PacketNumberSpace::AppData)->packet;
packetNum = packet3.header.getPacketSequenceNum();
markPacketLoss(*conn, packet3, false, packetNum);
EXPECT_EQ(stream1->retransmissionBuffer.size(), 1);
EXPECT_EQ(stream1->lossBuffer.size(), 2);

auto& buffer1 = stream1->lossBuffer[0];
EXPECT_EQ(buffer1.offset, 0);
IOBufEqualTo eq;
EXPECT_TRUE(eq(buf1, buffer1.data.move()));

auto& buffer3 = stream1->lossBuffer[1];
EXPECT_EQ(buffer3.offset, 40);
EXPECT_TRUE(eq(buf3, buffer3.data.move()));
}

TEST_F(QuicLossFunctionsTest, RetxBufferSortedAfterLoss) {
folly::EventBase evb;
MockAsyncUDPSocket socket(&evb);
Expand Down
22 changes: 22 additions & 0 deletions quic/state/StreamData.h
Expand Up @@ -90,6 +90,28 @@ struct QuicStreamLike {
// and RST. We may split write EOF into two values in the future.
// Read side eof offset.
folly::Optional<uint64_t> finalReadOffset;

/*
* Either insert a new entry into the loss buffer, or merge the buffer with
* an existing entry.
*/
void insertIntoLossBuffer(StreamBuffer buf) {
// We assume here that we won't try to insert an overlapping buffer, as
// that should never happen in the loss buffer.
auto lossItr = std::upper_bound(
lossBuffer.begin(),
lossBuffer.end(),
buf.offset,
[](auto offset, const auto& buffer) { return offset < buffer.offset; });
if (!lossBuffer.empty() && lossItr != lossBuffer.begin() &&
std::prev(lossItr)->offset + std::prev(lossItr)->data.chainLength() ==
buf.offset) {
std::prev(lossItr)->data.append(buf.data.move());
std::prev(lossItr)->eof = buf.eof;
} else {
lossBuffer.insert(lossItr, std::move(buf));
}
}
};

struct QuicConnectionStateBase;
Expand Down

0 comments on commit ec9d1cc

Please sign in to comment.