Skip to content

Commit

Permalink
Publish PluginBroker pod events as runtime log (#15198)
Browse files Browse the repository at this point in the history
Signed-off-by: Sergii Leshchenko <sleshche@redhat.com>
  • Loading branch information
sleshchenko committed Nov 21, 2019
1 parent 6e0ce6d commit 5a1344b
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.workspace.infrastructure.kubernetes;

import java.util.Set;
import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.event.PodEvent;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.event.PodEventHandler;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher;

/** Listens pod events and publish them as runtime logs. */
public class RuntimeLogsPublisher implements PodEventHandler {

private final RuntimeEventsPublisher eventPublisher;
private final RuntimeIdentity runtimeIdentity;
private final Set<String> pods;

public RuntimeLogsPublisher(
RuntimeEventsPublisher eventPublisher, RuntimeIdentity runtimeIdentity, Set<String> pods) {
this.eventPublisher = eventPublisher;
this.pods = pods;
this.runtimeIdentity = runtimeIdentity;
}

@Override
public void handle(PodEvent event) {
final String podName = event.getPodName();
if (pods.contains(podName)) {
eventPublisher.sendRuntimeLogEvent(
event.getMessage(), event.getCreationTimeStamp(), runtimeIdentity);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ public void sendMachineLogEvent(
.withTime(time));
}

public void sendRuntimeLogEvent(String text, String time, RuntimeIdentity runtimeId) {
eventService.publish(
DtoFactory.newDto(RuntimeLogEvent.class)
.withRuntimeId(DtoConverter.asDto(runtimeId))
.withText(text)
.withTime(time));
}

public void sendAbnormalStoppedEvent(RuntimeIdentity runtimeId, String reason) {
eventService.publish(new RuntimeAbnormalStoppedEvent(runtimeId, reason));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesNamespaceFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.pvc.EphemeralWorkspaceUtility;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.pvc.WorkspaceVolumesStrategy;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListenerFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.brokerphases.BrokerEnvironmentFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.brokerphases.DeployBroker;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class PluginBrokerManager<E extends KubernetesEnvironment> {
private final BrokerEnvironmentFactory<E> brokerEnvironmentFactory;
private final KubernetesEnvironmentProvisioner<E> environmentProvisioner;
private final UnrecoverablePodEventListenerFactory unrecoverablePodEventListenerFactory;
private final RuntimeEventsPublisher runtimeEventsPublisher;
private final Tracer tracer;

@Inject
Expand All @@ -72,6 +74,7 @@ public PluginBrokerManager(
BrokerEnvironmentFactory<E> brokerEnvironmentFactory,
UnrecoverablePodEventListenerFactory unrecoverablePodEventListenerFactory,
@Named("che.workspace.plugin_broker.wait_timeout_min") int pluginBrokerWaitingTimeout,
RuntimeEventsPublisher runtimeEventsPublisher,
Tracer tracer) {
this.factory = factory;
this.eventService = eventService;
Expand All @@ -81,6 +84,7 @@ public PluginBrokerManager(
this.environmentProvisioner = environmentProvisioner;
this.pluginBrokerWaitingTimeout = pluginBrokerWaitingTimeout;
this.unrecoverablePodEventListenerFactory = unrecoverablePodEventListenerFactory;
this.runtimeEventsPublisher = runtimeEventsPublisher;
this.tracer = tracer;
}

Expand Down Expand Up @@ -114,8 +118,7 @@ public List<ChePlugin> getTooling(
getPrepareStoragePhase(workspaceId, startSynchronizer, brokerEnvironment);
WaitBrokerResult waitBrokerResult = getWaitBrokerPhase(workspaceId, brokersResult);
DeployBroker deployBroker =
getDeployBrokerPhase(
runtimeID.getWorkspaceId(), kubernetesNamespace, brokerEnvironment, brokersResult);
getDeployBrokerPhase(runtimeID, kubernetesNamespace, brokerEnvironment, brokersResult);
LOG.debug("Entering plugin brokers deployment chain workspace '{}'", workspaceId);
listenBrokerEvents.then(prepareStorage).then(deployBroker).then(waitBrokerResult);
return listenBrokerEvents.execute();
Expand All @@ -134,16 +137,17 @@ private PrepareStorage getPrepareStoragePhase(
}

private DeployBroker getDeployBrokerPhase(
String workspaceId,
RuntimeIdentity runtimeId,
KubernetesNamespace kubernetesNamespace,
KubernetesEnvironment brokerEnvironment,
BrokersResult brokersResult) {
return new DeployBroker(
workspaceId,
runtimeId,
kubernetesNamespace,
brokerEnvironment,
brokersResult,
unrecoverablePodEventListenerFactory,
runtimeEventsPublisher,
tracer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
import io.opentracing.Tracer;
import java.util.List;
import java.util.Map;
import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
import org.eclipse.che.commons.tracing.TracingTags;
import org.eclipse.che.workspace.infrastructure.kubernetes.RuntimeLogsPublisher;
import org.eclipse.che.workspace.infrastructure.kubernetes.environment.KubernetesEnvironment;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesDeployments;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesNamespace;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.event.PodEvent;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListener;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListenerFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.BrokersResult;
Expand All @@ -50,33 +53,36 @@ public class DeployBroker extends BrokerPhase {

private static final Logger LOG = getLogger(DeployBroker.class);

private final RuntimeEventsPublisher runtimeEventsPublisher;
private final KubernetesNamespace namespace;
private final KubernetesEnvironment brokerEnvironment;
private final BrokersResult brokersResult;
private final UnrecoverablePodEventListenerFactory factory;
private final String workspaceId;
private final RuntimeIdentity runtimeId;
private final Tracer tracer;

public DeployBroker(
String workspaceId,
RuntimeIdentity runtimeId,
KubernetesNamespace namespace,
KubernetesEnvironment brokerEnvironment,
BrokersResult brokersResult,
UnrecoverablePodEventListenerFactory factory,
RuntimeEventsPublisher runtimeEventsPublisher,
Tracer tracer) {
this.workspaceId = workspaceId;
this.runtimeId = runtimeId;
this.namespace = namespace;
this.brokerEnvironment = brokerEnvironment;
this.brokersResult = brokersResult;
this.factory = factory;
this.runtimeEventsPublisher = runtimeEventsPublisher;
this.tracer = tracer;
}

@Override
public List<ChePlugin> execute() throws InfrastructureException {
LOG.debug("Starting brokers pod for workspace '{}'", workspaceId);
LOG.debug("Starting brokers pod for workspace '{}'", runtimeId.getWorkspaceId());
Span tracingSpan = tracer.buildSpan(DEPLOY_BROKER_PHASE).start();
TracingTags.WORKSPACE_ID.set(tracingSpan, workspaceId);
TracingTags.WORKSPACE_ID.set(tracingSpan, runtimeId.getWorkspaceId());

KubernetesDeployments deployments = namespace.deployments();
try {
Expand All @@ -100,9 +106,17 @@ public List<ChePlugin> execute() throws InfrastructureException {
namespace.deployments().watchEvents(unrecoverableEventListener);
}

namespace
.deployments()
.watchEvents(
new RuntimeLogsPublisher(
runtimeEventsPublisher,
runtimeId,
ImmutableSet.of(pluginBrokerPod.getMetadata().getName())));

deployments.create(pluginBrokerPod);

LOG.debug("Brokers pod is created for workspace '{}'", workspaceId);
LOG.debug("Brokers pod is created for workspace '{}'", runtimeId.getWorkspaceId());
tracingSpan.finish();
return nextPhase.execute();
} catch (InfrastructureException e) {
Expand Down Expand Up @@ -135,7 +149,7 @@ private void handleUnrecoverableEvent(PodEvent podEvent) {
String message = podEvent.getMessage();
LOG.error(
"Unrecoverable event occurred during plugin brokering for workspace '{}' startup: {}, {}, {}",
workspaceId,
runtimeId.getWorkspaceId(),
reason,
message,
podEvent.getPodName());
Expand All @@ -152,7 +166,7 @@ private Pod getPluginBrokerPod(Map<String, Pod> pods) throws InfrastructureExcep
format(
"Plugin broker environment must have only "
+ "one pod. Workspace `%s` contains `%s` pods.",
workspaceId, pods.size()));
runtimeId.getWorkspaceId(), pods.size()));
}

return pods.values().iterator().next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
import io.opentracing.Tracer;
import java.util.List;
import java.util.Set;
import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity;
import org.eclipse.che.api.workspace.server.model.impl.RuntimeIdentityImpl;
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
import org.eclipse.che.workspace.infrastructure.kubernetes.RuntimeLogsPublisher;
import org.eclipse.che.workspace.infrastructure.kubernetes.environment.KubernetesEnvironment;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesConfigsMaps;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesDeployments;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesNamespace;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesSecrets;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListener;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListenerFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.BrokersResult;
Expand All @@ -55,7 +59,9 @@
@Listeners(MockitoTestNGListener.class)
public class DeployBrokerTest {

public static final String PLUGIN_BROKER_POD_NAME = "pluginBrokerPodName";
private static final String PLUGIN_BROKER_POD_NAME = "pluginBrokerPodName";
private static final RuntimeIdentity RUNTIME_ID =
new RuntimeIdentityImpl("workspaceId", "env", "userId");
@Mock private BrokerPhase nextBrokerPhase;

@Mock private KubernetesNamespace k8sNamespace;
Expand All @@ -70,6 +76,7 @@ public class DeployBrokerTest {

@Mock private BrokersResult brokersResult;
@Mock private UnrecoverablePodEventListenerFactory unrecoverableEventListenerFactory;
@Mock private RuntimeEventsPublisher runtimeEventPublisher;

@Mock(answer = Answers.RETURNS_MOCKS)
private Tracer tracer;
Expand All @@ -82,11 +89,12 @@ public class DeployBrokerTest {
public void setUp() throws Exception {
deployBrokerPhase =
new DeployBroker(
"workspaceId",
RUNTIME_ID,
k8sNamespace,
k8sEnvironment,
brokersResult,
unrecoverableEventListenerFactory,
runtimeEventPublisher,
tracer);
deployBrokerPhase.then(nextBrokerPhase);

Expand Down Expand Up @@ -139,6 +147,17 @@ public void shouldListenToUnrecoverableEventsIfFactoryIsConfigured() throws Exce
verify(k8sDeployments).stopWatch();
}

@Test
public void shouldListenToPodEventsToPropagateThemAsLogs() throws Exception {
// given
// when
deployBrokerPhase.execute();

// then
verify(k8sDeployments).watchEvents(any(RuntimeLogsPublisher.class));
verify(k8sDeployments).stopWatch();
}

@Test
public void shouldDoNotListenToUnrecoverableEventsIfFactoryIsConfigured() throws Exception {
// given
Expand All @@ -151,7 +170,7 @@ public void shouldDoNotListenToUnrecoverableEventsIfFactoryIsConfigured() throws
verify(unrecoverableEventListenerFactory).isConfigured();
verify(unrecoverableEventListenerFactory, never())
.create(eq(ImmutableSet.of(PLUGIN_BROKER_POD_NAME)), any());
verify(k8sDeployments, never()).watchEvents(any());
verify(k8sDeployments, never()).watchEvents(any(UnrecoverablePodEventListener.class));
verify(k8sDeployments).stopWatch();
}

Expand Down

0 comments on commit 5a1344b

Please sign in to comment.