Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions apps/freenet-ping/app/tests/run_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ async fn test_node_diagnostics_query() -> TestResult {
let config = config_gw.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -248,7 +248,7 @@ async fn test_node_diagnostics_query() -> TestResult {
let config = config_node.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand Down Expand Up @@ -592,7 +592,7 @@ async fn test_ping_multi_node() -> TestResult {
let config = config_gw.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -603,7 +603,7 @@ async fn test_ping_multi_node() -> TestResult {
let config = config_node1.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -614,7 +614,7 @@ async fn test_ping_multi_node() -> TestResult {
let config = config_node2.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand Down Expand Up @@ -1187,7 +1187,7 @@ async fn test_ping_application_loop() -> TestResult {
let config = config_gw.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -1198,7 +1198,7 @@ async fn test_ping_application_loop() -> TestResult {
let config = config_node1.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -1209,7 +1209,7 @@ async fn test_ping_application_loop() -> TestResult {
let config = config_node2.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand Down Expand Up @@ -1678,7 +1678,7 @@ async fn test_ping_partially_connected_network() -> TestResult {
let config = config.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -1693,7 +1693,7 @@ async fn test_ping_partially_connected_network() -> TestResult {
let config = config.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand Down
6 changes: 3 additions & 3 deletions apps/freenet-ping/app/tests/run_app_blocked_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult {
let config = config_gw.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -191,7 +191,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult {
let config = config_node1.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -201,7 +201,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult {
let config = config_node2.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ async fn test_ping_partially_connected_network() -> TestResult {
let config = config.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -218,7 +218,7 @@ async fn test_ping_partially_connected_network() -> TestResult {
let config = config.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand Down
4 changes: 2 additions & 2 deletions apps/freenet-ping/app/tests/test_50_node_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ async fn setup_50_node_network() -> TestResult<(Vec<WebApi>, Vec<WebApi>, Contra
let config = config.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand Down Expand Up @@ -192,7 +192,7 @@ async fn setup_50_node_network() -> TestResult<(Vec<WebApi>, Vec<WebApi>, Contra
let config = config.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand Down
4 changes: 2 additions & 2 deletions apps/freenet-ping/app/tests/test_connection_timing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async fn test_connection_timing() -> TestResult {
let config = config_gw.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -76,7 +76,7 @@ async fn test_connection_timing() -> TestResult {
let config = config_node1.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand Down
6 changes: 3 additions & 3 deletions apps/freenet-ping/app/tests/test_small_network_get_issue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async fn test_small_network_get_failure() -> TestResult {
node_config.min_number_of_connections(2);
node_config.max_number_of_connections(10);
let node = node_config
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -103,7 +103,7 @@ async fn test_small_network_get_failure() -> TestResult {
node_config.min_number_of_connections(2);
node_config.max_number_of_connections(10);
let node = node_config
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -116,7 +116,7 @@ async fn test_small_network_get_failure() -> TestResult {
node_config.min_number_of_connections(2);
node_config.max_number_of_connections(10);
let node = node_config
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/bin/freenet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ async fn run_local(config: Config) -> anyhow::Result<()> {
async fn run_network(config: Config) -> anyhow::Result<()> {
tracing::info!("Starting freenet node in network mode");

let clients = serve_gateway(config.ws_api).await;
let clients = serve_gateway(config.ws_api)
.await
.with_context(|| "failed to start HTTP/WebSocket gateway")?;
tracing::info!("Initializing node configuration");

let node_config = NodeConfig::new(config)
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1418,7 +1418,7 @@ pub async fn run_local_node(
_ => {}
}

let (mut gw, mut ws_proxy) = crate::server::serve_gateway_in(socket).await;
let (mut gw, mut ws_proxy) = crate::server::serve_gateway_in(socket).await?;

// TODO: use combinator instead
// let mut all_clients =
Expand Down
40 changes: 28 additions & 12 deletions crates/core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,28 @@ pub(crate) enum HostCallbackResult {
},
}

fn serve(socket: SocketAddr, router: axum::Router) {
async fn serve(socket: SocketAddr, router: axum::Router) -> std::io::Result<()> {
let listener = tokio::net::TcpListener::bind(socket).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::AddrInUse {
std::io::Error::new(
std::io::ErrorKind::AddrInUse,
format!(
"Port {} is already in use. Another freenet process may be running. \
Use 'pkill freenet' to stop it, or specify a different port with --gateway-port.",
socket.port()
),
)
} else {
e
}
})?;
tracing::info!("HTTP gateway listening on {}", socket);
tokio::spawn(async move {
tracing::info!("HTTP gateway listening on {}", socket);
let listener = tokio::net::TcpListener::bind(socket).await.unwrap();
axum::serve(listener, router).await.map_err(|e| {
tracing::error!("Error while running HTTP gateway server: {e}");
})
});
Ok(())
}

pub mod local_node {
Expand Down Expand Up @@ -101,7 +115,7 @@ pub mod local_node {
let (mut gw, gw_router) = HttpGateway::as_router(&socket);
let (mut ws_proxy, ws_router) = WebSocketProxy::create_router(gw_router);

serve(socket, ws_router.layer(TraceLayer::new_for_http()));
serve(socket, ws_router.layer(TraceLayer::new_for_http())).await?;

// TODO: use combinator instead
// let mut all_clients =
Expand Down Expand Up @@ -199,23 +213,25 @@ pub mod local_node {
}
}

pub async fn serve_gateway(config: WebsocketApiConfig) -> [BoxedClient; 2] {
let (gw, ws_proxy) = serve_gateway_in(config).await;
[Box::new(gw), Box::new(ws_proxy)]
pub async fn serve_gateway(config: WebsocketApiConfig) -> std::io::Result<[BoxedClient; 2]> {
let (gw, ws_proxy) = serve_gateway_in(config).await?;
Ok([Box::new(gw), Box::new(ws_proxy)])
}

/// Serves the gateway and returns the concrete types (for integration testing).
/// This allows tests to access internal state like the attested_contracts map.
pub async fn serve_gateway_for_test(
config: WebsocketApiConfig,
) -> (
) -> std::io::Result<(
http_gateway::HttpGateway,
crate::client_events::websocket::WebSocketProxy,
) {
)> {
serve_gateway_in(config).await
}

pub(crate) async fn serve_gateway_in(config: WebsocketApiConfig) -> (HttpGateway, WebSocketProxy) {
pub(crate) async fn serve_gateway_in(
config: WebsocketApiConfig,
) -> std::io::Result<(HttpGateway, WebSocketProxy)> {
let ws_socket = (config.address, config.port).into();

// Create a shared attested_contracts map with token expiration support
Expand All @@ -234,8 +250,8 @@ pub(crate) async fn serve_gateway_in(config: WebsocketApiConfig) -> (HttpGateway
let (ws_proxy, ws_router) =
WebSocketProxy::create_router_with_attested_contracts(gw_router, attested_contracts);

serve(ws_socket, ws_router.layer(TraceLayer::new_for_http()));
(gw, ws_proxy)
serve(ws_socket, ws_router.layer(TraceLayer::new_for_http())).await?;
Ok((gw, ws_proxy))
}

/// Spawns a background task that periodically removes expired authentication tokens.
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tests/error_notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ async fn test_connection_drop_error_notification() -> anyhow::Result<()> {
let config = gateway_config.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -391,7 +391,7 @@ async fn test_connection_drop_error_notification() -> anyhow::Result<()> {
let config = peer_config.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;

// Run node until we receive shutdown signal
Expand Down
10 changes: 5 additions & 5 deletions crates/core/tests/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2219,7 +2219,7 @@ async fn test_gateway_packet_size_change_after_60s() -> TestResult {
let config = config_gw1.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -2230,7 +2230,7 @@ async fn test_gateway_packet_size_change_after_60s() -> TestResult {
let config = config_gw2.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -2241,7 +2241,7 @@ async fn test_gateway_packet_size_change_after_60s() -> TestResult {
let config = config_client.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand Down Expand Up @@ -2420,7 +2420,7 @@ async fn test_production_decryption_error_scenario() -> TestResult {
let config = config_gw.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand All @@ -2430,7 +2430,7 @@ async fn test_production_decryption_error_scenario() -> TestResult {
let config = config_client.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.build(serve_gateway(config.ws_api).await?)
.await?;
node.run().await
}
Expand Down
4 changes: 3 additions & 1 deletion crates/core/tests/token_expiration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ async fn test_token_cleanup_removes_expired_tokens() -> TestResult {

let ws_socket = TcpListener::bind("127.0.0.1:0")?;
let ws_port = ws_socket.local_addr()?.port();
// Drop the socket to release the port before the gateway binds to it
drop(ws_socket);

let config = WebsocketApiConfig {
address: Ipv4Addr::LOCALHOST.into(),
Expand All @@ -207,7 +209,7 @@ async fn test_token_cleanup_removes_expired_tokens() -> TestResult {
};

// Start the gateway server (which spawns the cleanup task)
let (gw, _ws_proxy) = serve_gateway_for_test(config).await;
let (gw, _ws_proxy) = serve_gateway_for_test(config).await?;

// Access the attested_contracts map via the test-only method
let attested_contracts = gw.attested_contracts();
Expand Down
Loading
Loading