Prepare for 6.6.2 bug-fix release (#10053)
* Watch connection manager never closed when trying to delete a non-existing POD (#9932)

Fix the root cause of a recurring 1006 WebSocket error.

The fixed bug is described / discussed in the following issue: redhat-developer/rh-che#672
Signed-off-by: David Festal <dfestal@redhat.com>
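
As an illustration of the leak pattern this fixes, here is a minimal, hypothetical sketch (the names deletePod and PodDeleteSketch are invented; only the fabric8 Watch/Watcher API is real): a pod-delete helper must close the Watch on every exit path, including when the pod is already gone, or the WebSocket connection manager behind it leaks and eventually surfaces as the 1006 close code.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.Watcher.Action;

class PodDeleteSketch {

  void deletePod(KubernetesClient client, String namespace, String podName) throws Exception {
    CompletableFuture<Void> deleted = new CompletableFuture<>();
    Watch watch =
        client.pods().inNamespace(namespace).withName(podName)
            .watch(
                new Watcher<Pod>() {
                  @Override
                  public void eventReceived(Action action, Pod pod) {
                    if (action == Action.DELETED) {
                      deleted.complete(null);
                    }
                  }

                  @Override
                  public void onClose(KubernetesClientException cause) {
                    if (cause != null) {
                      deleted.completeExceptionally(cause);
                    }
                  }
                });
    try {
      Boolean wasDeleted = client.pods().inNamespace(namespace).withName(podName).delete();
      if (!Boolean.TRUE.equals(wasDeleted)) {
        // the pod never existed or is already gone: complete right away
        deleted.complete(null);
      }
      deleted.get(5, TimeUnit.MINUTES);
    } finally {
      // the essential part of the fix: always release the watch and its
      // WebSocket connection manager, even on the non-existing-pod path
      watch.close();
    }
  }
}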

* CHE-5918 Add an ability to interrupt Kubernetes/OpenShift runtime start

Signed-off-by: Sergii Leshchenko <sleshche@redhat.com>
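
The mechanism can be pictured with the following minimal sketch (an illustration only, built from plain JDK types; the real StartSynchronizer added in this commit is created per runtime by StartSynchronizerFactory and tracks more state): the starting thread registers itself, every start step polls checkFailure(), and a stop request interrupts that thread while failing the shared future so all composed machine futures stop fast.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

class StartSynchronizerSketch {
  private final CompletableFuture<Void> startFailure = new CompletableFuture<>();
  private final CountDownLatch startCompletion = new CountDownLatch(1);
  private volatile Thread startThread;

  // called by internalStart before any blocking step
  void setStartThread() {
    startThread = Thread.currentThread();
  }

  // shared future that the machine boot chains compose against
  CompletableFuture<Void> getStartFailure() {
    return startFailure;
  }

  // rethrows a failure recorded by another thread, e.g. an unrecoverable event
  void checkFailure() throws InterruptedException, ExecutionException {
    if (startFailure.isCompletedExceptionally()) {
      startFailure.get(); // already completed, so this throws without blocking
    }
  }

  // returns true when a start was in progress and has now been asked to stop
  boolean interrupt() {
    Thread toInterrupt = startThread;
    if (toInterrupt == null) {
      return false; // runtime is not STARTING
    }
    startFailure.completeExceptionally(new InterruptedException("start interrupted"));
    toInterrupt.interrupt();
    return true;
  }

  // called by the start thread once it finishes, successfully or not
  void complete() {
    startCompletion.countDown();
  }

  void completeExceptionally(Exception cause) {
    startFailure.completeExceptionally(cause);
    startCompletion.countDown();
  }

  // lets internalStop wait until an interrupted start has cleaned up
  boolean awaitInterruption(long timeout, TimeUnit unit) throws InterruptedException {
    return startCompletion.await(timeout, unit);
  }
}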

* CHE-5918 Add checking of start interruption by KubernetesBootstrapper

This is needed to avoid '403 Pod doesn't exists' errors, which happen when the start is interrupted while any of the machines is in the bootstrapping phase; the result is a connection leak. TODO: Create an issue for fabric8-client

* Improve ExecWatchdog to rethrow an exception that occurred while establishing a WebSocket connection
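
A minimal sketch of that behavior (hypothetical names; the real ExecWatchdog differs): the WebSocket listener records a connect-time failure, and the waiting thread rethrows it instead of blocking until the timeout with no diagnostic.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ExecWatchdogSketch {
  private final CountDownLatch completionLatch = new CountDownLatch(1);
  private final AtomicReference<Throwable> connectError = new AtomicReference<>();

  // invoked from the WebSocket listener when the connection attempt fails
  void onConnectionFailure(Throwable error) {
    connectError.set(error);
    completionLatch.countDown();
  }

  // invoked from the WebSocket listener when the exec finishes normally
  void onComplete() {
    completionLatch.countDown();
  }

  void await(long timeout, TimeUnit unit) throws Exception {
    if (!completionLatch.await(timeout, unit)) {
      throw new Exception("timeout while waiting for exec completion");
    }
    Throwable error = connectError.get();
    if (error != null) {
      // the improvement: surface the connect failure instead of swallowing it
      throw new Exception("error establishing the WebSocket connection", error);
    }
  }
}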

* CHE-5918 Fix K8s/OS runtime start failing when an unrecoverable event occurs
davidfestal authored and Roman Iuvshyn committed Jun 14, 2018
1 parent 2f2c076 commit 6c21574
Showing 22 changed files with 1,156 additions and 80 deletions.
@@ -19,6 +19,7 @@
import org.eclipse.che.api.workspace.server.WorkspaceLockService;
import org.eclipse.che.api.workspace.server.WorkspaceStatusCache;
import org.eclipse.che.multiuser.api.distributed.JGroupsServiceTermination;
import org.eclipse.che.multiuser.api.distributed.WorkspaceStopPropagator;
import org.eclipse.che.multiuser.api.distributed.subscription.DistributedRemoteSubscriptionStorage;
import org.eclipse.persistence.config.CacheCoordinationProtocol;
import org.eclipse.persistence.config.PersistenceUnitProperties;
@@ -53,5 +54,7 @@ protected void configure() {
Multibinder.newSetBinder(binder(), ServiceTermination.class)
.addBinding()
.to(JGroupsServiceTermination.class);

bind(WorkspaceStopPropagator.class).asEagerSingleton();
}
}
@@ -70,6 +70,7 @@ protected void configure() {

install(new FactoryModuleBuilder().build(KubernetesRuntimeFactory.class));
install(new FactoryModuleBuilder().build(KubernetesBootstrapperFactory.class));
install(new FactoryModuleBuilder().build(StartSynchronizerFactory.class));
bind(WorkspacePVCCleaner.class).asEagerSingleton();
bind(RemoveNamespaceOnWorkspaceRemove.class).asEagerSingleton();

@@ -22,12 +22,10 @@
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.extensions.Ingress;
import io.fabric8.kubernetes.client.Watcher.Action;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -59,6 +57,7 @@
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
import org.eclipse.che.api.workspace.server.spi.InternalRuntime;
import org.eclipse.che.api.workspace.server.spi.RuntimeStartInterruptedException;
import org.eclipse.che.api.workspace.server.spi.StateException;
import org.eclipse.che.api.workspace.server.spi.environment.InternalMachineConfig;
import org.eclipse.che.commons.env.EnvironmentContext;
@@ -74,7 +73,6 @@
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.event.PodActionHandler;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.pvc.WorkspaceVolumesStrategy;
import org.eclipse.che.workspace.infrastructure.kubernetes.server.KubernetesServerResolver;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.ContainerEvents;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.KubernetesSharedPool;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher;
import org.slf4j.Logger;
@@ -106,6 +104,7 @@ public class KubernetesInternalRuntime<
private final Executor executor;
private final KubernetesRuntimeStateCache runtimeStates;
private final KubernetesMachineCache machines;
private final StartSynchronizer startSynchronizer;

@Inject
public KubernetesInternalRuntime(
@@ -122,6 +121,7 @@ public KubernetesInternalRuntime(
KubernetesSharedPool sharedPool,
KubernetesRuntimeStateCache runtimeStates,
KubernetesMachineCache machines,
StartSynchronizerFactory startSynchronizerFactory,
@Assisted T context,
@Assisted KubernetesNamespace namespace,
@Assisted List<Warning> warnings) {
@@ -139,22 +139,30 @@ public KubernetesInternalRuntime(
this.executor = sharedPool.getExecutor();
this.runtimeStates = runtimeStates;
this.machines = machines;
this.startSynchronizer = startSynchronizerFactory.create(context.getIdentity());
}

@Override
protected void internalStart(Map<String, String> startOptions) throws InfrastructureException {
KubernetesRuntimeContext<? extends KubernetesEnvironment> context = getContext();
String workspaceId = context.getIdentity().getWorkspaceId();
try {
final KubernetesEnvironment k8sEnv = context.getEnvironment();
volumesStrategy.prepare(k8sEnv, workspaceId);
startSynchronizer.setStartThread();
startSynchronizer.start();

volumesStrategy.prepare(context.getEnvironment(), workspaceId);

startSynchronizer.checkFailure();

startMachines();

final CompletableFuture<Void> failure = new CompletableFuture<>();
startSynchronizer.checkFailure();

final List<CompletableFuture<Void>> machinesFutures = new ArrayList<>();
// futures that must be cancelled explicitly
final List<CompletableFuture<?>> toCancelFutures = new CopyOnWriteArrayList<>();
final EnvironmentContext currentContext = EnvironmentContext.getCurrent();
CompletableFuture<Void> startFailure = startSynchronizer.getStartFailure();

for (KubernetesMachineImpl machine : machines.getMachines(context.getIdentity()).values()) {
String machineName = machine.getName();
@@ -163,35 +171,49 @@ protected void internalStart(Map<String, String> startOptions) throws InfrastructureException {
// since the machine running future will be completed from a thread that is not from
// the kubernetes pool, the executor must be passed explicitly to avoid delaying
// processing in the external pool.
.thenComposeAsync(checkFailure(failure), executor)
.thenComposeAsync(checkFailure(startFailure), executor)
.thenRun(publishRunningStatus(machineName))
.thenCompose(checkFailure(failure))
.thenCompose(checkFailure(startFailure))
.thenCompose(setContext(currentContext, bootstrap(toCancelFutures, machine)))
// see the comment above for why the executor is explicitly passed as an argument
.thenComposeAsync(checkFailure(failure), executor)
.thenComposeAsync(checkFailure(startFailure), executor)
.thenCompose(setContext(currentContext, checkServers(toCancelFutures, machine)))
.exceptionally(publishFailedStatus(failure, machineName));
.exceptionally(publishFailedStatus(startFailure, machineName));
machinesFutures.add(machineBootChain);
}

waitMachines(machinesFutures, toCancelFutures, failure);
} catch (InfrastructureException | RuntimeException | InterruptedException e) {
waitMachines(machinesFutures, toCancelFutures, startFailure);
startSynchronizer.complete();
} catch (InfrastructureException | RuntimeException e) {
Exception startFailureCause = startSynchronizer.getStartFailureNow();
if (startFailureCause == null) {
startFailureCause = e;
}

LOG.warn(
"Failed to start Kubernetes runtime of workspace {}. Cause: {}",
workspaceId,
e.getMessage());
boolean interrupted = Thread.interrupted() || e instanceof InterruptedException;
startFailureCause.getMessage());
boolean interrupted =
Thread.interrupted() || startFailureCause instanceof RuntimeStartInterruptedException;
// Cancels workspace server probes if any
probeScheduler.cancel(workspaceId);
// stop watching before cleaning up the namespace
namespace.pods().stopWatch();
try {
namespace.cleanUp();
} catch (InfrastructureException ignored) {
} catch (InfrastructureException cleanUpEx) {
LOG.warn(
"Failed to clean up namespace after failed start of workspace '{}'. Cause: {}",
context.getIdentity().getWorkspaceId(),
cleanUpEx.getMessage());
}

startSynchronizer.completeExceptionally(startFailureCause);
if (interrupted) {
throw new InfrastructureException("Kubernetes environment start was interrupted");
throw new RuntimeStartInterruptedException(getContext().getIdentity());
}
wrapAndRethrow(e);
wrapAndRethrow(startFailureCause);
} finally {
namespace.pods().stopWatch();
}
Expand All @@ -217,20 +239,21 @@ private <T, R> Function<T, R> setContext(EnvironmentContext context, Function<T,
* @param failure failure callback that is used to prevent subsequent steps when any error occurs
* @throws InfrastructureException when waiting for machines exceeds the timeout
* @throws InfrastructureException when any problem occurs while waiting
* @throws InterruptedException when the thread is interrupted while waiting machines
* @throws RuntimeStartInterruptedException when the thread is interrupted while waiting for machines
*/
private void waitMachines(
List<CompletableFuture<Void>> machinesFutures,
List<CompletableFuture<?>> toCancelFutures,
CompletableFuture<Void> failure)
throws InfrastructureException, InterruptedException {
throws InfrastructureException {
try {
final CompletableFuture<Void> allDone =
CompletableFuture.allOf(
machinesFutures.toArray(new CompletableFuture[machinesFutures.size()]));
CompletableFuture.anyOf(allDone, failure).get(workspaceStartTimeout, TimeUnit.MINUTES);

if (failure.isCompletedExceptionally()) {
cancelAll(toCancelFutures);
// rethrow the failure cause
failure.get();
}
@@ -244,9 +267,11 @@ private void waitMachines(
+ getContext().getIdentity().getWorkspaceId()
+ "' reached timeout");
} catch (InterruptedException ex) {
failure.completeExceptionally(ex);
RuntimeStartInterruptedException runtimeInterruptedEx =
new RuntimeStartInterruptedException(getContext().getIdentity());
failure.completeExceptionally(runtimeInterruptedEx);
cancelAll(toCancelFutures);
throw ex;
throw runtimeInterruptedEx;
} catch (ExecutionException ex) {
failure.completeExceptionally(ex);
cancelAll(toCancelFutures);
@@ -312,7 +337,11 @@ private Function<Void, CompletionStage<Void>> bootstrap(
bootstrapperFuture =
bootstrapperFactory
.create(
getContext().getIdentity(), machineConfig.getInstallers(), machine, namespace)
getContext().getIdentity(),
machineConfig.getInstallers(),
machine,
namespace,
startSynchronizer)
.bootstrapAsync();
toCancelFutures.add(bootstrapperFuture);
} else {
@@ -404,10 +433,28 @@ private static void cancelAll(Collection<CompletableFuture<?>> toClose) {

@Override
protected void internalStop(Map<String, String> stopOptions) throws InfrastructureException {
// Cancels workspace servers probes if any
RuntimeIdentity identity = getContext().getIdentity();
probeScheduler.cancel(identity.getWorkspaceId());
namespace.cleanUp();
if (startSynchronizer.interrupt()) {
// runtime is STARTING. Need to wait until the start is interrupted properly
try {
if (!startSynchronizer.awaitInterruption(workspaceStartTimeout, TimeUnit.MINUTES)) {
// Runtime is not interrupted yet. This may occur when the start was being performed
// by another Che Server that crashed, so the start is hung up in the STOPPING phase.
// Need to clean up runtime resources
RuntimeIdentity identity = getContext().getIdentity();
probeScheduler.cancel(identity.getWorkspaceId());
namespace.cleanUp();
}
} catch (InterruptedException e) {
throw new InfrastructureException(
"Interrupted while waiting for start task cancellation", e);
}
} else {
// runtime is RUNNING. Clean up used resources
RuntimeIdentity identity = getContext().getIdentity();
// Cancels workspace server probes if any
probeScheduler.cancel(identity.getWorkspaceId());
namespace.cleanUp();
}
}

@Override
@@ -498,11 +545,19 @@ protected void markRunning() throws InfrastructureException {

@Override
protected void markStopping() throws InfrastructureException {
RuntimeIdentity runtimeId = getContext().getIdentity();

// Check if the runtime is in the STARTING phase to bring the startSynchronizer state up to date.
WorkspaceStatus status = runtimeStates.getStatus(runtimeId);
if (status == WorkspaceStatus.STARTING) {
startSynchronizer.start();
}

if (!runtimeStates.updateStatus(
getContext().getIdentity(),
runtimeId,
s -> s == WorkspaceStatus.RUNNING || s == WorkspaceStatus.STARTING,
WorkspaceStatus.STOPPING)) {
throw new StateException("The environment must be running");
throw new StateException("The environment must be running or starting");
}
}

Expand Down Expand Up @@ -681,17 +736,12 @@ public void handle(ContainerEvent event) {
reason,
message,
event.getPodName());
try {
internalStop(emptyMap());
} catch (InfrastructureException e) {
LOG.error("Error occurred during runtime '{}' stopping. {}", workspaceId, e.getMessage());
} finally {
eventPublisher.sendRuntimeStoppedEvent(
format(
"Unrecoverable event occurred: '%s', '%s', '%s'",
reason, message, event.getPodName()),
getContext().getIdentity());
}

startSynchronizer.completeExceptionally(
new InfrastructureException(
format(
"Unrecoverable event occurred: '%s', '%s', '%s'",
reason, message, event.getPodName())));
}
}

@@ -701,16 +751,6 @@ private boolean isWorkspaceEvent(ContainerEvent event) {
return workspacePods.containsKey(podName);
}

private Date convertEventTimestampToDate(String timestamp) {
Date date = null;
try {
date = ContainerEvents.convertEventTimestampToDate(timestamp);
} catch (ParseException e) {
LOG.error("Error occurred during parsing the event timestamp '{}'", timestamp);
}
return date;
}

/**
* Returns true if the event reason or message matches one of the comma-separated values defined
* in 'che.infra.kubernetes.workspace_unrecoverable_events', false otherwise