Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions crates/openshell-sandbox/src/l7/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@ use miette::Result;
use std::future::Future;
use tokio::io::{AsyncRead, AsyncWrite};

/// Outcome of relaying a single HTTP request/response pair.
#[derive(Debug)]
pub enum RelayOutcome {
/// Connection is reusable for further HTTP requests (keep-alive).
Reusable,
/// Connection was consumed (e.g. read-until-EOF or `Connection: close`).
Consumed,
/// Server responded with 101 Switching Protocols.
/// The connection has been upgraded (e.g. to WebSocket) and must be
/// relayed as raw bidirectional TCP from this point forward.
/// Contains any overflow bytes read from upstream after the 101 headers
/// that must be forwarded to the client before switching to copy mode.
Upgraded { overflow: Vec<u8> },
}

/// Body framing for HTTP requests/responses.
#[derive(Debug, Clone, Copy)]
pub enum BodyLength {
Expand Down Expand Up @@ -54,14 +69,15 @@ pub trait L7Provider: Send + Sync {

/// Forward an allowed request to upstream and relay the response back.
///
/// Returns `true` if the upstream connection is reusable (keep-alive),
/// `false` if it was consumed (e.g. read-until-EOF or `Connection: close`).
/// Returns a [`RelayOutcome`] indicating whether the connection is
/// reusable (keep-alive), consumed, or has been upgraded (101 Switching
/// Protocols) and must be relayed as raw bidirectional TCP.
fn relay<C, U>(
&self,
req: &L7Request,
client: &mut C,
upstream: &mut U,
) -> impl Future<Output = Result<bool>> + Send
) -> impl Future<Output = Result<RelayOutcome>> + Send
where
C: AsyncRead + AsyncWrite + Unpin + Send,
U: AsyncRead + AsyncWrite + Unpin + Send;
Expand Down
65 changes: 52 additions & 13 deletions crates/openshell-sandbox/src/l7/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
//! Parses each request within the tunnel, evaluates it against OPA policy,
//! and either forwards or denies the request.

use crate::l7::provider::L7Provider;
use crate::l7::provider::{L7Provider, RelayOutcome};
use crate::l7::{EnforcementMode, L7EndpointConfig, L7Protocol, L7RequestInfo};
use crate::secrets::SecretResolver;
use miette::{IntoDiagnostic, Result, miette};
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing::{debug, info, warn};

/// Context for L7 request policy evaluation.
Expand Down Expand Up @@ -134,20 +134,42 @@ where

if allowed || config.enforcement == EnforcementMode::Audit {
// Forward request to upstream and relay response
let reusable = crate::l7::rest::relay_http_request_with_resolver(
let outcome = crate::l7::rest::relay_http_request_with_resolver(
&req,
client,
upstream,
ctx.secret_resolver.as_deref(),
)
.await?;
if !reusable {
debug!(
host = %ctx.host,
port = ctx.port,
"Upstream connection not reusable, closing L7 relay"
);
return Ok(());
match outcome {
RelayOutcome::Reusable => {} // continue loop
RelayOutcome::Consumed => {
debug!(
host = %ctx.host,
port = ctx.port,
"Upstream connection not reusable, closing L7 relay"
);
return Ok(());
}
RelayOutcome::Upgraded { overflow } => {
info!(
host = %ctx.host,
port = ctx.port,
overflow_bytes = overflow.len(),
"101 Switching Protocols — switching to raw bidirectional relay"
);
// Forward any overflow bytes from the upgrade response
if !overflow.is_empty() {
client.write_all(&overflow).await.into_diagnostic()?;
client.flush().await.into_diagnostic()?;
}
// Switch to raw bidirectional TCP copy for the upgraded
// protocol (WebSocket, HTTP/2, etc.)
tokio::io::copy_bidirectional(client, upstream)
.await
.into_diagnostic()?;
return Ok(());
}
}
} else {
// Enforce mode: deny with 403 and close connection
Expand Down Expand Up @@ -278,12 +300,29 @@ where
// Forward request with credential rewriting and relay the response.
// relay_http_request_with_resolver handles both directions: it sends
// the request upstream and reads the response back to the client.
let reusable =
let outcome =
crate::l7::rest::relay_http_request_with_resolver(&req, client, upstream, resolver)
.await?;

if !reusable {
break;
match outcome {
RelayOutcome::Reusable => {} // continue loop
RelayOutcome::Consumed => break,
RelayOutcome::Upgraded { overflow } => {
info!(
host = %ctx.host,
port = ctx.port,
overflow_bytes = overflow.len(),
"101 Switching Protocols — switching to raw bidirectional relay"
);
if !overflow.is_empty() {
client.write_all(&overflow).await.into_diagnostic()?;
client.flush().await.into_diagnostic()?;
}
tokio::io::copy_bidirectional(client, upstream)
.await
.into_diagnostic()?;
return Ok(());
}
}
}

Expand Down
Loading