changefeedccl: add observability metrics into sarama code #117693

Merged 1 commit on Feb 6, 2024.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
@@ -762,6 +762,7 @@
<tr><td>APPLICATION</td><td>changefeed.forwarded_resolved_messages</td><td>Resolved timestamps forwarded from the change aggregator to the change frontier</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.frontier_updates</td><td>Number of change frontier updates across all feeds</td><td>Updates</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.internal_retry_message_count</td><td>Number of messages for which an attempt to retry them within an aggregator node was made</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.kafka_throttling_hist_nanos</td><td>Time spent in throttling due to exceeding kafka quota</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.lagging_ranges</td><td>The number of ranges considered to be lagging behind</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.max_behind_nanos</td><td>(Deprecated in favor of checkpoint_progress) The most any changefeed&#39;s persisted checkpoint is behind the present</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.message_size_hist</td><td>Message size histogram</td><td>Bytes</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -163,6 +163,7 @@ go_library(
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_rcrowley_go_metrics//:go-metrics",
"@com_github_xdg_go_scram//:scram",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
111 changes: 111 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
@@ -19,13 +19,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/rcrowley/go-metrics"
)

const (
@@ -35,6 +38,7 @@ const (
changefeedIOQueueMaxLatency = 5 * time.Minute
admitLatencyMaxValue = 1 * time.Minute
commitLatencyMaxValue = 10 * time.Minute
kafkaThrottlingTimeMaxValue = 5 * time.Minute
)

// max length for the scope name.
@@ -76,6 +80,7 @@ type AggMetrics struct {
CheckpointProgress *aggmetric.AggGauge
LaggingRanges *aggmetric.AggGauge
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram

// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
@@ -106,6 +111,7 @@ type metricsRecorder interface {
newParallelIOMetricsRecorder() parallelIOMetricsRecorder
recordSinkIOInflightChange(int64)
makeCloudstorageFileAllocCallback() func(delta int64)
getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram
}

var _ metricsRecorder = (*sliMetrics)(nil)
@@ -145,6 +151,7 @@ type sliMetrics struct {
CheckpointProgress *aggmetric.Gauge
LaggingRanges *aggmetric.Gauge
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram

mu struct {
syncutil.Mutex
@@ -325,6 +332,80 @@ func (m *sliMetrics) recordSizeBasedFlush() {
m.SizeBasedFlushes.Inc(1)
}

type kafkaHistogramAdapter struct {
settings *cluster.Settings
wrapped *aggmetric.Histogram
}

var _ metrics.Histogram = (*kafkaHistogramAdapter)(nil)

func (k *kafkaHistogramAdapter) Update(valueInMs int64) {
if k != nil {
// valueInMs arrives from sarama in milliseconds; convert it to
// nanoseconds (multiply by 10^6) before recording.
k.wrapped.RecordValue(valueInMs * 1000000)
}
}

func (k *kafkaHistogramAdapter) Clear() {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Clear on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Count() (_ int64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Count on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Max() (_ int64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Max on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Mean() (_ float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Mean on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Min() (_ int64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Min on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Percentile(float64) (_ float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentile on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Percentiles([]float64) (_ []float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentiles on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Sample() (_ metrics.Sample) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sample on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Snapshot() (_ metrics.Histogram) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Snapshot on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) StdDev() (_ float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to StdDev on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Sum() (_ int64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sum on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Variance() (_ float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Variance on kafkaHistogramAdapter")
return
}

type parallelIOMetricsRecorder interface {
recordPendingQueuePush(numKeys int64)
recordPendingQueuePop(numKeys int64, latency time.Duration)
@@ -380,6 +461,16 @@ func (m *sliMetrics) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
}
}

func (m *sliMetrics) getKafkaThrottlingMetrics(settings *cluster.Settings) metrics.Histogram {
if m == nil {
return (*kafkaHistogramAdapter)(nil)
}
return &kafkaHistogramAdapter{
settings: settings,
wrapped: m.KafkaThrottlingNanos,
}
}

func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
if m == nil {
return
@@ -470,6 +561,12 @@ func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
return w.inner.newParallelIOMetricsRecorder()
}

func (w *wrappingCostController) getKafkaThrottlingMetrics(
settings *cluster.Settings,
) metrics.Histogram {
return w.inner.getKafkaThrottlingMetrics(settings)
}

var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
@@ -721,6 +818,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
metaChangefeedKafkaThrottlingNanos := metric.Metadata{
Name: "changefeed.kafka_throttling_hist_nanos",
Help: "Time spent in throttling due to exceeding kafka quota",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
@@ -813,6 +916,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
LaggingRanges: b.Gauge(metaLaggingRangePercentage),
CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes),
KafkaThrottlingNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedKafkaThrottlingNanos,
Duration: histogramWindow,
MaxVal: kafkaThrottlingTimeMaxValue.Nanoseconds(),
SigFigs: 2,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
@@ -878,6 +988,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
LaggingRanges: a.LaggingRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope),
}
sm.mu.resolved = make(map[int64]hlc.Timestamp)
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
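A note on the adapter above: sarama reports broker throttle times through a go-metrics histogram in milliseconds, while the wrapped CockroachDB histogram records nanoseconds, hence the multiply-by-10^6 in Update. A minimal standalone sketch of that conversion contract against plain go-metrics (nanosHistogram and the sample values are illustrative, not names from this PR):

package main

import (
	"fmt"

	metrics "github.com/rcrowley/go-metrics"
)

// nanosHistogram forwards millisecond samples to a go-metrics histogram
// as nanoseconds, mirroring what kafkaHistogramAdapter.Update does.
type nanosHistogram struct {
	wrapped metrics.Histogram
}

func (h *nanosHistogram) Update(valueInMs int64) {
	h.wrapped.Update(valueInMs * 1000000) // ms -> ns
}

func main() {
	inner := metrics.NewHistogram(metrics.NewUniformSample(128))
	h := &nanosHistogram{wrapped: inner}
	h.Update(250)            // sarama would report a 250ms broker throttle
	fmt.Println(inner.Max()) // 250000000
}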
35 changes: 32 additions & 3 deletions pkg/ccl/changefeedccl/sink_kafka.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/rcrowley/go-metrics"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
@@ -1078,7 +1079,10 @@ func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
}

func buildKafkaConfig(
-ctx context.Context, u sinkURL, jsonStr changefeedbase.SinkSpecificJSONConfig,
+ctx context.Context,
+u sinkURL,
+jsonStr changefeedbase.SinkSpecificJSONConfig,
+kafkaThrottlingMetrics metrics.Histogram,
) (*sarama.Config, error) {
dialConfig, err := buildDialConfig(u)
if err != nil {
@@ -1090,6 +1094,7 @@ func buildKafkaConfig(
config.Producer.Partitioner = newChangefeedPartitioner
// Do not fetch metadata for all topics but just for the necessary ones.
config.Metadata.Full = false
config.MetricRegistry = newMetricsRegistryInterceptor(kafkaThrottlingMetrics)

if dialConfig.tlsEnabled {
config.Net.TLS.Enable = true
@@ -1176,7 +1181,8 @@ func makeKafkaSink(
return nil, errors.Errorf(`%s is not yet supported`, changefeedbase.SinkParamSchemaTopic)
}

-config, err := buildKafkaConfig(ctx, u, jsonStr)
+m := mb(requiresResourceAccounting)
+config, err := buildKafkaConfig(ctx, u, jsonStr, m.getKafkaThrottlingMetrics(settings))
if err != nil {
return nil, err
}
@@ -1195,7 +1201,7 @@ func makeKafkaSink(
ctx: ctx,
kafkaCfg: config,
bootstrapAddrs: u.Host,
-metrics: mb(requiresResourceAccounting),
+metrics: m,
topics: topics,
disableInternalRetry: !internalRetryEnabled,
}
@@ -1234,3 +1240,26 @@ func (s *kafkaStats) String() string {
atomic.LoadInt64(&s.largestMessageSize),
)
}

type metricsRegistryInterceptor struct {
metrics.Registry
kafkaThrottlingNanos metrics.Histogram
}

var _ metrics.Registry = (*metricsRegistryInterceptor)(nil)

func newMetricsRegistryInterceptor(kafkaMetrics metrics.Histogram) *metricsRegistryInterceptor {
return &metricsRegistryInterceptor{
Registry: metrics.NewRegistry(),
kafkaThrottlingNanos: kafkaMetrics,
}
}

func (mri *metricsRegistryInterceptor) GetOrRegister(name string, i interface{}) interface{} {
const throttleTimeMsMetricsPrefix = "throttle-time-in-ms"
if strings.HasPrefix(name, throttleTimeMsMetricsPrefix) {
return mri.kafkaThrottlingNanos
} else {
return mri.Registry.GetOrRegister(name, i)
}
}
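The interceptor above works because sarama routes every metric lookup through Registry.GetOrRegister: any name carrying the "throttle-time-in-ms" prefix gets the changefeed histogram back, and everything else falls through to the embedded registry. A runnable sketch of the same pattern using plain go-metrics (the per-broker metric name below is an assumption for illustration; sarama's exact naming may differ):

package main

import (
	"fmt"
	"strings"

	metrics "github.com/rcrowley/go-metrics"
)

// interceptingRegistry redirects throttle-time metrics to a single
// histogram and delegates all other lookups to the embedded registry.
type interceptingRegistry struct {
	metrics.Registry
	hist metrics.Histogram
}

func (r *interceptingRegistry) GetOrRegister(name string, i interface{}) interface{} {
	if strings.HasPrefix(name, "throttle-time-in-ms") {
		return r.hist
	}
	return r.Registry.GetOrRegister(name, i)
}

func main() {
	hist := metrics.NewHistogram(metrics.NewUniformSample(128))
	reg := &interceptingRegistry{Registry: metrics.NewRegistry(), hist: hist}

	// Simulate sarama recording a broker throttle time through the registry.
	m := reg.GetOrRegister("throttle-time-in-ms-for-broker-1", func() metrics.Histogram {
		return metrics.NewHistogram(metrics.NewUniformSample(128))
	})
	m.(metrics.Histogram).Update(42)

	fmt.Println(hist.Count(), hist.Max()) // 1 42
}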
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/telemetry.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/rcrowley/go-metrics"
)

type sinkTelemetryData struct {
@@ -216,6 +217,12 @@ func (r *telemetryMetricsRecorder) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
return r.inner.newParallelIOMetricsRecorder()
}

func (r *telemetryMetricsRecorder) getKafkaThrottlingMetrics(
settings *cluster.Settings,
) metrics.Histogram {
return r.inner.getKafkaThrottlingMetrics(settings)
}

// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(
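Taken together: makeKafkaSink obtains a metrics.Histogram from the active metricsRecorder via getKafkaThrottlingMetrics (the telemetry and cost-controller wrappers simply delegate to their inner recorder), buildKafkaConfig installs it behind a metricsRegistryInterceptor as sarama's MetricRegistry, and every throttle-time sample sarama records lands, converted from milliseconds to nanoseconds, in the per-scope changefeed.kafka_throttling_hist_nanos histogram registered in newAggregateMetrics.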