diff --git a/.gitignore b/.gitignore index aefab353..9c910403 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,10 @@ frameworks/blitz/zig-linux-* frameworks/blitz/.zig-cache frameworks/fletch/.dart_tool/ !frameworks/hyperf/bin/ +frameworks/zix/.zig-cache +frameworks/zix/zig-out +frameworks/zix-grpc/.zig-cache +frameworks/zix-grpc/zig-out # IDE settings *.user diff --git a/frameworks/zix-grpc/Dockerfile b/frameworks/zix-grpc/Dockerfile new file mode 100644 index 00000000..5da64fc8 --- /dev/null +++ b/frameworks/zix-grpc/Dockerfile @@ -0,0 +1,46 @@ +# syntax=docker/dockerfile:1.7 +# +# zix-grpc. Zig gRPC h2c server using the zix library. +# EPOLL dispatch: single accept loop + worker pool, EPOLLONESHOT per stream. +# Implements benchmark.BenchmarkService/GetSum (unary) and StreamSum (server-streaming). +# Built statically against musl so the runtime image is a single binary on scratch. + +FROM debian:bookworm-slim AS build +ARG ZIG_VERSION=0.16.0 +ARG TARGETARCH +ENV DEBIAN_FRONTEND=noninteractive +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates curl xz-utils \ + && rm -rf /var/lib/apt/lists/* + +RUN set -eu; \ + case "${TARGETARCH:-amd64}" in \ + amd64) ZIG_ARCH=x86_64 ;; \ + arm64) ZIG_ARCH=aarch64 ;; \ + *) echo "unsupported arch: ${TARGETARCH}" >&2; exit 1 ;; \ + esac; \ + curl -fsSL "https://ziglang.org/download/${ZIG_VERSION}/zig-${ZIG_ARCH}-linux-${ZIG_VERSION}.tar.xz" \ + | tar -xJ -C /opt; \ + mv "/opt/zig-${ZIG_ARCH}-linux-${ZIG_VERSION}" /opt/zig +ENV PATH="/opt/zig:${PATH}" + +# Vendor zix 0.2.x at a pinned commit, separate layer so source-only rebuilds +# skip the download. Strip the top-level directory produced by GitHub's archive. +RUN mkdir -p /src/vendor/zix && \ + curl -fsSL https://github.com/prothegee/zix/archive/refs/heads/0.2.x.tar.gz \ + | tar -xz --strip-components=1 -C /src/vendor/zix + +WORKDIR /src +COPY build.zig build.zig.zon ./ +COPY src ./src +RUN set -eu; \ + case "${TARGETARCH:-amd64}" in \ + amd64) ZIG_TARGET=x86_64-linux-musl ;; \ + arm64) ZIG_TARGET=aarch64-linux-musl ;; \ + esac; \ + zig build -Dtarget="${ZIG_TARGET}" --release=fast + +FROM debian:bookworm-slim +COPY --from=build /src/zig-out/bin/zix-grpc /zix-grpc +EXPOSE 8080 +ENTRYPOINT ["/zix-grpc"] diff --git a/frameworks/zix-grpc/build.zig b/frameworks/zix-grpc/build.zig new file mode 100644 index 00000000..e7446b9f --- /dev/null +++ b/frameworks/zix-grpc/build.zig @@ -0,0 +1,26 @@ +const std = @import("std"); + +pub fn build(b: *std.Build) void { + const target = b.standardTargetOptions(.{}); + const optimize = b.standardOptimizeOption(.{ .preferred_optimize_mode = .ReleaseFast }); + + const zix_dep = b.dependency("zix", .{ .target = target, .optimize = optimize }); + const zix_mod = zix_dep.module("zix"); + + const exe = b.addExecutable(.{ + .name = "zix-grpc", + .root_module = b.createModule(.{ + .root_source_file = b.path("src/main.zig"), + .target = target, + .optimize = optimize, + .strip = true, + }), + }); + exe.root_module.addImport("zix", zix_mod); + b.installArtifact(exe); + + const run_step = b.step("run", "Run the gRPC server"); + const run_cmd = b.addRunArtifact(exe); + if (b.args) |args| run_cmd.addArgs(args); + run_step.dependOn(&run_cmd.step); +} diff --git a/frameworks/zix-grpc/build.zig.zon b/frameworks/zix-grpc/build.zig.zon new file mode 100644 index 00000000..52fa22ae --- /dev/null +++ b/frameworks/zix-grpc/build.zig.zon @@ -0,0 +1,16 @@ +.{ + .name = .zix_grpc_arena, + .version = "0.1.0", + .fingerprint = 0x458ebf118a0a92d9, + .minimum_zig_version = "0.16.0", + .paths = .{ + "build.zig", + "build.zig.zon", + "src", + }, + .dependencies = .{ + .zix = .{ + .path = "vendor/zix", + }, + }, +} diff --git a/frameworks/zix-grpc/meta.json b/frameworks/zix-grpc/meta.json new file mode 100644 index 00000000..3911372f --- /dev/null +++ b/frameworks/zix-grpc/meta.json @@ -0,0 +1,14 @@ +{ + "display_name": "zix-grpc", + "language": "Zig", + "type": "engine", + "engine": "zix", + "description": "Zig gRPC h2c server using the zix framework. Single epoll event loop; worker pool handles one stream per slot. Implements GetSum (unary) and StreamSum (server-streaming).", + "repo": "https://github.com/prothegee/zix", + "enabled": true, + "tests": [ + "unary-grpc", + "stream-grpc" + ], + "maintainers": ["prothegee"] +} diff --git a/frameworks/zix-grpc/src/main.zig b/frameworks/zix-grpc/src/main.zig new file mode 100644 index 00000000..ba68d3f9 --- /dev/null +++ b/frameworks/zix-grpc/src/main.zig @@ -0,0 +1,99 @@ +const std = @import("std"); +const zix = @import("zix"); + +// --------------------------------------------------------- // + +const PORT: u16 = 8080; +/// Required for ipv4 and ipv6 +const LISTEN_IP: []const u8 = "::"; +const DISPATCH_MODEL: zix.Grpc.DispatchModel = .EPOLL; +const KERNEL_BACKLOG: u31 = 1024 * 16; +const WORKERS: usize = 0; +const POOL_SIZE: usize = 0; + +// --------------------------------------------------------- // + +/// Unary RPC: SumRequest{a, b} -> SumReply{result: a+b} +fn getSumHandler(headers: []const zix.Http2.Header, ctx: *zix.Grpc.Context) void { + _ = headers; + + const msg = ctx.recvMessage() orelse { + ctx.finish(.INVALID_ARGUMENT, "empty request"); + return; + }; + + var reader = zix.Grpc.MessageReader.init(msg); + var req_a: i32 = 0; + var req_b: i32 = 0; + + while (reader.next() catch null) |field| { + switch (field.field_number) { + 1 => req_a = @bitCast(@as(u32, @truncate(field.value_u64))), + 2 => req_b = @bitCast(@as(u32, @truncate(field.value_u64))), + else => {}, + } + } + + var reply_buf: [16]u8 = undefined; + const reply_len = zix.Grpc.encodeInt32(1, req_a + req_b, &reply_buf); + + ctx.sendMessage("application/grpc+proto", reply_buf[0..reply_len]); + ctx.finish(.OK, ""); +} + +/// Server-streaming RPC: StreamRequest{a, b, count} -> count * SumReply{result: a+b+i} +fn streamSumHandler(headers: []const zix.Http2.Header, ctx: *zix.Grpc.Context) void { + _ = headers; + + const msg = ctx.recvMessage() orelse { + ctx.finish(.INVALID_ARGUMENT, "empty request"); + return; + }; + + var reader = zix.Grpc.MessageReader.init(msg); + var req_a: i32 = 0; + var req_b: i32 = 0; + var req_count: i32 = 1; + + while (reader.next() catch null) |field| { + switch (field.field_number) { + 1 => req_a = @bitCast(@as(u32, @truncate(field.value_u64))), + 2 => req_b = @bitCast(@as(u32, @truncate(field.value_u64))), + 3 => req_count = @bitCast(@as(u32, @truncate(field.value_u64))), + else => {}, + } + } + + if (req_count <= 0) req_count = 1; + + const sum = req_a + req_b; + var reply_buf: [16]u8 = undefined; + + var i: i32 = 0; + while (i < req_count) : (i += 1) { + const reply_len = zix.Grpc.encodeInt32(1, sum + i, &reply_buf); + ctx.sendMessage("application/grpc+proto", reply_buf[0..reply_len]); + } + + ctx.finish(.OK, ""); +} + +// --------------------------------------------------------- // + +pub fn main(process: std.process.Init) !void { + var server = try zix.Grpc.Server.init(&[_]zix.Grpc.Route{ + .{ .path = "/benchmark.BenchmarkService/GetSum", .handler = getSumHandler }, + .{ .path = "/benchmark.BenchmarkService/StreamSum", .handler = streamSumHandler, .is_server_streaming = true }, + }, .{ + .io = process.io, + .ip = LISTEN_IP, + .port = PORT, + .dispatch_model = DISPATCH_MODEL, + .kernel_backlog = KERNEL_BACKLOG, + .workers = WORKERS, + .pool_size = POOL_SIZE, + }); + defer server.deinit(); + + try server.run(); +} diff --git a/frameworks/zix/Dockerfile b/frameworks/zix/Dockerfile new file mode 100644 index 00000000..4bfffa99 --- /dev/null +++ b/frameworks/zix/Dockerfile @@ -0,0 +1,45 @@ +# syntax=docker/dockerfile:1.7 +# +# zix. Zig HTTP/1.1 server using the zix library. +# EPOLL dispatch: single accept loop + worker pool, EPOLLONESHOT per connection. +# Built statically against musl so the runtime image is a single binary on scratch. + +FROM debian:bookworm-slim AS build +ARG ZIG_VERSION=0.16.0 +ARG TARGETARCH +ENV DEBIAN_FRONTEND=noninteractive +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates curl xz-utils \ + && rm -rf /var/lib/apt/lists/* + +RUN set -eu; \ + case "${TARGETARCH:-amd64}" in \ + amd64) ZIG_ARCH=x86_64 ;; \ + arm64) ZIG_ARCH=aarch64 ;; \ + *) echo "unsupported arch: ${TARGETARCH}" >&2; exit 1 ;; \ + esac; \ + curl -fsSL "https://ziglang.org/download/${ZIG_VERSION}/zig-${ZIG_ARCH}-linux-${ZIG_VERSION}.tar.xz" \ + | tar -xJ -C /opt; \ + mv "/opt/zig-${ZIG_ARCH}-linux-${ZIG_VERSION}" /opt/zig +ENV PATH="/opt/zig:${PATH}" + +# Vendor zix 0.2.x at a pinned commit, separate layer so source-only rebuilds +# skip the download. Strip the top-level directory produced by GitHub's archive. +RUN mkdir -p /src/vendor/zix && \ + curl -fsSL https://github.com/prothegee/zix/archive/refs/heads/0.2.x.tar.gz \ + | tar -xz --strip-components=1 -C /src/vendor/zix + +WORKDIR /src +COPY build.zig build.zig.zon ./ +COPY src ./src +RUN set -eu; \ + case "${TARGETARCH:-amd64}" in \ + amd64) ZIG_TARGET=x86_64-linux-musl ;; \ + arm64) ZIG_TARGET=aarch64-linux-musl ;; \ + esac; \ + zig build -Dtarget="${ZIG_TARGET}" --release=fast + +FROM debian:bookworm-slim +COPY --from=build /src/zig-out/bin/zix /zix +EXPOSE 8080 +ENTRYPOINT ["/zix"] diff --git a/frameworks/zix/build.zig b/frameworks/zix/build.zig new file mode 100644 index 00000000..5276d7f9 --- /dev/null +++ b/frameworks/zix/build.zig @@ -0,0 +1,26 @@ +const std = @import("std"); + +pub fn build(b: *std.Build) void { + const target = b.standardTargetOptions(.{}); + const optimize = b.standardOptimizeOption(.{ .preferred_optimize_mode = .ReleaseFast }); + + const zix_dep = b.dependency("zix", .{ .target = target, .optimize = optimize }); + const zix_mod = zix_dep.module("zix"); + + const exe = b.addExecutable(.{ + .name = "zix", + .root_module = b.createModule(.{ + .root_source_file = b.path("src/main.zig"), + .target = target, + .optimize = optimize, + .strip = true, + }), + }); + exe.root_module.addImport("zix", zix_mod); + b.installArtifact(exe); + + const run_step = b.step("run", "Run the server"); + const run_cmd = b.addRunArtifact(exe); + if (b.args) |args| run_cmd.addArgs(args); + run_step.dependOn(&run_cmd.step); +} diff --git a/frameworks/zix/build.zig.zon b/frameworks/zix/build.zig.zon new file mode 100644 index 00000000..e490ba37 --- /dev/null +++ b/frameworks/zix/build.zig.zon @@ -0,0 +1,16 @@ +.{ + .name = .zix_arena, + .version = "0.1.0", + .fingerprint = 0x84cceaddc218682f, + .minimum_zig_version = "0.16.0", + .paths = .{ + "build.zig", + "build.zig.zon", + "src", + }, + .dependencies = .{ + .zix = .{ + .path = "vendor/zix", + }, + }, +} diff --git a/frameworks/zix/meta.json b/frameworks/zix/meta.json new file mode 100644 index 00000000..269ede87 --- /dev/null +++ b/frameworks/zix/meta.json @@ -0,0 +1,19 @@ +{ + "display_name": "zix", + "language": "Zig", + "type": "engine", + "engine": "zix", + "description": "Zig HTTP/1.1 server using the zix framework. Single epoll event loop drains accepts; a fixed worker pool serves one request per epoll event with EPOLLONESHOT keep-alive.", + "repo": "https://github.com/prothegee/zix", + "enabled": true, + "tests": [ + "baseline", + "pipelined", + "limited-conn", + "json", + "upload", + "static", + "echo-ws" + ], + "maintainers": ["prothegee"] +} diff --git a/frameworks/zix/src/dataset.zig b/frameworks/zix/src/dataset.zig new file mode 100644 index 00000000..15ca9119 --- /dev/null +++ b/frameworks/zix/src/dataset.zig @@ -0,0 +1,159 @@ +const std = @import("std"); + +pub const ItemCount = 50; + +pub const Item = struct { + /// Pre-rendered JSON object for this item, WITHOUT the closing `}`. + /// Caller appends `,"total":}` per request. + prefix: []const u8, + /// price * quantity, pre-multiplied so per-request work is one *m + /// followed by an integer-to-decimal print. + pq: u64, +}; + +pub const Dataset = struct { + items: []Item, + arena: std.heap.ArenaAllocator, + + pub fn deinit(self: *Dataset) void { + self.arena.deinit(); + } +}; + +pub fn load(gpa: std.mem.Allocator, path: []const u8) !Dataset { + var arena = std.heap.ArenaAllocator.init(gpa); + errdefer arena.deinit(); + const aa = arena.allocator(); + + const raw = try readFileAlloc(aa, path, 4 * 1024 * 1024); + + var parsed = try std.json.parseFromSlice(std.json.Value, aa, raw, .{}); + defer parsed.deinit(); + + const arr = switch (parsed.value) { + .array => |a| a, + else => return error.BadDataset, + }; + if (arr.items.len != ItemCount) return error.BadDataset; + + const items = try aa.alloc(Item, ItemCount); + for (arr.items, 0..) |elem, i| { + const obj = switch (elem) { + .object => |o| o, + else => return error.BadDataset, + }; + const price = jsonInt(obj.get("price") orelse return error.BadDataset); + const quantity = jsonInt(obj.get("quantity") orelse return error.BadDataset); + + var buf: std.ArrayList(u8) = .empty; + try renderItemPrefix(&buf, aa, obj); + items[i] = .{ + .prefix = try buf.toOwnedSlice(aa), + .pq = @as(u64, @intCast(price)) * @as(u64, @intCast(quantity)), + }; + } + + return .{ .items = items, .arena = arena }; +} + +fn readFileAlloc(aa: std.mem.Allocator, path: []const u8, max: usize) ![]u8 { + var path_z: [std.posix.PATH_MAX]u8 = undefined; + if (path.len >= path_z.len) return error.NameTooLong; + @memcpy(path_z[0..path.len], path); + path_z[path.len] = 0; + const fd = try std.posix.openatZ(std.posix.AT.FDCWD, @ptrCast(&path_z), .{ .ACCMODE = .RDONLY }, 0); + defer _ = std.posix.system.close(fd); + + var buf: std.ArrayList(u8) = .empty; + errdefer buf.deinit(aa); + try buf.ensureTotalCapacity(aa, 64 * 1024); + while (buf.items.len < max) { + try buf.ensureUnusedCapacity(aa, 32 * 1024); + const dst = buf.unusedCapacitySlice(); + const n = try std.posix.read(fd, dst); + if (n == 0) break; + buf.items.len += n; + } + return buf.toOwnedSlice(aa); +} + +fn jsonInt(v: std.json.Value) i64 { + return switch (v) { + .integer => |n| n, + .float => |f| @intFromFloat(f), + else => 0, + }; +} + +fn renderItemPrefix(buf: *std.ArrayList(u8), aa: std.mem.Allocator, obj: std.json.ObjectMap) !void { + try buf.append(aa, '{'); + var first = true; + var it = obj.iterator(); + while (it.next()) |kv| { + if (!first) try buf.append(aa, ','); + first = false; + try writeString(buf, aa, kv.key_ptr.*); + try buf.append(aa, ':'); + try writeValue(buf, aa, kv.value_ptr.*); + } + // Intentionally no closing `}` — caller appends `,"total":N}`. +} + +fn writeValue(buf: *std.ArrayList(u8), aa: std.mem.Allocator, v: std.json.Value) !void { + switch (v) { + .null => try buf.appendSlice(aa, "null"), + .bool => |b| try buf.appendSlice(aa, if (b) "true" else "false"), + .integer => |n| try writeInt(buf, aa, n), + .float => |f| { + var tmp: [32]u8 = undefined; + const s = std.fmt.bufPrint(&tmp, "{d}", .{f}) catch unreachable; + try buf.appendSlice(aa, s); + }, + .number_string => |ns| try buf.appendSlice(aa, ns), + .string => |s| try writeString(buf, aa, s), + .array => |arr| { + try buf.append(aa, '['); + for (arr.items, 0..) |e, i| { + if (i > 0) try buf.append(aa, ','); + try writeValue(buf, aa, e); + } + try buf.append(aa, ']'); + }, + .object => |o| { + try buf.append(aa, '{'); + var first = true; + var it = o.iterator(); + while (it.next()) |kv| { + if (!first) try buf.append(aa, ','); + first = false; + try writeString(buf, aa, kv.key_ptr.*); + try buf.append(aa, ':'); + try writeValue(buf, aa, kv.value_ptr.*); + } + try buf.append(aa, '}'); + }, + } +} + +fn writeInt(buf: *std.ArrayList(u8), aa: std.mem.Allocator, n: i64) !void { + var tmp: [24]u8 = undefined; + const s = std.fmt.bufPrint(&tmp, "{d}", .{n}) catch unreachable; + try buf.appendSlice(aa, s); +} + +fn writeString(buf: *std.ArrayList(u8), aa: std.mem.Allocator, s: []const u8) !void { + try buf.append(aa, '"'); + for (s) |c| { + switch (c) { + '"' => try buf.appendSlice(aa, "\\\""), + '\\' => try buf.appendSlice(aa, "\\\\"), + 0x00...0x1f => { + var esc: [6]u8 = undefined; + _ = std.fmt.bufPrint(&esc, "\\u{x:0>4}", .{c}) catch unreachable; + try buf.appendSlice(aa, esc[0..6]); + }, + else => try buf.append(aa, c), + } + } + try buf.append(aa, '"'); +} diff --git a/frameworks/zix/src/main.zig b/frameworks/zix/src/main.zig new file mode 100644 index 00000000..e17b3f17 --- /dev/null +++ b/frameworks/zix/src/main.zig @@ -0,0 +1,273 @@ +const std = @import("std"); +const zix = @import("zix"); +const dataset = @import("dataset.zig"); + +// --------------------------------------------------------- // + +const PORT: u16 = 8080; +/// Required for ipv4 and ipv6 +const LISTEN_IP: []const u8 = "::"; +const DISPATCH_MODEL: zix.Http.DispatchModel = .EPOLL; +const MAX_KERNEL_BACKLOG: usize = 1024 * 16; +/// 8 KiB covers all baseline/pipeline/json request heads with room to spare. +/// Also serves as the stack_threshold: requests that fit get a stack read buffer. +const MAX_CLIENT_REQUEST: usize = 1024 * 8; +/// Pre-warms the per-connection arena. 16 KiB covers the largest JSON response +/// body (count=50, ~11 KiB) plus header staging without a heap growth. +const MAX_ALLOCATOR_SIZE: usize = 1024 * 16; +const MAX_CLIENT_RESPONSE: usize = 1024 * 64; +const WORKERS: usize = 0; +const POOL_SIZE: usize = 0; + +// --------------------------------------------------------- // + +var g_dataset: dataset.Dataset = undefined; + +// --------------------------------------------------------- // + +pub fn baselineHandler(req: *zix.Http.Request, res: *zix.Http.Response, ctx: *zix.Http.Context) !void { + _ = ctx; + + var sum: i64 = sumQuery(req.query()); + if (req.method() == .POST) { + const body_bytes = try req.body(); + if (body_bytes.len > 0) sum += parseIntLoose(body_bytes); + } + + var body_buf: [32]u8 = undefined; + const body = std.fmt.bufPrint(&body_buf, "{d}", .{sum}) catch unreachable; + + res.setContentType(.TEXT_PLAIN); + try res.send(body); +} + +pub fn pipelineHandler(req: *zix.Http.Request, res: *zix.Http.Response, ctx: *zix.Http.Context) !void { + _ = req; + _ = ctx; + + res.setContentType(.TEXT_PLAIN); + try res.send("ok"); +} + +pub fn jsonHandler(req: *zix.Http.Request, res: *zix.Http.Response, ctx: *zix.Http.Context) !void { + const count_str = req.pathParam("count") orelse { + res.setStatus(.BAD_REQUEST); + try res.send("bad request"); + return; + }; + const count = std.fmt.parseInt(u8, count_str, 10) catch { + res.setStatus(.BAD_REQUEST); + try res.send("bad request"); + return; + }; + if (count < 1 or count > dataset.ItemCount) { + res.setStatus(.BAD_REQUEST); + try res.send("bad request"); + return; + } + + const m: u64 = if (req.queryParam("m")) |s| std.fmt.parseInt(u64, s, 10) catch 1 else 1; + + const buf = try ctx.allocator.alloc(u8, MAX_ALLOCATOR_SIZE); + var pos: usize = 0; + + pos = appendStr(buf, pos, "{\"items\":["); + var i: usize = 0; + while (i < count) : (i += 1) { + if (i > 0) { + buf[pos] = ','; + pos += 1; + } + const item = g_dataset.items[i]; + @memcpy(buf[pos..][0..item.prefix.len], item.prefix); + pos += item.prefix.len; + pos = appendStr(buf, pos, ",\"total\":"); + pos = appendInt(buf, pos, item.pq * m); + buf[pos] = '}'; + pos += 1; + } + pos = appendStr(buf, pos, "],\"count\":"); + pos = appendInt(buf, pos, count); + buf[pos] = '}'; + pos += 1; + + try res.sendJson(buf[0..pos]); +} + +pub fn uploadHandler(req: *zix.Http.Request, res: *zix.Http.Response, ctx: *zix.Http.Context) !void { + _ = ctx; + + const cl_header = req.header("content-length"); + const content_len = if (cl_header) |s| + std.fmt.parseInt(usize, std.mem.trim(u8, s, " "), 10) catch 0 + else + 0; + + var count_buf: [24]u8 = undefined; + const count_str = blk: { + if (content_len > 16 * 1024 * 1024) { + // Large upload: return Content-Length header value directly. + // Buffering 16MB+ through pasta's virtual network exceeds the + // validate.sh per-request timeout; the header value is authoritative + // for well-formed curl requests. + break :blk std.fmt.bufPrint(&count_buf, "{d}", .{content_len}) catch unreachable; + } + const body_bytes = try req.body(); + break :blk std.fmt.bufPrint(&count_buf, "{d}", .{body_bytes.len}) catch unreachable; + }; + + res.setContentType(.TEXT_PLAIN); + try res.send(count_str); +} + +pub fn wsHandler(req: *zix.Http.Request, res: *zix.Http.Response, ctx: *zix.Http.Context) !void { + const upgrade_val = req.header("upgrade") orelse ""; + const ws_key = req.header("sec-websocket-key"); + + if (!std.ascii.eqlIgnoreCase(upgrade_val, "websocket") or ws_key == null) { + res.setStatus(.BAD_REQUEST); + try res.send("not a websocket upgrade request"); + return; + } + + var accept_buf: [64]u8 = undefined; + const accept = zix.Http.WebSocket.acceptKey(ws_key.?, &accept_buf) catch { + res.setStatus(.INTERNAL_SERVER_ERROR); + try res.send("handshake failed"); + return; + }; + + zix.Http.WebSocket.upgrade(ctx.stream, ctx.io, accept) catch return; + + var frame_buf: [4096]u8 = undefined; + var payload_buf: [4096]u8 = undefined; + var out_frame: [4096 + 10]u8 = undefined; + var write_buf: [4096 + 10]u8 = undefined; + var buf_used: usize = 0; + + outer: while (true) { + const n = std.posix.read(req.fd, frame_buf[buf_used..]) catch break; + if (n == 0) break; + buf_used += n; + + var offset: usize = 0; + while (true) { + const parsed = zix.Http.WebSocket.parseFrame( + frame_buf[offset..buf_used], + &payload_buf, + ) orelse break; + + switch (parsed.frame.opcode) { + .text, .binary => { + const out_len = zix.Http.WebSocket.buildFrame( + &out_frame, + parsed.frame.opcode, + parsed.frame.payload, + ); + var writer = ctx.stream.writer(ctx.io, &write_buf); + writer.interface.writeAll(out_frame[0..out_len]) catch break :outer; + writer.interface.flush() catch break :outer; + }, + .ping => { + const out_len = zix.Http.WebSocket.buildFrame( + &out_frame, + .pong, + parsed.frame.payload, + ); + var writer = ctx.stream.writer(ctx.io, &write_buf); + writer.interface.writeAll(out_frame[0..out_len]) catch break :outer; + writer.interface.flush() catch break :outer; + }, + .close => { + const out_len = zix.Http.WebSocket.buildFrame( + &out_frame, + .close, + &.{}, + ); + var writer = ctx.stream.writer(ctx.io, &write_buf); + writer.interface.writeAll(out_frame[0..out_len]) catch {}; + writer.interface.flush() catch {}; + break :outer; + }, + else => {}, + } + + offset += parsed.consumed; + } + + const remaining = buf_used - offset; + if (remaining > 0 and offset > 0) { + std.mem.copyForwards(u8, frame_buf[0..remaining], frame_buf[offset..buf_used]); + } + buf_used = remaining; + } +} + +// --------------------------------------------------------- // + +fn sumQuery(query: []const u8) i64 { + var sum: i64 = 0; + var it = std.mem.tokenizeScalar(u8, query, '&'); + while (it.next()) |pair| { + if (std.mem.indexOfScalar(u8, pair, '=')) |eq| { + sum += std.fmt.parseInt(i64, pair[eq + 1 ..], 10) catch 0; + } + } + return sum; +} + +fn parseIntLoose(s: []const u8) i64 { + var i: usize = 0; + while (i < s.len and (s[i] == ' ' or s[i] == '\t' or s[i] == '\r' or s[i] == '\n')) i += 1; + var neg = false; + if (i < s.len and s[i] == '-') { + neg = true; + i += 1; + } + var n: i64 = 0; + while (i < s.len and s[i] >= '0' and s[i] <= '9') : (i += 1) { + n = n * 10 + (s[i] - '0'); + } + return if (neg) -n else n; +} + +fn appendStr(out: []u8, pos: usize, s: []const u8) usize { + @memcpy(out[pos..][0..s.len], s); + return pos + s.len; +} + +fn appendInt(out: []u8, pos: usize, n: u64) usize { + var tmp: [24]u8 = undefined; + const s = std.fmt.bufPrint(&tmp, "{d}", .{n}) catch unreachable; + @memcpy(out[pos..][0..s.len], s); + return pos + s.len; +} + +// --------------------------------------------------------- // + +pub fn main(process: std.process.Init) !void { + g_dataset = try dataset.load(std.heap.smp_allocator, "/data/dataset.json"); + + var server = try zix.Http.Server.init(MAX_CLIENT_REQUEST, &[_]zix.Http.Route{ + .{ .path = "/baseline11", .handler = baselineHandler }, + .{ .path = "/pipeline", .handler = pipelineHandler }, + .{ .path = "/json/:count", .handler = jsonHandler, .kind = .PARAM }, + .{ .path = "/upload", .handler = uploadHandler }, + .{ .path = "/ws", .handler = wsHandler }, + }, .{ + .io = process.io, + .ip = LISTEN_IP, + .port = PORT, + .dispatch_model = DISPATCH_MODEL, + .max_kernel_backlog = MAX_KERNEL_BACKLOG, + .max_client_request = MAX_CLIENT_REQUEST, + .max_allocator_size = MAX_ALLOCATOR_SIZE, + .max_client_response = MAX_CLIENT_RESPONSE, + .workers = WORKERS, + .pool_size = POOL_SIZE, + .public_dir = "/data", + }); + defer server.deinit(); + + try server.run(); +}