Skip to content

Commit

Permalink
SONAR-8986 add purge of tasks of non existing workers to ce clean job
Browse files Browse the repository at this point in the history
  • Loading branch information
ehartmann committed Apr 27, 2017
1 parent e4d3426 commit a5fb1f9
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 52 deletions.
Expand Up @@ -27,18 +27,19 @@ public interface CeConfiguration {
int getWorkerCount();

/**
* The delay in millisecond before a {@link CeWorker} shall try and find a task
* The delay in millisecond before a {@link org.sonar.ce.taskprocessor.CeWorker} shall try and find a task
* to process when it's previous execution had nothing to do.
*/
long getQueuePollingDelay();

/**
* Delay before running job that cancels worn out tasks for the first time (in minutes).
* Delay before running job that cleans CE tasks for the first time (in minutes).
*/
long getCancelWornOutsInitialDelay();
long getCleanCeTasksInitialDelay();

/**
* Delay between the end of a run and the start of the next one of the job that cancels worn out CE tasks (in minutes).
* Delay between the end of a run and the start of the next one of the job that cleans CE tasks (in minutes).
*/
long getCancelWornOutsDelay();
long getCleanCeTasksDelay();

}
Expand Up @@ -99,12 +99,13 @@ public long getQueuePollingDelay() {
}

@Override
public long getCancelWornOutsInitialDelay() {
public long getCleanCeTasksInitialDelay() {
return CANCEL_WORN_OUTS_INITIAL_DELAY;
}

@Override
public long getCancelWornOutsDelay() {
public long getCleanCeTasksDelay() {
return CANCEL_WORN_OUTS_DELAY;
}

}
Expand Up @@ -111,14 +111,14 @@ private void expectMessageException(int value) {
}

@Test
public void getCancelWornOutsInitialDelay_returns_1() {
assertThat(new CeConfigurationImpl(settings).getCancelWornOutsInitialDelay())
public void getCleanCeTasksInitialDelay_returns_1() {
assertThat(new CeConfigurationImpl(settings).getCleanCeTasksInitialDelay())
.isEqualTo(1L);
}

@Test
public void getCancelWornOutsDelay_returns_10() {
assertThat(new CeConfigurationImpl(settings).getCancelWornOutsDelay())
public void getCleanCeTasksDelay_returns_10() {
assertThat(new CeConfigurationImpl(settings).getCleanCeTasksDelay())
.isEqualTo(10L);
}
}
Expand Up @@ -21,6 +21,7 @@

import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.CeDistributedInformation;
import org.sonar.ce.configuration.CeConfiguration;
import org.sonar.ce.queue.InternalCeQueue;

Expand All @@ -32,27 +33,44 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler {
private final CeCleaningExecutorService executorService;
private final CeConfiguration ceConfiguration;
private final InternalCeQueue internalCeQueue;
private final CeDistributedInformation ceDistributedInformation;

public CeCleaningSchedulerImpl(CeCleaningExecutorService executorService, CeConfiguration ceConfiguration, InternalCeQueue internalCeQueue) {
public CeCleaningSchedulerImpl(CeCleaningExecutorService executorService, CeConfiguration ceConfiguration,
InternalCeQueue internalCeQueue, CeDistributedInformation ceDistributedInformation) {
this.executorService = executorService;
this.internalCeQueue = internalCeQueue;
this.ceConfiguration = ceConfiguration;
this.ceDistributedInformation = ceDistributedInformation;
}

@Override
public void startScheduling() {
executorService.scheduleWithFixedDelay(this::cancelWornOuts,
ceConfiguration.getCancelWornOutsInitialDelay(),
ceConfiguration.getCancelWornOutsDelay(),
executorService.scheduleWithFixedDelay(this::cleanCeQueue,
ceConfiguration.getCleanCeTasksInitialDelay(),
ceConfiguration.getCleanCeTasksDelay(),
MINUTES);
}

private void cleanCeQueue() {
cancelWornOuts();
resetTasksWithUnknownWorkerUUIDs();
}

private void cancelWornOuts() {
try {
LOG.info("Deleting any worn out task");
LOG.debug("Deleting any worn out task");
internalCeQueue.cancelWornOuts();
} catch (Exception e) {
LOG.warn("Failed to cancel worn out tasks", e);
}
}

private void resetTasksWithUnknownWorkerUUIDs() {
try {
LOG.debug("Resetting state of tasks with unknown worker UUIDs");
internalCeQueue.resetTasksWithUnknownWorkerUUIDs(ceDistributedInformation.getWorkerUUIDs());
} catch (Exception e) {
LOG.warn("Failed to reset tasks with unknown worker UUIDs", e);
}
}
}
Expand Up @@ -20,10 +20,8 @@
package org.sonar.ce.queue;

import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import org.sonar.ce.queue.CeQueue;
import org.sonar.ce.queue.CeTask;
import org.sonar.ce.queue.CeTaskResult;
import org.sonar.db.ce.CeActivityDto.Status;

/**
Expand Down Expand Up @@ -70,6 +68,8 @@ public interface InternalCeQueue extends CeQueue {

void cancelWornOuts();

void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs);

void pausePeek();

void resumePeek();
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.io.PrintWriter;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -86,7 +87,6 @@ public Optional<CeTask> peek(String workerUuid) {
queueStatus.addInProgress();
}
return Optional.ofNullable(task);

}
}

Expand Down Expand Up @@ -172,6 +172,14 @@ public void cancelWornOuts() {
}
}

@Override
public void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs) {
try (DbSession dbSession = dbClient.openSession(false)) {
dbClient.ceQueueDao().resetTasksWithUnknownWorkerUUIDs(dbSession, knownWorkerUUIDs);
dbSession.commit();
}
}

@Override
public void pausePeek() {
this.peekPaused.set(true);
Expand Down
Expand Up @@ -28,19 +28,21 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import org.sonar.ce.CeDistributedInformation;
import org.sonar.ce.configuration.CeConfiguration;
import org.sonar.ce.queue.InternalCeQueue;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class CeCleaningSchedulerImplTest {
@Test
public void startScheduling_does_not_fail_if_cancelWornOuts_send_even_an_Exception() {
public void startScheduling_does_not_fail_if_cleaning_methods_send_even_an_Exception() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
@Override
Expand All @@ -49,16 +51,19 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialD
command.run();
return null;
}
}, mockCeConfiguration(1, 10), mockedInternalCeQueue);
doThrow(new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts")).when(mockedInternalCeQueue).cancelWornOuts();
}, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
Exception exception = new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts");
doThrow(exception).when(mockedInternalCeQueue).cancelWornOuts();
doThrow(exception).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());

underTest.startScheduling();

verify(mockedInternalCeQueue).cancelWornOuts();
verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
}

@Test
public void startScheduling_fails_if_cancelWornOuts_send_even_an_Error() {
public void startScheduling_fails_if_cancelWornOuts_send_an_Error() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
@Override
Expand All @@ -67,7 +72,7 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialD
command.run();
return null;
}
}, mockCeConfiguration(1, 10), mockedInternalCeQueue);
}, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
Error expected = new Error("faking Error thrown by cancelWornOuts");
doThrow(expected).when(mockedInternalCeQueue).cancelWornOuts();

Expand All @@ -77,36 +82,74 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialD
} catch (Error e) {
assertThat(e).isSameAs(expected);
}
verify(mockedInternalCeQueue).cancelWornOuts();
}

@Test
public void startScheduling_calls_cancelWornOuts_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() {
public void startScheduling_fails_if_resetTasksWithUnknownWorkerUUIDs_send_an_Error() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
long initialDelay = 10L;
long delay = 20L;
CeConfiguration mockedCeConfiguration = mockCeConfiguration(initialDelay, delay);
CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
// synchronously execute command
command.run();
return null;
}
}, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
Error expected = new Error("faking Error thrown by cancelWornOuts");
doThrow(expected).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());

try {
underTest.startScheduling();
fail("the error should have been thrown");
} catch (Error e) {
assertThat(e).isSameAs(expected);
}
verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
}

@Test
public void startScheduling_calls_cleaning_methods_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
long wornOutInitialDelay = 10L;
long wornOutDelay = 20L;
long unknownWorkerInitialDelay = 11L;
long unknownWorkerDelay = 21L;
CeConfiguration mockedCeConfiguration = mockCeConfiguration(wornOutInitialDelay, wornOutDelay);
CeCleaningAdapter executorService = new CeCleaningAdapter() {
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initDelay, long period, TimeUnit unit) {
assertThat(initDelay).isEqualTo(initialDelay);
assertThat(period).isEqualTo(delay);
assertThat(unit).isEqualTo(TimeUnit.MINUTES);
schedulerCounter++;
switch(schedulerCounter) {
case 1:
assertThat(initDelay).isEqualTo(wornOutInitialDelay);
assertThat(period).isEqualTo(wornOutDelay);
assertThat(unit).isEqualTo(TimeUnit.MINUTES);
break;
case 2:
assertThat(initDelay).isEqualTo(unknownWorkerInitialDelay);
assertThat(period).isEqualTo(unknownWorkerDelay);
assertThat(unit).isEqualTo(TimeUnit.MINUTES);
break;
default:
fail("Unknwon call of scheduleWithFixedDelay");
}
// synchronously execute command
command.run();
return null;
}
};
CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue);
CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue, mock(CeDistributedInformation.class));

underTest.startScheduling();

assertThat(executorService.schedulerCounter).isEqualTo(1);
verify(mockedInternalCeQueue).cancelWornOuts();
}

private CeConfiguration mockCeConfiguration(long initialDelay, long delay) {
private CeConfiguration mockCeConfiguration(long cleanCeTasksInitialDelay, long cleanCeTasksDelay) {
CeConfiguration mockedCeConfiguration = mock(CeConfiguration.class);
when(mockedCeConfiguration.getCancelWornOutsInitialDelay()).thenReturn(initialDelay);
when(mockedCeConfiguration.getCancelWornOutsDelay()).thenReturn(delay);
when(mockedCeConfiguration.getCleanCeTasksInitialDelay()).thenReturn(cleanCeTasksInitialDelay);
when(mockedCeConfiguration.getCleanCeTasksDelay()).thenReturn(cleanCeTasksDelay);
return mockedCeConfiguration;
}

Expand All @@ -115,6 +158,7 @@ private CeConfiguration mockCeConfiguration(long initialDelay, long delay) {
* method.
*/
private static class CeCleaningAdapter implements CeCleaningExecutorService {
protected int schedulerCounter = 0;

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
Expand Down
Expand Up @@ -54,21 +54,21 @@ public void setQueuePollingDelay(int queuePollingDelay) {
}

@Override
public long getCancelWornOutsInitialDelay() {
public long getCleanCeTasksInitialDelay() {
return cancelWornOutsInitialDelay;
}

public void setCancelWornOutsInitialDelay(long cancelWornOutsInitialDelay) {
public void setCleanCeTasksInitialDelay(long cancelWornOutsInitialDelay) {
checkArgument(cancelWornOutsInitialDelay > 0, "cancel worn-outs polling initial delay must be >= 1");
this.cancelWornOutsInitialDelay = cancelWornOutsInitialDelay;
}

@Override
public long getCancelWornOutsDelay() {
public long getCleanCeTasksDelay() {
return cancelWornOutsDelay;
}

public void setCancelWornOutsDelay(long cancelWornOutsDelay) {
public void setCleanCeTasksDelay(long cancelWornOutsDelay) {
checkArgument(cancelWornOutsDelay > 0, "cancel worn-outs polling delay must be >= 1");
this.cancelWornOutsDelay = cancelWornOutsDelay;
}
Expand Down
Expand Up @@ -135,14 +135,15 @@ public long getQueuePollingDelay() {
}

@Override
public long getCancelWornOutsInitialDelay() {
throw new UnsupportedOperationException("getCancelWornOutsInitialDelay is not implemented");
public long getCleanCeTasksInitialDelay() {
throw new UnsupportedOperationException("getCleanCeTasksInitialDelay is not implemented");
}

@Override
public long getCancelWornOutsDelay() {
throw new UnsupportedOperationException("getCancelWornOutsDelay is not implemented");
public long getCleanCeTasksDelay() {
throw new UnsupportedOperationException("getCleanCeTasksDelay is not implemented");
}

}

@CheckForNull
Expand Down

0 comments on commit a5fb1f9

Please sign in to comment.