Bulk support for extensions logging integration. #99

Merged
Mpdreamz merged 12 commits into master from feature/ext-logging-bulk
Aug 11, 2020

Mpdreamz
Member

Continuation of #97

Introduces batching of log events before indexing into Elasticsearch, powered by System.Threading.Channels. A batch is sent once it holds N items or once an interval M has elapsed since its first item was added, whichever comes first.

This also introduces the ability to have multiple consumers drain the channel concurrently.

Callbacks are now invoked when items are dropped because the channel is full and when a _bulk request has occurred.
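
To illustrate the batching rule described above, here is a minimal sketch, not the code in this PR. The names `maxBatchSize`, `maxBatchAge` and `ConsumeAsync` are hypothetical, strings stand in for `LogEvent`, and adding to a list stands in for the `_bulk` request. It assumes the interval starts counting when the consumer first sees an item.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical settings, not the option names used by this PR.
const int maxBatchSize = 3;                        // "N items"
var maxBatchAge = TimeSpan.FromMilliseconds(200);  // "M interval"

var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(10)
{
    // With Wait, TryWrite returns false once the channel is full.
    FullMode = BoundedChannelFullMode.Wait
});

var batches = new List<List<string>>();

async Task ConsumeAsync(ChannelReader<string> reader)
{
    // Wait until a first item is available or the channel completes.
    while (await reader.WaitToReadAsync())
    {
        var batch = new List<string>();
        // The interval starts once the first item is available.
        using var age = new CancellationTokenSource(maxBatchAge);
        try
        {
            while (batch.Count < maxBatchSize)
            {
                if (reader.TryRead(out var item)) batch.Add(item);
                else if (!await reader.WaitToReadAsync(age.Token)) break; // channel completed
            }
        }
        catch (OperationCanceledException)
        {
            // Interval elapsed: flush whatever has been collected so far.
        }
        if (batch.Count > 0) batches.Add(batch); // stand-in for a _bulk request
    }
}

var consumer = ConsumeAsync(channel.Reader);
foreach (var e in new[] { "a", "b", "c", "d" })
    if (!channel.Writer.TryWrite(e)) Console.WriteLine($"dropped {e}");
channel.Writer.Complete();
await consumer;

Console.WriteLine($"{batches.Count} batches");
```

Several such consumers can read from the same `ChannelReader`, which is why the sketch guards against an empty batch before flushing.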

We still need to add

  • _bulk failure handling
  • _bulk retry handling (exponential backoff)
  • _bulk rejection callback for LogEvent

Since this PR and #97 are big enough already, we will tackle the remainder when we pull in the in-flight PRs.

This also includes a modified version of the example project from @sgryphon's PR: #69

It also includes a high volume worker that can be started with `dotnet run -- high`.

@apmmachine
Contributor

apmmachine commented Jul 20, 2020

💚 Build Succeeded

Build stats

  • Build Cause: [Pull request #99 updated]

  • Start Time: 2020-08-05T08:57:27.801+0000

  • Duration: 10 min 45 sec

Test stats 🧪

Test Results
Failed 0
Passed 62
Skipped 4
Total 66

Contributor

@russcam russcam left a comment

I've gone through and left some comments

if (_options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(_options.IndexOffset.Value);

var index = string.Format(_options.Index, indexTime);
var indexHeader = new { index = new { _index = index } };
Contributor

would there be some benefit to introducing a type for this?

Member Author

There would! I was holding off on dedicated types until I follow this up with retries, backoff and response failure callbacks.

I'm in two minds about spinning off ElasticsearchDataShipper as its own package, which would benefit from some more typing and would give the end user more control as well.

Contributor

I think as these integrations continue to develop, a separate shipper package would probably be a good idea, to share across integrations, and allow users to use the shipper for their own integrations.
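
For readers following this thread, the snippet under review derives the index name from the event timestamp and wraps it in a `_bulk` action header. A minimal sketch of that step follows. The pattern `logs-{0:yyyy.MM.dd}`, the two hour offset and the use of `System.Text.Json` are assumptions for illustration, and passing `CultureInfo.InvariantCulture` is my addition, not something the PR does.

```csharp
using System;
using System.Globalization;
using System.Text.Json;

// Hypothetical index pattern and offset; the PR reads both from its options.
var indexFormat = "logs-{0:yyyy.MM.dd}";
TimeSpan? indexOffset = TimeSpan.FromHours(2);

// An event logged late in the evening, UTC.
var indexTime = new DateTimeOffset(2020, 8, 5, 23, 30, 0, TimeSpan.Zero);
if (indexOffset.HasValue) indexTime = indexTime.ToOffset(indexOffset.Value);

// InvariantCulture keeps the index name stable across machine locales.
var index = string.Format(CultureInfo.InvariantCulture, indexFormat, indexTime);
var indexHeader = new { index = new { _index = index } };

Console.WriteLine(JsonSerializer.Serialize(indexHeader));
// {"index":{"_index":"logs-2020.08.06"}}
```

The offset moves the event into the next day's index, which is the kind of behaviour a dedicated header type could make easier to test.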

src/Elasticsearch.Extensions.Logging/ConsumerBuffer.cs (outdated, resolved)
@Mpdreamz Mpdreamz requested a review from russcam August 3, 2020 14:45
Contributor

@russcam russcam left a comment

I left a few more comments

/// If <see cref="MaxInFlightMessages"/> is reached, <see cref="LogEvent"/>'s will fail to be published to the channel. You can be notified of dropped
/// events with this callback
/// </summary>
public Action<LogEvent> PublishRejectionCallback { get; set; } = e => { };
Contributor

Suggested change
- public Action<LogEvent> PublishRejectionCallback { get; set; } = e => { };
+ public Action<LogEvent> WriteFailedCallback { get; set; } = e => { };

Maybe could use an event for this?
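
As a rough illustration of when such a callback can fire, the sketch below writes three items into a bounded channel of capacity 2 and records every item that `TryWrite` refuses. The capacity, the helper `WriteThree` and the use of strings in place of `LogEvent` are hypothetical. It compares `BoundedChannelFullMode.Wait` with `DropWrite`, where `TryWrite` reports success even though the item is discarded.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Returns the items that TryWrite refused for the given full mode.
List<string> WriteThree(BoundedChannelFullMode mode)
{
    var rejected = new List<string>();
    // Stand-in for the rejection callback; the real one takes a LogEvent.
    Action<string> onRejected = rejected.Add;

    // Capacity 2 is a hypothetical stand-in for MaxInFlightMessages.
    var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(2) { FullMode = mode });
    foreach (var e in new[] { "a", "b", "c" })
        if (!channel.Writer.TryWrite(e)) onRejected(e);
    return rejected;
}

var withWait = WriteThree(BoundedChannelFullMode.Wait);       // "c" is refused, callback fires
var withDrop = WriteThree(BoundedChannelFullMode.DropWrite);  // "c" is dropped, callback never fires
Console.WriteLine($"Wait: {withWait.Count} rejected, DropWrite: {withDrop.Count} rejected");
// Wait: 1 rejected, DropWrite: 0 rejected
```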

// DropWrite will make `TryWrite` always return true, which is not what we want.
FullMode = BoundedChannelFullMode.Wait
});
async Task ConsumeMessages() =>
Contributor

I was thinking along the lines of

private readonly List<Task> _backgroundTasks = new List<Task>();
for (var i = 0; i < maxConsumers; i++)
    _backgroundTasks.Add(Task.Factory.StartNew(() => Consume(options), TaskCreationOptions.LongRunning));

with Consume taking ElasticsearchLoggerOptions so that changes to options will affect consumers (might be unexpected if changes to options.Throttles don't affect existing consumers).
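
A self-contained sketch of this idea, under the assumption that `Consume` is an async method: in that case `Task.Factory.StartNew` returns a `Task<Task>`, so the sketch calls `Unwrap()` to track the consumer itself and not just its start-up. The `Consume` body, the `int` payload and the counter are hypothetical stand-ins, and options handling is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

const int maxConsumers = 3; // hypothetical value
var channel = Channel.CreateUnbounded<int>();
var consumed = 0;

// Hypothetical consumer; the real Consume would batch and ship LogEvents.
async Task Consume(ChannelReader<int> reader)
{
    while (await reader.WaitToReadAsync())
        while (reader.TryRead(out _))
            Interlocked.Increment(ref consumed);
}

var backgroundTasks = new List<Task>();
for (var i = 0; i < maxConsumers; i++)
    // StartNew with an async delegate yields Task<Task>; Unwrap tracks the inner task.
    backgroundTasks.Add(Task.Factory
        .StartNew(() => Consume(channel.Reader), TaskCreationOptions.LongRunning)
        .Unwrap());

for (var i = 0; i < 100; i++) channel.Writer.TryWrite(i);
channel.Writer.Complete();

// Completes only after every consumer has drained the channel.
await Task.WhenAll(backgroundTasks);
Console.WriteLine(consumed); // 100
```

Note that `LongRunning` only applies until the first `await` inside `Consume`; after that the continuation runs on the thread pool.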

Base automatically changed from feature/ext-logging-improvements to master August 5, 2020 08:41
Mpdreamz and others added 10 commits August 5, 2020 10:46
- Add some basic integration tests to assert the logs make it in the right index
- Rename `ElasticsearchDataProcessor` to `ElasticsearchDataShipper`
- Move enrichment of `LogEvent` out of the shipper and into static helper class.
- Rely on Elasticsearch to generate ids
- Move to PostData.StreamHandler to serialize directly to the IO stream.
Co-authored-by: Russ Cam <russ.cam@elastic.co>
@Mpdreamz
Member Author

Mpdreamz commented Aug 5, 2020

@russcam I think this is ready for another round. Your comment about throttles and consume task options is a good one, but the throttle options are documented as not picking up changes. In general I would like to limit support for live config changes to what we actually need.

I don't want to support scaling consumers and buffer sizes just yet, most likely worth their own PR 😄

@Mpdreamz Mpdreamz merged commit b4b207b into master Aug 11, 2020
@Mpdreamz Mpdreamz deleted the feature/ext-logging-bulk branch August 11, 2020 07:50
@russcam russcam removed the v1.5.3 label Jun 1, 2021
Labels
enhancement New feature or request v1.6.0

3 participants