Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish PluginBroker pod events as runtime log #15198

Merged
merged 1 commit into from
Nov 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.workspace.infrastructure.kubernetes;

import java.util.Set;
import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.event.PodEvent;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.event.PodEventHandler;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher;

/** Listens pod events and publish them as runtime logs. */
public class RuntimeLogsPublisher implements PodEventHandler {

private final RuntimeEventsPublisher eventPublisher;
private final RuntimeIdentity runtimeIdentity;
private final Set<String> pods;

public RuntimeLogsPublisher(
RuntimeEventsPublisher eventPublisher, RuntimeIdentity runtimeIdentity, Set<String> pods) {
this.eventPublisher = eventPublisher;
this.pods = pods;
this.runtimeIdentity = runtimeIdentity;
}

@Override
public void handle(PodEvent event) {
final String podName = event.getPodName();
if (pods.contains(podName)) {
eventPublisher.sendRuntimeLogEvent(
event.getMessage(), event.getCreationTimeStamp(), runtimeIdentity);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ public void sendMachineLogEvent(
.withTime(time));
}

public void sendRuntimeLogEvent(String text, String time, RuntimeIdentity runtimeId) {
eventService.publish(
DtoFactory.newDto(RuntimeLogEvent.class)
.withRuntimeId(DtoConverter.asDto(runtimeId))
.withText(text)
.withTime(time));
}

public void sendAbnormalStoppedEvent(RuntimeIdentity runtimeId, String reason) {
eventService.publish(new RuntimeAbnormalStoppedEvent(runtimeId, reason));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesNamespaceFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.pvc.EphemeralWorkspaceUtility;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.pvc.WorkspaceVolumesStrategy;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListenerFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.brokerphases.BrokerEnvironmentFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.brokerphases.DeployBroker;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class PluginBrokerManager<E extends KubernetesEnvironment> {
private final BrokerEnvironmentFactory<E> brokerEnvironmentFactory;
private final KubernetesEnvironmentProvisioner<E> environmentProvisioner;
private final UnrecoverablePodEventListenerFactory unrecoverablePodEventListenerFactory;
private final RuntimeEventsPublisher runtimeEventsPublisher;
private final Tracer tracer;

@Inject
Expand All @@ -72,6 +74,7 @@ public PluginBrokerManager(
BrokerEnvironmentFactory<E> brokerEnvironmentFactory,
UnrecoverablePodEventListenerFactory unrecoverablePodEventListenerFactory,
@Named("che.workspace.plugin_broker.wait_timeout_min") int pluginBrokerWaitingTimeout,
RuntimeEventsPublisher runtimeEventsPublisher,
Tracer tracer) {
this.factory = factory;
this.eventService = eventService;
Expand All @@ -81,6 +84,7 @@ public PluginBrokerManager(
this.environmentProvisioner = environmentProvisioner;
this.pluginBrokerWaitingTimeout = pluginBrokerWaitingTimeout;
this.unrecoverablePodEventListenerFactory = unrecoverablePodEventListenerFactory;
this.runtimeEventsPublisher = runtimeEventsPublisher;
this.tracer = tracer;
}

Expand Down Expand Up @@ -114,8 +118,7 @@ public List<ChePlugin> getTooling(
getPrepareStoragePhase(workspaceId, startSynchronizer, brokerEnvironment);
WaitBrokerResult waitBrokerResult = getWaitBrokerPhase(workspaceId, brokersResult);
DeployBroker deployBroker =
getDeployBrokerPhase(
runtimeID.getWorkspaceId(), kubernetesNamespace, brokerEnvironment, brokersResult);
getDeployBrokerPhase(runtimeID, kubernetesNamespace, brokerEnvironment, brokersResult);
LOG.debug("Entering plugin brokers deployment chain workspace '{}'", workspaceId);
listenBrokerEvents.then(prepareStorage).then(deployBroker).then(waitBrokerResult);
return listenBrokerEvents.execute();
Expand All @@ -134,16 +137,17 @@ private PrepareStorage getPrepareStoragePhase(
}

private DeployBroker getDeployBrokerPhase(
String workspaceId,
RuntimeIdentity runtimeId,
KubernetesNamespace kubernetesNamespace,
KubernetesEnvironment brokerEnvironment,
BrokersResult brokersResult) {
return new DeployBroker(
workspaceId,
runtimeId,
kubernetesNamespace,
brokerEnvironment,
brokersResult,
unrecoverablePodEventListenerFactory,
runtimeEventsPublisher,
tracer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
import io.opentracing.Tracer;
import java.util.List;
import java.util.Map;
import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
import org.eclipse.che.commons.tracing.TracingTags;
import org.eclipse.che.workspace.infrastructure.kubernetes.RuntimeLogsPublisher;
import org.eclipse.che.workspace.infrastructure.kubernetes.environment.KubernetesEnvironment;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesDeployments;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesNamespace;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.event.PodEvent;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListener;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListenerFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.BrokersResult;
Expand All @@ -50,33 +53,36 @@ public class DeployBroker extends BrokerPhase {

private static final Logger LOG = getLogger(DeployBroker.class);

private final RuntimeEventsPublisher runtimeEventsPublisher;
private final KubernetesNamespace namespace;
private final KubernetesEnvironment brokerEnvironment;
private final BrokersResult brokersResult;
private final UnrecoverablePodEventListenerFactory factory;
private final String workspaceId;
private final RuntimeIdentity runtimeId;
private final Tracer tracer;

public DeployBroker(
String workspaceId,
RuntimeIdentity runtimeId,
KubernetesNamespace namespace,
KubernetesEnvironment brokerEnvironment,
BrokersResult brokersResult,
UnrecoverablePodEventListenerFactory factory,
RuntimeEventsPublisher runtimeEventsPublisher,
Tracer tracer) {
this.workspaceId = workspaceId;
this.runtimeId = runtimeId;
this.namespace = namespace;
this.brokerEnvironment = brokerEnvironment;
this.brokersResult = brokersResult;
this.factory = factory;
this.runtimeEventsPublisher = runtimeEventsPublisher;
this.tracer = tracer;
}

@Override
public List<ChePlugin> execute() throws InfrastructureException {
LOG.debug("Starting brokers pod for workspace '{}'", workspaceId);
LOG.debug("Starting brokers pod for workspace '{}'", runtimeId.getWorkspaceId());
Span tracingSpan = tracer.buildSpan(DEPLOY_BROKER_PHASE).start();
TracingTags.WORKSPACE_ID.set(tracingSpan, workspaceId);
TracingTags.WORKSPACE_ID.set(tracingSpan, runtimeId.getWorkspaceId());

KubernetesDeployments deployments = namespace.deployments();
try {
Expand All @@ -100,9 +106,17 @@ public List<ChePlugin> execute() throws InfrastructureException {
namespace.deployments().watchEvents(unrecoverableEventListener);
}

namespace
.deployments()
.watchEvents(
new RuntimeLogsPublisher(
runtimeEventsPublisher,
runtimeId,
ImmutableSet.of(pluginBrokerPod.getMetadata().getName())));

deployments.create(pluginBrokerPod);

LOG.debug("Brokers pod is created for workspace '{}'", workspaceId);
LOG.debug("Brokers pod is created for workspace '{}'", runtimeId.getWorkspaceId());
tracingSpan.finish();
return nextPhase.execute();
} catch (InfrastructureException e) {
Expand Down Expand Up @@ -135,7 +149,7 @@ private void handleUnrecoverableEvent(PodEvent podEvent) {
String message = podEvent.getMessage();
LOG.error(
"Unrecoverable event occurred during plugin brokering for workspace '{}' startup: {}, {}, {}",
workspaceId,
runtimeId.getWorkspaceId(),
reason,
message,
podEvent.getPodName());
Expand All @@ -152,7 +166,7 @@ private Pod getPluginBrokerPod(Map<String, Pod> pods) throws InfrastructureExcep
format(
"Plugin broker environment must have only "
+ "one pod. Workspace `%s` contains `%s` pods.",
workspaceId, pods.size()));
runtimeId.getWorkspaceId(), pods.size()));
}

return pods.values().iterator().next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
import io.opentracing.Tracer;
import java.util.List;
import java.util.Set;
import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity;
import org.eclipse.che.api.workspace.server.model.impl.RuntimeIdentityImpl;
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
import org.eclipse.che.workspace.infrastructure.kubernetes.RuntimeLogsPublisher;
import org.eclipse.che.workspace.infrastructure.kubernetes.environment.KubernetesEnvironment;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesConfigsMaps;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesDeployments;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesNamespace;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesSecrets;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListener;
import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListenerFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.BrokersResult;
Expand All @@ -55,7 +59,9 @@
@Listeners(MockitoTestNGListener.class)
public class DeployBrokerTest {

public static final String PLUGIN_BROKER_POD_NAME = "pluginBrokerPodName";
private static final String PLUGIN_BROKER_POD_NAME = "pluginBrokerPodName";
private static final RuntimeIdentity RUNTIME_ID =
new RuntimeIdentityImpl("workspaceId", "env", "userId");
@Mock private BrokerPhase nextBrokerPhase;

@Mock private KubernetesNamespace k8sNamespace;
Expand All @@ -70,6 +76,7 @@ public class DeployBrokerTest {

@Mock private BrokersResult brokersResult;
@Mock private UnrecoverablePodEventListenerFactory unrecoverableEventListenerFactory;
@Mock private RuntimeEventsPublisher runtimeEventPublisher;

@Mock(answer = Answers.RETURNS_MOCKS)
private Tracer tracer;
Expand All @@ -82,11 +89,12 @@ public class DeployBrokerTest {
public void setUp() throws Exception {
deployBrokerPhase =
new DeployBroker(
"workspaceId",
RUNTIME_ID,
k8sNamespace,
k8sEnvironment,
brokersResult,
unrecoverableEventListenerFactory,
runtimeEventPublisher,
tracer);
deployBrokerPhase.then(nextBrokerPhase);

Expand Down Expand Up @@ -139,6 +147,17 @@ public void shouldListenToUnrecoverableEventsIfFactoryIsConfigured() throws Exce
verify(k8sDeployments).stopWatch();
}

@Test
public void shouldListenToPodEventsToPropagateThemAsLogs() throws Exception {
// given
// when
deployBrokerPhase.execute();

// then
verify(k8sDeployments).watchEvents(any(RuntimeLogsPublisher.class));
verify(k8sDeployments).stopWatch();
}

@Test
public void shouldDoNotListenToUnrecoverableEventsIfFactoryIsConfigured() throws Exception {
// given
Expand All @@ -151,7 +170,7 @@ public void shouldDoNotListenToUnrecoverableEventsIfFactoryIsConfigured() throws
verify(unrecoverableEventListenerFactory).isConfigured();
verify(unrecoverableEventListenerFactory, never())
.create(eq(ImmutableSet.of(PLUGIN_BROKER_POD_NAME)), any());
verify(k8sDeployments, never()).watchEvents(any());
verify(k8sDeployments, never()).watchEvents(any(UnrecoverablePodEventListener.class));
verify(k8sDeployments).stopWatch();
}

Expand Down