changefeedccl: add observability metrics into sarama code #117693

Merged
merged 1 commit into from Feb 6, 2024


wenyihu6
Contributor

@wenyihu6 wenyihu6 commented Jan 11, 2024

Now that this patch (#117544) has been merged, sarama acknowledges and
reacts to the kafka server's throttling messages by slowing down. To provide better
observability into sarama code, this patch adds a metrics registry interceptor
and a new metric, changefeed.kafka_throttling_hist_nanos, which tracks the time (in
nanoseconds) spent in sarama's throttling when cockroachdb exceeds the kafka quota.

Fixes: #117618

Release note: changefeed.kafka_throttling_hist_nanos has now been added to
metrics to monitor sarama throttling behavior resulting from exceeding kafka
quota.
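
A minimal sketch of the registry interceptor idea described above, not the code in this PR. It assumes a trimmed-down stand-in `Registry` and `Histogram` rather than the real go-metrics interfaces, and the metric name `throttle-time-in-ms` and all type names (`mapRegistry`, `nanosHistogram`, `interceptingRegistry`, `newInterceptor`) are hypothetical, chosen for illustration only.

```go
package main

import "fmt"

// Histogram is a hypothetical, trimmed-down stand-in for a metrics histogram;
// only Update is modeled here.
type Histogram interface {
	Update(v int64)
}

// Registry is a hypothetical stand-in for a metrics registry interface.
type Registry interface {
	GetOrRegister(name string, metric interface{}) interface{}
}

// mapRegistry is a plain registry that stores whatever it is given.
type mapRegistry struct{ metrics map[string]interface{} }

func (r *mapRegistry) GetOrRegister(name string, metric interface{}) interface{} {
	if existing, ok := r.metrics[name]; ok {
		return existing
	}
	r.metrics[name] = metric
	return metric
}

// nanosHistogram receives millisecond values and stores them as nanoseconds.
type nanosHistogram struct{ recordedNanos []int64 }

func (h *nanosHistogram) Update(valueInMs int64) {
	h.recordedNanos = append(h.recordedNanos, valueInMs*1000000)
}

// interceptingRegistry embeds a Registry and substitutes its own histogram for
// one metric name; every other name falls through to the embedded registry.
type interceptingRegistry struct {
	Registry
	throttleName string // assumed metric name, for illustration only
	throttle     *nanosHistogram
}

func (r *interceptingRegistry) GetOrRegister(name string, metric interface{}) interface{} {
	if name == r.throttleName {
		return r.throttle
	}
	return r.Registry.GetOrRegister(name, metric)
}

func newInterceptor() *interceptingRegistry {
	return &interceptingRegistry{
		Registry:     &mapRegistry{metrics: map[string]interface{}{}},
		throttleName: "throttle-time-in-ms",
		throttle:     &nanosHistogram{},
	}
}

func main() {
	reg := newInterceptor()
	h := reg.GetOrRegister("throttle-time-in-ms", nil).(Histogram)
	h.Update(3) // the client library reports 3 ms of throttling
	fmt.Println(reg.throttle.recordedNanos)
}
```

Embedding the registry keeps the interceptor small: only the one lookup path is overridden, and everything else behaves as the wrapped registry does.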

@cockroach-teamcity
Member

This change is Reviewable

@wenyihu6 wenyihu6 changed the title changefeedccl: add observability metrics into sarama code [wip] changefeedccl: add observability metrics into sarama code Jan 11, 2024
@wenyihu6 wenyihu6 force-pushed the ob-sarama branch 2 times, most recently from b851436 to 1f1fdb9 Compare January 11, 2024 21:22
@wenyihu6 wenyihu6 self-assigned this Jan 17, 2024
@wenyihu6 wenyihu6 changed the title [wip] changefeedccl: add observability metrics into sarama code changefeedccl: add observability metrics into sarama code Jan 24, 2024
@wenyihu6
Contributor Author

wenyihu6 commented Jan 24, 2024

@miretskiy Do you have any feedback on the high-level design for this commit? Feel free to ignore other commits.

@wenyihu6
Contributor Author

This is what the metric currently looks like.

Screenshot 2024-01-24 at 3 26 52 PM

@wenyihu6 wenyihu6 force-pushed the ob-sarama branch 6 times, most recently from 64d6ad8 to 2aeb28a Compare January 29, 2024 19:45
Contributor

@miretskiy miretskiy left a comment


This looks really good. Exactly what we discussed.
@rharding6373 @nicktrav (FYI).

Reviewed 1 of 1 files at r1, 16 of 16 files at r2.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @wenyihu6)


pkg/ccl/changefeedccl/metrics.go line 377 at r2 (raw file):

func (k *kafkaHistogramAdapter) Sum() int64 {
	panic("sum is not expected to be called on kafkaHistogramAdapter")

Consider adding a cluster settings field to the kafkaHistogramAdapter struct and, instead of panicking,
making these methods no-ops that include a logcrash call. For example:

logcrash.ReportOrPanic(context.Background(), k.sv /* this is cluster settings.Values */,
  "unexpected call to Sum()")

This way our roachtests will fail, but at least we won't panic production nodes if somebody
calls this method.
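
A rough, self-contained sketch of the "no-op plus report" shape suggested above. It does not use the real `logcrash.ReportOrPanic`; `reportOrPanic`, `crashOnUnexpected`, and `histogramAdapter` are hypothetical stand-ins, and the way the real function decides between reporting and panicking is not modeled here.

```go
package main

import (
	"fmt"
	"log"
)

// crashOnUnexpected is a hypothetical switch standing in for whatever decides
// between panicking (for example in tests) and only reporting (in production).
var crashOnUnexpected = false

// reportOrPanic is a hypothetical stand-in: it panics when crashOnUnexpected
// is set and only logs the message otherwise.
func reportOrPanic(format string, args ...interface{}) {
	msg := fmt.Sprintf(format, args...)
	if crashOnUnexpected {
		panic(msg)
	}
	log.Printf("unexpected: %s", msg)
}

// histogramAdapter only supports Update; its other methods are no-ops.
type histogramAdapter struct{ values []int64 }

func (h *histogramAdapter) Update(v int64) { h.values = append(h.values, v) }

// Sum is not expected to be called. It reports the call and returns zero
// instead of unconditionally panicking.
func (h *histogramAdapter) Sum() int64 {
	reportOrPanic("unexpected call to %s", "Sum()")
	return 0
}

func main() {
	h := &histogramAdapter{}
	h.Update(5)
	fmt.Println(h.Sum()) // logs the unexpected call, then prints the no-op result
}
```

With the switch off, a stray call costs a log line; with it on, the same call fails loudly, which is the behavior wanted in test environments.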


pkg/ccl/changefeedccl/metrics.go line 386 at r2 (raw file):

var _ metrics.Histogram = (*kafkaHistogramAdapter)(nil)

func (k *kafkaHistogramAdapter) Update(v int64) {

nit: let's move this method all the way to the top so that those panic-y methods do not bother us that much.


pkg/ccl/changefeedccl/metrics.go line 394 at r2 (raw file):

type KafkaMetricsGetter interface {
	GetThrottlingTimeInMs() *kafkaHistogramAdapter
}

not sure why we need this -- can we just return kafkaHistogramAdapter directly?


pkg/ccl/changefeedccl/metrics.go line 646 at r2 (raw file):

	}
	metaThrottleTimeInMs := metric.Metadata{
		// TODO(wenyihu): add ms to ns conversion

why not do that conversion?


pkg/ccl/changefeedccl/sink_kafka.go line 1246 at r2 (raw file):

type metricsRegistryInterceptor struct {
	metrics.Registry
	// do we want to implement the histogram interface so that it gets called when update is called

let's drop this comment.


pkg/cmd/roachtest/tests/cdc.go line 512 at r2 (raw file):

	tolerateErrors  bool
	sinkURIOverride string
	kafkaQuota      int

what's the unit?


pkg/cmd/roachtest/tests/cdc.go line 2040 at r2 (raw file):

	//	"--entity-type", "users",
	//	"--entity-name", "default")
	//

what's the purpose of these commented blocks?

@wenyihu6 wenyihu6 force-pushed the ob-sarama branch 2 times, most recently from 6a5158e to ac75bd5 Compare January 30, 2024 00:12
wenyihu6 added a commit to wenyihu6/cockroach that referenced this pull request Jan 30, 2024
Previously, `validateExternalConnectionSinkURI` would validate the changefeed
sink URI by creating a fake sink and passing `nil` for `metricsRecorder`. This
is usually not problematic since the sink is never used. A new patch, cockroachdb#117693,
is now changing this and calling `metricsBuilder` interface method inside
`makeKafkaSink`. To resolve this, this patch changes getSink in
`validateExternalConnectionSinkURI` to pass in `(*sliMetrics)(nil)` to avoid
calling methods on a nil interface.

See also: cockroachdb#117693
Release note: none
Epic: none
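
To illustrate the difference this commit message relies on, here is a small sketch of a nil interface versus a typed nil pointer stored in an interface. The `metricsRecorder` interface and `sliMetrics` type below are simplified stand-ins that only borrow the names; the nil-safe method body is an assumption made for the example, not the actual changefeed code.

```go
package main

import "fmt"

// metricsRecorder is a simplified, hypothetical version of the interface.
type metricsRecorder interface {
	throttlingHistogram() string
}

// sliMetrics is a simplified stand-in for the concrete metrics type.
type sliMetrics struct{ name string }

// throttlingHistogram is written to be nil-safe (an assumption for this
// sketch), so it can be called through a nil *sliMetrics receiver.
func (m *sliMetrics) throttlingHistogram() string {
	if m == nil {
		return "no-op"
	}
	return m.name
}

// callOn invokes the interface method and reports whether the call panicked.
func callOn(r metricsRecorder) (result string, panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	return r.throttlingHistogram(), false
}

func main() {
	// A nil interface has no concrete type, so the method call panics.
	fmt.Println(callOn(nil))

	// A typed nil pointer gives the interface a concrete type, so the
	// nil-safe method is dispatched normally.
	fmt.Println(callOn((*sliMetrics)(nil)))
}
```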
craig bot pushed a commit that referenced this pull request Jan 30, 2024
118421: changefeedccl: match nil metrics behaviour r=miretskiy a=wenyihu6

Previously, `validateExternalConnectionSinkURI` would validate the changefeed
sink URI by creating a fake sink and passing `nil` for `metricsRecorder`. This
is usually not problematic since the sink is never used. A new patch, #117693,
is now changing this and calling `metricsBuilder` interface method inside
`makeKafkaSink`. To resolve this, this patch changes getSink in
`validateExternalConnectionSinkURI` to pass in `(*sliMetrics)(nil)` to avoid
calling methods on a nil interface.

See also: #117693 
Release note: none
Epic: none


Co-authored-by: Wenyi Hu <wenyi@cockroachlabs.com>
@wenyihu6 wenyihu6 force-pushed the ob-sarama branch 7 times, most recently from f34f4a3 to 6d811c4 Compare February 1, 2024 14:50
@wenyihu6 wenyihu6 removed the request for review from a team February 1, 2024 16:06

blathers-crl bot commented Feb 1, 2024

It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

Collaborator

@rharding6373 rharding6373 left a comment


Nice work! One comment Yevgeniy left about the getter doesn't look like it was addressed, and I'm also interested in resolving it. Otherwise, only questions from me.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @wenyihu6)


pkg/ccl/changefeedccl/metrics.go line 394 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

not sure why we need this -- can we just return kafkaHistogramAdapter directly?

+1 Why can't we return the kafkaHistogramAdapter like we do the parallelIOMetricsRecorderImpl?


pkg/ccl/changefeedccl/metrics.go line 344 at r5 (raw file):

		// valueInMs is passed in from sarama with a unit of milliseconds. To
		// convert this value to nanoseconds, valueInMs * 10^6 is recorded here.
		k.wrapped.RecordValue(valueInMs * 1000000)

Why do we measure the throttling in nanoseconds when the finest grain output we get from sarama is milliseconds?

@wenyihu6
Contributor Author

wenyihu6 commented Feb 2, 2024

pkg/ccl/changefeedccl/metrics.go line 344 at r5 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

Why do we measure the throttling in nanoseconds when the finest grain output we get from sarama is milliseconds?

That's because nanoseconds is the only time measurement we support in metrics metadata. I could add measurement support for milliseconds here, but I felt it might lead to inconsistency or confusion when comparing metrics against each other.

https://github.com/cockroachdb/cockroach/blob/402a0ccd20d811ce37f7ba6f09d737b1c4d8735f/pkg/ccl/changefeedccl/metrics.go#L837
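
For reference, the millisecond-to-nanosecond conversion discussed in this thread can also be written with the standard `time` package instead of a literal multiplier. This is only a sketch of the arithmetic; `msToNanos` is a hypothetical helper name, not something in this PR.

```go
package main

import (
	"fmt"
	"time"
)

// msToNanos converts a millisecond count into nanoseconds by going through
// time.Duration, which is itself an int64 count of nanoseconds.
func msToNanos(valueInMs int64) int64 {
	return (time.Duration(valueInMs) * time.Millisecond).Nanoseconds()
}

func main() {
	// 250 ms of throttling expressed in nanoseconds.
	fmt.Println(msToNanos(250))
}
```

The result is the same as multiplying by 1000000; the `time` constants just make the unit change explicit.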

@wenyihu6
Contributor Author

wenyihu6 commented Feb 2, 2024

pkg/ccl/changefeedccl/metrics.go line 377 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Consider adding a cluster settings field to the kafkaHistogramAdapter struct and, instead of panicking,
making these methods no-ops that include a logcrash call. For example:

logcrash.ReportOrPanic(context.Background(), k.sv /* this is cluster settings.Values */,
  "unexpected call to Sum()")

This way our roachtests will fail, but at least we won't panic production nodes if somebody
calls this method.

I wasn't sure if adding an env var for handling metric panics is the best option here, so I changed it to logging the error instead but am happy to change this. What do you think about this? @rharding6373

@wenyihu6
Contributor Author

wenyihu6 commented Feb 2, 2024

pkg/ccl/changefeedccl/metrics.go line 386 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

nit: let's move this method all the way to the top so that those panic-y methods do not bother us that much.

Done.

@wenyihu6
Contributor Author

wenyihu6 commented Feb 2, 2024

pkg/ccl/changefeedccl/metrics.go line 394 at r2 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

+1 Why can't we return the kafkaHistogramAdapter like we do the parallelIOMetricsRecorderImpl?

I did this because we might want to return more than one metric in the future. For example, if there is another sarama metric we want to add, we can add more methods to this interface.

type KafkaMetricsGetter interface {
	GetKafkaThrottlingNanos() *kafkaHistogramAdapter
	// Hypothetical future methods:
	GetMoreHistogramMetrics() *kafkaHistogramAdapter
	GetMoreCounterMetrics() *kafkaCounterAdapter
	// ...
}

@wenyihu6
Contributor Author

wenyihu6 commented Feb 2, 2024

pkg/ccl/changefeedccl/metrics.go line 646 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

why not do that conversion?

Done.

@wenyihu6
Contributor Author

wenyihu6 commented Feb 2, 2024

pkg/ccl/changefeedccl/sink_kafka.go line 1246 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

let's drop this comment.

Done.

@wenyihu6
Contributor Author

wenyihu6 commented Feb 2, 2024

pkg/cmd/roachtest/tests/cdc.go line 512 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

what's the unit?

I split the roachtest changes into a separate PR, #118577, and addressed this feedback there.

@wenyihu6
Contributor Author

wenyihu6 commented Feb 2, 2024

pkg/cmd/roachtest/tests/cdc.go line 2040 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

what's the purpose of these commented blocks?

I split the roachtest changes into a separate PR, #118577, and addressed this feedback there.

Collaborator

@rharding6373 rharding6373 left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)


pkg/ccl/changefeedccl/metrics.go line 377 at r2 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

I wasn't sure if adding an env var for handling metric panics is the best option here, so I changed it to logging the error instead but am happy to change this. What do you think about this? @rharding6373

I think that Yevgeniy's suggestion makes sense. If we only log errors, then we won't catch if we're using the kafkaHistogramAdapter inappropriately. We'd like to fail in test environments.


pkg/ccl/changefeedccl/metrics.go line 394 at r2 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

I did this because we might want to return more than one metric in the future. For example, if there is another sarama metric we want to add, we can add more methods to this interface.

type KafkaMetricsGetter interface {
	GetKafkaThrottlingNanos() *kafkaHistogramAdapter
	// Hypothetical future methods:
	GetMoreHistogramMetrics() *kafkaHistogramAdapter
	GetMoreCounterMetrics() *kafkaCounterAdapter
	// ...
}

Do we have any more metrics we're adding for sarama in this release? If not, I'd prefer to apply the YAGNI (You Aren't Gonna Need It) principle and avoid having the extra complexity.


pkg/ccl/changefeedccl/metrics.go line 344 at r5 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

That's because nanoseconds is the only time measurement we support in metrics metadata. I could add measurement support for milliseconds here, but I felt it might lead to inconsistency or confusion when comparing metrics against each other.

https://github.com/cockroachdb/cockroach/blob/402a0ccd20d811ce37f7ba6f09d737b1c4d8735f/pkg/ccl/changefeedccl/metrics.go#L837

I figured it may be something like that, but I wasn't sure. Thanks for the clarification!

@wenyihu6
Contributor Author

wenyihu6 commented Feb 5, 2024

pkg/ccl/changefeedccl/metrics.go line 377 at r2 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

I think that Yevgeniy's suggestion makes sense. If we only log errors, then we won't catch if we're using the kafkaHistogramAdapter inappropriately. We'd like to fail in test environments.

Done.

@wenyihu6
Contributor Author

wenyihu6 commented Feb 5, 2024

pkg/ccl/changefeedccl/metrics.go line 394 at r2 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

Do we have any more metrics we're adding for sarama in this release? If not, I'd prefer to apply the YAGNI (You Aren't Gonna Need It) principle and avoid having the extra complexity.

I changed this PR to return kafkaHistogramAdapter only, and we can discuss whether it is valuable to add more metrics in future work.

I'm thinking about adding https://github.com/IBM/sarama/blob/d8f1dee7204eb120db075509aa1a89db2deb10d4/broker.go#L48 to observe the effect of my work like what we discussed last week. There are some other sarama metrics that could be useful to add as well - listed here https://github.com/IBM/sarama/blob/d43950802a369037c8da6e3d614d7fd359c78836/sarama.go#L23-L84.

Collaborator

@rharding6373 rharding6373 left a comment


:lgtm: Great job!

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @miretskiy)


pkg/ccl/changefeedccl/metrics.go line 394 at r2 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

I changed this PR to return kafkaHistogramAdapter only, and we can discuss whether it is valuable to add more metrics in future work.

I'm thinking about adding https://github.com/IBM/sarama/blob/d8f1dee7204eb120db075509aa1a89db2deb10d4/broker.go#L48 to observe the effect of my work like what we discussed last week. There are some other sarama metrics that could be useful to add as well - listed here https://github.com/IBM/sarama/blob/d43950802a369037c8da6e3d614d7fd359c78836/sarama.go#L23-L84.

I think that most of the request-related metrics (rate, size, etc.) and some of the response-related metrics could be useful to expose, but I agree, let's save this for future work.

Collaborator

@rharding6373 rharding6373 left a comment


:lgtm:

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @miretskiy)

@wenyihu6
Contributor Author

wenyihu6 commented Feb 5, 2024

I'm seeing some cases where these other interface functions might get called unexpectedly. For example, they can be called from here if we ever call GetAll on the registry. I don't think we call GetAll, but some underlying code that we didn't spot might trigger it.

https://github.com/rcrowley/go-metrics/blob/cf1acfcdf4751e0554ffa765d03e479ec491cad6/registry.go#L144C3-L156C27

This came up as I read the code at https://github.com/rcrowley/go-metrics/blob/cf1acfcdf4751e0554ffa765d03e479ec491cad6/meter.go#L20. At first glance, no code in the repo calls it directly. But https://github.com/rcrowley/go-metrics/blob/cf1acfcdf4751e0554ffa765d03e479ec491cad6/registry.go#L191 does call it indirectly through this interface: https://github.com/rcrowley/go-metrics/blob/cf1acfcdf4751e0554ffa765d03e479ec491cad6/registry.go#L242-L245. So I'm no longer confident that these other interface methods are never called, but it's good that we only panic in test runs. Stop itself is not a problem since it is not part of the interface here, but there might be similar cases for other interface methods.
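
A small sketch of the kind of indirect call described in this comment: a registry that never names `Stop` on a concrete type can still reach it through a type assertion to an optional interface. The types below (`registry`, `meter`, `histogram`) are hypothetical and only loosely modeled on the linked code; they are not copied from go-metrics.

```go
package main

import "fmt"

// Stoppable is an optional interface: metrics that have a Stop method
// satisfy it, whether or not any caller refers to them by that name.
type Stoppable interface {
	Stop()
}

// registry is a hypothetical registry holding metrics of arbitrary type.
type registry struct{ metrics map[string]interface{} }

// Unregister removes a metric and, if the metric happens to implement
// Stoppable, calls Stop on it through a type assertion.
func (r *registry) Unregister(name string) {
	if m, ok := r.metrics[name]; ok {
		if s, ok := m.(Stoppable); ok {
			s.Stop()
		}
		delete(r.metrics, name)
	}
}

// meter has a Stop method, so it is stopped on Unregister.
type meter struct{ stopped bool }

func (m *meter) Stop() { m.stopped = true }

// histogram has no Stop method, so Unregister only removes it.
type histogram struct{}

func main() {
	m := &meter{}
	r := &registry{metrics: map[string]interface{}{
		"requests": m,
		"latency":  &histogram{},
	}}
	r.Unregister("requests")
	r.Unregister("latency")
	fmt.Println(m.stopped, len(r.metrics))
}
```

A plain search for direct callers of `Stop` would miss the call in `Unregister`, which is why methods that look unused can still be reached.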

@wenyihu6
Contributor Author

wenyihu6 commented Feb 5, 2024

TFTRs!!

bors r=rharding6373

@craig craig bot merged commit 804d37e into cockroachdb:master Feb 6, 2024
9 checks passed
@craig
Contributor

craig bot commented Feb 6, 2024

Build succeeded:

Successfully merging this pull request may close these issues.

cdc: add observability metrics to throttling behaviour in kafka
4 participants