Skip to content

Commit

Permalink
Add (configurable) cache to EsIndexRangeService
Browse files Browse the repository at this point in the history
  • Loading branch information
Jochen Schalanda committed Aug 26, 2015
1 parent 95cb423 commit a4df23a
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 24 deletions.
Expand Up @@ -123,6 +123,9 @@ public class ElasticsearchConfiguration {
@Parameter(value = "elasticsearch_request_timeout", validator = PositiveDurationValidator.class) @Parameter(value = "elasticsearch_request_timeout", validator = PositiveDurationValidator.class)
private Duration requestTimeout = Duration.minutes(1L); private Duration requestTimeout = Duration.minutes(1L);


@Parameter(value = "elasticsearch_index_range_expiration", validator = PositiveDurationValidator.class)
private Duration indexRangeExpiration = Duration.minutes(1L);

public String getClusterName() { public String getClusterName() {
return clusterName; return clusterName;
} }
Expand Down Expand Up @@ -254,4 +257,8 @@ public String getPathData() {
public Duration getRequestTimeout() { public Duration getRequestTimeout() {
return requestTimeout; return requestTimeout;
} }

public Duration getIndexRangeExpiration() {
return indexRangeExpiration;
}
} }
Expand Up @@ -18,19 +18,20 @@


import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.joschi.jadconfig.util.Duration;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.ImmutableSortedSet;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
Expand Down Expand Up @@ -59,29 +60,64 @@


import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named;
import java.util.Map; import java.util.Map;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


public class EsIndexRangeService implements IndexRangeService { public class EsIndexRangeService implements IndexRangeService {
private static final Logger LOG = LoggerFactory.getLogger(EsIndexRangeService.class); private static final Logger LOG = LoggerFactory.getLogger(EsIndexRangeService.class);


private final LoadingCache<String, IndexRange> cache;
private final Client client; private final Client client;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final Indices indices; private final Indices indices;
private final Deflector deflector; private final Deflector deflector;


@Inject @Inject
public EsIndexRangeService(Client client, ObjectMapper objectMapper, Indices indices, Deflector deflector) { public EsIndexRangeService(Client client,
ObjectMapper objectMapper,
Indices indices,
Deflector deflector,
@Named("elasticsearch_index_range_expiration") Duration indexRangeExpiration) {
this.client = client; this.client = client;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.indices = indices; this.indices = indices;
this.deflector = deflector; this.deflector = deflector;

final CacheLoader<String, IndexRange> cacheLoader = new CacheLoader<String, IndexRange>() {
@Override
public IndexRange load(String indexName) throws Exception {
final IndexRange indexRange = loadIndexRange(indexName);

if (indexRange == null) {
throw new NotFoundException("Couldn't load index range for index " + indexName);
}

return indexRange;
}
};
this.cache = CacheBuilder.<String, IndexRange>newBuilder()
.expireAfterWrite(indexRangeExpiration.getQuantity(), indexRangeExpiration.getUnit())
.build(cacheLoader);
} }


@Override @Override
@Nullable
public IndexRange get(String index) throws NotFoundException { public IndexRange get(String index) throws NotFoundException {
try {
return cache.get(index);
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
if (cause instanceof NotFoundException) {
throw (NotFoundException) cause;
} else {
throw new NotFoundException(e.getCause());
}
}
}

private IndexRange loadIndexRange(String index) throws NotFoundException {
final GetRequest request = new GetRequestBuilder(client, index) final GetRequest request = new GetRequestBuilder(client, index)
.setType(IndexMapping.TYPE_INDEX_RANGE) .setType(IndexMapping.TYPE_INDEX_RANGE)
.setId(index) .setId(index)
Expand All @@ -98,7 +134,12 @@ public IndexRange get(String index) throws NotFoundException {
throw new NotFoundException("Index [" + index + "] not found."); throw new NotFoundException("Index [" + index + "] not found.");
} }


return parseSource(r.getIndex(), r.getSource()); final IndexRange indexRange = parseSource(r.getIndex(), r.getSource());
if (indexRange == null) {
throw new NotFoundException("Index [" + index + "] not found.");
}

return indexRange;
} }


@Nullable @Nullable
Expand Down Expand Up @@ -135,22 +176,12 @@ public SortedSet<IndexRange> find(DateTime begin, DateTime end) {


@Override @Override
public SortedSet<IndexRange> findAll() { public SortedSet<IndexRange> findAll() {
final MultiGetRequestBuilder requestBuilder = client.prepareMultiGet();
for (String index : deflector.getAllDeflectorIndexNames()) {
requestBuilder.add(index, IndexMapping.TYPE_INDEX_RANGE, index);
}

final MultiGetResponse response = client.multiGet(requestBuilder.request()).actionGet();
final ImmutableSortedSet.Builder<IndexRange> indexRanges = ImmutableSortedSet.orderedBy(IndexRange.COMPARATOR); final ImmutableSortedSet.Builder<IndexRange> indexRanges = ImmutableSortedSet.orderedBy(IndexRange.COMPARATOR);
for (MultiGetItemResponse itemResponse : response) { for (String index : deflector.getAllDeflectorIndexNames()) {
if (itemResponse.getFailure() != null) { try {
LOG.warn("Couldn't get index range of index <{}>. Reason:", itemResponse.getIndex(), itemResponse.getFailure().getMessage()); indexRanges.add(cache.get(index));
continue; } catch (ExecutionException e) {
} LOG.warn("Couldn't load index range for index " + index, e.getCause());

final IndexRange indexRange = parseSource(itemResponse.getIndex(), itemResponse.getResponse().getSource());
if (indexRange != null) {
indexRanges.add(indexRange);
} }
} }


Expand Down Expand Up @@ -244,5 +275,7 @@ public void save(IndexRange indexRange) {
} else { } else {
LOG.debug("Successfully updated index range: {}", indexRange); LOG.debug("Successfully updated index range: {}", indexRange);
} }

cache.put(indexName, indexRange);
} }
} }
Expand Up @@ -16,6 +16,8 @@
*/ */
package org.graylog2.indexer.ranges; package org.graylog2.indexer.ranges;


import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.joschi.jadconfig.util.Duration;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.lordofthejars.nosqlunit.annotation.UsingDataSet; import com.lordofthejars.nosqlunit.annotation.UsingDataSet;
import com.lordofthejars.nosqlunit.core.LoadStrategyEnum; import com.lordofthejars.nosqlunit.core.LoadStrategyEnum;
Expand Down Expand Up @@ -68,10 +70,10 @@ public String getIndexPrefix() {
@Rule @Rule
public ElasticsearchRule elasticsearchRule; public ElasticsearchRule elasticsearchRule;


private final ObjectMapper objectMapper = new ObjectMapperProvider().get();
@Inject @Inject
private Client client; private Client client;
private Indices indices; private Indices indices;
private Deflector deflector;


private EsIndexRangeService indexRangeService; private EsIndexRangeService indexRangeService;


Expand All @@ -83,8 +85,8 @@ public EsIndexRangeServiceTest() {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
indices = new Indices(client, ELASTICSEARCH_CONFIGURATION, new IndexMapping(client)); indices = new Indices(client, ELASTICSEARCH_CONFIGURATION, new IndexMapping(client));
deflector = new Deflector(null, ELASTICSEARCH_CONFIGURATION, new NullActivityWriter(), null, null, null, indices); final Deflector deflector = new Deflector(null, ELASTICSEARCH_CONFIGURATION, new NullActivityWriter(), null, null, null, indices);
indexRangeService = new EsIndexRangeService(client, new ObjectMapperProvider().get(), indices, deflector); indexRangeService = new EsIndexRangeService(client, objectMapper, indices, deflector, Duration.minutes(1L));
} }


@Test @Test
Expand Down

0 comments on commit a4df23a

Please sign in to comment.