Skip to content

Commit

Permalink
Use single unified plugin broker for all the plugins (#12942)
Browse files Browse the repository at this point in the history
Signed-off-by: Oleksandr Garagatyi <ogaragat@redhat.com>
  • Loading branch information
Oleksandr Garagatyi committed Mar 25, 2019
1 parent 1950315 commit 26f21eb
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 261 deletions.
Expand Up @@ -546,10 +546,8 @@ che.singleport.wildcard_domain.ipless=false

# Docker image of Che plugin broker app that resolves workspace tooling configuration and copies
# plugins dependencies to a workspace
che.workspace.plugin_broker.image=eclipse/che-plugin-broker:v0.14.0
che.workspace.plugin_broker.theia.image=eclipse/che-theia-plugin-broker:v0.14.0
che.workspace.plugin_broker.init.image=eclipse/che-init-plugin-broker:v0.14.0
che.workspace.plugin_broker.vscode.image=eclipse/che-vscode-extension-broker:v0.14.0
che.workspace.plugin_broker.unified.image=eclipse/che-unified-plugin-broker:v0.15.1

# Docker image of Che plugin broker app that resolves workspace tooling configuration and copies
# plugins dependencies to a workspace
Expand Down
Expand Up @@ -19,12 +19,10 @@
import static org.eclipse.che.workspace.infrastructure.kubernetes.server.external.SingleHostIngressExternalServerExposer.SINGLE_HOST_STRATEGY;

import com.google.inject.AbstractModule;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.name.Names;
import java.util.Map;
import org.eclipse.che.api.system.server.ServiceTermination;
import org.eclipse.che.api.workspace.server.NoEnvironmentFactory;
Expand Down Expand Up @@ -170,24 +168,5 @@ protected void configure() {

bind(SidecarToolingProvisioner.class)
.to(new TypeLiteral<SidecarToolingProvisioner<KubernetesEnvironment>>() {});

MapBinder<String, String> pluginBrokers =
MapBinder.newMapBinder(
binder(),
String.class,
String.class,
Names.named("che.workspace.plugin_broker.images"));
pluginBrokers
.addBinding("Che Plugin")
.to(Key.get(String.class, Names.named("che.workspace.plugin_broker.image")));
pluginBrokers
.addBinding("Che Editor")
.to(Key.get(String.class, Names.named("che.workspace.plugin_broker.image")));
pluginBrokers
.addBinding("Theia plugin")
.to(Key.get(String.class, Names.named("che.workspace.plugin_broker.theia.image")));
pluginBrokers
.addBinding("VS Code extension")
.to(Key.get(String.class, Names.named("che.workspace.plugin_broker.vscode.image")));
}
}
Expand Up @@ -14,14 +14,12 @@
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;

Expand All @@ -30,32 +28,11 @@
public class BrokersResult {

private final CompletableFuture<List<ChePlugin>> future;
private final AtomicInteger brokersNumber;
private final AtomicBoolean started;
private final List<ChePlugin> plugins;

public BrokersResult() {
future = new CompletableFuture<>();
brokersNumber = new AtomicInteger();
started = new AtomicBoolean();
plugins = Collections.synchronizedList(new ArrayList<>());
}

/**
* Notifies {@code BrokerResult} that one more broker will be launched and we need to wait
* response from it.
*
* <p>It should be called before the call of {@link #get(long, TimeUnit)}, otherwise {@link
* IllegalStateException} would be thrown
*
* @throws IllegalStateException if called after call of {@link #get(long, TimeUnit)}
*/
public void oneMoreBroker() {
if (started.get()) {
throw new IllegalStateException(
"Call of BrokerResult#oneMoreBroker is not allowed after call BrokerResult#get");
}
brokersNumber.incrementAndGet();
}

/**
Expand All @@ -76,42 +53,34 @@ public void error(Exception e) {
}

/**
* Submits a result of a broker execution.
*
* <p>It also count down the number of brokers that are waited for the result submission.
* Submits the result of a broker execution.
*
* @param toolingFromBroker tooling evaluated by a broker that needs to be added into a workspace
* @throws InfrastructureException if called more times than {@link #oneMoreBroker()} which
* indicates incorrect usage of the {@link BrokersResult}
* @throws InfrastructureException if called second time which indicates incorrect usage of the
* {@link BrokersResult}
* @throws IllegalStateException if called before the call of {@link #get(long, TimeUnit)}
*/
public void addResult(List<ChePlugin> toolingFromBroker) throws InfrastructureException {
public void setResult(List<ChePlugin> toolingFromBroker) throws InfrastructureException {
if (!started.get()) {
throw new IllegalStateException(
"Submitting a broker result is not allowed before calling BrokerResult#get");
}
int previousBrokersNumber = brokersNumber.getAndDecrement();
if (previousBrokersNumber == 0) {
if (future.isDone()) {
throw new InfrastructureException(
"Broker result is submitted when no more results are expected");
}
plugins.addAll(toolingFromBroker);
if (previousBrokersNumber == 1) {
future.complete(new ArrayList<>(plugins));
"Plugins brokering result is unexpectedly submitted more than one time. This indicates unexpected behavior of the system");
}
future.complete(new ArrayList<>(toolingFromBroker));
}

/**
* Waits for the tooling that needs to be injected into a workspace being submitted by calls of
* {@link #addResult(List)}.
* Waits for the tooling that needs to be injected into a workspace being submitted by a call of
* {@link #setResult(List)}.
*
* <p>Number of calls of {@link #addResult(List)} needs to be the same as number of calls of
* {@link #oneMoreBroker()}. Returned list is a combination of lists submitted to {@link
* #addResult(List)}. If provided timeout elapses before all needed calls of {@link
* #addResult(List)} method ends with an exception. This method is based on {@link
* CompletableFuture#get(long, TimeUnit)} so it also inherits parameters and thrown exception.
* <p>If provided timeout elapses before needed call of {@link #setResult(List)} method ends with
* an exception. This method is based on {@link CompletableFuture#get(long, TimeUnit)} so it also
* inherits parameters and thrown exception.
*
* @return tooling submitted by one or several brokers that needs to be injected into a workspace
* @return tooling submitted by broker that needs to be injected into a workspace
* @throws IllegalStateException if called more than one time
* @see CompletableFuture#get(long, TimeUnit)
*/
Expand Down
Expand Up @@ -11,16 +11,12 @@
*/
package org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.brokerphases;

import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.util.Collections.singletonMap;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.Beta;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.Container;
Expand All @@ -36,7 +32,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
Expand Down Expand Up @@ -82,7 +77,7 @@ public abstract class BrokerEnvironmentFactory<E extends KubernetesEnvironment>
private final String brokerPullPolicy;
private final AgentAuthEnableEnvVarProvider authEnableEnvVarProvider;
private final MachineTokenEnvVarProvider machineTokenEnvVarProvider;
private final Map<String, String> pluginTypeToImage;
private final String unifiedBrokerImage;
private final String initBrokerImage;

@Inject
Expand All @@ -91,13 +86,13 @@ public BrokerEnvironmentFactory(
@Named("che.workspace.plugin_broker.pull_policy") String brokerPullPolicy,
AgentAuthEnableEnvVarProvider authEnableEnvVarProvider,
MachineTokenEnvVarProvider machineTokenEnvVarProvider,
@Named("che.workspace.plugin_broker.images") Map<String, String> pluginTypeToImage,
@Named("che.workspace.plugin_broker.unified.image") String unifiedBrokerImage,
@Named("che.workspace.plugin_broker.init.image") String initBrokerImage) {
this.cheWebsocketEndpoint = cheWebsocketEndpoint;
this.brokerPullPolicy = brokerPullPolicy;
this.authEnableEnvVarProvider = authEnableEnvVarProvider;
this.machineTokenEnvVarProvider = machineTokenEnvVarProvider;
this.pluginTypeToImage = pluginTypeToImage;
this.unifiedBrokerImage = unifiedBrokerImage;
this.initBrokerImage = initBrokerImage;
}

Expand Down Expand Up @@ -126,32 +121,25 @@ public E create(
.map(this::asEnvVar)
.collect(Collectors.toList());

Multimap<String, PluginMeta> brokersImageToMetas = sortByBrokerImage(pluginsMeta);
for (Entry<String, Collection<PluginMeta>> brokerImageToMetas :
brokersImageToMetas.asMap().entrySet()) {
BrokerConfig brokerConfig =
createBrokerConfig(
runtimeID, brokerImageToMetas.getValue(), envVars, brokerImageToMetas.getKey(), pod);

brokersConfigs.machines.put(brokerConfig.machineName, brokerConfig.machineConfig);
brokersConfigs.configMaps.put(brokerConfig.configMapName, brokerConfig.configMap);
spec.getContainers().add(brokerConfig.container);
spec.getVolumes()
.add(
new VolumeBuilder()
.withName(brokerConfig.configMapVolume)
.withNewConfigMap()
.withName(brokerConfig.configMapName)
.endConfigMap()
.build());

brokersResult.oneMoreBroker();
}
BrokerConfig brokerConfig =
createBrokerConfig(runtimeID, pluginsMeta, envVars, unifiedBrokerImage, pod);
brokersConfigs.machines.put(brokerConfig.machineName, brokerConfig.machineConfig);
brokersConfigs.configMaps.put(brokerConfig.configMapName, brokerConfig.configMap);
spec.getContainers().add(brokerConfig.container);
spec.getVolumes()
.add(
new VolumeBuilder()
.withName(brokerConfig.configMapVolume)
.withNewConfigMap()
.withName(brokerConfig.configMapName)
.endConfigMap()
.build());

// Add init broker that cleans up /plugins
BrokerConfig brokerConfig = createBrokerConfig(runtimeID, null, envVars, initBrokerImage, pod);
pod.getSpec().getInitContainers().add(brokerConfig.container);
brokersConfigs.machines.put(brokerConfig.machineName, brokerConfig.machineConfig);
BrokerConfig initBrokerConfig =
createBrokerConfig(runtimeID, null, envVars, initBrokerImage, pod);
pod.getSpec().getInitContainers().add(initBrokerConfig.container);
brokersConfigs.machines.put(initBrokerConfig.machineName, initBrokerConfig.machineConfig);

return doCreate(brokersConfigs);
}
Expand Down Expand Up @@ -249,31 +237,8 @@ private BrokerConfig createBrokerConfig(
return brokerConfig;
}

private Multimap<String, PluginMeta> sortByBrokerImage(Collection<PluginMeta> pluginMetas)
throws InfrastructureException {
Multimap<String, PluginMeta> sortedPlugins = ArrayListMultimap.create();
for (PluginMeta pluginMeta : pluginMetas) {
String type = pluginMeta.getType();
if (isNullOrEmpty(type)) {
throw new InfrastructureException(
format(
"Plugin '%s:%s' has invalid type '%s'",
pluginMeta.getId(), pluginMeta.getVersion(), type));
}
String image = pluginTypeToImage.get(type);
if (isNullOrEmpty(image)) {
throw new InfrastructureException(
format(
"Plugin '%s:%s' has unsupported type '%s'",
pluginMeta.getId(), pluginMeta.getVersion(), type));
}
sortedPlugins.put(image, pluginMeta);
}

return sortedPlugins;
}

private static class BrokerConfig {

String configMapName;
ConfigMap configMap;
Container container;
Expand All @@ -283,6 +248,7 @@ private static class BrokerConfig {
}

public static class BrokersConfigs {

public Map<String, InternalMachineConfig> machines;
public Map<String, ConfigMap> configMaps;
public Pod pod;
Expand Down
Expand Up @@ -14,7 +14,6 @@
import static java.util.Collections.singletonMap;

import com.google.common.annotations.Beta;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.che.api.workspace.server.spi.provision.env.AgentAuthEnableEnvVarProvider;
Expand All @@ -39,13 +38,13 @@ public KubernetesBrokerEnvironmentFactory(
AgentAuthEnableEnvVarProvider authEnableEnvVarProvider,
MachineTokenEnvVarProvider machineTokenEnvVarProvider,
@Named("che.workspace.plugin_broker.init.image") String initBrokerImage,
@Named("che.workspace.plugin_broker.images") Map<String, String> pluginTypeToImage) {
@Named("che.workspace.plugin_broker.unified.image") String unifiedBrokerImage) {
super(
cheWebsocketEndpoint,
brokerPullPolicy,
authEnableEnvVarProvider,
machineTokenEnvVarProvider,
pluginTypeToImage,
unifiedBrokerImage,
initBrokerImage);
}

Expand Down
Expand Up @@ -69,7 +69,7 @@ public void onEvent(BrokerEvent event) {
return;
}
try {
brokersResult.addResult(tooling);
brokersResult.setResult(tooling);
} catch (InfrastructureException e) {
LOG.error(e.getLocalizedMessage(), e);
}
Expand Down

0 comments on commit 26f21eb

Please sign in to comment.