Skip to content

Commit

Permalink
Waiting for index range calculation before switching deflector alias.
Browse files Browse the repository at this point in the history
Fixes #2264.
  • Loading branch information
dennisoelkers committed May 24, 2016
1 parent 7b48573 commit 2fc09b8
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
3 changes: 3 additions & 0 deletions graylog2-server/src/main/java/org/graylog2/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ public class Configuration extends BaseConfiguration {
@Parameter(value = "index_ranges_cleanup_interval", validator = PositiveDurationValidator.class)
private Duration indexRangesCleanupInterval = Duration.hours(1L);

@Parameter(value = "deflector_index_range_calculation_timeout", validator = PositiveDurationValidator.class)
private Duration deflectorIndexRangeCalculationTimeout = Duration.hours(1L);

public boolean isMaster() {
return isMaster;
}
Expand Down
29 changes: 25 additions & 4 deletions graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand All @@ -58,6 +61,7 @@ public class Deflector { // extends Ablenkblech
private final SystemJobManager systemJobManager;
private final ActivityWriter activityWriter;
private final CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory;
private final Duration deflectorIndexRangeCalculationTimeout;
private final String indexPrefix;
private final String deflectorName;
private final Indices indices;
Expand All @@ -71,13 +75,15 @@ public Deflector(final SystemJobManager systemJobManager,
final ActivityWriter activityWriter,
final SetIndexReadOnlyJob.Factory indexReadOnlyJobFactory,
final CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory,
final Indices indices) {
final Indices indices,
@Named("deflector_index_range_calculation_timeout") final Duration deflectorIndexRangeCalculationTimeout) {
this.indexPrefix = indexPrefix;

this.systemJobManager = systemJobManager;
this.activityWriter = activityWriter;
this.indexReadOnlyJobFactory = indexReadOnlyJobFactory;
this.createNewSingleIndexRangeJobFactory = createNewSingleIndexRangeJobFactory;
this.deflectorIndexRangeCalculationTimeout = deflectorIndexRangeCalculationTimeout;

this.deflectorName = buildName(indexPrefix);
this.indices = indices;
Expand Down Expand Up @@ -155,9 +161,22 @@ public void cycle() {
pointTo(newTarget);
activity.setMessage("Cycled deflector from <none> to <" + newTarget + ">");
} else {
final SystemJobManager.ScheduleResult scheduleResult = addSingleIndexRanges(oldTarget);
if (scheduleResult != null) {
try {
LOG.debug("Waiting for index calculation to complete");
scheduleResult.getFuture().get(deflectorIndexRangeCalculationTimeout.toMillis(), TimeUnit.MILLISECONDS);
LOG.debug("Completed!");
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.warn("Unable to calculate index range of index which is to be cycled: ", e);
}
} else {
LOG.warn("Unable to schedule job calculating index range of index which is to be cycled.");
}

// Re-pointing from existing old index to the new one.
LOG.debug("Now switching over deflector alias.");
pointTo(newTarget, oldTarget);
addSingleIndexRanges(oldTarget);

// perform these steps after a delay, so we don't race with indexing into the alias
// it can happen that an index request still writes to the old deflector target, while we cycled it above.
Expand Down Expand Up @@ -267,14 +286,16 @@ public void pointTo(final String newIndex) {
indices.cycleAlias(getName(), newIndex);
}

private void addSingleIndexRanges(String indexName) {
@Nullable
private SystemJobManager.ScheduleResult addSingleIndexRanges(String indexName) {
try {
systemJobManager.submit(createNewSingleIndexRangeJobFactory.create(this, indexName));
return systemJobManager.submit(createNewSingleIndexRangeJobFactory.create(this, indexName));
} catch (SystemJobConcurrencyException e) {
final String msg = "Could not calculate index ranges for index " + indexName + " after cycling deflector: Maximum concurrency of job is reached.";
activityWriter.write(new Activity(msg, Deflector.class));
LOG.error(msg, e);
}
return null;
}

@Nullable
Expand Down

0 comments on commit 2fc09b8

Please sign in to comment.