Skip to content

Commit

Permalink
Add infrastructure output endpoint for messages/logs (#5466)
Browse files Browse the repository at this point in the history
  • Loading branch information
akorneta committed Jul 4, 2017
1 parent 72fd6d0 commit 8b4eaaf
Show file tree
Hide file tree
Showing 16 changed files with 471 additions and 847 deletions.
Expand Up @@ -134,6 +134,8 @@ protected void configure() {
bind(org.eclipse.che.api.workspace.server.event.RuntimeStatusJsonRpcMessenger.class).asEagerSingleton();
bind(org.eclipse.che.api.workspace.server.event.MachineStatusJsonRpcMessenger.class).asEagerSingleton();
bind(org.eclipse.che.api.workspace.server.event.ServerStatusJsonRpcMessenger.class).asEagerSingleton();
bind(org.eclipse.che.api.workspace.server.event.InstallerLogJsonRpcMessenger.class).asEagerSingleton();
bind(org.eclipse.che.api.workspace.server.event.MachineLogJsonRpcMessenger.class).asEagerSingleton();
//

bind(org.eclipse.che.security.oauth.OAuthAuthenticatorProvider.class)
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.che.api.workspace.shared.dto.RuntimeIdentityDto;
import org.eclipse.che.api.workspace.shared.dto.event.BootstrapperStatusEvent;
import org.eclipse.che.commons.lang.TarUtils;
import org.eclipse.che.workspace.infrastructure.docker.output.OutputEndpoint;
import org.eclipse.che.workspace.infrastructure.docker.server.InstallerEndpoint;

import javax.inject.Inject;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class Bootstrapper {
private final int bootstrappingTimeoutMinutes;
private final int serverCheckPeriodSeconds;
private final int installerTimeoutSeconds;
private final String outputEndpoint;
private final String installerEndpoint;
private final EventService eventService;
private final EventSubscriber<BootstrapperStatusEvent> bootstrapperStatusListener;
Expand All @@ -83,6 +85,7 @@ public Bootstrapper(@Assisted String machineName,
this.dockerMachine = dockerMachine;
this.agents = agents;
this.installerEndpoint = websocketBaseEndpoint + InstallerEndpoint.INSTALLER_WEBSOCKET_ENDPOINT_BASE;
this.outputEndpoint = websocketBaseEndpoint + OutputEndpoint.OUTPUT_WEBSOCKET_ENDPOINT_BASE;
this.bootstrappingTimeoutMinutes = bootstrappingTimeoutMinutes;
this.serverCheckPeriodSeconds = serverCheckPeriodSeconds;
this.installerTimeoutSeconds = installerTimeoutSeconds;
Expand Down Expand Up @@ -124,14 +127,13 @@ public void bootstrap() throws InfrastructureException {

this.eventService.subscribe(bootstrapperStatusListener, BootstrapperStatusEvent.class);
try {
String endpoint = installerEndpoint + ENDPOINT_IDS.getAndIncrement();
dockerMachine.exec(BOOTSTRAPPER_DIR + BOOTSTRAPPER_FILE +
" -machine-name " + machineName +
" -runtime-id " + String.format("%s:%s:%s", runtimeIdentity.getWorkspaceId(),
runtimeIdentity.getEnvName(),
runtimeIdentity.getOwner()) +
" -push-endpoint " + endpoint +
" -push-logs-endpoint " + endpoint +
" -push-endpoint " + installerEndpoint + ENDPOINT_IDS.getAndIncrement() +
" -push-logs-endpoint " + outputEndpoint + ENDPOINT_IDS.getAndIncrement() +
" -server-check-period " + serverCheckPeriodSeconds +
" -installer-timeout " + installerTimeoutSeconds +
" -file " + BOOTSTRAPPER_DIR + CONFIG_FILE,
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.che.workspace.infrastructure.docker.config.proxy.DockerProxyModule;
import org.eclipse.che.workspace.infrastructure.docker.config.volume.ExtraVolumeModule;
import org.eclipse.che.workspace.infrastructure.docker.environment.DockerEnvironmentTypeModule;
import org.eclipse.che.workspace.infrastructure.docker.output.OutputService;
import org.eclipse.che.workspace.infrastructure.docker.service.InstallerService;
import org.eclipse.che.workspace.infrastructure.docker.strategy.ServerEvaluationStrategyModule;

Expand Down Expand Up @@ -92,6 +93,7 @@ protected void configure() {
devMachineEnvVars.addBinding().toProvider(JavaOptsEnvVariableProvider.class);
allMachinesEnvVars.addBinding().toProvider(ApiEndpointEnvVariableProvider.class);

bind(OutputService.class);
bind(InstallerService.class);

install(new DnsResolversModule());
Expand Down
Expand Up @@ -22,7 +22,7 @@
import org.eclipse.che.plugin.docker.client.Exec;
import org.eclipse.che.plugin.docker.client.LogMessage;
import org.eclipse.che.plugin.docker.client.MessageProcessor;
import org.eclipse.che.plugin.docker.client.ProgressLineFormatterImpl;
import org.eclipse.che.plugin.docker.client.ProgressMonitor;
import org.eclipse.che.plugin.docker.client.json.ContainerInfo;
import org.eclipse.che.plugin.docker.client.params.CommitParams;
import org.eclipse.che.plugin.docker.client.params.CreateExecParams;
Expand Down Expand Up @@ -98,6 +98,7 @@ public class DockerMachine implements Machine {
private final boolean snapshotUseRegistry;
private final ContainerInfo info;
private final ServerEvaluationStrategyProvider provider;
private final ProgressMonitor progressMonitor;

private Map<String, ServerImpl> servers;

Expand All @@ -109,14 +110,16 @@ public DockerMachine(DockerConnector docker,
@Assisted("container") String container,
@Assisted("image") String image,
ServerEvaluationStrategyProvider provider,
DockerMachineStopDetector dockerMachineStopDetector) throws InfrastructureException {
DockerMachineStopDetector dockerMachineStopDetector,
ProgressMonitor progressMonitor) throws InfrastructureException {
this.container = container;
this.docker = docker;
this.image = image;
this.registry = registry;
this.registryNamespace = registryNamespace;
this.snapshotUseRegistry = snapshotUseRegistry;
this.dockerMachineStopDetector = dockerMachineStopDetector;
this.progressMonitor = progressMonitor;
try {
this.info = docker.inspectContainer(container);
} catch (IOException e) {
Expand Down Expand Up @@ -223,14 +226,7 @@ public DockerMachineSource saveToSnapshot() throws SnapshotException {
commitContainer(fullRepo, LATEST_TAG);
//TODO fix this workaround. Docker image is not visible after commit when using swarm
Thread.sleep(2000);
final ProgressLineFormatterImpl lineFormatter = new ProgressLineFormatterImpl();
final String digest = docker.push(pushParams,
progressMonitor -> {
// try {
// outputConsumer.writeLine(lineFormatter.format(progressMonitor));
// } catch (IOException ignored) {
// }
});
final String digest = docker.push(pushParams, progressMonitor);
docker.removeImage(RemoveImageParams.create(fullRepo).withForce(false));
return new DockerMachineSource(image).withRegistry(registry).withDigest(digest).withTag(LATEST_TAG);
} catch (IOException ioEx) {
Expand Down
Expand Up @@ -14,6 +14,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.eclipse.che.api.core.model.workspace.config.ServerConfig;
import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity;
Expand All @@ -24,10 +25,14 @@
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
import org.eclipse.che.commons.annotation.Nullable;
import org.eclipse.che.commons.env.EnvironmentContext;
import org.eclipse.che.commons.lang.concurrent.LoggingUncaughtExceptionHandler;
import org.eclipse.che.commons.lang.os.WindowsPathEscaper;
import org.eclipse.che.plugin.docker.client.DockerConnector;
import org.eclipse.che.plugin.docker.client.LogMessage;
import org.eclipse.che.plugin.docker.client.MessageProcessor;
import org.eclipse.che.plugin.docker.client.ProgressMonitor;
import org.eclipse.che.plugin.docker.client.UserSpecificDockerRegistryCredentialsProvider;
import org.eclipse.che.plugin.docker.client.exception.ContainerNotFoundException;
import org.eclipse.che.plugin.docker.client.exception.ImageNotFoundException;
import org.eclipse.che.plugin.docker.client.json.ContainerConfig;
import org.eclipse.che.plugin.docker.client.json.ContainerInfo;
Expand All @@ -41,6 +46,7 @@
import org.eclipse.che.plugin.docker.client.json.network.EndpointConfig;
import org.eclipse.che.plugin.docker.client.params.BuildImageParams;
import org.eclipse.che.plugin.docker.client.params.CreateContainerParams;
import org.eclipse.che.plugin.docker.client.params.GetContainerLogsParams;
import org.eclipse.che.plugin.docker.client.params.ListImagesParams;
import org.eclipse.che.plugin.docker.client.params.PullParams;
import org.eclipse.che.plugin.docker.client.params.RemoveContainerParams;
Expand All @@ -59,17 +65,21 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.regex.Pattern;

import static java.lang.String.format;
import static java.lang.Thread.sleep;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
Expand Down Expand Up @@ -118,8 +128,7 @@ public class DockerMachineStarter {

private final DockerConnector docker;
private final UserSpecificDockerRegistryCredentialsProvider dockerCredentials;
// TODO spi fix in #5102
// private final ExecutorService executor;
private final ExecutorService executor;
private final DockerMachineStopDetector dockerInstanceStopDetector;
private final boolean doForcePullImage;
private final boolean privilegedMode;
Expand All @@ -144,6 +153,7 @@ public class DockerMachineStarter {
private final String[] dnsResolvers;
private ServerEvaluationStrategyProvider serverEvaluationStrategyProvider;
private final Map<String, String> buildArgs;
private final MachineLoggersFactory machineLoggerFactory;

@Inject
public DockerMachineStarter(DockerConnector docker,
Expand Down Expand Up @@ -171,7 +181,8 @@ public DockerMachineStarter(DockerConnector docker,
@Named("che.docker.extra_hosts") Set<Set<String>> additionalHosts,
@Nullable @Named("che.docker.dns_resolvers") String[] dnsResolvers,
ServerEvaluationStrategyProvider serverEvaluationStrategyProvider,
@Named("che.docker.build_args") Map<String, String> buildArgs) {
@Named("che.docker.build_args") Map<String, String> buildArgs,
MachineLoggersFactory machineLogger) {
// TODO spi should we move all configuration stuff into infrastructure provisioner and left logic of container start here only
this.docker = docker;
this.dockerCredentials = dockerCredentials;
Expand Down Expand Up @@ -199,6 +210,7 @@ public DockerMachineStarter(DockerConnector docker,
this.dnsResolvers = dnsResolvers;
this.buildArgs = buildArgs;
this.serverEvaluationStrategyProvider = serverEvaluationStrategyProvider;
this.machineLoggerFactory = machineLogger;

allMachinesSystemVolumes = removeEmptyAndNullValues(allMachinesSystemVolumes);
devMachineSystemVolumes = removeEmptyAndNullValues(devMachineSystemVolumes);
Expand Down Expand Up @@ -258,14 +270,12 @@ public DockerMachineStarter(DockerConnector docker,
.flatMap(Set::stream)
.collect(toSet());

// TODO spi fix in #5102
// single point of failure in case of highly loaded system
/*executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("MachineLogsStreamer-%d")
.setUncaughtExceptionHandler(
LoggingUncaughtExceptionHandler
.getInstance())
.setDaemon(true)
.build());*/
executor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("MachineLogsStreamer-%d")
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.setDaemon(true)
.build());
}

/**
Expand Down Expand Up @@ -298,24 +308,10 @@ public DockerMachine startService(String networkName,

// copy to not affect/be affected by changes in origin
containerConfig = new DockerContainerConfig(containerConfig);

// TODO spi fix in #5102
ProgressMonitor progressMonitor = ProgressMonitor.DEV_NULL;
/*LineConsumer machineLogger = new ListLineConsumer();
ProgressLineFormatterImpl progressLineFormatter = new ProgressLineFormatterImpl();
ProgressMonitor progressMonitor = currentProgressStatus -> {
try {
machineLogger.writeLine(progressLineFormatter.format(currentProgressStatus));
} catch (IOException e) {
LOG.error(e.getLocalizedMessage(), e);
}
};*/

final ProgressMonitor progressMonitor = machineLoggerFactory.newProgressMonitor(machineName, identity);
String container = null;
try {
String image = prepareImage(machineName,
containerConfig,
progressMonitor);
String image = prepareImage(machineName, containerConfig, progressMonitor);

container = createContainer(workspaceId,
machineName,
Expand All @@ -324,18 +320,16 @@ public DockerMachine startService(String networkName,
networkName,
containerConfig);

connectContainerToAdditionalNetworks(container,
containerConfig);
connectContainerToAdditionalNetworks(container, containerConfig);

docker.startContainer(StartContainerParams.create(container));

checkContainerIsRunning(container);

// TODO spi fix in #5102
/*readContainerLogsInSeparateThread(container,
readContainerLogsInSeparateThread(container,
workspaceId,
service.getId(),
machineLogger);*/
containerConfig.getId(),
machineLoggerFactory.newLogsProcessor(machineName, identity));

dockerInstanceStopDetector.startDetection(container,
containerConfig.getId(),
Expand All @@ -348,7 +342,8 @@ public DockerMachine startService(String networkName,
container,
image,
serverEvaluationStrategyProvider,
dockerInstanceStopDetector);
dockerInstanceStopDetector,
progressMonitor);
} catch (RuntimeException | IOException | InfrastructureException e) {
cleanUpContainer(container);
if (e instanceof InfrastructureException) {
Expand Down Expand Up @@ -655,11 +650,10 @@ protected void checkContainerIsRunning(String container) throws IOException, Inf
}
}

// TODO spi fix in #5102
/*private void readContainerLogsInSeparateThread(String container,
private void readContainerLogsInSeparateThread(String container,
String workspaceId,
String machineId,
LineConsumer outputConsumer) {
MessageProcessor<LogMessage> logsProcessor) {
executor.execute(() -> {
long lastProcessedLogDate = 0;
boolean isContainerRunning = true;
Expand All @@ -670,7 +664,7 @@ protected void checkContainerIsRunning(String container) throws IOException, Inf
docker.getContainerLogs(GetContainerLogsParams.create(container)
.withFollow(true)
.withSince(lastProcessedLogDate),
new LogMessagePrinter(outputConsumer));
logsProcessor);
isContainerRunning = false;
} catch (SocketTimeoutException ste) {
lastProcessedLogDate = System.currentTimeMillis() / 1000L;
Expand Down Expand Up @@ -712,7 +706,7 @@ protected void checkContainerIsRunning(String container) throws IOException, Inf
}
}
});
}*/
}

private void cleanUpContainer(String containerId) {
try {
Expand Down

0 comments on commit 8b4eaaf

Please sign in to comment.