[Azure] Add input metrics to the azure-eventhub input #35739

Merged
merged 13 commits
Aug 15, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -238,6 +238,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add device handling to Okta input package for entity analytics. {pull}36049[36049]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}99999[99999]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}36286[36286]
- [Azure] Add input metrics to the azure-eventhub input. {pull}35739[35739]

*Auditbeat*

19 changes: 19 additions & 0 deletions x-pack/filebeat/docs/inputs/input-azure-eventhub.asciidoc
@@ -72,3 +72,22 @@ https://management.azure.com/ for azure PublicCloud
https://management.usgovcloudapi.net/ for azure USGovernmentCloud
Users can also use this in case of a Hybrid Cloud model, where one may define their own endpoints.

[float]
=== Metrics

This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
The metrics are published under the `/inputs` path and can be used to observe
the activity of the input (see the example request after the table below).

[options="header"]
|=======
| Metric | Description
| `received_messages_total` | Number of messages received from the event hub.
| `received_bytes_total` | Number of bytes received from the event hub.
| `sanitized_messages_total` | Number of messages that contained invalid JSON and were sanitized successfully (see the `sanitize_options` configuration).

NIT: I wonder if we should explain what "sanitized" refers to here. I don't think that the difference between "processed" and "sanitized" is straightforward for a user without context.

| `processed_messages_total` | Number of messages that were processed successfully.
| `received_events_total` | Number of events decoded from the received messages.
| `sent_events_total` | Number of events that were sent successfully.
| `processing_time` | Histogram of the elapsed processing times in nanoseconds.
| `decode_errors_total` | Number of errors that occurred while decoding a message.
|=======
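
Below is a minimal sketch of how these metrics can be read once the HTTP
monitoring endpoint is enabled (`http.enabled: true` in the Filebeat
configuration). It assumes the default `localhost:5066` address and that the
`/inputs/` path returns one flat JSON object per running input; the exact
response shape may vary between versions.

[source,go]
----
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Fetch the per-input metrics from Filebeat's monitoring endpoint.
	resp, err := http.Get("http://localhost:5066/inputs/")
	if err != nil {
		log.Fatalf("requesting input metrics: %v", err)
	}
	defer resp.Body.Close()

	// Each element describes one running input; the azure-eventhub entries
	// carry the counters listed in the table above.
	var inputs []map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&inputs); err != nil {
		log.Fatalf("decoding input metrics: %v", err)
	}

	for _, in := range inputs {
		if in["input"] == "azure-eventhub" {
			fmt.Printf("id=%v received_messages_total=%v processed_messages_total=%v\n",
				in["id"], in["received_messages_total"], in["processed_messages_total"])
		}
	}
}
----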
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/azureeventhub/eph.go
@@ -78,7 +78,7 @@ func (a *azureInput) runWithEPH() error {
ok := a.processEvents(e, "")
if !ok {
onEventErr = errors.New("OnEvent function returned false. Stopping input worker")
a.log.Debug(onEventErr.Error())
a.log.Error(onEventErr.Error())
a.Stop()
}
return onEventErr
124 changes: 101 additions & 23 deletions x-pack/filebeat/input/azureeventhub/input.go
@@ -14,22 +14,59 @@ import (
"sync"
"time"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/eph"
"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/eph"
)

const (
eventHubConnector = ";EntityPath="
expandEventListFromField = "records"
inputName = "azure-eventhub"
)

func init() {
err := input.Register(inputName, NewInput)
if err != nil {
panic(fmt.Errorf("failed to register %v input: %w", inputName, err))
}
}

// configID computes a unique ID for the input configuration.
//
// It is used to identify the input in the registry and to detect
// changes in the configuration.
//
// We will remove this function as we upgrade the input to the
// v2 API (there is an ID in the v2 context).
func configID(config *conf.C) (string, error) {
var tmp struct {
ID string `config:"id"`
}
if err := config.Unpack(&tmp); err != nil {
return "", fmt.Errorf("error extracting ID: %w", err)
}
if tmp.ID != "" {
return tmp.ID, nil
}

var h map[string]interface{}
_ = config.Unpack(&h)
id, err := hashstructure.Hash(h, nil)
if err != nil {
return "", fmt.Errorf("can not compute ID from configuration: %w", err)
}

return fmt.Sprintf("%16X", id), nil
}

// azureInput struct for the azure-eventhub input
type azureInput struct {
config azureInputConfig // azure-eventhub configuration
@@ -40,17 +77,8 @@ type azureInput struct {
workerCancel context.CancelFunc // used to signal that the worker should stop.
workerOnce sync.Once // guarantees that the worker goroutine is only started once.
processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option
}

const (
inputName = "azure-eventhub"
)

func init() {
err := input.Register(inputName, NewInput)
if err != nil {
panic(fmt.Errorf("failed to register %v input: %w", inputName, err))
}
id string // ID of the input; used to identify the input in the input metrics registry only, and will be removed once the input is migrated to v2.
metrics *inputMetrics // Metrics for the input.
}

// NewInput creates a new azure-eventhub input
@@ -64,6 +92,17 @@ func NewInput(
return nil, fmt.Errorf("reading %s input config: %w", inputName, err)
}

// Since this is a v1 input, we need to set the ID manually.
//
// We need an ID to identify the input in the input metrics
// registry.
//
// This is a temporary workaround until we migrate the input to v2.
inputId, err := configID(cfg)
if err != nil {
return nil, err
}

inputCtx, cancelInputCtx := context.WithCancel(context.Background())
go func() {
defer cancelInputCtx()
@@ -77,7 +116,8 @@
// to be recreated with each restart.
workerCtx, workerCancel := context.WithCancel(inputCtx)

in := &azureInput{
in := azureInput{
id: inputId,
config: config,
log: logp.NewLogger(fmt.Sprintf("%s input", inputName)).With("connection string", stripConnectionString(config.ConnectionString)),
context: inputContext,
@@ -90,7 +130,8 @@
}
in.outlet = out
in.log.Infof("Initialized %s input.", inputName)
return in, nil

return &in, nil
}

// Run starts the `azure-eventhub` input and then returns.
@@ -106,6 +147,16 @@ func (a *azureInput) Run() {
// invocation.
a.workerOnce.Do(func() {
a.log.Infof("%s input worker is starting.", inputName)

// We set up the metrics in the `Run()` method and tear them down
// in the `Stop()` method.
//
// The factory method `NewInput` is not a viable solution because
// the Runner invokes it during the configuration check without
// calling the `Stop()` function; this causes panics
// due to multiple metrics registrations.
a.metrics = newInputMetrics(a.id, nil)

err := a.runWithEPH()
if err != nil {
a.log.Errorw("error starting the input worker", "error", err)
@@ -117,6 +168,7 @@

// Stop stops `azure-eventhub` input.
func (a *azureInput) Stop() {
a.log.Infof("%s input worker is stopping.", inputName)
if a.processor != nil {
// Tells the processor to stop processing events and release all
// resources (like scheduler, leaser, checkpointer, and client).
@@ -126,7 +178,12 @@
}
}

if a.metrics != nil {
a.metrics.Close()
}

a.workerCancel()
a.log.Infof("%s input worker has stopped.", inputName)
}

// Wait stop the current server
@@ -135,35 +192,50 @@ func (a *azureInput) Wait() {
}

func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bool {
timestamp := time.Now()
processingStartTime := time.Now()
azure := mapstr.M{
// partitionID is only mapped in the non-eph option, which is not available yet; this field will be temporarily unavailable
//"partition_id": partitionID,
"eventhub": a.config.EventHubName,
"consumer_group": a.config.ConsumerGroup,
}
messages := a.parseMultipleMessages(event.Data)
for _, msg := range messages {

// update the input metrics
a.metrics.receivedMessages.Inc()
a.metrics.receivedBytes.Add(uint64(len(event.Data)))

records := a.parseMultipleRecords(event.Data)

for _, record := range records {
_, _ = azure.Put("offset", event.SystemProperties.Offset)
_, _ = azure.Put("sequence_number", event.SystemProperties.SequenceNumber)
_, _ = azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime)
ok := a.outlet.OnEvent(beat.Event{
Timestamp: timestamp,
// this is the default value for the @timestamp field; usually the ingest
// pipeline replaces it with a value in the payload.
Timestamp: processingStartTime,
Fields: mapstr.M{
"message": msg,
"message": record,
"azure": azure,
},
Private: event.Data,
})
if !ok {
a.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds())
return ok
}

a.metrics.sentEvents.Inc()
}

a.metrics.processedMessages.Inc()
a.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds())

return true
}

// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
// parseMultipleRecords will try to split the message into multiple ones based on the group field provided by the configuration
func (a *azureInput) parseMultipleRecords(bMessage []byte) []string {
var mapObject map[string][]interface{}
var messages []string

@@ -173,6 +245,7 @@ func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
// [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps
if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) {
bMessage = sanitize(bMessage, a.config.SanitizeOptions...)
a.metrics.sanitizedMessages.Inc()
}

// check if the message is a "records" object containing a list of events
@@ -183,6 +256,7 @@
js, err := json.Marshal(ms)
if err == nil {
messages = append(messages, string(js))
a.metrics.receivedEvents.Inc()
} else {
a.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err)
}
@@ -196,17 +270,21 @@ func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
if err != nil {
// return entire message
a.log.Debugf("deserializing multiple messages to an array returning error: %s", err)
a.metrics.decodeErrors.Inc()
return []string{string(bMessage)}
}

for _, ms := range arrayObject {
js, err := json.Marshal(ms)
if err == nil {
messages = append(messages, string(js))
a.metrics.receivedEvents.Inc()
} else {
a.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err)
}
}
}

return messages
}

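The `inputMetrics` type and the `newInputMetrics` constructor used in `Run()`, `Stop()`, and the tests live in a file that is not part of this diff view. The sketch below shows what such a constructor might look like, following the pattern used by other Filebeat inputs; the `inputmon`, `adapter`, and go-metrics helpers, and the exact field set, are assumptions based on those inputs rather than code taken from this PR.

// input_metrics.go (sketch)
package azureeventhub

import (
	"github.com/rcrowley/go-metrics"

	"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
	"github.com/elastic/elastic-agent-libs/monitoring"
	"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

// inputMetrics holds the metrics listed in the documentation table.
type inputMetrics struct {
	unregister func()

	receivedMessages  *monitoring.Uint // received_messages_total
	receivedBytes     *monitoring.Uint // received_bytes_total
	sanitizedMessages *monitoring.Uint // sanitized_messages_total
	processedMessages *monitoring.Uint // processed_messages_total
	receivedEvents    *monitoring.Uint // received_events_total
	sentEvents        *monitoring.Uint // sent_events_total
	decodeErrors      *monitoring.Uint // decode_errors_total
	processingTime    metrics.Sample   // processing_time histogram (nanoseconds)
}

// newInputMetrics registers the metrics under the given input ID.
// optionalParent is a registry override used by tests; passing nil selects
// the default registry served under the /inputs HTTP path.
func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics {
	reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent)

	out := &inputMetrics{
		unregister:        unreg,
		receivedMessages:  monitoring.NewUint(reg, "received_messages_total"),
		receivedBytes:     monitoring.NewUint(reg, "received_bytes_total"),
		sanitizedMessages: monitoring.NewUint(reg, "sanitized_messages_total"),
		processedMessages: monitoring.NewUint(reg, "processed_messages_total"),
		receivedEvents:    monitoring.NewUint(reg, "received_events_total"),
		sentEvents:        monitoring.NewUint(reg, "sent_events_total"),
		decodeErrors:      monitoring.NewUint(reg, "decode_errors_total"),
		processingTime:    metrics.NewUniformSample(1024),
	}
	_ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept).
		Register("histogram", metrics.NewHistogram(out.processingTime))

	return out
}

// Close unregisters the input from the metrics registry.
func (m *inputMetrics) Close() {
	m.unregister()
}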
29 changes: 22 additions & 7 deletions x-pack/filebeat/input/azureeventhub/input_test.go
@@ -14,6 +14,7 @@ import (

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/stretchr/testify/assert"
@@ -39,9 +40,14 @@ func TestProcessEvents(t *testing.T) {
if err != nil {
t.Fatal(err)
}
reg := monitoring.NewRegistry()
metrics := newInputMetrics("test", reg)
defer metrics.Close()

input := azureInput{
config: config,
outlet: out,
config: config,
metrics: metrics,
outlet: out,
}
var sn int64 = 12
now := time.Now()
@@ -73,7 +79,7 @@
assert.Equal(t, message, single)
}

func TestParseMultipleMessages(t *testing.T) {
func TestParseMultipleRecords(t *testing.T) {
// records object
msg := "{\"records\":[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
@@ -83,8 +89,17 @@
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
}
input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))}
messages := input.parseMultipleMessages([]byte(msg))

reg := monitoring.NewRegistry()
metrics := newInputMetrics("test", reg)
defer metrics.Close()

input := azureInput{
metrics: metrics,
log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)),
}

messages := input.parseMultipleRecords([]byte(msg))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
@@ -95,7 +110,7 @@
msg1 := "[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]"
messages = input.parseMultipleMessages([]byte(msg1))
messages = input.parseMultipleRecords([]byte(msg1))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
@@ -104,7 +119,7 @@

// one event only
msg2 := "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"
messages = input.parseMultipleMessages([]byte(msg2))
messages = input.parseMultipleRecords([]byte(msg2))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 1)
for _, ms := range messages {