Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync and release 0.2.0 #238

Merged
merged 12 commits into from
May 10, 2024
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0d6c0fbd55a4b0d8b5f2ae892a6602457dc61b7e
28022cfd206a7dfa1670a9884eb5b8bf621e8bfd
33 changes: 33 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,39 @@

All notable changes to this project will be documented in this file.

## [0.2.0](https://github.com/cloudflare/pingora/compare/0.1.1...0.2.0) - 2024-05-10

### 🚀 Features
- Add support for downstream h2 trailers and add an upstream h2 response trailer filter
- Add the ability to set TCP recv buf size
- Add a convenience function to retrieve Session digest
- Add `body_bytes_read()` method to Session
- Add `cache_not_modified_filter`
- Add `SSLKEYLOG` support for tls upstream
- Add `Service<HttpProxy<T>>` constructor for providing name
- Add `purge_response` callback
- Make `pop_closed` pub, to simplify DIY drains

### 🐛 Bug Fixes
- Fixed gRPC trailer proxying
- Fixed `response_body_filter` `end_of_stream` always being false
- Fixed compile error in Rust <= 1.73
- Fixed non linux build
- Fixed the counting problem of used_weight data field in `LruUnit<T>`
- Fixed `cargo run --example server` missing cert
- Fixed error log string interpolation outside of proper context
- Fixed tinylfu test flake

### ⚙️ Changes and Miscellaneous Tasks
- API change: `Server::run_forever` now takes ownership and ensures exit semantics
- API change: `cleanup()` method of `ServerApp` trait is now async
- Behavior change: Always return `HttpTask::Body` on body done instead of `HttpTask::done`
- Behavior change: HTTP/1 reason phrase is now parsed and proxied
- Updated `h2` dependency for RUSTSEC-2024-0332
- Updated zstd dependencies
- Code optimization and refactor in a few crates
- More examples and docs

## [0.1.1](https://github.com/cloudflare/pingora/compare/0.1.0...0.1.1) - 2024-04-05

### 🚀 Features
Expand Down
6 changes: 3 additions & 3 deletions docs/user_guide/phase.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ If the error is not retry-able, the request will end.
### `upstream_request_filter()`
This phase is to modify requests before sending to upstream.

### `upstream_response_filter()/upstream_response_body_filter()`
This phase is triggered after an upstream response header/body is received.
### `upstream_response_filter()/upstream_response_body_filter()/upstream_response_trailer_filter()`
This phase is triggered after an upstream response header/body/trailer is received.

This phase is to modify or process response headers (or body) before sending to downstream. Note that this phase is called _prior_ to HTTP caching and therefore any changes made here will affect the response stored in the HTTP cache.
This phase is to modify or process response headers, body, or trailers before sending to downstream. Note that this phase is called _prior_ to HTTP caching and therefore any changes made here will affect the response stored in the HTTP cache.

### `response_filter()/response_body_filter()/response_trailer_filter()`
This phase is triggered after a response header/body/trailer is ready to send to downstream.
Expand Down
2 changes: 1 addition & 1 deletion pingora-boringssl/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pingora-boringssl"
version = "0.1.0"
version = "0.2.0"
authors = ["Yuchen Wu <yuchen@cloudflare.com>"]
license = "Apache-2.0"
edition = "2021"
Expand Down
14 changes: 7 additions & 7 deletions pingora-cache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pingora-cache"
version = "0.1.0"
version = "0.2.0"
authors = ["Yuchen Wu <yuchen@cloudflare.com>"]
license = "Apache-2.0"
edition = "2021"
Expand All @@ -17,12 +17,12 @@ name = "pingora_cache"
path = "src/lib.rs"

[dependencies]
pingora-core = { version = "0.1.0", path = "../pingora-core", default-features = false }
pingora-error = { version = "0.1.0", path = "../pingora-error" }
pingora-header-serde = { version = "0.1.0", path = "../pingora-header-serde" }
pingora-http = { version = "0.1.0", path = "../pingora-http" }
pingora-lru = { version = "0.1.0", path = "../pingora-lru" }
pingora-timeout = { version = "0.1.0", path = "../pingora-timeout" }
pingora-core = { version = "0.2.0", path = "../pingora-core", default-features = false }
pingora-error = { version = "0.2.0", path = "../pingora-error" }
pingora-header-serde = { version = "0.2.0", path = "../pingora-header-serde" }
pingora-http = { version = "0.2.0", path = "../pingora-http" }
pingora-lru = { version = "0.2.0", path = "../pingora-lru" }
pingora-timeout = { version = "0.2.0", path = "../pingora-timeout" }
http = { workspace = true }
indexmap = "1"
once_cell = { workspace = true }
Expand Down
16 changes: 8 additions & 8 deletions pingora-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pingora-core"
version = "0.1.0"
version = "0.2.0"
authors = ["Yuchen Wu <yuchen@cloudflare.com>"]
license = "Apache-2.0"
edition = "2021"
Expand All @@ -19,13 +19,13 @@ name = "pingora_core"
path = "src/lib.rs"

[dependencies]
pingora-runtime = { version = "0.1.0", path = "../pingora-runtime" }
pingora-openssl = { version = "0.1.0", path = "../pingora-openssl", optional = true }
pingora-boringssl = { version = "0.1.0", path = "../pingora-boringssl", optional = true }
pingora-pool = { version = "0.1.0", path = "../pingora-pool" }
pingora-error = { version = "0.1.0", path = "../pingora-error" }
pingora-timeout = { version = "0.1.0", path = "../pingora-timeout" }
pingora-http = { version = "0.1.0", path = "../pingora-http" }
pingora-runtime = { version = "0.2.0", path = "../pingora-runtime" }
pingora-openssl = { version = "0.2.0", path = "../pingora-openssl", optional = true }
pingora-boringssl = { version = "0.2.0", path = "../pingora-boringssl", optional = true }
pingora-pool = { version = "0.2.0", path = "../pingora-pool" }
pingora-error = { version = "0.2.0", path = "../pingora-error" }
pingora-timeout = { version = "0.2.0", path = "../pingora-timeout" }
pingora-http = { version = "0.2.0", path = "../pingora-http" }
tokio = { workspace = true, features = ["rt-multi-thread", "signal"] }
futures = "0.3"
async-trait = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions pingora-core/src/apps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub trait ServerApp {
) -> Option<Stream>;

/// This callback will be called once after the service stops listening to its endpoints.
fn cleanup(&self) {}
async fn cleanup(&self) {}
}

/// This trait defines the interface of an HTTP application.
Expand All @@ -77,7 +77,7 @@ pub trait HttpServerApp {
None
}

fn http_cleanup(&self) {}
async fn http_cleanup(&self) {}
}

#[cfg_attr(not(doc_async_trait), async_trait)]
Expand Down Expand Up @@ -141,7 +141,7 @@ where
}
}

fn cleanup(&self) {
self.http_cleanup()
async fn cleanup(&self) {
self.http_cleanup().await;
}
}
42 changes: 30 additions & 12 deletions pingora-core/src/protocols/http/v1/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ impl HttpSession {
_ => Version::HTTP_09,
});

response_header.set_reason_phrase(resp.reason)?;

let buf = buf.freeze();

for header in header_refs {
Expand Down Expand Up @@ -596,27 +598,26 @@ impl HttpSession {
if self.should_read_resp_header() {
let resp_header = self.read_resp_header_parts().await?;
let end_of_body = self.is_body_done();
debug!("Response header: {:?}", resp_header);
debug!("Response header: {resp_header:?}");
trace!(
"Raw Response header: {:?}",
str::from_utf8(self.get_headers_raw()).unwrap()
);
Ok(HttpTask::Header(resp_header, end_of_body))
} else if self.is_body_done() {
// no body
debug!("Response is done");
Ok(HttpTask::Done)
} else {
/* need to read body */
let data = self.read_body_bytes().await?;
let body = self.read_body_bytes().await?;
let end_of_body = self.is_body_done();
if let Some(body) = data {
debug!("Response body: {} bytes", body.len());
trace!("Response body: {:?}", body);
Ok(HttpTask::Body(Some(body), end_of_body))
} else {
debug!("Response is done");
Ok(HttpTask::Done)
}
debug!(
"Response body: {} bytes, end: {end_of_body}",
body.as_ref().map_or(0, |b| b.len())
);
trace!("Response body: {body:?}");
Ok(HttpTask::Body(body, end_of_body))
}
// TODO: support h1 trailer
}
Expand Down Expand Up @@ -720,6 +721,20 @@ mod tests_stream {
assert_eq!(0, http_stream.resp_header().unwrap().headers.len());
}

#[tokio::test]
async fn read_response_custom_reason() {
init_log();
let input = b"HTTP/1.1 200 Just Fine\r\n\r\n";
let mock_io = Builder::new().read(&input[..]).build();
let mut http_stream = HttpSession::new(Box::new(mock_io));
let res = http_stream.read_response().await;
assert_eq!(input.len(), res.unwrap());
assert_eq!(
http_stream.resp_header().unwrap().get_reason_phrase(),
Some("Just Fine")
);
}

#[tokio::test]
async fn read_response_default() {
init_log();
Expand Down Expand Up @@ -967,9 +982,12 @@ mod tests_stream {
// read body
let task = http_stream.read_response_task().await.unwrap();
match task {
HttpTask::Done => {}
HttpTask::Body(b, eob) => {
assert!(b.is_none());
assert!(eob);
}
_ => {
panic!("task should be Done")
panic!("task should be body with end of stream")
}
}
}
Expand Down
50 changes: 33 additions & 17 deletions pingora-core/src/protocols/http/v1/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -854,29 +854,31 @@ impl HttpSession {
}

async fn response_duplex(&mut self, task: HttpTask) -> Result<bool> {
match task {
let end_stream = match task {
HttpTask::Header(header, end_stream) => {
self.write_response_header(header)
.await
.map_err(|e| e.into_down())?;
Ok(end_stream)
end_stream
}
HttpTask::Body(data, end_stream) => match data {
Some(d) => {
if !d.is_empty() {
self.write_body(&d).await.map_err(|e| e.into_down())?;
}
Ok(end_stream)
end_stream
}
None => Ok(end_stream),
None => end_stream,
},
HttpTask::Trailer(_) => Ok(true), // h1 trailer is not supported yet
HttpTask::Done => {
self.finish_body().await.map_err(|e| e.into_down())?;
Ok(true)
}
HttpTask::Failed(e) => Err(e),
HttpTask::Trailer(_) => true, // h1 trailer is not supported yet
HttpTask::Done => true,
HttpTask::Failed(e) => return Err(e),
};
if end_stream {
// no-op if body wasn't initialized or is finished already
self.finish_body().await.map_err(|e| e.into_down())?;
}
Ok(end_stream)
}

// TODO: use vectored write to avoid copying
Expand Down Expand Up @@ -905,12 +907,7 @@ impl HttpSession {
None => end_stream,
},
HttpTask::Trailer(_) => true, // h1 trailer is not supported yet
HttpTask::Done => {
// flush body first
self.write_body_buf().await.map_err(|e| e.into_down())?;
self.finish_body().await.map_err(|e| e.into_down())?;
return Ok(true);
}
HttpTask::Done => true,
HttpTask::Failed(e) => {
// flush the data we have and quit
self.write_body_buf().await.map_err(|e| e.into_down())?;
Expand All @@ -923,6 +920,10 @@ impl HttpSession {
}
}
self.write_body_buf().await.map_err(|e| e.into_down())?;
if end_stream {
// no-op if body wasn't initialized or is finished already
self.finish_body().await.map_err(|e| e.into_down())?;
}
Ok(end_stream)
}
}
Expand Down Expand Up @@ -1013,7 +1014,7 @@ fn http_resp_header_to_buf(
let status = resp.status;
buf.put_slice(status.as_str().as_bytes());
buf.put_u8(b' ');
let reason = status.canonical_reason();
let reason = resp.get_reason_phrase();
if let Some(reason_buf) = reason {
buf.put_slice(reason_buf.as_bytes());
}
Expand Down Expand Up @@ -1381,6 +1382,21 @@ mod tests_stream {
.unwrap();
}

#[tokio::test]
async fn write_custom_reason() {
let wire = b"HTTP/1.1 200 Just Fine\r\nFoo: Bar\r\n\r\n";
let mock_io = Builder::new().write(wire).build();
let mut http_stream = HttpSession::new(Box::new(mock_io));
let mut new_response = ResponseHeader::build(StatusCode::OK, None).unwrap();
new_response.set_reason_phrase(Some("Just Fine")).unwrap();
new_response.append_header("Foo", "Bar").unwrap();
http_stream.update_resp_headers = false;
http_stream
.write_response_header_ref(&new_response)
.await
.unwrap();
}

#[tokio::test]
async fn write_informational() {
let wire = b"HTTP/1.1 100 Continue\r\n\r\nHTTP/1.1 200 OK\r\nFoo: Bar\r\n\r\n";
Expand Down
9 changes: 5 additions & 4 deletions pingora-core/src/protocols/http/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,15 +331,16 @@ impl HttpSession {
true
}
HttpTask::Trailer(None) => true,
HttpTask::Done => {
self.finish().map_err(|e| e.into_down())?;
return Ok(true);
}
HttpTask::Done => true,
HttpTask::Failed(e) => {
return Err(e);
}
} || end_stream // safe guard in case `end` in tasks flips from true to false
}
if end_stream {
// no-op if finished already
self.finish().map_err(|e| e.into_down())?;
}
Ok(end_stream)
}

Expand Down
19 changes: 4 additions & 15 deletions pingora-core/src/server/transfer_fd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,13 @@ impl Fds {
}

pub fn serialize(&self) -> (Vec<String>, Vec<RawFd>) {
let serialized: Vec<(String, RawFd)> = self
.map
.iter()
.map(|(key, value)| (key.clone(), *value))
.collect();

(
serialized.iter().map(|v| v.0.clone()).collect(),
serialized.iter().map(|v| v.1).collect(),
)
// Surely there is a better way of doing this
self.map.iter().map(|(key, val)| (key.clone(), val)).unzip()
}

pub fn deserialize(&mut self, binds: Vec<String>, fds: Vec<RawFd>) {
assert!(binds.len() == fds.len());
// TODO: use zip()
for i in 0..binds.len() {
self.map.insert(binds[i].clone(), fds[i]);
assert_eq!(binds.len(), fds.len());
for (bind, fd) in binds.into_iter().zip(fds) {
self.map.insert(bind, fd);
}
}

Expand Down
2 changes: 1 addition & 1 deletion pingora-core/src/services/listening.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl<A: ServerApp + Send + Sync + 'static> ServiceTrait for Service<A> {

futures::future::join_all(handlers).await;
self.listeners.cleanup();
self.app_logic.cleanup();
self.app_logic.cleanup().await;
}

fn name(&self) -> &str {
Expand Down
2 changes: 1 addition & 1 deletion pingora-error/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pingora-error"
version = "0.1.0"
version = "0.2.0"
authors = ["Yuchen Wu <yuchen@cloudflare.com>"]
license = "Apache-2.0"
edition = "2021"
Expand Down