Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,24 @@ The repository should be cloned under `/root` so the provided `setup-*.sh` scrip
chain per port)
- service names are unique within a stack; dependency chains stay intra-stack. Service names may
be reused across different stacks
- `protocol` selects how a proxy-reachable service is exposed: `http` (the default — routed by
`Host` header on the shared 80/443 listeners) or `tcp`/`udp`, which each require `listen_port` —
the external port nullnet-proxy binds directly and forwards raw traffic from. `listen_port` must
be globally unique per protocol across every stack (the server refuses to start, or rejects a
hot-reload, if two services claim the same `protocol`/`listen_port` pair):
```
[[services]]
name = "redis.internal"
timeout = 0
protocol = "tcp"
listen_port = 6379

[[services]]
name = "dns.internal"
timeout = 0
protocol = "udp"
listen_port = 53
```

- run the project as a daemon (from the repo root)
```
Expand Down Expand Up @@ -109,6 +127,10 @@ The repository should be cloned under `/root` so the provided `setup-*.sh` scrip

- the proxy listens on port 80 (requests in the form `service_name:80`) and, for hosts that have a
TLS certificate, on port 443 — HTTP requests to those hosts get a 301 redirect to HTTPS
- for services declared with `protocol = "tcp"` or `"udp"` in the server's stack config, the proxy
also opens a raw listener on each `listen_port` and forwards traffic to the matching service —
no `Host` header involved. This table is pushed live by the server, so listeners open and close
as `services/<stack>.toml` changes, without a proxy restart

***

Expand Down
42 changes: 41 additions & 1 deletion members/nullnet-grpc-lib/proto/nullnet_grpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ service NullnetGrpc {
// subscribe (proxy startup load) and again whenever it changes, so every proxy
// hot-reloads without a restart.
rpc WatchCertificates(Empty) returns (stream CertBundle);

// Port mapping distribution APIs ---------------------------------------------------------------------------------

// Long-lived stream: the server pushes the full TCP/UDP port→service mapping
// table immediately on subscribe and again whenever services.toml changes,
// so the proxy opens/closes raw TCP/UDP listeners without a restart.
rpc WatchPortMappings(Empty) returns (stream PortMappingBundle);
}

// TAP-based clients ---------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -157,6 +164,31 @@ message Upstream {
uint32 port = 2;
}

// Protocol a proxy-reachable service is exposed over. HTTP is routed by Host
// header on the shared 80/443 listeners; TCP/UDP each get a dedicated
// listen_port the proxy binds directly.
enum ServiceProtocol {
HTTP = 0;
TCP = 1;
UDP = 2;
}

// One entry in the live port→service table. Only services with a non-HTTP
// protocol need an entry — HTTP stays on the existing Host-header routing.
message PortMapping {
string service_name = 1;
ServiceProtocol protocol = 2;
uint32 listen_port = 3;
// Mirrors the service's configured per-client idle timeout (0 disables
// it), so the proxy can expire UDP sessions without a second timeout
// mechanism.
uint64 idle_timeout_secs = 4;
}

message PortMappingBundle {
repeated PortMapping mappings = 1;
}

// Backend-triggered chains --------------------------------------------------------------------------------------------

message BackendTriggerRequest {
Expand Down Expand Up @@ -226,6 +258,10 @@ message AgentEvent {
AgentUpstreamIpParseFailed upstream_ip_parse_failed = 20;
AgentProxyClientNotInet proxy_client_not_inet = 21;
AgentTlsCertificateInvalid tls_certificate_invalid = 23;
AgentTcpListenerBindFailed tcp_listener_bind_failed = 26;
AgentUdpListenerBindFailed udp_listener_bind_failed = 27;
AgentTcpUpstreamConnectFailed tcp_upstream_connect_failed = 28;
AgentUdpUpstreamConnectFailed udp_upstream_connect_failed = 29;
// Proxy info events
AgentProxyRequestRouted proxy_request_routed = 22;
}
Expand Down Expand Up @@ -255,4 +291,8 @@ message AgentProxyRequestInvalidHost { string client_ip = 1; }
message AgentUpstreamIpParseFailed { string raw_ip = 1; string service_name = 2; }
message AgentProxyClientNotInet { string address_family = 1; }
message AgentTlsCertificateInvalid { string domain = 1; string reason = 2; }
message AgentProxyRequestRouted { string service_name = 1; string client_ip = 2; string upstream_ip = 3; uint64 latency_ms = 4; }
message AgentProxyRequestRouted { string service_name = 1; string client_ip = 2; string upstream_ip = 3; uint64 latency_ms = 4; }
message AgentTcpListenerBindFailed { uint32 listen_port = 1; string service_name = 2; string error_message = 3; }
message AgentUdpListenerBindFailed { uint32 listen_port = 1; string service_name = 2; string error_message = 3; }
message AgentTcpUpstreamConnectFailed { string service_name = 1; string client_ip = 2; string error_message = 3; }
message AgentUdpUpstreamConnectFailed { string service_name = 1; string client_ip = 2; string error_message = 3; }
18 changes: 16 additions & 2 deletions members/nullnet-grpc-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ mod proto;

use crate::nullnet_grpc::nullnet_grpc_client::NullnetGrpcClient;
use crate::nullnet_grpc::{
AgentEvent, BackendTriggerRequest, CertBundle, Empty, MsgId, NetMessage, NetType, ProxyRequest,
Services, ServicesListResponse, Upstream,
AgentEvent, BackendTriggerRequest, CertBundle, Empty, MsgId, NetMessage, NetType,
PortMappingBundle, ProxyRequest, Services, ServicesListResponse, Upstream,
};
pub use proto::*;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -140,4 +140,18 @@ impl NullnetGrpcInterface {
.map_err(|e| e.to_string())?
.into_inner())
}

/// Subscribe to port-mapping changes: the returned stream yields the full
/// TCP/UDP port→service table immediately on subscribe and again whenever
/// it changes.
#[allow(clippy::missing_errors_doc)]
pub async fn watch_port_mappings(&self) -> Result<Streaming<PortMappingBundle>, String> {
Ok(self
.client
.clone()
.watch_port_mappings(Request::new(Empty {}))
.await
.map_err(|e| e.to_string())?
.into_inner())
}
}
191 changes: 190 additions & 1 deletion members/nullnet-grpc-lib/src/proto/nullnet_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,27 @@ pub struct Upstream {
#[prost(uint32, tag = "2")]
pub port: u32,
}
/// One entry in the live port→service table. Only services with a non-HTTP
/// protocol need an entry — HTTP stays on the existing Host-header routing.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct PortMapping {
#[prost(string, tag = "1")]
pub service_name: ::prost::alloc::string::String,
#[prost(enumeration = "ServiceProtocol", tag = "2")]
pub protocol: i32,
#[prost(uint32, tag = "3")]
pub listen_port: u32,
/// Mirrors the service's configured per-client idle timeout (0 disables
/// it), so the proxy can expire UDP sessions without a second timeout
/// mechanism.
#[prost(uint64, tag = "4")]
pub idle_timeout_secs: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PortMappingBundle {
#[prost(message, repeated, tag = "1")]
pub mappings: ::prost::alloc::vec::Vec<PortMapping>,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct BackendTriggerRequest {
#[prost(string, tag = "1")]
Expand Down Expand Up @@ -212,7 +233,7 @@ pub struct Empty {}
pub struct AgentEvent {
#[prost(
oneof = "agent_event::Event",
tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 24, 25, 13, 14, 15, 16, 17, 18, 19, 20, 21, 23, 22"
tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 24, 25, 13, 14, 15, 16, 17, 18, 19, 20, 21, 23, 26, 27, 28, 29, 22"
)]
pub event: ::core::option::Option<agent_event::Event>,
}
Expand Down Expand Up @@ -271,6 +292,14 @@ pub mod agent_event {
ProxyClientNotInet(super::AgentProxyClientNotInet),
#[prost(message, tag = "23")]
TlsCertificateInvalid(super::AgentTlsCertificateInvalid),
#[prost(message, tag = "26")]
TcpListenerBindFailed(super::AgentTcpListenerBindFailed),
#[prost(message, tag = "27")]
UdpListenerBindFailed(super::AgentUdpListenerBindFailed),
#[prost(message, tag = "28")]
TcpUpstreamConnectFailed(super::AgentTcpUpstreamConnectFailed),
#[prost(message, tag = "29")]
UdpUpstreamConnectFailed(super::AgentUdpUpstreamConnectFailed),
/// Proxy info events
#[prost(message, tag = "22")]
ProxyRequestRouted(super::AgentProxyRequestRouted),
Expand Down Expand Up @@ -447,6 +476,42 @@ pub struct AgentProxyRequestRouted {
#[prost(uint64, tag = "4")]
pub latency_ms: u64,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct AgentTcpListenerBindFailed {
#[prost(uint32, tag = "1")]
pub listen_port: u32,
#[prost(string, tag = "2")]
pub service_name: ::prost::alloc::string::String,
#[prost(string, tag = "3")]
pub error_message: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct AgentUdpListenerBindFailed {
#[prost(uint32, tag = "1")]
pub listen_port: u32,
#[prost(string, tag = "2")]
pub service_name: ::prost::alloc::string::String,
#[prost(string, tag = "3")]
pub error_message: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct AgentTcpUpstreamConnectFailed {
#[prost(string, tag = "1")]
pub service_name: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub client_ip: ::prost::alloc::string::String,
#[prost(string, tag = "3")]
pub error_message: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct AgentUdpUpstreamConnectFailed {
#[prost(string, tag = "1")]
pub service_name: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub client_ip: ::prost::alloc::string::String,
#[prost(string, tag = "3")]
pub error_message: ::prost::alloc::string::String,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum Net {
Expand All @@ -473,6 +538,38 @@ impl Net {
}
}
}
/// Protocol a proxy-reachable service is exposed over. HTTP is routed by Host
/// header on the shared 80/443 listeners; TCP/UDP each get a dedicated
/// listen_port the proxy binds directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum ServiceProtocol {
Http = 0,
Tcp = 1,
Udp = 2,
}
impl ServiceProtocol {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Self::Http => "HTTP",
Self::Tcp => "TCP",
Self::Udp => "UDP",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"HTTP" => Some(Self::Http),
"TCP" => Some(Self::Tcp),
"UDP" => Some(Self::Udp),
_ => None,
}
}
}
/// Generated client implementations.
pub mod nullnet_grpc_client {
#![allow(
Expand Down Expand Up @@ -732,6 +829,35 @@ pub mod nullnet_grpc_client {
);
self.inner.server_streaming(req, path, codec).await
}
/// Long-lived stream: the server pushes the full TCP/UDP port→service mapping
/// table immediately on subscribe and again whenever services.toml changes,
/// so the proxy opens/closes raw TCP/UDP listeners without a restart.
pub async fn watch_port_mappings(
&mut self,
request: impl tonic::IntoRequest<super::Empty>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::PortMappingBundle>>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic_prost::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/nullnet_grpc.NullnetGrpc/WatchPortMappings",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new("nullnet_grpc.NullnetGrpc", "WatchPortMappings"),
);
self.inner.server_streaming(req, path, codec).await
}
}
}
/// Generated server implementations.
Expand Down Expand Up @@ -806,6 +932,22 @@ pub mod nullnet_grpc_server {
tonic::Response<Self::WatchCertificatesStream>,
tonic::Status,
>;
/// Server streaming response type for the WatchPortMappings method.
type WatchPortMappingsStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::PortMappingBundle, tonic::Status>,
>
+ std::marker::Send
+ 'static;
/// Long-lived stream: the server pushes the full TCP/UDP port→service mapping
/// table immediately on subscribe and again whenever services.toml changes,
/// so the proxy opens/closes raw TCP/UDP listeners without a restart.
async fn watch_port_mappings(
&self,
request: tonic::Request<super::Empty>,
) -> std::result::Result<
tonic::Response<Self::WatchPortMappingsStream>,
tonic::Status,
>;
}
#[derive(Debug)]
pub struct NullnetGrpcServer<T> {
Expand Down Expand Up @@ -1191,6 +1333,53 @@ pub mod nullnet_grpc_server {
};
Box::pin(fut)
}
"/nullnet_grpc.NullnetGrpc/WatchPortMappings" => {
#[allow(non_camel_case_types)]
struct WatchPortMappingsSvc<T: NullnetGrpc>(pub Arc<T>);
impl<
T: NullnetGrpc,
> tonic::server::ServerStreamingService<super::Empty>
for WatchPortMappingsSvc<T> {
type Response = super::PortMappingBundle;
type ResponseStream = T::WatchPortMappingsStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::Empty>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NullnetGrpc>::watch_port_mappings(&inner, request)
.await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = WatchPortMappingsSvc(inner);
let codec = tonic_prost::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
let mut response = http::Response::new(
Expand Down
2 changes: 1 addition & 1 deletion members/nullnet-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ openssl = "0.10"
arc-swap = "1"
nullnet-grpc-lib.workspace = true
nullnet-liberror.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "time", "sync", "io-util", "macros"] }
ctrlc = { version = "3.5.2", features = ["termination"] }
gag.workspace = true
chrono.workspace = true
Loading