Skip to content

Commit

Permalink
Refactor RebuildIndexRangesJob#calculateRange()
Browse files Browse the repository at this point in the history
  • Loading branch information
Jochen Schalanda committed Jun 3, 2015
1 parent 7be542e commit 26bf10a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 35 deletions.
Expand Up @@ -17,22 +17,20 @@
package org.graylog2.indexer.ranges; package org.graylog2.indexer.ranges;


import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject; import com.google.inject.assistedinject.AssistedInject;
import org.elasticsearch.search.SearchHit;
import org.graylog2.indexer.Deflector; import org.graylog2.indexer.Deflector;
import org.graylog2.indexer.EmptyIndexException; import org.graylog2.indexer.EmptyIndexException;
import org.graylog2.indexer.searches.Searches; import org.graylog2.indexer.searches.Searches;
import org.graylog2.plugin.Tools; import org.graylog2.plugin.Tools;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.shared.system.activities.Activity; import org.graylog2.shared.system.activities.Activity;
import org.graylog2.shared.system.activities.ActivityWriter; import org.graylog2.shared.system.activities.ActivityWriter;
import org.graylog2.system.jobs.SystemJob; import org.graylog2.system.jobs.SystemJob;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -42,7 +40,7 @@


public class RebuildIndexRangesJob extends SystemJob { public class RebuildIndexRangesJob extends SystemJob {
public interface Factory { public interface Factory {
public RebuildIndexRangesJob create(Deflector deflector); RebuildIndexRangesJob create(Deflector deflector);
} }


private static final Logger LOG = LoggerFactory.getLogger(RebuildIndexRangesJob.class); private static final Logger LOG = LoggerFactory.getLogger(RebuildIndexRangesJob.class);
Expand Down Expand Up @@ -81,7 +79,7 @@ public int getProgress() {
} }


// lolwtfbbqcasting // lolwtfbbqcasting
return (int) Math.floor(((float) indicesCalculated / (float) indicesToCalculate)*100); return (int) Math.floor(((float) indicesCalculated / (float) indicesToCalculate) * 100);
} }


@Override @Override
Expand All @@ -102,7 +100,7 @@ public void execute() {
indicesToCalculate = indices.length; indicesToCalculate = indices.length;


Stopwatch sw = Stopwatch.createStarted(); Stopwatch sw = Stopwatch.createStarted();
for(String index : indices) { for (String index : indices) {
if (cancelRequested) { if (cancelRequested) {
info("Stop requested. Not calculating next index range, not updating ranges."); info("Stop requested. Not calculating next index range, not updating ranges.");
sw.stop(); sw.stop();
Expand Down Expand Up @@ -143,39 +141,24 @@ protected Map<String, Object> getDeflectorIndexRange(String index) {
return deflectorIndexRange; return deflectorIndexRange;
} }


private static int getTimestampOfMessage(SearchHit msg) {
Object field = msg.getSource().get("timestamp");
if (field == null) {
throw new RuntimeException("Document has no field timestamp.");
}

DateTimeFormatter formatter = DateTimeFormat.forPattern(Tools.ES_DATE_FORMAT).withZoneUTC();
DateTime dt = formatter.parseDateTime(field.toString());

return (int) (dt.getMillis() / 1000);
}


protected Map<String, Object> calculateRange(String index) throws EmptyIndexException { protected Map<String, Object> calculateRange(String index) throws EmptyIndexException {
Map<String, Object> range = Maps.newHashMap(); final Stopwatch x = Stopwatch.createStarted();

final DateTime timestamp = searches.findOldestMessageTimestampOfIndex(index);
Stopwatch x = Stopwatch.createStarted(); if (timestamp == null) {
SearchHit doc = searches.firstOfIndex(index);
if (doc == null || doc.isSourceEmpty()) {
x.stop(); x.stop();
throw new EmptyIndexException(); throw new EmptyIndexException();
} }


int rangeStart = getTimestampOfMessage(doc); final int rangeEnd = Ints.saturatedCast(timestamp.getMillis() / 1000L);
int took = (int) x.stop().elapsed(TimeUnit.MILLISECONDS); final int took = Ints.saturatedCast(x.stop().elapsed(TimeUnit.MILLISECONDS));

range.put("index", index);
range.put("start", rangeStart);
range.put("calculated_at", Tools.getUTCTimestamp());
range.put("took_ms", took);


LOG.info("Calculated range of [{}] in [{}ms].", index, took); LOG.info("Calculated range of [{}] in [{}ms].", index, took);
return range;
return ImmutableMap.<String, Object>of(
"index", index,
"start", rangeEnd, // FIXME The name of the attribute is massively misleading and should be rectified some time
"calculated_at", Tools.getUTCTimestamp(),
"took_ms", took);
} }


private void updateCollection(List<Map<String, Object>> ranges) { private void updateCollection(List<Map<String, Object>> ranges) {
Expand Down
Expand Up @@ -23,7 +23,6 @@
import com.lordofthejars.nosqlunit.elasticsearch.ElasticsearchRule; import com.lordofthejars.nosqlunit.elasticsearch.ElasticsearchRule;
import com.lordofthejars.nosqlunit.elasticsearch.EmbeddedElasticsearch; import com.lordofthejars.nosqlunit.elasticsearch.EmbeddedElasticsearch;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.indices.IndexMissingException;
import org.graylog2.Configuration; import org.graylog2.Configuration;
import org.graylog2.indexer.Deflector; import org.graylog2.indexer.Deflector;
import org.graylog2.indexer.EmptyIndexException; import org.graylog2.indexer.EmptyIndexException;
Expand Down Expand Up @@ -79,7 +78,7 @@ public void testCalculateRange() throws Exception {
assertThat(range.get("start")).isEqualTo(Ints.saturatedCast(new DateTime(2015, 1, 1, 12, 0, DateTimeZone.UTC).getMillis() / 1000L)); assertThat(range.get("start")).isEqualTo(Ints.saturatedCast(new DateTime(2015, 1, 1, 12, 0, DateTimeZone.UTC).getMillis() / 1000L));
} }


@Test(expected = IndexMissingException.class) @Test(expected = EmptyIndexException.class)
public void testCalculateRangeWithNonExistingIndex() throws Exception { public void testCalculateRangeWithNonExistingIndex() throws Exception {
rebuildIndexRangesJob.calculateRange("does-not-exist"); rebuildIndexRangesJob.calculateRange("does-not-exist");
} }
Expand Down

0 comments on commit 26bf10a

Please sign in to comment.