Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Live debugging service with prometheus.relabel #797

Merged
merged 12 commits into from
Jun 3, 2024
3 changes: 3 additions & 0 deletions internal/alloy/componenttest/componenttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

Expand Down Expand Up @@ -163,6 +164,8 @@ func (c *Controller) buildComponent(dataPath string, args component.Arguments) (
switch name {
case labelstore.ServiceName:
return labelstore.New(nil, prometheus.DefaultRegisterer), nil
case livedebugging.ServiceName:
return livedebugging.NewLiveDebugging(), nil
default:
return nil, fmt.Errorf("no service named %s defined", name)
}
Expand Down
13 changes: 9 additions & 4 deletions internal/alloycli/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/grafana/alloy/internal/service"
httpservice "github.com/grafana/alloy/internal/service/http"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
otel_service "github.com/grafana/alloy/internal/service/otel"
remotecfgservice "github.com/grafana/alloy/internal/service/remotecfg"
uiservice "github.com/grafana/alloy/internal/service/ui"
Expand Down Expand Up @@ -272,8 +273,11 @@ func (fr *alloyRun) Run(configPath string) error {
return fmt.Errorf("failed to create the remotecfg service: %w", err)
}

liveDebuggingService := livedebugging.New()

uiService := uiservice.New(uiservice.Options{
UIPrefix: fr.uiPrefix,
UIPrefix: fr.uiPrefix,
DebugCallbackManager: liveDebuggingService.Data().(livedebugging.DebugCallbackManager),
})

otelService := otel_service.New(l)
Expand All @@ -291,12 +295,13 @@ func (fr *alloyRun) Run(configPath string) error {
Reg: reg,
MinStability: fr.minStability,
Services: []service.Service{
httpService,
uiService,
clusterService,
otelService,
httpService,
labelService,
liveDebuggingService,
wildum marked this conversation as resolved.
Show resolved Hide resolved
otelService,
remoteCfgService,
uiService,
},
})

Expand Down
27 changes: 23 additions & 4 deletions internal/component/prometheus/relabel/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/alloy/internal/component/prometheus"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
lru "github.com/hashicorp/golang-lru/v2"
prometheus_client "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
Expand All @@ -22,9 +23,11 @@ import (
"go.uber.org/atomic"
)

const name = "prometheus.relabel"

func init() {
component.Register(component.Registration{
Name: "prometheus.relabel",
Name: name,
Stability: featuregate.StabilityGenerallyAvailable,
Args: Arguments{},
Exports: Exports{},
Expand Down Expand Up @@ -85,6 +88,8 @@ type Component struct {
exited atomic.Bool
ls labelstore.LabelStore

debugDataPublisher livedebugging.DebugDataPublisher

cacheMut sync.RWMutex
cache *lru.Cache[uint64, *labelAndID]
}
Expand All @@ -99,14 +104,22 @@ func New(o component.Options, args Arguments) (*Component, error) {
if err != nil {
return nil, err
}

debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName)
if err != nil {
return nil, err
}
debugDataPublisher.(livedebugging.DebugDataPublisher).Register(name)

data, err := o.GetServiceData(labelstore.ServiceName)
if err != nil {
return nil, err
}
c := &Component{
opts: o,
cache: cache,
ls: data.(labelstore.LabelStore),
opts: o,
cache: cache,
ls: data.(labelstore.LabelStore),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}
c.metricsProcessed = prometheus_client.NewCounter(prometheus_client.CounterOpts{
Name: "alloy_prometheus_relabel_metrics_processed",
Expand Down Expand Up @@ -259,6 +272,12 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels {
// Set the cache size to the cache.len
// TODO(@mattdurham): Instead of setting this each time could collect on demand for better performance.
c.cacheSize.Set(float64(c.cache.Len()))

componentID := livedebugging.ComponentID(c.opts.ID)
if c.debugDataPublisher.IsActive(componentID) {
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", lbls.String(), relabelled.String()))
}

return relabelled
}

Expand Down
37 changes: 23 additions & 14 deletions internal/component/prometheus/relabel/relabel_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package relabel

import (
"fmt"
"math"
"strconv"
"testing"
Expand All @@ -13,6 +14,7 @@ import (
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/prometheus"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/alloy/syntax"
prom "github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -67,13 +69,11 @@ func TestNil(t *testing.T) {
return ref, nil
}))
relabeller, err := New(component.Options{
ID: "1",
Logger: util.TestAlloyLogger(t),
OnStateChange: func(e component.Exports) {},
Registerer: prom.NewRegistry(),
GetServiceData: func(name string) (interface{}, error) {
return labelstore.New(nil, prom.DefaultRegisterer), nil
},
ID: "1",
Logger: util.TestAlloyLogger(t),
OnStateChange: func(e component.Exports) {},
Registerer: prom.NewRegistry(),
GetServiceData: getServiceData,
}, Arguments{
ForwardTo: []storage.Appendable{fanout},
MetricRelabelConfigs: []*alloy_relabel.Config{
Expand Down Expand Up @@ -154,13 +154,11 @@ func generateRelabel(t *testing.T) *Component {
return ref, nil
}))
relabeller, err := New(component.Options{
ID: "1",
Logger: util.TestAlloyLogger(t),
OnStateChange: func(e component.Exports) {},
Registerer: prom.NewRegistry(),
GetServiceData: func(name string) (interface{}, error) {
return labelstore.New(nil, prom.DefaultRegisterer), nil
},
ID: "1",
Logger: util.TestAlloyLogger(t),
OnStateChange: func(e component.Exports) {},
Registerer: prom.NewRegistry(),
GetServiceData: getServiceData,
}, Arguments{
ForwardTo: []storage.Appendable{fanout},
MetricRelabelConfigs: []*alloy_relabel.Config{
Expand Down Expand Up @@ -225,3 +223,14 @@ func TestRuleGetter(t *testing.T) {
require.Equal(t, gotUpdated[0].SourceLabels, gotOriginal[0].SourceLabels)
require.Equal(t, gotUpdated[0].Regex, gotOriginal[0].Regex)
}

func getServiceData(name string) (interface{}, error) {
switch name {
case labelstore.ServiceName:
return labelstore.New(nil, prom.DefaultRegisterer), nil
case livedebugging.ServiceName:
return livedebugging.NewLiveDebugging(), nil
default:
return nil, fmt.Errorf("service not found %s", name)
}
}
88 changes: 88 additions & 0 deletions internal/service/livedebugging/livedebugging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package livedebugging

import "sync"

type ComponentName string
type ComponentID string
type CallbackID string

// DebugCallbackManager is used to manage live debugging callbacks.
type DebugCallbackManager interface {
DebugRegistry
// AddCallback sets a callback for a given componentID.
// The callback is used to send debugging data to live debugging consumers.
AddCallback(callbackID CallbackID, componentID ComponentID, callback func(string))
// DeleteCallback deletes a callback for a given componentID.
DeleteCallback(callbackID CallbackID, componentID ComponentID)
}

// DebugDataPublisher is used by components to push information to live debugging consumers.
type DebugDataPublisher interface {
DebugRegistry
// Publish sends debugging data for a given componentID.
Publish(componentID ComponentID, data string)
// IsActive returns true when at least one consumer is listening for debugging data for the given componentID.
IsActive(componentID ComponentID) bool
}

// DebugRegistry is used to keep track of the components that supports the live debugging functionality.
type DebugRegistry interface {
// Register a component by name.
Register(componentName ComponentName)
// IsRegistered returns true if a component has live debugging support.
IsRegistered(componentName ComponentName) bool
}

type liveDebugging struct {
loadMut sync.RWMutex
callbacks map[ComponentID]map[CallbackID]func(string)
registeredComponents map[ComponentName]struct{}
}

var _ DebugCallbackManager = &liveDebugging{}
var _ DebugDataPublisher = &liveDebugging{}

// NewLiveDebugging creates a new instance of liveDebugging.
func NewLiveDebugging() *liveDebugging {
return &liveDebugging{
callbacks: make(map[ComponentID]map[CallbackID]func(string)),
registeredComponents: make(map[ComponentName]struct{}),
}
}

func (s *liveDebugging) Publish(componentID ComponentID, data string) {
s.loadMut.RLock()
defer s.loadMut.RUnlock()
for _, callback := range s.callbacks[componentID] {
callback(data)
}
}

func (s *liveDebugging) IsActive(componentID ComponentID) bool {
_, exist := s.callbacks[componentID]
return exist
}

func (s *liveDebugging) AddCallback(callbackID CallbackID, componentID ComponentID, callback func(string)) {
s.loadMut.Lock()
defer s.loadMut.Unlock()
if _, ok := s.callbacks[componentID]; !ok {
s.callbacks[componentID] = make(map[CallbackID]func(string))
}
s.callbacks[componentID][callbackID] = callback
}

func (s *liveDebugging) DeleteCallback(callbackID CallbackID, componentID ComponentID) {
s.loadMut.Lock()
defer s.loadMut.Unlock()
delete(s.callbacks[componentID], callbackID)
}

func (s *liveDebugging) Register(componentName ComponentName) {
s.registeredComponents[componentName] = struct{}{}
}

func (s *liveDebugging) IsRegistered(componentName ComponentName) bool {
_, exist := s.registeredComponents[componentName]
return exist
}
92 changes: 92 additions & 0 deletions internal/service/livedebugging/livedebugging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package livedebugging

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestRegister(t *testing.T) {
livedebugging := NewLiveDebugging()
require.False(t, livedebugging.IsRegistered("type1"))
livedebugging.Register("type1")
require.True(t, livedebugging.IsRegistered("type1"))
// registering a component name that has already been registered does not do anything
require.NotPanics(t, func() { livedebugging.Register("type1") })
}

func TestStream(t *testing.T) {
livedebugging := NewLiveDebugging()
componentID := ComponentID("component1")
CallbackID := CallbackID("callback1")

var receivedData string
callback := func(data string) {
receivedData = data
}
require.False(t, livedebugging.IsActive(componentID))
livedebugging.AddCallback(CallbackID, componentID, callback)
require.True(t, livedebugging.IsActive(componentID))
require.Len(t, livedebugging.callbacks[componentID], 1)

livedebugging.Publish(componentID, "test data")
require.Equal(t, "test data", receivedData)
}

func TestStreamEmpty(t *testing.T) {
livedebugging := NewLiveDebugging()
componentID := ComponentID("component1")
require.NotPanics(t, func() { livedebugging.Publish(componentID, "test data") })
}

func TestMultipleStreams(t *testing.T) {
livedebugging := NewLiveDebugging()
componentID := ComponentID("component1")
callbackID1 := CallbackID("callback1")
callbackID2 := CallbackID("callback2")

var receivedData1 string
callback1 := func(data string) {
receivedData1 = data
}

var receivedData2 string
callback2 := func(data string) {
receivedData2 = data
}

livedebugging.AddCallback(callbackID1, componentID, callback1)
livedebugging.AddCallback(callbackID2, componentID, callback2)
require.Len(t, livedebugging.callbacks[componentID], 2)

livedebugging.Publish(componentID, "test data")
require.Equal(t, "test data", receivedData1)
require.Equal(t, "test data", receivedData2)
}

func TestDeleteCallback(t *testing.T) {
livedebugging := NewLiveDebugging()
componentID := ComponentID("component1")
callbackID1 := CallbackID("callback1")
callbackID2 := CallbackID("callback2")

callback1 := func(data string) {}
callback2 := func(data string) {}

livedebugging.AddCallback(callbackID1, componentID, callback1)
livedebugging.AddCallback(callbackID2, componentID, callback2)
require.Len(t, livedebugging.callbacks[componentID], 2)

// Deleting callbacks that don't exist should not panic
require.NotPanics(t, func() { livedebugging.DeleteCallback(callbackID1, "fakeComponentID") })
require.NotPanics(t, func() { livedebugging.DeleteCallback("fakeCallbackID", componentID) })

livedebugging.AddCallback(callbackID1, componentID, callback1)
livedebugging.AddCallback(callbackID2, componentID, callback2)

livedebugging.DeleteCallback(callbackID1, componentID)
require.Len(t, livedebugging.callbacks[componentID], 1)

livedebugging.DeleteCallback(callbackID2, componentID)
require.Empty(t, livedebugging.callbacks[componentID])
}