Skip to content

ReindexAll() improvements #2462

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 7, 2016
Merged

ReindexAll() improvements #2462

merged 2 commits into from
Dec 7, 2016

Conversation

Mpdreamz
Copy link
Member

@Mpdreamz Mpdreamz commented Dec 6, 2016

ReindexAll() is now composed over the IObservables from ScrollAll()
and BulkAll() taking advantage of both's build in concurrency.

Since the rate at which the scrolls produces far exceeds the rate at which we can consume them through bulks ReindexAll() also implements a producerconsumer rate limiter.
This is controlled by a backPressureFactor which controls the max amplification factor of running scrolls with the safe guard of:

searchSize * maxConcurrency * backPressureFactor >= bulkSize

Otherwise not enough scrolls would be spawned to feed a single bulk
request. Obviously if the scroll observable calls on complete it will
flush out any pending scrolls and feed it to BulkAll().

@Mpdreamz
Copy link
Member Author

Mpdreamz commented Dec 6, 2016

remote: Resolving deltas: 100% (10/10), completed with 10 local objects.
remote: Unexpected system error after push was received.
remote: These changes may not be reflected on github.com!
remote: Your unique error code: df69348cdee013b3cc32ec98aa672431

This PR seems to be missing my force push :( never seen this before

@Mpdreamz
Copy link
Member Author

Mpdreamz commented Dec 6, 2016

OK that message can happen when github fails to update its caches on serverside commit hook. If you are looking at a diff on github.com that has limiter misspelled you are not reviewing the right commit. Please review locally. (commit list lists the correct sha numbers on here though).

Copy link
Contributor

@russcam russcam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, just a few minor nitpicks 😃

  • Some of the types introduced are not in the Nest namespace
  • Reindex API has a few warnings with xml comments
  • Reindex API does not adhere to Tests.CodeStandards.ElasticClientStandards.ConsistentFluentParameterNames

@@ -31,6 +34,7 @@ public Inferrer(IConnectionSettingsValues connectionSettings)
this.CreateMultiHitDelegates = new ConcurrentDictionary<Type, Action<MultiGetHitJsonConverter.MultiHitTuple, JsonSerializer, ICollection<IMultiGetHit<object>>>>();
this.CreateSearchResponseDelegates = new ConcurrentDictionary<Type, Action<MultiSearchResponseJsonConverter.SearchHitTuple, JsonSerializer, IDictionary<string, object>>>();
}
public string Resolve(IUrlParameter urlParameter) => urlParameter.GetString(this._connectionSettings);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@@ -1,8 +1,15 @@
using System;


namespace Nest
namespace Nest.CommonAbstractions.Reactive
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

namespace should be Nest

using System.Collections.Generic;
using System.Threading;

namespace Nest.CommonAbstractions.Reactive
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

namespace should be Nest

using System.Collections.Generic;
using System.Linq;

namespace Nest.CommonAbstractions.Reactive
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

namespace should be Nest

using System.Threading;
using System.Threading.Tasks;

namespace Nest.CommonAbstractions.Reactive
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

namespace should be Nest

{
/// <summary>
/// Simple back pressure implementation that makes sure the minimum max concurrency between producer and consumer
/// is not amplified by the greedier of the by more then the backPressureFactor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"...greedier of the two..."

/// <see cref="MaxDegreeOfParallelism"/> if set. Not that the consumer has to call <see cref="ProducerConsumerBackPressure.Release"/>
/// on the same instance every time it is done.
/// <param name="maxConcurrency">The minimum maximum concurrency which would be the bottleneck of the producer consumer pipeline</param>
/// <param name="backPressureFactor">The maximum amplification back pressure of the greedier part of the producer consumer pipeline</param>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should point out what the default backPressureFactor is if unspecified.

ReindexAll() is now composed over the IObservables from `ScrollAll()`
and `BulkAll()` taking advantage of both's build in concurrency.

Since the rate at which the scroll's will come in far exceeds the rate
at which we can bulk the `ReindexAll()` also implements a producer
consumer rate limitter. This is controlled by a `backPressureFactor`
which controls the max amplification factor of running scrolls with
the safe guard of:

searchSize * maxConcurrency * backPressureFactor >= bulkSize

Otherwise not enough scrolls would be spawned to feed a single bulk
request. Obviously if the scroll observable calls on complete it will
flush out any pending scrolls and feed it to `BulkAll()`.
@Mpdreamz Mpdreamz force-pushed the feature/reindex-next-level branch from 6d07c32 to 92ec781 Compare December 7, 2016 08:32
@Mpdreamz Mpdreamz changed the base branch from feature/scroll-all to master December 7, 2016 08:33
…lt for backPressureFactor and small documentation touch ups
@Mpdreamz Mpdreamz merged commit 3e58310 into master Dec 7, 2016
Mpdreamz added a commit that referenced this pull request Dec 7, 2016
* ReindexAll() improvements

ReindexAll() is now composed over the IObservables from `ScrollAll()`
and `BulkAll()` taking advantage of both's build in concurrency.

Since the rate at which the scroll's will come in far exceeds the rate
at which we can bulk the `ReindexAll()` also implements a producer
consumer rate limitter. This is controlled by a `backPressureFactor`
which controls the max amplification factor of running scrolls with
the safe guard of:

searchSize * maxConcurrency * backPressureFactor >= bulkSize

Otherwise not enough scrolls would be spawned to feed a single bulk
request. Obviously if the scroll observable calls on complete it will
flush out any pending scrolls and feed it to `BulkAll()`.

* implemented pr review feedback, namespace changes, document the default for backPressureFactor and small documentation touch ups
@Mpdreamz
Copy link
Member Author

Mpdreamz commented Dec 7, 2016

ported to 5.x but was to eager and did not fix the xml doc warning and the test will do so now on both branches

@Mpdreamz Mpdreamz deleted the feature/reindex-next-level branch December 7, 2016 08:42
@russcam russcam added the v5.0.0 label Dec 13, 2016
awelburn pushed a commit to Artesian/elasticsearch-net that referenced this pull request Nov 6, 2017
* ReindexAll() improvements

ReindexAll() is now composed over the IObservables from `ScrollAll()`
and `BulkAll()` taking advantage of both's build in concurrency.

Since the rate at which the scroll's will come in far exceeds the rate
at which we can bulk the `ReindexAll()` also implements a producer
consumer rate limitter. This is controlled by a `backPressureFactor`
which controls the max amplification factor of running scrolls with
the safe guard of:

searchSize * maxConcurrency * backPressureFactor >= bulkSize

Otherwise not enough scrolls would be spawned to feed a single bulk
request. Obviously if the scroll observable calls on complete it will
flush out any pending scrolls and feed it to `BulkAll()`.

* implemented pr review feedback, namespace changes, document the default for backPressureFactor and small documentation touch ups
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants