Adding channel watcher to metricMap queue #319
Conversation
METRICS.md
Outdated
@@ -84,6 +84,7 @@ the samples are reset.
| Channel name              | Additional tags | Description |
| ------------------------- | --------------- | ----------- |
| dispatch_aggregator       | aggregator_id   | Channel to dispatch metrics to a specific aggregator. |
| dispatch_receiver         | aggregator_id   | Channel to dispatch singular metrics to a given aggregator. |
I think I borked up the meaning of this channel, let me go fix it.
I'd suggest changing the channel names for both; dispatch_aggregator is less accurate now because the term could cover either. Maybe dispatch_aggregator_batch and dispatch_aggregator_map.
That makes sense to me 👍
Though, the usage of the two channels is still a mystery to me.
There are two paths to the aggregators:
- UDP receiver -> []*Metric -> a bunch of PipelineHandler.DispatchMetrics -> Aggregator via dispatch_aggregator_batch
- HTTP receiver -> *MetricMap -> a bunch of PipelineHandler.DispatchMetricMap -> Aggregator via dispatch_aggregator_map
There's also the Consolidator, whose job is to receive []*Metric and *MetricMaps, bundle them all up into a single *MetricMap, and then send them out somewhere. Currently it's only used for HTTP forwarding, but it could easily be placed in front of the UDP receiver, which is part 1 of removing DispatchMetrics from the pipeline entirely.
Part 2 is dealing with the cloud provider, although it's basically the same thing, just drop a Consolidator in there.
pkg/statsd/aggregator.go
Outdated
func (a *MetricAggregator) Receive(ms ...*gostatsd.Metric) {
	a.metricsReceived += uint64(len(ms))
	for _, m := range ms {
		a.metricMap.Receive(m)
	}
}

// ReceiveMap takes a single Me
Regular Me or Mini-Me?
The me-me
On a tangent @tiedotguy, it looks like the protobuf marshaler and pb definitions are using a deprecated package (from github) and will have to move to the google-provided package.
I saw the PB thing when I was looking at gRPC. The docs were full of references to "use this package", and then the package was like "no". I dunno if they've been updated since.
I made an issue.
@@ -47,13 +47,21 @@ func (w *worker) executeProcess(cmd *processCommand) {
}

func (w *worker) RunMetrics(ctx context.Context, statser stats.Statser) {
This method, like other similar ones that do work concurrently, should block until the goroutine below finishes. wait.Group
can help here.
It does @ash2k, it just forces one to run in the background on a different goroutine, and it will cancel once the context is done.
Unless I have missed something on how contexts work.
What I'm saying is that this method will return without waiting for go stats.NewChannelStatsWatcher().Run(ctx) to terminate. Other places that start goroutines block until the spawned goroutines finish. This is a very important property that allows us to write deterministic tests without data races.
Would it make sense to put both channels into the wait.StartWithContext, then wait at the end? Or just move the singular one there?
@MovieStoreGuy something like this:
func (w *worker) RunMetrics(ctx context.Context, statser stats.Statser) {
	stgr := stager.New()
	csw := stats.NewChannelStatsWatcher(...chan1..., statser)
	stgr.NextStage().StartWithContext(csw.Run)
	csw = stats.NewChannelStatsWatcher(...chan2..., statser)
	stgr.NextStage().StartWithContext(csw.Run)
	<-ctx.Done()
	stgr.Shutdown()
}
When the context.Done is signaled, stgr.Shutdown() will cancel each stage, in reverse order, and wait for them to complete. There's a wait.Group behind the scenes which ensures ordering.
I ended up going with the former, since it makes the invocation of the watchers the same and the control flow neater.
Works for me.
METRICS.md
Outdated
@@ -83,7 +83,8 @@ the samples are reset.

| Channel name              | Additional tags | Description |
| ------------------------- | --------------- | ----------- |
| dispatch_aggregator       | aggregator_id   | Channel to dispatch metrics to a specific aggregator. |
| dispatch_aggregator_batch | aggregator_id   | Channel to dispatch metrics to a specific aggregator. |
| dispatch_aggregator_map   | aggregator_id   | Channel to dispatch singular metrics to a given aggregator. |
"metric maps", not "singular metrics"
Adding new modules to the latest (updated from 04b0a19 to 588afc3)
@@ -47,13 +48,22 @@ func (w *worker) executeProcess(cmd *processCommand) {
}

func (w *worker) RunMetrics(ctx context.Context, statser stats.Statser) {
	csw := stats.NewChannelStatsWatcher(
	wg := &wait.Group{}
nit: This is fine, but usually it's just var wg wait.Group
We've reached a place where we are blocking, and it has been hard to determine the issue since we are missing continuous profiling in our production environment.
I am adding a channel watcher to the metricMap queue that is created by the worker.