
[Azure] Add input metrics to the azure-eventhub input (elastic#35739)
* Draft input metrics about events and records

In this first iteration, the metrics keep track of the following data
types:

- events
- records

Events are the payloads delivered by the event hub; each event usually
contains a list of records.

Records are the actual logs from Azure; the input creates one document
in Elasticsearch for each record.

This draft keeps track of the following conditions.

Events:

- received: the input received an event from the Event Hub
- sanitized: the event contains invalid JSON, and the input tried to fix it
- deserialization failed: the event contains invalid JSON; sanitization was ineffective
- processed: the input processed all the records in the event

Records:

- received: the input unpacked a record from an event
- serialization failed: failed to serialize the record for dispatching
- processed: the input dispatched the record to the outlet
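
The conditions above map naturally onto a set of counters. Below is a minimal, hypothetical sketch of that bookkeeping using stdlib atomics; the real input registers equivalent counters with a monitoring registry, and the field names follow the metric names used in this change:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// inputMetricsSketch is an illustrative stand-in for the input's metrics.
// Note the naming convention: "messages" are event-hub events, while
// "events" here are the individual records unpacked from them.
type inputMetricsSketch struct {
	receivedMessages  atomic.Uint64 // events received from the event hub
	sanitizedMessages atomic.Uint64 // events with invalid JSON the input tried to fix
	decodeErrors      atomic.Uint64 // events whose JSON could not be decoded
	processedMessages atomic.Uint64 // events whose records were all processed
	receivedEvents    atomic.Uint64 // records unpacked from events
	sentEvents        atomic.Uint64 // records dispatched to the outlet
}

func main() {
	var m inputMetricsSketch
	m.receivedMessages.Add(1) // one event arrives...
	m.receivedEvents.Add(3)   // ...carrying three records
	m.sentEvents.Add(3)       // all three dispatched downstream
	m.processedMessages.Add(1)
	fmt.Println(m.receivedEvents.Load(), m.sentEvents.Load())
}
```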

* Add test cases for input metrics

* Fix linter objections

* Fix typos and stale comments

* Set input ID with the config hash

For v1 inputs, using a hash of the config as the input ID is common
practice.
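
The idea can be sketched as follows. This is an illustrative analogue only: the actual change uses `github.com/mitchellh/hashstructure` on the unpacked config map, whereas this stdlib-only sketch emulates it with canonical JSON plus FNV-1a:

```go
package main

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
)

// configID returns a stable ID for an input configuration.
// An explicit "id" setting wins; otherwise the ID is derived from a
// hash of the whole config (sketch; the real code uses hashstructure).
func configID(cfg map[string]interface{}) (string, error) {
	if id, ok := cfg["id"].(string); ok && id != "" {
		return id, nil
	}
	b, err := json.Marshal(cfg) // encoding/json sorts map keys, so this is canonical
	if err != nil {
		return "", fmt.Errorf("can not compute ID from configuration: %w", err)
	}
	h := fnv.New64a()
	_, _ = h.Write(b)
	return fmt.Sprintf("%16X", h.Sum64()), nil
}

func main() {
	id1, _ := configID(map[string]interface{}{"eventhub": "logs"})
	id2, _ := configID(map[string]interface{}{"eventhub": "logs"})
	fmt.Println(id1 == id2) // same config, same ID
}
```

The key property is stability: the same configuration always hashes to the same ID, so the metrics registry entry survives input restarts.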

* Simplify and adopt conventions from other inputs

* Add input metrics docs

* Remove redundant log messages and leftovers

* Move input ID and metrics closer

This way it's more evident that the two go together. I am also adding
comments to underline that the ID will go away as we migrate the input
to the input V2 API.

* Add a guard to unregister()

* Log major lifecycle events

Adds a few more log messages during major lifecycle events, like
failures in event publishing and input stopping.

* Update CHANGELOG

---------

Co-authored-by: Davide Girardi <1390902+girodav@users.noreply.github.com>
2 people authored and Scholar-Li committed Feb 5, 2024
1 parent feca7a0 commit 214fae2
Showing 8 changed files with 417 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -238,6 +238,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add device handling to Okta input package for entity analytics. {pull}36049[36049]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}99999[99999]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}36286[36286]
- [Azure] Add input metrics to the azure-eventhub input. {pull}35739[35739]

*Auditbeat*

19 changes: 19 additions & 0 deletions x-pack/filebeat/docs/inputs/input-azure-eventhub.asciidoc
@@ -72,3 +72,22 @@ https://management.azure.com/ for azure PublicCloud
https://management.usgovcloudapi.net/ for azure USGovernmentCloud
Users can also use this in case of a Hybrid Cloud model, where one may define their own endpoints.

[float]
=== Metrics

This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>,
at the `/inputs` path. They can be used to observe the activity of the input.

[options="header"]
|=======
| Metric | Description
| `received_messages_total` | Number of messages received from the event hub.
| `received_bytes_total` | Number of bytes received from the event hub.
| `sanitized_messages_total` | Number of messages that were sanitized successfully.
| `processed_messages_total` | Number of messages that were processed successfully.
| `received_events_total` | Number of events decoded from received messages.
| `sent_events_total` | Number of events that were sent successfully.
| `processing_time` | Histogram of the elapsed processing times in nanoseconds.
| `decode_errors_total` | Number of errors that occurred while decoding a message.
|=======
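
These metrics are only served when the Beat's HTTP monitoring endpoint is enabled. A minimal configuration sketch (default host and port shown; adjust to your deployment):

```yaml
# filebeat.yml -- enable the local HTTP monitoring endpoint
http.enabled: true
http.host: localhost   # default
http.port: 5066        # default
```

With the endpoint enabled, `curl -s 'http://localhost:5066/inputs/?pretty'` should return a JSON array with one object per running input, including the counters listed above.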
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/azureeventhub/eph.go
@@ -78,7 +78,7 @@ func (a *azureInput) runWithEPH() error {
ok := a.processEvents(e, "")
if !ok {
onEventErr = errors.New("OnEvent function returned false. Stopping input worker")
a.log.Debug(onEventErr.Error())
a.log.Error(onEventErr.Error())
a.Stop()
}
return onEventErr
124 changes: 101 additions & 23 deletions x-pack/filebeat/input/azureeventhub/input.go
@@ -14,22 +14,59 @@ import (
"sync"
"time"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/eph"
"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/eph"
)

const (
eventHubConnector = ";EntityPath="
expandEventListFromField = "records"
inputName = "azure-eventhub"
)

func init() {
err := input.Register(inputName, NewInput)
if err != nil {
panic(fmt.Errorf("failed to register %v input: %w", inputName, err))
}
}

// configID computes a unique ID for the input configuration.
//
// It is used to identify the input in the registry and to detect
// changes in the configuration.
//
// We will remove this function as we upgrade the input to the
// v2 API (there is an ID in the v2 context).
func configID(config *conf.C) (string, error) {
var tmp struct {
ID string `config:"id"`
}
if err := config.Unpack(&tmp); err != nil {
return "", fmt.Errorf("error extracting ID: %w", err)
}
if tmp.ID != "" {
return tmp.ID, nil
}

var h map[string]interface{}
_ = config.Unpack(&h)
id, err := hashstructure.Hash(h, nil)
if err != nil {
return "", fmt.Errorf("can not compute ID from configuration: %w", err)
}

return fmt.Sprintf("%16X", id), nil
}

// azureInput struct for the azure-eventhub input
type azureInput struct {
config azureInputConfig // azure-eventhub configuration
@@ -40,17 +77,8 @@ type azureInput struct {
workerCancel context.CancelFunc // used to signal that the worker should stop.
workerOnce sync.Once // guarantees that the worker goroutine is only started once.
processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option
}

const (
inputName = "azure-eventhub"
)

func init() {
err := input.Register(inputName, NewInput)
if err != nil {
panic(fmt.Errorf("failed to register %v input: %w", inputName, err))
}
id string // ID of the input; used to identify the input in the input metrics registry only, and will be removed once the input is migrated to v2.
metrics *inputMetrics // Metrics for the input.
}

// NewInput creates a new azure-eventhub input
@@ -64,6 +92,17 @@ func NewInput(
return nil, fmt.Errorf("reading %s input config: %w", inputName, err)
}

// Since this is a v1 input, we need to set the ID manually.
//
// We need an ID to identify the input in the input metrics
// registry.
//
// This is a temporary workaround until we migrate the input to v2.
inputId, err := configID(cfg)
if err != nil {
return nil, err
}

inputCtx, cancelInputCtx := context.WithCancel(context.Background())
go func() {
defer cancelInputCtx()
@@ -77,7 +116,8 @@
// to be recreated with each restart.
workerCtx, workerCancel := context.WithCancel(inputCtx)

in := &azureInput{
in := azureInput{
id: inputId,
config: config,
log: logp.NewLogger(fmt.Sprintf("%s input", inputName)).With("connection string", stripConnectionString(config.ConnectionString)),
context: inputContext,
@@ -90,7 +130,8 @@
}
in.outlet = out
in.log.Infof("Initialized %s input.", inputName)
return in, nil

return &in, nil
}

// Run starts the `azure-eventhub` input and then returns.
@@ -106,6 +147,16 @@ func (a *azureInput) Run() {
// invocation.
a.workerOnce.Do(func() {
a.log.Infof("%s input worker is starting.", inputName)

// We set up the metrics in the `Run()` method and tear them down
// in the `Stop()` method.
//
// The factory method `NewInput` is not a viable solution because
// the Runner invokes it during the configuration check without
// calling the `Stop()` function; this causes panics
// due to multiple metrics registrations.
a.metrics = newInputMetrics(a.id, nil)

err := a.runWithEPH()
if err != nil {
a.log.Errorw("error starting the input worker", "error", err)
@@ -117,6 +168,7 @@

// Stop stops `azure-eventhub` input.
func (a *azureInput) Stop() {
a.log.Infof("%s input worker is stopping.", inputName)
if a.processor != nil {
// Tells the processor to stop processing events and release all
// resources (like scheduler, leaser, checkpointer, and client).
@@ -126,7 +178,12 @@
}
}

if a.metrics != nil {
a.metrics.Close()
}

a.workerCancel()
a.log.Infof("%s input worker has stopped.", inputName)
}

// Wait stop the current server
@@ -135,35 +192,50 @@ func (a *azureInput) Wait() {
}

func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bool {
timestamp := time.Now()
processingStartTime := time.Now()
azure := mapstr.M{
// partitionID is only mapped in the non-eph option which is not available yet, this field will be temporary unavailable
//"partition_id": partitionID,
"eventhub": a.config.EventHubName,
"consumer_group": a.config.ConsumerGroup,
}
messages := a.parseMultipleMessages(event.Data)
for _, msg := range messages {

// update the input metrics
a.metrics.receivedMessages.Inc()
a.metrics.receivedBytes.Add(uint64(len(event.Data)))

records := a.parseMultipleRecords(event.Data)

for _, record := range records {
_, _ = azure.Put("offset", event.SystemProperties.Offset)
_, _ = azure.Put("sequence_number", event.SystemProperties.SequenceNumber)
_, _ = azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime)
ok := a.outlet.OnEvent(beat.Event{
Timestamp: timestamp,
// this is the default value for the @timestamp field; usually the ingest
// pipeline replaces it with a value in the payload.
Timestamp: processingStartTime,
Fields: mapstr.M{
"message": msg,
"message": record,
"azure": azure,
},
Private: event.Data,
})
if !ok {
a.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds())
return ok
}

a.metrics.sentEvents.Inc()
}

a.metrics.processedMessages.Inc()
a.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds())

return true
}

// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
// parseMultipleRecords will try to split the message into multiple ones based on the group field provided by the configuration
func (a *azureInput) parseMultipleRecords(bMessage []byte) []string {
var mapObject map[string][]interface{}
var messages []string

@@ -173,6 +245,7 @@ func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
// [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps
if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) {
bMessage = sanitize(bMessage, a.config.SanitizeOptions...)
a.metrics.sanitizedMessages.Inc()
}

// check if the message is a "records" object containing a list of events
@@ -183,6 +256,7 @@
js, err := json.Marshal(ms)
if err == nil {
messages = append(messages, string(js))
a.metrics.receivedEvents.Inc()
} else {
a.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err)
}
@@ -196,17 +270,21 @@
if err != nil {
// return entire message
a.log.Debugf("deserializing multiple messages to an array returning error: %s", err)
a.metrics.decodeErrors.Inc()
return []string{string(bMessage)}
}

for _, ms := range arrayObject {
js, err := json.Marshal(ms)
if err == nil {
messages = append(messages, string(js))
a.metrics.receivedEvents.Inc()
} else {
a.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err)
}
}
}

return messages
}

29 changes: 22 additions & 7 deletions x-pack/filebeat/input/azureeventhub/input_test.go
@@ -14,6 +14,7 @@ import (

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/stretchr/testify/assert"
@@ -39,9 +40,14 @@ func TestProcessEvents(t *testing.T) {
if err != nil {
t.Fatal(err)
}
reg := monitoring.NewRegistry()
metrics := newInputMetrics("test", reg)
defer metrics.Close()

input := azureInput{
config: config,
outlet: out,
config: config,
metrics: metrics,
outlet: out,
}
var sn int64 = 12
now := time.Now()
@@ -73,7 +79,7 @@
assert.Equal(t, message, single)
}

func TestParseMultipleMessages(t *testing.T) {
func TestParseMultipleRecords(t *testing.T) {
// records object
msg := "{\"records\":[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
@@ -83,8 +89,17 @@
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
}
input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))}
messages := input.parseMultipleMessages([]byte(msg))

reg := monitoring.NewRegistry()
metrics := newInputMetrics("test", reg)
defer metrics.Close()

input := azureInput{
metrics: metrics,
log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)),
}

messages := input.parseMultipleRecords([]byte(msg))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
@@ -95,7 +110,7 @@
msg1 := "[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]"
messages = input.parseMultipleMessages([]byte(msg1))
messages = input.parseMultipleRecords([]byte(msg1))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
@@ -104,7 +119,7 @@

// one event only
msg2 := "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"
messages = input.parseMultipleMessages([]byte(msg2))
messages = input.parseMultipleRecords([]byte(msg2))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 1)
for _, ms := range messages {
