Skip to content

Commit

Permalink
Make BrokerEvent suitable to be used for STARTED status event
Browse files Browse the repository at this point in the history
  • Loading branch information
sleshchenko committed Sep 27, 2018
1 parent b5b187e commit d9c210b
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 39 deletions.
Expand Up @@ -54,6 +54,7 @@ public List<ChePlugin> execute() throws InfrastructureException {
for (ConfigMap configMap : brokerEnvironment.getConfigMaps().values()) {
namespace.configMaps().create(configMap);
}

for (Pod toCreate : brokerEnvironment.getPods().values()) {
deployments.deploy(toCreate);
}
Expand Down
Expand Up @@ -18,11 +18,11 @@
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.events.BrokerEvent;
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.events.BrokerResultListener;
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.events.BrokerStatusListener;

/**
* Subscribes to Che plugin broker events, passes future that should be completed upon broker result
* received to {@link BrokerResultListener} and calls next {@link BrokerPhase}.
* received to {@link BrokerStatusListener} and calls next {@link BrokerPhase}.
*
* <p>This API is in <b>Beta</b> and is subject to changes or removal.
*
Expand All @@ -45,14 +45,14 @@ public ListenBrokerEvents(
}

public List<ChePlugin> execute() throws InfrastructureException {
BrokerResultListener brokerResultListener =
new BrokerResultListener(workspaceId, toolingFuture);
BrokerStatusListener brokerStatusListener =
new BrokerStatusListener(workspaceId, toolingFuture);
try {
eventService.subscribe(brokerResultListener, BrokerEvent.class);
eventService.subscribe(brokerStatusListener, BrokerEvent.class);

return nextPhase.execute();
} finally {
eventService.unsubscribe(brokerResultListener);
eventService.unsubscribe(brokerStatusListener);
}
}
}
Expand Up @@ -14,18 +14,18 @@
import com.google.common.annotations.Beta;
import java.util.List;
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
import org.eclipse.che.api.workspace.shared.dto.BrokerResultEvent;
import org.eclipse.che.api.workspace.shared.dto.BrokerStatus;
import org.eclipse.che.api.workspace.shared.dto.BrokerStatusChangedEvent;

/**
* Event sent by a plugin broker with results of broker invocation.
*
* <p>This class differs from {@link BrokerResultEvent} it is version of latter with a prettier
* format. It has workspace tooling in a POJO representation instead of stringified JSON.
* <p>This class differs from {@link BrokerStatusChangedEvent} it is version of latter with a
* prettier format. It has workspace tooling in a POJO representation instead of stringified JSON.
*
* <p>This API is in <b>Beta</b> and is subject to changes or removal.
*
* @see BrokerResultEvent
* @see BrokerStatusChangedEvent
*/
@Beta
public class BrokerEvent {
Expand All @@ -37,7 +37,7 @@ public class BrokerEvent {
@SuppressWarnings("unused")
public BrokerEvent() {}

public BrokerEvent(BrokerResultEvent resultEvent, List<ChePlugin> tooling) {
public BrokerEvent(BrokerStatusChangedEvent resultEvent, List<ChePlugin> tooling) {
this.error = resultEvent.getError();
this.status = resultEvent.getStatus();
this.workspaceId = resultEvent.getWorkspaceId();
Expand Down
Expand Up @@ -24,13 +24,13 @@
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator;
import org.eclipse.che.api.core.notification.EventService;
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
import org.eclipse.che.api.workspace.shared.dto.BrokerResultEvent;
import org.eclipse.che.api.workspace.shared.dto.BrokerStatusChangedEvent;
import org.eclipse.che.commons.annotation.Nullable;
import org.slf4j.Logger;

/**
* Configure JSON_RPC consumers of Che plugin broker events. Also converts {@link BrokerResultEvent}
* to {@link BrokerEvent}.
* Configure JSON_RPC consumers of Che plugin broker events. Also converts {@link
* BrokerStatusChangedEvent} to {@link BrokerEvent}.
*
* <p>This API is in <b>Beta</b> and is subject to changes or removal.
*
Expand Down Expand Up @@ -58,24 +58,22 @@ public void configureMethods(RequestHandlerConfigurator requestHandler) {
requestHandler
.newConfiguration()
.methodName(BROKER_STATUS_CHANGED_METHOD)
.paramsAsDto(BrokerResultEvent.class)
.paramsAsDto(BrokerStatusChangedEvent.class)
.noResult()
.withConsumer(this::handle);

requestHandler
.newConfiguration()
.methodName(BROKER_RESULT_METHOD)
.paramsAsDto(BrokerResultEvent.class)
.paramsAsDto(BrokerStatusChangedEvent.class)
.noResult()
.withConsumer(this::handle);
}

private void handle(BrokerResultEvent event) {
private void handle(BrokerStatusChangedEvent event) {
// Tooling has fields that can't be parsed by DTO and JSON_RPC framework works with DTO only
String encodedTooling = event.getTooling();
if (event.getStatus() == null
|| event.getWorkspaceId() == null
|| (event.getError() == null && event.getTooling() == null)) {
if (event.getStatus() == null || event.getWorkspaceId() == null) {
LOG.error("Broker event skipped due to illegal content: {}", event);
return;
}
Expand Down
Expand Up @@ -11,28 +11,31 @@
*/
package org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.events;

import static java.lang.String.format;

import com.google.common.annotations.Beta;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.eclipse.che.api.core.notification.EventSubscriber;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;

/**
* Listens for {@link BrokerEvent} and completes or exceptionally completes a future depending on
* the event state.
* Listens for {@link BrokerEvent} and completes or exceptionally completes a start and done futures
* depending on the event state.
*
* <p>This API is in <b>Beta</b> and is subject to changes or removal.
*
* @author Oleksandr Garagatyi
*/
@Beta
public class BrokerResultListener implements EventSubscriber<BrokerEvent> {
public class BrokerStatusListener implements EventSubscriber<BrokerEvent> {

private final String workspaceId;
private final CompletableFuture<List<ChePlugin>> finishFuture;

public BrokerResultListener(String workspaceId, CompletableFuture<List<ChePlugin>> finishFuture) {
public BrokerStatusListener(String workspaceId, CompletableFuture<List<ChePlugin>> finishFuture) {
this.workspaceId = workspaceId;
this.finishFuture = finishFuture;
}
Expand All @@ -45,12 +48,25 @@ public void onEvent(BrokerEvent event) {

switch (event.getStatus()) {
case DONE:
finishFuture.complete(event.getTooling());
List<ChePlugin> tooling = event.getTooling();
if (tooling != null) {
finishFuture.complete(tooling);
} else {
finishFuture.completeExceptionally(
new InternalInfrastructureException(
format(
"Plugin brokering process for workspace `%s` is finished but plugins list is missing",
workspaceId)));
}
break;
case FAILED:
finishFuture.completeExceptionally(
new InfrastructureException("Broker process failed with error: " + event.getError()));
new InfrastructureException(
format(
"Plugin broking process for workspace %s failed with error: %s",
workspaceId, event.getError())));
break;
case STARTED:
default:
// do nothing
}
Expand Down
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.events;

import static java.util.Collections.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
import org.eclipse.che.api.workspace.shared.dto.BrokerStatus;
import org.mockito.Mock;
import org.mockito.testng.MockitoTestNGListener;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Listeners;
import org.testng.annotations.Test;

/**
* Tests {@link BrokerStatusListener}.
*
* @author Sergii Leshchenko
*/
@Listeners(MockitoTestNGListener.class)
public class BrokerStatusListenerTest {

public static final String WORKSPACE_ID = "workspace123";
@Mock private CompletableFuture<List<ChePlugin>> finishFuture;

private BrokerStatusListener brokerStatusListener;

@BeforeMethod
public void setUp() {
brokerStatusListener = new BrokerStatusListener(WORKSPACE_ID, finishFuture);
}

@Test
public void shouldDoNothingIfEventWithForeignWorkspaceIdIsReceived() {
// given
BrokerEvent event = new BrokerEvent().withWorkspaceId("foreignWorkspace");

// when
brokerStatusListener.onEvent(event);

// then
verifyNoMoreInteractions(finishFuture);
}

@Test
public void shouldDoNothingWhenStartedEventIsReceived() {
// given
BrokerEvent event =
new BrokerEvent().withWorkspaceId(WORKSPACE_ID).withStatus(BrokerStatus.STARTED);

// when
brokerStatusListener.onEvent(event);

// then
verifyNoMoreInteractions(finishFuture);
}

@Test
public void shouldCompleteFinishFutureWhenDoneEventIsReceivedAndToolingIsNotNull() {
// given
BrokerEvent event =
new BrokerEvent()
.withWorkspaceId(WORKSPACE_ID)
.withStatus(BrokerStatus.DONE)
.withTooling(emptyList());

// when
brokerStatusListener.onEvent(event);

// then
verify(finishFuture).complete(emptyList());
}

@Test
public void shouldCompleteExceptionallyFinishFutureWhenDoneEventIsReceivedButToolingIsNull() {
// given
BrokerEvent event =
new BrokerEvent()
.withWorkspaceId(WORKSPACE_ID)
.withStatus(BrokerStatus.DONE)
.withTooling(null);

// when
brokerStatusListener.onEvent(event);

// then
verify(finishFuture).completeExceptionally(any(InternalInfrastructureException.class));
}

@Test
public void shouldCompleteExceptionallyFinishFutureWhenFailedEventIsReceived() {
// given
BrokerEvent event =
new BrokerEvent()
.withWorkspaceId(WORKSPACE_ID)
.withStatus(BrokerStatus.FAILED)
.withError("error");

// when
brokerStatusListener.onEvent(event);

// then
verify(finishFuture).completeExceptionally(any(InfrastructureException.class));
}
}
Expand Up @@ -18,7 +18,7 @@
import javax.inject.Singleton;
import org.eclipse.che.api.core.ForbiddenException;
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerManager;
import org.eclipse.che.api.workspace.shared.dto.BrokerResultEvent;
import org.eclipse.che.api.workspace.shared.dto.BrokerStatusChangedEvent;
import org.eclipse.che.commons.env.EnvironmentContext;
import org.eclipse.che.commons.subject.Subject;
import org.eclipse.che.multiuser.api.permission.server.jsonrpc.JsonRpcPermissionsFilterAdapter;
Expand All @@ -44,7 +44,7 @@ public void doAccept(String method, Object... params) throws ForbiddenException
switch (method) {
case BROKER_STATUS_CHANGED_METHOD:
case BROKER_RESULT_METHOD:
workspaceId = ((BrokerResultEvent) params[0]).getWorkspaceId();
workspaceId = ((BrokerStatusChangedEvent) params[0]).getWorkspaceId();
break;
default:
throw new ForbiddenException("Unknown method is configured to be filtered.");
Expand Down
Expand Up @@ -18,7 +18,7 @@

import org.eclipse.che.api.core.ForbiddenException;
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerManager;
import org.eclipse.che.api.workspace.shared.dto.BrokerResultEvent;
import org.eclipse.che.api.workspace.shared.dto.BrokerStatusChangedEvent;
import org.eclipse.che.commons.env.EnvironmentContext;
import org.eclipse.che.commons.subject.Subject;
import org.eclipse.che.dto.server.DtoFactory;
Expand Down Expand Up @@ -78,7 +78,7 @@ public void shouldThrowExceptionIfUserDoesNotHaveRunPermission(String method) th

// when
permissionFilter.doAccept(
method, DtoFactory.newDto(BrokerResultEvent.class).withWorkspaceId("ws123"));
method, DtoFactory.newDto(BrokerStatusChangedEvent.class).withWorkspaceId("ws123"));
}

@Test(dataProvider = "coveredMethods")
Expand All @@ -89,7 +89,7 @@ public void shouldDoNothingIfUserHasRunPermissions(String method) throws Excepti

// when
permissionFilter.doAccept(
method, DtoFactory.newDto(BrokerResultEvent.class).withWorkspaceId("ws123"));
method, DtoFactory.newDto(BrokerStatusChangedEvent.class).withWorkspaceId("ws123"));
}

@Test(
Expand All @@ -98,7 +98,7 @@ public void shouldDoNothingIfUserHasRunPermissions(String method) throws Excepti
public void shouldThrowExceptionIfUnknownMethodIsInvoking() throws Exception {
// when
permissionFilter.doAccept(
"unknown", DtoFactory.newDto(BrokerResultEvent.class).withWorkspaceId("ws123"));
"unknown", DtoFactory.newDto(BrokerStatusChangedEvent.class).withWorkspaceId("ws123"));
}

@DataProvider
Expand Down

0 comments on commit d9c210b

Please sign in to comment.