Skip to content

Commit

Permalink
Make machine present in workspace runtime when starting event is sent (
Browse files Browse the repository at this point in the history
…#8210)

CHE-7900, CHE-8056
Also set machine RUNNING status before bootstrapping installers and
checking servers statuses.
Signed-off-by: Oleksandr Garagatyi <ogaragat@redhat.com>
  • Loading branch information
Oleksandr Garagatyi committed Jan 11, 2018
1 parent 7c15c95 commit 4f8f5e9
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.eclipse.che.api.core.model.workspace.Warning;
import org.eclipse.che.api.core.model.workspace.runtime.Machine;
Expand Down Expand Up @@ -58,11 +57,16 @@
import org.eclipse.che.workspace.infrastructure.docker.server.mapping.ExternalIpURLRewriter;
import org.slf4j.Logger;

/** @author Alexander Garagatyi */
/**
* Represents {@link InternalRuntime} for Docker infrastructure.
*
* @author Alexander Garagatyi
*/
public class DockerInternalRuntime extends InternalRuntime<DockerRuntimeContext> {

private static final Logger LOG = getLogger(DockerInternalRuntime.class);

private final RuntimeMachines runtimeMachines;
private final StartSynchronizer startSynchronizer;
private final Map<String, String> properties;
private final NetworkLifecycle networks;
Expand Down Expand Up @@ -145,7 +149,7 @@ public DockerInternalRuntime(
DockerMachine machine = machineCreator.create(container);
String name = Labels.newDeserializer(container.getLabels()).machineName();

startSynchronizer.addMachine(name, machine);
runtimeMachines.putMachine(name, machine);
stopDetector.startDetection(container.getId(), name, new AbnormalMachineStopHandlerImpl());
streamLogsAsync(name, container.getId());
}
Expand Down Expand Up @@ -173,6 +177,7 @@ private DockerInternalRuntime(
this.probesFactory = probesFactory;
this.properties = new HashMap<>();
this.startSynchronizer = new StartSynchronizer();
this.runtimeMachines = new RuntimeMachines();
this.loggers = loggers;
this.probeScheduler = probeScheduler;
}
Expand All @@ -187,11 +192,17 @@ protected void internalStart(Map<String, String> startOptions) throws Infrastruc
getContext().getEnvironment().getContainers().entrySet()) {
checkInterruption();
String machineName = containerEntry.getKey();
final DockerContainerConfig config = containerEntry.getValue();

runtimeMachines.putMachine(machineName, new DockerMachine.StartingDockerMachine());
sendStartingEvent(machineName);

try {
startMachine(machineName, config);
DockerMachine machine = startMachine(machineName, containerEntry.getValue());
sendRunningEvent(machineName);

bootstrapInstallers(machineName, machine);

checkServers(machineName, machine);
} catch (InfrastructureException e) {
sendFailedEvent(machineName, e.getMessage());
throw e;
Expand Down Expand Up @@ -250,7 +261,7 @@ protected void internalStop(Map<String, String> stopOptions) throws Infrastructu

@Override
public Map<String, ? extends Machine> getInternalMachines() {
return startSynchronizer
return runtimeMachines
.getMachines()
.entrySet()
.stream()
Expand All @@ -265,7 +276,7 @@ public Map<String, String> getProperties() {
/** Checks servers availability on all the machines. */
void checkServers() throws InfrastructureException {
for (Map.Entry<String, ? extends DockerMachine> entry :
startSynchronizer.getMachines().entrySet()) {
runtimeMachines.getMachines().entrySet()) {
String name = entry.getKey();
DockerMachine machine = entry.getValue();
ServersChecker checker =
Expand All @@ -279,10 +290,9 @@ void checkServers() throws InfrastructureException {
}
}

private void startMachine(String name, DockerContainerConfig containerConfig)
private DockerMachine startMachine(String name, DockerContainerConfig containerConfig)
throws InfrastructureException, InterruptedException {
RuntimeIdentity identity = getContext().getIdentity();
InternalMachineConfig machineCfg = getContext().getEnvironment().getMachines().get(name);

DockerMachine machine =
containerStarter.startContainer(
Expand All @@ -292,29 +302,46 @@ private void startMachine(String name, DockerContainerConfig containerConfig)
identity,
new AbnormalMachineStopHandlerImpl());
try {
startSynchronizer.addMachine(name, machine);
runtimeMachines.putMachine(name, machine);

return machine;
} catch (InfrastructureException e) {
// destroy machine only in case its addition fails
// in other cases cleanup of whole runtime will be performed
destroyMachineQuietly(name, machine);
throw e;
}
if (machineCfg != null && !machineCfg.getInstallers().isEmpty()) {
bootstrapperFactory.create(name, identity, machineCfg.getInstallers(), machine).bootstrap();
}
}

private void checkServers(String name, DockerMachine machine)
throws InterruptedException, InfrastructureException {
RuntimeIdentity identity = getContext().getIdentity();

checkInterruption();
// one-time check
ServersChecker readinessChecker =
serverCheckerFactory.create(identity, name, machine.getServers());
readinessChecker.startAsync(new ServerReadinessHandler(name));
readinessChecker.await();

machine.setStatus(MachineStatus.RUNNING);
checkInterruption();
// continuous checking
probeScheduler.schedule(
probesFactory.getProbes(identity.getWorkspaceId(), name, machine.getServers()),
new ServerLivenessHandler());
}

private void bootstrapInstallers(String name, DockerMachine machine)
throws InfrastructureException, InterruptedException {
InternalMachineConfig machineCfg = getContext().getEnvironment().getMachines().get(name);
RuntimeIdentity identity = getContext().getIdentity();

if (machineCfg != null && !machineCfg.getInstallers().isEmpty()) {
checkInterruption();
bootstrapperFactory.create(name, identity, machineCfg.getInstallers(), machine).bootstrap();
}
}

private void checkInterruption() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
Expand All @@ -339,7 +366,7 @@ public ServerReadinessHandler(String machineName) {

@Override
public void accept(String serverRef) {
DockerMachine machine = startSynchronizer.getMachines().get(machineName);
DockerMachine machine = runtimeMachines.getMachine(machineName);
if (machine == null) {
// Probably machine was removed from the list during server check start due to some reason
return;
Expand All @@ -360,7 +387,7 @@ private class ServerLivenessHandler implements Consumer<ProbeResult> {
@Override
public void accept(ProbeResult probeResult) {
String machineName = probeResult.getMachineName();
DockerMachine machine = startSynchronizer.getMachines().get(machineName);
DockerMachine machine = runtimeMachines.getMachine(machineName);
if (machine == null) {
// Probably machine was removed from the list during server check start due to some reason
return;
Expand All @@ -385,7 +412,7 @@ public void accept(ProbeResult probeResult) {
}

private void destroyRuntime(Map<String, String> stopOptions) throws InfrastructureException {
Map<String, DockerMachine> machines = startSynchronizer.removeMachines();
Map<String, DockerMachine> machines = runtimeMachines.removeMachines();
for (Map.Entry<String, DockerMachine> entry : machines.entrySet()) {
destroyMachineQuietly(entry.getKey(), entry.getValue());
sendStoppedEvent(entry.getKey());
Expand All @@ -401,8 +428,8 @@ private void destroyMachineQuietly(String machineName, DockerMachine machine) {
} catch (InfrastructureException e) {
LOG.error(
format(
"Error occurs on destroying of docker machine '%s' in workspace '%s'. Container '%s'",
machineName, getContext().getIdentity().getWorkspaceId(), machine.getContainer()),
"Error occurs on destroying of docker machine '%s' in workspace '%s'. Error: %s",
machineName, getContext().getIdentity().getWorkspaceId(), e.getMessage()),
e);
}
}
Expand Down Expand Up @@ -468,155 +495,4 @@ private void sendServerStatusEvent(String machineName, String serverName, Server
.withStatus(server.getStatus())
.withServerUrl(server.getUrl()));
}

/**
* Controls the runtime start flow and helps to cancel it.
*
* <p>The runtime start with cancellation using the Start Synchronizer might look like:
*
* <pre>
* ...
* public void startRuntime() {
* startSynchronizer.setStartThread();
* try {
* // .....
* startSynchronizer.complete();
* } catch (Exception ex) {
* startSynchronizer.completeExceptionally(ex);
* throw ex;
* }
* }
* ...
* </pre>
*
* <p>At the same time stopping might look like:
*
* <pre>
* ...
* public void stopRuntime() {
* if (startSynchronizer.interrupt()) {
* try {
* startSynchronizer.await();
* } catch (RuntimeStartInterruptedException ex) {
* // normal stop
* } catch (InterruptedException ex) {
* Thread.currentThread().interrupt();
* ...
* }
* }
* }
* ...
* </pre>
*/
static class StartSynchronizer {

private Exception exception;
private Thread startThread;
private Map<String, DockerMachine> machines;
private CountDownLatch completionLatch;

public StartSynchronizer() {
this.machines = new HashMap<>();
this.completionLatch = new CountDownLatch(1);
}

public synchronized Map<String, ? extends DockerMachine> getMachines() {
return machines != null ? machines : emptyMap();
}

public synchronized void addMachine(String name, DockerMachine machine)
throws InternalInfrastructureException {
if (machines != null) {
machines.put(name, machine);
} else {
throw new InternalInfrastructureException("Start of runtime is canceled.");
}
}

public synchronized Map<String, DockerMachine> removeMachines() throws InfrastructureException {
if (machines != null) {
Map<String, DockerMachine> machines = this.machines;
// unset to identify error if method called second time
this.machines = null;
return machines;
}
throw new InfrastructureException("Runtime doesn't have machines to remove");
}

/**
* Sets {@link Thread#currentThread()} as a {@link #startThread}.
*
* @throws InternalInfrastructureException when {@link #startThread} already setted.
*/
public synchronized void setStartThread() throws InternalInfrastructureException {
if (startThread != null) {
throw new InternalInfrastructureException(
"Docker infrastructure context of workspace already started");
}
startThread = Thread.currentThread();
}

/**
* Releases waiting task and reset the starting thread.
*
* @throws InterruptedException when execution thread was interrupted just before this method
* call
*/
public synchronized void complete() throws InterruptedException {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
startThread = null;
completionLatch.countDown();
}

/**
* Releases waiting task, reset the starting thread and sets an exception if it is not null.
*
* @param ex completion exception might be null
*/
public synchronized void completeExceptionally(Exception ex) {
exception = ex;
startThread = null;
completionLatch.countDown();
}

/**
* Interrupts the {@link #startThread} if its value different to null.
*
* @return true if {@link #startThread} interruption flag setted, otherwise false will be
* returned
*/
public synchronized boolean interrupt() {
if (startThread != null) {
startThread.interrupt();
return true;
}
return false;
}

/**
* Waits until {@link #complete} is called and rethrow the {@link #exception} if it present.
* This call is blocking and it should be used with {@link #interrupt} method.
*
* @throws InterruptedException when this thread is interrupted while waiting for {@link
* #complete}
* @throws RuntimeStartInterruptedException when {@link #startThread} successfully interrupted
* @throws InfrastructureException when any error occurs while waiting for {@link #complete}
*/
public void await() throws InterruptedException, InfrastructureException {
completionLatch.await();
synchronized (this) {
if (exception != null) {
try {
throw exception;
} catch (InfrastructureException rethrow) {
throw rethrow;
} catch (Exception ex) {
throw new InternalInfrastructureException(ex);
}
}
}
}
}
}
Loading

0 comments on commit 4f8f5e9

Please sign in to comment.