diff --git a/src/lean_api.zig b/src/lean_api.zig index 6f8d93e..505db15 100644 --- a/src/lean_api.zig +++ b/src/lean_api.zig @@ -1,21 +1,49 @@ const std = @import("std"); const log = @import("log.zig"); +const posix = std.posix; pub const Slots = struct { justified_slot: u64, finalized_slot: u64, }; -/// Fetch finalized and justified slots from lean node endpoints -/// - Finalized: /lean/v0/states/finalized (SSZ-encoded LeanState) -/// - Justified: /lean/v0/checkpoints/justified (JSON checkpoint with slot) -/// Falls back to finalized slot for justified when the justified endpoint is unavailable +/// Apply SO_RCVTIMEO and SO_SNDTIMEO to the request's underlying TCP socket so +/// any blocking read/write on an unresponsive upstream returns within the +/// configured deadline rather than hanging on the kernel's TCP retransmit +/// window (~15 minutes on Linux). +/// +/// Zig 0.14's std.http.Client does not expose connect/read timeouts at the +/// client level (see @hasField checks in poller.zig / server.zig), so setting +/// socket options directly on the stream handle after `client.open` is the +/// only way to bound the worker's lifetime. Without this, each timed-out +/// detached worker holds a file descriptor open until the peer eventually +/// sends RST/FIN, producing the ESTAB / CLOSE-WAIT socket pile-up that +/// ultimately exhausts RLIMIT_NOFILE.
+fn applySocketTimeouts(req: *std.http.Client.Request, timeout_ms: u64) void { + const conn = req.connection orelse return; + const tv = posix.timeval{ + .sec = @intCast(timeout_ms / 1000), + .usec = @intCast((timeout_ms % 1000) * 1000), + }; + const bytes = std.mem.asBytes(&tv); + posix.setsockopt(conn.stream.handle, posix.SOL.SOCKET, posix.SO.RCVTIMEO, bytes) catch |err| { + log.debug("setsockopt RCVTIMEO failed: {s}", .{@errorName(err)}); + }; + posix.setsockopt(conn.stream.handle, posix.SOL.SOCKET, posix.SO.SNDTIMEO, bytes) catch |err| { + log.debug("setsockopt SNDTIMEO failed: {s}", .{@errorName(err)}); + }; +} + +/// Fetch finalized and justified slots from the Lean HTTP API (clients must implement these). +/// - Finalized: GET /lean/v0/states/finalized — SSZ body, `Accept: application/octet-stream` +/// - Justified: GET /lean/v0/checkpoints/justified — JSON `{"root":"0x...","slot":N}` pub fn fetchSlots( allocator: std.mem.Allocator, client: *std.http.Client, base_url: []const u8, - _: []const u8, // path parameter not used anymore + _: []const u8, // upstream path (health URL in config); slot polling uses Lean paths above out_state_ssz: *?[]u8, + timeout_ms: u64, ) !Slots { // Fetch finalized slot from SSZ-encoded endpoint const finalized_slot = try fetchSlotFromSSZEndpoint( @@ -24,6 +52,7 @@ pub fn fetchSlots( base_url, "/lean/v0/states/finalized", out_state_ssz, + timeout_ms, ); // Fetch justified slot from JSON checkpoint endpoint (zeam serves this) @@ -31,6 +60,7 @@ pub fn fetchSlots( allocator, client, base_url, + timeout_ms, ) catch |err| { log.debug("Justified checkpoint unavailable ({s}), using finalized slot", .{@errorName(err)}); return Slots{ @@ -51,6 +81,7 @@ fn fetchJustifiedSlotFromJsonEndpoint( allocator: std.mem.Allocator, client: *std.http.Client, base_url: []const u8, + timeout_ms: u64, ) !u64 { var url_buf: [512]u8 = undefined; const url = try std.fmt.bufPrint(&url_buf, "{s}/lean/v0/checkpoints/justified", .{base_url}); @@ -65,6 +96,7 @@ 
fn fetchJustifiedSlotFromJsonEndpoint( }, }); defer req.deinit(); + applySocketTimeouts(&req, timeout_ms); try req.send(); try req.finish(); @@ -129,6 +161,7 @@ fn fetchSlotFromSSZEndpoint( base_url: []const u8, path: []const u8, out_state_ssz: *?[]u8, + timeout_ms: u64, ) !u64 { // Build full URL var url_buf: [512]u8 = undefined; @@ -148,6 +181,7 @@ fn fetchSlotFromSSZEndpoint( }, }); defer req.deinit(); + applySocketTimeouts(&req, timeout_ms); try req.send(); try req.finish(); @@ -250,6 +284,7 @@ pub fn fetchForkChoice( allocator: std.mem.Allocator, client: *std.http.Client, base_url: []const u8, + timeout_ms: u64, ) ![]const u8 { var url_buf: [512]u8 = undefined; const url = try std.fmt.bufPrint(&url_buf, "{s}/lean/v0/fork_choice", .{base_url}); @@ -264,6 +299,7 @@ pub fn fetchForkChoice( }, }); defer req.deinit(); + applySocketTimeouts(&req, timeout_ms); try req.send(); try req.finish(); diff --git a/src/poller.zig b/src/poller.zig index 15bf7be..ca20882 100644 --- a/src/poller.zig +++ b/src/poller.zig @@ -79,6 +79,7 @@ pub const Poller = struct { self.config.lean_api_base_url, self.config.lean_api_path, &state_ssz, + self.config.request_timeout_ms, ) catch |err| { var msg_buf = std.ArrayList(u8).init(self.allocator); defer msg_buf.deinit(); diff --git a/src/server.zig b/src/server.zig index ea6a851..6956174 100644 --- a/src/server.zig +++ b/src/server.zig @@ -286,7 +286,7 @@ fn handleApiUpstreamForkChoice( client.read_timeout = config.request_timeout_ms * std.time.ns_per_ms; } - const body = lean_api.fetchForkChoice(allocator, &client, upstream.url) catch |err| { + const body = lean_api.fetchForkChoice(allocator, &client, upstream.url, config.request_timeout_ms) catch |err| { log.warn("fetchForkChoice from {s} failed: {}", .{ upstream.url, err }); try respondText(req, .bad_gateway, "failed to fetch fork choice from upstream\n", "text/plain"); return; diff --git a/src/upstreams.zig b/src/upstreams.zig index 744ba40..069532d 100644 --- a/src/upstreams.zig +++ 
b/src/upstreams.zig @@ -126,6 +126,11 @@ pub const UpstreamManager = struct { base_url: []u8, path: []u8, name: []u8, + // Socket-level timeout applied to the HTTP request inside the worker. + // Bounds the worker's lifetime so detached threads clean up their + // sockets within a predictable window (rather than hanging for the + // kernel's TCP retransmit window on unresponsive peers). + timeout_ms: u64, // written by the thread, read by the spawner after deadline done: std.atomic.Value(bool), slots: ?lean_api.Slots, @@ -139,6 +144,7 @@ pub const UpstreamManager = struct { name: []const u8, base_url: []const u8, path: []const u8, + timeout_ms: u64, ) !*PollCtx { const ctx = try allocator.create(PollCtx); ctx.* = .{ @@ -147,6 +153,7 @@ pub const UpstreamManager = struct { .name = try allocator.dupe(u8, name), .base_url = try allocator.dupe(u8, base_url), .path = try allocator.dupe(u8, path), + .timeout_ms = timeout_ms, .done = std.atomic.Value(bool).init(false), .slots = null, .error_msg = null, @@ -184,6 +191,7 @@ pub const UpstreamManager = struct { ctx.base_url, ctx.path, &state_ssz, + ctx.timeout_ms, ) catch |err| { ctx.error_msg = std.fmt.allocPrint(ctx.allocator, "{s}", .{@errorName(err)}) catch null; ctx.done.store(true, .release); @@ -246,6 +254,7 @@ pub const UpstreamManager = struct { target.name, target.base_url, target.path, + timeout_ms, ) catch |err| { log.warn("Failed to allocate poll context for {s}: {s}", .{ target.name, @errorName(err) }); continue;