Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic changes to max_merge_count are now picked up by index throttling #8136

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -272,7 +272,7 @@ public void start() throws EngineException {
try {
this.indexWriter = createWriter();
mergeScheduler.removeListener(this.throttle);
this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), logger);
this.throttle = new IndexThrottle(mergeScheduler, logger);
mergeScheduler.addListener(throttle);
} catch (IOException e) {
maybeFailEngine(e, "start");
Expand Down Expand Up @@ -844,7 +844,7 @@ public void flush(Flush flush) throws EngineException {
currentIndexWriter().close(false);
indexWriter = createWriter();
mergeScheduler.removeListener(this.throttle);
this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), this.logger);
this.throttle = new IndexThrottle(mergeScheduler, this.logger);
mergeScheduler.addListener(throttle);
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
Expand Down Expand Up @@ -1722,13 +1722,13 @@ private static final class IndexThrottle implements MergeSchedulerProvider.Liste
private final InternalLock lockReference = new InternalLock(new ReentrantLock());
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
private final AtomicBoolean isThrottling = new AtomicBoolean();
private final int maxNumMerges;
private final MergeSchedulerProvider mergeScheduler;
private final ESLogger logger;

private volatile InternalLock lock = NOOP_LOCK;

public IndexThrottle(int maxNumMerges, ESLogger logger) {
this.maxNumMerges = maxNumMerges;
public IndexThrottle(MergeSchedulerProvider mergeScheduler, ESLogger logger) {
this.mergeScheduler = mergeScheduler;
this.logger = logger;
}

Expand All @@ -1738,6 +1738,7 @@ public Releasable acquireThrottle() {

@Override
public synchronized void beforeMerge(OnGoingMerge merge) {
int maxNumMerges = mergeScheduler.getMaxMerges();
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
if (isThrottling.getAndSet(true) == false) {
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
Expand All @@ -1748,6 +1749,7 @@ public synchronized void beforeMerge(OnGoingMerge merge) {

@Override
public synchronized void afterMerge(OnGoingMerge merge) {
int maxNumMerges = mergeScheduler.getMaxMerges();
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
if (isThrottling.getAndSet(false)) {
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
Expand Down