Skip to content

Commit

Permalink
Implement max-blocking setting
Browse files Browse the repository at this point in the history
Summary:
Per the latest draft, the decoder can set a limit for the maximum number of blocked streams that the encoder must respect.  Track the number of outstandning vulnerable streams and do not permit references it the limit has been exceeded.

Note that the implementation currently tracks vulnerable *blocks*, not *streams*, so it will be generally more conservative than what the decoder asks for.

Reviewed By: dddmello

Differential Revision: D8000817

fbshipit-source-id: 386406bdd212c56fa99cd02189414e0176a77645
  • Loading branch information
afrind authored and facebook-github-bot committed May 25, 2018
1 parent 84d031f commit 09cb130
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 44 deletions.
5 changes: 4 additions & 1 deletion proxygen/lib/http/codec/compress/HPACKConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const uint8_t Q_INDEXED_STATIC = 0x40;
const uint8_t Q_INSERT_NAME_REF_STATIC = 0x40;
const uint8_t Q_LITERAL_STATIC = 0x10;

const uint32_t kDefaultBlocking = 100;

const uint32_t kTableSize = 4096;

const uint8_t NBIT_MASKS[9] = {
Expand Down Expand Up @@ -86,7 +88,8 @@ enum class DecodeError : uint8_t {
TIMEOUT = 9,
CANCELLED = 10,
BAD_SEQUENCE_NUMBER = 11,
INVALID_ACK = 12
INVALID_ACK = 12,
TOO_MANY_BLOCKING = 13
};

std::ostream& operator<<(std::ostream& os, DecodeError err);
Expand Down
5 changes: 0 additions & 5 deletions proxygen/lib/http/codec/compress/HPACKEncodeBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ void HPACKEncodeBuffer::addHeadroom(uint32_t headroom) {
bufQueue_.append(std::move(buf));
}

uint32_t HPACKEncodeBuffer::appendSequenceNumber(uint16_t seqn) {
buf_.writeBE<uint16_t>(seqn);
return sizeof(uint16_t);
}

void HPACKEncodeBuffer::append(uint8_t byte) {
buf_.push(&byte, 1);
}
Expand Down
2 changes: 0 additions & 2 deletions proxygen/lib/http/codec/compress/HPACKEncodeBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ class HPACKEncodeBuffer {
*/
void addHeadroom(uint32_t bytes);

uint32_t appendSequenceNumber(uint16_t seqn);

/**
* Encode the integer value using variable-length layout and the given
* instruction using an nbit prefix. Per the spec, prefix is the portion
Expand Down
8 changes: 8 additions & 0 deletions proxygen/lib/http/codec/compress/QPACKCodec.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ class QPACKCodec : public HeaderCodec {
return decoder_.getQueuedBytes();
}

void setMaxVulnerable(uint32_t maxVulnerable) {
encoder_.setMaxVulnerable(maxVulnerable);
}

void setMaxBlocking(uint32_t maxBlocking) {
decoder_.setMaxBlocking(maxBlocking);
}

protected:
QPACKEncoder encoder_;
QPACKDecoder decoder_;
Expand Down
23 changes: 15 additions & 8 deletions proxygen/lib/http/codec/compress/QPACKDecoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,19 @@ void QPACKDecoder::decodeStreaming(
if (largestReference > table_.getBaseIndex()) {
VLOG(5) << "largestReference=" << largestReference << " > baseIndex=" <<
table_.getBaseIndex() << ", queuing";
folly::IOBufQueue q;
q.append(std::move(block));
q.trimStart(dbuf.consumedBytes());
enqueueHeaderBlock(largestReference, baseIndex_,
dbuf.consumedBytes(), q.move(),
totalBytes - dbuf.consumedBytes(), streamingCb);
if (queue_.size() >= maxBlocking_) {
VLOG(2) << "QPACK queue is full size=" << queue_.size()
<< " maxBlocking_=" << maxBlocking_;
err_ = HPACK::DecodeError::TOO_MANY_BLOCKING;
completeDecode(streamingCb, 0, 0);
} else {
folly::IOBufQueue q;
q.append(std::move(block));
q.trimStart(dbuf.consumedBytes());
enqueueHeaderBlock(largestReference, baseIndex_,
dbuf.consumedBytes(), q.move(),
totalBytes - dbuf.consumedBytes(), streamingCb);
}
} else {
decodeStreamingImpl(0, dbuf, streamingCb);
}
Expand All @@ -43,15 +50,15 @@ uint32_t QPACKDecoder::handleBaseIndex(HPACKDecodeBuffer& dbuf) {
uint32_t largestReference;
err_ = dbuf.decodeInteger(largestReference);
if (err_ != HPACK::DecodeError::NONE) {
LOG(ERROR) << "Decode error decoding baseIndex err_=" << err_;
LOG(ERROR) << "Decode error decoding largest reference err_=" << err_;
return 0;
}
VLOG(5) << "Decoded largestReference=" << largestReference;
uint32_t delta = 0;
bool neg = dbuf.peek() & HPACK::Q_DELTA_BASE_NEG;
err_ = dbuf.decodeInteger(HPACK::Q_DELTA_BASE.prefixLength, delta);
if (err_ != HPACK::DecodeError::NONE) {
LOG(ERROR) << "Decode error decoding depends_=" << err_;
LOG(ERROR) << "Decode error decoding delta base=" << err_;
return 0;
}
if (neg) {
Expand Down
5 changes: 5 additions & 0 deletions proxygen/lib/http/codec/compress/QPACKDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class QPACKDecoder : public HPACKDecoderBase,
return queuedBytes_;
}

void setMaxBlocking(uint32_t maxBlocking) {
maxBlocking_ = maxBlocking;
}

private:
bool isValid(bool isStatic, uint32_t index, bool aboveBase);

Expand Down Expand Up @@ -110,6 +114,7 @@ class QPACKDecoder : public HPACKDecoderBase,

void drainQueue();

uint32_t maxBlocking_{HPACK::kDefaultBlocking};
uint32_t baseIndex_{0};
uint32_t holBlockCount_{0};
uint64_t queuedBytes_{0};
Expand Down
52 changes: 31 additions & 21 deletions proxygen/lib/http/codec/compress/QPACKEncoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ QPACKEncoder::encode(const vector<HPACKHeader>& headers,

QPACKEncoder::EncodeResult
QPACKEncoder::encodeQ(const vector<HPACKHeader>& headers, uint64_t streamId) {
auto& blockRefList = outstanding_[streamId];
blockRefList.emplace_back();
blockReferences_ = &blockRefList.back();
auto& outstandingBlocks = outstanding_[streamId];
outstandingBlocks.emplace_back();
curOutstanding_ = &outstandingBlocks.back();
auto baseIndex = table_.getBaseIndex();

uint32_t largestReference = 0;
Expand Down Expand Up @@ -67,8 +67,12 @@ QPACKEncoder::encodeQ(const vector<HPACKHeader>& headers, uint64_t streamId) {
<< largestReference;
outstandingControl_.push_back(table_.getBaseIndex());
}
// blockReferences_ could be empty, if the block encodes only static
// curOutstanding_.references could be empty, if the block encodes only static
// headers and/or literals
if (curOutstanding_->vulnerable) {
DCHECK(allowVulnerable());
numVulnerable_++;
}

return { std::move(controlBuf), std::move(streamBuffer) };
}
Expand All @@ -86,7 +90,7 @@ void QPACKEncoder::encodeHeaderQ(

bool indexable = shouldIndex(header);
if (indexable) {
index = table_.getIndex(header, allowVulnerable_);
index = table_.getIndex(header, allowVulnerable());
if (index == QPACKHeaderTable::UNACKED) {
index = 0;
indexable = false;
Expand All @@ -96,6 +100,7 @@ void QPACKEncoder::encodeHeaderQ(
// dynamic reference
bool duplicated = false;
std::tie(duplicated, index) = maybeDuplicate(index);
// index is now 0 or absolute
indexable &= (duplicated && index == 0);
}
if (index == 0) {
Expand All @@ -110,7 +115,7 @@ void QPACKEncoder::encodeHeaderQ(
if (indexable && table_.canIndex(header)) {
encodeInsertQ(header, isStaticName, nameIndex);
CHECK(table_.add(header));
if (allowVulnerable_) {
if (allowVulnerable()) {
index = table_.getBaseIndex();
} else {
index = 0;
Expand Down Expand Up @@ -143,7 +148,7 @@ bool QPACKEncoder::shouldIndex(const HPACKHeader& header) const {

std::pair<bool, uint32_t> QPACKEncoder::maybeDuplicate(
uint32_t relativeIndex) {
auto res = table_.maybeDuplicate(relativeIndex, allowVulnerable_);
auto res = table_.maybeDuplicate(relativeIndex, allowVulnerable());
if (res.first) {
VLOG(4) << "Encoded duplicate index=" << relativeIndex;
encodeDuplicate(relativeIndex);
Expand All @@ -158,7 +163,7 @@ std::tuple<bool, uint32_t, uint32_t> QPACKEncoder::getNameIndexQ(
bool isStatic = true;
if (nameIndex == 0) {
// check dynamic table
nameIndex = table_.nameIndex(headerName, allowVulnerable_);
nameIndex = table_.nameIndex(headerName, allowVulnerable());
if (nameIndex != 0) {
absoluteNameIndex = maybeDuplicate(nameIndex).second;
if (absoluteNameIndex) {
Expand All @@ -178,13 +183,9 @@ void QPACKEncoder::encodeStreamLiteralQ(
const HPACKHeader& header, bool isStaticName, uint32_t nameIndex,
uint32_t absoluteNameIndex, uint32_t baseIndex, uint32_t* largestReference) {
if (absoluteNameIndex > 0) {
// Dynamic name reference
if (absoluteNameIndex > baseIndex && !allowVulnerable_) {
// Disallowed vulnerable reference
absoluteNameIndex = 0;
} else {
trackReference(absoluteNameIndex, largestReference);
}
// Dynamic name reference, vulnerability checks already done
CHECK(absoluteNameIndex <= baseIndex || allowVulnerable());
trackReference(absoluteNameIndex, largestReference);
}
if (absoluteNameIndex > baseIndex) {
encodeLiteralQ(header,
Expand All @@ -204,11 +205,14 @@ void QPACKEncoder::encodeStreamLiteralQ(
void QPACKEncoder::trackReference(uint32_t absoluteIndex,
uint32_t* largestReference) {
CHECK_NE(absoluteIndex, 0);
CHECK(curOutstanding_);
if (absoluteIndex > *largestReference) {
*largestReference = absoluteIndex;
if (table_.isVulnerable(absoluteIndex)) {
curOutstanding_->vulnerable = true;
}
}
CHECK(blockReferences_);
auto res = blockReferences_->insert(absoluteIndex);
auto res = curOutstanding_->references.insert(absoluteIndex);
if (res.second) {
VLOG(5) << "Bumping refcount for absoluteIndex=" << absoluteIndex;
table_.addRef(absoluteIndex);
Expand Down Expand Up @@ -289,20 +293,26 @@ HPACK::DecodeError QPACKEncoder::onHeaderAck(uint64_t streamId, bool all) {
VLOG(5) << "onHeaderAck streamId=" << streamId;
if (all) {
// Happens when a stream is reset
for (auto& references: it->second) {
for (auto i: references) {
for (auto& block: it->second) {
for (auto i: block.references) {
table_.subRef(i);
}
if (block.vulnerable) {
numVulnerable_--;
}
}
it->second.clear();
} else {
auto references = std::move(it->second.front());
auto block = std::move(it->second.front());
it->second.pop_front();
// a different stream, sub all the references
for (auto i: references) {
for (auto i: block.references) {
VLOG(5) << "Decrementing refcount for absoluteIndex=" << i;
table_.subRef(i);
}
if (block.vulnerable) {
numVulnerable_--;
}
}
if (it->second.empty()) {
outstanding_.erase(it);
Expand Down
19 changes: 16 additions & 3 deletions proxygen/lib/http/codec/compress/QPACKEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,15 @@ class QPACKEncoder : public HPACKEncoderBase, public QPACKContext {
HPACKEncoderBase::setHeaderTableSize(table_, size);
}

void setMaxVulnerable(uint32_t maxVulnerable) {
maxVulnerable_ = maxVulnerable;
}

private:
bool allowVulnerable() const {
return numVulnerable_ < maxVulnerable_;
}

bool shouldIndex(const HPACKHeader& header) const;

void encodeControl(const HPACKHeader& header);
Expand Down Expand Up @@ -99,11 +107,16 @@ class QPACKEncoder : public HPACKEncoderBase, public QPACKContext {
// List of highest index in control
std::list<uint32_t> outstandingControl_;
using BlockReferences = std::set<uint32_t>;
struct OutstandingBlock {
BlockReferences references;
bool vulnerable{false};
};
// Map streamID -> list of table index references for each outstanding block;
std::unordered_map<uint64_t, std::list<BlockReferences>> outstanding_;
BlockReferences* blockReferences_{nullptr};
std::unordered_map<uint64_t, std::list<OutstandingBlock>> outstanding_;
OutstandingBlock* curOutstanding_{nullptr};
uint32_t maxDepends_{0};
bool allowVulnerable_{true};
uint32_t maxVulnerable_{HPACK::kDefaultBlocking};
uint32_t numVulnerable_{0};
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ CompressionScheme* CompressionSimulator::getScheme(StringPiece domain) {
unique_ptr<CompressionScheme> CompressionSimulator::makeScheme() {
switch (params_.type) {
case SchemeType::QPACK:
return make_unique<QPACKScheme>(this, params_.tableSize);
return make_unique<QPACKScheme>(this, params_.tableSize,
params_.maxBlocking);
case SchemeType::QMIN:
return make_unique<QMINScheme>(this, params_.tableSize);
case SchemeType::HPACK:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct SimParams {
bool blend;
bool samePacketCompression;
uint32_t tableSize;
uint32_t maxBlocking;
};

struct SimStats {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ DEFINE_int32(ooo_thresh, 0, "First seqn to allow ooo");
DEFINE_int32(table_size, 4096, "HPACK dynamic table size");
DEFINE_int64(seed, 0, "RNG seed");
DEFINE_bool(blend, true, "Blend all facebook.com and fbcdn.net domains");
DEFINE_int32(max_blocking, 100,
"Maximum number of vulnerable/blocking header blocks");
DEFINE_bool(same_packet_compression,
true,
"Allow QPACK to compress across "
Expand Down Expand Up @@ -72,7 +74,8 @@ int main(int argc, char* argv[]) {
uint16_t(FLAGS_ooo_thresh),
FLAGS_blend,
FLAGS_same_packet_compression,
uint32_t(FLAGS_table_size)};
uint32_t(FLAGS_table_size),
uint32_t(FLAGS_max_blocking)};
CompressionSimulator sim(p);
if (sim.readInputFromFileAndSchedule(FLAGS_input)) {
sim.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ namespace proxygen { namespace compress {

class QPACKScheme : public CompressionScheme {
public:
explicit QPACKScheme(CompressionSimulator* sim, uint32_t tableSize)
explicit QPACKScheme(CompressionSimulator* sim, uint32_t tableSize,
uint32_t maxBlocking)
: CompressionScheme(sim) {
client_.setHeaderIndexingStrategy(NoPathIndexingStrategy::getInstance());
server_.setHeaderIndexingStrategy(NoPathIndexingStrategy::getInstance());
client_.setEncoderHeaderTableSize(tableSize);
server_.setDecoderHeaderTableMaxSize(tableSize);
client_.setMaxVulnerable(maxBlocking);
server_.setMaxBlocking(maxBlocking);
}

~QPACKScheme() {
Expand Down
Loading

0 comments on commit 09cb130

Please sign in to comment.