Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Waiting for index range calculation before switching deflector alias. #2278

Merged
merged 18 commits into from Jun 9, 2016
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions graylog2-server/src/main/java/org/graylog2/Configuration.java
Expand Up @@ -156,6 +156,9 @@ public class Configuration extends BaseConfiguration {
@Parameter(value = "index_ranges_cleanup_interval", validator = PositiveDurationValidator.class)
private Duration indexRangesCleanupInterval = Duration.hours(1L);

@Parameter(value = "deflector_index_read_only_timeout", validator = PositiveDurationValidator.class)
private Duration deflectorIndexReadOnlyTimeout = Duration.hours(1L);

@Parameter(value = "trusted_proxies", converter = IPSubnetConverter.class)
private Set<IpSubnet> trustedProxies = Collections.emptySet();

Expand Down
37 changes: 29 additions & 8 deletions graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java
Expand Up @@ -16,10 +16,13 @@
*/
package org.graylog2.indexer;

import com.github.joschi.jadconfig.util.Duration;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.graylog2.indexer.indices.Indices;
import org.graylog2.indexer.ranges.CreateNewSingleIndexRangeJob;
import org.graylog2.indexer.ranges.IndexRange;
import org.graylog2.indexer.ranges.IndexRangeService;
import org.graylog2.shared.system.activities.Activity;
import org.graylog2.shared.system.activities.ActivityWriter;
import org.graylog2.system.jobs.SystemJob;
Expand All @@ -34,7 +37,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand All @@ -58,6 +63,8 @@ public class Deflector { // extends Ablenkblech
private final SystemJobManager systemJobManager;
private final ActivityWriter activityWriter;
private final CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory;
private final IndexRangeService indexRangeService;
private final Duration deflectorIndexReadOnlyTimeout;
private final String indexPrefix;
private final String deflectorName;
private final Indices indices;
Expand All @@ -71,13 +78,17 @@ public Deflector(final SystemJobManager systemJobManager,
final ActivityWriter activityWriter,
final SetIndexReadOnlyJob.Factory indexReadOnlyJobFactory,
final CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory,
final Indices indices) {
final Indices indices,
final IndexRangeService indexRangeService,
@Named("deflector_index_read_only_timeout") final Duration deflectorIndexReadOnlyTimeout) {
this.indexPrefix = indexPrefix;

this.systemJobManager = systemJobManager;
this.activityWriter = activityWriter;
this.indexReadOnlyJobFactory = indexReadOnlyJobFactory;
this.createNewSingleIndexRangeJobFactory = createNewSingleIndexRangeJobFactory;
this.indexRangeService = indexRangeService;
this.deflectorIndexReadOnlyTimeout = deflectorIndexReadOnlyTimeout;

this.deflectorName = buildName(indexPrefix);
this.indices = indices;
Expand Down Expand Up @@ -144,6 +155,7 @@ public void cycle() {
ClusterHealthStatus healthStatus = indices.waitForRecovery(newTarget);
LOG.debug("Health status of index <{}>: {}", newTarget, healthStatus);

addDeflectorIndexRange(newTarget);
LOG.info("Done!");

// Point deflector to new index.
Expand All @@ -156,29 +168,36 @@ public void cycle() {
activity.setMessage("Cycled deflector from <none> to <" + newTarget + ">");
} else {
// Re-pointing from existing old index to the new one.
LOG.debug("Now switching over deflector alias.");
pointTo(newTarget, oldTarget);
addSingleIndexRanges(oldTarget);

// perform these steps after a delay, so we don't race with indexing into the alias
// it can happen that an index request still writes to the old deflector target, while we cycled it above.
// setting the index to readOnly would result in ClusterBlockExceptions in the indexing request.
// waiting 30 seconds to perform the background task should completely get rid of these errors.
final SystemJob makeReadOnlyJob = indexReadOnlyJobFactory.create(oldTarget);
try {
systemJobManager.submitWithDelay(makeReadOnlyJob, 30, TimeUnit.SECONDS);
} catch (SystemJobConcurrencyException e) {
final SystemJobManager.ScheduleResult scheduleResult = systemJobManager.submitWithDelayForResult(makeReadOnlyJob, 30, TimeUnit.SECONDS);
if (scheduleResult != null) {
scheduleResult.getFuture().get(deflectorIndexReadOnlyTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the worst case this would block the thread for the duration of deflectorIndexReadOnlyTimeout (1 hour by default). What would happen if there is the need to do another cycle during that time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I have seen in the code, this would mean that another cycle would be done, which would be waiting at the same point.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't feel good adding this "waiting on system job" logic in here.

Would it be enough to trigger an index range calculation job in the "make-read-only" job after it's done?

  • Do not call addSingleIndexRanges(oldTarget) in line 160
  • Modify the SetIndexReadOnlyJob to trigger a CreateNewSingleIndexRangeJob for the index once the read-only status is set

With this we wouldn't need the timeout logic and all the changes to the system jobs to return a future. We can also be sure that the index range is correct because the index cannot be written to anymore. Until the index is read-only, it will be included in the searches because it has the "deflector" index range.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coupling the job which sets indices read only to the index range calculation job sounds wrong to me, because they are not logically coupled and executing the first one should not need you to know (and be surprised about) that the second one is triggered implicitly too. The coupling should be done from the outside (as it is done here) via composition without any inherent coupling.

The logic you are describing is exactly what is happening in the PR, why it should be better to give up a separation of concerns to shift the waiting logic one level lower, while gaining nothing, is beyond me, tbh.

Copy link
Contributor

@hc4 hc4 Jun 2, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is at line 182.
Why should we wait for the job to finish?
Is it possible to schedule one job after another? (Sorry, I don't know Java deeply.)

Copy link
Contributor

@hc4 hc4 Jun 2, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only problem I see: you will wait here for at least 30 seconds (the submit delay) and will block the scheduler (single thread) for that time.
However, I'm not sure if it is a real problem :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's why I've asked if the thread is blocked and what happened if a new deflector cycle was necessary while waiting for the last one. 😉

In the end, this would in the worst case consume all (executor) threads in SystemJobManager and block all subsequent deflector cycles (until some threads are freed again).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but from what I've seen this is already the case before this change. I am not sure if you are implying that there is a regression in this PR or that we should also change the current behavior (which I would perceive as scope creep).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scheduleResult.getFuture().get(deflectorIndexReadOnlyTimeout.toMilliseconds(), TimeUnit.MILLISECONDS) was introduced by this commit.
In master there isn't any waiting.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coupling the job which sets indices read only to the index range calculation job sounds wrong to me, because they are not logically coupled and executing the first one should not need you to know (and be surprised about) that the second one is triggered implicitly too. The coupling should be done from the outside (as it is done here) via composition without any inherent coupling.

I agree that the coupling introduced by this wouldn't be nice. We could introduce a separate job class that takes care of both. (i.e. SetIndexReadOnlyAndCalculateRange) That would make it clear what the job is supposed to do.
I also agree that it's nicer to be able to compose smaller jobs to avoid having lots of specialized jobs, but in this case I would vote for having one to avoid the introduction of timeout logic in this class and to avoid the async/sync issue you described later.

Another option would be to extend the system jobs subsystem to be able to run another job (or more) after one has finished. This would make it possible to compose a chain of system jobs without having to use timeouts or block the calling thread.

The logic you are describing is exactly what is happening in the PR, why it should be better to give up a separation of concerns to shift the waiting logic one level lower, while gaining nothing, is beyond me, tbh.

I would happily give up a separation of concern when I can avoid using a timeout. 😉 The problem I see with using timeouts is that it's really hard to find a good default for them and that the user probably needs to tune them to make them work with their setup which makes using the product harder.

}
} catch (SystemJobConcurrencyException | InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Cannot set index <" + oldTarget + "> to read only. It won't be optimized.", e);
}
addSingleIndexRanges(oldTarget);
activity.setMessage("Cycled deflector from <" + oldTarget + "> to <" + newTarget + ">");
}

addSingleIndexRanges(newTarget);

LOG.info("Done!");

activityWriter.write(activity);
}

/**
 * Asks the index range service to create the deflector index range for the
 * given target index and immediately persists it through the same service.
 *
 * @param newTarget name of the index that was just created as the new deflector target
 */
private void addDeflectorIndexRange(String newTarget) {
    indexRangeService.save(indexRangeService.createForDeflector(newTarget));
}

public int getNewestTargetNumber() throws NoTargetIndexException {
final Set<String> indexNames = indices.getIndexNamesAndAliases(getDeflectorWildcard()).keySet();

Expand Down Expand Up @@ -267,14 +286,16 @@ public void pointTo(final String newIndex) {
indices.cycleAlias(getName(), newIndex);
}

private void addSingleIndexRanges(String indexName) {
@Nullable
private SystemJobManager.ScheduleResult addSingleIndexRanges(String indexName) {
try {
systemJobManager.submit(createNewSingleIndexRangeJobFactory.create(this, indexName));
return systemJobManager.submitForResult(createNewSingleIndexRangeJobFactory.create(this, indexName));
} catch (SystemJobConcurrencyException e) {
final String msg = "Could not calculate index ranges for index " + indexName + " after cycling deflector: Maximum concurrency of job is reached.";
activityWriter.write(new Activity(msg, Deflector.class));
LOG.error(msg, e);
}
return null;
}

@Nullable
Expand Down
Expand Up @@ -106,17 +106,6 @@ public static Set<IndexRange> determineAffectedIndicesWithRanges(IndexRangeServi
indices.add(indexRange);
}

// Always include the deflector target
final String targetIndex = deflector.getCurrentActualTargetIndex();
if (targetIndex != null) {
try {
final IndexRange deflectorIndexRange = indexRangeService.get(targetIndex);
indices.add(deflectorIndexRange);
} catch (NotFoundException e) {
LOG.warn("Couldn't find latest deflector target index", e);
}
}

return indices.build();
}
}
Expand Up @@ -178,6 +178,11 @@ public IndexRange calculateRange(String index) {
throw new UnsupportedOperationException();
}

/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public IndexRange createForDeflector(String index) {
throw new UnsupportedOperationException();
}

@Override
public void save(IndexRange indexRange) {
throw new UnsupportedOperationException();
Expand Down
Expand Up @@ -31,4 +31,5 @@ public interface IndexRangeService {
void save(IndexRange indexRange);

IndexRange calculateRange(String index);
IndexRange createForDeflector(String index);
Copy link
Contributor

@joschi joschi Jun 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think createEmptyRange would be a better name as it's not inherently bound to the deflector alias (it's not even being used in the method implementations).

Maybe adding it as a default method (https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html) would make sense here as its implementation doesn't rely on any internals of the specific implementation of IndexRangeService.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea! 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out it's not as easy as it seems, as it currently relies on the choice of the value object in the service implementation. Using MongoIndexRange in a default implementation in the interface would smell broken to me. I will rename the method to a more generic name though.

}
Expand Up @@ -108,6 +108,11 @@ public IndexRange calculateRange(String index) {
throw new UnsupportedOperationException();
}

/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public IndexRange createForDeflector(String index) {
throw new UnsupportedOperationException();
}

/**
 * Removes the entries of this service's collection whose index field equals
 * the given index name, by delegating to {@code destroy}.
 *
 * @param index name of the index whose range entries should be removed
 * @return the value returned by {@code destroy} (presumably the number of
 *         removed documents -- TODO confirm against {@code destroy})
 */
public int delete(String index) {
    final BasicDBObject byIndexName = new BasicDBObject(FIELD_INDEX, index);
    return destroy(byIndexName, COLLECTION_NAME);
}
Expand Down
Expand Up @@ -66,27 +66,27 @@ public MongoIndexRangeService(MongoConnection mongoConnection,
EventBus eventBus) {
this.indices = indices;
this.collection = JacksonDBCollection.wrap(
mongoConnection.getDatabase().getCollection(COLLECTION_NAME),
MongoIndexRange.class,
ObjectId.class,
objectMapperProvider.get());
mongoConnection.getDatabase().getCollection(COLLECTION_NAME),
MongoIndexRange.class,
ObjectId.class,
objectMapperProvider.get());

// This sucks. We need to bridge Elasticsearch's and our own Guice injector.
IndexChangeMonitor.setEventBus(eventBus);
eventBus.register(this);

collection.createIndex(new BasicDBObject(MongoIndexRange.FIELD_INDEX_NAME, 1));
collection.createIndex(BasicDBObjectBuilder.start()
.add(MongoIndexRange.FIELD_BEGIN, 1)
.add(MongoIndexRange.FIELD_END, 1)
.get());
.add(MongoIndexRange.FIELD_BEGIN, 1)
.add(MongoIndexRange.FIELD_END, 1)
.get());
}

@Override
public IndexRange get(String index) throws NotFoundException {
final DBQuery.Query query = DBQuery.and(
DBQuery.notExists("start"),
DBQuery.is(IndexRange.FIELD_INDEX_NAME, index));
DBQuery.notExists("start"),
DBQuery.is(IndexRange.FIELD_INDEX_NAME, index));
final MongoIndexRange indexRange = collection.findOne(query);
if (indexRange == null) {
throw new NotFoundException("Index range for index <" + index + "> not found.");
Expand All @@ -98,11 +98,18 @@ public IndexRange get(String index) throws NotFoundException {
@Override
public SortedSet<IndexRange> find(DateTime begin, DateTime end) {
final DBCursor<MongoIndexRange> indexRanges = collection.find(
DBQuery.or(
DBQuery.and(
DBQuery.notExists("start"), // "start" has been used by the old index ranges in MongoDB
DBQuery.lessThanEquals(IndexRange.FIELD_BEGIN, end.getMillis()),
DBQuery.greaterThanEquals(IndexRange.FIELD_END, begin.getMillis())
DBQuery.notExists("start"), // "start" has been used by the old index ranges in MongoDB
DBQuery.lessThanEquals(IndexRange.FIELD_BEGIN, end.getMillis()),
DBQuery.greaterThanEquals(IndexRange.FIELD_END, begin.getMillis())
),
DBQuery.and(
DBQuery.notExists("start"), // "start" has been used by the old index ranges in MongoDB
DBQuery.lessThanEquals(IndexRange.FIELD_BEGIN, 0L),
DBQuery.greaterThanEquals(IndexRange.FIELD_END, 0L)
)
)
);

return ImmutableSortedSet.copyOf(IndexRange.COMPARATOR, (Iterator<? extends IndexRange>) indexRanges);
Expand All @@ -125,6 +132,11 @@ public IndexRange calculateRange(String index) {
return MongoIndexRange.create(index, stats.min(), stats.max(), now, duration);
}

/**
 * Builds (but does not persist) an index range for the given index whose
 * begin and end are both timestamp 0, whose calculation time is "now" and
 * whose calculation duration is 0.
 *
 * NOTE(review): {@code find()} additionally matches ranges with begin <= 0 and
 * end >= 0, so a range created here looks intended to be returned for every
 * time range query -- confirm.
 *
 * @param index name of the index the range is created for
 * @return a new, unsaved {@link MongoIndexRange}
 */
@Override
public IndexRange createForDeflector(String index) {
    // Begin and end share the same instant (timestamp 0).
    final DateTime epoch = new DateTime(0L);
    final DateTime calculatedAt = DateTime.now();
    return MongoIndexRange.create(index, epoch, epoch, calculatedAt, 0);
}

@Override
public void save(IndexRange indexRange) {
collection.remove(DBQuery.in(IndexRange.FIELD_INDEX_NAME, indexRange.indexName()));
Expand Down Expand Up @@ -158,10 +170,10 @@ public void handleIndexReopening(IndicesReopenedEvent event) {
indices.waitForRecovery(index);

final Retryer<IndexRange> retryer = RetryerBuilder.<IndexRange>newBuilder()
.retryIfException(input -> !(input instanceof IndexClosedException))
.withWaitStrategy(WaitStrategies.exponentialWait())
.withStopStrategy(StopStrategies.stopAfterDelay(5, TimeUnit.MINUTES))
.build();
.retryIfException(input -> !(input instanceof IndexClosedException))
.withWaitStrategy(WaitStrategies.exponentialWait())
.withStopStrategy(StopStrategies.stopAfterDelay(5, TimeUnit.MINUTES))
.build();

final IndexRange indexRange;
try {
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class RebuildIndexRangesJob extends SystemJob {
Expand Down Expand Up @@ -85,7 +86,11 @@ public void execute() {
indicesToCalculate = indices.length;

Stopwatch sw = Stopwatch.createStarted();
final String deflectorIndexName = deflector.getName();
for (String index : indices) {
if (deflectorIndexName.equals(index)) {
continue;
}
if (cancelRequested) {
info("Stop requested. Not calculating next index range, not updating ranges.");
sw.stop();
Expand Down
Expand Up @@ -26,17 +26,39 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.validation.constraints.Null;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static com.codahale.metrics.MetricRegistry.name;

public class SystemJobManager {

public class ScheduleResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a static nested class or no nested class at all (as it's also used by other classes). It would also be suitable to be converted to an Auto Value class.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it necessary to make it static? It does not access any members of the enclosing class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ You're right.

// Id of the submitted job, as handed to the constructor.
private final String jobId;
// Future handed to the constructor; submitWithDelayForResult passes the one returned by executor.schedule().
private final ScheduledFuture<?> future;

/**
 * Pairs a job id with the future of its scheduled execution.
 *
 * @param jobId  id of the submitted job
 * @param future future representing the scheduled run of that job
 */
ScheduleResult(final String jobId, final ScheduledFuture<?> future) {
this.jobId = jobId;
this.future = future;
}

/**
 * @return the job id given at construction
 */
public String getJobId() {
return jobId;
}

/**
 * @return the scheduled future given at construction; callers may use it to wait for the job
 */
public ScheduledFuture<?> getFuture() {
return future;
}
}

private static final Logger LOG = LoggerFactory.getLogger(SystemJobManager.class);
private static final int THREAD_POOL_SIZE = 15;

Expand All @@ -60,10 +82,18 @@ private ScheduledExecutorService executorService(final MetricRegistry metricRegi
}

public String submit(final SystemJob job) throws SystemJobConcurrencyException {
return submitWithDelay(job, 0, TimeUnit.SECONDS);
return submitForResult(job).getJobId();
}

/**
 * Schedules the given job without delay and returns the complete scheduling
 * result (job id plus future) instead of only the job id.
 *
 * @param job the system job to run
 * @return the result of {@code submitWithDelayForResult} invoked with a delay of 0 seconds
 * @throws SystemJobConcurrencyException propagated from {@code submitWithDelayForResult}
 */
public ScheduleResult submitForResult(final SystemJob job) throws SystemJobConcurrencyException {
    final ScheduleResult immediate = submitWithDelayForResult(job, 0, TimeUnit.SECONDS);
    return immediate;
}

/**
 * Schedules the given job to run after the given delay and returns only its id.
 * Use {@code submitWithDelayForResult} when the future of the scheduled run is needed as well.
 *
 * @param job      the system job to run
 * @param delay    how long to wait before running the job
 * @param timeUnit unit of {@code delay}
 * @return the id of the scheduled job
 * @throws SystemJobConcurrencyException propagated from {@code submitWithDelayForResult}
 */
public String submitWithDelay(final SystemJob job, final long delay, TimeUnit timeUnit) throws SystemJobConcurrencyException {
    final ScheduleResult scheduled = submitWithDelayForResult(job, delay, timeUnit);
    return scheduled.getJobId();
}

public ScheduleResult submitWithDelayForResult(final SystemJob job, final long delay, TimeUnit timeUnit) throws SystemJobConcurrencyException {
// for immediate jobs, check allowed concurrency right now
if (delay == 0) {
checkAllowedConcurrency(job);
Expand All @@ -74,7 +104,7 @@ public String submitWithDelay(final SystemJob job, final long delay, TimeUnit ti
job.setId(new UUID().toString());
jobs.put(job.getId(), job);

executor.schedule(new Runnable() {
final ScheduledFuture<?> future = executor.schedule(new Runnable() {
@Override
public void run() {
try {
Expand All @@ -89,7 +119,7 @@ public void run() {
x.stop();

final String msg = "SystemJob <" + job.getId() + "> [" + jobClass + "] finished in " + x.elapsed(
TimeUnit.MILLISECONDS) + "ms.";
TimeUnit.MILLISECONDS) + "ms.";
LOG.info(msg);
activityWriter.write(new Activity(msg, SystemJobManager.class));
} catch (SystemJobConcurrencyException ignored) {
Expand All @@ -102,7 +132,7 @@ public void run() {
}, delay, timeUnit);

LOG.info("Submitted SystemJob <{}> [{}]", job.getId(), jobClass);
return job.getId();
return new ScheduleResult(job.getId(), future);
}

protected void checkAllowedConcurrency(SystemJob job) throws SystemJobConcurrencyException {
Expand Down