Skip to content

Commit

Permalink
obsservice: creating Insights Pipeline
Browse files Browse the repository at this point in the history
Create basic structure for Insights Pipeline.
Currently the Validator (always return that is valid)
and Processor (just printing the value) are very simple.

This commit is to introduce that structure so can be iterated on.

Part Of CC-26215

Release note: None
  • Loading branch information
maryliag committed Nov 21, 2023
1 parent 78ce9b1 commit 21d61e3
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 12 deletions.
5 changes: 5 additions & 0 deletions pkg/obsservice/cmd/obsservice/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ go_library(
"//pkg/obsservice/obslib/httpproxy",
"//pkg/obsservice/obslib/ingest",
"//pkg/obsservice/obslib/obsutil",
"//pkg/obsservice/obslib/process",
"//pkg/obsservice/obslib/produce",
"//pkg/obsservice/obslib/queue",
"//pkg/obsservice/obslib/router",
"//pkg/obsservice/obslib/transform",
"//pkg/obsservice/obslib/validate",
"//pkg/obsservice/obspb",
"//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service",
"//pkg/ui/distoss",
Expand Down
49 changes: 48 additions & 1 deletion pkg/obsservice/cmd/obsservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/httpproxy"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/ingest"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/obsutil"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/process"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/produce"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/queue"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/router"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/transform"
"github.com/cockroachdb/cockroach/pkg/obsservice/obslib/validate"
"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1"
_ "github.com/cockroachdb/cockroach/pkg/ui/distoss" // web UI init hooks
Expand All @@ -48,6 +53,10 @@ var drainSignals = []os.Signal{unix.SIGINT, unix.SIGTERM}
// shutdown (i.e. second occurrence does not incur hard shutdown).
var termSignal os.Signal = unix.SIGTERM

// maxMemoryBytes is the max memory bytes to be used by memory queue.
// TODO(maryliag): make performance testing to decide on the final value.
var maxMemoryBytes int = 500 * 1024 * 1024 // 500Mb

// RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{
Use: "obsservice",
Expand Down Expand Up @@ -83,11 +92,19 @@ from one or more CockroachDB clusters.`,

stopper := stop.NewStopper()

stmtInsightsPipeline, stmtInsightsProcessor, err := makeStatementInsightsPipeline()
if err != nil {
return errors.Wrapf(err, "failed to create Statement Insights Pipeline")
}
// Run the event ingestion in the background.
eventRouter := router.NewEventRouter(map[obspb.EventType]obslib.EventConsumer{
obspb.EventlogEvent: &obsutil.StdOutConsumer{},
obspb.StatementInsightsStatsEvent: &obsutil.StdOutConsumer{},
obspb.StatementInsightsStatsEvent: stmtInsightsPipeline,
})
err = stmtInsightsProcessor.Start(ctx, stopper)
if err != nil {
return errors.Wrapf(err, "failed to start Statement Insights Processor")
}
ingester := ingest.MakeEventIngester(ctx, eventRouter, nil)

// Instantiate the net listener & gRPC server.
Expand Down Expand Up @@ -237,3 +254,33 @@ func handleSignalDuringShutdown(sig os.Signal) {
// Block while we wait for the signal to be delivered.
select {}
}

func makeStatementInsightsPipeline() (obslib.EventConsumer, *process.MemQueueProcessor[*obspb.StatementInsightsStatistics], error) {
memQueue := queue.NewMemoryQueue[*obspb.StatementInsightsStatistics](
maxMemoryBytes, func(statistics *obspb.StatementInsightsStatistics) int {
return statistics.Size()
}, "StmtInsightsStatisticsMemQueue")
memQueueProducer := produce.NewMemQueueProducer[*obspb.StatementInsightsStatistics](memQueue)
insightsTransformer := &transform.StmtInsightTransformer{}
insightsValidator := &validate.StmtInsightValidator{}

producerGroup, err := produce.NewProducerGroup[*obspb.StatementInsightsStatistics](
"StmtInsightsStatisticsProducerGroup",
obslib.Observability,
insightsTransformer,
[]validate.Validator[*obspb.StatementInsightsStatistics]{insightsValidator},
[]produce.EventProducer[*obspb.StatementInsightsStatistics]{memQueueProducer})

if err != nil {
return nil, nil, err
}

// TODO: replace process.InsightsStdoutProcessor for a real Insights processor
// and delete the file process/stdout.go
processor, err := process.NewMemQueueProcessor[*obspb.StatementInsightsStatistics](memQueue, &process.InsightsStdoutProcessor{})
if err != nil {
return nil, nil, err
}

return producerGroup, processor, nil
}
2 changes: 1 addition & 1 deletion pkg/obsservice/obslib/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ type OwnerTeam string

const (
// ObsInfra is the Observability Infrastructure team.
ObsInfra OwnerTeam = "obs-infra"
Observability OwnerTeam = "observability"
)
2 changes: 2 additions & 0 deletions pkg/obsservice/obslib/process/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ go_library(
srcs = [
"mem_queue_processor.go",
"processor.go",
"stdout.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/process",
visibility = ["//visibility:public"],
deps = [
"//pkg/obsservice/obslib",
"//pkg/obsservice/obslib/queue",
"//pkg/obsservice/obslib/validate",
"//pkg/obsservice/obspb",
"//pkg/util/log",
"//pkg/util/stop",
"@com_github_cockroachdb_errors//:errors",
Expand Down
27 changes: 27 additions & 0 deletions pkg/obsservice/obslib/process/stdout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0

package process

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
)

// TODO: Delete this file once a proper Processor is created for Statement Insights.

type InsightsStdoutProcessor struct{}

func (t *InsightsStdoutProcessor) Process(_ context.Context, stmtInsight *obspb.StatementInsightsStatistics) error {
fmt.Println(stmtInsight)
return nil
}

var _ EventProcessor[*obspb.StatementInsightsStatistics] = (*InsightsStdoutProcessor)(nil)
12 changes: 3 additions & 9 deletions pkg/obsservice/obslib/queue/mem_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

// SizeFn returns the size, in bytes, of type T. Used for the MemoryQueue
// to determine the size of enqueued/dequeued elements.
type SizeFn[T any] func(T) (int, error)
type SizeFn[T any] func(T) int

// MemoryQueue is a FIFO, in-memory event queue designed for use by the
// observability service for buffering events awaiting processing.
Expand Down Expand Up @@ -71,10 +71,7 @@ func (q *MemoryQueue[T]) Alias() string {
// If buffering the provided element would exceed the configured
// max size, an error is returned and the element is not buffered.
func (q *MemoryQueue[T]) Enqueue(e T) error {
size, err := q.sizeFn(e)
if err != nil {
return errors.Wrap(err, "sizing element")
}
size := q.sizeFn(e)
q.mu.Lock()
defer q.mu.Unlock()
if q.mu.curSize+size > q.mu.maxSize {
Expand Down Expand Up @@ -104,10 +101,7 @@ func (q *MemoryQueue[T]) Dequeue() (T, bool) {
if !ok {
panic(errors.AssertionFailedf("unable to assert type on Dequeue() for %s queue: %v", q.alias, e.Value))
}
size, err := q.sizeFn(ret)
if err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "sizing element on dequeue for %q", q.alias))
}
size := q.sizeFn(ret)
// TODO(abarganier): Gauge metric(s) to track queue size & length.
q.mu.curSize = q.mu.curSize - size
return ret, true
Expand Down
2 changes: 2 additions & 0 deletions pkg/obsservice/obslib/transform/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "transform",
srcs = [
"log_record_to_event.go",
"stmt_insight_transformer.go",
"transformer.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/transform",
Expand All @@ -13,5 +14,6 @@ go_library(
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common",
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs",
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource",
"//pkg/util/protoutil",
],
)
27 changes: 27 additions & 0 deletions pkg/obsservice/obslib/transform/stmt_insight_transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0

package transform

import (
"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)

type StmtInsightTransformer struct {
}

var _ EventTransformer[*obspb.StatementInsightsStatistics] = (*StmtInsightTransformer)(nil)

func (t *StmtInsightTransformer) Transform(event *obspb.Event) (*obspb.StatementInsightsStatistics, error) {
var insight obspb.StatementInsightsStatistics
if err := protoutil.Unmarshal(event.LogRecord.Body.GetBytesValue(), &insight); err != nil {
return nil, err
}
return &insight, nil
}
6 changes: 5 additions & 1 deletion pkg/obsservice/obslib/validate/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "validate",
srcs = ["validate.go"],
srcs = [
"stmt_insights_validator.go",
"validate.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/validate",
visibility = ["//visibility:public"],
deps = ["//pkg/obsservice/obspb"],
)
21 changes: 21 additions & 0 deletions pkg/obsservice/obslib/validate/stmt_insights_validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0

package validate

import "github.com/cockroachdb/cockroach/pkg/obsservice/obspb"

type StmtInsightValidator struct {
}

var _ Validator[*obspb.StatementInsightsStatistics] = (*StmtInsightValidator)(nil)

func (t *StmtInsightValidator) Validate(_ *obspb.StatementInsightsStatistics) error {
//TODO: Add proper validation
return nil
}

0 comments on commit 21d61e3

Please sign in to comment.