From 9bfcd9eb422ffd1dfbdcd8c57d6a50f038621975 Mon Sep 17 00:00:00 2001 From: Sergii Leshchenko Date: Thu, 14 Nov 2019 18:06:31 +0200 Subject: [PATCH] Publish PluginBroker pod events as runtime log Signed-off-by: Sergii Leshchenko --- .../kubernetes/RuntimeLogsPublisher.java | 42 +++++++++++++++++++ .../util/RuntimeEventsPublisher.java | 8 ++++ .../wsplugins/PluginBrokerManager.java | 12 ++++-- .../wsplugins/brokerphases/DeployBroker.java | 30 +++++++++---- .../brokerphases/DeployBrokerTest.java | 25 +++++++++-- 5 files changed, 102 insertions(+), 15 deletions(-) create mode 100644 infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/RuntimeLogsPublisher.java diff --git a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/RuntimeLogsPublisher.java b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/RuntimeLogsPublisher.java new file mode 100644 index 000000000000..b4bc1a37b6b5 --- /dev/null +++ b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/RuntimeLogsPublisher.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2012-2018 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package org.eclipse.che.workspace.infrastructure.kubernetes; + +import java.util.Set; +import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity; +import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.event.PodEvent; +import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.event.PodEventHandler; +import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher; + +/** Listens pod events and publish them as runtime logs. */ +public class RuntimeLogsPublisher implements PodEventHandler { + + private final RuntimeEventsPublisher eventPublisher; + private final RuntimeIdentity runtimeIdentity; + private final Set pods; + + public RuntimeLogsPublisher( + RuntimeEventsPublisher eventPublisher, RuntimeIdentity runtimeIdentity, Set pods) { + this.eventPublisher = eventPublisher; + this.pods = pods; + this.runtimeIdentity = runtimeIdentity; + } + + @Override + public void handle(PodEvent event) { + final String podName = event.getPodName(); + if (pods.contains(podName)) { + eventPublisher.sendRuntimeLogEvent( + event.getMessage(), event.getCreationTimeStamp(), runtimeIdentity); + } + } +} diff --git a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/util/RuntimeEventsPublisher.java b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/util/RuntimeEventsPublisher.java index e857220d6da1..71187f4c8397 100644 --- a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/util/RuntimeEventsPublisher.java +++ b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/util/RuntimeEventsPublisher.java @@ -94,6 +94,14 @@ public void sendMachineLogEvent( .withTime(time)); } + public void sendRuntimeLogEvent(String text, String time, RuntimeIdentity runtimeId) { + eventService.publish( + DtoFactory.newDto(RuntimeLogEvent.class) + .withRuntimeId(DtoConverter.asDto(runtimeId)) + .withText(text) + .withTime(time)); + } + public void sendAbnormalStoppedEvent(RuntimeIdentity runtimeId, String reason) { eventService.publish(new RuntimeAbnormalStoppedEvent(runtimeId, reason)); } diff --git a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/wsplugins/PluginBrokerManager.java b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/wsplugins/PluginBrokerManager.java index 7d87cd974a85..a4184e72dbcd 100644 --- a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/wsplugins/PluginBrokerManager.java +++ b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/wsplugins/PluginBrokerManager.java @@ -30,6 +30,7 @@ import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesNamespaceFactory; import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.pvc.EphemeralWorkspaceUtility; import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.pvc.WorkspaceVolumesStrategy; +import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher; import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListenerFactory; import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.brokerphases.BrokerEnvironmentFactory; import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.brokerphases.DeployBroker; @@ -60,6 +61,7 @@ public class PluginBrokerManager { private final BrokerEnvironmentFactory brokerEnvironmentFactory; private final KubernetesEnvironmentProvisioner environmentProvisioner; private final UnrecoverablePodEventListenerFactory unrecoverablePodEventListenerFactory; + private final RuntimeEventsPublisher runtimeEventsPublisher; private final Tracer tracer; @Inject @@ -72,6 +74,7 @@ public PluginBrokerManager( BrokerEnvironmentFactory brokerEnvironmentFactory, UnrecoverablePodEventListenerFactory unrecoverablePodEventListenerFactory, @Named("che.workspace.plugin_broker.wait_timeout_min") int pluginBrokerWaitingTimeout, + RuntimeEventsPublisher runtimeEventsPublisher, Tracer tracer) { this.factory = factory; this.eventService = eventService; @@ -81,6 +84,7 @@ public PluginBrokerManager( this.environmentProvisioner = environmentProvisioner; this.pluginBrokerWaitingTimeout = pluginBrokerWaitingTimeout; this.unrecoverablePodEventListenerFactory = unrecoverablePodEventListenerFactory; + this.runtimeEventsPublisher = runtimeEventsPublisher; this.tracer = tracer; } @@ -114,8 +118,7 @@ public List getTooling( getPrepareStoragePhase(workspaceId, startSynchronizer, brokerEnvironment); WaitBrokerResult waitBrokerResult = getWaitBrokerPhase(workspaceId, brokersResult); DeployBroker deployBroker = - getDeployBrokerPhase( - runtimeID.getWorkspaceId(), kubernetesNamespace, brokerEnvironment, brokersResult); + getDeployBrokerPhase(runtimeID, kubernetesNamespace, brokerEnvironment, brokersResult); LOG.debug("Entering plugin brokers deployment chain workspace '{}'", workspaceId); listenBrokerEvents.then(prepareStorage).then(deployBroker).then(waitBrokerResult); return listenBrokerEvents.execute(); @@ -134,16 +137,17 @@ private PrepareStorage getPrepareStoragePhase( } private DeployBroker getDeployBrokerPhase( - String workspaceId, + RuntimeIdentity runtimeId, KubernetesNamespace kubernetesNamespace, KubernetesEnvironment brokerEnvironment, BrokersResult brokersResult) { return new DeployBroker( - workspaceId, + runtimeId, kubernetesNamespace, brokerEnvironment, brokersResult, unrecoverablePodEventListenerFactory, + runtimeEventsPublisher, tracer); } diff --git a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/wsplugins/brokerphases/DeployBroker.java b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/wsplugins/brokerphases/DeployBroker.java index bd8b495f90ae..85b98a319d32 100644 --- a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/wsplugins/brokerphases/DeployBroker.java +++ b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/wsplugins/brokerphases/DeployBroker.java @@ -24,14 +24,17 @@ import io.opentracing.Tracer; import java.util.List; import java.util.Map; +import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity; import org.eclipse.che.api.workspace.server.spi.InfrastructureException; import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException; import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin; import org.eclipse.che.commons.tracing.TracingTags; +import org.eclipse.che.workspace.infrastructure.kubernetes.RuntimeLogsPublisher; import org.eclipse.che.workspace.infrastructure.kubernetes.environment.KubernetesEnvironment; import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesDeployments; import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesNamespace; import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.event.PodEvent; +import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher; import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListener; import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListenerFactory; import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.BrokersResult; @@ -50,33 +53,36 @@ public class DeployBroker extends BrokerPhase { private static final Logger LOG = getLogger(DeployBroker.class); + private final RuntimeEventsPublisher runtimeEventsPublisher; private final KubernetesNamespace namespace; private final KubernetesEnvironment brokerEnvironment; private final BrokersResult brokersResult; private final UnrecoverablePodEventListenerFactory factory; - private final String workspaceId; + private final RuntimeIdentity runtimeId; private final Tracer tracer; public DeployBroker( - String workspaceId, + RuntimeIdentity runtimeId, KubernetesNamespace namespace, KubernetesEnvironment brokerEnvironment, BrokersResult brokersResult, UnrecoverablePodEventListenerFactory factory, + RuntimeEventsPublisher runtimeEventsPublisher, Tracer tracer) { - this.workspaceId = workspaceId; + this.runtimeId = runtimeId; this.namespace = namespace; this.brokerEnvironment = brokerEnvironment; this.brokersResult = brokersResult; this.factory = factory; + this.runtimeEventsPublisher = runtimeEventsPublisher; this.tracer = tracer; } @Override public List execute() throws InfrastructureException { - LOG.debug("Starting brokers pod for workspace '{}'", workspaceId); + LOG.debug("Starting brokers pod for workspace '{}'", runtimeId.getWorkspaceId()); Span tracingSpan = tracer.buildSpan(DEPLOY_BROKER_PHASE).start(); - TracingTags.WORKSPACE_ID.set(tracingSpan, workspaceId); + TracingTags.WORKSPACE_ID.set(tracingSpan, runtimeId.getWorkspaceId()); KubernetesDeployments deployments = namespace.deployments(); try { @@ -100,9 +106,17 @@ public List execute() throws InfrastructureException { namespace.deployments().watchEvents(unrecoverableEventListener); } + namespace + .deployments() + .watchEvents( + new RuntimeLogsPublisher( + runtimeEventsPublisher, + runtimeId, + ImmutableSet.of(pluginBrokerPod.getMetadata().getName()))); + deployments.create(pluginBrokerPod); - LOG.debug("Brokers pod is created for workspace '{}'", workspaceId); + LOG.debug("Brokers pod is created for workspace '{}'", runtimeId.getWorkspaceId()); tracingSpan.finish(); return nextPhase.execute(); } catch (InfrastructureException e) { @@ -135,7 +149,7 @@ private void handleUnrecoverableEvent(PodEvent podEvent) { String message = podEvent.getMessage(); LOG.error( "Unrecoverable event occurred during plugin brokering for workspace '{}' startup: {}, {}, {}", - workspaceId, + runtimeId.getWorkspaceId(), reason, message, podEvent.getPodName()); @@ -152,7 +166,7 @@ private Pod getPluginBrokerPod(Map pods) throws InfrastructureExcep format( "Plugin broker environment must have only " + "one pod. Workspace `%s` contains `%s` pods.", - workspaceId, pods.size())); + runtimeId.getWorkspaceId(), pods.size())); } return pods.values().iterator().next(); diff --git a/infrastructures/kubernetes/src/test/java/org/eclipse/che/workspace/infrastructure/kubernetes/wsplugins/brokerphases/DeployBrokerTest.java b/infrastructures/kubernetes/src/test/java/org/eclipse/che/workspace/infrastructure/kubernetes/wsplugins/brokerphases/DeployBrokerTest.java index 08d0db1a6471..dc82b8816b9b 100644 --- a/infrastructures/kubernetes/src/test/java/org/eclipse/che/workspace/infrastructure/kubernetes/wsplugins/brokerphases/DeployBrokerTest.java +++ b/infrastructures/kubernetes/src/test/java/org/eclipse/che/workspace/infrastructure/kubernetes/wsplugins/brokerphases/DeployBrokerTest.java @@ -30,13 +30,17 @@ import io.opentracing.Tracer; import java.util.List; import java.util.Set; +import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity; +import org.eclipse.che.api.workspace.server.model.impl.RuntimeIdentityImpl; import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException; import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin; +import org.eclipse.che.workspace.infrastructure.kubernetes.RuntimeLogsPublisher; import org.eclipse.che.workspace.infrastructure.kubernetes.environment.KubernetesEnvironment; import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesConfigsMaps; import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesDeployments; import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesNamespace; import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesSecrets; +import org.eclipse.che.workspace.infrastructure.kubernetes.util.RuntimeEventsPublisher; import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListener; import org.eclipse.che.workspace.infrastructure.kubernetes.util.UnrecoverablePodEventListenerFactory; import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.BrokersResult; @@ -55,7 +59,9 @@ @Listeners(MockitoTestNGListener.class) public class DeployBrokerTest { - public static final String PLUGIN_BROKER_POD_NAME = "pluginBrokerPodName"; + private static final String PLUGIN_BROKER_POD_NAME = "pluginBrokerPodName"; + private static final RuntimeIdentity RUNTIME_ID = + new RuntimeIdentityImpl("workspaceId", "env", "userId"); @Mock private BrokerPhase nextBrokerPhase; @Mock private KubernetesNamespace k8sNamespace; @@ -70,6 +76,7 @@ public class DeployBrokerTest { @Mock private BrokersResult brokersResult; @Mock private UnrecoverablePodEventListenerFactory unrecoverableEventListenerFactory; + @Mock private RuntimeEventsPublisher runtimeEventPublisher; @Mock(answer = Answers.RETURNS_MOCKS) private Tracer tracer; @@ -82,11 +89,12 @@ public class DeployBrokerTest { public void setUp() throws Exception { deployBrokerPhase = new DeployBroker( - "workspaceId", + RUNTIME_ID, k8sNamespace, k8sEnvironment, brokersResult, unrecoverableEventListenerFactory, + runtimeEventPublisher, tracer); deployBrokerPhase.then(nextBrokerPhase); @@ -139,6 +147,17 @@ public void shouldListenToUnrecoverableEventsIfFactoryIsConfigured() throws Exce verify(k8sDeployments).stopWatch(); } + @Test + public void shouldListenToPodEventsToPropagateThemAsLogs() throws Exception { + // given + // when + deployBrokerPhase.execute(); + + // then + verify(k8sDeployments).watchEvents(any(RuntimeLogsPublisher.class)); + verify(k8sDeployments).stopWatch(); + } + @Test public void shouldDoNotListenToUnrecoverableEventsIfFactoryIsConfigured() throws Exception { // given @@ -151,7 +170,7 @@ public void shouldDoNotListenToUnrecoverableEventsIfFactoryIsConfigured() throws verify(unrecoverableEventListenerFactory).isConfigured(); verify(unrecoverableEventListenerFactory, never()) .create(eq(ImmutableSet.of(PLUGIN_BROKER_POD_NAME)), any()); - verify(k8sDeployments, never()).watchEvents(any()); + verify(k8sDeployments, never()).watchEvents(any(UnrecoverablePodEventListener.class)); verify(k8sDeployments).stopWatch(); }