This repository has been archived by the owner on Sep 21, 2023. It is now read-only.

Beats shipper output shouldn't keep pending events in the queue #97

Closed
Tracked by #9
faec opened this issue Aug 17, 2022 · 2 comments · Fixed by elastic/beats#34377
Labels
Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team

Comments

@faec
Contributor

faec commented Aug 17, 2022

ACK handling is being implemented in the shipper and its clients. This means Beats inputs that feed into the shipper output don't receive their acknowledgment callbacks until the shipper has sent the events upstream.

This means that currently, pending events are enqueued both in Beats and in the shipper. Neither one can safely remove them, because neither knows, until the upstream response comes back, whether the events will need to be retried.

There isn't a real need for Beats to do that in this case, though, because as soon as the event has been handed off to the shipper it's already in a local queue with proper retry behavior. The Beats pipeline should accommodate this case by separating producer acknowledgments (which are how inputs track progress) from queue allocations.

One way to do this that would probably require minimal modification of existing APIs is to create a proxy object that implements queue.Queue and is used only for the shipper output. Instead of genuinely queueing, it would synchronously Publish as batches are assembled, discarding the event data after a successful handoff but still propagating event acknowledgments back to its producers afterwards. (This is essentially the same pattern we expect standalone shipper clients to follow.)
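A minimal sketch of that proxy idea (all type and method names here are hypothetical stand-ins, not the actual Beats pipeline API): Publish hands the batch synchronously to the shipper, drops its own reference to the event data on success, and still fires the producer acknowledgment.

```go
package main

import "fmt"

// event and ackFn are stand-ins for the real Beats pipeline types.
type event struct{ data string }
type ackFn func(count int)

// proxyQueue exposes a queue-like Publish, but instead of buffering events
// it hands each batch synchronously to the shipper (send), discards its
// reference to the event data on success, and still propagates the
// acknowledgment back to the producer so inputs can track their progress.
type proxyQueue struct {
	send func(batch []event) error // synchronous handoff to the shipper
}

func (q *proxyQueue) Publish(batch []event, ack ackFn) error {
	if err := q.send(batch); err != nil {
		return err // handoff failed: the caller keeps the events and can retry
	}
	// The shipper's queue now owns the events; discard our reference and
	// acknowledge the producer.
	n := len(batch)
	batch = nil
	ack(n)
	return nil
}

func main() {
	acked := 0
	q := &proxyQueue{send: func([]event) error { return nil }}
	q.Publish([]event{{"a"}, {"b"}}, func(n int) { acked += n })
	fmt.Println(acked) // prints 2
}
```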

One complication is that batch assembly typically preserves pointers to the events inside the batch, so even if an event isn't explicitly enqueued, its memory stays referenced. That will probably require some extra handling, or a special call in the shipper output, to separate freeing the memory from acknowledgment.
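That separation could look something like the following sketch (names are hypothetical, not the actual Beats batch API): the output frees the event references as soon as the handoff succeeds, while the acknowledgment is deferred until delivery is confirmed.

```go
package main

import "fmt"

type event struct{ data string }

// batch separates freeing event memory from acknowledgment: the shipper
// output can call FreeEntries as soon as the events are handed off, and
// ACK only once the shipper confirms delivery upstream.
type batch struct {
	events []event
	ackFn  func(n int)
	count  int
}

// FreeEntries drops the batch's references to the event data so it can be
// garbage-collected, remembering only how many events to acknowledge.
func (b *batch) FreeEntries() {
	b.count = len(b.events)
	b.events = nil
}

// ACK propagates the acknowledgment to the producer; it is safe to call
// after FreeEntries because the count was captured before freeing.
func (b *batch) ACK() { b.ackFn(b.count) }

func main() {
	acked := 0
	b := &batch{events: []event{{"a"}, {"b"}}, ackFn: func(n int) { acked += n }}
	b.FreeEntries() // handoff succeeded: free memory now
	b.ACK()         // acknowledgment arrives later
	fmt.Println(acked) // prints 2
}
```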

@cmacknz
Member

cmacknz commented Aug 17, 2022

This issue originated in this comment chain: elastic/beats#32708 (comment)

I think within the Beats shipper client we will additionally make listening to persistent index changes asynchronous with respect to publishing events to the shipper. The current implementation waits for an acknowledgement before returning from the Publish call. See the comment at elastic/beats#32708 (comment) for one possible way to achieve this:

I think it's a matter of returning immediately from the publish call (without updating the persisted index or acking / cancelling the batch) -- then save the outstanding batches in shipper and run a PersistedIndex listener in parallel, and when an update comes in then call batch.ACK() on the batches that are covered. That way the beats pipeline will keep sending us batches after we finish the publish RPC.
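A sketch of that pattern (hypothetical names; the real shipper client speaks gRPC and uses different types): Publish returns as soon as the batch is handed off and recorded, and a separate listener goroutine would feed each PersistedIndex update into something like OnPersistedIndex, which ACKs every batch the new index covers.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingBatch records a batch whose Publish call has already returned.
type pendingBatch struct {
	lastIndex uint64 // index of the batch's last event in the shipper queue
	ack       func() // acknowledgment callback into the Beats pipeline
}

// ackTracker holds outstanding batches and acknowledges them as
// PersistedIndex updates arrive from the shipper.
type ackTracker struct {
	mu      sync.Mutex
	pending []pendingBatch // ordered by lastIndex
}

func (t *ackTracker) Add(b pendingBatch) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.pending = append(t.pending, b)
}

// OnPersistedIndex would be called by the listener goroutine whenever the
// shipper reports a new persisted index; it ACKs every fully covered batch.
func (t *ackTracker) OnPersistedIndex(idx uint64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for len(t.pending) > 0 && t.pending[0].lastIndex <= idx {
		t.pending[0].ack()
		t.pending = t.pending[1:]
	}
}

func main() {
	var acked int
	t := &ackTracker{}
	t.Add(pendingBatch{lastIndex: 5, ack: func() { acked++ }})
	t.Add(pendingBatch{lastIndex: 9, ack: func() { acked++ }})
	t.OnPersistedIndex(7) // covers the first batch only
	fmt.Println(acked)    // prints 1
}
```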

Edit: This work is not captured in #98

@cmacknz
Member

cmacknz commented Jan 11, 2023

@faec and I spoke today and the current plan is to implement a "proxy queue" that implements the queue interface but doesn't actually enqueue anything, delegating the queueing to the shipper.

The tricky part will be knowing when to configure this. I think we can trigger configuration of this whenever the Beat is configured to use the shipper output type. We should be able to detect this happening by hooking into the output reload logic that is invoked whenever the agent adds/modifies/removes the configured output type: https://github.com/elastic/beats/blob/3cbbb8a203e238f6b9b73d73cd7a0901d36527d8/x-pack/libbeat/management/managerV2.go#L485-L515

func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error {
	// Assuming that the output reloadable isn't a list, see createBeater() in cmd/instance/beat.go
	output := cm.registry.GetReloadableOutput()
	if output == nil {
		return fmt.Errorf("failed to find beat reloadable type 'output'")
	}

	if unit == nil {
		// output is being stopped
		err := output.Reload(nil)
		if err != nil {
			return fmt.Errorf("failed to reload output: %w", err)
		}
		cm.lastOutputCfg = nil
		return nil
	}

	if cm.stopOnOutputReload && cm.outputIsConfigured {
		cm.logger.Info("beat is restarting because output changed")
		_ = unit.UpdateState(client.UnitStateStopping, "Restarting", nil)
		cm.Stop()
		return nil
	}

	_, _, rawConfig := unit.Expected()
	if rawConfig == nil {
		return fmt.Errorf("output unit has no config")
	}
	cm.logger.Debugf("Got output unit config '%s'", rawConfig.GetId())

	reloadConfig, err := groupByOutputs(rawConfig)
	if err != nil {
		return fmt.Errorf("failed to generate config for output: %w", err)
	}
	// ... (remainder of the function elided; see the permalink above)

Note that right now the Beat restarts itself whenever the output needs to change, so the output it runs with is always the first output configured after the Beat starts. This is a workaround for a collection of old bugs in the output reload logic: elastic/elastic-agent#1913
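One way the reload hook could make this decision (illustrative only; the real code would inspect the unit's parsed config rather than a plain map, and the "type" key is an assumption about the config shape):

```go
package main

import "fmt"

// outputIsShipper is an illustrative check, assuming (hypothetically) that
// the raw output config carries its type under a "type" key; the real
// reload path would inspect the unit's config.C instead of a plain map.
func outputIsShipper(raw map[string]any) bool {
	t, ok := raw["type"].(string)
	return ok && t == "shipper"
}

func main() {
	cfg := map[string]any{"type": "shipper", "server": "unix:///tmp/shipper.sock"}
	if outputIsShipper(cfg) {
		fmt.Println("swap in the proxy queue")
	}
}
```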

An additional complication here is that the queue type is configured at init time when the pipeline is loaded, but the code path above is only hit after the Beat has been initialized and has connected to the Elastic Agent, some arbitrary time after initialization completes.

https://github.com/elastic/beats/blob/ce10bc4c84a497b197d67c7b219d358c0392c4ad/libbeat/cmd/instance/beat.go#L347-L354

	if settings.InputQueueSize > 0 {
		publisher, err = pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings)
	} else {
		publisher, err = pipeline.Load(b.Info, monitors, b.Config.Pipeline, b.processing, outputFactory)
	}
	if err != nil {
		return nil, fmt.Errorf("error initializing publisher: %w", err)
	}
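One way around that timing gap, sketched with hypothetical types: load the pipeline with a queue factory whose choice of implementation is deferred until the first output configuration arrives, instead of committing to a queue at init time.

```go
package main

import "fmt"

// queue, memQueue, and shipperProxyQueue are hypothetical stand-ins for
// the real pipeline queue implementations.
type queue interface{ Name() string }

type memQueue struct{}

func (memQueue) Name() string { return "mem" }

type shipperProxyQueue struct{}

func (shipperProxyQueue) Name() string { return "proxy" }

// lazyQueueFactory lets the pipeline be loaded at init time while deferring
// the choice of queue implementation until the output type is known.
type lazyQueueFactory struct {
	useProxy bool
}

// SetOutputType would be called from the output reload path once the agent
// sends the first output configuration.
func (f *lazyQueueFactory) SetOutputType(t string) { f.useProxy = (t == "shipper") }

// Make returns the queue implementation matching the configured output.
func (f *lazyQueueFactory) Make() queue {
	if f.useProxy {
		return shipperProxyQueue{}
	}
	return memQueue{}
}

func main() {
	f := &lazyQueueFactory{}
	f.SetOutputType("shipper") // arrives some time after init
	fmt.Println(f.Make().Name()) // prints proxy
}
```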

When the Beat is started with management enabled because it was started by the agent, it starts from an empty configuration and waits for the initial configuration to be sent by the Elastic Agent: https://github.com/elastic/beats/blob/ce10bc4c84a497b197d67c7b219d358c0392c4ad/libbeat/cfgfile/cfgfile.go#L145-L167

	if !fleetmode.Enabled() {
		if path == "" {
			list := []string{}
			for _, cfg := range configfiles.List() {
				if !filepath.IsAbs(cfg) {
					list = append(list, filepath.Join(cfgpath, cfg))
				} else {
					list = append(list, cfg)
				}
			}
			c, err = common.LoadFiles(list...)
		} else {
			if !filepath.IsAbs(path) {
				path = filepath.Join(cfgpath, path)
			}
			c, err = common.LoadFile(path)
		}
		if err != nil {
			return nil, err
		}
	} else {
		c = config.NewConfig()
	}
