Skip to content

Commit

Permalink
Merge pull request #1547 from Graylog2/issue-1533
Browse files Browse the repository at this point in the history
Set indices options to lenient for search queries
(cherry picked from commit 1dde511)
  • Loading branch information
bernd committed Nov 20, 2015
1 parent c697c5e commit fc8fd88
Show file tree
Hide file tree
Showing 11 changed files with 363 additions and 79 deletions.
Expand Up @@ -16,17 +16,23 @@
*/
package org.graylog2.indexer.esplugin;

import com.google.common.collect.Sets;
import com.google.common.eventbus.EventBus;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.HashSet;
import java.util.Set;

import static org.elasticsearch.common.base.Preconditions.checkNotNull;

Expand Down Expand Up @@ -57,11 +63,47 @@ public void clusterChanged(ClusterChangedEvent event) {
}

if (eventBus != null) {
final List<String> indicesDeleted = event.indicesDeleted();
final Set<String> indicesDeleted = new HashSet<>(event.indicesDeleted());
if (!indicesDeleted.isEmpty()) {
eventBus.post(IndicesDeletedEvent.create(indicesDeleted));
}

final Set<String> indicesClosed = calculateClosedIndices(event.state(), event.previousState());
if (!indicesClosed.isEmpty()) {
eventBus.post(IndicesClosedEvent.create(indicesClosed));
}

final Set<String> indicesReopened = calculateReopenedIndices(event.state(), event.previousState());
if (!indicesReopened.isEmpty()) {
eventBus.post(IndicesReopenedEvent.create(indicesReopened));
}
}
}

private Set<String> calculateClosedIndices(ClusterState currentState, @Nullable ClusterState previousState) {
if (previousState == null || previousState.metaData() == currentState.metaData()) {
return Collections.emptySet();
}

final Set<String> currentClosedIndices = getClosedIndices(currentState.getMetaData());
final Set<String> previousClosedIndices = getClosedIndices(previousState.getMetaData());

return Sets.difference(currentClosedIndices, previousClosedIndices);
}

private Set<String> calculateReopenedIndices(ClusterState currentState, @Nullable ClusterState previousState) {
if (previousState == null || previousState.metaData() == currentState.metaData()) {
return Collections.emptySet();
}

final Set<String> currentClosedIndices = getClosedIndices(currentState.getMetaData());
final Set<String> previousClosedIndices = getClosedIndices(previousState.getMetaData());

return Sets.difference(previousClosedIndices, currentClosedIndices);
}

private HashSet<String> getClosedIndices(MetaData currentMetaData) {
return new HashSet<>(Arrays.asList(currentMetaData.concreteAllClosedIndices()));
}

@Override
Expand Down
@@ -0,0 +1,31 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.indexer.esplugin;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

@AutoValue
public abstract class IndicesClosedEvent {
public abstract Set<String> indices();

public static IndicesClosedEvent create(Set<String> indices) {
return new AutoValue_IndicesClosedEvent(ImmutableSet.copyOf(indices));
}
}
Expand Up @@ -17,15 +17,15 @@
package org.graylog2.indexer.esplugin;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import java.util.List;
import java.util.Set;

@AutoValue
public abstract class IndicesDeletedEvent {
public abstract List<String> indices();
public abstract Set<String> indices();

public static IndicesDeletedEvent create(List<String> indices) {
return new AutoValue_IndicesDeletedEvent(ImmutableList.copyOf(indices));
public static IndicesDeletedEvent create(Set<String> indices) {
return new AutoValue_IndicesDeletedEvent(ImmutableSet.copyOf(indices));
}
}
@@ -0,0 +1,31 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.indexer.esplugin;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

@AutoValue
public abstract class IndicesReopenedEvent {
public abstract Set<String> indices();

public static IndicesReopenedEvent create(Set<String> indices) {
return new AutoValue_IndicesReopenedEvent(ImmutableSet.copyOf(indices));
}
}
Expand Up @@ -17,15 +17,13 @@
package org.graylog2.indexer.ranges;

import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Stopwatch;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.primitives.Ints;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetRequestBuilder;
Expand All @@ -36,12 +34,9 @@
import org.graylog2.events.ClusterEventBus;
import org.graylog2.indexer.Deflector;
import org.graylog2.indexer.esplugin.IndicesDeletedEvent;
import org.graylog2.indexer.indices.Indices;
import org.graylog2.indexer.searches.TimestampStats;
import org.graylog2.metrics.CacheStatsSet;
import org.graylog2.shared.metrics.MetricUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -51,7 +46,6 @@
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

Expand All @@ -63,18 +57,15 @@ public class EsIndexRangeService implements IndexRangeService {
private final LoadingCache<String, IndexRange> cache;
private final Client client;
private final Deflector deflector;
private final Indices indices;

@Inject
public EsIndexRangeService(Client client,
Deflector deflector,
Indices indices,
EventBus eventBus,
@ClusterEventBus EventBus clusterEventBus,
MetricRegistry metricRegistry) {
this.client = requireNonNull(client);
this.deflector = requireNonNull(deflector);
this.indices = requireNonNull(indices);

final CacheLoader<String, IndexRange> cacheLoader = new CacheLoader<String, IndexRange>() {
@Override
Expand Down Expand Up @@ -190,13 +181,7 @@ public SortedSet<IndexRange> findAll() {

@Override
public IndexRange calculateRange(String index) {
final Stopwatch sw = Stopwatch.createStarted();
final DateTime now = DateTime.now(DateTimeZone.UTC);
final TimestampStats stats = indices.timestampStatsOfIndex(index);
final int duration = Ints.saturatedCast(sw.stop().elapsed(TimeUnit.MILLISECONDS));

LOG.info("Calculated range of [{}] in [{}ms].", index, duration);
return EsIndexRange.create(index, stats.min(), stats.max(), now, duration);
throw new UnsupportedOperationException();
}

@Override
Expand Down
Expand Up @@ -16,6 +16,11 @@
*/
package org.graylog2.indexer.ranges;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.eventbus.AllowConcurrentEvents;
Expand All @@ -30,7 +35,9 @@
import org.graylog2.database.NotFoundException;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.indexer.esplugin.IndexChangeMonitor;
import org.graylog2.indexer.esplugin.IndicesClosedEvent;
import org.graylog2.indexer.esplugin.IndicesDeletedEvent;
import org.graylog2.indexer.esplugin.IndicesReopenedEvent;
import org.graylog2.indexer.indices.Indices;
import org.graylog2.indexer.searches.TimestampStats;
import org.joda.time.DateTime;
Expand All @@ -44,6 +51,8 @@
import javax.inject.Inject;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class MongoIndexRangeService implements IndexRangeService {
Expand Down Expand Up @@ -134,7 +143,51 @@ public void save(IndexRange indexRange) {
@AllowConcurrentEvents
public void handleIndexDeletion(IndicesDeletedEvent event) {
for (String index : event.indices()) {
LOG.debug("Index \"{}\" has been deleted. Removing index range.");
collection.remove(DBQuery.in(IndexRange.FIELD_INDEX_NAME, index));
}
}

@Subscribe
@AllowConcurrentEvents
public void handleIndexClosing(IndicesClosedEvent event) {
for (String index : event.indices()) {
LOG.debug("Index \"{}\" has been closed. Removing index range.");
collection.remove(DBQuery.in(IndexRange.FIELD_INDEX_NAME, index));
}
}

@Subscribe
@AllowConcurrentEvents
public void handleIndexReopening(IndicesReopenedEvent event) {
for (final String index : event.indices()) {
LOG.debug("Index \"{}\" has been reopened. Calculating index range.", index);

indices.waitForRecovery(index);

final Retryer<IndexRange> retryer = RetryerBuilder.<IndexRange>newBuilder()
.retryIfException()
.withWaitStrategy(WaitStrategies.exponentialWait())
.withStopStrategy(StopStrategies.stopAfterDelay(5, TimeUnit.MINUTES))
.build();

final IndexRange indexRange;
try {
indexRange = retryer.call(new Callable<IndexRange>() {
@Override
public IndexRange call() throws Exception {
return calculateRange(index);
}
});
} catch (ExecutionException e) {
LOG.error("Couldn't calculate index range for index \"" + index + "\"", e.getCause());
throw new RuntimeException("Couldn't calculate index range for index \"" + index + "\"", e);
} catch (RetryException e) {
LOG.error("Couldn't calculate index range for index \"" + index + "\"", e);
throw new RuntimeException("Couldn't calculate index range for index \"" + index + "\"", e);
}

save(indexRange);
}
}
}
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
Expand Down Expand Up @@ -457,11 +458,11 @@ public HistogramResult histogram(String query, DateHistogramInterval interval, S
QueryStringQueryBuilder qs = queryStringQuery(query);
qs.allowLeadingWildcard(configuration.isAllowLeadingWildcardSearches());

SearchRequestBuilder srb = c.prepareSearch();
final Set<String> affectedIndices = IndexHelper.determineAffectedIndices(indexRangeService, deflector, range);
srb.setIndices(affectedIndices.toArray(new String[affectedIndices.size()]));
srb.setQuery(qs);
srb.addAggregation(builder);
final SearchRequestBuilder srb = c.prepareSearch(affectedIndices.toArray(new String[affectedIndices.size()]))
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(qs)
.addAggregation(builder);

final SearchRequest request = srb.request();
SearchResponse r = c.search(request).actionGet();
Expand Down Expand Up @@ -584,8 +585,8 @@ private SearchRequestBuilder standardSearchRequest(
query = "*";
}

SearchRequestBuilder srb = c.prepareSearch();
srb.setIndices(indices.toArray(new String[indices.size()]));
final SearchRequestBuilder srb = c.prepareSearch(indices.toArray(new String[indices.size()]))
.setIndicesOptions(IndicesOptions.lenientExpandOpen());

final QueryBuilder queryBuilder;

Expand Down
Expand Up @@ -32,17 +32,13 @@
import org.graylog2.indexer.cluster.Cluster;
import org.graylog2.indexer.indices.IndexStatistics;
import org.graylog2.indexer.indices.Indices;
import org.graylog2.indexer.ranges.CreateNewSingleIndexRangeJob;
import org.graylog2.rest.models.system.indexer.responses.AllIndicesInfo;
import org.graylog2.rest.models.system.indexer.responses.ClosedIndices;
import org.graylog2.rest.models.system.indexer.responses.IndexInfo;
import org.graylog2.rest.models.system.indexer.responses.IndexStats;
import org.graylog2.rest.models.system.indexer.responses.ShardRouting;
import org.graylog2.shared.rest.resources.RestResource;
import org.graylog2.shared.security.RestPermissions;
import org.graylog2.system.jobs.SystemJob;
import org.graylog2.system.jobs.SystemJobConcurrencyException;
import org.graylog2.system.jobs.SystemJobManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -69,16 +65,12 @@
public class IndicesResource extends RestResource {
private static final Logger LOG = LoggerFactory.getLogger(IndicesResource.class);

@Inject
private CreateNewSingleIndexRangeJob.Factory rebuildIndexRangesJobFactory;
@Inject
private Indices indices;
@Inject
private Cluster cluster;
@Inject
private Deflector deflector;
@Inject
private SystemJobManager systemJobManager;

@GET
@Timed
Expand Down Expand Up @@ -195,16 +187,6 @@ public void reopen(@ApiParam(name = "index") @PathParam("index") String index) {
}

indices.reopenIndex(index);

// Trigger index ranges rebuild job.
final SystemJob rebuildJob = rebuildIndexRangesJobFactory.create(deflector, index);
try {
systemJobManager.submit(rebuildJob);
} catch (SystemJobConcurrencyException e) {
final String msg = "Concurrency level of this job reached: " + e.getMessage();
LOG.error(msg);
throw new ForbiddenException(msg);
}
}

@POST
Expand Down

0 comments on commit fc8fd88

Please sign in to comment.