Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow: add loki.source.azure_event_hubs component #3412

Merged
merged 17 commits into from Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Expand Up @@ -52,7 +52,9 @@ Main (unreleased)
for making requests to AWS services via `otelcol` components that support
authentication extensions. (@ptodev)
- `prometheus.exporter.memcached` collects metrics from a Memcached server. (@spartan0x117)

- `loki.source.azure_event_hubs` reads messages from Azure Event Hub using Kafka and forwards them to other `loki`
components. (@akselleirv)

- Add support for Flow-specific system packages:

- Flow-specific DEB packages. (@rfratto, @robigan)
Expand All @@ -77,6 +79,8 @@ Main (unreleased)

- Update Loki dependency to the k142 branch. (@rfratto)

- Flow: Add OAUTHBEARER mechanism to `loki.source.kafka` using Azure as provider. (@akselleirv)

### Bugfixes

- Flow: fix issue where Flow would return an error when trying to access a key
Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Expand Up @@ -11,6 +11,7 @@ import (
_ "github.com/grafana/agent/component/loki/echo" // Import loki.echo
_ "github.com/grafana/agent/component/loki/process" // Import loki.process
_ "github.com/grafana/agent/component/loki/relabel" // Import loki.relabel
_ "github.com/grafana/agent/component/loki/source/azure_event_hubs" // Import loki.source.azure_event_hubs
_ "github.com/grafana/agent/component/loki/source/cloudflare" // Import loki.source.cloudflare
_ "github.com/grafana/agent/component/loki/source/docker" // Import loki.source.docker
_ "github.com/grafana/agent/component/loki/source/file" // Import loki.source.file
Expand Down
223 changes: 223 additions & 0 deletions component/loki/source/azure_event_hubs/azure_event_hubs.go
@@ -0,0 +1,223 @@
package azure_event_hubs

import (
"context"
"fmt"
"net"
"sync"

"github.com/Shopify/sarama"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
flow_relabel "github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/loki/source/azure_event_hubs/internal/parser"
kt "github.com/grafana/agent/component/loki/source/internal/kafkatarget"
"github.com/grafana/dskit/flagext"

"github.com/prometheus/common/model"
)

func init() {
component.Register(component.Registration{
Name: "loki.source.azure_event_hubs",
Args: Arguments{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
})
}

// Arguments holds values which are used to configure the loki.source.azure_event_hubs component.
type Arguments struct {
FullyQualifiedNamespace string `river:"fully_qualified_namespace,attr"`
EventHubs []string `river:"event_hubs,attr"`

Authentication AzureEventHubsAuthentication `river:"authentication,block"`

GroupID string `river:"group_id,attr,optional"`
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
DisallowCustomMessages bool `river:"disallow_custom_messages,attr,optional"`
RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`
Labels map[string]string `river:"labels,attr,optional"`
Assignor string `river:"assignor,attr,optional"`

ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
}

// AzureEventHubsAuthentication describe the configuration for authentication with Azure Event Hub
type AzureEventHubsAuthentication struct {
Mechanism string `river:"mechanism,attr"`
Scopes []string `river:"scopes,attr,optional"`
ConnectionString string `river:"connection_string,attr,optional"`
}

func getDefault() Arguments {
return Arguments{
GroupID: "loki.source.azure_event_hubs",
Labels: map[string]string{"job": "loki.source.azure_event_hubs"},
Assignor: "range",
}
}

// UnmarshalRiver implements river.Unmarshaler.
func (a *Arguments) UnmarshalRiver(f func(interface{}) error) error {
*a = getDefault()
type arguments Arguments
if err := f((*arguments)(a)); err != nil {
return err
}
return a.validateAssignor()
}

// New creates a new loki.source.azure_event_hubs component.
func New(o component.Options, args Arguments) (*Component, error) {
c := &Component{
mut: sync.RWMutex{},
opts: o,
handler: make(loki.LogsReceiver),
fanout: args.ForwardTo,
}

// Call to Update() to start readers and set receivers once at the start.
if err := c.Update(args); err != nil {
return nil, err
}

return c, nil
}

// Component implements the loki.source.azure_event_hubs component.
type Component struct {
opts component.Options
mut sync.RWMutex
fanout []loki.LogsReceiver
handler loki.LogsReceiver
target *kt.TargetSyncer
}

// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
defer func() {
level.Info(c.opts.Logger).Log("msg", "loki.source.azure_event_hubs component shutting down, stopping the targets")
c.mut.RLock()
err := c.target.Stop()
if err != nil {
level.Error(c.opts.Logger).Log("msg", "error while stopping azure_event_hubs target", "err", err)
}
c.mut.RUnlock()
}()

for {
select {
case <-ctx.Done():
return nil
case entry := <-c.handler:
c.mut.RLock()
for _, receiver := range c.fanout {
receiver <- entry
}
c.mut.RUnlock()
}
}
}

const (
AuthenticationMechanismConnectionString = "connection_string"
AuthenticationMechanismOAuth = "oauth"
)

// Update implements component.Component.
func (c *Component) Update(args component.Arguments) error {
c.mut.Lock()
defer c.mut.Unlock()

newArgs := args.(Arguments)
c.fanout = newArgs.ForwardTo

cfg, err := newArgs.Convert()
if err != nil {
return err
}

entryHandler := loki.NewEntryHandler(c.handler, func() {})
t, err := kt.NewSyncer(c.opts.Registerer, c.opts.Logger, cfg, entryHandler, &parser.AzureEventHubsTargetMessageParser{
DisallowCustomMessages: newArgs.DisallowCustomMessages,
})
if err != nil {
return fmt.Errorf("error starting azure_event_hubs target: %w", err)
}
c.target = t

return nil
}

// Convert is used to bridge between the River and Promtail types.
func (a *Arguments) Convert() (kt.Config, error) {
lbls := make(model.LabelSet, len(a.Labels))
for k, v := range a.Labels {
lbls[model.LabelName(k)] = model.LabelValue(v)
}

cfg := kt.Config{
RelabelConfigs: flow_relabel.ComponentToPromRelabelConfigs(a.RelabelRules),
KafkaConfig: kt.TargetConfig{
Brokers: []string{a.FullyQualifiedNamespace},
Topics: a.EventHubs,
Labels: lbls,
UseIncomingTimestamp: a.UseIncomingTimestamp,
GroupID: a.GroupID,
Version: sarama.V1_0_0_0.String(),
Assignor: a.Assignor,
},
}
switch a.Authentication.Mechanism {
case AuthenticationMechanismConnectionString:
if a.Authentication.ConnectionString == "" {
return kt.Config{}, fmt.Errorf("connection string is required when authentication mechanism is %s", a.Authentication.Mechanism)
}
cfg.KafkaConfig.Authentication = kt.Authentication{
Type: kt.AuthenticationTypeSASL,
SASLConfig: kt.SASLConfig{
UseTLS: true,
User: "$ConnectionString",
Password: flagext.SecretWithValue(a.Authentication.ConnectionString),
Mechanism: sarama.SASLTypePlaintext,
},
}
case AuthenticationMechanismOAuth:
if a.Authentication.Scopes == nil {
host, _, err := net.SplitHostPort(a.FullyQualifiedNamespace)
if err != nil {
return kt.Config{}, fmt.Errorf("unable to extract host from fully qualified namespace: %w", err)
}
a.Authentication.Scopes = []string{fmt.Sprintf("https://%s", host)}
}

cfg.KafkaConfig.Authentication = kt.Authentication{
Type: kt.AuthenticationTypeSASL,
SASLConfig: kt.SASLConfig{
UseTLS: true,
Mechanism: sarama.SASLTypeOAuth,
OAuthConfig: kt.OAuthConfig{
TokenProvider: kt.TokenProviderTypeAzure,
Scopes: a.Authentication.Scopes,
},
},
}
default:
return kt.Config{}, fmt.Errorf("authentication mechanism %s is unsupported", a.Authentication.Mechanism)
}
return cfg, nil
}

func (a *Arguments) validateAssignor() error {
validAssignors := []string{sarama.StickyBalanceStrategyName, sarama.RoundRobinBalanceStrategyName, sarama.RangeBalanceStrategyName}
for _, validAssignor := range validAssignors {
if a.Assignor == validAssignor {
return nil
}
}
return fmt.Errorf("assignor value %s is invalid, must be one of: %v", a.Assignor, validAssignors)
}
62 changes: 62 additions & 0 deletions component/loki/source/azure_event_hubs/azure_event_hubs_test.go
@@ -0,0 +1,62 @@
package azure_event_hubs

import (
"testing"

"github.com/grafana/agent/pkg/river"
"github.com/stretchr/testify/require"
)

func TestRiverConfigOAuth(t *testing.T) {
var exampleRiverConfig = `

fully_qualified_namespace = "my-ns.servicebus.windows.net:9093"
event_hubs = ["test"]
forward_to = []

authentication {
mechanism = "oauth"
}
`

var args Arguments
err := river.Unmarshal([]byte(exampleRiverConfig), &args)
require.NoError(t, err)
}

func TestRiverConfigConnectionString(t *testing.T) {
var exampleRiverConfig = `

fully_qualified_namespace = "my-ns.servicebus.windows.net:9093"
event_hubs = ["test"]
forward_to = []

authentication {
mechanism = "connection_string"
connection_string = "my-conn-string"
}
`

var args Arguments
err := river.Unmarshal([]byte(exampleRiverConfig), &args)
require.NoError(t, err)
}

func TestRiverConfigValidateAssignor(t *testing.T) {
var exampleRiverConfig = `

fully_qualified_namespace = "my-ns.servicebus.windows.net:9093"
event_hubs = ["test"]
forward_to = []
assignor = "invalid-value"

authentication {
mechanism = "connection_string"
connection_string = "my-conn-string"
}
`

var args Arguments
err := river.Unmarshal([]byte(exampleRiverConfig), &args)
require.EqualError(t, err, "assignor value invalid-value is invalid, must be one of: [sticky roundrobin range]")
}