From d571f4e551736c464fb9e4476a1beb9fc09101aa Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Tue, 24 May 2016 12:59:39 +0200 Subject: [PATCH 01/18] Returning job id and future instead of just id when scheduling a job. --- .../system/jobs/SystemJobManager.java | 30 +++++++++++++++---- .../system/jobs/SystemJobManagerTest.java | 4 +-- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/system/jobs/SystemJobManager.java b/graylog2-server/src/main/java/org/graylog2/system/jobs/SystemJobManager.java index f78c2fdb8865..ebc853c48b6b 100644 --- a/graylog2-server/src/main/java/org/graylog2/system/jobs/SystemJobManager.java +++ b/graylog2-server/src/main/java/org/graylog2/system/jobs/SystemJobManager.java @@ -31,12 +31,32 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import static com.codahale.metrics.MetricRegistry.name; public class SystemJobManager { + + public class ScheduleResult { + private final String jobId; + private final ScheduledFuture future; + + ScheduleResult(final String jobId, final ScheduledFuture future) { + this.jobId = jobId; + this.future = future; + } + + public String getJobId() { + return jobId; + } + + public ScheduledFuture getFuture() { + return future; + } + } + private static final Logger LOG = LoggerFactory.getLogger(SystemJobManager.class); private static final int THREAD_POOL_SIZE = 15; @@ -59,11 +79,11 @@ private ScheduledExecutorService executorService(final MetricRegistry metricRegi name(this.getClass(), "executor-service")); } - public String submit(final SystemJob job) throws SystemJobConcurrencyException { + public ScheduleResult submit(final SystemJob job) throws SystemJobConcurrencyException { return submitWithDelay(job, 0, TimeUnit.SECONDS); } - public String submitWithDelay(final SystemJob job, final long delay, TimeUnit timeUnit) throws SystemJobConcurrencyException { + public ScheduleResult submitWithDelay(final SystemJob job, final long delay, TimeUnit timeUnit) throws SystemJobConcurrencyException { // for immediate jobs, check allowed concurrency right now if (delay == 0) { checkAllowedConcurrency(job); @@ -74,7 +94,7 @@ public String submitWithDelay(final SystemJob job, final long delay, TimeUnit ti job.setId(new UUID().toString()); jobs.put(job.getId(), job); - executor.schedule(new Runnable() { + final ScheduledFuture future = executor.schedule(new Runnable() { @Override public void run() { try { @@ -89,7 +109,7 @@ public void run() { x.stop(); final String msg = "SystemJob <" + job.getId() + "> [" + jobClass + "] finished in " + x.elapsed( - TimeUnit.MILLISECONDS) + "ms."; + TimeUnit.MILLISECONDS) + "ms."; LOG.info(msg); activityWriter.write(new Activity(msg, SystemJobManager.class)); } catch (SystemJobConcurrencyException ignored) { @@ -102,7 +122,7 @@ public void run() { }, delay, timeUnit); LOG.info("Submitted SystemJob <{}> [{}]", job.getId(), jobClass); - return job.getId(); + return new ScheduleResult(job.getId(), future); } protected void checkAllowedConcurrency(SystemJob job) throws SystemJobConcurrencyException { diff --git a/graylog2-server/src/test/java/org/graylog2/system/jobs/SystemJobManagerTest.java b/graylog2-server/src/test/java/org/graylog2/system/jobs/SystemJobManagerTest.java index 4d937d6c2af6..54bec4b29a4f 100644 --- a/graylog2-server/src/test/java/org/graylog2/system/jobs/SystemJobManagerTest.java +++ b/graylog2-server/src/test/java/org/graylog2/system/jobs/SystemJobManagerTest.java @@ -41,8 +41,8 @@ public void testGetRunningJobs() throws Exception { LongRunningJob job1 = new LongRunningJob(1); LongRunningJob job2 = new LongRunningJob(1); - String jobID1 = manager.submit(job1); - String jobID2 = manager.submit(job2); + String jobID1 = manager.submit(job1).getJobId(); + String jobID2 = manager.submit(job2).getJobId(); assertEquals(2, manager.getRunningJobs().size()); assertTrue(manager.getRunningJobs().containsValue(job1)); From c74d5c6b5b6f206be8c52c9f2955f6f6aa50496c Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Tue, 24 May 2016 13:00:18 +0200 Subject: [PATCH 02/18] Waiting for index range calculation before switching deflector alias. Fixes #2264. --- .../main/java/org/graylog2/Configuration.java | 3 ++ .../java/org/graylog2/indexer/Deflector.java | 29 ++++++++++++++++--- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/Configuration.java b/graylog2-server/src/main/java/org/graylog2/Configuration.java index 49ba839e1602..1815d1887148 100644 --- a/graylog2-server/src/main/java/org/graylog2/Configuration.java +++ b/graylog2-server/src/main/java/org/graylog2/Configuration.java @@ -156,6 +156,9 @@ public class Configuration extends BaseConfiguration { @Parameter(value = "index_ranges_cleanup_interval", validator = PositiveDurationValidator.class) private Duration indexRangesCleanupInterval = Duration.hours(1L); + @Parameter(value = "deflector_index_range_calculation_timeout", validator = PositiveDurationValidator.class) + private Duration deflectorIndexRangeCalculationTimeout = Duration.hours(1L); + @Parameter(value = "trusted_proxies", converter = IPSubnetConverter.class) private Set trustedProxies = Collections.emptySet(); diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java index 107f2e354711..b7f2987a72b0 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java @@ -31,10 +31,13 @@ import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Named; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -58,6 +61,7 @@ public class Deflector { // extends Ablenkblech private final SystemJobManager systemJobManager; private final ActivityWriter activityWriter; private final CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory; + private final Duration deflectorIndexRangeCalculationTimeout; private final String indexPrefix; private final String deflectorName; private final Indices indices; @@ -71,13 +75,15 @@ public Deflector(final SystemJobManager systemJobManager, final ActivityWriter activityWriter, final SetIndexReadOnlyJob.Factory indexReadOnlyJobFactory, final CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory, - final Indices indices) { + final Indices indices, + @Named("deflector_index_range_calculation_timeout") final Duration deflectorIndexRangeCalculationTimeout) { this.indexPrefix = indexPrefix; this.systemJobManager = systemJobManager; this.activityWriter = activityWriter; this.indexReadOnlyJobFactory = indexReadOnlyJobFactory; this.createNewSingleIndexRangeJobFactory = createNewSingleIndexRangeJobFactory; + this.deflectorIndexRangeCalculationTimeout = deflectorIndexRangeCalculationTimeout; this.deflectorName = buildName(indexPrefix); this.indices = indices; @@ -155,9 +161,22 @@ public void cycle() { pointTo(newTarget); activity.setMessage("Cycled deflector from to <" + newTarget + ">"); } else { + final SystemJobManager.ScheduleResult scheduleResult = addSingleIndexRanges(oldTarget); + if (scheduleResult != null) { + try { + LOG.debug("Waiting for index calculation to complete"); + scheduleResult.getFuture().get(deflectorIndexRangeCalculationTimeout.toMillis(), TimeUnit.MILLISECONDS); + LOG.debug("Completed!"); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.warn("Unable to calculate index range of index which is to be cycled: ", e); + } + } else { + LOG.warn("Unable to schedule job calculating index range of index which is to be cycled."); + } + // Re-pointing from existing old index to the new one. + LOG.debug("Now switching over deflector alias."); pointTo(newTarget, oldTarget); - addSingleIndexRanges(oldTarget); // perform these steps after a delay, so we don't race with indexing into the alias // it can happen that an index request still writes to the old deflector target, while we cycled it above. @@ -267,14 +286,16 @@ public void pointTo(final String newIndex) { indices.cycleAlias(getName(), newIndex); } - private void addSingleIndexRanges(String indexName) { + @Nullable + private SystemJobManager.ScheduleResult addSingleIndexRanges(String indexName) { try { - systemJobManager.submit(createNewSingleIndexRangeJobFactory.create(this, indexName)); + return systemJobManager.submit(createNewSingleIndexRangeJobFactory.create(this, indexName)); } catch (SystemJobConcurrencyException e) { final String msg = "Could not calculate index ranges for index " + indexName + " after cycling deflector: Maximum concurrency of job is reached."; activityWriter.write(new Activity(msg, Deflector.class)); LOG.error(msg, e); } + return null; } @Nullable From 8af98124fe5412abee2f59a75b3b4f06230dbc69 Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Tue, 24 May 2016 13:45:13 +0200 Subject: [PATCH 03/18] Adapt constructor usage in deflector tests. --- .../org/graylog2/indexer/DeflectorTest.java | 45 ++++++++++--------- .../ranges/EsIndexRangeServiceTest.java | 3 +- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java index a0f0695aad9c..431b90106c7f 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java @@ -31,6 +31,7 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -46,6 +47,8 @@ @RunWith(MockitoJUnitRunner.class) public class DeflectorTest { + private static final Duration deflectorIndexRangeCalculationTimeout = Duration.ofHours(1L); + @Mock private SystemJobManager systemJobManager; @Mock @@ -60,7 +63,7 @@ public class DeflectorTest { @Before public void setUp() { - deflector = new Deflector(systemJobManager, "graylog", activityWriter, indexReadOnlyJobFactory, singleIndexRangeJobFactory, indices); + deflector = new Deflector(systemJobManager, "graylog", activityWriter, indexReadOnlyJobFactory, singleIndexRangeJobFactory, indices, deflectorIndexRangeCalculationTimeout); } @Test @@ -167,11 +170,12 @@ public void getNewestTargetNumber() throws NoTargetIndexException { when(indices.getIndexNamesAndAliases(anyString())).thenReturn(indexNameAliases); final Deflector deflector = new Deflector(systemJobManager, - "graylog", - activityWriter, - indexReadOnlyJobFactory, - singleIndexRangeJobFactory, - indices); + "graylog", + activityWriter, + indexReadOnlyJobFactory, + singleIndexRangeJobFactory, + indices, + deflectorIndexRangeCalculationTimeout); final int number = deflector.getNewestTargetNumber(); assertEquals(3, number); @@ -189,15 +193,16 @@ public void getAllGraylogIndexNames() { when(indices.getIndexNamesAndAliases(anyString())).thenReturn(indexNameAliases); final Deflector deflector = new Deflector(systemJobManager, - "graylog", - activityWriter, - indexReadOnlyJobFactory, - singleIndexRangeJobFactory, - indices); + "graylog", + activityWriter, + indexReadOnlyJobFactory, + singleIndexRangeJobFactory, + indices, + deflectorIndexRangeCalculationTimeout); final String[] allGraylogIndexNames = deflector.getAllGraylogIndexNames(); assertThat(allGraylogIndexNames) - .containsExactlyInAnyOrder("graylog_1", "graylog_2", "graylog_3", "graylog_4_restored_archive", "graylog_5"); + .containsExactlyInAnyOrder("graylog_1", "graylog_2", "graylog_3", "graylog_4_restored_archive", "graylog_5"); } @Test @@ -212,20 +217,18 @@ public void getAllGraylogDeflectorIndices() { when(indices.getIndexNamesAndAliases(anyString())).thenReturn(indexNameAliases); final Deflector deflector = new Deflector(systemJobManager, - "graylog", - activityWriter, - indexReadOnlyJobFactory, - singleIndexRangeJobFactory, - indices); + "graylog", + activityWriter, + indexReadOnlyJobFactory, + singleIndexRangeJobFactory, + indices, + deflectorIndexRangeCalculationTimeout); final Map> deflectorIndices = deflector.getAllGraylogDeflectorIndices(); assertThat(deflectorIndices).isNotNull(); assertThat(deflectorIndices).isNotEmpty(); assertThat(deflectorIndices.keySet()) - .containsExactlyInAnyOrder("graylog_1", "graylog_2", "graylog_3", "graylog_5"); + .containsExactlyInAnyOrder("graylog_1", "graylog_2", "graylog_3", "graylog_5"); } - - - } diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java index 7d994546fa6a..0f75ed339b9f 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java @@ -44,6 +44,7 @@ import org.mockito.runners.MockitoJUnitRunner; import javax.inject.Inject; +import java.time.Duration; import java.util.Set; import java.util.SortedSet; @@ -84,7 +85,7 @@ public EsIndexRangeServiceTest() { public void setUp() throws Exception { final Messages messages = new Messages(client, ELASTICSEARCH_CONFIGURATION, new MetricRegistry()); indices = new Indices(client, ELASTICSEARCH_CONFIGURATION, new IndexMapping(), messages); - final Deflector deflector = new Deflector(null, ELASTICSEARCH_CONFIGURATION.getIndexPrefix(), new NullActivityWriter(), null, null, indices); + final Deflector deflector = new Deflector(null, ELASTICSEARCH_CONFIGURATION.getIndexPrefix(), new NullActivityWriter(), null, null, indices, Duration.ofHours(1L)); indexRangeService = new EsIndexRangeService(client, deflector, localEventBus, new MetricRegistry()); } From 32846cfcf6b4cdbd6d6e5a7f19e297e3322f123b Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Tue, 24 May 2016 14:41:28 +0200 Subject: [PATCH 04/18] Fix usages of wrong Duration classes. --- .../src/main/java/org/graylog2/indexer/Deflector.java | 4 ++-- .../src/test/java/org/graylog2/indexer/DeflectorTest.java | 4 ++-- .../org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java index b7f2987a72b0..20fb6d2ee347 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java @@ -16,6 +16,7 @@ */ package org.graylog2.indexer; +import com.github.joschi.jadconfig.util.Duration; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.indices.InvalidAliasNameException; import org.graylog2.indexer.indices.Indices; @@ -31,7 +32,6 @@ import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Named; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Set; @@ -165,7 +165,7 @@ public void cycle() { if (scheduleResult != null) { try { LOG.debug("Waiting for index calculation to complete"); - scheduleResult.getFuture().get(deflectorIndexRangeCalculationTimeout.toMillis(), TimeUnit.MILLISECONDS); + scheduleResult.getFuture().get(deflectorIndexRangeCalculationTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); LOG.debug("Completed!"); } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.warn("Unable to calculate index range of index which is to be cycled: ", e); diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java index 431b90106c7f..cd9177eb19f2 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java @@ -20,6 +20,7 @@ */ package org.graylog2.indexer; +import com.github.joschi.jadconfig.util.Duration; import com.google.common.collect.Maps; import org.graylog2.indexer.indices.Indices; import org.graylog2.indexer.ranges.CreateNewSingleIndexRangeJob; @@ -31,7 +32,6 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -47,7 +47,7 @@ @RunWith(MockitoJUnitRunner.class) public class DeflectorTest { - private static final Duration deflectorIndexRangeCalculationTimeout = Duration.ofHours(1L); + private static final Duration deflectorIndexRangeCalculationTimeout = Duration.hours(1L); @Mock private SystemJobManager systemJobManager; diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java index 0f75ed339b9f..a6f56e880e32 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java @@ -17,6 +17,7 @@ package org.graylog2.indexer.ranges; import com.codahale.metrics.MetricRegistry; +import com.github.joschi.jadconfig.util.Duration; import com.github.joschi.nosqlunit.elasticsearch2.ElasticsearchRule; import com.github.joschi.nosqlunit.elasticsearch2.EmbeddedElasticsearch; import com.google.common.collect.ImmutableSet; @@ -44,7 +45,6 @@ import org.mockito.runners.MockitoJUnitRunner; import javax.inject.Inject; -import java.time.Duration; import java.util.Set; import java.util.SortedSet; @@ -85,7 +85,7 @@ public EsIndexRangeServiceTest() { public void setUp() throws Exception { final Messages messages = new Messages(client, ELASTICSEARCH_CONFIGURATION, new MetricRegistry()); indices = new Indices(client, ELASTICSEARCH_CONFIGURATION, new IndexMapping(), messages); - final Deflector deflector = new Deflector(null, ELASTICSEARCH_CONFIGURATION.getIndexPrefix(), new NullActivityWriter(), null, null, indices, Duration.ofHours(1L)); + final Deflector deflector = new Deflector(null, ELASTICSEARCH_CONFIGURATION.getIndexPrefix(), new NullActivityWriter(), null, null, indices, Duration.hours(1L)); indexRangeService = new EsIndexRangeService(client, deflector, localEventBus, new MetricRegistry()); } From d523b74265e32a608bde922cb5fe8c228939bfcb Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Wed, 25 May 2016 17:41:02 +0200 Subject: [PATCH 05/18] Changing handling of deflector index ranges. Deflector index ranges are now statically created with the current time as begin and 0L as end. Searching for the indices matching a time range was changed accordingly. --- .../main/java/org/graylog2/Configuration.java | 4 +- .../java/org/graylog2/indexer/Deflector.java | 42 +++++++++--------- .../org/graylog2/indexer/IndexHelper.java | 10 ----- .../indexer/ranges/EsIndexRangeService.java | 5 +++ .../indexer/ranges/IndexRangeService.java | 1 + .../ranges/LegacyMongoIndexRangeService.java | 5 +++ .../ranges/MongoIndexRangeService.java | 44 ++++++++++++------- .../system/jobs/SystemJobManager.java | 16 +++++-- .../org/graylog2/indexer/DeflectorTest.java | 10 ++++- .../ranges/EsIndexRangeServiceTest.java | 3 +- .../system/jobs/SystemJobManagerTest.java | 4 +- 11 files changed, 89 insertions(+), 55 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/Configuration.java b/graylog2-server/src/main/java/org/graylog2/Configuration.java index 1815d1887148..f67716529df9 100644 --- a/graylog2-server/src/main/java/org/graylog2/Configuration.java +++ b/graylog2-server/src/main/java/org/graylog2/Configuration.java @@ -156,8 +156,8 @@ public class Configuration extends BaseConfiguration { @Parameter(value = "index_ranges_cleanup_interval", validator = PositiveDurationValidator.class) private Duration indexRangesCleanupInterval = Duration.hours(1L); - @Parameter(value = "deflector_index_range_calculation_timeout", validator = PositiveDurationValidator.class) - private Duration deflectorIndexRangeCalculationTimeout = Duration.hours(1L); + @Parameter(value = "deflector_index_read_only_timeout", validator = PositiveDurationValidator.class) + private Duration deflectorIndexReadOnlyTimeout = Duration.hours(1L); @Parameter(value = "trusted_proxies", converter = IPSubnetConverter.class) private Set trustedProxies = Collections.emptySet(); diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java index 20fb6d2ee347..be6a19ba6c0e 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java @@ -21,6 +21,8 @@ import org.elasticsearch.indices.InvalidAliasNameException; import org.graylog2.indexer.indices.Indices; import org.graylog2.indexer.ranges.CreateNewSingleIndexRangeJob; +import org.graylog2.indexer.ranges.IndexRange; +import org.graylog2.indexer.ranges.IndexRangeService; import org.graylog2.shared.system.activities.Activity; import org.graylog2.shared.system.activities.ActivityWriter; import org.graylog2.system.jobs.SystemJob; @@ -61,7 +63,8 @@ public class Deflector { // extends Ablenkblech private final SystemJobManager systemJobManager; private final ActivityWriter activityWriter; private final CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory; - private final Duration deflectorIndexRangeCalculationTimeout; + private final IndexRangeService indexRangeService; + private final Duration deflectorIndexReadOnlyTimeout; private final String indexPrefix; private final String deflectorName; private final Indices indices; @@ -76,14 +79,16 @@ public Deflector(final SystemJobManager systemJobManager, final SetIndexReadOnlyJob.Factory indexReadOnlyJobFactory, final CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory, final Indices indices, - @Named("deflector_index_range_calculation_timeout") final Duration deflectorIndexRangeCalculationTimeout) { + final IndexRangeService indexRangeService, + @Named("deflector_index_read_only_timeout")final Duration deflectorIndexReadOnlyTimeout) { this.indexPrefix = indexPrefix; this.systemJobManager = systemJobManager; this.activityWriter = activityWriter; this.indexReadOnlyJobFactory = indexReadOnlyJobFactory; this.createNewSingleIndexRangeJobFactory = createNewSingleIndexRangeJobFactory; - this.deflectorIndexRangeCalculationTimeout = deflectorIndexRangeCalculationTimeout; + this.indexRangeService = indexRangeService; + this.deflectorIndexReadOnlyTimeout = deflectorIndexReadOnlyTimeout; this.deflectorName = buildName(indexPrefix); this.indices = indices; @@ -150,6 +155,7 @@ public void cycle() { ClusterHealthStatus healthStatus = indices.waitForRecovery(newTarget); LOG.debug("Health status of index <{}>: {}", newTarget, healthStatus); + addDeflectorIndexRange(newTarget); LOG.info("Done!"); // Point deflector to new index. @@ -161,19 +167,6 @@ public void cycle() { pointTo(newTarget); activity.setMessage("Cycled deflector from to <" + newTarget + ">"); } else { - final SystemJobManager.ScheduleResult scheduleResult = addSingleIndexRanges(oldTarget); - if (scheduleResult != null) { - try { - LOG.debug("Waiting for index calculation to complete"); - scheduleResult.getFuture().get(deflectorIndexRangeCalculationTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - LOG.debug("Completed!"); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.warn("Unable to calculate index range of index which is to be cycled: ", e); - } - } else { - LOG.warn("Unable to schedule job calculating index range of index which is to be cycled."); - } - // Re-pointing from existing old index to the new one. LOG.debug("Now switching over deflector alias."); pointTo(newTarget, oldTarget); @@ -184,20 +177,29 @@ public void cycle() { // waiting 30 seconds to perform the background task should completely get rid of these errors. final SystemJob makeReadOnlyJob = indexReadOnlyJobFactory.create(oldTarget); try { - systemJobManager.submitWithDelay(makeReadOnlyJob, 30, TimeUnit.SECONDS); + final SystemJobManager.ScheduleResult scheduleResult = systemJobManager.submitWithDelayForResult(makeReadOnlyJob, 30, TimeUnit.SECONDS); + if (scheduleResult != null) { + scheduleResult.getFuture().get(deflectorIndexReadOnlyTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } } catch (SystemJobConcurrencyException e) { LOG.error("Cannot set index <" + oldTarget + "> to read only. It won't be optimized.", e); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + e.printStackTrace(); } + addSingleIndexRanges(oldTarget); activity.setMessage("Cycled deflector from <" + oldTarget + "> to <" + newTarget + ">"); } - addSingleIndexRanges(newTarget); - LOG.info("Done!"); activityWriter.write(activity); } + private void addDeflectorIndexRange(String newTarget) { + final IndexRange deflectorRange = indexRangeService.createForDeflector(newTarget); + indexRangeService.save(deflectorRange); + } + public int getNewestTargetNumber() throws NoTargetIndexException { final Set indexNames = indices.getIndexNamesAndAliases(getDeflectorWildcard()).keySet(); @@ -289,7 +291,7 @@ public void pointTo(final String newIndex) { @Nullable private SystemJobManager.ScheduleResult addSingleIndexRanges(String indexName) { try { - return systemJobManager.submit(createNewSingleIndexRangeJobFactory.create(this, indexName)); + return systemJobManager.submitForResult(createNewSingleIndexRangeJobFactory.create(this, indexName)); } catch (SystemJobConcurrencyException e) { final String msg = "Could not calculate index ranges for index " + indexName + " after cycling deflector: Maximum concurrency of job is reached."; activityWriter.write(new Activity(msg, Deflector.class)); diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java b/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java index ea5c80e2cbd3..726908d4966d 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java @@ -107,16 +107,6 @@ public static Set determineAffectedIndicesWithRanges(IndexRangeServi } // Always include the deflector target - final String targetIndex = deflector.getCurrentActualTargetIndex(); - if (targetIndex != null) { - try { - final IndexRange deflectorIndexRange = indexRangeService.get(targetIndex); - indices.add(deflectorIndexRange); - } catch (NotFoundException e) { - LOG.warn("Couldn't find latest deflector target index", e); - } - } - return indices.build(); } } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java index c1cfc0e7a282..cbf6350e7c84 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java @@ -178,6 +178,11 @@ public IndexRange calculateRange(String index) { throw new UnsupportedOperationException(); } + @Override + public IndexRange createForDeflector(String index) { + return null; + } + @Override public void save(IndexRange indexRange) { throw new UnsupportedOperationException(); diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/IndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/IndexRangeService.java index 2cc63692dd3c..b23f4b25789b 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/IndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/IndexRangeService.java @@ -31,4 +31,5 @@ public interface IndexRangeService { void save(IndexRange indexRange); IndexRange calculateRange(String index); + IndexRange createForDeflector(String index); } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java index 7250259570ec..6c571561735c 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java @@ -108,6 +108,11 @@ public IndexRange calculateRange(String index) { throw new UnsupportedOperationException(); } + @Override + public IndexRange createForDeflector(String index) { + return null; + } + public int delete(String index) { return destroy(new BasicDBObject(FIELD_INDEX, index), COLLECTION_NAME); } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java index 9251bc191bfe..de0dfbacaf82 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java @@ -66,10 +66,10 @@ public MongoIndexRangeService(MongoConnection mongoConnection, EventBus eventBus) { this.indices = indices; this.collection = JacksonDBCollection.wrap( - mongoConnection.getDatabase().getCollection(COLLECTION_NAME), - MongoIndexRange.class, - ObjectId.class, - objectMapperProvider.get()); + mongoConnection.getDatabase().getCollection(COLLECTION_NAME), + MongoIndexRange.class, + ObjectId.class, + objectMapperProvider.get()); // This sucks. We need to bridge Elasticsearch's and our own Guice injector. IndexChangeMonitor.setEventBus(eventBus); @@ -77,16 +77,16 @@ public MongoIndexRangeService(MongoConnection mongoConnection, collection.createIndex(new BasicDBObject(MongoIndexRange.FIELD_INDEX_NAME, 1)); collection.createIndex(BasicDBObjectBuilder.start() - .add(MongoIndexRange.FIELD_BEGIN, 1) - .add(MongoIndexRange.FIELD_END, 1) - .get()); + .add(MongoIndexRange.FIELD_BEGIN, 1) + .add(MongoIndexRange.FIELD_END, 1) + .get()); } @Override public IndexRange get(String index) throws NotFoundException { final DBQuery.Query query = DBQuery.and( - DBQuery.notExists("start"), - DBQuery.is(IndexRange.FIELD_INDEX_NAME, index)); + DBQuery.notExists("start"), + DBQuery.is(IndexRange.FIELD_INDEX_NAME, index)); final MongoIndexRange indexRange = collection.findOne(query); if (indexRange == null) { throw new NotFoundException("Index range for index <" + index + "> not found."); @@ -98,11 +98,18 @@ public IndexRange get(String index) throws NotFoundException { @Override public SortedSet find(DateTime begin, DateTime end) { final DBCursor indexRanges = collection.find( + DBQuery.or( DBQuery.and( - DBQuery.notExists("start"), // "start" has been used by the old index ranges in MongoDB - DBQuery.lessThanEquals(IndexRange.FIELD_BEGIN, end.getMillis()), - DBQuery.greaterThanEquals(IndexRange.FIELD_END, begin.getMillis()) + DBQuery.notExists("start"), // "start" has been used by the old index ranges in MongoDB + DBQuery.lessThanEquals(IndexRange.FIELD_BEGIN, end.getMillis()), + DBQuery.greaterThanEquals(IndexRange.FIELD_END, begin.getMillis()) + ), + DBQuery.and( + DBQuery.notExists("start"), + DBQuery.lessThanEquals(IndexRange.FIELD_BEGIN, end.getMillis()), + DBQuery.is(IndexRange.FIELD_END, 0L) ) + ) ); return ImmutableSortedSet.copyOf(IndexRange.COMPARATOR, (Iterator) indexRanges); @@ -125,6 +132,11 @@ public IndexRange calculateRange(String index) { return MongoIndexRange.create(index, stats.min(), stats.max(), now, duration); } + @Override + public IndexRange createForDeflector(String index) { + return MongoIndexRange.create(index, DateTime.now(), new DateTime(0L), DateTime.now(), 0); + } + @Override public void save(IndexRange indexRange) { collection.remove(DBQuery.in(IndexRange.FIELD_INDEX_NAME, indexRange.indexName())); @@ -158,10 +170,10 @@ public void handleIndexReopening(IndicesReopenedEvent event) { indices.waitForRecovery(index); final Retryer retryer = RetryerBuilder.newBuilder() - .retryIfException(input -> !(input instanceof IndexClosedException)) - .withWaitStrategy(WaitStrategies.exponentialWait()) - .withStopStrategy(StopStrategies.stopAfterDelay(5, TimeUnit.MINUTES)) - .build(); + .retryIfException(input -> !(input instanceof IndexClosedException)) + .withWaitStrategy(WaitStrategies.exponentialWait()) + .withStopStrategy(StopStrategies.stopAfterDelay(5, TimeUnit.MINUTES)) + .build(); final IndexRange indexRange; try { diff --git a/graylog2-server/src/main/java/org/graylog2/system/jobs/SystemJobManager.java b/graylog2-server/src/main/java/org/graylog2/system/jobs/SystemJobManager.java index ebc853c48b6b..a21763c6a7ce 100644 --- a/graylog2-server/src/main/java/org/graylog2/system/jobs/SystemJobManager.java +++ b/graylog2-server/src/main/java/org/graylog2/system/jobs/SystemJobManager.java @@ -26,7 +26,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import javax.inject.Inject; +import javax.validation.constraints.Null; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -79,11 +81,19 @@ private ScheduledExecutorService executorService(final MetricRegistry metricRegi name(this.getClass(), "executor-service")); } - public ScheduleResult submit(final SystemJob job) throws SystemJobConcurrencyException { - return submitWithDelay(job, 0, TimeUnit.SECONDS); + public String submit(final SystemJob job) throws SystemJobConcurrencyException { + return submitForResult(job).getJobId(); } - public ScheduleResult submitWithDelay(final SystemJob job, final long delay, TimeUnit timeUnit) throws SystemJobConcurrencyException { + public ScheduleResult submitForResult(final SystemJob job) throws SystemJobConcurrencyException { + return submitWithDelayForResult(job, 0, TimeUnit.SECONDS); + } + + public String submitWithDelay(final SystemJob job, final long delay, TimeUnit timeUnit) throws SystemJobConcurrencyException { + return submitWithDelayForResult(job, delay, timeUnit).getJobId(); + } + + public ScheduleResult submitWithDelayForResult(final SystemJob job, final long delay, TimeUnit timeUnit) throws SystemJobConcurrencyException { // for immediate jobs, check allowed concurrency right now if (delay == 0) { checkAllowedConcurrency(job); diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java index cd9177eb19f2..39b20889ceac 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.Maps; import org.graylog2.indexer.indices.Indices; import org.graylog2.indexer.ranges.CreateNewSingleIndexRangeJob; +import org.graylog2.indexer.ranges.IndexRangeService; import org.graylog2.system.activities.SystemMessageActivityWriter; import org.graylog2.system.jobs.SystemJobManager; import org.junit.Before; @@ -61,9 +62,13 @@ public class DeflectorTest { private Indices indices; private Deflector deflector; + @Mock + private IndexRangeService indexRangeService; + @Before public void setUp() { - deflector = new Deflector(systemJobManager, "graylog", activityWriter, indexReadOnlyJobFactory, singleIndexRangeJobFactory, indices, deflectorIndexRangeCalculationTimeout); + deflector = new Deflector(systemJobManager, "graylog", activityWriter, indexReadOnlyJobFactory, singleIndexRangeJobFactory, + indices, indexRangeService, deflectorIndexRangeCalculationTimeout); } @Test @@ -175,6 +180,7 @@ public void getNewestTargetNumber() throws NoTargetIndexException { indexReadOnlyJobFactory, singleIndexRangeJobFactory, indices, + indexRangeService, deflectorIndexRangeCalculationTimeout); final int number = deflector.getNewestTargetNumber(); @@ -198,6 +204,7 @@ public void getAllGraylogIndexNames() { indexReadOnlyJobFactory, singleIndexRangeJobFactory, indices, + indexRangeService, deflectorIndexRangeCalculationTimeout); final String[] allGraylogIndexNames = deflector.getAllGraylogIndexNames(); @@ -222,6 +229,7 @@ public void getAllGraylogDeflectorIndices() { indexReadOnlyJobFactory, singleIndexRangeJobFactory, indices, + indexRangeService, deflectorIndexRangeCalculationTimeout); final Map> deflectorIndices = deflector.getAllGraylogDeflectorIndices(); diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java index a6f56e880e32..ef0f025757f6 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java @@ -85,7 +85,8 @@ public EsIndexRangeServiceTest() { public void setUp() throws Exception { final Messages messages = new Messages(client, ELASTICSEARCH_CONFIGURATION, new MetricRegistry()); indices = new Indices(client, ELASTICSEARCH_CONFIGURATION, new IndexMapping(), messages); - final Deflector deflector = new Deflector(null, ELASTICSEARCH_CONFIGURATION.getIndexPrefix(), new NullActivityWriter(), null, null, indices, Duration.hours(1L)); + final Deflector deflector = new Deflector(null, ELASTICSEARCH_CONFIGURATION.getIndexPrefix(), new NullActivityWriter(), + null, null, indices, null, Duration.hours(1L)); indexRangeService = new EsIndexRangeService(client, deflector, localEventBus, new MetricRegistry()); } diff --git a/graylog2-server/src/test/java/org/graylog2/system/jobs/SystemJobManagerTest.java b/graylog2-server/src/test/java/org/graylog2/system/jobs/SystemJobManagerTest.java index 54bec4b29a4f..4d937d6c2af6 100644 --- a/graylog2-server/src/test/java/org/graylog2/system/jobs/SystemJobManagerTest.java +++ b/graylog2-server/src/test/java/org/graylog2/system/jobs/SystemJobManagerTest.java @@ -41,8 +41,8 @@ public void testGetRunningJobs() throws Exception { LongRunningJob job1 = new LongRunningJob(1); LongRunningJob job2 = new LongRunningJob(1); - String jobID1 = manager.submit(job1).getJobId(); - String jobID2 = manager.submit(job2).getJobId(); + String jobID1 = manager.submit(job1); + String jobID2 = manager.submit(job2); assertEquals(2, manager.getRunningJobs().size()); assertTrue(manager.getRunningJobs().containsValue(job1)); From 4211a3f2c125f414aab8e3739c0ebe5b9952d71c Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Thu, 26 May 2016 11:26:49 +0200 Subject: [PATCH 06/18] Throwing exception instead of returning null. --- .../java/org/graylog2/indexer/ranges/EsIndexRangeService.java | 2 +- .../graylog2/indexer/ranges/LegacyMongoIndexRangeService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java index cbf6350e7c84..61817cda91c0 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java @@ -180,7 +180,7 @@ public IndexRange calculateRange(String index) { @Override public IndexRange createForDeflector(String index) { - return null; + throw new UnsupportedOperationException(); } @Override diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java index 6c571561735c..7042bb12a6c5 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java @@ -110,7 +110,7 @@ public IndexRange calculateRange(String index) { @Override public IndexRange createForDeflector(String index) { - return null; + throw new UnsupportedOperationException(); } public int delete(String index) { From d30d23c62ec15e21f3b087c6ee8827801cbfd824 Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Thu, 26 May 2016 11:27:08 +0200 Subject: [PATCH 07/18] Changing begin for deflector index range, returning it per default. As the timestamp of a message is generated on the source and not during ingestion time, we cannot determine anything for the deflector index range. Therefore we need to set begin time to 0 and always return the deflector index range for search. --- .../src/main/java/org/graylog2/indexer/IndexHelper.java | 7 +++++++ .../graylog2/indexer/ranges/MongoIndexRangeService.java | 8 ++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java b/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java index 726908d4966d..d0855794b925 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java @@ -106,6 +106,13 @@ public static Set determineAffectedIndicesWithRanges(IndexRangeServi indices.add(indexRange); } + try { + final IndexRange deflectorIndexRange = indexRangeService.get(deflector.getName()); + indices.add(deflectorIndexRange); + } catch (NotFoundException e) { + LOG.warn("Unable to add deflector index range to search indices: ", e); + } + // Always include the deflector target return indices.build(); } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java index de0dfbacaf82..1982ad91b4d2 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java @@ -105,9 +105,9 @@ public SortedSet find(DateTime begin, DateTime end) { DBQuery.greaterThanEquals(IndexRange.FIELD_END, begin.getMillis()) ), DBQuery.and( - DBQuery.notExists("start"), - DBQuery.lessThanEquals(IndexRange.FIELD_BEGIN, end.getMillis()), - DBQuery.is(IndexRange.FIELD_END, 0L) + DBQuery.notExists("start"), // "start" has been used by the old index ranges in MongoDB + DBQuery.lessThanEquals(IndexRange.FIELD_BEGIN, 0), + DBQuery.greaterThanEquals(IndexRange.FIELD_END, 0) ) ) ); @@ -134,7 +134,7 @@ public IndexRange calculateRange(String index) { @Override public IndexRange createForDeflector(String index) { - return MongoIndexRange.create(index, DateTime.now(), new DateTime(0L), DateTime.now(), 0); + return MongoIndexRange.create(index, new DateTime(0), new DateTime(0), DateTime.now(), 0); } @Override From 562ea417a9424fca0d392d566973dde01cc565b4 Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Thu, 26 May 2016 11:31:31 +0200 Subject: [PATCH 08/18] Do not recalculate range for current deflector target. --- .../org/graylog2/indexer/ranges/RebuildIndexRangesJob.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/RebuildIndexRangesJob.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/RebuildIndexRangesJob.java index 03b3acb52f4b..22149bcc9fc5 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/RebuildIndexRangesJob.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/RebuildIndexRangesJob.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.concurrent.TimeUnit; public class RebuildIndexRangesJob extends SystemJob { @@ -85,7 +86,11 @@ public void execute() { indicesToCalculate = indices.length; Stopwatch sw = Stopwatch.createStarted(); + final String deflectorIndexName = deflector.getName(); for (String index : indices) { + if (deflectorIndexName.equals(index)) { + continue; + } if (cancelRequested) { info("Stop requested. Not calculating next index range, not updating ranges."); sw.stop(); From 4ee1bd9e77509ba95f8ba93a4d473bf56a467192 Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Thu, 26 May 2016 14:12:32 +0200 Subject: [PATCH 09/18] Adapt tests to new index helper behavior. --- .../org/graylog2/indexer/IndexHelper.java | 7 -- .../org/graylog2/indexer/IndexHelperTest.java | 104 +++++++++--------- 2 files changed, 53 insertions(+), 58 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java b/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java index d0855794b925..726908d4966d 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java @@ -106,13 +106,6 @@ public static Set determineAffectedIndicesWithRanges(IndexRangeServi indices.add(indexRange); } - try { - final IndexRange deflectorIndexRange = indexRangeService.get(deflector.getName()); - indices.add(deflectorIndexRange); - } catch (NotFoundException e) { - LOG.warn("Unable to add deflector index range to search indices: ", e); - } - // Always include the deflector target return indices.build(); } diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/IndexHelperTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/IndexHelperTest.java index e51676d905d9..dde430247fef 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/IndexHelperTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/IndexHelperTest.java @@ -69,31 +69,31 @@ public static void shutdown() { @Test public void testGetOldestIndices() { final Set indices = ImmutableSet.builder() - .add("graylog2_production_1") - .add("graylog2_production_7") - .add("graylog2_production_0") - .add("graylog2_production_2") - .add("graylog2_production_4") - .add("graylog2_production_6") - .add("graylog2_production_3") - .add("graylog2_production_5") - .add("graylog2_production_8") - .add("graylog2_production_9") - .add("graylog2_production_10") - .add("graylog2_production_110") - .add("graylog2_production_125") - .add("graylog2_production_20") - .add("graylog2_production_21") - .build(); + .add("graylog2_production_1") + .add("graylog2_production_7") + .add("graylog2_production_0") + .add("graylog2_production_2") + .add("graylog2_production_4") + .add("graylog2_production_6") + .add("graylog2_production_3") + .add("graylog2_production_5") + .add("graylog2_production_8") + .add("graylog2_production_9") + .add("graylog2_production_10") + .add("graylog2_production_110") + .add("graylog2_production_125") + .add("graylog2_production_20") + .add("graylog2_production_21") + .build(); assertThat(IndexHelper.getOldestIndices(indices, 7)).containsOnly( - "graylog2_production_0", - "graylog2_production_1", - "graylog2_production_2", - "graylog2_production_3", - "graylog2_production_4", - "graylog2_production_5", - "graylog2_production_6"); + "graylog2_production_0", + "graylog2_production_1", + "graylog2_production_2", + "graylog2_production_3", + "graylog2_production_4", + "graylog2_production_5", + "graylog2_production_6"); assertThat(IndexHelper.getOldestIndices(indices, 1)).containsOnly("graylog2_production_0"); } @@ -109,9 +109,10 @@ public void determineAffectedIndicesWithRangesIncludesDeflectorTarget() throws E final MongoIndexRange indexRange1 = MongoIndexRange.create("graylog_1", now.plusDays(1), now.plusDays(2), now, 0); final MongoIndexRange indexRangeLatest = MongoIndexRange.create("graylog_2", new DateTime(0L), new DateTime(0L), now, 0); final SortedSet indices = ImmutableSortedSet.orderedBy(IndexRange.COMPARATOR) - .add(indexRange0) - .add(indexRange1) - .build(); + .add(indexRange0) + .add(indexRange1) + .add(indexRangeLatest) + .build(); when(indexRangeService.find(any(DateTime.class), any(DateTime.class))).thenReturn(indices); when(indexRangeService.get("graylog_2")).thenReturn(indexRangeLatest); @@ -122,11 +123,11 @@ public void determineAffectedIndicesWithRangesIncludesDeflectorTarget() throws E final TimeRange relativeRange = RelativeRange.create(3600); assertThat(IndexHelper.determineAffectedIndicesWithRanges(indexRangeService, deflector, absoluteRange)) - .containsExactly(indexRangeLatest, indexRange0, indexRange1); + .containsExactly(indexRangeLatest, indexRange0, indexRange1); assertThat(IndexHelper.determineAffectedIndicesWithRanges(indexRangeService, deflector, keywordRange)) - .containsExactly(indexRangeLatest, indexRange0, indexRange1); + .containsExactly(indexRangeLatest, indexRange0, indexRange1); assertThat(IndexHelper.determineAffectedIndicesWithRanges(indexRangeService, deflector, relativeRange)) - .containsExactly(indexRangeLatest, indexRange0, indexRange1); + .containsExactly(indexRangeLatest, indexRange0, indexRange1); } @Test @@ -135,9 +136,9 @@ public void determineAffectedIndicesWithRangesDoesNotIncludesDeflectorTargetIfMi final MongoIndexRange indexRange0 = MongoIndexRange.create("graylog_0", now, now.plusDays(1), now, 0); final MongoIndexRange indexRange1 = MongoIndexRange.create("graylog_1", now.plusDays(1), now.plusDays(2), now, 0); final SortedSet indices = ImmutableSortedSet.orderedBy(IndexRange.COMPARATOR) - .add(indexRange0) - .add(indexRange1) - .build(); + .add(indexRange0) + .add(indexRange1) + .build(); when(indexRangeService.find(any(DateTime.class), any(DateTime.class))).thenReturn(indices); when(deflector.getCurrentActualTargetIndex()).thenReturn(null); @@ -147,11 +148,11 @@ public void determineAffectedIndicesWithRangesDoesNotIncludesDeflectorTargetIfMi final TimeRange relativeRange = RelativeRange.create(3600); assertThat(IndexHelper.determineAffectedIndicesWithRanges(indexRangeService, deflector, absoluteRange)) - .containsExactly(indexRange0, indexRange1); + .containsExactly(indexRange0, indexRange1); assertThat(IndexHelper.determineAffectedIndicesWithRanges(indexRangeService, deflector, keywordRange)) - .containsExactly(indexRange0, indexRange1); + .containsExactly(indexRange0, indexRange1); assertThat(IndexHelper.determineAffectedIndicesWithRanges(indexRangeService, deflector, relativeRange)) - .containsExactly(indexRange0, indexRange1); + .containsExactly(indexRange0, indexRange1); } @Test @@ -161,9 +162,10 @@ public void determineAffectedIndicesIncludesDeflectorTarget() throws Exception { final MongoIndexRange indexRange1 = MongoIndexRange.create("graylog_1", now.plusDays(1), now.plusDays(2), now, 0); final MongoIndexRange indexRangeLatest = MongoIndexRange.create("graylog_2", new DateTime(0L), new DateTime(0L), now, 0); final SortedSet indices = ImmutableSortedSet.orderedBy(IndexRange.COMPARATOR) - .add(indexRange0) - .add(indexRange1) - .build(); + .add(indexRange0) + .add(indexRange1) + .add(indexRangeLatest) + .build(); when(indexRangeService.find(any(DateTime.class), any(DateTime.class))).thenReturn(indices); when(indexRangeService.get("graylog_2")).thenReturn(indexRangeLatest); @@ -174,11 +176,11 @@ public void determineAffectedIndicesIncludesDeflectorTarget() throws Exception { final TimeRange relativeRange = RelativeRange.create(3600); assertThat(IndexHelper.determineAffectedIndices(indexRangeService, deflector, absoluteRange)) - .containsExactly(indexRangeLatest.indexName(), indexRange0.indexName(), indexRange1.indexName()); + .containsExactly(indexRangeLatest.indexName(), indexRange0.indexName(), indexRange1.indexName()); assertThat(IndexHelper.determineAffectedIndices(indexRangeService, deflector, keywordRange)) - .containsExactly(indexRangeLatest.indexName(), indexRange0.indexName(), indexRange1.indexName()); + .containsExactly(indexRangeLatest.indexName(), indexRange0.indexName(), indexRange1.indexName()); assertThat(IndexHelper.determineAffectedIndices(indexRangeService, deflector, relativeRange)) - .containsExactly(indexRangeLatest.indexName(), indexRange0.indexName(), indexRange1.indexName()); + .containsExactly(indexRangeLatest.indexName(), indexRange0.indexName(), indexRange1.indexName()); } @Test @@ -187,9 +189,9 @@ public void determineAffectedIndicesDoesNotIncludesDeflectorTargetIfMissing() th final MongoIndexRange indexRange0 = MongoIndexRange.create("graylog_0", now, now.plusDays(1), now, 0); final MongoIndexRange indexRange1 = MongoIndexRange.create("graylog_1", now.plusDays(1), now.plusDays(2), now, 0); final SortedSet indices = ImmutableSortedSet.orderedBy(IndexRange.COMPARATOR) - .add(indexRange0) - .add(indexRange1) - .build(); + .add(indexRange0) + .add(indexRange1) + .build(); when(indexRangeService.find(any(DateTime.class), any(DateTime.class))).thenReturn(indices); when(deflector.getCurrentActualTargetIndex()).thenReturn(null); @@ -199,11 +201,11 @@ public void determineAffectedIndicesDoesNotIncludesDeflectorTargetIfMissing() th final TimeRange relativeRange = RelativeRange.create(3600); assertThat(IndexHelper.determineAffectedIndices(indexRangeService, deflector, absoluteRange)) - .containsOnly(indexRange0.indexName(), indexRange1.indexName()); + .containsOnly(indexRange0.indexName(), indexRange1.indexName()); assertThat(IndexHelper.determineAffectedIndices(indexRangeService, deflector, keywordRange)) - .containsOnly(indexRange0.indexName(), indexRange1.indexName()); + .containsOnly(indexRange0.indexName(), indexRange1.indexName()); assertThat(IndexHelper.determineAffectedIndices(indexRangeService, deflector, relativeRange)) - .containsOnly(indexRange0.indexName(), indexRange1.indexName()); + .containsOnly(indexRange0.indexName(), indexRange1.indexName()); } @Test @@ -218,9 +220,9 @@ public void getTimestampRangeFilterReturnsRangeQueryWithGivenTimeRange() { final TimeRange timeRange = AbsoluteRange.create(from, to); final RangeQueryBuilder queryBuilder = (RangeQueryBuilder) IndexHelper.getTimestampRangeFilter(timeRange); assertThat(queryBuilder) - .isNotNull() - .hasFieldOrPropertyWithValue("name", "timestamp") - .hasFieldOrPropertyWithValue("from", Tools.buildElasticSearchTimeFormat(from)) - .hasFieldOrPropertyWithValue("to", Tools.buildElasticSearchTimeFormat(to)); + .isNotNull() + .hasFieldOrPropertyWithValue("name", "timestamp") + .hasFieldOrPropertyWithValue("from", Tools.buildElasticSearchTimeFormat(from)) + .hasFieldOrPropertyWithValue("to", Tools.buildElasticSearchTimeFormat(to)); } } From 6a1fa50f31bce270a75bfece7b536a67d84718aa Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Thu, 26 May 2016 14:52:27 +0200 Subject: [PATCH 10/18] Making Long what's supposed to be Long. --- .../org/graylog2/indexer/ranges/MongoIndexRangeService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java index 1982ad91b4d2..540d6b2fd1aa 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java @@ -106,8 +106,8 @@ public SortedSet find(DateTime begin, DateTime end) { ), DBQuery.and( DBQuery.notExists("start"), // "start" has been used by the old index ranges in MongoDB - DBQuery.lessThanEquals(IndexRange.FIELD_BEGIN, 0), - DBQuery.greaterThanEquals(IndexRange.FIELD_END, 0) + DBQuery.lessThanEquals(IndexRange.FIELD_BEGIN, 0L), + DBQuery.greaterThanEquals(IndexRange.FIELD_END, 0L) ) ) ); @@ -134,7 +134,7 @@ public IndexRange calculateRange(String index) { @Override public IndexRange createForDeflector(String index) { - return MongoIndexRange.create(index, new DateTime(0), new DateTime(0), DateTime.now(), 0); + return MongoIndexRange.create(index, new DateTime(0L), new DateTime(0L), DateTime.now(), 0); } @Override From 9fdf88a80c5dc1a974ab2a6e263c0d2c48e1a6b6 Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Thu, 26 May 2016 15:11:43 +0200 Subject: [PATCH 11/18] Adding whitespace, fixing exception logging. --- .../src/main/java/org/graylog2/indexer/Deflector.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java index be6a19ba6c0e..a1783a488807 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java @@ -80,7 +80,7 @@ public Deflector(final SystemJobManager systemJobManager, final CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory, final Indices indices, final IndexRangeService indexRangeService, - @Named("deflector_index_read_only_timeout")final Duration deflectorIndexReadOnlyTimeout) { + @Named("deflector_index_read_only_timeout") final Duration deflectorIndexReadOnlyTimeout) { this.indexPrefix = indexPrefix; this.systemJobManager = systemJobManager; @@ -181,10 +181,8 @@ public void cycle() { if (scheduleResult != null) { scheduleResult.getFuture().get(deflectorIndexReadOnlyTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); } - } catch (SystemJobConcurrencyException e) { + } catch (SystemJobConcurrencyException | InterruptedException | ExecutionException | TimeoutException e) { LOG.error("Cannot set index <" + oldTarget + "> to read only. It won't be optimized.", e); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - e.printStackTrace(); } addSingleIndexRanges(oldTarget); activity.setMessage("Cycled deflector from <" + oldTarget + "> to <" + newTarget + ">"); From ff0755c250bf990b5a020b2de444eaa30d5411fc Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Thu, 26 May 2016 15:47:20 +0200 Subject: [PATCH 12/18] Remove leftover comment. --- .../src/main/java/org/graylog2/indexer/IndexHelper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java b/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java index 726908d4966d..920e505db41e 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/IndexHelper.java @@ -106,7 +106,6 @@ public static Set determineAffectedIndicesWithRanges(IndexRangeServi indices.add(indexRange); } - // Always include the deflector target return indices.build(); } } From 7eb08c56869c831924176f5c63c96109b302a83b Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Wed, 1 Jun 2016 16:57:29 +0200 Subject: [PATCH 13/18] Renaming method to make its intention clearer. --- .../src/main/java/org/graylog2/indexer/Deflector.java | 2 +- .../java/org/graylog2/indexer/ranges/EsIndexRangeService.java | 2 +- .../java/org/graylog2/indexer/ranges/IndexRangeService.java | 2 +- .../graylog2/indexer/ranges/LegacyMongoIndexRangeService.java | 2 +- .../org/graylog2/indexer/ranges/MongoIndexRangeService.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java index a1783a488807..7888644d86d7 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java @@ -194,7 +194,7 @@ public void cycle() { } private void addDeflectorIndexRange(String newTarget) { - final IndexRange deflectorRange = indexRangeService.createForDeflector(newTarget); + final IndexRange deflectorRange = indexRangeService.createEmptyRange(newTarget); indexRangeService.save(deflectorRange); } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java index 61817cda91c0..5072db9c1d6f 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java @@ -179,7 +179,7 @@ public IndexRange calculateRange(String index) { } @Override - public IndexRange createForDeflector(String index) { + public IndexRange createEmptyRange(String index) { throw new UnsupportedOperationException(); } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/IndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/IndexRangeService.java index b23f4b25789b..7936b7a1983c 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/IndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/IndexRangeService.java @@ -31,5 +31,5 @@ public interface IndexRangeService { void save(IndexRange indexRange); IndexRange calculateRange(String index); - IndexRange createForDeflector(String index); + IndexRange createEmptyRange(String index); } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java index 7042bb12a6c5..c90cea3d68b1 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java @@ -109,7 +109,7 @@ public IndexRange calculateRange(String index) { } @Override - public IndexRange createForDeflector(String index) { + public IndexRange createEmptyRange(String index) { throw new UnsupportedOperationException(); } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java index 540d6b2fd1aa..1a6a4b202654 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java @@ -133,7 +133,7 @@ public IndexRange calculateRange(String index) { } @Override - public IndexRange createForDeflector(String index) { + public IndexRange createEmptyRange(String index) { return MongoIndexRange.create(index, new DateTime(0L), new DateTime(0L), DateTime.now(), 0); } From 56dd94a49d7778543b6e5438fa83f73f241d04b8 Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Mon, 6 Jun 2016 14:49:20 +0200 Subject: [PATCH 14/18] Extracting index r/o and range calculation logic into separate composed job. --- .../main/java/org/graylog2/Configuration.java | 3 - .../java/org/graylog2/indexer/Deflector.java | 41 +++-------- .../SetIndexReadOnlyAndCalculateRangeJob.java | 72 +++++++++++++++++++ .../system/jobs/SystemJobManager.java | 35 +-------- .../org/graylog2/indexer/DeflectorTest.java | 23 +++--- .../ranges/EsIndexRangeServiceTest.java | 2 +- 6 files changed, 96 insertions(+), 80 deletions(-) create mode 100644 graylog2-server/src/main/java/org/graylog2/indexer/indices/jobs/SetIndexReadOnlyAndCalculateRangeJob.java diff --git a/graylog2-server/src/main/java/org/graylog2/Configuration.java b/graylog2-server/src/main/java/org/graylog2/Configuration.java index f67716529df9..49ba839e1602 100644 --- a/graylog2-server/src/main/java/org/graylog2/Configuration.java +++ b/graylog2-server/src/main/java/org/graylog2/Configuration.java @@ -156,9 +156,6 @@ public class Configuration extends BaseConfiguration { @Parameter(value = "index_ranges_cleanup_interval", validator = PositiveDurationValidator.class) private Duration indexRangesCleanupInterval = Duration.hours(1L); - @Parameter(value = "deflector_index_read_only_timeout", validator = PositiveDurationValidator.class) - private Duration deflectorIndexReadOnlyTimeout = Duration.hours(1L); - @Parameter(value = "trusted_proxies", converter = IPSubnetConverter.class) private Set trustedProxies = Collections.emptySet(); diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java index 7888644d86d7..3b9da22821a3 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java @@ -16,11 +16,10 @@ */ package org.graylog2.indexer; -import com.github.joschi.jadconfig.util.Duration; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.indices.InvalidAliasNameException; import org.graylog2.indexer.indices.Indices; -import org.graylog2.indexer.ranges.CreateNewSingleIndexRangeJob; +import org.graylog2.indexer.indices.jobs.SetIndexReadOnlyAndCalculateRangeJob; import org.graylog2.indexer.ranges.IndexRange; import org.graylog2.indexer.ranges.IndexRangeService; import org.graylog2.shared.system.activities.Activity; @@ -37,9 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -62,33 +59,27 @@ public class Deflector { // extends Ablenkblech private final SystemJobManager systemJobManager; private final ActivityWriter activityWriter; - private final CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory; private final IndexRangeService indexRangeService; - private final Duration deflectorIndexReadOnlyTimeout; private final String indexPrefix; private final String deflectorName; private final Indices indices; - private final SetIndexReadOnlyJob.Factory indexReadOnlyJobFactory; private final Pattern deflectorIndexPattern; private final Pattern indexPattern; + private final SetIndexReadOnlyAndCalculateRangeJob.Factory setIndexReadOnlyAndCalculateRangeJobFactory; @Inject public Deflector(final SystemJobManager systemJobManager, @Named("elasticsearch_index_prefix") final String indexPrefix, final ActivityWriter activityWriter, - final SetIndexReadOnlyJob.Factory indexReadOnlyJobFactory, - final CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory, final Indices indices, final IndexRangeService indexRangeService, - @Named("deflector_index_read_only_timeout") final Duration deflectorIndexReadOnlyTimeout) { + final SetIndexReadOnlyAndCalculateRangeJob.Factory setIndexReadOnlyAndCalculateRangeJobFactory) { this.indexPrefix = indexPrefix; this.systemJobManager = systemJobManager; this.activityWriter = activityWriter; - this.indexReadOnlyJobFactory = indexReadOnlyJobFactory; - this.createNewSingleIndexRangeJobFactory = createNewSingleIndexRangeJobFactory; this.indexRangeService = indexRangeService; - this.deflectorIndexReadOnlyTimeout = deflectorIndexReadOnlyTimeout; + this.setIndexReadOnlyAndCalculateRangeJobFactory = setIndexReadOnlyAndCalculateRangeJobFactory; this.deflectorName = buildName(indexPrefix); this.indices = indices; @@ -175,16 +166,12 @@ public void cycle() { // it can happen that an index request still writes to the old deflector target, while we cycled it above. // setting the index to readOnly would result in ClusterBlockExceptions in the indexing request. // waiting 30 seconds to perform the background task should completely get rid of these errors. - final SystemJob makeReadOnlyJob = indexReadOnlyJobFactory.create(oldTarget); + final SystemJob setIndexReadOnlyAndCalculateRangeJob = setIndexReadOnlyAndCalculateRangeJobFactory.create(oldTarget); try { - final SystemJobManager.ScheduleResult scheduleResult = systemJobManager.submitWithDelayForResult(makeReadOnlyJob, 30, TimeUnit.SECONDS); - if (scheduleResult != null) { - scheduleResult.getFuture().get(deflectorIndexReadOnlyTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - } - } catch (SystemJobConcurrencyException | InterruptedException | ExecutionException | TimeoutException e) { - LOG.error("Cannot set index <" + oldTarget + "> to read only. It won't be optimized.", e); + systemJobManager.submitWithDelay(setIndexReadOnlyAndCalculateRangeJob, 30, TimeUnit.SECONDS); + } catch (SystemJobConcurrencyException e) { + LOG.error("Cannot set index <" + oldTarget + "> to read only and calculate its range. It won't be optimized.", e); } - addSingleIndexRanges(oldTarget); activity.setMessage("Cycled deflector from <" + oldTarget + "> to <" + newTarget + ">"); } @@ -286,18 +273,6 @@ public void pointTo(final String newIndex) { indices.cycleAlias(getName(), newIndex); } - @Nullable - private SystemJobManager.ScheduleResult addSingleIndexRanges(String indexName) { - try { - return systemJobManager.submitForResult(createNewSingleIndexRangeJobFactory.create(this, indexName)); - } catch (SystemJobConcurrencyException e) { - final String msg = "Could not calculate index ranges for index " + indexName + " after cycling deflector: Maximum concurrency of job is reached."; - activityWriter.write(new Activity(msg, Deflector.class)); - LOG.error(msg, e); - } - return null; - } - @Nullable public String getCurrentActualTargetIndex() { return indices.aliasTarget(getName()); diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/indices/jobs/SetIndexReadOnlyAndCalculateRangeJob.java b/graylog2-server/src/main/java/org/graylog2/indexer/indices/jobs/SetIndexReadOnlyAndCalculateRangeJob.java new file mode 100644 index 000000000000..75cf6566a050 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/indexer/indices/jobs/SetIndexReadOnlyAndCalculateRangeJob.java @@ -0,0 +1,72 @@ +package org.graylog2.indexer.indices.jobs; + +import com.google.inject.assistedinject.Assisted; +import org.graylog2.indexer.Deflector; +import org.graylog2.indexer.SetIndexReadOnlyJob; +import org.graylog2.indexer.ranges.CreateNewSingleIndexRangeJob; +import org.graylog2.system.jobs.SystemJob; + +import javax.inject.Inject; + +public class SetIndexReadOnlyAndCalculateRangeJob extends SystemJob { + public interface Factory { + SetIndexReadOnlyAndCalculateRangeJob create(String indexName); + } + + private final SetIndexReadOnlyJob.Factory setIndexReadOnlyJobFactory; + private final CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory; + private final Deflector deflector; + private final String indexName; + + @Inject + public SetIndexReadOnlyAndCalculateRangeJob(SetIndexReadOnlyJob.Factory setIndexReadOnlyJobFactory, + CreateNewSingleIndexRangeJob.Factory createNewSingleIndexRangeJobFactory, + Deflector deflector, + @Assisted String indexName) { + this.setIndexReadOnlyJobFactory = setIndexReadOnlyJobFactory; + this.createNewSingleIndexRangeJobFactory = createNewSingleIndexRangeJobFactory; + this.deflector = deflector; + this.indexName = indexName; + } + + public void execute() { + final SystemJob setIndexReadOnlyJob = setIndexReadOnlyJobFactory.create(indexName); + setIndexReadOnlyJob.execute(); + final SystemJob createNewSingleIndexRangeJob = createNewSingleIndexRangeJobFactory.create(deflector, indexName); + createNewSingleIndexRangeJob.execute(); + + } + + @Override + public void requestCancel() {} + + @Override + public int getProgress() { + return 0; + } + + @Override + public int maxConcurrency() { + return 1000; + } + + @Override + public boolean providesProgress() { + return false; + } + + @Override + public boolean isCancelable() { + return false; + } + + @Override + public String getDescription() { + return "Makes index " + indexName + " read only and calculates and adds its index range afterwards."; + } + + @Override + public String getClassName() { + return this.getClass().getCanonicalName(); + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/system/jobs/SystemJobManager.java b/graylog2-server/src/main/java/org/graylog2/system/jobs/SystemJobManager.java index a21763c6a7ce..d70408121bf5 100644 --- a/graylog2-server/src/main/java/org/graylog2/system/jobs/SystemJobManager.java +++ b/graylog2-server/src/main/java/org/graylog2/system/jobs/SystemJobManager.java @@ -26,14 +26,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; import javax.inject.Inject; -import javax.validation.constraints.Null; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -41,24 +38,6 @@ public class SystemJobManager { - public class ScheduleResult { - private final String jobId; - private final ScheduledFuture future; - - ScheduleResult(final String jobId, final ScheduledFuture future) { - this.jobId = jobId; - this.future = future; - } - - public String getJobId() { - return jobId; - } - - public ScheduledFuture getFuture() { - return future; - } - } - private static final Logger LOG = LoggerFactory.getLogger(SystemJobManager.class); private static final int THREAD_POOL_SIZE = 15; @@ -82,18 +61,10 @@ private ScheduledExecutorService executorService(final MetricRegistry metricRegi } public String submit(final SystemJob job) throws SystemJobConcurrencyException { - return submitForResult(job).getJobId(); - } - - public ScheduleResult submitForResult(final SystemJob job) throws SystemJobConcurrencyException { - return submitWithDelayForResult(job, 0, TimeUnit.SECONDS); + return submitWithDelay(job, 0, TimeUnit.SECONDS); } public String submitWithDelay(final SystemJob job, final long delay, TimeUnit timeUnit) throws SystemJobConcurrencyException { - return submitWithDelayForResult(job, delay, timeUnit).getJobId(); - } - - public ScheduleResult submitWithDelayForResult(final SystemJob job, final long delay, TimeUnit timeUnit) throws SystemJobConcurrencyException { // for immediate jobs, check allowed concurrency right now if (delay == 0) { checkAllowedConcurrency(job); @@ -104,7 +75,7 @@ public ScheduleResult submitWithDelayForResult(final SystemJob job, final long d job.setId(new UUID().toString()); jobs.put(job.getId(), job); - final ScheduledFuture future = executor.schedule(new Runnable() { + executor.schedule(new Runnable() { @Override public void run() { try { @@ -132,7 +103,7 @@ public void run() { }, delay, timeUnit); LOG.info("Submitted SystemJob <{}> [{}]", job.getId(), jobClass); - return new ScheduleResult(job.getId(), future); + return job.getId(); } protected void checkAllowedConcurrency(SystemJob job) throws SystemJobConcurrencyException { diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java index 39b20889ceac..89b6d7670c78 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java @@ -23,6 +23,7 @@ import com.github.joschi.jadconfig.util.Duration; import com.google.common.collect.Maps; import org.graylog2.indexer.indices.Indices; +import org.graylog2.indexer.indices.jobs.SetIndexReadOnlyAndCalculateRangeJob; import org.graylog2.indexer.ranges.CreateNewSingleIndexRangeJob; import org.graylog2.indexer.ranges.IndexRangeService; import org.graylog2.system.activities.SystemMessageActivityWriter; @@ -59,6 +60,8 @@ public class DeflectorTest { @Mock private CreateNewSingleIndexRangeJob.Factory singleIndexRangeJobFactory; @Mock + private SetIndexReadOnlyAndCalculateRangeJob.Factory setIndexReadOnlyAndCalculateRangeJobFactory; + @Mock private Indices indices; private Deflector deflector; @@ -67,8 +70,12 @@ public class DeflectorTest { @Before public void setUp() { - deflector = new Deflector(systemJobManager, "graylog", activityWriter, indexReadOnlyJobFactory, singleIndexRangeJobFactory, - indices, indexRangeService, deflectorIndexRangeCalculationTimeout); + deflector = new Deflector(systemJobManager, + "graylog", + activityWriter, + indices, + indexRangeService, + setIndexReadOnlyAndCalculateRangeJobFactory); } @Test @@ -177,11 +184,9 @@ public void getNewestTargetNumber() throws NoTargetIndexException { final Deflector deflector = new Deflector(systemJobManager, "graylog", activityWriter, - indexReadOnlyJobFactory, - singleIndexRangeJobFactory, indices, indexRangeService, - deflectorIndexRangeCalculationTimeout); + setIndexReadOnlyAndCalculateRangeJobFactory); final int number = deflector.getNewestTargetNumber(); assertEquals(3, number); @@ -201,11 +206,9 @@ public void getAllGraylogIndexNames() { final Deflector deflector = new Deflector(systemJobManager, "graylog", activityWriter, - indexReadOnlyJobFactory, - singleIndexRangeJobFactory, indices, indexRangeService, - deflectorIndexRangeCalculationTimeout); + setIndexReadOnlyAndCalculateRangeJobFactory); final String[] allGraylogIndexNames = deflector.getAllGraylogIndexNames(); assertThat(allGraylogIndexNames) @@ -226,11 +229,9 @@ public void getAllGraylogDeflectorIndices() { final Deflector deflector = new Deflector(systemJobManager, "graylog", activityWriter, - indexReadOnlyJobFactory, - singleIndexRangeJobFactory, indices, indexRangeService, - deflectorIndexRangeCalculationTimeout); + setIndexReadOnlyAndCalculateRangeJobFactory); final Map> deflectorIndices = deflector.getAllGraylogDeflectorIndices(); diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java index ef0f025757f6..b8d890613ec5 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java @@ -86,7 +86,7 @@ public void setUp() throws Exception { final Messages messages = new Messages(client, ELASTICSEARCH_CONFIGURATION, new MetricRegistry()); indices = new Indices(client, ELASTICSEARCH_CONFIGURATION, new IndexMapping(), messages); final Deflector deflector = new Deflector(null, ELASTICSEARCH_CONFIGURATION.getIndexPrefix(), new NullActivityWriter(), - null, null, indices, null, Duration.hours(1L)); + indices, null, null); indexRangeService = new EsIndexRangeService(client, deflector, localEventBus, new MetricRegistry()); } From af3d6191d65b47aca020877d701076dbeb208484 Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Mon, 6 Jun 2016 14:51:23 +0200 Subject: [PATCH 15/18] Add license header. --- .../SetIndexReadOnlyAndCalculateRangeJob.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/indices/jobs/SetIndexReadOnlyAndCalculateRangeJob.java b/graylog2-server/src/main/java/org/graylog2/indexer/indices/jobs/SetIndexReadOnlyAndCalculateRangeJob.java index 75cf6566a050..1950dbffb76f 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/indices/jobs/SetIndexReadOnlyAndCalculateRangeJob.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/indices/jobs/SetIndexReadOnlyAndCalculateRangeJob.java @@ -1,3 +1,19 @@ +/** + * This file is part of Graylog. + * + * Graylog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog. If not, see . + */ package org.graylog2.indexer.indices.jobs; import com.google.inject.assistedinject.Assisted; From 74ea13cf94eaa4520acacd968c7763cd7378b3cf Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Mon, 6 Jun 2016 14:52:49 +0200 Subject: [PATCH 16/18] Rename method again to make intention even clearer :) --- .../src/main/java/org/graylog2/indexer/Deflector.java | 2 +- .../java/org/graylog2/indexer/ranges/EsIndexRangeService.java | 2 +- .../java/org/graylog2/indexer/ranges/IndexRangeService.java | 2 +- .../graylog2/indexer/ranges/LegacyMongoIndexRangeService.java | 2 +- .../org/graylog2/indexer/ranges/MongoIndexRangeService.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java index 3b9da22821a3..958655735c82 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/Deflector.java @@ -181,7 +181,7 @@ public void cycle() { } private void addDeflectorIndexRange(String newTarget) { - final IndexRange deflectorRange = indexRangeService.createEmptyRange(newTarget); + final IndexRange deflectorRange = indexRangeService.createUnknownRange(newTarget); indexRangeService.save(deflectorRange); } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java index 5072db9c1d6f..59da0317d413 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/EsIndexRangeService.java @@ -179,7 +179,7 @@ public IndexRange calculateRange(String index) { } @Override - public IndexRange createEmptyRange(String index) { + public IndexRange createUnknownRange(String index) { throw new UnsupportedOperationException(); } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/IndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/IndexRangeService.java index 7936b7a1983c..735aa3e3120d 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/IndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/IndexRangeService.java @@ -31,5 +31,5 @@ public interface IndexRangeService { void save(IndexRange indexRange); IndexRange calculateRange(String index); - IndexRange createEmptyRange(String index); + IndexRange createUnknownRange(String index); } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java index c90cea3d68b1..37a92cbeca54 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/LegacyMongoIndexRangeService.java @@ -109,7 +109,7 @@ public IndexRange calculateRange(String index) { } @Override - public IndexRange createEmptyRange(String index) { + public IndexRange createUnknownRange(String index) { throw new UnsupportedOperationException(); } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java index 1a6a4b202654..d07f45d5a823 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/ranges/MongoIndexRangeService.java @@ -133,7 +133,7 @@ public IndexRange calculateRange(String index) { } @Override - public IndexRange createEmptyRange(String index) { + public IndexRange createUnknownRange(String index) { return MongoIndexRange.create(index, new DateTime(0L), new DateTime(0L), DateTime.now(), 0); } From 449ab41b7342c596f763f1f71ecc805254f9c873 Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Tue, 7 Jun 2016 10:50:05 +0200 Subject: [PATCH 17/18] Binding factory for r/o + index range job. --- .../src/main/java/org/graylog2/bindings/ServerBindings.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/graylog2-server/src/main/java/org/graylog2/bindings/ServerBindings.java b/graylog2-server/src/main/java/org/graylog2/bindings/ServerBindings.java index 0d3ab44af93a..4ae42f674292 100644 --- a/graylog2-server/src/main/java/org/graylog2/bindings/ServerBindings.java +++ b/graylog2-server/src/main/java/org/graylog2/bindings/ServerBindings.java @@ -51,6 +51,7 @@ import org.graylog2.indexer.healing.FixDeflectorByDeleteJob; import org.graylog2.indexer.healing.FixDeflectorByMoveJob; import org.graylog2.indexer.indices.jobs.OptimizeIndexJob; +import org.graylog2.indexer.indices.jobs.SetIndexReadOnlyAndCalculateRangeJob; import org.graylog2.indexer.ranges.CreateNewSingleIndexRangeJob; import org.graylog2.indexer.ranges.RebuildIndexRangesJob; import org.graylog2.inputs.InputEventListener; @@ -116,12 +117,15 @@ private void bindProviders() { } private void bindFactoryModules() { + // System Jobs install(new FactoryModuleBuilder().build(RebuildIndexRangesJob.Factory.class)); install(new FactoryModuleBuilder().build(OptimizeIndexJob.Factory.class)); install(new FactoryModuleBuilder().build(SetIndexReadOnlyJob.Factory.class)); install(new FactoryModuleBuilder().build(CreateNewSingleIndexRangeJob.Factory.class)); install(new FactoryModuleBuilder().build(FixDeflectorByDeleteJob.Factory.class)); install(new FactoryModuleBuilder().build(FixDeflectorByMoveJob.Factory.class)); + install(new FactoryModuleBuilder().build(SetIndexReadOnlyAndCalculateRangeJob.Factory.class)); + install(new FactoryModuleBuilder().build(LdapSettingsImpl.Factory.class)); install(new FactoryModuleBuilder().build(FieldValueAlertCondition.Factory.class)); install(new FactoryModuleBuilder().build(MessageCountAlertCondition.Factory.class)); From 477a01d9de2b305576ba489dd70d4f7ba42515ad Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Tue, 7 Jun 2016 11:21:27 +0200 Subject: [PATCH 18/18] Removing unused imports/field. --- .../src/test/java/org/graylog2/indexer/DeflectorTest.java | 3 --- .../org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java | 1 - 2 files changed, 4 deletions(-) diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java index 89b6d7670c78..b80dfa322913 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/DeflectorTest.java @@ -20,7 +20,6 @@ */ package org.graylog2.indexer; -import com.github.joschi.jadconfig.util.Duration; import com.google.common.collect.Maps; import org.graylog2.indexer.indices.Indices; import org.graylog2.indexer.indices.jobs.SetIndexReadOnlyAndCalculateRangeJob; @@ -49,8 +48,6 @@ @RunWith(MockitoJUnitRunner.class) public class DeflectorTest { - private static final Duration deflectorIndexRangeCalculationTimeout = Duration.hours(1L); - @Mock private SystemJobManager systemJobManager; @Mock diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java index b8d890613ec5..b8de8003096a 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/ranges/EsIndexRangeServiceTest.java @@ -17,7 +17,6 @@ package org.graylog2.indexer.ranges; import com.codahale.metrics.MetricRegistry; -import com.github.joschi.jadconfig.util.Duration; import com.github.joschi.nosqlunit.elasticsearch2.ElasticsearchRule; import com.github.joschi.nosqlunit.elasticsearch2.EmbeddedElasticsearch; import com.google.common.collect.ImmutableSet;