Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make machine present in workspace runtime when starting event is sent #8210

Merged
merged 4 commits into from
Jan 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.eclipse.che.api.core.model.workspace.Warning;
import org.eclipse.che.api.core.model.workspace.runtime.Machine;
Expand Down Expand Up @@ -58,11 +57,16 @@
import org.eclipse.che.workspace.infrastructure.docker.server.mapping.ExternalIpURLRewriter;
import org.slf4j.Logger;

/** @author Alexander Garagatyi */
/**
* Represents {@link InternalRuntime} for Docker infrastructure.
*
* @author Alexander Garagatyi
*/
public class DockerInternalRuntime extends InternalRuntime<DockerRuntimeContext> {

private static final Logger LOG = getLogger(DockerInternalRuntime.class);

private final RuntimeMachines runtimeMachines;
private final StartSynchronizer startSynchronizer;
private final Map<String, String> properties;
private final NetworkLifecycle networks;
Expand Down Expand Up @@ -145,7 +149,7 @@ public DockerInternalRuntime(
DockerMachine machine = machineCreator.create(container);
String name = Labels.newDeserializer(container.getLabels()).machineName();

startSynchronizer.addMachine(name, machine);
runtimeMachines.putMachine(name, machine);
stopDetector.startDetection(container.getId(), name, new AbnormalMachineStopHandlerImpl());
streamLogsAsync(name, container.getId());
}
Expand Down Expand Up @@ -173,6 +177,7 @@ private DockerInternalRuntime(
this.probesFactory = probesFactory;
this.properties = new HashMap<>();
this.startSynchronizer = new StartSynchronizer();
this.runtimeMachines = new RuntimeMachines();
this.loggers = loggers;
this.probeScheduler = probeScheduler;
}
Expand All @@ -187,11 +192,17 @@ protected void internalStart(Map<String, String> startOptions) throws Infrastruc
getContext().getEnvironment().getContainers().entrySet()) {
checkInterruption();
String machineName = containerEntry.getKey();
final DockerContainerConfig config = containerEntry.getValue();

runtimeMachines.putMachine(machineName, new DockerMachine.StartingDockerMachine());
sendStartingEvent(machineName);

try {
startMachine(machineName, config);
DockerMachine machine = startMachine(machineName, containerEntry.getValue());
Copy link
Member

@sleshchenko sleshchenko Jan 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's put running machine into runtimeMachines here instead of in startMachine method. I think it will make it more clear that starting machine will be updated.

sendRunningEvent(machineName);

bootstrapInstallers(machineName, machine);

checkServers(machineName, machine);
} catch (InfrastructureException e) {
sendFailedEvent(machineName, e.getMessage());
throw e;
Expand Down Expand Up @@ -250,7 +261,7 @@ protected void internalStop(Map<String, String> stopOptions) throws Infrastructu

@Override
public Map<String, ? extends Machine> getInternalMachines() {
return startSynchronizer
return runtimeMachines
.getMachines()
.entrySet()
.stream()
Expand All @@ -265,7 +276,7 @@ public Map<String, String> getProperties() {
/** Checks servers availability on all the machines. */
void checkServers() throws InfrastructureException {
for (Map.Entry<String, ? extends DockerMachine> entry :
startSynchronizer.getMachines().entrySet()) {
runtimeMachines.getMachines().entrySet()) {
String name = entry.getKey();
DockerMachine machine = entry.getValue();
ServersChecker checker =
Expand All @@ -279,10 +290,9 @@ void checkServers() throws InfrastructureException {
}
}

private void startMachine(String name, DockerContainerConfig containerConfig)
private DockerMachine startMachine(String name, DockerContainerConfig containerConfig)
throws InfrastructureException, InterruptedException {
RuntimeIdentity identity = getContext().getIdentity();
InternalMachineConfig machineCfg = getContext().getEnvironment().getMachines().get(name);

DockerMachine machine =
containerStarter.startContainer(
Expand All @@ -292,29 +302,46 @@ private void startMachine(String name, DockerContainerConfig containerConfig)
identity,
new AbnormalMachineStopHandlerImpl());
try {
startSynchronizer.addMachine(name, machine);
runtimeMachines.putMachine(name, machine);

return machine;
} catch (InfrastructureException e) {
// destroy machine only in case its addition fails
// in other cases cleanup of whole runtime will be performed
destroyMachineQuietly(name, machine);
throw e;
}
if (machineCfg != null && !machineCfg.getInstallers().isEmpty()) {
bootstrapperFactory.create(name, identity, machineCfg.getInstallers(), machine).bootstrap();
}
}

private void checkServers(String name, DockerMachine machine)
throws InterruptedException, InfrastructureException {
RuntimeIdentity identity = getContext().getIdentity();

checkInterruption();
// one-time check
ServersChecker readinessChecker =
serverCheckerFactory.create(identity, name, machine.getServers());
readinessChecker.startAsync(new ServerReadinessHandler(name));
readinessChecker.await();

machine.setStatus(MachineStatus.RUNNING);
checkInterruption();
// continuous checking
probeScheduler.schedule(
probesFactory.getProbes(identity.getWorkspaceId(), name, machine.getServers()),
new ServerLivenessHandler());
}

private void bootstrapInstallers(String name, DockerMachine machine)
throws InfrastructureException, InterruptedException {
InternalMachineConfig machineCfg = getContext().getEnvironment().getMachines().get(name);
RuntimeIdentity identity = getContext().getIdentity();

if (machineCfg != null && !machineCfg.getInstallers().isEmpty()) {
checkInterruption();
bootstrapperFactory.create(name, identity, machineCfg.getInstallers(), machine).bootstrap();
}
}

private void checkInterruption() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
Expand All @@ -339,7 +366,7 @@ public ServerReadinessHandler(String machineName) {

@Override
public void accept(String serverRef) {
DockerMachine machine = startSynchronizer.getMachines().get(machineName);
DockerMachine machine = runtimeMachines.getMachine(machineName);
if (machine == null) {
// Probably machine was removed from the list during server check start due to some reason
return;
Expand All @@ -360,7 +387,7 @@ private class ServerLivenessHandler implements Consumer<ProbeResult> {
@Override
public void accept(ProbeResult probeResult) {
String machineName = probeResult.getMachineName();
DockerMachine machine = startSynchronizer.getMachines().get(machineName);
DockerMachine machine = runtimeMachines.getMachine(machineName);
if (machine == null) {
// Probably machine was removed from the list during server check start due to some reason
return;
Expand All @@ -385,7 +412,7 @@ public void accept(ProbeResult probeResult) {
}

private void destroyRuntime(Map<String, String> stopOptions) throws InfrastructureException {
Map<String, DockerMachine> machines = startSynchronizer.removeMachines();
Map<String, DockerMachine> machines = runtimeMachines.removeMachines();
for (Map.Entry<String, DockerMachine> entry : machines.entrySet()) {
destroyMachineQuietly(entry.getKey(), entry.getValue());
sendStoppedEvent(entry.getKey());
Expand All @@ -401,8 +428,8 @@ private void destroyMachineQuietly(String machineName, DockerMachine machine) {
} catch (InfrastructureException e) {
LOG.error(
format(
"Error occurs on destroying of docker machine '%s' in workspace '%s'. Container '%s'",
machineName, getContext().getIdentity().getWorkspaceId(), machine.getContainer()),
"Error occurs on destroying of docker machine '%s' in workspace '%s'. Error: %s",
machineName, getContext().getIdentity().getWorkspaceId(), e.getMessage()),
e);
}
}
Expand Down Expand Up @@ -468,155 +495,4 @@ private void sendServerStatusEvent(String machineName, String serverName, Server
.withStatus(server.getStatus())
.withServerUrl(server.getUrl()));
}

/**
* Controls the runtime start flow and helps to cancel it.
*
* <p>The runtime start with cancellation using the Start Synchronizer might look like:
*
* <pre>
* ...
* public void startRuntime() {
* startSynchronizer.setStartThread();
* try {
* // .....
* startSynchronizer.complete();
* } catch (Exception ex) {
* startSynchronizer.completeExceptionally(ex);
* throw ex;
* }
* }
* ...
* </pre>
*
* <p>At the same time stopping might look like:
*
* <pre>
* ...
* public void stopRuntime() {
* if (startSynchronizer.interrupt()) {
* try {
* startSynchronizer.await();
* } catch (RuntimeStartInterruptedException ex) {
* // normal stop
* } catch (InterruptedException ex) {
* Thread.currentThread().interrupt();
* ...
* }
* }
* }
* ...
* </pre>
*/
static class StartSynchronizer {

private Exception exception;
private Thread startThread;
private Map<String, DockerMachine> machines;
private CountDownLatch completionLatch;

public StartSynchronizer() {
this.machines = new HashMap<>();
this.completionLatch = new CountDownLatch(1);
}

public synchronized Map<String, ? extends DockerMachine> getMachines() {
return machines != null ? machines : emptyMap();
}

public synchronized void addMachine(String name, DockerMachine machine)
throws InternalInfrastructureException {
if (machines != null) {
machines.put(name, machine);
} else {
throw new InternalInfrastructureException("Start of runtime is canceled.");
}
}

public synchronized Map<String, DockerMachine> removeMachines() throws InfrastructureException {
if (machines != null) {
Map<String, DockerMachine> machines = this.machines;
// unset to identify error if method called second time
this.machines = null;
return machines;
}
throw new InfrastructureException("Runtime doesn't have machines to remove");
}

/**
* Sets {@link Thread#currentThread()} as a {@link #startThread}.
*
* @throws InternalInfrastructureException when {@link #startThread} already setted.
*/
public synchronized void setStartThread() throws InternalInfrastructureException {
if (startThread != null) {
throw new InternalInfrastructureException(
"Docker infrastructure context of workspace already started");
}
startThread = Thread.currentThread();
}

/**
* Releases waiting task and reset the starting thread.
*
* @throws InterruptedException when execution thread was interrupted just before this method
* call
*/
public synchronized void complete() throws InterruptedException {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
startThread = null;
completionLatch.countDown();
}

/**
* Releases waiting task, reset the starting thread and sets an exception if it is not null.
*
* @param ex completion exception might be null
*/
public synchronized void completeExceptionally(Exception ex) {
exception = ex;
startThread = null;
completionLatch.countDown();
}

/**
* Interrupts the {@link #startThread} if its value different to null.
*
* @return true if {@link #startThread} interruption flag setted, otherwise false will be
* returned
*/
public synchronized boolean interrupt() {
if (startThread != null) {
startThread.interrupt();
return true;
}
return false;
}

/**
* Waits until {@link #complete} is called and rethrow the {@link #exception} if it present.
* This call is blocking and it should be used with {@link #interrupt} method.
*
* @throws InterruptedException when this thread is interrupted while waiting for {@link
* #complete}
* @throws RuntimeStartInterruptedException when {@link #startThread} successfully interrupted
* @throws InfrastructureException when any error occurs while waiting for {@link #complete}
*/
public void await() throws InterruptedException, InfrastructureException {
completionLatch.await();
synchronized (this) {
if (exception != null) {
try {
throw exception;
} catch (InfrastructureException rethrow) {
throw rethrow;
} catch (Exception ex) {
throw new InternalInfrastructureException(ex);
}
}
}
}
}
}
Loading