Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add asynchronous flush support #82

Merged
merged 1 commit into from
Aug 6, 2021
Merged

Conversation

jaredcnance
Copy link
Member

@jaredcnance jaredcnance commented Jul 27, 2021

Adds asynchronous flushing, automatic retries for transient socket errors, and graceful shutdown.

Design Notes / Other Considerations:

  • I considered abstracting the asynchronous bits out into a separate type or into an abstract base class. However, the flush operation and error handling is currently tightly coupled to the TCP / UDP clients and there didn't seem to be much value in creating an abstraction just yet which is why I built it directly into the AgentSink.
  • The shutdown interface returns a CompletableFuture. An alternative to this is providing an (long timeout, TimeUnit unit) signature similar to awaitTermination which would allow us to use that interface directly when shutting down the executor. However, this creates an awkward API that doesn't easily support composition with other tasks to run in parallel.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

build.gradle Outdated Show resolved Hide resolved
@jaredcnance jaredcnance changed the title First pass at adding asynchronous flush support Add asynchronous flush support Jul 27, 2021
@jaredcnance jaredcnance force-pushed the jared/add-async-flushing branch 5 times, most recently from 9a934b4 to f7533b8 Compare July 30, 2021 02:13
SocketClientFactory clientFactory) {
SocketClientFactory clientFactory,
int asyncQueueDepth,
Supplier<RetryStrategy> retryStrategy) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👆Breaking change.

* @return a future that completes when the shutdown has completed successfully and all pending
* messages have been sent to the destination.
*/
CompletableFuture<Void> shutdown();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👆Breaking change


/**
* The max number of messages to hold in memory in case of transient socket errors. The maximum
* message size is 256 KB meaning the maximum size of this buffer would be 25,600 MB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

25,600 KB?

@jaredcnance jaredcnance merged commit aaa10c6 into master Aug 6, 2021
@jaredcnance jaredcnance deleted the jared/add-async-flushing branch August 6, 2021 01:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants