diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f422d88ce9a..e8d27255ffeb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,7 +52,9 @@ Main (unreleased) for making requests to AWS services via `otelcol` components that support authentication extensions. (@ptodev) - `prometheus.exporter.memcached` collects metrics from a Memcached server. (@spartan0x117) - + - `loki.source.azure_event_hubs` reads messages from Azure Event Hub using Kafka and forwards them to other `loki` + components. (@akselleirv) + - Add support for Flow-specific system packages: - Flow-specific DEB packages. (@rfratto, @robigan) @@ -77,6 +79,8 @@ Main (unreleased) - Update Loki dependency to the k142 branch. (@rfratto) +- Flow: Add OAUTHBEARER mechanism to `loki.source.kafka` using Azure as provider. (@akselleirv) + ### Bugfixes - Flow: fix issue where Flow would return an error when trying to access a key diff --git a/component/all/all.go b/component/all/all.go index 9757818ef402..da905c43bf9b 100644 --- a/component/all/all.go +++ b/component/all/all.go @@ -11,6 +11,7 @@ import ( _ "github.com/grafana/agent/component/loki/echo" // Import loki.echo _ "github.com/grafana/agent/component/loki/process" // Import loki.process _ "github.com/grafana/agent/component/loki/relabel" // Import loki.relabel + _ "github.com/grafana/agent/component/loki/source/azure_event_hubs" // Import loki.source.azure_event_hubs _ "github.com/grafana/agent/component/loki/source/cloudflare" // Import loki.source.cloudflare _ "github.com/grafana/agent/component/loki/source/docker" // Import loki.source.docker _ "github.com/grafana/agent/component/loki/source/file" // Import loki.source.file diff --git a/component/loki/source/azure_event_hubs/azure_event_hubs.go b/component/loki/source/azure_event_hubs/azure_event_hubs.go new file mode 100644 index 000000000000..d60f1101ff16 --- /dev/null +++ b/component/loki/source/azure_event_hubs/azure_event_hubs.go @@ -0,0 +1,223 @@ +package azure_event_hubs + +import ( + "context" + "fmt" + "net" + "sync" + + "github.com/Shopify/sarama" + "github.com/go-kit/log/level" + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/common/loki" + flow_relabel "github.com/grafana/agent/component/common/relabel" + "github.com/grafana/agent/component/loki/source/azure_event_hubs/internal/parser" + kt "github.com/grafana/agent/component/loki/source/internal/kafkatarget" + "github.com/grafana/dskit/flagext" + + "github.com/prometheus/common/model" +) + +func init() { + component.Register(component.Registration{ + Name: "loki.source.azure_event_hubs", + Args: Arguments{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +// Arguments holds values which are used to configure the loki.source.azure_event_hubs component. 
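+// Only fully_qualified_namespace, event_hubs, the authentication block and
+// forward_to are required; the remaining attributes are optional and fall
+// back to the defaults set in getDefault.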
+type Arguments struct {
+	FullyQualifiedNamespace string   `river:"fully_qualified_namespace,attr"`
+	EventHubs               []string `river:"event_hubs,attr"`
+
+	Authentication AzureEventHubsAuthentication `river:"authentication,block"`
+
+	GroupID                string             `river:"group_id,attr,optional"`
+	UseIncomingTimestamp   bool               `river:"use_incoming_timestamp,attr,optional"`
+	DisallowCustomMessages bool               `river:"disallow_custom_messages,attr,optional"`
+	RelabelRules           flow_relabel.Rules `river:"relabel_rules,attr,optional"`
+	Labels                 map[string]string  `river:"labels,attr,optional"`
+	Assignor               string             `river:"assignor,attr,optional"`
+
+	ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
+}
+
+// AzureEventHubsAuthentication describes the configuration for authenticating with Azure Event Hubs.
+type AzureEventHubsAuthentication struct {
+	Mechanism        string   `river:"mechanism,attr"`
+	Scopes           []string `river:"scopes,attr,optional"`
+	ConnectionString string   `river:"connection_string,attr,optional"`
+}
+
+func getDefault() Arguments {
+	return Arguments{
+		GroupID:  "loki.source.azure_event_hubs",
+		Labels:   map[string]string{"job": "loki.source.azure_event_hubs"},
+		Assignor: "range",
+	}
+}
+
+// UnmarshalRiver implements river.Unmarshaler.
+func (a *Arguments) UnmarshalRiver(f func(interface{}) error) error {
+	*a = getDefault()
+	type arguments Arguments
+	if err := f((*arguments)(a)); err != nil {
+		return err
+	}
+	return a.validateAssignor()
+}
+
+// New creates a new loki.source.azure_event_hubs component.
+func New(o component.Options, args Arguments) (*Component, error) {
+	c := &Component{
+		mut:     sync.RWMutex{},
+		opts:    o,
+		handler: make(loki.LogsReceiver),
+		fanout:  args.ForwardTo,
+	}
+
+	// Call Update() once at startup to start the readers and set the receivers.
+	if err := c.Update(args); err != nil {
+		return nil, err
+	}
+
+	return c, nil
+}
+
+// Component implements the loki.source.azure_event_hubs component.
+type Component struct {
+	opts    component.Options
+	mut     sync.RWMutex
+	fanout  []loki.LogsReceiver
+	handler loki.LogsReceiver
+	target  *kt.TargetSyncer
+}
+
+// Run implements component.Component.
+func (c *Component) Run(ctx context.Context) error {
+	defer func() {
+		level.Info(c.opts.Logger).Log("msg", "loki.source.azure_event_hubs component shutting down, stopping the targets")
+		c.mut.RLock()
+		err := c.target.Stop()
+		if err != nil {
+			level.Error(c.opts.Logger).Log("msg", "error while stopping azure_event_hubs target", "err", err)
+		}
+		c.mut.RUnlock()
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case entry := <-c.handler:
+			c.mut.RLock()
+			for _, receiver := range c.fanout {
+				receiver <- entry
+			}
+			c.mut.RUnlock()
+		}
+	}
+}
+
+const (
+	AuthenticationMechanismConnectionString = "connection_string"
+	AuthenticationMechanismOAuth            = "oauth"
+)
+
+// Update implements component.Component.
+func (c *Component) Update(args component.Arguments) error {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+
+	newArgs := args.(Arguments)
+	c.fanout = newArgs.ForwardTo
+
+	cfg, err := newArgs.Convert()
+	if err != nil {
+		return err
+	}
+
+	entryHandler := loki.NewEntryHandler(c.handler, func() {})
+	t, err := kt.NewSyncer(c.opts.Registerer, c.opts.Logger, cfg, entryHandler, &parser.AzureEventHubsTargetMessageParser{
+		DisallowCustomMessages: newArgs.DisallowCustomMessages,
+	})
+	if err != nil {
+		return fmt.Errorf("error starting azure_event_hubs target: %w", err)
+	}
+	c.target = t
+
+	return nil
+}
+
+// Convert is used to bridge between the River and Promtail types.
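+// The fully qualified namespace is passed through as the single Kafka broker
+// address and each Event Hub is consumed as a Kafka topic.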
+func (a *Arguments) Convert() (kt.Config, error) { + lbls := make(model.LabelSet, len(a.Labels)) + for k, v := range a.Labels { + lbls[model.LabelName(k)] = model.LabelValue(v) + } + + cfg := kt.Config{ + RelabelConfigs: flow_relabel.ComponentToPromRelabelConfigs(a.RelabelRules), + KafkaConfig: kt.TargetConfig{ + Brokers: []string{a.FullyQualifiedNamespace}, + Topics: a.EventHubs, + Labels: lbls, + UseIncomingTimestamp: a.UseIncomingTimestamp, + GroupID: a.GroupID, + Version: sarama.V1_0_0_0.String(), + Assignor: a.Assignor, + }, + } + switch a.Authentication.Mechanism { + case AuthenticationMechanismConnectionString: + if a.Authentication.ConnectionString == "" { + return kt.Config{}, fmt.Errorf("connection string is required when authentication mechanism is %s", a.Authentication.Mechanism) + } + cfg.KafkaConfig.Authentication = kt.Authentication{ + Type: kt.AuthenticationTypeSASL, + SASLConfig: kt.SASLConfig{ + UseTLS: true, + User: "$ConnectionString", + Password: flagext.SecretWithValue(a.Authentication.ConnectionString), + Mechanism: sarama.SASLTypePlaintext, + }, + } + case AuthenticationMechanismOAuth: + if a.Authentication.Scopes == nil { + host, _, err := net.SplitHostPort(a.FullyQualifiedNamespace) + if err != nil { + return kt.Config{}, fmt.Errorf("unable to extract host from fully qualified namespace: %w", err) + } + a.Authentication.Scopes = []string{fmt.Sprintf("https://%s", host)} + } + + cfg.KafkaConfig.Authentication = kt.Authentication{ + Type: kt.AuthenticationTypeSASL, + SASLConfig: kt.SASLConfig{ + UseTLS: true, + Mechanism: sarama.SASLTypeOAuth, + OAuthConfig: kt.OAuthConfig{ + TokenProvider: kt.TokenProviderTypeAzure, + Scopes: a.Authentication.Scopes, + }, + }, + } + default: + return kt.Config{}, fmt.Errorf("authentication mechanism %s is unsupported", a.Authentication.Mechanism) + } + return cfg, nil +} + +func (a *Arguments) validateAssignor() error { + validAssignors := []string{sarama.StickyBalanceStrategyName, sarama.RoundRobinBalanceStrategyName, sarama.RangeBalanceStrategyName} + for _, validAssignor := range validAssignors { + if a.Assignor == validAssignor { + return nil + } + } + return fmt.Errorf("assignor value %s is invalid, must be one of: %v", a.Assignor, validAssignors) +} diff --git a/component/loki/source/azure_event_hubs/azure_event_hubs_test.go b/component/loki/source/azure_event_hubs/azure_event_hubs_test.go new file mode 100644 index 000000000000..f61ae2cb2482 --- /dev/null +++ b/component/loki/source/azure_event_hubs/azure_event_hubs_test.go @@ -0,0 +1,62 @@ +package azure_event_hubs + +import ( + "testing" + + "github.com/grafana/agent/pkg/river" + "github.com/stretchr/testify/require" +) + +func TestRiverConfigOAuth(t *testing.T) { + var exampleRiverConfig = ` + + fully_qualified_namespace = "my-ns.servicebus.windows.net:9093" + event_hubs = ["test"] + forward_to = [] + + authentication { + mechanism = "oauth" + } +` + + var args Arguments + err := river.Unmarshal([]byte(exampleRiverConfig), &args) + require.NoError(t, err) +} + +func TestRiverConfigConnectionString(t *testing.T) { + var exampleRiverConfig = ` + + fully_qualified_namespace = "my-ns.servicebus.windows.net:9093" + event_hubs = ["test"] + forward_to = [] + + authentication { + mechanism = "connection_string" + connection_string = "my-conn-string" + } +` + + var args Arguments + err := river.Unmarshal([]byte(exampleRiverConfig), &args) + require.NoError(t, err) +} + +func TestRiverConfigValidateAssignor(t *testing.T) { + var exampleRiverConfig = ` + + fully_qualified_namespace 
= "my-ns.servicebus.windows.net:9093" + event_hubs = ["test"] + forward_to = [] + assignor = "invalid-value" + + authentication { + mechanism = "connection_string" + connection_string = "my-conn-string" + } +` + + var args Arguments + err := river.Unmarshal([]byte(exampleRiverConfig), &args) + require.EqualError(t, err, "assignor value invalid-value is invalid, must be one of: [sticky roundrobin range]") +} diff --git a/component/loki/source/azure_event_hubs/internal/parser/parser.go b/component/loki/source/azure_event_hubs/internal/parser/parser.go new file mode 100644 index 000000000000..e82dbd37269d --- /dev/null +++ b/component/loki/source/azure_event_hubs/internal/parser/parser.go @@ -0,0 +1,201 @@ +package parser + +// This code is copied from Promtail. The parser package is used to +// enable parsing entries from Azure Event Hubs entries and forward them +// to other loki components. + +import ( + "bytes" + "encoding/json" + "errors" + "strings" + "time" + + "github.com/Shopify/sarama" + "github.com/grafana/agent/component/common/loki" + "github.com/grafana/loki/pkg/logproto" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" +) + +type azureMonitorResourceLogs struct { + Records []json.RawMessage `json:"records"` +} + +// validate check if message contains records +func (l azureMonitorResourceLogs) validate() error { + if len(l.Records) == 0 { + return errors.New("records are empty") + } + + return nil +} + +// azureMonitorResourceLog used to unmarshal common schema for Azure resource logs +// https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/resource-logs-schema +type azureMonitorResourceLog struct { + Time string `json:"time"` + Category string `json:"category"` + ResourceID string `json:"resourceId"` + OperationName string `json:"operationName"` +} + +// validate check if fields marked as required by schema for Azure resource log are not empty +func (l azureMonitorResourceLog) validate() error { + valid := len(l.Time) != 0 && + len(l.Category) != 0 && + len(l.ResourceID) != 0 && + len(l.OperationName) != 0 + + if !valid { + return errors.New("required field or fields is empty") + } + + return nil +} + +type AzureEventHubsTargetMessageParser struct { + DisallowCustomMessages bool +} + +func (e *AzureEventHubsTargetMessageParser) Parse(message *sarama.ConsumerMessage, labelSet model.LabelSet, relabels []*relabel.Config, useIncomingTimestamp bool) ([]loki.Entry, error) { + messageTime := time.Now() + if useIncomingTimestamp { + messageTime = message.Timestamp + } + + data, err := e.tryUnmarshal(message.Value) + if err == nil { + err = data.validate() + } + + if err != nil { + if e.DisallowCustomMessages { + return []loki.Entry{}, err + } + + return []loki.Entry{e.entryWithCustomPayload(message.Value, labelSet, messageTime)}, nil + } + + return e.processRecords(labelSet, relabels, useIncomingTimestamp, data.Records, messageTime) +} + +// tryUnmarshal tries to unmarshal raw message data, in case of error tries to fix it and unmarshal fixed data. +// If both attempts fail, return the initial unmarshal error. 
+func (e *AzureEventHubsTargetMessageParser) tryUnmarshal(message []byte) (*azureMonitorResourceLogs, error) {
+	data := &azureMonitorResourceLogs{}
+	err := json.Unmarshal(message, data)
+	if err == nil {
+		return data, nil
+	}
+
+	// Try to fix the JSON as described here:
+	// https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps
+	body := bytes.ReplaceAll(message, []byte(`'`), []byte(`"`))
+	if json.Unmarshal(body, data) != nil {
+		// return the original error
+		return nil, err
+	}
+
+	return data, nil
+}
+
+func (e *AzureEventHubsTargetMessageParser) entryWithCustomPayload(body []byte, labelSet model.LabelSet, messageTime time.Time) loki.Entry {
+	return loki.Entry{
+		Labels: labelSet,
+		Entry: logproto.Entry{
+			Timestamp: messageTime,
+			Line:      string(body),
+		},
+	}
+}
+
+// processRecords handles the case where the message is valid JSON with a `records` key. Each record can be either a custom payload or a resource log.
+func (e *AzureEventHubsTargetMessageParser) processRecords(labelSet model.LabelSet, relabels []*relabel.Config, useIncomingTimestamp bool, records []json.RawMessage, messageTime time.Time) ([]loki.Entry, error) {
+	result := make([]loki.Entry, 0, len(records))
+	for _, m := range records {
+		entry, err := e.parseRecord(m, labelSet, relabels, useIncomingTimestamp, messageTime)
+		if err != nil {
+			return nil, err
+		}
+		result = append(result, entry)
+	}
+
+	return result, nil
+}
+
+// parseRecord parses a single value from the `records` array in the original message.
+// It also handles the case where the record contains custom data and doesn't match the schema for Azure resource logs.
+func (e *AzureEventHubsTargetMessageParser) parseRecord(record []byte, labelSet model.LabelSet, relabelConfig []*relabel.Config, useIncomingTimestamp bool, messageTime time.Time) (loki.Entry, error) {
+	logRecord := &azureMonitorResourceLog{}
+	err := json.Unmarshal(record, logRecord)
+	if err == nil {
+		err = logRecord.validate()
+	}
+
+	if err != nil {
+		if e.DisallowCustomMessages {
+			return loki.Entry{}, err
+		}
+
+		return e.entryWithCustomPayload(record, labelSet, messageTime), nil
+	}
+
+	logLabels := e.getLabels(logRecord, relabelConfig)
+	ts := e.getTime(messageTime, useIncomingTimestamp, logRecord)
+
+	return loki.Entry{
+		Labels: labelSet.Merge(logLabels),
+		Entry: logproto.Entry{
+			Timestamp: ts,
+			Line:      string(record),
+		},
+	}, nil
+}
+
+func (e *AzureEventHubsTargetMessageParser) getTime(messageTime time.Time, useIncomingTimestamp bool, logRecord *azureMonitorResourceLog) time.Time {
+	if !useIncomingTimestamp || logRecord.Time == "" {
+		return messageTime
+	}
+
+	recordTime, err := time.Parse(time.RFC3339, logRecord.Time)
+	if err != nil {
+		return messageTime
+	}
+
+	return recordTime
+}
+
+func (e *AzureEventHubsTargetMessageParser) getLabels(logRecord *azureMonitorResourceLog, relabelConfig []*relabel.Config) model.LabelSet {
+	lbs := labels.Labels{
+		{
+			Name:  "__azure_event_hubs_category",
+			Value: logRecord.Category,
+		},
+	}
+
+	var processed labels.Labels
+	// apply relabeling
+	if len(relabelConfig) > 0 {
+		processed, _ = relabel.Process(lbs, relabelConfig...)
+ } else { + processed = lbs + } + + // final labelset that will be sent to loki + resultLabels := make(model.LabelSet) + for _, lbl := range processed { + // ignore internal labels + if strings.HasPrefix(lbl.Name, "__") { + continue + } + // ignore invalid labels + if !model.LabelName(lbl.Name).IsValid() || !model.LabelValue(lbl.Value).IsValid() { + continue + } + resultLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + } + + return resultLabels +} diff --git a/component/loki/source/azure_event_hubs/internal/parser/parser_test.go b/component/loki/source/azure_event_hubs/internal/parser/parser_test.go new file mode 100644 index 000000000000..e3c2351955dc --- /dev/null +++ b/component/loki/source/azure_event_hubs/internal/parser/parser_test.go @@ -0,0 +1,264 @@ +package parser + +import ( + "os" + "runtime" + "strings" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" + "github.com/stretchr/testify/assert" +) + +func Test_parseMessage_function_app(t *testing.T) { + messageParser := &AzureEventHubsTargetMessageParser{} + + message := &sarama.ConsumerMessage{ + Value: readFile(t, "testdata/function_app_logs_message.txt"), + } + + entries, err := messageParser.Parse(message, nil, nil, true) + assert.NoError(t, err) + assert.Len(t, entries, 2) + + expectedLine1 := "{ \"time\": \"2023-03-08T12:06:46Z\",\n\t\"resourceId\": \"AZURE-FUNC-APP\",\n\t\"category\": \"FunctionAppLogs\",\n\t\"operationName\": \"Microsoft.Web/sites/functions/log\",\n\t\"level\": \"Informational\",\n\t\"location\": \"My Location\",\n\t\"properties\": {\n\t\"appName\":\"\",\n\t\"roleInstance\":\"123123123123\",\n\t\"message\":\"Loading functions metadata\",\n\t\"category\":\"Host.Startup\",\n\t\"hostVersion\":\"X.XX.X.X\",\n\t\"hostInstanceId\":\"myInstance\",\n\t\"level\":\"Information\",\n\t\"levelId\":2,\n\t\"processId\":155,\n\t\"eventId\":3143,\n\t\"eventName\":\"FunctionMetadataManagerLoadingFunctionsMetadata\"\n\t}\n\t}" + expectedLine2 := "{ \"time\": \"2023-03-08T12:06:47Z\",\n\t\"resourceId\": \"AZURE-FUNC-APP-2\",\n\t\"category\": \"FunctionAppLogs\",\n\t\"operationName\": \"Microsoft.Web/sites/functions/log\",\n\t\"level\": \"Informational\",\n\t\"location\": \"My Location\",\n\t\"properties\": {\n\t\"appName\":\"\",\n\t\"roleInstance\":\"123123123123\",\n\t\"message\":\"Loading functions metadata\",\n\t\"category\":\"Host.Startup\",\n\t\"hostVersion\":\"X.XX.X.X\",\n\t\"hostInstanceId\":\"myInstance\",\n\t\"level\":\"Information\",\n\t\"levelId\":2,\n\t\"processId\":155,\n\t\"eventId\":3143,\n\t\"eventName\":\"FunctionMetadataManagerLoadingFunctionsMetadata\"\n\t}\n\t}" + assert.Equal(t, expectedLine1, removeCRLFIfWindows(entries[0].Line)) + assert.Equal(t, expectedLine2, removeCRLFIfWindows(entries[1].Line)) + + assert.Equal(t, time.Date(2023, time.March, 8, 12, 6, 46, 0, time.UTC), entries[0].Timestamp) + assert.Equal(t, time.Date(2023, time.March, 8, 12, 6, 47, 0, time.UTC), entries[1].Timestamp) +} + +func Test_parseMessage_logic_app(t *testing.T) { + messageParser := &AzureEventHubsTargetMessageParser{} + + message := &sarama.ConsumerMessage{ + Value: readFile(t, "testdata/logic_app_logs_message.json"), + } + + entries, err := messageParser.Parse(message, nil, nil, true) + assert.NoError(t, err) + assert.Len(t, entries, 2) + + expectedLine1 := "{\n \"time\": \"2023-03-17T08:44:02.8921579Z\",\n \"workflowId\": \"/WORKFLOWS/AUZRE-PROMTAIL-TESTING-APP\",\n \"resourceId\": 
\"/WORKFLOWS/AUZRE-PROMTAIL-TESTING-APP/RUNS/11111/TRIGGERS/MANUAL\",\n \"category\": \"WorkflowRuntime\",\n \"level\": \"Information\",\n \"operationName\": \"Microsoft.Logic/workflows/workflowTriggerStarted\",\n \"properties\": {\n \"$schema\": \"2016-06-01\",\n \"startTime\": \"2023-03-17T08:44:02.8358364Z\",\n \"status\": \"Succeeded\",\n \"fired\": true,\n \"resource\": {\n \"subscriptionId\": \"someSubscriptionId\",\n \"resourceGroupName\": \"AzureLogsTesting\",\n \"workflowId\": \"someWorkflowId\",\n \"workflowName\": \"auzre-promtail-testing-app\",\n \"runId\": \"someRunId\",\n \"location\": \"eastus\",\n \"triggerName\": \"manual\"\n },\n \"correlation\": {\n \"clientTrackingId\": \"someClientTrackingId\"\n },\n \"api\": {}\n },\n \"location\": \"eastus\"\n }" + expectedLine2 := "{\n \"time\": \"2023-03-17T08:44:03.2036497Z\",\n \"workflowId\": \"/WORKFLOWS/AUZRE-PROMTAIL-TESTING-APP\",\n \"resourceId\": \"/WORKFLOWS/AUZRE-PROMTAIL-TESTING-APP/RUNS/11111/TRIGGERS/MANUAL\",\n \"category\": \"WorkflowRuntime\",\n \"level\": \"Information\",\n \"operationName\": \"Microsoft.Logic/workflows/workflowTriggerCompleted\",\n \"properties\": {\n \"$schema\": \"2016-06-01\",\n \"startTime\": \"2023-03-17T08:44:02.8358364Z\",\n \"endTime\": \"2023-03-17T08:44:02.8983217Z\",\n \"status\": \"Succeeded\",\n \"fired\": true,\n \"resource\": {\n \"subscriptionId\": \"someSubscriptionId\",\n \"resourceGroupName\": \"AzureLogsTesting\",\n \"workflowId\": \"someWorkflowId\",\n \"workflowName\": \"auzre-promtail-testing-app\",\n \"runId\": \"someRunId\",\n \"location\": \"eastus\",\n \"triggerName\": \"manual\"\n },\n \"correlation\": {\n \"clientTrackingId\": \"someClientTrackingId\"\n },\n \"api\": {}\n },\n \"location\": \"eastus\"\n }" + assert.Equal(t, expectedLine1, removeCRLFIfWindows(entries[0].Line)) + assert.Equal(t, expectedLine2, removeCRLFIfWindows(entries[1].Line)) + + assert.Equal(t, time.Date(2023, time.March, 17, 8, 44, 02, 892157900, time.UTC), entries[0].Timestamp) + assert.Equal(t, time.Date(2023, time.March, 17, 8, 44, 03, 203649700, time.UTC), entries[1].Timestamp) +} + +func Test_parseMessage_custom_payload_text(t *testing.T) { + messageParser := &AzureEventHubsTargetMessageParser{} + + message := &sarama.ConsumerMessage{ + Value: readFile(t, "testdata/custom_payload_text.txt"), + Timestamp: time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), + } + + entries, err := messageParser.Parse(message, nil, nil, true) + assert.NoError(t, err) + assert.Len(t, entries, 1) + + expectedLine := "Message with quotes \" `" + assert.Equal(t, expectedLine, removeCRLFIfWindows(entries[0].Line)) + + assert.Equal(t, time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), entries[0].Timestamp) +} + +func Test_parseMessage_custom_payload_text_error(t *testing.T) { + messageParser := &AzureEventHubsTargetMessageParser{ + DisallowCustomMessages: true, + } + + message := &sarama.ConsumerMessage{ + Value: readFile(t, "testdata/custom_payload_text.txt"), + } + + _, err := messageParser.Parse(message, nil, nil, true) + assert.Error(t, err) +} + +func Test_parseMessage_custom_payload_json(t *testing.T) { + messageParser := &AzureEventHubsTargetMessageParser{} + + message := &sarama.ConsumerMessage{ + Value: readFile(t, "testdata/custom_payload_json.json"), + Timestamp: time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), + } + + entries, err := messageParser.Parse(message, nil, nil, true) + assert.NoError(t, err) + assert.Len(t, entries, 1) + + expectedLine := `{"json":"I am valid json"}` + 
assert.Equal(t, expectedLine, removeCRLFIfWindows(entries[0].Line)) + + assert.Equal(t, time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), entries[0].Timestamp) +} + +func Test_parseMessage_custom_payload_json_with_records_string(t *testing.T) { + messageParser := &AzureEventHubsTargetMessageParser{} + + message := &sarama.ConsumerMessage{ + Value: readFile(t, "testdata/custom_payload_json_with_records_string.json"), + Timestamp: time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), + } + + entries, err := messageParser.Parse(message, nil, nil, true) + assert.NoError(t, err) + assert.Len(t, entries, 1) + + expectedLine := `{"records":"I am valid json"}` + assert.Equal(t, expectedLine, removeCRLFIfWindows(entries[0].Line)) + + assert.Equal(t, time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), entries[0].Timestamp) +} + +func Test_parseMessage_custom_payload_json_with_records_string_custom_payload_not_allowed(t *testing.T) { + messageParser := &AzureEventHubsTargetMessageParser{ + DisallowCustomMessages: true, + } + + message := &sarama.ConsumerMessage{ + Value: readFile(t, "testdata/custom_payload_json_with_records_string.json"), + } + + _, err := messageParser.Parse(message, nil, nil, true) + assert.Error(t, err) +} + +func Test_parseMessage_custom_payload_json_with_records_array(t *testing.T) { + messageParser := &AzureEventHubsTargetMessageParser{} + + message := &sarama.ConsumerMessage{ + Value: readFile(t, "testdata/custom_payload_json_with_records_array.json"), + Timestamp: time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), + } + + entries, err := messageParser.Parse(message, nil, nil, true) + assert.NoError(t, err) + assert.Len(t, entries, 3) + + expectedLine1 := `"I am valid json"` + expectedLine2 := `"Me too"` + expectedLine3 := "{\n \"MyKey\": \"MyValue\"\n }" + assert.Equal(t, expectedLine1, removeCRLFIfWindows(entries[0].Line)) + assert.Equal(t, expectedLine2, removeCRLFIfWindows(entries[1].Line)) + assert.Equal(t, expectedLine3, removeCRLFIfWindows(entries[2].Line)) + + assert.Equal(t, time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), entries[0].Timestamp) + assert.Equal(t, time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), entries[1].Timestamp) + assert.Equal(t, time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), entries[2].Timestamp) +} + +func Test_parseMessage_custom_payload_json_with_records_array_custom_payload_not_allowed(t *testing.T) { + messageParser := &AzureEventHubsTargetMessageParser{ + DisallowCustomMessages: true, + } + + message := &sarama.ConsumerMessage{ + Value: readFile(t, "testdata/custom_payload_json_with_records_array.json"), + Timestamp: time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), + } + + _, err := messageParser.Parse(message, nil, nil, true) + assert.Error(t, err) +} + +func Test_parseMessage_message_with_invalid_time(t *testing.T) { + messageParser := &AzureEventHubsTargetMessageParser{} + + message := &sarama.ConsumerMessage{ + Value: readFile(t, "testdata/message_with_invalid_time.json"), + Timestamp: time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), + } + + entries, err := messageParser.Parse(message, nil, nil, true) + assert.NoError(t, err) + assert.Len(t, entries, 1) + + expectedLine := "{\n \"time\": \"SOME-ERROR-HERE-T08:44:02.8921579Z\",\n \"category\": \"WorkflowRuntime\"\n }" + assert.Equal(t, expectedLine, removeCRLFIfWindows(entries[0].Line)) + + assert.Equal(t, time.Date(2021, time.March, 17, 8, 44, 03, 0, time.UTC), entries[0].Timestamp) +} + +func 
Test_parseMessage_relabel_config(t *testing.T) {
+	messageParser := &AzureEventHubsTargetMessageParser{}
+
+	message := &sarama.ConsumerMessage{
+		Value: readFile(t, "testdata/function_app_logs_message.txt"),
+	}
+
+	relabelConfigs := []*relabel.Config{
+		{
+			SourceLabels: model.LabelNames{"__azure_event_hubs_category"},
+			Regex:        relabel.MustNewRegexp("(.*)"),
+			TargetLabel:  "category",
+			Replacement:  "$1",
+			Action:       "replace",
+		},
+	}
+
+	entries, err := messageParser.Parse(message, nil, relabelConfigs, true)
+	assert.NoError(t, err)
+	assert.Len(t, entries, 2)
+
+	assert.Equal(t, model.LabelSet{"category": "FunctionAppLogs"}, entries[0].Labels)
+	assert.Equal(t, model.LabelSet{"category": "FunctionAppLogs"}, entries[1].Labels)
+}
+
+func Test_parseMessage_custom_message_and_logic_app_logs(t *testing.T) {
+	messageParser := &AzureEventHubsTargetMessageParser{}
+
+	message := &sarama.ConsumerMessage{
+		Value:     readFile(t, "testdata/custom_message_and_logic_app_logs.json"),
+		Timestamp: time.Date(2023, time.March, 17, 8, 44, 02, 0, time.UTC),
+	}
+
+	entries, err := messageParser.Parse(message, nil, nil, true)
+	assert.NoError(t, err)
+	assert.Len(t, entries, 2)
+
+	expectedLine1 := "{\n \"time\": \"2023-03-17T08:44:02.8921579Z\",\n \"workflowId\": \"/WORKFLOWS/AUZRE-PROMTAIL-TESTING-APP\",\n \"resourceId\": \"/WORKFLOWS/AUZRE-PROMTAIL-TESTING-APP/RUNS/11111/TRIGGERS/MANUAL\",\n \"category\": \"WorkflowRuntime\",\n \"level\": \"Information\",\n \"operationName\": \"Microsoft.Logic/workflows/workflowTriggerStarted\",\n \"properties\": {\n \"$schema\": \"2016-06-01\",\n \"startTime\": \"2023-03-17T08:44:02.8358364Z\",\n \"status\": \"Succeeded\",\n \"fired\": true,\n \"resource\": {\n \"subscriptionId\": \"someSubscriptionId\",\n \"resourceGroupName\": \"AzureLogsTesting\",\n \"workflowId\": \"someWorkflowId\",\n \"workflowName\": \"auzre-promtail-testing-app\",\n \"runId\": \"someRunId\",\n \"location\": \"eastus\",\n \"triggerName\": \"manual\"\n },\n \"correlation\": {\n \"clientTrackingId\": \"someClientTrackingId\"\n },\n \"api\": {}\n },\n \"location\": \"eastus\"\n }"
+	expectedLine2 := "{\n \"time\": \"2023-03-17T08:44:03.2036497Z\",\n \"category\": \"MyCustomCategory\"\n }"
+	assert.Equal(t, expectedLine1, removeCRLFIfWindows(entries[0].Line))
+	assert.Equal(t, expectedLine2, removeCRLFIfWindows(entries[1].Line))
+
+	assert.Equal(t, time.Date(2023, time.March, 17, 8, 44, 2, 892157900, time.UTC), entries[0].Timestamp)
+	assert.Equal(t, time.Date(2023, time.March, 17, 8, 44, 2, 0, time.UTC), entries[1].Timestamp)
+}
+
+func Test_parseMessage_custom_message_and_logic_app_logs_disallowCustomMessages(t *testing.T) {
+	messageParser := &AzureEventHubsTargetMessageParser{
+		DisallowCustomMessages: true,
+	}
+
+	message := &sarama.ConsumerMessage{
+		Value:     readFile(t, "testdata/custom_message_and_logic_app_logs.json"),
+		Timestamp: time.Date(2023, time.March, 17, 8, 44, 02, 0, time.UTC),
+	}
+
+	_, err := messageParser.Parse(message, nil, nil, true)
+	assert.Error(t, err)
+}
+
+func readFile(t *testing.T, filename string) []byte {
+	data, err := os.ReadFile(filename)
+	assert.NoError(t, err)
+	return data
+}
+
+// removeCRLFIfWindows removes the \r during string comparison
+func removeCRLFIfWindows(s string) string {
+	if runtime.GOOS == "windows" {
+		s = strings.ReplaceAll(s, "\r", "")
+	}
+	return s
+}
diff --git a/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_message_and_logic_app_logs.json 
b/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_message_and_logic_app_logs.json new file mode 100644 index 000000000000..ae8317ce0a41 --- /dev/null +++ b/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_message_and_logic_app_logs.json @@ -0,0 +1,36 @@ +{ + "records": [ + { + "time": "2023-03-17T08:44:02.8921579Z", + "workflowId": "/WORKFLOWS/AUZRE-PROMTAIL-TESTING-APP", + "resourceId": "/WORKFLOWS/AUZRE-PROMTAIL-TESTING-APP/RUNS/11111/TRIGGERS/MANUAL", + "category": "WorkflowRuntime", + "level": "Information", + "operationName": "Microsoft.Logic/workflows/workflowTriggerStarted", + "properties": { + "$schema": "2016-06-01", + "startTime": "2023-03-17T08:44:02.8358364Z", + "status": "Succeeded", + "fired": true, + "resource": { + "subscriptionId": "someSubscriptionId", + "resourceGroupName": "AzureLogsTesting", + "workflowId": "someWorkflowId", + "workflowName": "auzre-promtail-testing-app", + "runId": "someRunId", + "location": "eastus", + "triggerName": "manual" + }, + "correlation": { + "clientTrackingId": "someClientTrackingId" + }, + "api": {} + }, + "location": "eastus" + }, + { + "time": "2023-03-17T08:44:03.2036497Z", + "category": "MyCustomCategory" + } + ] +} \ No newline at end of file diff --git a/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_payload_json.json b/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_payload_json.json new file mode 100644 index 000000000000..80a75421a1ad --- /dev/null +++ b/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_payload_json.json @@ -0,0 +1 @@ +{"json":"I am valid json"} \ No newline at end of file diff --git a/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_payload_json_with_records_array.json b/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_payload_json_with_records_array.json new file mode 100644 index 000000000000..0636096f8af5 --- /dev/null +++ b/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_payload_json_with_records_array.json @@ -0,0 +1,7 @@ +{"records": [ + "I am valid json", + "Me too", + { + "MyKey": "MyValue" + } +]} \ No newline at end of file diff --git a/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_payload_json_with_records_string.json b/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_payload_json_with_records_string.json new file mode 100644 index 000000000000..648a88a5cc84 --- /dev/null +++ b/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_payload_json_with_records_string.json @@ -0,0 +1 @@ +{"records":"I am valid json"} \ No newline at end of file diff --git a/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_payload_text.txt b/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_payload_text.txt new file mode 100644 index 000000000000..94b0cf93f389 --- /dev/null +++ b/component/loki/source/azure_event_hubs/internal/parser/testdata/custom_payload_text.txt @@ -0,0 +1 @@ +Message with quotes " ` \ No newline at end of file diff --git a/component/loki/source/azure_event_hubs/internal/parser/testdata/function_app_logs_message.txt b/component/loki/source/azure_event_hubs/internal/parser/testdata/function_app_logs_message.txt new file mode 100644 index 000000000000..c04f24a7d10d --- /dev/null +++ b/component/loki/source/azure_event_hubs/internal/parser/testdata/function_app_logs_message.txt @@ -0,0 +1,42 @@ +{"records": [ + { "time": 
"2023-03-08T12:06:46Z", + "resourceId": "AZURE-FUNC-APP", + "category": "FunctionAppLogs", + "operationName": "Microsoft.Web/sites/functions/log", + "level": "Informational", + "location": "My Location", + "properties": { + 'appName':'', + 'roleInstance':'123123123123', + 'message':'Loading functions metadata', + 'category':'Host.Startup', + 'hostVersion':'X.XX.X.X', + 'hostInstanceId':'myInstance', + 'level':'Information', + 'levelId':2, + 'processId':155, + 'eventId':3143, + 'eventName':'FunctionMetadataManagerLoadingFunctionsMetadata' + } + }, + { "time": "2023-03-08T12:06:47Z", + "resourceId": "AZURE-FUNC-APP-2", + "category": "FunctionAppLogs", + "operationName": "Microsoft.Web/sites/functions/log", + "level": "Informational", + "location": "My Location", + "properties": { + 'appName':'', + 'roleInstance':'123123123123', + 'message':'Loading functions metadata', + 'category':'Host.Startup', + 'hostVersion':'X.XX.X.X', + 'hostInstanceId':'myInstance', + 'level':'Information', + 'levelId':2, + 'processId':155, + 'eventId':3143, + 'eventName':'FunctionMetadataManagerLoadingFunctionsMetadata' + } + } +]} \ No newline at end of file diff --git a/component/loki/source/azure_event_hubs/internal/parser/testdata/logic_app_logs_message.json b/component/loki/source/azure_event_hubs/internal/parser/testdata/logic_app_logs_message.json new file mode 100644 index 000000000000..cf66bca08308 --- /dev/null +++ b/component/loki/source/azure_event_hubs/internal/parser/testdata/logic_app_logs_message.json @@ -0,0 +1,61 @@ +{ + "records": [ + { + "time": "2023-03-17T08:44:02.8921579Z", + "workflowId": "/WORKFLOWS/AUZRE-PROMTAIL-TESTING-APP", + "resourceId": "/WORKFLOWS/AUZRE-PROMTAIL-TESTING-APP/RUNS/11111/TRIGGERS/MANUAL", + "category": "WorkflowRuntime", + "level": "Information", + "operationName": "Microsoft.Logic/workflows/workflowTriggerStarted", + "properties": { + "$schema": "2016-06-01", + "startTime": "2023-03-17T08:44:02.8358364Z", + "status": "Succeeded", + "fired": true, + "resource": { + "subscriptionId": "someSubscriptionId", + "resourceGroupName": "AzureLogsTesting", + "workflowId": "someWorkflowId", + "workflowName": "auzre-promtail-testing-app", + "runId": "someRunId", + "location": "eastus", + "triggerName": "manual" + }, + "correlation": { + "clientTrackingId": "someClientTrackingId" + }, + "api": {} + }, + "location": "eastus" + }, + { + "time": "2023-03-17T08:44:03.2036497Z", + "workflowId": "/WORKFLOWS/AUZRE-PROMTAIL-TESTING-APP", + "resourceId": "/WORKFLOWS/AUZRE-PROMTAIL-TESTING-APP/RUNS/11111/TRIGGERS/MANUAL", + "category": "WorkflowRuntime", + "level": "Information", + "operationName": "Microsoft.Logic/workflows/workflowTriggerCompleted", + "properties": { + "$schema": "2016-06-01", + "startTime": "2023-03-17T08:44:02.8358364Z", + "endTime": "2023-03-17T08:44:02.8983217Z", + "status": "Succeeded", + "fired": true, + "resource": { + "subscriptionId": "someSubscriptionId", + "resourceGroupName": "AzureLogsTesting", + "workflowId": "someWorkflowId", + "workflowName": "auzre-promtail-testing-app", + "runId": "someRunId", + "location": "eastus", + "triggerName": "manual" + }, + "correlation": { + "clientTrackingId": "someClientTrackingId" + }, + "api": {} + }, + "location": "eastus" + } + ] +} \ No newline at end of file diff --git a/component/loki/source/azure_event_hubs/internal/parser/testdata/message_with_invalid_time.json b/component/loki/source/azure_event_hubs/internal/parser/testdata/message_with_invalid_time.json new file mode 100644 index 000000000000..a8cb21e44b34 --- 
/dev/null
+++ b/component/loki/source/azure_event_hubs/internal/parser/testdata/message_with_invalid_time.json
@@ -0,0 +1,8 @@
+{
+  "records": [
+    {
+      "time": "SOME-ERROR-HERE-T08:44:02.8921579Z",
+      "category": "WorkflowRuntime"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/component/loki/source/kafka/internal/fake/client.go b/component/loki/source/internal/kafkafake/client.go
similarity index 98%
rename from component/loki/source/kafka/internal/fake/client.go
rename to component/loki/source/internal/kafkafake/client.go
index 2e67446a49af..33ed01065629 100644
--- a/component/loki/source/kafka/internal/fake/client.go
+++ b/component/loki/source/internal/kafkafake/client.go
@@ -1,4 +1,4 @@
-package fake
+package kafkafake
 
 // This code is copied from Promtail. The fake package is used to configure
 // fake client that can be used in testing.
diff --git a/component/loki/source/kafka/internal/kafkatarget/authentication.go b/component/loki/source/internal/kafkatarget/authentication.go
similarity index 100%
rename from component/loki/source/kafka/internal/kafkatarget/authentication.go
rename to component/loki/source/internal/kafkatarget/authentication.go
diff --git a/component/loki/source/kafka/internal/kafkatarget/config.go b/component/loki/source/internal/kafkatarget/config.go
similarity index 76%
rename from component/loki/source/kafka/internal/kafkatarget/config.go
rename to component/loki/source/internal/kafkatarget/config.go
index f6a2c7c78a10..a75ab30e6b5d 100644
--- a/component/loki/source/kafka/internal/kafkatarget/config.go
+++ b/component/loki/source/internal/kafkatarget/config.go
@@ -2,6 +2,7 @@ package kafkatarget
 
 import (
 	"github.com/Shopify/sarama"
+	"github.com/grafana/agent/component/common/loki"
 	"github.com/grafana/dskit/flagext"
 	promconfig "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
@@ -40,6 +41,8 @@ type TargetConfig struct {
 
 	// Authentication strategy with Kafka brokers
 	Authentication Authentication `yaml:"authentication"`
+
+	MessageParser MessageParser
 }
 
 // AuthenticationType specifies method to authenticate with Kafka brokers
@@ -67,6 +70,14 @@ type Authentication struct {
 	SASLConfig SASLConfig `yaml:"sasl_config,omitempty"`
 }
 
+// TokenProviderType specifies the provider used for resolving the access token
+type TokenProviderType string
+
+const (
+	// TokenProviderTypeAzure represents using Azure as the token provider
+	TokenProviderTypeAzure TokenProviderType = "azure"
+)
+
 // KafkaSASLConfig describe the SASL configuration for authentication with Kafka brokers
 type SASLConfig struct {
 	// SASL mechanism. Supports PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512
@@ -83,4 +94,19 @@ type SASLConfig struct {
 	// TLSConfig is used for SASL over TLS. 
It is used only when UseTLS is true TLSConfig promconfig.TLSConfig `yaml:",inline"` + + // OAuthConfig is used for configuring the token provider + OAuthConfig OAuthConfig `yaml:"oauth_provider_config,omitempty"` +} + +type OAuthConfig struct { + // TokenProvider is used for resolving the OAuth access token + TokenProvider TokenProviderType `yaml:"token_provider,omitempty"` + + Scopes []string +} + +// MessageParser defines parsing for each incoming message +type MessageParser interface { + Parse(message *sarama.ConsumerMessage, labels model.LabelSet, relabels []*relabel.Config, useIncomingTimestamp bool) ([]loki.Entry, error) } diff --git a/component/loki/source/kafka/internal/kafkatarget/consumer.go b/component/loki/source/internal/kafkatarget/consumer.go similarity index 100% rename from component/loki/source/kafka/internal/kafkatarget/consumer.go rename to component/loki/source/internal/kafkatarget/consumer.go diff --git a/component/loki/source/kafka/internal/kafkatarget/consumer_test.go b/component/loki/source/internal/kafkatarget/consumer_test.go similarity index 100% rename from component/loki/source/kafka/internal/kafkatarget/consumer_test.go rename to component/loki/source/internal/kafkatarget/consumer_test.go diff --git a/component/loki/source/kafka/internal/kafkatarget/formatter.go b/component/loki/source/internal/kafkatarget/formatter.go similarity index 100% rename from component/loki/source/kafka/internal/kafkatarget/formatter.go rename to component/loki/source/internal/kafkatarget/formatter.go diff --git a/component/loki/source/kafka/internal/kafkatarget/kafkatarget.go b/component/loki/source/internal/kafkatarget/kafkatarget.go similarity index 87% rename from component/loki/source/kafka/internal/kafkatarget/kafkatarget.go rename to component/loki/source/internal/kafkatarget/kafkatarget.go index edf50799cce4..b0108d72e60b 100644 --- a/component/loki/source/kafka/internal/kafkatarget/kafkatarget.go +++ b/component/loki/source/internal/kafkatarget/kafkatarget.go @@ -8,15 +8,14 @@ import ( "fmt" "time" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/relabel" - "github.com/Shopify/sarama" - "github.com/prometheus/common/model" - + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/grafana/agent/component/common/loki" "github.com/grafana/loki/clients/pkg/promtail/targets/target" - "github.com/grafana/loki/pkg/logproto" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" ) type runnableDroppedTarget struct { @@ -29,6 +28,7 @@ func (d *runnableDroppedTarget) run() { } type KafkaTarget struct { + logger log.Logger discoveredLabels model.LabelSet lbs model.LabelSet details ConsumerDetails @@ -37,18 +37,22 @@ type KafkaTarget struct { client loki.EntryHandler relabelConfig []*relabel.Config useIncomingTimestamp bool + messageParser MessageParser } func NewKafkaTarget( + logger log.Logger, session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, discoveredLabels, lbs model.LabelSet, relabelConfig []*relabel.Config, client loki.EntryHandler, useIncomingTimestamp bool, + messageParser MessageParser, ) *KafkaTarget { return &KafkaTarget{ + logger: logger, discoveredLabels: discoveredLabels, lbs: lbs, details: newDetails(session, claim), @@ -57,6 +61,7 @@ func NewKafkaTarget( client: client, relabelConfig: relabelConfig, useIncomingTimestamp: useIncomingTimestamp, + messageParser: messageParser, } } @@ -84,13 +89,15 @@ func (t 
*KafkaTarget) run() { if len(lbs) > 0 { out = out.Merge(lbs) } - t.client.Chan() <- loki.Entry{ - Entry: logproto.Entry{ - Line: string(message.Value), - Timestamp: timestamp(t.useIncomingTimestamp, message.Timestamp), - }, - Labels: out, + entries, err := t.messageParser.Parse(message, out, t.relabelConfig, t.useIncomingTimestamp) + if err != nil { + level.Error(t.logger).Log("msg", "message parsing error", "err", err) + } else { + for _, entry := range entries { + t.client.Chan() <- entry + } } + t.session.MarkMessage(message, "") } } diff --git a/component/loki/source/kafka/internal/kafkatarget/kafkatarget_test.go b/component/loki/source/internal/kafkatarget/kafkatarget_test.go similarity index 96% rename from component/loki/source/kafka/internal/kafkatarget/kafkatarget_test.go rename to component/loki/source/internal/kafkatarget/kafkatarget_test.go index a04d11ce55ff..777c6f2e9ceb 100644 --- a/component/loki/source/kafka/internal/kafkatarget/kafkatarget_test.go +++ b/component/loki/source/internal/kafkatarget/kafkatarget_test.go @@ -12,12 +12,11 @@ import ( "time" "github.com/Shopify/sarama" + "github.com/grafana/agent/component/loki/source/internal/kafkafake" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/require" "go.uber.org/atomic" - - "github.com/grafana/agent/component/loki/source/kafka/internal/fake" ) // Consumergroup handler @@ -169,13 +168,13 @@ func Test_TargetRun(t *testing.T) { t.Run(tt.name, func(t *testing.T) { session, claim := &testSession{}, newTestClaim("footopic", 10, 12) var closed bool - fc := fake.New( + fc := kafkafake.New( func() { closed = true }, ) - tg := NewKafkaTarget(session, claim, tt.inDiscoveredLS, tt.inLS, tt.relabels, fc, true) + tg := NewKafkaTarget(nil, session, claim, tt.inDiscoveredLS, tt.inLS, tt.relabels, fc, true, &KafkaTargetMessageParser{}) var wg sync.WaitGroup wg.Add(1) diff --git a/component/loki/source/internal/kafkatarget/oauth_provider.go b/component/loki/source/internal/kafkatarget/oauth_provider.go new file mode 100644 index 000000000000..c2afeede3a45 --- /dev/null +++ b/component/loki/source/internal/kafkatarget/oauth_provider.go @@ -0,0 +1,46 @@ +package kafkatarget + +import ( + "context" + "fmt" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/Shopify/sarama" +) + +func NewOAuthProvider(opts OAuthConfig) (sarama.AccessTokenProvider, error) { + switch opts.TokenProvider { + case TokenProviderTypeAzure: + cred, err := azidentity.NewDefaultAzureCredential(nil) + if err != nil { + return nil, err + } + return &TokenProviderAzure{tokenProvider: cred, scopes: opts.Scopes}, nil + default: + return nil, fmt.Errorf("token provider '%s' is not supported", opts.TokenProvider) + } +} + +type azureTokenProvider interface { + GetToken(ctx context.Context, opts policy.TokenRequestOptions) (azcore.AccessToken, error) +} + +// TokenProviderAzure implements sarama.AccessTokenProvider +type TokenProviderAzure struct { + tokenProvider azureTokenProvider + scopes []string +} + +// Token returns a new *sarama.AccessToken or an error +func (t *TokenProviderAzure) Token() (*sarama.AccessToken, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + token, err := t.tokenProvider.GetToken(ctx, policy.TokenRequestOptions{Scopes: t.scopes}) + if err != nil { + return nil, fmt.Errorf("failed to acquire token: %w", 
err) + } + return &sarama.AccessToken{Token: token.Token}, nil +} diff --git a/component/loki/source/internal/kafkatarget/parser.go b/component/loki/source/internal/kafkatarget/parser.go new file mode 100644 index 000000000000..c6234a93a804 --- /dev/null +++ b/component/loki/source/internal/kafkatarget/parser.go @@ -0,0 +1,24 @@ +package kafkatarget + +import ( + "github.com/Shopify/sarama" + "github.com/grafana/agent/component/common/loki" + "github.com/grafana/loki/pkg/logproto" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" +) + +// KafkaTargetMessageParser implements MessageParser. It doesn't modify the content of the original `message.Value`. +type KafkaTargetMessageParser struct{} + +func (p *KafkaTargetMessageParser) Parse(message *sarama.ConsumerMessage, labels model.LabelSet, relabels []*relabel.Config, useIncomingTimestamp bool) ([]loki.Entry, error) { + return []loki.Entry{ + { + Labels: labels, + Entry: logproto.Entry{ + Timestamp: timestamp(useIncomingTimestamp, message.Timestamp), + Line: string(message.Value), + }, + }, + }, nil +} diff --git a/component/loki/source/kafka/internal/kafkatarget/target_syncer.go b/component/loki/source/internal/kafkatarget/target_syncer.go similarity index 95% rename from component/loki/source/kafka/internal/kafkatarget/target_syncer.go rename to component/loki/source/internal/kafkatarget/target_syncer.go index 219a2ae5aa46..26e59efb1188 100644 --- a/component/loki/source/kafka/internal/kafkatarget/target_syncer.go +++ b/component/loki/source/internal/kafkatarget/target_syncer.go @@ -43,6 +43,7 @@ type TargetSyncer struct { cancel context.CancelFunc wg sync.WaitGroup previousTopics []string + messageParser MessageParser } func NewSyncer( @@ -50,6 +51,7 @@ func NewSyncer( logger log.Logger, cfg Config, pushClient loki.EntryHandler, + messageParser MessageParser, ) (*TargetSyncer, error) { if err := validateConfig(&cfg); err != nil { @@ -110,6 +112,7 @@ func NewSyncer( ConsumerGroup: group, logger: logger, }, + messageParser: messageParser, } t.discoverer = t t.loop() @@ -154,6 +157,7 @@ func withSASLAuthentication(cfg sarama.Config, authCfg Authentication) (*sarama. sarama.SASLTypeSCRAMSHA512, sarama.SASLTypeSCRAMSHA256, sarama.SASLTypePlaintext, + sarama.SASLTypeOAuth, } if !StringsContain(supportedMechanism, string(authCfg.SASLConfig.Mechanism)) { return nil, fmt.Errorf("error unsupported sasl mechanism: %s", authCfg.SASLConfig.Mechanism) @@ -173,6 +177,15 @@ func withSASLAuthentication(cfg sarama.Config, authCfg Authentication) (*sarama. 
} } } + + if cfg.Net.SASL.Mechanism == sarama.SASLTypeOAuth { + accessTokenProvider, err := NewOAuthProvider(authCfg.SASLConfig.OAuthConfig) + if err != nil { + return nil, fmt.Errorf("unable to create new access token provider: %w", err) + } + cfg.Net.SASL.TokenProvider = accessTokenProvider + } + if authCfg.SASLConfig.UseTLS { tc, err := createTLSConfig(authCfg.SASLConfig.TLSConfig) if err != nil { @@ -282,6 +295,7 @@ func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sar }, nil } t := NewKafkaTarget( + ts.logger, session, claim, discoveredLabels, @@ -289,6 +303,7 @@ func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sar ts.cfg.RelabelConfigs, ts.client, ts.cfg.KafkaConfig.UseIncomingTimestamp, + ts.messageParser, ) return t, nil diff --git a/component/loki/source/kafka/internal/kafkatarget/target_syncer_test.go b/component/loki/source/internal/kafkatarget/target_syncer_test.go similarity index 98% rename from component/loki/source/kafka/internal/kafkatarget/target_syncer_test.go rename to component/loki/source/internal/kafkatarget/target_syncer_test.go index 070ed6e9968d..a14fffc651ab 100644 --- a/component/loki/source/kafka/internal/kafkatarget/target_syncer_test.go +++ b/component/loki/source/internal/kafkatarget/target_syncer_test.go @@ -18,7 +18,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/grafana/agent/component/loki/source/kafka/internal/fake" + "github.com/grafana/agent/component/loki/source/internal/kafkafake" ) func Test_TopicDiscovery(t *testing.T) { @@ -87,7 +87,7 @@ func Test_NewTarget(t *testing.T) { ts := &TargetSyncer{ logger: log.NewNopLogger(), reg: prometheus.DefaultRegisterer, - client: fake.New(func() {}), + client: kafkafake.New(func() {}), cfg: Config{ RelabelConfigs: []*relabel.Config{ { diff --git a/component/loki/source/kafka/internal/kafkatarget/testdata/example.com-key.pem b/component/loki/source/internal/kafkatarget/testdata/example.com-key.pem similarity index 100% rename from component/loki/source/kafka/internal/kafkatarget/testdata/example.com-key.pem rename to component/loki/source/internal/kafkatarget/testdata/example.com-key.pem diff --git a/component/loki/source/kafka/internal/kafkatarget/testdata/example.com.ca.pem b/component/loki/source/internal/kafkatarget/testdata/example.com.ca.pem similarity index 100% rename from component/loki/source/kafka/internal/kafkatarget/testdata/example.com.ca.pem rename to component/loki/source/internal/kafkatarget/testdata/example.com.ca.pem diff --git a/component/loki/source/kafka/internal/kafkatarget/testdata/example.com.pem b/component/loki/source/internal/kafkatarget/testdata/example.com.pem similarity index 100% rename from component/loki/source/kafka/internal/kafkatarget/testdata/example.com.pem rename to component/loki/source/internal/kafkatarget/testdata/example.com.pem diff --git a/component/loki/source/kafka/internal/kafkatarget/topics.go b/component/loki/source/internal/kafkatarget/topics.go similarity index 100% rename from component/loki/source/kafka/internal/kafkatarget/topics.go rename to component/loki/source/internal/kafkatarget/topics.go diff --git a/component/loki/source/kafka/internal/kafkatarget/topics_test.go b/component/loki/source/internal/kafkatarget/topics_test.go similarity index 100% rename from component/loki/source/kafka/internal/kafkatarget/topics_test.go rename to component/loki/source/internal/kafkatarget/topics_test.go diff --git a/component/loki/source/kafka/kafka.go 
b/component/loki/source/kafka/kafka.go index d1297e212820..ec204d7746bd 100644 --- a/component/loki/source/kafka/kafka.go +++ b/component/loki/source/kafka/kafka.go @@ -6,14 +6,12 @@ import ( "github.com/Shopify/sarama" "github.com/go-kit/log/level" - "github.com/grafana/agent/component" "github.com/grafana/agent/component/common/config" "github.com/grafana/agent/component/common/loki" flow_relabel "github.com/grafana/agent/component/common/relabel" - kt "github.com/grafana/agent/component/loki/source/kafka/internal/kafkatarget" + kt "github.com/grafana/agent/component/loki/source/internal/kafkatarget" "github.com/grafana/dskit/flagext" - "github.com/prometheus/common/model" ) @@ -53,11 +51,17 @@ type KafkaAuthentication struct { // KafkaSASLConfig describe the SASL configuration for authentication with Kafka brokers type KafkaSASLConfig struct { - Mechanism string `river:"mechanism,attr,optional"` - User string `river:"user,attr"` - Password string `river:"password,attr"` - UseTLS bool `river:"use_tls,attr,optional"` - TLSConfig config.TLSConfig `river:"tls_config,block,optional"` + Mechanism string `river:"mechanism,attr,optional"` + User string `river:"user,attr,optional"` + Password string `river:"password,attr,optional"` + UseTLS bool `river:"use_tls,attr,optional"` + TLSConfig config.TLSConfig `river:"tls_config,block,optional"` + OAuthConfig OAuthConfigConfig `river:"oauth_config,block,optional"` +} + +type OAuthConfigConfig struct { + TokenProvider string `river:"token_provider,attr"` + Scopes []string `river:"scopes,attr"` } // DefaultArguments provides the default arguments for a kafka component. @@ -162,7 +166,7 @@ func (c *Component) Update(args component.Arguments) error { } entryHandler := loki.NewEntryHandler(c.handler, func() {}) - t, err := kt.NewSyncer(c.opts.Registerer, c.opts.Logger, newArgs.Convert(), entryHandler) + t, err := kt.NewSyncer(c.opts.Registerer, c.opts.Logger, newArgs.Convert(), entryHandler, &kt.KafkaTargetMessageParser{}) if err != nil { level.Error(c.opts.Logger).Log("msg", "failed to create kafka client with provided config", "err", err) return err @@ -213,6 +217,10 @@ func (auth KafkaAuthentication) Convert() kt.Authentication { Password: secret, UseTLS: auth.SASLConfig.UseTLS, TLSConfig: *auth.SASLConfig.TLSConfig.Convert(), + OAuthConfig: kt.OAuthConfig{ + TokenProvider: kt.TokenProviderType(auth.SASLConfig.OAuthConfig.TokenProvider), + Scopes: auth.SASLConfig.OAuthConfig.Scopes, + }, }, } } diff --git a/component/loki/source/kafka/kafka_test.go b/component/loki/source/kafka/kafka_test.go index 76f57ab2691f..8fa1ac71ff9a 100644 --- a/component/loki/source/kafka/kafka_test.go +++ b/component/loki/source/kafka/kafka_test.go @@ -60,3 +60,27 @@ func TestSASLRiverConfig(t *testing.T) { err := river.Unmarshal([]byte(exampleRiverConfig), &args) require.NoError(t, err) } + +func TestSASLOAuthRiverConfig(t *testing.T) { + var exampleRiverConfig = ` + brokers = ["localhost:9092", "localhost:23456"] + topics = ["quickstart-events"] + + authentication { + type = "sasl" + sasl_config { + mechanism = "OAUTHBEARER" + oauth_config { + token_provider = "azure" + scopes = ["my-scope"] + } + } + } + labels = {component = "loki.source.kafka"} + forward_to = [] +` + + var args Arguments + err := river.Unmarshal([]byte(exampleRiverConfig), &args) + require.NoError(t, err) +} diff --git a/docs/sources/flow/reference/components/loki.source.azure_event_hubs.md b/docs/sources/flow/reference/components/loki.source.azure_event_hubs.md new file mode 100644 index 
diff --git a/docs/sources/flow/reference/components/loki.source.azure_event_hubs.md b/docs/sources/flow/reference/components/loki.source.azure_event_hubs.md
new file mode 100644
index 000000000000..d8f80886df3b
--- /dev/null
+++ b/docs/sources/flow/reference/components/loki.source.azure_event_hubs.md
@@ -0,0 +1,130 @@
+---
+title: loki.source.azure_event_hubs
+---
+
+# loki.source.azure_event_hubs
+
+`loki.source.azure_event_hubs` receives Azure Event Hubs messages by making use of an Apache Kafka
+endpoint on Event Hubs. For more information, see
+the [Azure Event Hubs documentation](https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-kafka-overview).
+
+To learn more about streaming Azure logs to an Azure Event Hub, refer to
+Microsoft's tutorial on how to [Stream Azure Active Directory logs to an Azure event hub](https://learn.microsoft.com/en-us/azure/active-directory/reports-monitoring/tutorial-azure-monitor-stream-logs-to-event-hub).
+
+Note that an Apache Kafka endpoint is not available within the Basic pricing plan. For more information, see
+the [Event Hubs pricing page](https://azure.microsoft.com/en-us/pricing/details/event-hubs/).
+
+Multiple `loki.source.azure_event_hubs` components can be specified by giving them
+different labels.
+
+## Usage
+
+```river
+loki.source.azure_event_hubs "LABEL" {
+    fully_qualified_namespace = "HOST:PORT"
+    event_hubs                = EVENT_HUB_LIST
+    forward_to                = RECEIVER_LIST
+
+    authentication {
+        mechanism = "AUTHENTICATION_MECHANISM"
+    }
+}
+```
+
+## Arguments
+
+`loki.source.azure_event_hubs` supports the following arguments:
+
+ Name                        | Type                 | Description                                                                                                                                                               | Default                          | Required
+-----------------------------|----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------|----------
+ `fully_qualified_namespace` | `string`             | Event hub namespace.                                                                                                                                                      |                                  | yes
+ `event_hubs`                | `list(string)`       | Event Hubs to consume.                                                                                                                                                    |                                  | yes
+ `group_id`                  | `string`             | The Kafka consumer group id.                                                                                                                                              | `"loki.source.azure_event_hubs"` | no
+ `assignor`                  | `string`             | The consumer group rebalancing strategy to use.                                                                                                                           | `"range"`                        | no
+ `use_incoming_timestamp`    | `bool`               | Whether or not to use the timestamp received from Azure Event Hub.                                                                                                        | `false`                          | no
+ `labels`                    | `map(string)`        | The labels to associate with each received event.                                                                                                                         | `{}`                             | no
+ `forward_to`                | `list(LogsReceiver)` | List of receivers to send log entries to.                                                                                                                                 |                                  | yes
+ `relabel_rules`             | `RelabelRules`       | Relabeling rules to apply on log entries.                                                                                                                                 | `{}`                             | no
+ `disallow_custom_messages`  | `bool`               | Whether to ignore messages that don't match the [schema](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/resource-logs-schema) for Azure resource logs. | `false`                          | no
+
+The `fully_qualified_namespace` argument must refer to a full `HOST:PORT` that points to your event hub, such as `NAMESPACE.servicebus.windows.net:9093`.
+The `assignor` argument must be set to one of `"range"`, `"roundrobin"`, or `"sticky"`.
+
+The `relabel_rules` field can make use of the `rules` export value from a
+`loki.relabel` component to apply one or more relabeling rules to log entries
+before they're forwarded to the list of receivers in `forward_to`.
+
+### Labels
+
+The `labels` map is applied to every message that the component reads.
+
+The following internal labels prefixed with `__` are available but are discarded if not relabeled:
+
+- `__meta_kafka_message_key`
+- `__meta_kafka_topic`
+- `__meta_kafka_partition`
+- `__meta_kafka_member_id`
+- `__meta_kafka_group_id`
+- `__azure_event_hubs_category`
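+
+For example, a `loki.relabel` component can promote one of these internal labels to a
+permanent label before entries are forwarded. The following is a minimal sketch: the
+namespace, hub name, and target label are placeholders, and `loki.write.example.receiver`
+assumes a `loki.write` component named `example` exists elsewhere in the configuration.
+
+```river
+loki.relabel "event_hubs" {
+    forward_to = []
+
+    rule {
+        source_labels = ["__azure_event_hubs_category"]
+        target_label  = "category"
+    }
+}
+
+loki.source.azure_event_hubs "example" {
+    fully_qualified_namespace = "my-ns.servicebus.windows.net:9093"
+    event_hubs                = ["gw-logs"]
+    relabel_rules             = loki.relabel.event_hubs.rules
+    forward_to                = [loki.write.example.receiver]
+
+    authentication {
+        mechanism = "oauth"
+    }
+}
+```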
+
+## Blocks
+
+The following blocks are supported inside the definition of `loki.source.azure_event_hubs`:
+
+ Hierarchy      | Name             | Description                                         | Required
+----------------|------------------|-----------------------------------------------------|----------
+ authentication | [authentication] | Authentication configuration with Azure Event Hub.  | yes
+
+[authentication]: #authentication-block
+
+### authentication block
+
+The `authentication` block defines the authentication method when communicating with Azure Event Hub.
+
+ Name                | Type           | Description                                                                | Default | Required
+---------------------|----------------|----------------------------------------------------------------------------|---------|----------
+ `mechanism`         | `string`       | Authentication mechanism.                                                  |         | yes
+ `connection_string` | `string`       | Event Hubs connection string for authentication on Azure Cloud.           |         | no
+ `scopes`            | `list(string)` | Access token scopes. Default is `fully_qualified_namespace` without port. |         | no
+
+`mechanism` supports the values `"connection_string"` and `"oauth"`. If `"connection_string"` is used,
+you must set the `connection_string` attribute. If `"oauth"` is used, you must configure one of the
+[supported credential types](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/azidentity/README.md#credential-types),
+for example through environment variables or the Azure CLI.
+
+## Exported fields
+
+`loki.source.azure_event_hubs` does not export any fields.
+
+## Component health
+
+`loki.source.azure_event_hubs` is only reported as unhealthy if given an invalid
+configuration.
+
+## Debug information
+
+`loki.source.azure_event_hubs` does not expose additional debug info.
+
+## Example
+
+This example consumes messages from Azure Event Hubs and authenticates using OAuth.
+
+```river
+loki.source.azure_event_hubs "example" {
+    fully_qualified_namespace = "my-ns.servicebus.windows.net:9093"
+    event_hubs                = ["gw-logs"]
+    forward_to                = [loki.write.example.receiver]
+
+    authentication {
+        mechanism = "oauth"
+    }
+}
+
+loki.write "example" {
+    endpoint {
+        url = "http://loki:3100/loki/api/v1/push"
+    }
+}
+```
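+
+To authenticate with a connection string instead, set the mechanism to `"connection_string"`.
+This is a sketch with a truncated placeholder value; substitute the connection string of
+your own Event Hubs namespace:
+
+```river
+loki.source.azure_event_hubs "example_connection_string" {
+    fully_qualified_namespace = "my-ns.servicebus.windows.net:9093"
+    event_hubs                = ["gw-logs"]
+    forward_to                = [loki.write.example.receiver]
+
+    authentication {
+        mechanism         = "connection_string"
+        connection_string = "Endpoint=sb://my-ns.servicebus.windows.net/;..."
+    }
+}
+```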
\ No newline at end of file
diff --git a/docs/sources/flow/reference/components/loki.source.kafka.md b/docs/sources/flow/reference/components/loki.source.kafka.md
index 7049d296d119..9946f2c4c6ba 100644
--- a/docs/sources/flow/reference/components/loki.source.kafka.md
+++ b/docs/sources/flow/reference/components/loki.source.kafka.md
@@ -32,17 +32,17 @@ loki.source.kafka "LABEL" {
 
 `loki.source.kafka` supports the following arguments:
 
-Name | Type | Description | Default | Required
------------------------- | ---------------------- | -------------------- | ------- | --------
-`brokers` | `list(string)` | The list of brokers to connect to Kafka. | | yes
-`topics` | `list(string)` | The list of Kafka topics to consume. | | yes
-`group_id` | `string` | The Kafka consumer group id. | `"loki.source.kafka"` | no
-`assignor` | `string` | The consumer group rebalancing strategy to use. | `"range"` | no
-`version` | `string` | Kafka version to connect to. | `"2.2.1"` | no
-`use_incoming_timestamp` | `bool` | Whether or not to use the timestamp received from Kafka. | `false` | no
-`labels` | `map(string)` | The labels to associate with each received Kafka event. | `{}` | no
-`forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes
-`relabel_rules` | `RelabelRules` | Relabeling rules to apply on log entries. | `{}` | no
+ Name                     | Type                 | Description                                               | Default               | Required
+--------------------------|----------------------|-----------------------------------------------------------|-----------------------|----------
+ `brokers`                | `list(string)`       | The list of brokers to connect to Kafka.                  |                       | yes
+ `topics`                 | `list(string)`       | The list of Kafka topics to consume.                      |                       | yes
+ `group_id`               | `string`             | The Kafka consumer group id.                              | `"loki.source.kafka"` | no
+ `assignor`               | `string`             | The consumer group rebalancing strategy to use.           | `"range"`             | no
+ `version`                | `string`             | Kafka version to connect to.                              | `"2.2.1"`             | no
+ `use_incoming_timestamp` | `bool`               | Whether or not to use the timestamp received from Kafka.  | `false`               | no
+ `labels`                 | `map(string)`        | The labels to associate with each received Kafka event.   | `{}`                  | no
+ `forward_to`             | `list(LogsReceiver)` | List of receivers to send log entries to.                 |                       | yes
+ `relabel_rules`          | `RelabelRules`       | Relabeling rules to apply on log entries.                 | `{}`                  | no
 
 `assignor` values can be either `"range"`, `"roundrobin"`, or `"sticky"`.
@@ -70,24 +70,29 @@ keep these labels, relabel them using a [loki.relabel][] component and pass its
 
 The following blocks are supported inside the definition of `loki.source.kafka`:
 
-Hierarchy | Name | Description | Required
---------- | ---- | ----------- | --------
-authentication | [authentication] | Optional authentication configuration with Kafka brokers. | no
-authentication > tls_config | [tls_config] | Optional authentication configuration with Kafka brokers. | no
-authentication > sasl_config | [sasl_config] | Optional authentication configuration with Kafka brokers. | no
-authentication > sasl_config > tls_config | [tls_config] | Optional authentication configuration with Kafka brokers. | no
+ Hierarchy                                   | Name             | Description                                                | Required
+---------------------------------------------|------------------|------------------------------------------------------------|----------
+ authentication                              | [authentication] | Optional authentication configuration with Kafka brokers. | no
+ authentication > tls_config                 | [tls_config]     | Optional authentication configuration with Kafka brokers. | no
+ authentication > sasl_config                | [sasl_config]    | Optional authentication configuration with Kafka brokers. | no
+ authentication > sasl_config > tls_config   | [tls_config]     | Optional authentication configuration with Kafka brokers. | no
+ authentication > sasl_config > oauth_config | [oauth_config]   | Optional authentication configuration with Kafka brokers. | no
 
 [authentication]: #authentication-block
+
 [tls_config]: #tls_config-block
+
 [sasl_config]: #sasl_config-block
 
+[oauth_config]: #oauth_config-block
+
 ### authentication block
 
 The `authentication` block defines the authentication method when communicating with the Kafka event brokers.
 
-Name | Type | Description | Default | Required
------------------------- | ------------- | ----------- | ------- | --------
-`type` | `string` | Type of authentication. | `"none"` | no
+ Name   | Type     | Description             | Default  | Required
+--------|----------|-------------------------|----------|----------
+ `type` | `string` | Type of authentication. | `"none"` | no
 
 `type` supports the values `"none"`, `"ssl"`, and `"sasl"`. If `"ssl"` is used,
 you must set the `tls_config` block. If `"sasl"` is used, you must set the `sasl_config` block.
@@ -101,12 +106,21 @@ you must set the `tls_config` block. If `"sasl"` is used, you must set the `sasl
 
 The `sasl_config` block defines the SASL authentication configuration to use when connecting to the Kafka brokers.
 
-Name | Type | Description | Default | Required
------------------------- | ------------- | ----------- | ------- | --------
-`mechanism` | `string` | Specifies the SASL mechanism the client uses to authenticate with the broker. | `"PLAIN""` | no
-`user` | `string` | The user name to use for SASL authentication. | `""` | no
-`password` | `string` | The password to use for SASL authentication. | `""` | no
-`use_tls` | `bool` | If true, SASL authentication is executed over TLS. | `false` | no
+ Name        | Type     | Description                                                                    | Default   | Required
+-------------|----------|---------------------------------------------------------------------------------|-----------|----------
+ `mechanism` | `string` | Specifies the SASL mechanism the client uses to authenticate with the broker. | `"PLAIN"` | no
+ `user`      | `string` | The user name to use for SASL authentication.                                  | `""`      | no
+ `password`  | `string` | The password to use for SASL authentication.                                   | `""`      | no
+ `use_tls`   | `bool`   | If true, SASL authentication is executed over TLS.                             | `false`   | no
+
+### oauth_config block
+
+The `oauth_config` block is required when the SASL mechanism is set to `OAUTHBEARER`.
+
+ Name             | Type           | Description                                                             | Default | Required
+------------------|----------------|--------------------------------------------------------------------------|---------|----------
+ `token_provider` | `string`       | The OAuth provider to be used. The only supported provider is `azure`. | `""`    | yes
+ `scopes`         | `list(string)` | The scopes to set in the access token.                                  | `[]`    | yes
 
 ## Exported fields
diff --git a/go.mod b/go.mod
index 1a637fd17a73..b5ce631d9631 100644
--- a/go.mod
+++ b/go.mod
@@ -200,7 +200,7 @@ require (
 	github.com/AlekSi/pointer v1.1.0 // indirect
 	github.com/Azure/azure-pipeline-go v0.2.3 // indirect
 	github.com/Azure/azure-sdk-for-go v66.0.0+incompatible // indirect
-	github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
+	github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0
 	github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.1 // indirect
 	github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.8.0 // indirect
 	github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resourcegraph/armresourcegraph v0.6.0 // indirect
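As a usage sketch tying the new `oauth_config` block to `loki.source.kafka`, the
configuration below mirrors the shape validated by `TestSASLOAuthRiverConfig`. The broker
address, scope, and `loki.write.example.receiver` destination are placeholders, and the
`.default` scope suffix follows the common Azure convention for requesting a resource's
full permission set; adjust all of them to your namespace.

```river
loki.source.kafka "azure_oauth" {
    brokers    = ["my-ns.servicebus.windows.net:9093"]
    topics     = ["quickstart-events"]
    labels     = {component = "loki.source.kafka"}
    forward_to = [loki.write.example.receiver]

    authentication {
        type = "sasl"

        sasl_config {
            mechanism = "OAUTHBEARER"
            use_tls   = true

            oauth_config {
                token_provider = "azure"
                scopes         = ["https://my-ns.servicebus.windows.net/.default"]
            }
        }
    }
}
```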