Prototype #1
@Jannis I went ahead and did the refactor such that I think it's ready for a proper review now, and feel comfortable about having more contributors like yourself because at this point I'm not planning to change the API or underlying mechanisms for a while. One thing that I'm happy about with this version is that it handles more of the boilerplate for you at the low level. So even the contents of
    source.subscribe(t => {
      latestT = t
      if (mapPromise === undefined) {
        mapPromise = (async () => {
          while (!equal(latestT, previousT)) {
            previousT = latestT
            output.push(await mapper(latestT))
          }
          mapPromise = undefined
        })()
      }
    })

    tokio::spawn(async move {
        while let Ok(v) = source.next().await {
            writer.write(f(v).await);
        }
    });
And that's with automatic cancellation!
…mediately) once receivers are dropped
Looking excellent!
// Allow replacing the value until the time is up. This
// necessarily introduces latency but de-duplicates when there
// are intermittent bursts. Not sure what is better. Matching
// common-ts for now.
This seems fine to me. The point of throttle is to output a value at most every N seconds, turning bursts of potentially many updates into a single update.
Yep. The version that doesn't add latency is simpler to write (it looks exactly like timer but spits out next().await instead of the instant). For UIs and similar I think this pattern works better because it doesn't pick up the smallest first change. A better API that gets the best of both worlds might have a min/max latency: once a change is available, it spits out a value if there has been no further change for the min latency, while never waiting longer than the max latency. Not urgent in any case.
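To make the min/max latency idea concrete, here is a minimal sketch of just the timing decision. It is not part of the library: the function name emit_at and its parameters are hypothetical, and times are expressed as offsets from an arbitrary start so the logic stays deterministic.

```rust
use std::time::Duration;

/// Hypothetical helper (not in the eventuals crate): decides when a throttled
/// eventual should emit, given when the first pending change arrived and when
/// the most recent change arrived. All times are offsets from the same origin.
fn emit_at(
    first_change: Duration,
    last_change: Duration,
    min_latency: Duration,
    max_latency: Duration,
) -> Duration {
    // Emit once the value has been quiet for `min_latency`...
    let quiet_deadline = last_change + min_latency;
    // ...but never hold the first pending change longer than `max_latency`.
    let hard_deadline = first_change + max_latency;
    quiet_deadline.min(hard_deadline)
}
```

With a min latency of 50ms and a max latency of 200ms, a single change is emitted 50ms after it arrives, while a burst that keeps changing is cut off 200ms after its first change.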
// but the part that is not clear here is what to do when the UI goes out of
// scope. Should pipe provide an explicit handle that cancels on drop?
//
// TODO: Probably do not add filter or reduce, they don't make as much sense for eventuals.
For reduce, I'd like to at least have the equivalent documented on map so we know how to write an eventual "reducer", because we have a few of those in the indexer software and gateways.
The only use of reduce that I could find in the Gateway was a poor implementation of log_errors.
For example this one: https://github.com/edgeandnode/gateway/blob/680d77b2f602addd243085b996a242b26364fb70/packages/gateway/src/agent-syncing/syncing-client.ts#L55-L96
The basic pattern here was to (ab)use reduce to get access to the previous value so it could be yielded again when an error was logged. As far as I could tell, reduce was used for no other purpose.
A better version of log_errors isn't bespoke for every case but is generalized like so:
    fn log_errors(logger: Logger, source: Eventual<Result<T, Err>>) -> Eventual<T> {
        Eventual::spawn(move |mut writer| async move {
            loop {
                match source.next().await {
                    // A new value: forward it downstream.
                    Ok(Ok(v)) => writer.write(v),
                    // An error: log it and leave the previous output in place.
                    Ok(Err(e)) => error!(logger, e),
                    // The source closed: stop.
                    Err(_) => break,
                }
            }
        })
    }
Then you have: timer(interval).map(poll_for_changes).log_errors() and it's done. The north star of this library is to get all the complexity out of the consumer code.
Stepping back from the use of reduce in the Gateway and addressing whether reduce has any use at all: I think it's tempting to think of Eventuals like Stream and apply all the commonly known operations to them (reduce included). This is a mistake in kind. An Eventual is an eventually consistent value which updates itself. Put another way, it is a subset of snapshots in time of a value. A Stream is a view of a progressively available ordered multi-set, where the values represent distinct items. There are a lot of libraries that deal with streams, and none other that I know of that deal with Eventuals. Reduce can be used to aggregate distinct values. But what does it mean to aggregate a subset of snapshots over time? I can't think of any compelling use-case for that.
If you do need access to previous values for aggregation, the above loop used in log_errors does get you that.
This does remind me that we want an eventual-aware retry though. The difference between a "regular" retry is that the eventual one would prefer to use a new value if available over retrying an old value. I've added a comment to the repo.
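As a rough illustration of what "prefer a new value over retrying an old one" could mean, here is a synchronous sketch. It does not use the eventuals crate: retry_with_latest is a hypothetical name, and the latest closure stands in for reading the current snapshot of an eventual.

```rust
/// Hypothetical sketch: retries `op` up to `max_attempts` times, re-reading
/// the latest snapshot before every attempt instead of reusing the value
/// that failed. `latest` is a stand-in for reading an eventual.
fn retry_with_latest<T, R, E>(
    mut latest: impl FnMut() -> T,
    mut op: impl FnMut(&T) -> Result<R, E>,
    max_attempts: usize,
) -> Option<R> {
    for _ in 0..max_attempts {
        // Pick up a newer snapshot if one has arrived since the last attempt.
        let value = latest();
        if let Ok(result) = op(&value) {
            return Some(result);
        }
    }
    // Every attempt failed.
    None
}
```

A "regular" retry would capture the value once, outside the loop; the only change here is that the read moves inside it.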
I tried to get log_errors to work like this:
    pub fn handle_errors<E, F, T, Error>(source: E, f: F) -> Eventual<T>
    where
        E: IntoReader<Output = Result<T, Error>>,
        F: 'static + Send + Fn(Error),
        T: Value + std::fmt::Debug,
        Error: Value + std::fmt::Debug,
    {
        let mut reader = source.into_reader();
        Eventual::spawn(move |mut writer| async move {
            loop {
                match dbg!(reader.next().await) {
                    Ok(Ok(v)) => writer.write(v),
                    Ok(Err(e)) => f(e),
                    Err(_) => break,
                }
            }
        })
    }
with the following test code:
    use eventuals::*;
    use tokio::test;
    use std::future;
    use std::sync::{Arc, Mutex};

    #[test]
    async fn basic() {
        let (mut writer, numbers) = Eventual::<u32>::new();
        let errors = Arc::new(Mutex::new(vec![]));
        let errors_writer = errors.clone();
        let validated: Eventual<Result<u32, u32>> = numbers.map(|n| match n % 2 {
            0 => future::ready(Ok(n)),
            _ => future::ready(Err(n)),
        });
        let mut even_numbers = handle_errors(validated, move |err: u32| {
            println!("Err: {}", err);
            let mut errors = errors_writer.lock().unwrap();
            errors.push(err.clone());
        }).subscribe();
        for n in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] {
            writer.write(n);
            assert_eq!(even_numbers.next().await.ok().unwrap(), n);
        }
        assert_eq!(*errors.lock().unwrap(), vec![1, 3, 5, 7, 9]);
    }
That of course freezes after the first odd number is written, because there is no next value yet.
This exercise made me realize two things:
- Implementing log_errors or handle_errors, or whatever we want to call it, is difficult enough that we should probably provide something. The above code doesn't account for the fact that most error types are not 'static + Send + Clone + Eq. So it wouldn't even work with most fallible functions.
- We don't have an equivalent of await eventual.value() that returns the latest value or blocks, do we? Because eventual.next().await blocks if there has been no new value since the last call to next().
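The difference between the two reads can be modelled in a few lines. The sketch below is a single-threaded stand-in, not the eventuals crate: all names are hypothetical, try_next returns None where the real next().await would keep waiting, and the de-duplication of equal values is left out.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared slot holding the latest snapshot and a version counter.
struct Slot<T> {
    latest: Option<T>,
    version: u64,
}

/// Hypothetical model of a writer.
struct ModelWriter<T>(Rc<RefCell<Slot<T>>>);

/// Hypothetical model of a reader that remembers what it has already seen.
struct ModelReader<T> {
    slot: Rc<RefCell<Slot<T>>>,
    seen: u64,
}

fn model_eventual<T>() -> (ModelWriter<T>, ModelReader<T>) {
    let slot = Rc::new(RefCell::new(Slot { latest: None, version: 0 }));
    (ModelWriter(Rc::clone(&slot)), ModelReader { slot, seen: 0 })
}

impl<T> ModelWriter<T> {
    /// Overwrites the snapshot; older unread snapshots are simply lost.
    fn write(&self, value: T) {
        let mut slot = self.0.borrow_mut();
        slot.latest = Some(value);
        slot.version += 1;
    }
}

impl<T: Clone> ModelReader<T> {
    /// Models next(): yields only a snapshot this reader has not seen yet.
    /// None marks the point where the real next().await would wait.
    fn try_next(&mut self) -> Option<T> {
        let slot = self.slot.borrow();
        if slot.version == self.seen {
            return None;
        }
        self.seen = slot.version;
        slot.latest.clone()
    }

    /// Models the proposed value(): the latest snapshot, seen or not.
    fn value(&self) -> Option<T> {
        self.slot.borrow().latest.clone()
    }
}
```

In the test above, writing an odd number produces no new snapshot on the filtered eventual, so the reader is in the try_next() == None state and next().await never resolves.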
The handle_errors thing you wrote seems nice to me. (Submit a PR?) I like the idea here of providing some handler. The only reason I hadn't supplied log_errors was to avoid taking a dependency on Logger, which your idea elegantly avoids.
The reason for the "deadlock" is that you would normally need to spawn the writer. It's not particularly a fault of handle_errors but rather of the circular dependency that exists in the test code (write depending on read of the same eventual). This code is wrong anyway, because it would have panicked. With the circular dependency you can still get this to work by adding if n % 2 == 0 { assert_eq!(even_numbers.next()..., n) }
I think that most code will naturally avoid circular dependencies, but it's something to be aware of in test code in particular.
You can make almost any Error type impl Value by wrapping it in ByAddress(Arc) as long as they already impl Send. Most complex types (eg: Vec<Indexer>) will want to be wrapped in ByAddress(Arc), which will make clone and comparisons effectively free.
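To show why that wrapping makes clone and comparison cheap, here is a hand-rolled stand-in for a ByAddress-style wrapper. The type ByAddr below is hypothetical and written only for illustration; it is not the type the library uses.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a ByAddress-style wrapper: equality compares
/// the pointer, not the contents, so `T` itself needs neither Clone nor Eq.
#[derive(Debug)]
struct ByAddr<T>(Arc<T>);

impl<T> Clone for ByAddr<T> {
    fn clone(&self) -> Self {
        // Only bumps the reference count; the payload is never copied.
        ByAddr(Arc::clone(&self.0))
    }
}

impl<T> PartialEq for ByAddr<T> {
    fn eq(&self, other: &Self) -> bool {
        // Two handles are equal only if they point at the same allocation.
        Arc::ptr_eq(&self.0, &other.0)
    }
}

impl<T> Eq for ByAddr<T> {}
```

The trade-off is that two separately allocated values with identical contents compare as different, so a writer that rebuilds an equal value will still notify readers.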
We don't have that specifically. I think you can do eventual.subscribe().next() which works the same way, but may make the borrow checker unhappy because of the temporary subscribe. To get it to compile you may need the unfortunate...
    let next = {
        let mut subscribe = eventual.subscribe();
        subscribe.next().await
    };
This is sad enough that I think I'll go ahead and add eventual.next because that was very useful in the Gateway.
Huh, subscribe().next() already works as a replacement for value (no complaint about the temporary from the borrow checker). So you could change your test to...
    assert_eq!(even_numbers.subscribe().next().await.ok().unwrap(), (n / 2) * 2);
if you also remove the earlier subscribe(). I still think there is a case for .value().
// TODO: Add pipe? The "GC" semantics make this unclear. The idea
// behind pipe is to produce some side effect, which is a desirable
// end goal for eventuals (eg: pipe this value into a UI, or log the latest)
// but the part that is not clear here is what to do when the UI goes out of
// scope. Should pipe provide an explicit handle that cancels on drop?
pipe is essentially just map with a different name to make the side-effect intent clear. We could have a cancel handle like you describe, but I'm not sure it'd see much use.
Yes, pipe is great. The only issue is that the way things work right now in regards to cancellation mimics Rust's cancel on drop idea with futures. This is generally great, but for side effects something needs to keep the pipeline alive. I think I'm going to just return something which is opaque that cancels on drop (like the reader would) but we can expose a .leak() or something for the Gateway.
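A minimal sketch of what such an opaque handle could look like, assuming the pipeline checks a shared flag. PipeHandle and its methods are hypothetical names; nothing here is the library's actual API.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical opaque handle: dropping it signals the pipeline to stop.
struct PipeHandle {
    cancelled: Arc<AtomicBool>,
}

impl PipeHandle {
    /// Returns the handle plus the flag that the side-effect task would poll.
    fn new() -> (PipeHandle, Arc<AtomicBool>) {
        let cancelled = Arc::new(AtomicBool::new(false));
        (PipeHandle { cancelled: Arc::clone(&cancelled) }, cancelled)
    }

    /// Keeps the pipeline alive for the rest of the program by never
    /// running the destructor.
    fn leak(self) {
        std::mem::forget(self);
    }
}

impl Drop for PipeHandle {
    fn drop(&mut self) {
        // Cancel on drop, mirroring how a dropped reader cancels upstream.
        self.cancelled.store(true, Ordering::SeqCst);
    }
}
```

Callers that want the side effect for a bounded scope hold the handle; callers like the Gateway that want it forever call leak() once.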
Sounds good to me.
    self.eventual
        .change
        .swap_or_wake(&mut swap, &self.eventual.prev, cx);
I don't quite understand how swap_or_wake works right now. Could you describe this in words?
Yeah, it's a little confusing. It gets the latest value if it is unique, and if no value is retrieved it arranges for the task to be woken later. It may be possible to simplify. Part of the reason it's a bit weird is that it only makes sense within the method where it's called, but it's separate to make all the other privacy and separation of concerns work out.
    where
        T: Value,
    {
        state: Weak<SharedState<T>>,
I assume that if this is a Weak reference, something else owns the SharedState<T>? Or is Weak used so that if all subscribers/readers are dropped, they drop their SharedState<T> references and after this, the weak reference will contain None?
It's Weak so that the SharedState drops when all the readers drop. Once the readers are dropped the writer can be notified. If the Writer held onto the SharedState strongly then it would never be notified. In this way the automatic cancellation is bi-directional. (The writer stops if there is no possibility that values will be read, and a reader stops when there is no possibility of further writes)
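The ownership arrangement can be sketched with std types alone. This is a simplified model with hypothetical names (Shared, SketchReader, SketchWriter, sketch_eventual), not the crate's real SharedState; it only shows how a writer holding a Weak finds out that every reader is gone.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical shared state: just the latest snapshot.
struct Shared<T> {
    latest: Mutex<Option<T>>,
}

/// Readers own the state strongly, so it lives as long as any reader does.
struct SketchReader<T> {
    state: Arc<Shared<T>>,
}

/// The writer only holds a Weak, so it cannot keep the state alive.
struct SketchWriter<T> {
    state: Weak<Shared<T>>,
}

fn sketch_eventual<T>() -> (SketchWriter<T>, SketchReader<T>) {
    let state = Arc::new(Shared { latest: Mutex::new(None) });
    (SketchWriter { state: Arc::downgrade(&state) }, SketchReader { state })
}

impl<T> SketchWriter<T> {
    /// Returns false once every reader has been dropped, which is the
    /// writer's cue to stop producing values.
    fn write(&self, value: T) -> bool {
        match self.state.upgrade() {
            Some(state) => {
                *state.latest.lock().unwrap() = Some(value);
                true
            }
            None => false,
        }
    }
}

impl<T: Clone> SketchReader<T> {
    fn latest(&self) -> Option<T> {
        self.state.latest.lock().unwrap().clone()
    }
}

impl<T> Clone for SketchReader<T> {
    fn clone(&self) -> Self {
        SketchReader { state: Arc::clone(&self.state) }
    }
}
```

Had the writer held an Arc instead, upgrade() could never fail and the writer would keep producing values nobody can read.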
Quick and dirty implementation. Next thing to do is to go lock-free, cleanup, add more tests, and add map.
The current idea for map is something like...
Need to make a couple adjustments but that is where this would be headed. I feel it's at a good state for a Zoom call at least.
Something cool from the above is that cancellation is automatic. When the writer for the source is dropped, this task will observe Err(Closed) instead of Ok(v), which will leave the while loop, dropping this Eventual, and cancelling anything downstream as well. Magic.
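The same cascade can be seen with std channels, which behave analogously on close. This is an analogy only: an mpsc channel queues every value whereas an eventual keeps just the latest snapshot, and spawn_map is a hypothetical helper written for this sketch.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical mapping stage built on std channels. It runs until the
/// upstream sender is dropped, then drops its own sender, which closes
/// the downstream side in turn.
fn spawn_map<T, U, F>(
    source: mpsc::Receiver<T>,
    f: F,
) -> (mpsc::Receiver<U>, thread::JoinHandle<()>)
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        // recv() returns Err once every upstream sender has been dropped.
        while let Ok(v) = source.recv() {
            // send() returns Err once the downstream receiver is gone,
            // so cancellation also propagates in the other direction.
            if tx.send(f(v)).is_err() {
                break;
            }
        }
        // `tx` is dropped here, closing the channel for anything downstream.
    });
    (rx, handle)
}
```

Dropping the original sender is enough to end the stage: no explicit cancel call is needed at any level.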