Skip to content

Commit

Permalink
TMP Rework ServiceTermination strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
sleshchenko committed Mar 30, 2018
1 parent dffc018 commit aca3dc1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.inject.Singleton;
Expand Down Expand Up @@ -554,6 +555,13 @@ public Set<String> getRuntimesIds() {
return ImmutableSet.copyOf(runtimes.keySet());
}

public Set<String> getInProgress() {
return statuses.entrySet().stream()
.filter(e -> e.getValue() == STARTING || e.getValue() == STOPPING)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
}

/**
* Returns true if there is at least one workspace running(it's status is different from {@link
* WorkspaceStatus#STOPPED}), otherwise returns false.
Expand All @@ -562,6 +570,10 @@ public boolean isAnyRunning() {
return !runtimes.isEmpty();
}

public boolean isAnyInProgress() {
return statuses.containsValue(STARTING) || statuses.containsValue(STOPPING);
}

/**
* Returns an optional wrapping the runtime context of the workspace with the given identifier, an
* empty optional is returned in case the workspace doesn't have the runtime.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,19 @@
import static org.eclipse.che.api.system.server.DtoConverter.asDto;

import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import org.eclipse.che.api.core.ConflictException;
import org.eclipse.che.api.core.NotFoundException;
import org.eclipse.che.api.core.ServerException;
import org.eclipse.che.api.core.model.workspace.WorkspaceStatus;
import org.eclipse.che.api.core.notification.EventService;
import org.eclipse.che.api.core.notification.EventSubscriber;
import org.eclipse.che.api.system.server.ServiceTermination;
import org.eclipse.che.api.system.shared.event.service.SystemServiceItemStoppedEvent;
import org.eclipse.che.api.system.shared.event.service.SystemServiceStoppedEvent;
import org.eclipse.che.api.workspace.shared.dto.event.WorkspaceStatusEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Terminates workspace service.
Expand All @@ -40,23 +34,18 @@
*/
public class WorkspaceServiceTermination implements ServiceTermination {

private static final Logger LOG = LoggerFactory.getLogger(WorkspaceServiceTermination.class);

/** Delay in MS between runtimes stopped checks. The value is experimental. */
private static final long DEFAULT_PULL_RUNTIMES_PERIOD_MS = TimeUnit.SECONDS.toMillis(1);

private final WorkspaceManager manager;
private final WorkspaceSharedPool sharedPool;
private final WorkspaceRuntimes runtimes;
private final EventService eventService;

@Inject
public WorkspaceServiceTermination(
WorkspaceManager manager,
WorkspaceSharedPool sharedPool,
WorkspaceRuntimes runtimes,
EventService eventService) {
this.manager = manager;
this.sharedPool = sharedPool;
this.runtimes = runtimes;
this.eventService = eventService;
Expand All @@ -74,46 +63,27 @@ public void terminate() throws InterruptedException {
WorkspaceStoppedEventsPropagator propagator = new WorkspaceStoppedEventsPropagator();
eventService.subscribe(propagator);
try {
stopRunningAndStartingWorkspacesAsync();
waitAllWorkspacesStopped();
waitAllWorkspacesRunningOrStopped();
sharedPool.shutdown();
} finally {
eventService.unsubscribe(propagator);
}
}

private void stopRunningAndStartingWorkspacesAsync() {
for (String workspaceId : runtimes.getRuntimesIds()) {
WorkspaceStatus status = runtimes.getStatus(workspaceId);
if (status == WorkspaceStatus.RUNNING || status == WorkspaceStatus.STARTING) {
try {
manager.stopWorkspace(workspaceId, Collections.emptyMap());
} catch (ServerException | ConflictException | NotFoundException x) {
if (runtimes.hasRuntime(workspaceId)) {
LOG.error(
"Couldn't get the workspace '{}' while it's running, the occurred error: '{}'",
workspaceId,
x.getMessage());
}
}
}
}
}

/** Propagates workspace stopped events as {@link SystemServiceStoppedEvent} events. */
private class WorkspaceStoppedEventsPropagator implements EventSubscriber<WorkspaceStatusEvent> {

private final int totalRunning;
private final AtomicInteger currentlyStopped;

private WorkspaceStoppedEventsPropagator() {
this.totalRunning = runtimes.getRuntimesIds().size();
this.totalRunning = runtimes.getInProgress().size();
this.currentlyStopped = new AtomicInteger(0);
}

@Override
public void onEvent(WorkspaceStatusEvent event) {
if (event.getStatus() == WorkspaceStatus.STOPPED) {
if (event.getStatus() == WorkspaceStatus.STOPPED || event.getStatus() == WorkspaceStatus.RUNNING) {
eventService.publish(
asDto(
new SystemServiceItemStoppedEvent(
Expand All @@ -125,14 +95,14 @@ public void onEvent(WorkspaceStatusEvent event) {
}
}

private void waitAllWorkspacesStopped() throws InterruptedException {
private void waitAllWorkspacesRunningOrStopped() throws InterruptedException {
Timer timer = new Timer("RuntimesStoppedTracker", false);
CountDownLatch latch = new CountDownLatch(1);
timer.schedule(
new TimerTask() {
@Override
public void run() {
if (!runtimes.isAnyRunning()) {
if (!runtimes.isAnyInProgress()) {
timer.cancel();
latch.countDown();
}
Expand Down

0 comments on commit aca3dc1

Please sign in to comment.