Skip to content

Commit

Permalink
Initial QPACK dynamic table decode
Browse files Browse the repository at this point in the history
This change adds a basic dynamic table support for QPACK decoder.
It only implements the table without the blocking. Still in many
cases it allows for much better compression than the static only
table.
  • Loading branch information
vkrasnov committed Nov 19, 2023
1 parent a48c6f7 commit 0db6df1
Show file tree
Hide file tree
Showing 9 changed files with 771 additions and 147 deletions.
5 changes: 2 additions & 3 deletions apps/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,12 +734,11 @@ fn make_h3_config(
}

if let Some(v) = qpack_max_table_capacity {
// quiche doesn't support dynamic QPACK, so clamp to 0 for now.
config.set_qpack_max_table_capacity(v.clamp(0, 0));
config.set_qpack_max_table_capacity(v);
}

if let Some(v) = qpack_blocked_streams {
// quiche doesn't support dynamic QPACK, so clamp to 0 for now.
// quiche doesn't support dynamic QPACK blocking, so clamp to 0 for now.
config.set_qpack_blocked_streams(v.clamp(0, 0));
}

Expand Down
8 changes: 4 additions & 4 deletions fuzz/src/qpack_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use quiche::h3::NameValue;
// input. However, that transformation is not guaranteed to be the identify
// function, as there are multiple ways the same hdr list could be encoded.
fuzz_target!(|data: &[u8]| {
let mut decoder = quiche::h3::qpack::Decoder::new();
let mut decoder = quiche::h3::qpack::Decoder::new(0);
let mut encoder = quiche::h3::qpack::Encoder::new();

let hdrs = match decoder.decode(&mut data.to_vec(), u64::MAX) {
let hdrs = match decoder.decode(&mut data.to_vec(), u64::MAX, 0) {
Err(_) => return,
Ok(hdrs) => hdrs,
};
Expand All @@ -25,7 +25,7 @@ fuzz_target!(|data: &[u8]| {
let encoded_size = encoder.encode(&hdrs, &mut encoded_hdrs).unwrap();

let decoded_hdrs = decoder
.decode(&encoded_hdrs[..encoded_size], u64::MAX)
.decode(&encoded_hdrs[..encoded_size], u64::MAX, 0)
.unwrap();

let mut expected_hdrs = Vec::new();
Expand All @@ -35,7 +35,7 @@ fuzz_target!(|data: &[u8]| {
for h in &hdrs {
let name = h.name().to_ascii_lowercase();

expected_hdrs.push(quiche::h3::Header::new(&name, h.value()));
expected_hdrs.push(quiche::h3::Header::new(name, h.value()));
}

assert_eq!(expected_hdrs, decoded_hdrs)
Expand Down
6 changes: 3 additions & 3 deletions quiche/examples/qpack-decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn main() {

let mut file = File::open(args.next().unwrap()).unwrap();

let mut dec = qpack::Decoder::new();
let mut dec = qpack::Decoder::new(0);

loop {
let mut stream_id: [u8; 8] = [0; 8];
Expand All @@ -72,11 +72,11 @@ fn main() {
debug!("Got stream={} len={}", stream_id, len);

if stream_id == 0 {
dec.control(&mut data[..len]).unwrap();
dec.control(&data[..len]).unwrap();
continue;
}

for hdr in dec.decode(&data[..len], u64::MAX).unwrap() {
for hdr in dec.decode(&data[..len], u64::MAX, 0).unwrap() {
let name = std::str::from_utf8(hdr.name()).unwrap();
let value = std::str::from_utf8(hdr.value()).unwrap();
println!("{name}\t{value}");
Expand Down
117 changes: 73 additions & 44 deletions quiche/src/h3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ use qlog::events::EventImportance;
#[cfg(feature = "qlog")]
use qlog::events::EventType;

use crate::Shutdown;

/// List of ALPN tokens of supported HTTP/3 versions.
///
/// This can be passed directly to the [`Config::set_application_protos()`]
Expand Down Expand Up @@ -570,6 +572,11 @@ pub trait NameValue {

/// Returns the object's value.
fn value(&self) -> &[u8];

/// Return the qpack cost of the pair
fn qpack_cost(&self) -> u64 {
self.name().len() as u64 + self.value().len() as u64 + 32
}
}

impl<N, V> NameValue for (N, V)
Expand Down Expand Up @@ -611,8 +618,11 @@ impl Header {
/// Creates a new header.
///
/// Both `name` and `value` will be cloned.
pub fn new(name: &[u8], value: &[u8]) -> Self {
Self(name.to_vec(), value.to_vec())
pub fn new<N, V>(name: N, value: V) -> Self
where
Vec<u8>: From<N> + From<V>,
{
Self(name.into(), value.into())
}
}

Expand Down Expand Up @@ -881,7 +891,9 @@ impl Connection {
peer_control_stream_id: None,

qpack_encoder: qpack::Encoder::new(),
qpack_decoder: qpack::Decoder::new(),
qpack_decoder: qpack::Decoder::new(
config.qpack_max_table_capacity.unwrap_or(0),
),

local_qpack_streams: QpackStreams {
encoder_stream_id: None,
Expand Down Expand Up @@ -1590,6 +1602,25 @@ impl Connection {
};
}

// Send any outstanding QPACK decoder instructions
if let Some(stream_id) = self.local_qpack_streams.decoder_stream_id {
let mut buf = [0u8; 64];

while self.qpack_decoder.has_instructions() {
let cap = conn.stream_capacity(stream_id)?;
if cap == 0 {
break;
}

let n = self.qpack_decoder.emit_instructions(&mut buf);
if n == 0 {
break;
}

conn.stream_send(stream_id, &buf[..n], false)?;
}
}

// Process finished streams list.
if let Some(finished) = self.finished_streams.pop_front() {
return Ok((finished, Event::Finished));
Expand All @@ -1606,8 +1637,10 @@ impl Connection {

// Return early if the stream was reset, to avoid returning
// a Finished event later as well.
Err(Error::TransportError(crate::Error::StreamReset(e))) =>
return Ok((s, Event::Reset(e))),
Err(Error::TransportError(crate::Error::StreamReset(e))) => {
self.qpack_decoder.cancel_stream(s);
return Ok((s, Event::Reset(e)));
},

Err(e) => return Err(e),
};
Expand Down Expand Up @@ -2296,8 +2329,19 @@ impl Connection {
return Ok((stream_id, Event::Data));
},

stream::State::QpackInstruction => {
let mut d = [0; 4096];
stream::State::QpackEncoderInstruction => {
let mut d = [0; 1024];

loop {
let (n, _) = conn.stream_recv(stream_id, &mut d)?;
self.qpack_decoder
.control(&d[..n])
.map_err(|_| Error::QpackDecompressionFailed)?;
}
},

stream::State::QpackDecoderInstruction => {
let mut d = [0; 1024];

// Read data from the stream and discard immediately.
loop {
Expand Down Expand Up @@ -2423,10 +2467,11 @@ impl Connection {
.max_field_section_size
.unwrap_or(u64::MAX);

let headers = match self
.qpack_decoder
.decode(&header_block[..], max_size)
{
let headers = match self.qpack_decoder.decode(
&header_block[..],
max_size,
stream_id,
) {
Ok(v) => v,

Err(e) => {
Expand Down Expand Up @@ -2713,6 +2758,23 @@ impl Connection {

Err(Error::Done)
}

/// Shuts down reading or writing from/to the specified stream. This method
/// should be preferred over the equivalent `quiche::stream_shutdown`
/// method, as the other method may prevent evictions from the QPACK
/// decoder.
pub fn stream_shutdown(
&mut self, conn: &mut super::Connection, stream_id: u64,
direction: Shutdown, err: u64,
) -> Result<()> {
let is_read = direction == Shutdown::Read;
conn.stream_shutdown(stream_id, direction, err)?;
if is_read {
self.qpack_decoder.cancel_stream(stream_id);
}

Ok(())
}
}

/// Generates an HTTP/3 GREASE variable length integer.
Expand Down Expand Up @@ -4471,39 +4533,6 @@ mod tests {
assert!(qpack_stream_closed);
}

#[test]
/// Client sends QPACK data.
fn qpack_data() {
// TODO: QPACK instructions are ignored until dynamic table support is
// added so we just test that the data is safely ignored.
let mut s = Session::new().unwrap();
s.handshake().unwrap();

let e_stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
let d_stream_id = s.client.local_qpack_streams.decoder_stream_id.unwrap();
let d = [0; 20];

s.pipe.client.stream_send(e_stream_id, &d, false).unwrap();
s.advance().ok();

s.pipe.client.stream_send(d_stream_id, &d, false).unwrap();
s.advance().ok();

loop {
match s.server.poll(&mut s.pipe.server) {
Ok(_) => (),

Err(Error::Done) => {
break;
},

Err(_) => {
panic!();
},
}
}
}

#[test]
/// Tests limits for the stream state buffer maximum size.
fn max_state_buf_size() {
Expand Down

0 comments on commit 0db6df1

Please sign in to comment.