Polling file monitor #3411
Conversation
evaporei left a comment
The logic seems correct; I've added a bunch of comments for further discussion 🙂
```rust
}

#[tokio::test]
async fn polling_monitor_unordered() {
```
I'm not sure I follow how this tests the "unorderedness" of the `PollingMonitor`, since the test does send and receive in a specific order. Can you explain it a bit further, please?
It sends in the order `req-0`, `req-1`, meaning `req-1` is called first on the service. But the response for `req-0` arrives first and is received without waiting for `req-1`. This tests that we're using `.call_all(stream).unordered()`.
Force-pushed from d4c0464 to f99155c
There's no rush to merge this, so I'll leave it as a PR for a while to see if tower 0.5 is released in the meantime. Note to self: whitelist the multihashes to cryptographically safe ones before merging.
Force-pushed from f99155c to 1573bdc
Theodus left a comment
LGTM
```rust
{
    let queue = queue.cheap_clone();
    graph::spawn(async move {
        let mut responses = service.call_all(queue_to_stream).unordered().boxed();
        while let Some(response) = responses.next().await {
            match response {
```
For each `PollingMonitor<S>`, I think it would make sense to have metrics for:
- queue depth (possibly overkill depending on how we use this)
- timeout rate
- error rate
Force-pushed from 1573bdc to cd3a7c9
Force-pushed from cd3a7c9 to fbeaccd
As part of #3072, this adds the components to poll IPFS for the availability of a file. This does not track unavailability; once a file is found, it is not polled again.
The load management approach is to have a configurable capacity for the IPFS service, which is shared among all subgraphs, and then a per-subgraph queue of pending requests. This should effectively distribute the service capacity fairly among subgraphs.
The implementation has two pieces: the monitoring algorithm, which is generic over any `tower::Service`, and an implementation of `Service` for IPFS.