Skip to content

Conversation

@glyh
Copy link
Member

@glyh glyh commented May 27, 2025

Add Choosable_synchronous_pipe and Swappable_strict_pipe modules

Overview

This PR introduces two new pipe modules to handle specific concurrency patterns in a more controlled and efficient manner.

Choosable synchronous pipe

A synchronous pipe implementation designed to work with Deferred.choose that provides:

  • Idempotent read operations (multiple reads return the same value)
  • Synchronous write operations that only proceed when a read is initiated
  • Thread-safe iteration and reading
  • Support for graceful termination
  • Ability to use it in Deferred.choose with guarantee that write happens only if the choice is selected

Key features:

  • Write operations only proceed when a corresponding read is initiated
  • Each operation returns an "updated pipe" that must be used for subsequent operations
  • Thread-safe for parallel reads and iterations
  • Graceful handling of pipe closure

Swappable strict pipe

A higher-level pipe that builds on Choosable_synchronous_pipe and Strict_pipe to provide:

  • The ability to swap readers while maintaining data consistency
  • No data loss during reader swaps
  • Support for multiple concurrent iterators over the same data stream
  • Graceful termination handling

Key features:

  • swap_reader to safely switch to a new short-lived iterator on a long-lived strict pipe
  • Guarantees no data loss or duplication during reader swaps
  • Thread-safe iteration
  • Clean shutdown support via kill

Motivation

These modules address specific concurrency patterns needed for:

  1. Coordinating between multiple producers and consumer with backpressure
    • Specifically we want to use it in bootstrap controller and transition frontier controller
  2. Implementing hot-swappable data processing pipelines
  3. Building robust, deadlock-free concurrent systems
  4. Handling graceful shutdown scenarios

Testing

Comprehensive test coverage includes:

  • Basic read/write operations
  • Pipe closure behavior
  • Multiple writes on the same pipe
  • Idempotent reads
  • Sequential read/write patterns
  • Iterator behavior
  • Reader swapping
  • Termination scenarios

Implementation Notes

  • Built on top of Async_kernel for compatibility with existing code
  • Careful attention to thread safety and resource cleanup
  • Detailed documentation of behavior and edge cases
  • No external dependencies beyond Core_kernel and Async_kernel

@glyh glyh requested a review from a team as a code owner May 27, 2025 03:05
@glyh glyh force-pushed the corvo/swappable-pipe branch from a7239dd to 3878082 Compare May 27, 2025 03:07
@glyh
Copy link
Member Author

glyh commented May 27, 2025

!ci-build-me

georgeee added 4 commits May 27, 2025 05:41
Enhanced the test to write multiple values and verify:
- Values are read in the correct sequence
- A reader that skips reading doesn't interfere with subsequent readers

The test currently fails and will be addressed in the next commit.
('data_in_pipe, Strict_pipe.synchronous, unit Deferred.t) Strict_pipe.Writer.t

type ('data_in_pipe, 'write_return) t =
| Swappable :
Copy link
Member Author

@glyh glyh May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE for myself: This is a GADT because 'pipe_kind is dangling.

returns an updated state (fields [short_lived_sink] and [data_unconsumed]
are updated).
*)
type ('data_in_pipe, 'write_return) state_t =
Copy link
Member Author

@glyh glyh May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE for myself: This DS now owns <= 3 writers:

  • long_lived_writer one live as long as the whole DS;
  • short_lived_sink swapped as new reader requests come;
  • exposed.next_short_lived_sink will swap short_lived_sink, and will die yet another swap request arrives.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I still don't understand the reason of splitting t and state_t

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, state_t is immutable structure which is updated on every iteration in a purely functional way, only the step function has access to it.

Structure t is a structure which has a mutable field, which may is updated by the step function and read by any user thread. Crucially, things called from outside the step function don't need access to state, and types provide clear separation for it.

@georgeee georgeee force-pushed the corvo/swappable-pipe branch from a0ca99e to 3e99874 Compare May 28, 2025 12:22
Copy link
Member Author

@glyh glyh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed, left a bunch of requests/questions/bugspots.

*)
let short_lived_write state sink data =
choose
[ terminate_choice state
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this branch necessary if we have a catch-all termination branch in step? Do we want to terminate eagerly?
Answering myself: we don't want any operation to be blocking and want termination to be possible to arrive any time.

*)
type 'data_in_pipe short_lived_pipe_t =
| Short_lived_pipe of
[ `Eof | `Ok of 'data_in_pipe * 'data_in_pipe short_lived_pipe_t ] Ivar.t
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh my... Ivar inside of Ivar...... I get that this is a single shot request/response structure, we could probably make some kind of abstraction for it? It's also used below next_short_lived_pipe it seems.
Could be in follow-up PRs.

Copy link
Member Author

@glyh glyh May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This is a Pulling Cons List/Pipe. Downstream would iterate and pull data from upstream.

In Strict Pipe, in contrast, source is proactively writing and sink is proactively reading. Hence this Pulling Cons List is simpler than Strict Pipe. We may want to factor this out as another sync primitive?

And we could use similar pattern to design a Pushing Cons List/Pipe.

Ensures that any reader on it will get [`Eof].
*)
let terminate_short_lived_pipe (Short_lived_pipe outer_ivar) =
Ivar.fill_if_empty outer_ivar (Ivar.create_full `Eof) ;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for myself: We're prefilling the response for next invocation, so next invocation of Iterator.read_one, would return Eof directly. This handles the case when no requests arrived.

let test_buffered_overflow () =
let open Pipe_lib.Strict_pipe.Swappable in
let pipe = create_buffered_swappable 4 in
(* Write 7 elements to a pipe with capacity 5, first 2 should be dropped due to overflow *)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we redesign the UI so the capacity and the number we're passing in are the same number. So to hide underlying implementation and avoid confusion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to strengthen strict pipe's tests and fix off-by-one. This is definitely outside scope of this PR.

(* First iteration *)
let%bind () = read_all_values_or_timeout ~pipe ~expected:elements reader in
(* Second iteration - should see the same elements again *)
read_all_values_or_timeout ~expected:elements reader
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually when I'm facing something called an Iterator, I'm thinking C++ iterators that are volatile and used once throw away. Maybe a better name for this is an View

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I called it iterator because it has a single method iter, and I had no intention to provide any other method :D

(* Try to write to the killed pipe *)
let%map () = Async_kernel_scheduler.yield () in
let threw = ref false in
(try write pipe 1 with _ -> threw := true) ;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: use Alcotest.check_raises

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried with:

  Alcotest.check_raises "write to killed pipe raises"
    (Failure "write to closed pipe")
    (fun () -> write pipe 1)

And it didn't work because the exception's message is less clear:

expecting (Failure \\\"write to closed pipe\\\"), got (\\\"write to closed pipe\\\"\\n  (pipe\\n    ((id 8) (buffer ()) (size_budget 0) (pushback (Full ()))\\n      (num_values_read 0) (blocked_flushes ()) (blocked_reads ())\\n      (closed (Full ())) (read_closed Empty) (consumers ())\\n      (upstream_flusheds ())))).\")")

So seems like it's not worth it in this case

if !first_iter_count = 2 then
(* After reading two elements, create second iterator and write more elements *)
let%map next_reader = swap_reader pipe in
next_reader_ref := Some next_reader
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A clever use, but I would avoid that in general. Is there any usecase you have on mind for this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I tried to simulate two threads executing in parallel. This is test code, so I think it's fine. This isn't a recommended way to use the structure. If we happen with similar code outside of test context, we'll need some sort of fold_until or find. But looks like overkill for just test implementation.

georgeee added 6 commits May 30, 2025 09:14
Problem: when more than one pipe is created while no write happens to a
long-lived pipe, this will lead to the older pipe receiving reads, which
is against the "contract".

Solution: add a choice to choose operation that attempts a read from
long-lived pipe.
georgeee added 3 commits May 30, 2025 16:27
Cover all important properties with tests.
Represent choosable synchronous pipe as a pair of reader and writer.

Internally implementation still uses the same type `'a t` to represent
both reader and writer, but on the interface this detail is hidden.

With clear separation of reader and writer, it's made clear on type
level that closure should be attempted on writer, while read/iteration
on reader.

Implementation still initializes one ivar and returns it both as writer
and reader (and over the process of read/write these two will morph into
different values).

As a side-change commit also updates some of the tests in a minor way.
@georgeee
Copy link
Member

georgeee commented May 30, 2025

Regarding Yihang's question about termination:

What if the response(Inner Ivar) has value unconsumed, yet? This would cause the inner invocation fill_if_empty to do nothing, and the downstream consumer hanging, no?

I added some doc-comments. The short explanation is that the interface of choosable pipe allows you to attempt closure on a pipe with unconsumed data. And the result will be "nothing", no close. So I mentioned that in documentation of that module, as "expected" though unlikely desirable behavior. I.e. if user of that primitive doesn't order write-close operations well, they could get some unexpected outcomes, but the module's docs explain what these outcome would look like with a great precision.

As for usage in swappable pipe, in the state we always hold a short-lived pipe to which no write was attempted before (and when write happens, we replace it in the state with an updated pipe, always).

So close will always happen on top of short-lived pipe which didn't have any previous writes, hence no value unconsumed is possible.

@georgeee
Copy link
Member

!ci-build-me

@georgeee
Copy link
Member

!ci-bypass-changelog

returns an updated state (fields [short_lived_sink] and [data_unconsumed]
are updated).
*)
type ('data_in_pipe, 'write_return) state_t =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I still don't understand the reason of splitting t and state_t

Local variable renamed
Comment on lines 28 to 31
It is not recommended to use the same pipe for writing from parallel threads.
While, behavior is well specified, it's unlikely to be a desired one. Same
applies to parallelism of [close] operation or some combination of [close]
and [write_choice]. *)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behaviour is that one writer thread will win and write (data, new_sink) to the pipe, and all reader threads will start following new_sink, right? The other writer threads will still create a new_sink, but they'll fail to write to the pipe, so no reader thread will ever see new_sink. The other writers will end up locking up the next time they try to write to their pipes, because they'll be waiting for a reader to fill the outer Ivar.t.

If that's right, it might be better to make it an error to write to a stale pipe, instead of dropping the write attempt.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, could writers follow the chain of filled Ivars down to the end when they write?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@georgeee said this won't happen when swappable pipe use it, and it's downstream user's responsibility to ensure such scenario doesn't happen. That said, I'm also not a fan of this silent fail API.

Copy link
Member

@georgeee georgeee Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, here is the implementation:

let write_choice ~on_chosen sink data =
  choice (Ivar.read sink) (fun ivar ->
      let new_sink = Ivar.create () in
      (* We use [Ivar.fill_if_empty] because [close] operation
         may have been invoked and [`Eof] might already be written
         to the ivar. *)
      Ivar.fill_if_empty ivar (`Ok (data, new_sink)) ;
      on_chosen new_sink )

The other writer threads will still create a new_sink. The other writers will end up locking up the next time they try to write to their pipes, because they'll be waiting for a reader to fill the outer Ivar.t.

Yeah, now I understand I haven't being evaluating the code in parallel writer case one of the writers will hang, and this is simply useless

Copy link
Member

@georgeee georgeee Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choosable_synchronous_pipe was originally designed as a low-level primitive. I've tried to keep its implementation minimal and hard to misuse, and I'm hesitant to introduce extra conditions or complexity into the API unless strictly necessary. However I agree that leaving implementation as is is problematic, exception is always better than silently hanging on a write.

There are a couple of options we could consider:

  • Replace fill_if_empty with fill
    This would raise an exception if the Ivar is already filled. I originally used fill_if_empty to avoid exceptions in a potential race between close and write_choice. In the current implementation, that race neither throws nor fills the pipe, which avoids crashing but introduces the risk of hanging.

  • Ensure writers always follow the chain of filled Ivars before writing
    Instead of using on_chosen new_sink, we could do on_chosen (Ivar.value_exn ivar), which guarantees that the writer reaches a final sink, although the value currently written is lost. This avoids both exceptions and hanging.

    • Downside: If two writers operate in parallel, only one write will succeed, and the others will be silently ignored.

To simplify the contract I'll choose the first option. I doubt the second option would ever be useful. And in context of single-threaded linearized usage both models are equivalent.

Throw exception on repetitive attempts to invoke close/write operations
using the same pipe handle.
@georgeee
Copy link
Member

georgeee commented Jun 2, 2025

!ci-build-me

@cjjdespres
Copy link
Member

Could you go into a bit more detail about how the swappable pipe is going to be used? I'm wondering why we need the ability to swap out opaque readers on a pipe.

@glyh
Copy link
Member Author

glyh commented Jun 3, 2025

Could you go into a bit more detail about how the swappable pipe is going to be used? I'm wondering why we need the ability to swap out opaque readers on a pipe.

Here's the commit @georgeee applied this DS. b82f981 (#17314)

Here's quote from @georgeee

Just wanted to clarify the reasoning behind the mechanism where we kill the old pipe and create a new one.
The core idea is that we start an infinite processing loop tied to a pipe. To stop that loop from continuing when we no longer need it, we kill the pipe—which effectively terminates the loop. It’s not necessarily a great architectural choice, but that’s how things currently work.
The application has two distinct states: bootstrap and catch-up. Each has its own dedicated infinite loop, and we want to keep these loops fully isolated. So, when the state changes from one to the other, we completely terminate the current loop (by killing the pipe) and start a fresh one.
We manage this by using a reference that holds the current pipe. When the state changes, we kill the old pipe, and when the app returns to that state, it writes a new pipe into the reference.
Let me know if anything’s unclear!

On top of that, pipe-swapping is implemented in transition router but with some nasty synchronization bugs unfixable in place.

Copy link
Member

@cjjdespres cjjdespres left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall the choosable and swappable pipes seem to do what they say they do. I left a few comments for myself above, as I was working through how it works.

One more question - do you end up using the property that you can iter over a choosable pipe twice and get the same result? That's probably in b82f981 somewhere, but I'm not that familiar with the logic. Is it more that you want to ensure that if a reader is already consuming some data from the pipe and a swap_reader occurs, then that first reader should be able to finish processing all the data that was written up to the point the swap occurred? I think that's equivalent to the double-iter property.

@@ -0,0 +1,35 @@
(** swappable strict pipe *)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point this should probably be documented in as much detail as Choosable_synchronous_pipe.t is.

and the first-that-came wins, while for later calls an immediately closed
reader is returned.
*)
if Ivar.is_full t.termination_signal || Ivar.is_full t.next_short_lived_pipe
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, the t.next_short_lived_pipe needs to be an empty Ivar.t except for the duration of a reader swap for this to work. (Looks like it is.)

Comment on lines +210 to +211
| _, None, _ ->
read_short_lived_pipe state
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, the purpose of this clause is to wait until we have something to do. We might not get to the termination signal in the very next step, but the explicit handling of the termination signal above means that we will get to it the step after that, at worst.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, actually this functions calls a choose [ terminate_choice state; read_short_lived_pipe_choice state ] operation, so termination signal will arrive immediately if it happens before the short lived pipe is received.

Copy link
Member

@cjjdespres cjjdespres Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of what might happen if a swap and a terminate somehow happened within the same cycle. I was thinking of this:

  1. swap reader checks if termination signal is full, and it isn't, so it continues
  2. terminate is called on the pipe, which fills the signal
  3. swap reader finishes requesting a new reader pipe

I could see the Choice for either the terminate or the swap being chosen.

This might not be possible with how async works, or because we're still on ocaml 4.

Comment on lines +212 to +215
| None, Some _, _ ->
read_long_lived state
| Some data, Some short_lived_sink, _ ->
short_lived_write state short_lived_sink data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this is where the main work of the swappable pipe occurs - it flips back and forth between buffering a single piece of data from whatever is writing to the pipe and sending that data to whatever is attached to the pipe.

@glyh
Copy link
Member Author

glyh commented Jun 4, 2025

do you end up using the property that you can iter over a choosable pipe twice and get the same result?

I'm inclining to no, it seems this is a bit over-engineered.

But you should rely on @georgeee for response because this is mostly designed by him.

@cjjdespres
Copy link
Member

cjjdespres commented Jun 4, 2025

I'm inclining to no, it seems this is a bit over-engineered.

A bit, yeah. I'd say it's fine right now, to the extent that it's trying to reproduce the existing concurrency structure, except isolated and not buggy. Or if the double-iter thing happens to be equivalent to something we really want to be true, or something like that.

Not to dwell on it, but it's not an obviously desirable property if the pipe is to be used for data processing - I think it's usually expected that consumers of pipes/streams will want to go over the data yielded from the pipe at most once. Iterating twice could be a sign that something has gone wrong.

@glyh
Copy link
Member Author

glyh commented Jun 4, 2025

I think it's usually expected that consumers of pipes/streams will want to go over the data yielded from the pipe at most once.

I suggested to change the name from an Iterator to an View, @georgeee rejected because he thinks we can call iter on this thing so it should be an Iterator.

@georgeee
Copy link
Member

georgeee commented Jun 5, 2025

I suggested to change the name from an Iterator to an View, @georgeee rejected because he thinks we can call iter on this thing so it should be an Iterator.

Yeah, so I just removed the Iterator to avoid any confusion :D

@georgeee
Copy link
Member

georgeee commented Jun 5, 2025

do you end up using the property that you can iter over a choosable pipe twice and get the same result?

I didn't, and I didn't want this property in the first place.

I only wanted the correct write_choice implementation. And the structure I came up with _ Ivar.t Ivar.t seemed the easiest possible in the universe of concurrency primitives defined by async library. But then I realized that choosable synchronous pipe is not a pipe in common sense, because of double-reading property.

I could fix that by putting the _ reader_t into a ref, and then simulate the regular pipe behavior. But this was totally unnecessary for the usage of this structure in transition router.

Hence I took a different path. Yes, we have a weird kind of pipe. But it fits alright to the use case it was written for, without any additional refs, runtime checks and exceptions. So I simply decided to embrace the weird structure provided by the choosable synchronous pipe and document it heavily.

The way we use the swappable pipe and choosable synchronous pipe introduces no issue. If we ever want to use it in some other way, well, read the docs before you use it, and the type signatures are weird enough for people to consider reading the docs :D.

Last but not least, I think we could use this structure to implement broadcastable pipe. We use this primitive in a number of places, and now it's implemented via strict pipe. So, like via a lot of non-trivial code with buffer-backed pipe at the bottom. At the same time _ Ivar.t Ivar.t implementation is elegant and simple. I don't think it's a priority to try the replacement right away, but in future we can consider that, especially when we'll be upgrading our homegrown concurrency libraries to be compatible with Ocaml 5.x.

Comment on lines +210 to +211
| _, None, _ ->
read_short_lived_pipe state
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, actually this functions calls a choose [ terminate_choice state; read_short_lived_pipe_choice state ] operation, so termination signal will arrive immediately if it happens before the short lived pipe is received.

@georgeee
Copy link
Member

georgeee commented Jun 5, 2025

@cjjdespres @glyh merging this PR, but if the conversation above happens to converge on some new understanding, I'll create a follow-up PR

@georgeee georgeee merged commit 3232ca2 into compatible Jun 5, 2025
58 checks passed
@georgeee georgeee deleted the corvo/swappable-pipe branch June 5, 2025 10:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants