-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sync): high performance channel implementation + other sync utilities #127
Conversation
0d0265f
to
29e9dd2
Compare
src/sync/parker.zig
Outdated
parker.unpark(); | ||
} | ||
|
||
test "parker untimed" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we fix the tests to follow the sync.parker:
format
const State = enum { | ||
empty, | ||
parked, | ||
notified, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a better name for this? - maybe unpark_requested
or keep notified but add a comment above stating this is when a request to unpark is sent (similar to how you have comments on each state of thread_context state)
src/sync/parker.zig
Outdated
return &thread_local_parker; | ||
} | ||
|
||
pub fn assertEq(left: anytype, right: @TypeOf(left)) void { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inline?
src/sync/parker.zig
Outdated
} | ||
|
||
pub fn park(self: *Self) void { | ||
// if we were previously notified, we return early |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be nice to add a comment that we do this check in case we can return without locking
// read from the write it made to `state`. | ||
var old: State = @enumFromInt(self.state.swap(@intFromEnum(State.empty), .SeqCst)); | ||
assertEq(c.pthread_mutex_unlock(&self.lock), SUCCESS); | ||
assertEq(old, State.notified); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couldnt this be false
if park() is called multiple times because the first compareAndSwap on line 58 doesnt use the mutex?
ie, state = notified at the start of this block (switch(other_state)
) but before the self.state.swap()
is called, park() is called again, which changes it to empty
on line 58, which leads to old = empty
and the assert being false?
src/sync/chanx.zig
Outdated
} | ||
} | ||
|
||
test "sync.chanx.bounded works" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we keep consistent with the other test formats => ie, sync.chanx: bounded works
} | ||
}; | ||
|
||
test "thread state conversion to/from usize" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix test format - sync.thread_context:
src/sync/waker.zig
Outdated
} | ||
|
||
pub inline fn notify(self: *Self) void { | ||
if (!self.is_empty.load(.SeqCst)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we do this check, then lock, then check again?
why not lock, then check?
src/sync/bounded.zig
Outdated
} | ||
} | ||
|
||
if (timeout != null and (std.time.Instant.now() catch unreachable).order(timeout.?) == .gt) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would probs be more clear with a if (timeout) |time_o| { ... }
block
src/sync/bounded.zig
Outdated
if (self.n_receivers.load(.seq_cst) == 0) | ||
return false; | ||
if (self.n_receivers.fetchSub(1, .seq_cst) == 1) { | ||
self.disconnect(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we be consistent with if statement brackets here
ie, theres no brackets here, but two lines down theres brackets
on a more general note: idk if its worth a discussion but i would vote we always use brackets in if statements across the entire codebase - imo the option to bracket or not bracket single line blocks goes against the simplicity/'one way to do it' of zig which i do not like
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this specific code example, I agree that it's best to use braces for both. Typically, I prefer to use braces in if
statements.
In if
expressions (when an if-else
evaluates to a non-void value), I would argue to only use braces if it is actually required. For example:
const max = if (a > b) a else b;
Using braces, it would need to be be written like this:
const max = if (a > b) blk: {
break :blk a;
} else blk: {
break :blk b;
};
Braces would only be required if you need to execute multiple statements for each condition, since multiple statements would need to be executed in a block.
this pr has been stale for a while now (merge conflicts will likely be expensive to solve) - gonna close it and move it to this ticket which contains more context and a ref to this pr: https://github.com/orgs/Syndica/projects/2/views/1?pane=issue&itemId=71252009 |
This PR includes new synchronization utilities including a new channel implementation based on Rust's crossbeam but simplified significantly. Changes include:
sync/chanx.zig
: a newChannel(T)
enum implementation allowing for ease of switching underlying channel implementation if needed for fine-tuning, etc.sync/backoff.zig
: a simple backoff utility allowing for exponential backoff when busy-wait spin-looping allowing for spin-loop hinting to the processor and yielding when certain thresholds are reached.sync/bounded.zig
: a bounded channel implementation based on Rust's crossbeam crate but simplified and tuned for better performance (~15-20% better than crossbeam)sync/waker.zig
: a thread-safe sleeper waker for waking up sleeping threads :)sync/parker.zig
: a thread parking utility usinglibc.pthreads
mutexes and conditionssync/thread_context.zig
: a thread-local context object which allows for thread state to be shared with waker and holds theParker
Benchmarks:
Below is a benchmark run comparing the two channel implementations, Simple (old/original) and Bounded (new) and the results are very positive:
The one bechmark in specific to observe is the
benchmarkBoundedUsizeChannel( 5m_items, 4_senders, 4_receivers )
runs at an impressive288690 us
or0.288 s
on average when comparing to the analogous Rust crossbeam benchmark results below (+22%
speedup):The most meaningful speedups occur in the larger item channel benchmarks such as
PacketChannel
(~1.2Kb
per item). Some stats below:It's on average 450%+ speedup across the different variations of items, senders and receivers.
Another thing to note, the
SimpleChannel
(or old/original) is pre-allocating half the items in it's benchmark where as theBoundedChannel
(or new) is only allocating a total of4096
items across all the tests, significantly reducing the memory footprint of the channel. See memory footprint below:In addition to the above, the
SimpleChannel
's underlying receive mechanism ispop()
on thestd.ArrayList(T)
in order to allow for anO(1)
operation as opposed toO(N)
. This means that order is not preserved whereas theBoundedChannel
is ordered and alwaysO(1)
when receiving being that it utilizes a ring-buffer.Remaining todos:
Channel(T)
tests (with backingBounded(T)
implementation)sendTimeout()
andreceiveTimeout()
, etc.Waker
andParker
more thoroughly