Fix Pulsar Function localrun with multiple instances and metrics server is enabled #10208
01f8410 to bf1269b
@michaeljmarshall please also take a look
Thanks for tagging me @jerrypeng. I left a couple of comments.
public class FunctionCollectorRegistryImpl extends FunctionCollectorRegistry {

    private final Map<String, Collector> namesToCollectors = new HashMap<String, Collector>();
Nit. I think it might be more efficient to use a ConcurrentHashMap for this map, given that the map is accessed from several threads. Then, use computeIfAbsent in the registerIfNotExist method instead of using synchronized.
Sure, I didn't utilize ConcurrentHashMap because I don't expect that many instances will be started up in a local run process.
That's fair. I see that this won't be used much and likely won't encounter much contention after metrics are initialized.
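For reference, the ConcurrentHashMap alternative discussed in this thread could look roughly like the sketch below. It is not the code in this PR: NamedCollector and ConcurrentCollectorCache are hypothetical stand-ins, and only the registerIfNotExist method name and the namesToCollectors field name are taken from the discussion above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for a metrics collector (not the Prometheus Collector class).
final class NamedCollector {
    final String name;

    NamedCollector(String name) {
        this.name = name;
    }
}

// Hypothetical registry wrapper illustrating the reviewer's suggestion.
final class ConcurrentCollectorCache {
    private final Map<String, NamedCollector> namesToCollectors = new ConcurrentHashMap<>();

    // computeIfAbsent is atomic per key on ConcurrentHashMap: the factory runs
    // at most once for a given metric name, so no synchronized block is needed.
    // The factory must not modify this map itself.
    NamedCollector registerIfNotExist(String metricName, Supplier<NamedCollector> factory) {
        return namesToCollectors.computeIfAbsent(metricName, key -> factory.get());
    }

    int size() {
        return namesToCollectors.size();
    }

    public static void main(String[] args) {
        ConcurrentCollectorCache cache = new ConcurrentCollectorCache();
        NamedCollector first = cache.registerIfNotExist("received_total", () -> new NamedCollector("received_total"));
        NamedCollector second = cache.registerIfNotExist("received_total", () -> new NamedCollector("received_total"));
        System.out.println(first == second); // prints "true": the second call returns the cached instance
    }
}
```

As noted in the replies, contention here is expected to be low, so the synchronized variant and this one should behave the same in practice; the difference is mainly stylistic.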
@@ -171,7 +172,7 @@ public RuntimeEnv convert(String value) {
     protected String secretsProviderClassName;
     @Parameter(names = "--secretsProviderConfig", description = "Whats the config for the secrets provider", hidden = true)
     protected String secretsProviderConfig;
-    @Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server", hidden = true)
+    @Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server. When running instances as threads, one metrics server is used to host the stats for all instances.", hidden = true)
Does this need to be a range because every instance thread expects a metricsPort? I see that in startProcessMode we use a range but in startThreadedMode we use a single value. Further, I see that we're only using metricsPortStart when starting the metrics server.
When running instances in process mode, i.e. each instance is a separate JVM process, the port range is needed because each instance runs its own metrics server on its specified port in its own process.
Thanks for that explanation. I hadn't realized both options were possible. Makes sense.
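To make the distinction in this thread concrete, here is a minimal sketch of how ports might be assigned in the two modes. It assumes that process mode hands out consecutive ports starting at metricsPortStart, which this conversation does not confirm; MetricsPortPlan and its methods are hypothetical names, not part of Pulsar.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the class and method names are illustrative only.
final class MetricsPortPlan {

    // Process mode: each instance is its own JVM with its own metrics server.
    // ASSUMPTION: instance i listens on metricsPortStart + i.
    static List<Integer> processModePorts(int metricsPortStart, int instances) {
        List<Integer> ports = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            ports.add(metricsPortStart + i);
        }
        return ports;
    }

    // Threaded mode: one metrics server hosts the stats for all instances,
    // so every instance maps to the same port.
    static List<Integer> threadedModePorts(int metricsPortStart, int instances) {
        List<Integer> ports = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            ports.add(metricsPortStart);
        }
        return ports;
    }

    public static void main(String[] args) {
        System.out.println(processModePorts(9094, 3));  // prints "[9094, 9095, 9096]"
        System.out.println(threadedModePorts(9094, 3)); // prints "[9094, 9094, 9094]"
    }
}
```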
Once this is merged, it'd be great to get this in
6d784b8 to 5f69877
@michaeljmarshall sounds good to me. Can you volunteer to do the backport :) ?
Motivation
A recent change (#10156) made Pulsar Function local run in threaded mode expose metrics via a metrics server. As a result, running multiple instances in threaded mode with the metrics server turned on led the instances to attempt to register the same metric multiple times, causing an exception with the text:
Collector already registered that provides name ...
The above PR also started a separate metrics server for each threaded instance, each on a different port, which is not ideal either. Ideally, all metrics for all instances are exposed via a single port and a single metrics server.
Modifications
Change the metrics code in functions/sources/sinks to allow registering metrics that have already been registered. When that occurs, the existing metric is returned from the register method.
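The behaviour described above can be sketched as follows. This is a simplified illustration rather than the PR's implementation: StrictRegistry is a hypothetical stand-in for a registry that rejects duplicate names (its error message imitates the one quoted in the Motivation), and Metric and IdempotentRegistry are likewise made-up names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a registry that refuses duplicate metric names.
final class StrictRegistry {
    private final Set<String> names = new HashSet<>();

    synchronized void register(String name) {
        if (!names.add(name)) {
            throw new IllegalArgumentException("Collector already registered that provides name: " + name);
        }
    }
}

// Hypothetical metric handle.
final class Metric {
    final String name;

    Metric(String name) {
        this.name = name;
    }
}

// Wraps the strict registry so that a repeated registration returns the existing metric.
final class IdempotentRegistry {
    private final StrictRegistry delegate = new StrictRegistry();
    private final Map<String, Metric> namesToMetrics = new HashMap<>();

    // synchronized keeps the check and the registration atomic across instance threads.
    synchronized Metric registerIfNotExist(String name) {
        Metric existing = namesToMetrics.get(name);
        if (existing != null) {
            return existing; // already registered: hand back the same metric
        }
        delegate.register(name); // first registration goes through to the strict registry
        Metric created = new Metric(name);
        namesToMetrics.put(name, created);
        return created;
    }

    public static void main(String[] args) {
        IdempotentRegistry registry = new IdempotentRegistry();
        Metric fromInstance0 = registry.registerIfNotExist("processed_total");
        Metric fromInstance1 = registry.registerIfNotExist("processed_total");
        System.out.println(fromInstance0 == fromInstance1); // prints "true"
    }
}
```

With this shape, several threaded instances in the same local run process can each ask for the same metric and share a single registered object instead of triggering the duplicate-registration exception.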