graph: Add endpoint metrics #4430

Merged
merged 7 commits into from
Mar 11, 2023

Conversation

Contributor

@mangas mangas commented Mar 7, 2023

Add a generic mechanism to count errors based on host url
@@ -17,7 +17,7 @@ chrono = "0.4.23"
 envconfig = "0.10.0"
 Inflector = "0.11.3"
 isatty = "0.1.9"
-reqwest = { version = "0.11.2", features = ["json", "stream", "multipart"] }
+reqwest = { version = "0.11.14", features = ["json", "stream", "multipart"] }
Contributor Author

This is unrelated to the rest of the change; I was just looking into middleware options and noticed this was "outdated".

@mangas mangas marked this pull request as ready for review March 8, 2023 19:22
@mangas mangas requested review from lutter and neysofu March 8, 2023 19:22
// we need to -1 because there will always be a reference
// inside FirehoseEndpoints that is not used (is always cloned).
pub fn has_subgraph_capacity(self: &Arc<Self>) -> bool {
    self.subgraph_limit
        .has_capacity(Arc::strong_count(self).saturating_sub(1))
}

pub async fn get_block<M>(
fn new_client(
Contributor Author

not sure if it's useful to have a macro here or if it would hurt readability

@mangas mangas force-pushed the filipe/endpoint-metrics branch 3 times, most recently from 81880ec to 65fa2f9 Compare March 8, 2023 20:26
Collaborator

@lutter lutter left a comment

Generally looks good. One thing I didn't understand is why this needs a separate thread - couldn't handling the per-host counters also be done inline from EndpointMetrics.success etc. ? Ultimately, it's just changing an atomic counter - I would assume that the implementation of the channel involves more costly locks, like a Mutex

chain/ethereum/examples/firehose.rs
/// HostCount is the underlying structure to keep the count,
/// we require that all the hosts are known ahead of time, this way we can
/// avoid locking since we don't need to modify the entire structure.
type HostCount = Arc<HashMap<Host, AtomicU64>>;
Collaborator

For the comment, it would be good to say what constitutes a Host here. On first reading, that's not clear.

Contributor Author

I've added the comment to Host instead
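
For readers following the thread, here is a minimal sketch of the idea in the HostCount doc comment: because every host is registered up front, the map itself is never mutated afterwards, and each update is a single atomic operation on an existing entry. The `ErrorCounters` wrapper, its method names, and the "success resets the count" behaviour are assumptions made for illustration, not the code in this PR.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

#[derive(Debug, Eq, PartialEq, Hash, Clone)]
pub struct Host(Box<str>);

impl From<&str> for Host {
    fn from(s: &str) -> Self {
        Host(s.into())
    }
}

/// Same shape as the alias in the PR: an immutable map of atomic counters.
type HostCount = Arc<HashMap<Host, AtomicU64>>;

/// Hypothetical wrapper used only for this sketch.
#[derive(Clone)]
pub struct ErrorCounters {
    counts: HostCount,
}

impl ErrorCounters {
    /// All hosts must be known here; the map is never resized afterwards.
    pub fn new(hosts: &[&str]) -> Self {
        let counts = hosts
            .iter()
            .map(|h| (Host::from(*h), AtomicU64::new(0)))
            .collect::<HashMap<_, _>>();
        Self {
            counts: Arc::new(counts),
        }
    }

    /// Inline update: one atomic increment, no lock and no channel.
    pub fn failure(&self, host: &Host) {
        if let Some(count) = self.counts.get(host) {
            count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Assumption for this sketch: a success resets the error count.
    pub fn success(&self, host: &Host) {
        if let Some(count) = self.counts.get(host) {
            count.store(0, Ordering::Relaxed);
        }
    }

    /// Unknown hosts report zero errors.
    pub fn get_count(&self, host: &Host) -> u64 {
        self.counts
            .get(host)
            .map(|c| c.load(Ordering::Relaxed))
            .unwrap_or(0)
    }
}
```

Updates for a host that was not registered are silently ignored in this sketch; a real implementation might prefer to log or panic instead.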

type HostCount = Arc<HashMap<Host, AtomicU64>>;

#[derive(Debug, Eq, PartialEq, Hash, Clone)]
pub struct Host(Box<str>);
Collaborator

You could use Word here which is also a Box<str>, but that's minor

Contributor Author

I think Word is a terrible name. I copied a lot of the implementation, but I wouldn't use it anywhere else by choice. There is just no scenario where someone reads "Word" and understands what that means in the context of graph-node.

Collaborator

I am totally open to renaming that - that shouldn't be a reason to duplicate code

Contributor Author

CheapString? BoxStr? BoxString? It's a small amount and I wasn't really sure why it was named this way so I didn't want to change the code

Contributor Author

Left the rename out and made an alias, let's discuss options for naming and do that rename


impl EndpointMetrics {
/// This should only be used for testing.
pub fn noop() -> Self {
Collaborator

To make sure that it is only used from tests, you can add #[cfg(debug_assertions)] and it will be ignored in release builds.

Collaborator

Bikeshedding, but dummy or test_metrics would be a better name

Contributor Author

I had it, but we need to use it from the test crate, and in that case debug_assertions doesn't work, so I had to remove it.

Contributor Author

replaced with mock to be consistent with other parts of the code

graph/src/firehose/endpoints.rs
}
.filter(|x| x.has_subgraph_capacity())
.sorted_by_key(|x| x.current_error_count())
.find(|x| x.has_subgraph_capacity())
Collaborator

At the point where find gets invoked, isn't x.has_subgraph_capacity() always true? I.e., this boils down to next()?

Collaborator

The non-error behavior we'll get from this, I think, is that we use up the first endpoint to capacity, then the next etc. assuming that sorted_by_key is a stable sort.

Contributor Author

I think there are two issues here:

  1. has_subgraph_capacity now goes up and down over time because we only check whether we have capacity, not how much. If an adapter is close to its limit, say 100, we could see capacity at 99, but once we take the adapter it will fail due to capacity. I think I'll swap has_capacity for remaining_capacity() -> u64, since this would allow us to sort instead of filtering.
  2. Due to 1, we would need this particular has_subgraph_capacity to return false so we can re-test the adapter that has errors.

I will try and address both of these

Contributor Author

I think I have addressed all the concerns, let me know what you think
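
To make the selection behaviour discussed in this thread concrete, here is a small std-only sketch. It filters out endpoints without capacity, sorts the rest by error count, and takes the first one, which is why a second capacity check after the filter reduces to `next()`. The `Endpoint` struct, its fields, and `pick_endpoint` are hypothetical stand-ins for illustration; they are not the types in graph-node.

```rust
/// Hypothetical stand-in for an endpoint/adapter, for illustration only.
#[derive(Debug, Clone, PartialEq)]
pub struct Endpoint {
    pub name: &'static str,
    pub error_count: u64,
    pub remaining_capacity: usize,
}

/// Pick the endpoint with the fewest errors among those that still have capacity.
pub fn pick_endpoint(endpoints: &[Endpoint]) -> Option<&Endpoint> {
    let mut candidates: Vec<&Endpoint> = endpoints
        .iter()
        .filter(|e| e.remaining_capacity > 0)
        .collect();

    // `sort_by_key` on slices is a stable sort, so endpoints with equal
    // error counts keep their original relative order.
    candidates.sort_by_key(|e| e.error_count);

    // Every remaining candidate passed the capacity filter, so a further
    // `find` on capacity would be equivalent to taking the first element.
    candidates.into_iter().next()
}
```

Because the sort is stable, ties on error count are broken by configuration order, which gives the "fill the first endpoint, then the next" behaviour described above when no endpoint is failing.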

graph/src/firehose/interceptors.rs (outdated)
Contributor Author

mangas commented Mar 9, 2023

> Generally looks good. One thing I didn't understand is why this needs a separate thread - couldn't handling the per-host counters also be done inline from EndpointMetrics.success etc. ? Ultimately, it's just changing an atomic counter - I would assume that the implementation of the channel involves more costly locks, like a Mutex

In the initial iteration the synchronisation was a bit different, with this design I think we could remove the background processor. I was wondering if this is worth doing because we may want to do a few more things in here other than just counting the errors, like producing metrics and reporting tracing etc so I thought I'd leave it like this since it is prolly easier to add these other operations without paying an additional cost on the hot path. If you think it's not worth the extra complexity I'm happy to remove the asynchronous processing.

Collaborator

lutter commented Mar 10, 2023

> > Generally looks good. One thing I didn't understand is why this needs a separate thread - couldn't handling the per-host counters also be done inline from EndpointMetrics.success etc. ? Ultimately, it's just changing an atomic counter - I would assume that the implementation of the channel involves more costly locks, like a Mutex
>
> In the initial iteration the synchronisation was a bit different, with this design I think we could remove the background processor. I was wondering if this is worth doing because we may want to do a few more things in here other than just counting the errors, like producing metrics and reporting tracing etc so I thought I'd leave it like this since it is prolly easier to add these other operations without paying an additional cost on the hot path. If you think it's not worth the extra complexity I'm happy to remove the asynchronous processing.

Since what we are doing now is very simple, and since we have plenty of places where we manipulate metrics inline on hot code paths, I would do that here, too, and remove the extra thread. We can always add it if we ever do need to do something that's more expensive than these two things.

@mangas mangas force-pushed the filipe/endpoint-metrics branch 3 times, most recently from 3a768ac to 5abb889 Compare March 10, 2023 16:51
Comment on lines +240 to +244
let metrics = MetricsInterceptor {
    metrics: self.endpoint_metrics.cheap_clone(),
    service: self.channel.cheap_clone(),
    host: self.host.clone(),
};
Member

This block of code is repeated a few times, I would suggest a fn metrics_interceptor(&self) -> MetricsInterceptor<Channel> method.

Contributor Author

I think it's going to be almost the exact same code, prolly the best way here would be to write a macro to implement these 3 functions, they are effectively the same with a different generated type
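
For reference, the helper suggested in this thread could look roughly like the sketch below. All types here are simplified stand-ins defined only so the example compiles on its own: `Channel` is a placeholder for the real gRPC channel, `EndpointMetrics` is an empty struct, and `Arc::clone`/`clone` stand in for `cheap_clone`. Only the method name and return type come from the review comment.

```rust
use std::sync::Arc;

/// Placeholder for the real metrics type.
#[derive(Debug, Default)]
pub struct EndpointMetrics;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Host(Box<str>);

impl From<&str> for Host {
    fn from(s: &str) -> Self {
        Host(s.into())
    }
}

/// Placeholder for the gRPC channel; the real type is cheaply cloneable too.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Channel(pub &'static str);

pub struct MetricsInterceptor<S> {
    pub metrics: Arc<EndpointMetrics>,
    pub service: S,
    pub host: Host,
}

/// Simplified endpoint holding just the fields the repeated block reads.
pub struct FirehoseEndpoint {
    endpoint_metrics: Arc<EndpointMetrics>,
    channel: Channel,
    host: Host,
}

impl FirehoseEndpoint {
    pub fn new(host: &str, channel: Channel) -> Self {
        Self {
            endpoint_metrics: Arc::new(EndpointMetrics),
            channel,
            host: Host::from(host),
        }
    }

    /// Builds the interceptor in one place instead of repeating the
    /// struct literal in every client constructor.
    pub fn metrics_interceptor(&self) -> MetricsInterceptor<Channel> {
        MetricsInterceptor {
            metrics: Arc::clone(&self.endpoint_metrics),
            service: self.channel.clone(),
            host: self.host.clone(),
        }
    }
}
```

This only removes the repeated struct literal; the client constructors that wrap the interceptor would still differ by generated type, which is the part the macro idea above is aimed at.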

Collaborator

@lutter lutter left a comment

Nice! Looks great!

@mangas mangas merged commit 8b1a524 into master Mar 11, 2023
@mangas mangas deleted the filipe/endpoint-metrics branch March 11, 2023 13:52
3 participants