diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c8d3140dbfc..e5b7e706db8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -100,6 +100,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix for httpjson first_response object throwing false positive errors by making it a flag based object {issue}34747[34747] {pull}34748[34748] - Fix errors and panics due to re-used processors {pull}34761[34761] - Add missing Basic Authentication support to CEL input {issue}34609[34609] {pull}34689[34689] +- [Gcs Input] - Added missing locks for safe concurrency {pull}34914[34914] - Fix the ignore_inactive option being ignored in Filebeat's filestream input {pull}34770[34770] *Heartbeat* diff --git a/x-pack/filebeat/input/gcs/job.go b/x-pack/filebeat/input/gcs/job.go index 3ca5b48a2d7..118e89287ac 100644 --- a/x-pack/filebeat/input/gcs/job.go +++ b/x-pack/filebeat/input/gcs/job.go @@ -15,6 +15,7 @@ import ( "errors" "fmt" "io" + "sync" "time" "unicode" @@ -27,6 +28,8 @@ import ( ) type job struct { + // Mutex lock for concurrent publishes + mu sync.Mutex // gcs bucket handle bucket *storage.BucketHandle // gcs object attribute struct @@ -107,9 +110,12 @@ func (j *job) do(ctx context.Context, id string) { } event.SetID(objectID(j.hash, 0)) j.state.save(j.object.Name, j.object.Updated) + // locks while data is being published to avoid concurrent map read/writes + j.mu.Lock() if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil { j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) } + j.mu.Unlock() } } @@ -217,9 +223,12 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er // partially saves read state using offset j.state.savePartial(j.object.Name, offset+relativeOffset) } + // locks while data is being published to avoid concurrent map read/writes + j.mu.Lock() if err := j.publisher.Publish(evt, j.state.checkpoint()); err != nil { j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) } + j.mu.Unlock() } return nil } diff --git a/x-pack/filebeat/input/gcs/scheduler.go b/x-pack/filebeat/input/gcs/scheduler.go index 880771247ff..7feb57f7c1e 100644 --- a/x-pack/filebeat/input/gcs/scheduler.go +++ b/x-pack/filebeat/input/gcs/scheduler.go @@ -86,7 +86,7 @@ func (l *limiter) wait() { l.wg.Wait() } -// release puts pack a worker thread. +// release puts back a worker thread. func (l *limiter) release() { <-l.limit l.wg.Done() @@ -167,7 +167,7 @@ func (s *scheduler) fetchObjectPager(ctx context.Context, pageSize int) *iterato } // moveToLastSeenJob, moves to the latest job position past the last seen job -// Jobs are stored in lexicographical order always , hence the latest position can be found either on the basis of job name or timestamp +// Jobs are stored in lexicographical order always, hence the latest position can be found either on the basis of job name or timestamp func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { var latestJobs []*job jobsToReturn := make([]*job, 0) diff --git a/x-pack/filebeat/input/gcs/state.go b/x-pack/filebeat/input/gcs/state.go index 6b26dfa2dfa..6b2a269481f 100644 --- a/x-pack/filebeat/input/gcs/state.go +++ b/x-pack/filebeat/input/gcs/state.go @@ -73,12 +73,16 @@ func (s *state) save(name string, lastModifiedOn time.Time) { // setRootArray, sets boolean true for objects that have their roots defined as an array type func (s *state) setRootArray(name string) { + s.mu.Lock() s.cp.IsRootArray[name] = true + s.mu.Unlock() } // savePartial, partially saves/updates the current state for cursor checkpoint func (s *state) savePartial(name string, offset int64) { + s.mu.Lock() s.cp.LastProcessedOffset[name] = offset + s.mu.Unlock() } // updateFailedJobs, adds a job name to a failedJobs map, which helps @@ -87,11 +91,11 @@ func (s *state) savePartial(name string, offset int64) { // A failed job will be re-tried a maximum of 3 times after which the // entry is removed from the map func (s *state) updateFailedJobs(jobName string) { + s.mu.Lock() // we do not store partially processed jobs as failed jobs if _, ok := s.cp.LastProcessedOffset[jobName]; ok { return } - s.mu.Lock() s.cp.FailedJobs[jobName]++ if s.cp.FailedJobs[jobName] > maxFailedJobRetries { delete(s.cp.FailedJobs, jobName) @@ -100,7 +104,18 @@ func (s *state) updateFailedJobs(jobName string) { } // setCheckpoint, sets checkpoint from source to current state instance +// If for some reason the current state is empty, assigns new states as +// a fail safe mechanism func (s *state) setCheckpoint(chkpt *Checkpoint) { + if chkpt.FailedJobs == nil { + chkpt.FailedJobs = make(map[string]int) + } + if chkpt.IsRootArray == nil { + chkpt.IsRootArray = make(map[string]bool) + } + if chkpt.LastProcessedOffset == nil { + chkpt.LastProcessedOffset = make(map[string]int64) + } s.cp = chkpt }