Skip to content

Commit

Permalink
Add body_bytes_read() method to Session
Browse files Browse the repository at this point in the history
Add an API to retrieve request body bytes read so far.
  • Loading branch information
drcaramelsyrup authored and eaufavor committed Apr 22, 2024
1 parent 01c6965 commit cfb86c3
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
d87a50338cb6483a3b0cbfe73d3b0491a336b7a2
c0a9b66dfde7c081c6f9153677af66f619aee158
10 changes: 9 additions & 1 deletion pingora-core/src/protocols/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,14 +323,22 @@ impl Session {
}
}

/// How many response body bytes already sent
/// Return how many response body bytes (application, not wire) already sent downstream
pub fn body_bytes_sent(&self) -> usize {
match self {
Self::H1(s) => s.body_bytes_sent(),
Self::H2(s) => s.body_bytes_sent(),
}
}

/// Return how many request body bytes (application, not wire) already read from downstream
pub fn body_bytes_read(&self) -> usize {
match self {
Self::H1(s) => s.body_bytes_read(),
Self::H2(s) => s.body_bytes_read(),
}
}

/// Return the client (peer) address of the connnection.
pub fn client_addr(&self) -> Option<&SocketAddr> {
match self {
Expand Down
7 changes: 4 additions & 3 deletions pingora-core/src/protocols/http/v1/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ pub struct HttpSession {
pub(crate) digest: Box<Digest>,
response_header: Option<Box<ResponseHeader>>,
request_written: Option<Box<RequestHeader>>,
bytes_sent: usize,
// request body bytes written to upstream
body_bytes_sent: usize,
upgraded: bool,
}

Expand All @@ -77,10 +78,10 @@ impl HttpSession {
keepalive_timeout: KeepaliveStatus::Off,
response_header: None,
request_written: None,
body_bytes_sent: 0,
read_timeout: None,
write_timeout: None,
digest,
bytes_sent: 0,
upgraded: false,
}
}
Expand Down Expand Up @@ -127,7 +128,7 @@ impl HttpSession {
.await;

if let Ok(Some(num_bytes)) = written {
self.bytes_sent += num_bytes;
self.body_bytes_sent += num_bytes;
}

written
Expand Down
73 changes: 43 additions & 30 deletions pingora-core/src/protocols/http/v1/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct HttpSession {
body_write_buf: BytesMut,
/// Track how many application (not on the wire) body bytes already sent
body_bytes_sent: usize,
/// Track how many application (not on the wire) body bytes already read
body_bytes_read: usize,
/// Whether to update headers like connection, Date
update_resp_headers: bool,
/// timeouts:
Expand All @@ -61,7 +63,7 @@ pub struct HttpSession {
write_timeout: Option<Duration>,
/// A copy of the response that is already written to the client
response_written: Option<Box<ResponseHeader>>,
/// The parse request header
/// The parsed request header
request_header: Option<Box<RequestHeader>>,
/// An internal buffer that holds a copy of the request body up to a certain size
retry_buffer: Option<FixedBuffer>,
Expand Down Expand Up @@ -100,6 +102,7 @@ impl HttpSession {
read_timeout: None,
write_timeout: None,
body_bytes_sent: 0,
body_bytes_read: 0,
retry_buffer: None,
upgraded: false,
digest,
Expand Down Expand Up @@ -338,6 +341,7 @@ impl HttpSession {
let read = self.read_body().await?;
Ok(read.map(|b| {
let bytes = Bytes::copy_from_slice(self.get_body(&b));
self.body_bytes_read += bytes.len();
if let Some(buffer) = self.retry_buffer.as_mut() {
buffer.write_to_buffer(&bytes);
}
Expand Down Expand Up @@ -627,11 +631,16 @@ impl HttpSession {
Ok(res)
}

/// Return how many (application, not wire) body bytes that have been written
/// Return how many response body bytes (application, not wire) already sent downstream
pub fn body_bytes_sent(&self) -> usize {
self.body_bytes_sent
}

/// Return how many request body bytes (application, not wire) already read from downstream
pub fn body_bytes_read(&self) -> usize {
self.body_bytes_read
}

fn is_chunked_encoding(&self) -> bool {
is_header_value_chunked_encoding(self.get_header(header::TRANSFER_ENCODING))
}
Expand Down Expand Up @@ -1088,10 +1097,10 @@ mod tests_stream {
.build();
let mut http_stream = HttpSession::new(Box::new(mock_io));
http_stream.read_request().await.unwrap();
let res = http_stream.read_body().await.unwrap().unwrap();
assert_eq!(res, BufRef::new(0, 3));
let res = http_stream.read_body_bytes().await.unwrap().unwrap();
assert_eq!(res, input3.as_slice());
assert_eq!(http_stream.body_reader.body_state, ParseState::Complete(3));
assert_eq!(input3, http_stream.get_body(&res));
assert_eq!(http_stream.body_bytes_read(), 3);
}

#[tokio::test]
Expand All @@ -1110,7 +1119,8 @@ mod tests_stream {
let mut http_stream = HttpSession::new(Box::new(mock_io));
http_stream.read_timeout = Some(Duration::from_secs(1));
http_stream.read_request().await.unwrap();
let res = http_stream.read_body().await;
let res = http_stream.read_body_bytes().await;
assert_eq!(http_stream.body_bytes_read(), 0);
assert_eq!(res.unwrap_err().etype(), &ReadTimedout);
}

Expand All @@ -1122,10 +1132,10 @@ mod tests_stream {
let mock_io = Builder::new().read(&input1[..]).read(&input2[..]).build();
let mut http_stream = HttpSession::new(Box::new(mock_io));
http_stream.read_request().await.unwrap();
let res = http_stream.read_body().await.unwrap().unwrap();
assert_eq!(res, BufRef::new(0, 3));
let res = http_stream.read_body_bytes().await.unwrap().unwrap();
assert_eq!(res, b"abc".as_slice());
assert_eq!(http_stream.body_reader.body_state, ParseState::Complete(3));
assert_eq!(b"abc", http_stream.get_body(&res));
assert_eq!(http_stream.body_bytes_read(), 3);
}

#[tokio::test]
Expand All @@ -1143,13 +1153,14 @@ mod tests_stream {
.build();
let mut http_stream = HttpSession::new(Box::new(mock_io));
http_stream.read_request().await.unwrap();
let res = http_stream.read_body().await.unwrap().unwrap();
assert_eq!(res, BufRef::new(0, 1));
let res = http_stream.read_body_bytes().await.unwrap().unwrap();
assert_eq!(res, input3.as_slice());
assert_eq!(http_stream.body_reader.body_state, ParseState::HTTP1_0(1));
assert_eq!(input3, http_stream.get_body(&res));
let res = http_stream.read_body().await.unwrap();
assert_eq!(res, None);
assert_eq!(http_stream.body_bytes_read(), 1);
let res = http_stream.read_body_bytes().await.unwrap();
assert!(res.is_none());
assert_eq!(http_stream.body_reader.body_state, ParseState::Complete(1));
assert_eq!(http_stream.body_bytes_read(), 1);
}

#[tokio::test]
Expand All @@ -1167,16 +1178,15 @@ mod tests_stream {
.build();
let mut http_stream = HttpSession::new(Box::new(mock_io));
http_stream.read_request().await.unwrap();
let res = http_stream.read_body().await.unwrap().unwrap();
assert_eq!(res, BufRef::new(0, 1));
let res = http_stream.read_body_bytes().await.unwrap().unwrap();
assert_eq!(res, b"a".as_slice());
assert_eq!(http_stream.body_reader.body_state, ParseState::HTTP1_0(1));
assert_eq!(b"a", http_stream.get_body(&res));
let res = http_stream.read_body().await.unwrap().unwrap();
assert_eq!(res, BufRef::new(0, 1));
let res = http_stream.read_body_bytes().await.unwrap().unwrap();
assert_eq!(res, b"b".as_slice());
assert_eq!(http_stream.body_reader.body_state, ParseState::HTTP1_0(2));
assert_eq!(input3, http_stream.get_body(&res));
let res = http_stream.read_body().await.unwrap();
assert_eq!(res, None);
let res = http_stream.read_body_bytes().await.unwrap();
assert_eq!(http_stream.body_bytes_read(), 2);
assert!(res.is_none());
assert_eq!(http_stream.body_reader.body_state, ParseState::Complete(2));
}

Expand All @@ -1188,8 +1198,9 @@ mod tests_stream {
let mock_io = Builder::new().read(&input1[..]).read(&input2[..]).build();
let mut http_stream = HttpSession::new(Box::new(mock_io));
http_stream.read_request().await.unwrap();
let res = http_stream.read_body().await.unwrap();
assert_eq!(res, None);
let res = http_stream.read_body_bytes().await.unwrap();
assert!(res.is_none());
assert_eq!(http_stream.body_bytes_read(), 0);
assert_eq!(http_stream.body_reader.body_state, ParseState::Complete(0));
}

Expand All @@ -1207,8 +1218,9 @@ mod tests_stream {
let mut http_stream = HttpSession::new(Box::new(mock_io));
http_stream.read_request().await.unwrap();
assert!(http_stream.is_chunked_encoding());
let res = http_stream.read_body().await.unwrap();
assert_eq!(res, None);
let res = http_stream.read_body_bytes().await.unwrap();
assert!(res.is_none());
assert_eq!(http_stream.body_bytes_read(), 0);
assert_eq!(http_stream.body_reader.body_state, ParseState::Complete(0));
}

Expand All @@ -1226,14 +1238,15 @@ mod tests_stream {
let mut http_stream = HttpSession::new(Box::new(mock_io));
http_stream.read_request().await.unwrap();
assert!(http_stream.is_chunked_encoding());
let res = http_stream.read_body().await.unwrap().unwrap();
assert_eq!(res, BufRef::new(3, 1));
let res = http_stream.read_body_bytes().await.unwrap().unwrap();
assert_eq!(res, b"a".as_slice());
assert_eq!(
http_stream.body_reader.body_state,
ParseState::Chunked(1, 0, 0, 0)
);
let res = http_stream.read_body().await.unwrap();
assert_eq!(res, None);
let res = http_stream.read_body_bytes().await.unwrap();
assert!(res.is_none());
assert_eq!(http_stream.body_bytes_read(), 1);
assert_eq!(http_stream.body_reader.body_state, ParseState::Complete(1));
}

Expand Down
13 changes: 10 additions & 3 deletions pingora-core/src/protocols/http/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ pub struct HttpSession {
// Indicate that whether a END_STREAM is already sent
// in order to tell whether needs to send one extra FRAME when this response finishes
ended: bool,
// How many request body bytes have been read so far.
// How many (application, not wire) request body bytes have been read so far.
body_read: usize,
// How many response body bytes have been sent so far.
// How many (application, not wire) response body bytes have been sent so far.
body_sent: usize,
// buffered request body for retry logic
retry_buffer: Option<FixedBuffer>,
Expand Down Expand Up @@ -413,11 +413,16 @@ impl HttpSession {
}
}

/// How many response body bytes sent to the client
/// Return how many response body bytes (application, not wire) already sent downstream
pub fn body_bytes_sent(&self) -> usize {
self.body_sent
}

/// Return how many request body bytes (application, not wire) already read from downstream
pub fn body_bytes_read(&self) -> usize {
self.body_read
}

/// Return the [Digest] of the connection.
pub fn digest(&self) -> Option<&Digest> {
Some(&self.digest)
Expand Down Expand Up @@ -490,6 +495,7 @@ mod test {
let body = http.read_body_or_idle(false).await.unwrap().unwrap();
assert_eq!(body, client_body);
assert!(http.is_body_done());
assert_eq!(http.body_bytes_read(), 16);

let retry_body = http.get_retry_buffer().unwrap();
assert_eq!(retry_body, client_body);
Expand All @@ -511,6 +517,7 @@ mod test {

// end: false here to verify finish() closes the stream nicely
http.write_body(server_body.into(), false).unwrap();
assert_eq!(http.body_bytes_sent(), 16);

http.finish().unwrap();
});
Expand Down

0 comments on commit cfb86c3

Please sign in to comment.