Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions src/lean_api.zig
Original file line number Diff line number Diff line change
@@ -1,21 +1,49 @@
const std = @import("std");
const log = @import("log.zig");
const posix = std.posix;

/// Pair of consensus checkpoints reported by a Lean node, as returned by
/// `fetchSlots`. Both values are slot numbers (not roots).
pub const Slots = struct {
    // Slot of the latest justified checkpoint; falls back to the finalized
    // slot when the justified endpoint is unavailable (see fetchSlots).
    justified_slot: u64,
    // Slot of the latest finalized state.
    finalized_slot: u64,
};

/// Fetch finalized and justified slots from lean node endpoints
/// - Finalized: /lean/v0/states/finalized (SSZ-encoded LeanState)
/// - Justified: /lean/v0/checkpoints/justified (JSON checkpoint with slot)
/// Falls back to finalized slot for justified when the justified endpoint is unavailable
/// Bound blocking socket I/O on `req` by setting SO_RCVTIMEO and SO_SNDTIMEO
/// directly on the connection's file descriptor, so a read/write against an
/// unresponsive upstream returns within the configured deadline instead of
/// waiting out the kernel's TCP retransmit window (~15 minutes on Linux).
///
/// std.http.Client in Zig 0.14 exposes no per-request read/write deadline
/// (hence the @hasField probing in poller.zig / server.zig), so the socket
/// options themselves — applied right after `client.open` — are the only
/// lever for bounding a worker's lifetime. Without this, each timed-out
/// detached worker keeps its file descriptor open until the peer finally
/// sends RST/FIN, and the accumulated ESTAB / CLOSE-WAIT sockets eventually
/// exhaust RLIMIT_NOFILE.
///
/// Best-effort: a request with no live connection is skipped, and
/// setsockopt failures are logged at debug level rather than propagated.
fn applySocketTimeouts(req: *std.http.Client.Request, timeout_ms: u64) void {
    const connection = req.connection orelse return;
    const fd = connection.stream.handle;
    // Split the millisecond deadline into the timeval's sec/usec parts.
    const deadline = posix.timeval{
        .sec = @intCast(timeout_ms / std.time.ms_per_s),
        .usec = @intCast((timeout_ms % std.time.ms_per_s) * std.time.us_per_ms),
    };
    const opt_bytes = std.mem.asBytes(&deadline);
    posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.RCVTIMEO, opt_bytes) catch |err| {
        log.debug("setsockopt RCVTIMEO failed: {s}", .{@errorName(err)});
    };
    posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.SNDTIMEO, opt_bytes) catch |err| {
        log.debug("setsockopt SNDTIMEO failed: {s}", .{@errorName(err)});
    };
}

/// Fetch finalized and justified slots from the Lean HTTP API (clients must implement these).
/// - Finalized: GET /lean/v0/states/finalized — SSZ body, `Accept: application/octet-stream`
/// - Justified: GET /lean/v0/checkpoints/justified — JSON `{"root":"0x...","slot":N}`
pub fn fetchSlots(
allocator: std.mem.Allocator,
client: *std.http.Client,
base_url: []const u8,
_: []const u8, // path parameter not used anymore
_: []const u8, // upstream path (health URL in config); slot polling uses Lean paths above
out_state_ssz: *?[]u8,
timeout_ms: u64,
) !Slots {
// Fetch finalized slot from SSZ-encoded endpoint
const finalized_slot = try fetchSlotFromSSZEndpoint(
Expand All @@ -24,13 +52,15 @@ pub fn fetchSlots(
base_url,
"/lean/v0/states/finalized",
out_state_ssz,
timeout_ms,
);

// Fetch justified slot from JSON checkpoint endpoint (zeam serves this)
const justified_slot = fetchJustifiedSlotFromJsonEndpoint(
allocator,
client,
base_url,
timeout_ms,
) catch |err| {
log.debug("Justified checkpoint unavailable ({s}), using finalized slot", .{@errorName(err)});
return Slots{
Expand All @@ -51,6 +81,7 @@ fn fetchJustifiedSlotFromJsonEndpoint(
allocator: std.mem.Allocator,
client: *std.http.Client,
base_url: []const u8,
timeout_ms: u64,
) !u64 {
var url_buf: [512]u8 = undefined;
const url = try std.fmt.bufPrint(&url_buf, "{s}/lean/v0/checkpoints/justified", .{base_url});
Expand All @@ -65,6 +96,7 @@ fn fetchJustifiedSlotFromJsonEndpoint(
},
});
defer req.deinit();
applySocketTimeouts(&req, timeout_ms);

try req.send();
try req.finish();
Expand Down Expand Up @@ -129,6 +161,7 @@ fn fetchSlotFromSSZEndpoint(
base_url: []const u8,
path: []const u8,
out_state_ssz: *?[]u8,
timeout_ms: u64,
) !u64 {
// Build full URL
var url_buf: [512]u8 = undefined;
Expand All @@ -148,6 +181,7 @@ fn fetchSlotFromSSZEndpoint(
},
});
defer req.deinit();
applySocketTimeouts(&req, timeout_ms);

try req.send();
try req.finish();
Expand Down Expand Up @@ -250,6 +284,7 @@ pub fn fetchForkChoice(
allocator: std.mem.Allocator,
client: *std.http.Client,
base_url: []const u8,
timeout_ms: u64,
) ![]const u8 {
var url_buf: [512]u8 = undefined;
const url = try std.fmt.bufPrint(&url_buf, "{s}/lean/v0/fork_choice", .{base_url});
Expand All @@ -264,6 +299,7 @@ pub fn fetchForkChoice(
},
});
defer req.deinit();
applySocketTimeouts(&req, timeout_ms);

try req.send();
try req.finish();
Expand Down
1 change: 1 addition & 0 deletions src/poller.zig
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub const Poller = struct {
self.config.lean_api_base_url,
self.config.lean_api_path,
&state_ssz,
self.config.request_timeout_ms,
) catch |err| {
var msg_buf = std.ArrayList(u8).init(self.allocator);
defer msg_buf.deinit();
Expand Down
2 changes: 1 addition & 1 deletion src/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ fn handleApiUpstreamForkChoice(
client.read_timeout = config.request_timeout_ms * std.time.ns_per_ms;
}

const body = lean_api.fetchForkChoice(allocator, &client, upstream.url) catch |err| {
const body = lean_api.fetchForkChoice(allocator, &client, upstream.url, config.request_timeout_ms) catch |err| {
log.warn("fetchForkChoice from {s} failed: {}", .{ upstream.url, err });
try respondText(req, .bad_gateway, "failed to fetch fork choice from upstream\n", "text/plain");
return;
Expand Down
9 changes: 9 additions & 0 deletions src/upstreams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ pub const UpstreamManager = struct {
base_url: []u8,
path: []u8,
name: []u8,
// Socket-level timeout applied to the HTTP request inside the worker.
// Bounds the worker's lifetime so detached threads clean up their
// sockets within a predictable window (rather than hanging for the
// kernel's TCP retransmit window on unresponsive peers).
timeout_ms: u64,
// written by the thread, read by the spawner after deadline
done: std.atomic.Value(bool),
slots: ?lean_api.Slots,
Expand All @@ -139,6 +144,7 @@ pub const UpstreamManager = struct {
name: []const u8,
base_url: []const u8,
path: []const u8,
timeout_ms: u64,
) !*PollCtx {
const ctx = try allocator.create(PollCtx);
ctx.* = .{
Expand All @@ -147,6 +153,7 @@ pub const UpstreamManager = struct {
.name = try allocator.dupe(u8, name),
.base_url = try allocator.dupe(u8, base_url),
.path = try allocator.dupe(u8, path),
.timeout_ms = timeout_ms,
.done = std.atomic.Value(bool).init(false),
.slots = null,
.error_msg = null,
Expand Down Expand Up @@ -184,6 +191,7 @@ pub const UpstreamManager = struct {
ctx.base_url,
ctx.path,
&state_ssz,
ctx.timeout_ms,
) catch |err| {
ctx.error_msg = std.fmt.allocPrint(ctx.allocator, "{s}", .{@errorName(err)}) catch null;
ctx.done.store(true, .release);
Expand Down Expand Up @@ -246,6 +254,7 @@ pub const UpstreamManager = struct {
target.name,
target.base_url,
target.path,
timeout_ms,
) catch |err| {
log.warn("Failed to allocate poll context for {s}: {s}", .{ target.name, @errorName(err) });
continue;
Expand Down
Loading