Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

obsservice: creating Insights Pipeline #114802

Merged
merged 2 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/obsservice/cmd/obsservice/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ go_library(
"//pkg/obsservice/obslib/httpproxy",
"//pkg/obsservice/obslib/ingest",
"//pkg/obsservice/obslib/obsutil",
"//pkg/obsservice/obslib/process",
"//pkg/obsservice/obslib/produce",
"//pkg/obsservice/obslib/queue",
"//pkg/obsservice/obslib/router",
"//pkg/obsservice/obslib/transform",
"//pkg/obsservice/obslib/validate",
"//pkg/obsservice/obspb",
"//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service",
"//pkg/ui/distoss",
Expand Down
53 changes: 52 additions & 1 deletion pkg/obsservice/cmd/obsservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/httpproxy"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/ingest"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/obsutil"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/process"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/produce"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/queue"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/router"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/transform"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/validate"
"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1"
_ "github.com/cockroachdb/cockroach/pkg/ui/distoss" // web UI init hooks
Expand All @@ -48,6 +53,10 @@ var drainSignals = []os.Signal{unix.SIGINT, unix.SIGTERM}
// shutdown (i.e. second occurrence does not incur hard shutdown).
var termSignal os.Signal = unix.SIGTERM

// maxMemoryBytes is the max memory bytes to be used by memory queue.
// TODO(maryliag): make performance testing to decide on the final value.
var maxMemoryBytes int = 500 * 1024 * 1024 // 500Mb

// RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{
Use: "obsservice",
Expand Down Expand Up @@ -83,11 +92,19 @@ from one or more CockroachDB clusters.`,

stopper := stop.NewStopper()

stmtInsightsPipeline, stmtInsightsProcessor, err := makeStatementInsightsPipeline()
if err != nil {
return errors.Wrapf(err, "failed to create Statement Insights Pipeline")
}
// Run the event ingestion in the background.
eventRouter := router.NewEventRouter(map[obspb.EventType]obslib.EventConsumer{
obspb.EventlogEvent: &obsutil.StdOutConsumer{},
obspb.StatementInsightsStatsEvent: &obsutil.StdOutConsumer{},
obspb.StatementInsightsStatsEvent: stmtInsightsPipeline,
})
err = stmtInsightsProcessor.Start(ctx, stopper)
if err != nil {
return errors.Wrapf(err, "failed to start Statement Insights Processor")
}
ingester := ingest.MakeEventIngester(ctx, eventRouter, nil)

// Instantiate the net listener & gRPC server.
Expand Down Expand Up @@ -237,3 +254,37 @@ func handleSignalDuringShutdown(sig os.Signal) {
// Block while we wait for the signal to be delivered.
select {}
}

func makeStatementInsightsPipeline() (
obslib.EventConsumer,
*process.MemQueueProcessor[*obspb.StatementInsightsStatistics],
error,
) {
memQueue := queue.NewMemoryQueue[*obspb.StatementInsightsStatistics](
maxMemoryBytes, func(statistics *obspb.StatementInsightsStatistics) int {
return statistics.Size()
}, "StmtInsightsStatisticsMemQueue")
memQueueProducer := produce.NewMemQueueProducer[*obspb.StatementInsightsStatistics](memQueue)
insightsTransformer := &transform.StmtInsightTransformer{}
insightsValidator := &validate.StmtInsightValidator{}

producerGroup, err := produce.NewProducerGroup[*obspb.StatementInsightsStatistics](
"StmtInsightsStatisticsProducerGroup",
obslib.Observability,
insightsTransformer,
[]validate.Validator[*obspb.StatementInsightsStatistics]{insightsValidator},
[]produce.EventProducer[*obspb.StatementInsightsStatistics]{memQueueProducer})

if err != nil {
return nil, nil, err
}

// TODO: replace process.InsightsStdoutProcessor for a real Insights processor
// and delete the file process/stdout.go
processor, err := process.NewMemQueueProcessor[*obspb.StatementInsightsStatistics](memQueue, &process.InsightsStdoutProcessor{})
if err != nil {
return nil, nil, err
}

return producerGroup, processor, nil
}
2 changes: 1 addition & 1 deletion pkg/obsservice/obslib/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ type OwnerTeam string

const (
// ObsInfra is the Observability Infrastructure team.
ObsInfra OwnerTeam = "obs-infra"
Observability OwnerTeam = "observability"
)
2 changes: 2 additions & 0 deletions pkg/obsservice/obslib/process/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ go_library(
srcs = [
"mem_queue_processor.go",
"processor.go",
"stdout.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/process",
visibility = ["//visibility:public"],
deps = [
"//pkg/obsservice/obslib",
"//pkg/obsservice/obslib/queue",
"//pkg/obsservice/obslib/validate",
"//pkg/obsservice/obspb",
"//pkg/util/log",
"//pkg/util/stop",
"@com_github_cockroachdb_errors//:errors",
Expand Down
4 changes: 2 additions & 2 deletions pkg/obsservice/obslib/process/mem_queue_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/stretchr/testify/require"
)

var testStrSizeFn = queue.SizeFn[string](func(s string) (int, error) {
return len(s), nil
var testStrSizeFn = queue.SizeFn[string](func(s string) int {
return len(s)
})

func TestMemQueueProcessor(t *testing.T) {
Expand Down
29 changes: 29 additions & 0 deletions pkg/obsservice/obslib/process/stdout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0

package process

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
)

// TODO: Delete this file once a proper Processor is created for Statement Insights.

type InsightsStdoutProcessor struct{}

func (t *InsightsStdoutProcessor) Process(
_ context.Context, stmtInsight *obspb.StatementInsightsStatistics,
) error {
fmt.Println(stmtInsight)
return nil
}

var _ EventProcessor[*obspb.StatementInsightsStatistics] = (*InsightsStdoutProcessor)(nil)
12 changes: 3 additions & 9 deletions pkg/obsservice/obslib/queue/mem_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

// SizeFn returns the size, in bytes, of type T. Used for the MemoryQueue
// to determine the size of enqueued/dequeued elements.
type SizeFn[T any] func(T) (int, error)
type SizeFn[T any] func(T) int

// MemoryQueue is a FIFO, in-memory event queue designed for use by the
// observability service for buffering events awaiting processing.
Expand Down Expand Up @@ -71,10 +71,7 @@ func (q *MemoryQueue[T]) Alias() string {
// If buffering the provided element would exceed the configured
// max size, an error is returned and the element is not buffered.
func (q *MemoryQueue[T]) Enqueue(e T) error {
size, err := q.sizeFn(e)
if err != nil {
return errors.Wrap(err, "sizing element")
}
size := q.sizeFn(e)
q.mu.Lock()
defer q.mu.Unlock()
if q.mu.curSize+size > q.mu.maxSize {
Expand Down Expand Up @@ -104,10 +101,7 @@ func (q *MemoryQueue[T]) Dequeue() (T, bool) {
if !ok {
panic(errors.AssertionFailedf("unable to assert type on Dequeue() for %s queue: %v", q.alias, e.Value))
}
size, err := q.sizeFn(ret)
if err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "sizing element on dequeue for %q", q.alias))
}
size := q.sizeFn(ret)
// TODO(abarganier): Gauge metric(s) to track queue size & length.
q.mu.curSize = q.mu.curSize - size
return ret, true
Expand Down
8 changes: 4 additions & 4 deletions pkg/obsservice/obslib/queue/mem_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
func TestMemoryQueue_Enqueue(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
sizeFn := func(e *testElement) (int, error) {
return e.Size(), nil
sizeFn := func(e *testElement) int {
return e.Size()
}

tests := []struct {
Expand Down Expand Up @@ -85,8 +85,8 @@ func TestMemoryQueue_Enqueue(t *testing.T) {
func TestMemoryQueue_Dequeue(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
sizeFn := func(e *testElement) (int, error) {
return e.Size(), nil
sizeFn := func(e *testElement) int {
return e.Size()
}

element1 := newTestEl(10)
Expand Down
2 changes: 2 additions & 0 deletions pkg/obsservice/obslib/transform/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "transform",
srcs = [
"log_record_to_event.go",
"stmt_insight_transformer.go",
"transformer.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/transform",
Expand All @@ -13,5 +14,6 @@ go_library(
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common",
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs",
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource",
"//pkg/util/protoutil",
],
)
29 changes: 29 additions & 0 deletions pkg/obsservice/obslib/transform/stmt_insight_transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0

package transform

import (
"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)

type StmtInsightTransformer struct {
}

var _ EventTransformer[*obspb.StatementInsightsStatistics] = (*StmtInsightTransformer)(nil)

func (t *StmtInsightTransformer) Transform(
event *obspb.Event,
) (*obspb.StatementInsightsStatistics, error) {
var insight obspb.StatementInsightsStatistics
if err := protoutil.Unmarshal(event.LogRecord.Body.GetBytesValue(), &insight); err != nil {
return nil, err
}
return &insight, nil
}
6 changes: 5 additions & 1 deletion pkg/obsservice/obslib/validate/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "validate",
srcs = ["validate.go"],
srcs = [
"stmt_insights_validator.go",
"validate.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/validate",
visibility = ["//visibility:public"],
deps = ["//pkg/obsservice/obspb"],
)
21 changes: 21 additions & 0 deletions pkg/obsservice/obslib/validate/stmt_insights_validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0

package validate

import "github.com/cockroachdb/cockroach/pkg/obsservice/obspb"

type StmtInsightValidator struct {
}

var _ Validator[*obspb.StatementInsightsStatistics] = (*StmtInsightValidator)(nil)

func (t *StmtInsightValidator) Validate(_ *obspb.StatementInsightsStatistics) error {
//TODO: Add proper validation
return nil
}
2 changes: 1 addition & 1 deletion pkg/obsservice/obspb/obsservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ message StatementInsightsStatistics {
double run_lat_seconds = 31;
// ServiceLatSeconds is the time in seconds to service the query, from start of parse to end of execute.
double service_lat_seconds = 32;
repeated ContentionEvent contention_events = 33 [(gogoproto.nullable) = true];
repeated ContentionEvent contention_events = 33;
}

message ContentionEvent {
Expand Down
7 changes: 3 additions & 4 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func NewServer(
) *Server {
metrics := makeMetrics(false /* internal */)
serverMetrics := makeServerMetrics(cfg)
insightsProvider := insights.New(cfg.Settings, serverMetrics.InsightsMetrics)
insightsProvider := insights.New(cfg.Settings, serverMetrics.InsightsMetrics, eventsExporter)
reportedSQLStats := sslocal.New(
cfg.Settings,
sqlstats.MaxMemReportedSQLStatsStmtFingerprints,
Expand Down Expand Up @@ -480,7 +480,7 @@ func NewServer(
FlushCounter: serverMetrics.StatsMetrics.SQLStatsFlushStarted,
FailureCounter: serverMetrics.StatsMetrics.SQLStatsFlushFailure,
FlushDuration: serverMetrics.StatsMetrics.SQLStatsFlushDuration,
}, memSQLStats, eventsExporter)
}, memSQLStats)

s.sqlStats = persistedSQLStats
s.sqlStatsController = persistedSQLStats.GetController(cfg.SQLStatusServer)
Expand Down Expand Up @@ -605,6 +605,7 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
// should be accounted for in their costs.
ctx = multitenant.WithTenantCostControlExemption(ctx)

s.insights.Start(ctx, stopper)
s.sqlStats.Start(ctx, stopper)

s.schemaTelemetryController.Start(ctx, stopper)
Expand All @@ -614,8 +615,6 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
// Usually it is telemetry's reporter's job to clear the reporting SQL Stats.
s.reportedStats.Start(ctx, stopper)

s.insights.Start(ctx, stopper)

s.txnIDCache.Start(ctx, stopper)
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/sqlstats/insights/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights",
visibility = ["//visibility:public"],
deps = [
"//pkg/cli/cliflags",
"//pkg/obs",
"//pkg/obsservice/obspb",
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common",
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/appstatspb",
Expand All @@ -29,9 +33,11 @@ go_library(
"//pkg/util/intsets",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/quantile",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_redact//:redact",
"@com_github_prometheus_client_model//go",
],
Expand All @@ -46,12 +52,12 @@ go_test(
"insights_test.go",
"provider_test.go",
"registry_test.go",
"sink_test.go",
"store_test.go",
],
data = glob(["testdata/**"]),
embed = [":insights"],
deps = [
"//pkg/obs",
"//pkg/obsservice/obspb",
"//pkg/settings/cluster",
"//pkg/sql/appstatspb",
Expand Down
Loading