From a5fb1f9e0ce662aa7f8a8d6e79645dc9438cee82 Mon Sep 17 00:00:00 2001 From: Eric Hartmann Date: Fri, 14 Apr 2017 11:58:04 +0200 Subject: [PATCH] SONAR-8986 add purge of tasks of non existing workers to ce clean job --- .../ce/configuration/CeConfiguration.java | 11 +- .../ce/configuration/CeConfigurationImpl.java | 5 +- .../CeConfigurationImplTest.java | 8 +- .../ce/cleaning/CeCleaningSchedulerImpl.java | 28 ++++- .../org/sonar/ce/queue/InternalCeQueue.java | 6 +- .../sonar/ce/queue/InternalCeQueueImpl.java | 10 +- .../cleaning/CeCleaningSchedulerImplTest.java | 78 +++++++++--- .../ce/configuration/CeConfigurationRule.java | 8 +- .../ce/monitoring/CeTasksMBeanImplTest.java | 9 +- .../ce/queue/InternalCeQueueImplTest.java | 111 ++++++++++++++++-- .../main/java/org/sonar/db/ce/CeQueueDao.java | 14 +++ .../java/org/sonar/db/ce/CeQueueMapper.java | 11 ++ .../org/sonar/db/ce/CeQueueMapper.xml | 26 ++++ .../java/org/sonar/db/ce/CeQueueDaoTest.java | 60 ++++++++++ 14 files changed, 333 insertions(+), 52 deletions(-) diff --git a/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java b/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java index dd27ba3f8820..393ec1d6f52e 100644 --- a/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java +++ b/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfiguration.java @@ -27,18 +27,19 @@ public interface CeConfiguration { int getWorkerCount(); /** - * The delay in millisecond before a {@link CeWorker} shall try and find a task + * The delay in millisecond before a {@link org.sonar.ce.taskprocessor.CeWorker} shall try and find a task * to process when it's previous execution had nothing to do. */ long getQueuePollingDelay(); /** - * Delay before running job that cancels worn out tasks for the first time (in minutes). + * Delay before running job that cleans CE tasks for the first time (in minutes). */ - long getCancelWornOutsInitialDelay(); + long getCleanCeTasksInitialDelay(); /** - * Delay between the end of a run and the start of the next one of the job that cancels worn out CE tasks (in minutes). + * Delay between the end of a run and the start of the next one of the job that cleans CE tasks (in minutes). */ - long getCancelWornOutsDelay(); + long getCleanCeTasksDelay(); + } diff --git a/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfigurationImpl.java b/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfigurationImpl.java index 086cd79708c4..77e6f78be49b 100644 --- a/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfigurationImpl.java +++ b/server/sonar-ce-api/src/main/java/org/sonar/ce/configuration/CeConfigurationImpl.java @@ -99,12 +99,13 @@ public long getQueuePollingDelay() { } @Override - public long getCancelWornOutsInitialDelay() { + public long getCleanCeTasksInitialDelay() { return CANCEL_WORN_OUTS_INITIAL_DELAY; } @Override - public long getCancelWornOutsDelay() { + public long getCleanCeTasksDelay() { return CANCEL_WORN_OUTS_DELAY; } + } diff --git a/server/sonar-ce-api/src/test/java/org/sonar/ce/configuration/CeConfigurationImplTest.java b/server/sonar-ce-api/src/test/java/org/sonar/ce/configuration/CeConfigurationImplTest.java index 9d7f3f0b6db7..11a80512919d 100644 --- a/server/sonar-ce-api/src/test/java/org/sonar/ce/configuration/CeConfigurationImplTest.java +++ b/server/sonar-ce-api/src/test/java/org/sonar/ce/configuration/CeConfigurationImplTest.java @@ -111,14 +111,14 @@ private void expectMessageException(int value) { } @Test - public void getCancelWornOutsInitialDelay_returns_1() { - assertThat(new CeConfigurationImpl(settings).getCancelWornOutsInitialDelay()) + public void getCleanCeTasksInitialDelay_returns_1() { + assertThat(new CeConfigurationImpl(settings).getCleanCeTasksInitialDelay()) .isEqualTo(1L); } @Test - public void getCancelWornOutsDelay_returns_10() { - assertThat(new CeConfigurationImpl(settings).getCancelWornOutsDelay()) + public void getCleanCeTasksDelay_returns_10() { + assertThat(new CeConfigurationImpl(settings).getCleanCeTasksDelay()) .isEqualTo(10L); } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java index 82675c693d62..1fdb2c5a236d 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java @@ -21,6 +21,7 @@ import org.sonar.api.utils.log.Logger; import org.sonar.api.utils.log.Loggers; +import org.sonar.ce.CeDistributedInformation; import org.sonar.ce.configuration.CeConfiguration; import org.sonar.ce.queue.InternalCeQueue; @@ -32,27 +33,44 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler { private final CeCleaningExecutorService executorService; private final CeConfiguration ceConfiguration; private final InternalCeQueue internalCeQueue; + private final CeDistributedInformation ceDistributedInformation; - public CeCleaningSchedulerImpl(CeCleaningExecutorService executorService, CeConfiguration ceConfiguration, InternalCeQueue internalCeQueue) { + public CeCleaningSchedulerImpl(CeCleaningExecutorService executorService, CeConfiguration ceConfiguration, + InternalCeQueue internalCeQueue, CeDistributedInformation ceDistributedInformation) { this.executorService = executorService; this.internalCeQueue = internalCeQueue; this.ceConfiguration = ceConfiguration; + this.ceDistributedInformation = ceDistributedInformation; } @Override public void startScheduling() { - executorService.scheduleWithFixedDelay(this::cancelWornOuts, - ceConfiguration.getCancelWornOutsInitialDelay(), - ceConfiguration.getCancelWornOutsDelay(), + executorService.scheduleWithFixedDelay(this::cleanCeQueue, + ceConfiguration.getCleanCeTasksInitialDelay(), + ceConfiguration.getCleanCeTasksDelay(), MINUTES); } + private void cleanCeQueue() { + cancelWornOuts(); + resetTasksWithUnknownWorkerUUIDs(); + } + private void cancelWornOuts() { try { - LOG.info("Deleting any worn out task"); + LOG.debug("Deleting any worn out task"); internalCeQueue.cancelWornOuts(); } catch (Exception e) { LOG.warn("Failed to cancel worn out tasks", e); } } + + private void resetTasksWithUnknownWorkerUUIDs() { + try { + LOG.debug("Resetting state of tasks with unknown worker UUIDs"); + internalCeQueue.resetTasksWithUnknownWorkerUUIDs(ceDistributedInformation.getWorkerUUIDs()); + } catch (Exception e) { + LOG.warn("Failed to reset tasks with unknown worker UUIDs", e); + } + } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java index 42447633920b..9cdc5243de9a 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java @@ -20,10 +20,8 @@ package org.sonar.ce.queue; import java.util.Optional; +import java.util.Set; import javax.annotation.Nullable; -import org.sonar.ce.queue.CeQueue; -import org.sonar.ce.queue.CeTask; -import org.sonar.ce.queue.CeTaskResult; import org.sonar.db.ce.CeActivityDto.Status; /** @@ -70,6 +68,8 @@ public interface InternalCeQueue extends CeQueue { void cancelWornOuts(); + void resetTasksWithUnknownWorkerUUIDs(Set knownWorkerUUIDs); + void pausePeek(); void resumePeek(); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index 8e5bc19bf8fb..8a2009dc25bb 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -25,6 +25,7 @@ import java.io.PrintWriter; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.CheckForNull; import javax.annotation.Nullable; @@ -86,7 +87,6 @@ public Optional peek(String workerUuid) { queueStatus.addInProgress(); } return Optional.ofNullable(task); - } } @@ -172,6 +172,14 @@ public void cancelWornOuts() { } } + @Override + public void resetTasksWithUnknownWorkerUUIDs(Set knownWorkerUUIDs) { + try (DbSession dbSession = dbClient.openSession(false)) { + dbClient.ceQueueDao().resetTasksWithUnknownWorkerUUIDs(dbSession, knownWorkerUUIDs); + dbSession.commit(); + } + } + @Override public void pausePeek() { this.peekPaused.set(true); diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java index 5449865fc7e3..bf91352b379d 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java @@ -28,11 +28,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.Test; +import org.sonar.ce.CeDistributedInformation; import org.sonar.ce.configuration.CeConfiguration; import org.sonar.ce.queue.InternalCeQueue; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -40,7 +42,7 @@ public class CeCleaningSchedulerImplTest { @Test - public void startScheduling_does_not_fail_if_cancelWornOuts_send_even_an_Exception() { + public void startScheduling_does_not_fail_if_cleaning_methods_send_even_an_Exception() { InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() { @Override @@ -49,16 +51,19 @@ public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialD command.run(); return null; } - }, mockCeConfiguration(1, 10), mockedInternalCeQueue); - doThrow(new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts")).when(mockedInternalCeQueue).cancelWornOuts(); + }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class)); + Exception exception = new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts"); + doThrow(exception).when(mockedInternalCeQueue).cancelWornOuts(); + doThrow(exception).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any()); underTest.startScheduling(); verify(mockedInternalCeQueue).cancelWornOuts(); + verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any()); } @Test - public void startScheduling_fails_if_cancelWornOuts_send_even_an_Error() { + public void startScheduling_fails_if_cancelWornOuts_send_an_Error() { InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() { @Override @@ -67,7 +72,7 @@ public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialD command.run(); return null; } - }, mockCeConfiguration(1, 10), mockedInternalCeQueue); + }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class)); Error expected = new Error("faking Error thrown by cancelWornOuts"); doThrow(expected).when(mockedInternalCeQueue).cancelWornOuts(); @@ -77,36 +82,74 @@ public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialD } catch (Error e) { assertThat(e).isSameAs(expected); } + verify(mockedInternalCeQueue).cancelWornOuts(); } @Test - public void startScheduling_calls_cancelWornOuts_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() { + public void startScheduling_fails_if_resetTasksWithUnknownWorkerUUIDs_send_an_Error() { InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); - long initialDelay = 10L; - long delay = 20L; - CeConfiguration mockedCeConfiguration = mockCeConfiguration(initialDelay, delay); + CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() { + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + // synchronously execute command + command.run(); + return null; + } + }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class)); + Error expected = new Error("faking Error thrown by cancelWornOuts"); + doThrow(expected).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any()); + + try { + underTest.startScheduling(); + fail("the error should have been thrown"); + } catch (Error e) { + assertThat(e).isSameAs(expected); + } + verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any()); + } + + @Test + public void startScheduling_calls_cleaning_methods_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() { + InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); + long wornOutInitialDelay = 10L; + long wornOutDelay = 20L; + long unknownWorkerInitialDelay = 11L; + long unknownWorkerDelay = 21L; + CeConfiguration mockedCeConfiguration = mockCeConfiguration(wornOutInitialDelay, wornOutDelay); CeCleaningAdapter executorService = new CeCleaningAdapter() { @Override public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initDelay, long period, TimeUnit unit) { - assertThat(initDelay).isEqualTo(initialDelay); - assertThat(period).isEqualTo(delay); - assertThat(unit).isEqualTo(TimeUnit.MINUTES); + schedulerCounter++; + switch(schedulerCounter) { + case 1: + assertThat(initDelay).isEqualTo(wornOutInitialDelay); + assertThat(period).isEqualTo(wornOutDelay); + assertThat(unit).isEqualTo(TimeUnit.MINUTES); + break; + case 2: + assertThat(initDelay).isEqualTo(unknownWorkerInitialDelay); + assertThat(period).isEqualTo(unknownWorkerDelay); + assertThat(unit).isEqualTo(TimeUnit.MINUTES); + break; + default: + fail("Unknwon call of scheduleWithFixedDelay"); + } // synchronously execute command command.run(); return null; } }; - CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue); + CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue, mock(CeDistributedInformation.class)); underTest.startScheduling(); - + assertThat(executorService.schedulerCounter).isEqualTo(1); verify(mockedInternalCeQueue).cancelWornOuts(); } - private CeConfiguration mockCeConfiguration(long initialDelay, long delay) { + private CeConfiguration mockCeConfiguration(long cleanCeTasksInitialDelay, long cleanCeTasksDelay) { CeConfiguration mockedCeConfiguration = mock(CeConfiguration.class); - when(mockedCeConfiguration.getCancelWornOutsInitialDelay()).thenReturn(initialDelay); - when(mockedCeConfiguration.getCancelWornOutsDelay()).thenReturn(delay); + when(mockedCeConfiguration.getCleanCeTasksInitialDelay()).thenReturn(cleanCeTasksInitialDelay); + when(mockedCeConfiguration.getCleanCeTasksDelay()).thenReturn(cleanCeTasksDelay); return mockedCeConfiguration; } @@ -115,6 +158,7 @@ private CeConfiguration mockCeConfiguration(long initialDelay, long delay) { * method. */ private static class CeCleaningAdapter implements CeCleaningExecutorService { + protected int schedulerCounter = 0; @Override public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java b/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java index 1c69bb40c25d..61e22fbbdc8d 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java @@ -54,21 +54,21 @@ public void setQueuePollingDelay(int queuePollingDelay) { } @Override - public long getCancelWornOutsInitialDelay() { + public long getCleanCeTasksInitialDelay() { return cancelWornOutsInitialDelay; } - public void setCancelWornOutsInitialDelay(long cancelWornOutsInitialDelay) { + public void setCleanCeTasksInitialDelay(long cancelWornOutsInitialDelay) { checkArgument(cancelWornOutsInitialDelay > 0, "cancel worn-outs polling initial delay must be >= 1"); this.cancelWornOutsInitialDelay = cancelWornOutsInitialDelay; } @Override - public long getCancelWornOutsDelay() { + public long getCleanCeTasksDelay() { return cancelWornOutsDelay; } - public void setCancelWornOutsDelay(long cancelWornOutsDelay) { + public void setCleanCeTasksDelay(long cancelWornOutsDelay) { checkArgument(cancelWornOutsDelay > 0, "cancel worn-outs polling delay must be >= 1"); this.cancelWornOutsDelay = cancelWornOutsDelay; } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java index 1ec6f9321157..0fd5c4de753e 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java @@ -135,14 +135,15 @@ public long getQueuePollingDelay() { } @Override - public long getCancelWornOutsInitialDelay() { - throw new UnsupportedOperationException("getCancelWornOutsInitialDelay is not implemented"); + public long getCleanCeTasksInitialDelay() { + throw new UnsupportedOperationException("getCleanCeTasksInitialDelay is not implemented"); } @Override - public long getCancelWornOutsDelay() { - throw new UnsupportedOperationException("getCancelWornOutsDelay is not implemented"); + public long getCleanCeTasksDelay() { + throw new UnsupportedOperationException("getCleanCeTasksDelay is not implemented"); } + } @CheckForNull diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java index 93340a397583..7162d2967a92 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java @@ -19,6 +19,7 @@ */ package org.sonar.ce.queue; +import com.google.common.collect.ImmutableSet; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.util.List; @@ -530,17 +531,113 @@ public void cancelWornOuts_cancels_pending_tasks_with_executionCount_greater_or_ underTest.cancelWornOuts(); - verifyUnmodifiedByCancelWornOuts(u1); - verifyUnmodifiedByCancelWornOuts(u2); + verifyUnmodified(u1); + verifyUnmodified(u2); verifyCanceled(u3); verifyCanceled(u4); - verifyUnmodifiedByCancelWornOuts(u5); - verifyUnmodifiedByCancelWornOuts(u6); - verifyUnmodifiedByCancelWornOuts(u7); - verifyUnmodifiedByCancelWornOuts(u8); + verifyUnmodified(u5); + verifyUnmodified(u6); + verifyUnmodified(u7); + verifyUnmodified(u8); } - private void verifyUnmodifiedByCancelWornOuts(CeQueueDto original) { + @Test + public void resetTasksWithUnknownWorkerUUIDs_reset_only_in_progress_tasks() { + CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null); + CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1"); + CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null); + CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2"); + CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null); + CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1"); + CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2"); + CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3"); + + underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker2", "worker3")); + + // Pending tasks must not be modified even if a workerUUID is not present + verifyUnmodified(u1); + verifyUnmodified(u2); + verifyUnmodified(u3); + verifyUnmodified(u4); + + // Unknown worker : null, "worker1" + verifyReset(u5); + verifyReset(u6); + + // Known workers : "worker2", "worker3" + verifyUnmodified(u7); + verifyUnmodified(u8); + } + + @Test + public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_will_reset_all_in_progress_tasks() { + CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null); + CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1"); + CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null); + CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2"); + CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null); + CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1"); + CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2"); + CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3"); + + underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of()); + + // Pending tasks must not be modified even if a workerUUID is not present + verifyUnmodified(u1); + verifyUnmodified(u2); + verifyUnmodified(u3); + verifyUnmodified(u4); + + // Unknown worker : null, "worker1" + verifyReset(u5); + verifyReset(u6); + verifyReset(u7); + verifyReset(u8); + } + + @Test + public void resetTasksWithUnknownWorkerUUIDs_with_worker_without_tasks_will_reset_all_in_progress_tasks() { + CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null); + CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1"); + CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null); + CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2"); + CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null); + CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1"); + CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2"); + CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3"); + + underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker1000", "worker1001")); + + // Pending tasks must not be modified even if a workerUUID is not present + verifyUnmodified(u1); + verifyUnmodified(u2); + verifyUnmodified(u3); + verifyUnmodified(u4); + + // Unknown worker : null, "worker1" + verifyReset(u5); + verifyReset(u6); + verifyReset(u7); + verifyReset(u8); + } + + private void verifyReset(CeQueueDto original) { + CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid()).get(); + // We do not touch ExecutionCount nor CreatedAt + assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount()); + assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt()); + + // Status must have changed to PENDING and must not be equal to previous status + assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus()); + // UpdatedAt must have been updated + assertThat(dto.getUpdatedAt()).isNotEqualTo(original.getUpdatedAt()); + // StartedAt must be null + assertThat(dto.getStartedAt()).isNull(); + // WorkerUuid must be null + assertThat(dto.getWorkerUuid()).isNull(); + } + + private void verifyUnmodified(CeQueueDto original) { CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid()).get(); assertThat(dto.getStatus()).isEqualTo(original.getStatus()); assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount()); diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java index c4a747cd0785..54f69953a44e 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; import javax.annotation.Nullable; import org.apache.ibatis.session.RowBounds; import org.sonar.api.utils.System2; @@ -29,6 +31,7 @@ import org.sonar.db.Pagination; import static java.util.Collections.emptyList; +import static org.sonar.db.DatabaseUtils.executeLargeUpdates; import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS; import static org.sonar.db.ce.CeQueueDto.Status.PENDING; @@ -84,6 +87,17 @@ public List selectPendingByMinimumExecutionCount(DbSession dbSession return mapper(dbSession).selectPendingByMinimumExecutionCount(minExecutionCount); } + public void resetTasksWithUnknownWorkerUUIDs(DbSession dbSession, Set knownWorkerUUIDs) { + if (knownWorkerUUIDs.isEmpty()) { + mapper(dbSession).resetAllInProgressTasks(system2.now()); + } else { + // executeLargeUpdates won't call the SQL command if knownWorkerUUIDs is empty + executeLargeUpdates(knownWorkerUUIDs, + (Consumer>) uuids -> mapper(dbSession).resetTasksWithUnknownWorkerUUIDs(uuids, system2.now()) + ); + } + } + public CeQueueDto insert(DbSession session, CeQueueDto dto) { if (dto.getCreatedAt() == 0L || dto.getUpdatedAt() == 0L) { long now = system2.now(); diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java index e08b952c1b70..42c64f500401 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java @@ -46,6 +46,17 @@ public interface CeQueueMapper { */ List selectPendingByMinimumExecutionCount(@Param("minExecutionCount") int minExecutionCount); + /** + * Select all tasks whose worker UUID is not present in {@code knownWorkerUUIDs} + */ + void resetTasksWithUnknownWorkerUUIDs(@Param("knownWorkerUUIDs") List knownWorkerUUIDs, @Param("updatedAt") long updatedAt); + + /** + * Reset all IN_PROGRESS TASKS + */ + void resetAllInProgressTasks(@Param("updatedAt") long updatedAt); + + int countByStatusAndComponentUuid(@Param("status") CeQueueDto.Status status, @Nullable @Param("componentUuid") String componentUuid); void insert(CeQueueDto dto); diff --git a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml index 90f60a858063..d8d38a0cb012 100644 --- a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml +++ b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml @@ -252,4 +252,30 @@ uuid=#{uuid,jdbcType=VARCHAR} + + update ce_queue set + status='PENDING', + worker_uuid=NULL, + started_at=NULL, + updated_at=#{updatedAt,jdbcType=BIGINT} + where + status = 'IN_PROGRESS' + and ( + worker_uuid is NULL + or worker_uuid not in + + #{workerUUID,jdbcType=VARCHAR} + + ) + + + + update ce_queue set + status='PENDING', + worker_uuid=NULL, + started_at=NULL, + updated_at=#{updatedAt,jdbcType=BIGINT} + where + status = 'IN_PROGRESS' + diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java index 15605396f9dd..9da7bd49eec2 100644 --- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java +++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java @@ -21,6 +21,7 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -280,6 +281,65 @@ public void resetToPendingForWorker_resets_status_of_non_pending_tasks_only_for_ verifyUnchangedByResetToPendingForWorker(o4); } + + @Test + public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_resets_status_of_all_pending_tasks() { + long startedAt = 2_099_888L; + CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_1, startedAt); + CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_1, startedAt); + CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_1, startedAt); + CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_1, startedAt); + CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_2, startedAt); + CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_2, startedAt); + CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_2, startedAt); + CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_2, startedAt); + + underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of()); + + verifyResetByResetTasks(u1); + verifyUnchangedByResetToPendingForWorker(u2); + verifyUnchangedByResetToPendingForWorker(u3); + verifyResetByResetTasks(u4); + verifyResetByResetTasks(o1); + verifyUnchangedByResetToPendingForWorker(o2); + verifyUnchangedByResetToPendingForWorker(o3); + verifyResetByResetTasks(o4); + } + + @Test + public void resetTasksWithUnknownWorkerUUIDs_set_resets_status_of_all_pending_tasks_with_unknown_workers() { + long startedAt = 2_099_888L; + CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_1, startedAt); + CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_1, startedAt); + CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_1, startedAt); + CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_1, startedAt); + CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, 1, WORKER_UUID_2, startedAt); + CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, 1, WORKER_UUID_2, startedAt); + CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, 0, WORKER_UUID_2, startedAt); + CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, 2, WORKER_UUID_2, startedAt); + + underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of(WORKER_UUID_1, "unknown")); + + verifyUnchangedByResetToPendingForWorker(u1); + verifyUnchangedByResetToPendingForWorker(u2); + verifyUnchangedByResetToPendingForWorker(u3); + verifyUnchangedByResetToPendingForWorker(u4); + verifyResetByResetTasks(o1); + verifyUnchangedByResetToPendingForWorker(o2); + verifyUnchangedByResetToPendingForWorker(o3); + verifyResetByResetTasks(o4); + } + + private void verifyResetByResetTasks(CeQueueDto original) { + CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get(); + assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus()); + assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount()); + assertThat(dto.getStartedAt()).isNull(); + assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt()); + assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt()); + assertThat(dto.getWorkerUuid()).isNull(); + } + private void verifyResetToPendingForWorker(CeQueueDto original) { CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get(); assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING);