changefeedccl: add observability metrics into sarama code
This patch injects crdb CDC metrics into the metrics registry used by sarama
to provide more observability into throttling behaviour from Kafka.

Fixes: #117618
Release note: (to add later)

TODO(wenyihu6): discuss whether this is a good approach, and which other sarama
metrics are useful and should be added as well
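
For context, a rough standalone sketch of the mechanism this patch relies on: sarama publishes its client metrics through the go-metrics Registry set on config.MetricRegistry, so a registry that special-cases the throttle-time metric names can route those samples into a crdb histogram. The metric name prefix, sample size, and identifiers below are illustrative assumptions, not the exact code in the diff.

// Standalone sketch of the interception idea (not code from this commit).
package main

import (
	"fmt"
	"strings"

	"github.com/rcrowley/go-metrics"
)

// interceptingRegistry returns one fixed histogram for every metric whose name
// carries the throttle-time prefix (sarama registers per-broker metrics such
// as "throttle-time-in-ms-for-broker-<id>") and delegates everything else.
type interceptingRegistry struct {
	metrics.Registry
	throttle metrics.Histogram
}

func (r *interceptingRegistry) GetOrRegister(name string, i interface{}) interface{} {
	if strings.HasPrefix(name, "throttle-time-in-ms") {
		return r.throttle
	}
	return r.Registry.GetOrRegister(name, i)
}

func main() {
	reg := &interceptingRegistry{
		Registry: metrics.NewRegistry(),
		throttle: metrics.NewHistogram(metrics.NewUniformSample(1028)),
	}
	// In the patch, reg would be assigned to sarama's config.MetricRegistry;
	// sarama then records broker throttle time by calling Update on whatever
	// GetOrRegister hands back.
	h := reg.GetOrRegister("throttle-time-in-ms-for-broker-1", nil).(metrics.Histogram)
	h.Update(250)
	fmt.Println(reg.throttle.Count(), reg.throttle.Max()) // 1 250
}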
wenyihu6 committed Jan 11, 2024
1 parent e08886f commit 1f1fdb9
Showing 3 changed files with 52 additions and 0 deletions.
17 changes: 17 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
@@ -47,6 +47,7 @@ const defaultSLIScope = "default"
// AggMetrics are aggregated metrics keeping track of aggregated changefeed performance
// indicators, combined with a limited number of per-changefeed indicators.
type AggMetrics struct {
ThrottlingTimeMs *aggmetric.AggHistogram
EmittedMessages *aggmetric.AggCounter
EmittedBatchSizes *aggmetric.AggHistogram
FilteredMessages *aggmetric.AggCounter
@@ -116,6 +117,7 @@ func (a *AggMetrics) MetricStruct() {}

// sliMetrics holds all SLI related metrics aggregated into AggMetrics.
type sliMetrics struct {
ThrottlingTimeMs *aggmetric.Histogram
EmittedMessages *aggmetric.Counter
EmittedBatchSizes *aggmetric.Histogram
FilteredMessages *aggmetric.Counter
@@ -548,6 +550,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaThrottleTimeInMs := metric.Metadata{
// TODO(wenyihu): add ms to ns conversion
Name: "changefeed.throttle_time_in_ms",
Help: "Throttling tims spent in ms due to kafka quota",
Measurement: "Milliseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaChangefeedEmittedBatchSizes := metric.Metadata{
Name: "changefeed.emitted_batch_sizes",
Help: "Size of batches emitted emitted by all feeds",
@@ -736,6 +745,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
a := &AggMetrics{
ThrottlingTimeMs: b.Histogram(metric.HistogramOptions{
Metadata: metaThrottleTimeInMs,
Duration: histogramWindow,
MaxVal: 16e6, // TODO(wenyihu): check the options here
SigFigs: 1,
BucketConfig: metric.DataCount16MBuckets,
}),
ErrorRetries: b.Counter(metaChangefeedErrorRetries),
EmittedMessages: b.Counter(metaChangefeedEmittedMessages),
EmittedBatchSizes: b.Histogram(metric.HistogramOptions{
@@ -851,6 +867,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
}

sm := &sliMetrics{
ThrottlingTimeMs: a.ThrottlingTimeMs.AddChild(scope),
EmittedMessages: a.EmittedMessages.AddChild(scope),
EmittedBatchSizes: a.EmittedBatchSizes.AddChild(scope),
FilteredMessages: a.FilteredMessages.AddChild(scope),
27 changes: 27 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/rcrowley/go-metrics"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
@@ -1090,6 +1091,7 @@ func buildKafkaConfig(
config.Producer.Partitioner = newChangefeedPartitioner
// Do not fetch metadata for all topics but just for the necessary ones.
config.Metadata.Full = false
config.MetricRegistry = newMetricsRegistryInterceptor()

if dialConfig.tlsEnabled {
config.Net.TLS.Enable = true
@@ -1234,3 +1236,28 @@ func (s *kafkaStats) String() string {
atomic.LoadInt64(&s.largestMessageSize),
)
}

type metricsRegistryInterceptor struct {
metrics.Registry
// Do we want to implement the go-metrics Histogram interface so that this gets called when Update is called?
throttleTimeMs metrics.Histogram
}

var _ metrics.Registry = (*metricsRegistryInterceptor)(nil)

func newMetricsRegistryInterceptor() *metricsRegistryInterceptor {
return &metricsRegistryInterceptor{
Registry: metrics.NewRegistry(),
// TODO: find a way to pass the cdc ThrottlingTimeMs histogram down here; the
// plain go-metrics histogram below is only a placeholder so this compiles.
throttleTimeMs: metrics.NewHistogram(metrics.NewUniformSample(1028)),
}
}

func (mri *metricsRegistryInterceptor) GetOrRegister(name string, i interface{}) interface{} {
const throttleTimeMsMetricsPrefix = "throttle-time-in-ms"
if strings.HasPrefix(name, throttleTimeMsMetricsPrefix) {
return mri.throttleTimeMs
} else {
return mri.Registry.GetOrRegister(name, i)
}
}
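
One way the TODO in the constructor could eventually be resolved (a sketch only, not part of this commit): accept the changefeed histogram as an argument, since the aggmetric change below makes it satisfy metrics.Histogram, and pass it in wherever buildKafkaConfig has the sink's metrics in scope.

// Hypothetical constructor variant; the name and the call-site plumbing are
// assumptions, not code from this commit.
func newMetricsRegistryInterceptorWith(throttleTimeMs metrics.Histogram) *metricsRegistryInterceptor {
	return &metricsRegistryInterceptor{
		Registry:       metrics.NewRegistry(),
		throttleTimeMs: throttleTimeMs,
	}
}

// At the call site, assuming an *sliMetrics value m is reachable from
// buildKafkaConfig:
//   config.MetricRegistry = newMetricsRegistryInterceptorWith(m.ThrottlingTimeMs)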
8 changes: 8 additions & 0 deletions pkg/util/metric/aggmetric/histogram.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/google/btree"
prometheusgo "github.com/prometheus/client_model/go"
"github.com/rcrowley/go-metrics"
)

var now = timeutil.Now
@@ -164,11 +165,14 @@ func (a *AggHistogram) AddChild(labelVals ...string) *Histogram {
// appear with a distinct label, however, when cockroach internally collects
// metrics, only the parent is collected.
type Histogram struct {
metrics.Histogram
parent *AggHistogram
labelValuesSlice
h metric.IHistogram
}

var _ metrics.Histogram = (*Histogram)(nil)

// ToPrometheusMetric constructs a prometheus metric for this Histogram.
func (g *Histogram) ToPrometheusMetric() *prometheusgo.Metric {
return g.h.ToPrometheusMetric()
@@ -193,3 +197,7 @@ func (g *Histogram) RecordValue(v int64) {
g.h.RecordValue(v)
g.parent.h.RecordValue(v)
}

// Update implements the go-metrics Histogram interface by forwarding the
// observation to RecordValue, which records into both this child histogram
// and its aggregated parent.
func (g *Histogram) Update(v int64) {
g.RecordValue(v)
}
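
Together with the registry interception in sink_kafka.go, this override closes the loop: sarama only ever sees the value through the go-metrics Histogram interface, so once the registry hands back the cdc histogram, every throttle observation flows through RecordValue into both the per-scope child and the aggregated parent. Roughly (recordThrottle is a hypothetical helper, not part of the commit):

// Illustrative only.
func recordThrottle(sm *sliMetrics, throttleMs int64) {
	var h metrics.Histogram = sm.ThrottlingTimeMs // *aggmetric.Histogram satisfies metrics.Histogram
	h.Update(throttleMs)                          // forwards to RecordValue on the child and its parent
}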
