Skip to content

Commit

Permalink
Introduce HttpModules to pingora-proxy.
Browse files Browse the repository at this point in the history
This feature allows to plug in 3rd party extensions with minimal code
required.

The downstream compression logic is now a module.

Add: a new early_request_filter API to fine tune the behavior of the modules.
Add: a few APIs to write the response with these modules enabled. This
should address issues like compression is not applied to custom
responses.
  • Loading branch information
eaufavor authored and johnhurt committed Jun 7, 2024
1 parent 53e696d commit 11863d2
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
a06136947618424b0ffa3b5c1a280db6a4b577dc
e59fa832b5e4be70f084398d28af33a84bece61a
2 changes: 1 addition & 1 deletion pingora-core/src/apps/http_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ where
}
if !body.is_empty() {
// TODO: check if chunked encoding is needed
match http.write_response_body(body.into()).await {
match http.write_response_body(body.into(), true).await {
Ok(_) => debug!("HTTP response written."),
Err(e) => error!(
"HTTP server fails to write to downstream: {e}, {}",
Expand Down
19 changes: 18 additions & 1 deletion pingora-core/src/modules/http/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,25 @@

use super::*;
use crate::protocols::http::compression::ResponseCompressionCtx;
use std::ops::{Deref, DerefMut};

/// HTTP response compression module
pub struct ResponseCompression(ResponseCompressionCtx);

impl Deref for ResponseCompression {
type Target = ResponseCompressionCtx;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for ResponseCompression {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

#[async_trait]
impl HttpModule for ResponseCompression {
fn as_any(&self) -> &dyn std::any::Any {
Expand Down Expand Up @@ -52,7 +67,9 @@ impl HttpModule for ResponseCompression {
return Ok(());
}
let compressed = self.0.response_body_filter(body.as_ref(), end_of_stream);
*body = compressed;
if compressed.is_some() {
*body = compressed;
}
Ok(())
}
}
Expand Down
2 changes: 2 additions & 0 deletions pingora-core/src/protocols/http/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ impl ResponseCompressionCtx {
}

/// Stream the response body chunks into this ctx. The return value will be the compressed data
///
/// Return None if the compressed is not enabled
pub fn response_body_filter(&mut self, data: Option<&Bytes>, end: bool) -> Option<Bytes> {
match &mut self.0 {
CtxInner::HeaderPhase {
Expand Down
22 changes: 16 additions & 6 deletions pingora-core/src/protocols/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,19 @@ impl Session {
}

/// Write the response body to client
pub async fn write_response_body(&mut self, data: Bytes) -> Result<()> {
pub async fn write_response_body(&mut self, data: Bytes, end: bool) -> Result<()> {
if data.is_empty() && !end {
// writing 0 byte to a chunked encoding h1 would finish the stream
// writing 0 bytes to h2 is noop
// we don't want to actually write in either cases
return Ok(());
}
match self {
Self::H1(s) => {
s.write_body(&data).await?;
Ok(())
}
Self::H2(s) => s.write_body(data, false),
Self::H2(s) => s.write_body(data, end),
}
}

Expand Down Expand Up @@ -236,14 +242,18 @@ impl Session {
}
}

/// Send error response to client
pub async fn respond_error(&mut self, error: u16) {
let resp = match error {
pub fn generate_error(error: u16) -> ResponseHeader {
match error {
/* common error responses are pre-generated */
502 => error_resp::HTTP_502_RESPONSE.clone(),
400 => error_resp::HTTP_400_RESPONSE.clone(),
_ => error_resp::gen_error_response(error),
};
}
}

/// Send error response to client
pub async fn respond_error(&mut self, error: u16) {
let resp = Self::generate_error(error);

// TODO: we shouldn't be closing downstream connections on internally generated errors
// and possibly other upstream connect() errors (connection refused, timeout, etc)
Expand Down
Loading

0 comments on commit 11863d2

Please sign in to comment.