Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial prometheus metrics #37

Merged
merged 16 commits into from Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 13 additions & 1 deletion build.zig
Expand Up @@ -23,6 +23,7 @@ pub fn build(b: *std.Build) void {
const zig_network_module = b.dependency("zig-network", opts).module("network");
const zig_cli_module = b.dependency("zig-cli", opts).module("zig-cli");
const getty_mod = b.dependency("getty", opts).module("getty");
const httpz_mod = b.dependency("httpz", opts).module("httpz");

const lib = b.addStaticLibrary(.{
.name = "sig",
Expand Down Expand Up @@ -53,13 +54,18 @@ pub fn build(b: *std.Build) void {
.name = "getty",
.module = getty_mod,
},
.{
.name = "httpz",
.module = httpz_mod,
},
},
});

lib.addModule("base58-zig", base58_module);
lib.addModule("zig-network", zig_network_module);
lib.addModule("zig-cli", zig_cli_module);
lib.addModule("getty", getty_mod);
lib.addModule("httpz", httpz_mod);

// This declares intent for the library to be installed into the standard
// location when the user invokes the "install" step (the default step when
Expand All @@ -77,6 +83,8 @@ pub fn build(b: *std.Build) void {
tests.addModule("base58-zig", base58_module);
tests.addModule("zig-cli", zig_cli_module);
tests.addModule("getty", getty_mod);
tests.addModule("httpz", httpz_mod);

const run_tests = b.addRunArtifact(tests);
const test_step = b.step("test", "Run library tests");
test_step.dependOn(&lib.step);
Expand All @@ -94,6 +102,7 @@ pub fn build(b: *std.Build) void {
exe.addModule("zig-network", zig_network_module);
exe.addModule("zig-cli", zig_cli_module);
exe.addModule("getty", getty_mod);
exe.addModule("httpz", httpz_mod);

// This declares intent for the executable to be installed into the
// standard location when the user invokes the "install" step (the default
Expand Down Expand Up @@ -137,6 +146,8 @@ pub fn build(b: *std.Build) void {
fuzz_exe.addModule("zig-network", zig_network_module);
fuzz_exe.addModule("zig-cli", zig_cli_module);
fuzz_exe.addModule("getty", getty_mod);
fuzz_exe.addModule("httpz", httpz_mod);

b.installArtifact(fuzz_exe);
const fuzz_cmd = b.addRunArtifact(fuzz_exe);
if (b.args) |args| {
Expand All @@ -158,11 +169,12 @@ pub fn build(b: *std.Build) void {
benchmark_exe.addModule("zig-network", zig_network_module);
benchmark_exe.addModule("zig-cli", zig_cli_module);
benchmark_exe.addModule("getty", getty_mod);
benchmark_exe.addModule("httpz", httpz_mod);

b.installArtifact(benchmark_exe);
const benchmark_cmd = b.addRunArtifact(benchmark_exe);
if (b.args) |args| {
benchmark_cmd.addArgs(args);
}

b.step("benchmark", "benchmark gossip").dependOn(&benchmark_cmd.step);
}
4 changes: 4 additions & 0 deletions build.zig.zon
Expand Up @@ -18,5 +18,9 @@
.url = "https://github.com/getty-zig/getty/archive/5b0e750d92ee4ef8e46ad743bb8ced63723acd00.tar.gz",
.hash = "12209398657d260abcd6dae946d8da4cd3057b8c7990608476a9f8011aae570d2ebb",
},
.httpz = .{
.url = "https://github.com/karlseguin/http.zig/archive/7a751549a751d9b45952037abdb127b3225b2ac1.tar.gz",
.hash = "122004f74adf46001fe9129d8cec54bd4a98895ce89f0897790e13b60fa99e527b99",
},
},
}
25 changes: 25 additions & 0 deletions src/cmd/cmd.zig
Expand Up @@ -8,6 +8,9 @@ const io = std.io;
const Pubkey = @import("../core/pubkey.zig").Pubkey;
const SocketAddr = @import("../net/net.zig").SocketAddr;
const GossipService = @import("../gossip/gossip_service.zig").GossipService;
const servePrometheus = @import("../prometheus/http.zig").servePrometheus;
const global_registry = @import("../prometheus/registry.zig").global_registry;
const Registry = @import("../prometheus/registry.zig").Registry;

var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const gpa_allocator = gpa.allocator();
Expand All @@ -31,11 +34,21 @@ var gossip_entrypoints_option = cli.Option{
.value_name = "Entrypoints",
};

var metrics_port_option = cli.Option{
.long_name = "metrics-port",
.help = "port to expose prometheus metrics via http",
.short_alias = 'm',
.value = cli.OptionValue{ .int = 12345 },
.required = false,
.value_name = "port_number",
};

var app = &cli.App{
.name = "sig",
.description = "Sig is a Solana client implementation written in Zig.\nThis is still a WIP, PRs welcome.",
.version = "0.1.1",
.author = "Syndica & Contributors",
.options = &.{&metrics_port_option},
.subcommands = &.{
&cli.Command{
.name = "identity",
Expand Down Expand Up @@ -76,6 +89,8 @@ fn gossip(_: []const []const u8) !void {

// var logger: Logger = .noop;

const metrics_thread = try spawnMetrics(gpa_allocator);

var my_keypair = try getOrInitIdentity(gpa_allocator, logger);

var gossip_port: u16 = @intCast(gossip_port_option.value.int.?);
Expand Down Expand Up @@ -119,6 +134,16 @@ fn gossip(_: []const []const u8) !void {
);

handle.join();
metrics_thread.detach();
}

/// Initializes the global registry. Returns error if registry was already initialized.
/// Spawns a thread to serve the metrics over http on the CLI configured port.
/// Uses same allocator for both registry and http adapter.
fn spawnMetrics(allocator: std.mem.Allocator) !std.Thread {
var metrics_port: u16 = @intCast(metrics_port_option.value.int.?);
const registry = try global_registry.initialize(Registry(.{}).init, .{allocator});
return try std.Thread.spawn(.{}, servePrometheus, .{ allocator, registry, metrics_port });
}

pub fn run() !void {
Expand Down
11 changes: 11 additions & 0 deletions src/lib.zig
Expand Up @@ -46,6 +46,7 @@ pub const sync = struct {
pub usingnamespace @import("sync/mpmc.zig");
pub usingnamespace @import("sync/ref.zig");
pub usingnamespace @import("sync/mux.zig");
pub usingnamespace @import("sync/once_cell.zig");
pub usingnamespace @import("sync/thread_pool.zig");
};

Expand Down Expand Up @@ -75,3 +76,13 @@ pub const net = struct {
pub usingnamespace @import("net/net.zig");
pub usingnamespace @import("net/echo.zig");
};

pub const prometheus = struct {
pub usingnamespace @import("prometheus/counter.zig");
pub usingnamespace @import("prometheus/gauge.zig");
pub usingnamespace @import("prometheus/gauge_fn.zig");
pub usingnamespace @import("prometheus/http.zig");
pub usingnamespace @import("prometheus/histogram.zig");
pub usingnamespace @import("prometheus/metric.zig");
pub usingnamespace @import("prometheus/registry.zig");
};
89 changes: 89 additions & 0 deletions src/prometheus/counter.zig
@@ -0,0 +1,89 @@
const std = @import("std");
const mem = std.mem;
const testing = std.testing;

const Metric = @import("metric.zig").Metric;

pub const Counter = struct {
const Self = @This();

metric: Metric = Metric{ .getResultFn = getResult },
value: std.atomic.Atomic(u64) = std.atomic.Atomic(u64).init(0),

pub fn inc(self: *Self) void {
_ = self.value.fetchAdd(1, .Monotonic);
}

pub fn add(self: *Self, value: anytype) void {
switch (@typeInfo(@TypeOf(value))) {
.Int, .Float, .ComptimeInt, .ComptimeFloat => {},
else => @compileError("can't add a non-number"),
}

_ = self.value.fetchAdd(@intCast(value), .Monotonic);
}

pub fn get(self: *const Self) u64 {
return self.value.load(.Monotonic);
}

pub fn reset(self: *Self) void {
_ = self.value.store(0, .Monotonic);
}

fn getResult(metric: *Metric, _: mem.Allocator) Metric.Error!Metric.Result {
const self = @fieldParentPtr(Self, "metric", metric);
return Metric.Result{ .counter = self.get() };
}
};

test "prometheus.counter: inc/add/dec/set/get" {
var buffer = std.ArrayList(u8).init(testing.allocator);
defer buffer.deinit();

var counter = Counter{};

try testing.expectEqual(@as(u64, 0), counter.get());

counter.inc();
try testing.expectEqual(@as(u64, 1), counter.get());

counter.add(200);
try testing.expectEqual(@as(u64, 201), counter.get());
}

test "prometheus.counter: concurrent" {
var counter = Counter{};

var threads: [4]std.Thread = undefined;
for (&threads) |*thread| {
thread.* = try std.Thread.spawn(
.{},
struct {
fn run(c: *Counter) void {
var i: usize = 0;
while (i < 20) : (i += 1) {
c.inc();
}
}
}.run,
.{&counter},
);
}

for (&threads) |*thread| thread.join();

try testing.expectEqual(@as(u64, 80), counter.get());
}

test "prometheus.counter: write" {
var counter = Counter{ .value = .{ .value = 340 } };

var buffer = std.ArrayList(u8).init(testing.allocator);
defer buffer.deinit();

var metric = &counter.metric;
try metric.write(testing.allocator, buffer.writer(), "mycounter");

try testing.expectEqualStrings("mycounter 340\n", buffer.items);
}
50 changes: 50 additions & 0 deletions src/prometheus/gauge.zig
@@ -0,0 +1,50 @@
const std = @import("std");

const Metric = @import("metric.zig").Metric;

/// A gauge that stores the value it reports.
/// Read and write operations are atomic and monotonic.
pub fn Gauge(comptime T: type) type {
return struct {
value: std.atomic.Atomic(T) = .{ .value = 0 },
metric: Metric = .{ .getResultFn = getResult },

const Self = @This();

pub fn inc(self: *Self) void {
self.value.fetchAdd(1, .Monotonic);
}

pub fn add(self: *Self, v: T) void {
self.value.fetchAdd(v, .Monotonic);
}

pub fn dec(self: *Self) void {
self.value.fetchSub(1, .Monotonic);
}

pub fn sub(self: *Self, v: T) void {
self.value.fetchAdd(v, .Monotonic);
}

pub fn set(self: *Self, v: T) void {
self.value.store(v, .Monotonic);
}

pub fn get(self: *Self) T {
return self.value.load(.Monotonic);
}

fn getResult(metric: *Metric, allocator: std.mem.Allocator) Metric.Error!Metric.Result {
_ = allocator;

const self = @fieldParentPtr(Self, "metric", metric);

return switch (T) {
f64 => Metric.Result{ .gauge = self.get() },
u64 => Metric.Result{ .gauge_int = self.get() },
else => unreachable, // Gauge Return may only be 'f64' or 'u64'
};
}
};
}