Skip to content

Commit

Permalink
Add EsIndexRangeService
Browse files Browse the repository at this point in the history
  • Loading branch information
Jochen Schalanda committed Jun 30, 2015
1 parent c0744f4 commit 9c21d34
Show file tree
Hide file tree
Showing 19 changed files with 881 additions and 77 deletions.
Expand Up @@ -92,7 +92,7 @@ public static Set<String> determineAffectedIndices(IndexRangeService indexRangeS
Set<String> indices = Sets.newHashSet(); Set<String> indices = Sets.newHashSet();


for (IndexRange indexRange : indexRangeService.getFrom((int) (range.getFrom().getMillis() / 1000))) { for (IndexRange indexRange : indexRangeService.getFrom((int) (range.getFrom().getMillis() / 1000))) {
indices.add(indexRange.getIndexName()); indices.add(indexRange.indexName());
} }


// Always include the most recent index in some cases. // Always include the most recent index in some cases.
Expand Down
Expand Up @@ -66,8 +66,6 @@ public void execute() {
indexRangeService.destroy(indexName); indexRangeService.destroy(indexName);
indexRangeService.save(indexRange); indexRangeService.save(indexRange);
LOG.info("Created ranges for index {}.", indexName); LOG.info("Created ranges for index {}.", indexName);
} catch (ValidationException e) {
LOG.error("Unable to save index range for index {}: {}", indexName, e);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Exception during index range calculation for index {}: ", indexName, e); LOG.error("Exception during index range calculation for index {}: ", indexName, e);
} }
Expand Down
@@ -0,0 +1,246 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.indexer.ranges;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.primitives.Ints;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.graylog2.database.NotFoundException;
import org.graylog2.indexer.IndexMapping;
import org.graylog2.indexer.searches.Searches;
import org.graylog2.indexer.searches.TimestampStats;
import org.graylog2.plugin.Tools;
import org.graylog2.shared.system.activities.Activity;
import org.graylog2.shared.system.activities.ActivityWriter;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.inject.Inject;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.MoreObjects.firstNonNull;

public class EsIndexRangeService implements IndexRangeService {
private static final Logger LOG = LoggerFactory.getLogger(EsIndexRangeService.class);

private final Client client;
private final ActivityWriter activityWriter;
private final Searches searches;
private final ObjectMapper objectMapper;

@Inject
public EsIndexRangeService(Client client, ActivityWriter activityWriter, Searches searches, ObjectMapper objectMapper) {
this.client = client;
this.activityWriter = activityWriter;
this.searches = searches;
this.objectMapper = objectMapper;
}

@Override
public IndexRange get(String index) throws NotFoundException {
final GetRequest request = new GetRequestBuilder(client, index)
.setType(IndexMapping.TYPE_META)
.setId(index)
.request();

final GetResponse r;
try {
r = client.get(request).actionGet();
} catch (NoShardAvailableActionException e) {
throw new NotFoundException(e);
}

if (!r.isExists()) {
throw new NotFoundException("Index [" + index + "] not found.");
}

return parseSource(r.getIndex(), r.getSource());
}

@Nullable
private IndexRange parseSource(String index, Map<String, Object> fields) {
try {
return IndexRangeImpl.create(
index,
parseFromDateString((String) fields.get("begin")),
parseFromDateString((String) fields.get("end")),
parseFromDateString((String) fields.get("calculated_at")),
(int) fields.get("took_ms")
);
} catch (Exception e) {
return null;
}
}

private DateTime parseFromDateString(String s) {
return DateTime.parse(s);
}

@Override
public SortedSet<IndexRange> getFrom(int timestamp) {
return getFrom(new DateTime(timestamp * 1000L, DateTimeZone.UTC));
}

@Override
public SortedSet<IndexRange> getFrom(DateTime dateTime) {
final RangeQueryBuilder q = QueryBuilders.rangeQuery("begin").gte(dateTime.getMillis());
final SearchRequest request = client.prepareSearch()
.setTypes(IndexMapping.TYPE_META)
.setQuery(q)
.request();

final SearchResponse response = client.search(request).actionGet();
final ImmutableSortedSet.Builder<IndexRange> indexRanges = ImmutableSortedSet.orderedBy(IndexRange.COMPARATOR);
for (SearchHit searchHit : response.getHits()) {
final IndexRange indexRange = parseSource(searchHit.getIndex(), searchHit.getSource());
if (indexRange != null) {
indexRanges.add(indexRange);
}
}

return indexRanges.build();
}

@Override
public void destroy(String index) {
final DeleteRequest request = client.prepareDelete()
.setIndex(index)
.setId(index)
.setType(IndexMapping.TYPE_META)
.setRefresh(true)
.request();
final DeleteResponse response = client.delete(request).actionGet();

if (response.isFound()) {
String msg = "Removed range meta-information of [" + index + "]";
LOG.info(msg);
activityWriter.write(new Activity(msg, IndexRange.class));
} else {
LOG.warn("Couldn't find meta-information of index [{}]", index);
}
}

@Override
public IndexRange create(Map<String, Object> range) {
return new MongoIndexRange(range);
}

@Override
public void destroyAll() {
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
final SearchRequest searchRequest = client.prepareSearch()
.setTypes(IndexMapping.TYPE_META)
.setSearchType(SearchType.SCAN)
.setScroll(scroll)
.setQuery(QueryBuilders.matchAllQuery())
.request();
final SearchResponse searchResponse = client.search(searchRequest).actionGet();

final SearchScrollRequest scrollRequest = client.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(scroll)
.request();
final SearchResponse scrollResponse = client.searchScroll(scrollRequest).actionGet();

final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
for(SearchHit hit : scrollResponse.getHits().hits()) {
final DeleteRequest deleteRequest = client.prepareDelete(hit.index(), hit.type(), hit.id()).request();
bulkRequestBuilder.add(deleteRequest);
}

final BulkRequest bulkRequest = bulkRequestBuilder.request();
final BulkResponse bulkResponse = client.bulk(bulkRequest).actionGet();

if (bulkResponse.hasFailures()) {
LOG.warn("Couldn't remove meta-information of all indices");
LOG.debug("Bulk delete error details: {}", bulkResponse.buildFailureMessage());
} else {
String msg = "Removed range meta-information of all indices";
LOG.info(msg);
activityWriter.write(new Activity(msg, IndexRange.class));
}
}

@Override
public IndexRange calculateRange(String index) {
final Stopwatch sw = Stopwatch.createStarted();
final DateTime now = DateTime.now(DateTimeZone.UTC);
final TimestampStats stats = firstNonNull(
searches.timestampStatsOfIndex(index),
TimestampStats.create(new DateTime(0L, DateTimeZone.UTC), now, new DateTime(0L, DateTimeZone.UTC))
);
final int duration = Ints.saturatedCast(sw.stop().elapsed(TimeUnit.MILLISECONDS));

LOG.info("Calculated range of [{}] in [{}ms].", index, duration);
return IndexRangeImpl.create(index, stats.min(), stats.max(), now, duration);
}

@Override
public void save(IndexRange indexRange) {
final byte[] source;
try {
source = objectMapper.writeValueAsBytes(indexRange);
} catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}

final IndexRequest request = client.prepareIndex()
.setIndex(indexRange.indexName())
.setType(IndexMapping.TYPE_META)
.setId(indexRange.indexName())
.setRefresh(true)
.setSource(source)
.request();
final IndexResponse response = client.index(request).actionGet();
if (response.isCreated()) {
LOG.debug("Successfully saved index range {}", indexRange);
} else {
LOG.warn("Couldn't safe index range for index [{}]: {}", indexRange.indexName(), indexRange);
}
}
}
Expand Up @@ -16,19 +16,20 @@
*/ */
package org.graylog2.indexer.ranges; package org.graylog2.indexer.ranges;


import org.graylog2.plugin.database.Persisted;
import org.joda.time.DateTime; import org.joda.time.DateTime;


import java.util.Comparator; import java.util.Comparator;


public interface IndexRange extends Persisted { public interface IndexRange {
Comparator<IndexRange> COMPARATOR = new IndexRangeComparator(); Comparator<IndexRange> COMPARATOR = new IndexRangeComparator();


String getIndexName(); String indexName();


DateTime getCalculatedAt(); DateTime begin();


DateTime getStart(); DateTime end();


int getCalculationTookMs(); DateTime calculatedAt();

int calculationDuration();
} }
Expand Up @@ -24,6 +24,6 @@ public class IndexRangeComparator implements Comparator<IndexRange> {
*/ */
@Override @Override
public int compare(IndexRange o1, IndexRange o2) { public int compare(IndexRange o1, IndexRange o2) {
return o2.getStart().compareTo(o1.getStart()); return o2.end().compareTo(o1.end());
} }
} }
@@ -0,0 +1,50 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.indexer.ranges;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;
import org.joda.time.DateTime;

@AutoValue
@JsonAutoDetect
public abstract class IndexRangeImpl implements IndexRange {
@JsonProperty
public abstract String indexName();

@JsonProperty
public abstract DateTime begin();

@JsonProperty
public abstract DateTime end();

@JsonProperty
public abstract DateTime calculatedAt();

@JsonProperty("took_ms")
public abstract int calculationDuration();

public static IndexRange create(String indexName,
DateTime begin,
DateTime end,
DateTime calculatedAt,
int calculationDuration) {
return new AutoValue_IndexRangeImpl(indexName, begin, end, calculatedAt, calculationDuration);
}
}
Expand Up @@ -26,7 +26,7 @@
import java.util.Map; import java.util.Map;
import java.util.SortedSet; import java.util.SortedSet;


public interface IndexRangeService extends PersistedService { public interface IndexRangeService {
IndexRange get(String index) throws NotFoundException; IndexRange get(String index) throws NotFoundException;


SortedSet<IndexRange> getFrom(int timestamp); SortedSet<IndexRange> getFrom(int timestamp);
Expand All @@ -37,7 +37,7 @@ public interface IndexRangeService extends PersistedService {


IndexRange create(Map<String, Object> range); IndexRange create(Map<String, Object> range);


void save(IndexRange indexRange) throws ValidationException; void save(IndexRange indexRange);


void destroyAll(); void destroyAll();


Expand Down

0 comments on commit 9c21d34

Please sign in to comment.