Skip to content

Commit

Permalink
[Filebeat] ETW input - use errgroup (#38009)
Browse files Browse the repository at this point in the history
Use errgroup to wait on the ETW consumer routine.

Use sync.OnceFunc to wrap the Close() func for the ETW session.

Clarify a few log messages (follow-up to #36915)
- #36915 (comment)
- #36915 (comment)
  • Loading branch information
andrewkroh committed Feb 21, 2024
1 parent 8214f9f commit 85e4e46
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 65 deletions.
95 changes: 40 additions & 55 deletions x-pack/filebeat/input/etw/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

"golang.org/x/sync/errgroup"
"golang.org/x/sys/windows"
)

Expand Down Expand Up @@ -66,6 +67,7 @@ type etwInput struct {
log *logp.Logger
config config
etwSession *etw.Session
publisher stateless.Publisher
operator sessionOperator
}

Expand Down Expand Up @@ -105,10 +107,13 @@ func (e *etwInput) Run(ctx input.Context, publisher stateless.Publisher) error {
if err != nil {
return fmt.Errorf("error initializing ETW session: %w", err)
}
e.etwSession.Callback = e.consumeEvent
e.publisher = publisher

// Set up logger with session information
e.log = ctx.Logger.With("session", e.etwSession.Name)
e.log.Info("Starting " + inputName + " input")
defer e.log.Info(inputName + " input stopped")

// Handle realtime session creation or attachment
if e.etwSession.Realtime {
Expand All @@ -125,71 +130,31 @@ func (e *etwInput) Run(ctx input.Context, publisher stateless.Publisher) error {
if err != nil {
return fmt.Errorf("realtime session could not be created: %w", err)
}
e.log.Debug("created session")
e.log.Debug("created new session")
}
}
// Defer the cleanup and closing of resources
var wg sync.WaitGroup
var once sync.Once

// Create an error channel to communicate errors from the goroutine
errChan := make(chan error, 1)
stopConsumer := sync.OnceFunc(e.Close)
defer stopConsumer()

defer func() {
once.Do(e.Close)
e.log.Info(inputName + " input stopped")
// Stop the consumer upon input cancellation (shutdown).
go func() {
<-ctx.Cancelation.Done()
stopConsumer()
}()

// eventReceivedCallback processes each ETW event
eventReceivedCallback := func(record *etw.EventRecord) uintptr {
if record == nil {
e.log.Error("received null event record")
return 1
}

e.log.Debugf("received event %d with length %d", record.EventHeader.EventDescriptor.Id, record.UserDataLength)

data, err := etw.GetEventProperties(record)
if err != nil {
e.log.Errorw("failed to read event properties", "error", err)
return 1
}

evt := buildEvent(data, record.EventHeader, e.etwSession, e.config)
publisher.Publish(evt)

return 0
}

// Set the callback function for the ETW session
e.etwSession.Callback = eventReceivedCallback

// Start a goroutine to consume ETW events
wg.Add(1)
go func() {
defer wg.Done()
e.log.Debug("starting to listen ETW events")
g := new(errgroup.Group)
g.Go(func() error {
e.log.Debug("starting ETW consumer")
defer e.log.Debug("stopped ETW consumer")
if err = e.operator.startConsumer(e.etwSession); err != nil {
errChan <- fmt.Errorf("failed to start consumer: %w", err) // Send error to channel
return
return fmt.Errorf("failed running ETW consumer: %w", err)
}
e.log.Debug("stopped to read ETW events from session")
errChan <- nil
}()
return nil
})

// We ensure resources are closed when receiving a cancellation signal
go func() {
<-ctx.Cancelation.Done()
once.Do(e.Close)
}()

wg.Wait() // Ensure all goroutines have finished before closing
close(errChan)
if err, ok := <-errChan; ok && err != nil {
return err
}

return nil
return g.Wait()
}

var (
Expand Down Expand Up @@ -271,6 +236,26 @@ func convertFileTimeToGoTime(fileTime64 uint64) time.Time {
return time.Unix(0, fileTime.Nanoseconds()).UTC()
}

func (e *etwInput) consumeEvent(record *etw.EventRecord) uintptr {
if record == nil {
e.log.Error("received null event record")
return 1
}

e.log.Debugf("received event with ID %d and user-data length %d", record.EventHeader.EventDescriptor.Id, record.UserDataLength)

data, err := etw.GetEventProperties(record)
if err != nil {
e.log.Errorw("failed to read event properties", "error", err)
return 1
}

evt := buildEvent(data, record.EventHeader, e.etwSession, e.config)
e.publisher.Publish(evt)

return 0
}

// Close stops the ETW session and logs the outcome.
func (e *etwInput) Close() {
if err := e.operator.stopSession(e.etwSession); err != nil {
Expand Down
17 changes: 10 additions & 7 deletions x-pack/filebeat/input/etw/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func Test_RunEtwInput_AttachToExistingSessionError(t *testing.T) {
mockSession := &etw.Session{
Name: "MySession",
Realtime: true,
NewSession: false}
NewSession: false,
}
return mockSession, nil
}
// Setup the mock behavior for AttachToExistingSession
Expand Down Expand Up @@ -146,7 +147,8 @@ func Test_RunEtwInput_CreateRealtimeSessionError(t *testing.T) {
mockSession := &etw.Session{
Name: "MySession",
Realtime: true,
NewSession: true}
NewSession: true,
}
return mockSession, nil
}
// Setup the mock behavior for AttachToExistingSession
Expand Down Expand Up @@ -189,7 +191,8 @@ func Test_RunEtwInput_StartConsumerError(t *testing.T) {
mockSession := &etw.Session{
Name: "MySession",
Realtime: true,
NewSession: true}
NewSession: true,
}
return mockSession, nil
}
// Setup the mock behavior for AttachToExistingSession
Expand Down Expand Up @@ -232,7 +235,7 @@ func Test_RunEtwInput_StartConsumerError(t *testing.T) {

// Run test
err := etwInput.Run(inputCtx, nil)
assert.EqualError(t, err, "failed to start consumer: mock error")
assert.EqualError(t, err, "failed running ETW consumer: mock error")
}

func Test_RunEtwInput_Success(t *testing.T) {
Expand All @@ -244,7 +247,8 @@ func Test_RunEtwInput_Success(t *testing.T) {
mockSession := &etw.Session{
Name: "MySession",
Realtime: true,
NewSession: true}
NewSession: true,
}
return mockSession, nil
}
// Setup the mock behavior for AttachToExistingSession
Expand Down Expand Up @@ -471,7 +475,6 @@ func Test_buildEvent(t *testing.T) {
assert.Equal(t, tt.expected["event.severity"], mapEv["event.severity"])
assert.Equal(t, tt.expected["log.file.path"], mapEv["log.file.path"])
assert.Equal(t, tt.expected["log.level"], mapEv["log.level"])

})
}
}
Expand All @@ -495,7 +498,7 @@ func Test_convertFileTimeToGoTime(t *testing.T) {
{
name: "TestActualDate",
fileTime: 133515900000000000, // February 05, 2024, 7:00:00 AM
want: time.Date(2024, 02, 05, 7, 0, 0, 0, time.UTC),
want: time.Date(2024, 0o2, 0o5, 7, 0, 0, 0, time.UTC),
},
}

Expand Down
4 changes: 1 addition & 3 deletions x-pack/libbeat/reader/etw/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,8 @@ func (s *Session) StartConsumer() error {
// Open an ETW trace processing handle for consuming events
// from an ETW real-time trace session or an ETW log file.
s.traceHandler, err = s.openTrace(&elf)

switch {
case err == nil:

// Handle specific errors for trace opening.
case errors.Is(err, ERROR_BAD_PATHNAME):
return fmt.Errorf("invalid log source when opening trace: %w", err)
Expand All @@ -241,10 +239,10 @@ func (s *Session) StartConsumer() error {
default:
return fmt.Errorf("failed to open trace: %w", err)
}

// Process the trace. This function blocks until processing ends.
if err := s.processTrace(&s.traceHandler, 1, nil, nil); err != nil {
return fmt.Errorf("failed to process trace: %w", err)
}

return nil
}

0 comments on commit 85e4e46

Please sign in to comment.