diff --git a/api-client/src/main/kotlin/de/gesellix/docker/remote/api/client/ExecApi.kt b/api-client/src/main/kotlin/de/gesellix/docker/remote/api/client/ExecApi.kt index 29a255b5..9f5a71be 100644 --- a/api-client/src/main/kotlin/de/gesellix/docker/remote/api/client/ExecApi.kt +++ b/api-client/src/main/kotlin/de/gesellix/docker/remote/api/client/ExecApi.kt @@ -31,6 +31,7 @@ import de.gesellix.docker.remote.api.core.ServerError import de.gesellix.docker.remote.api.core.ServerException import de.gesellix.docker.remote.api.core.StreamCallback import de.gesellix.docker.remote.api.core.Success +import de.gesellix.docker.remote.api.core.SuccessBidirectionalStream import de.gesellix.docker.remote.api.core.SuccessStream import kotlinx.coroutines.cancel import kotlinx.coroutines.launch @@ -239,11 +240,6 @@ class ExecApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox ) { val localVariableConfig = execStartRequestConfig(id = id, execStartConfig = execStartConfig) - val expectMultiplexedResponse: Boolean = if (execStartConfig?.tty != null) { - !(execStartConfig.tty ?: false) - } else { - !(execInspect(id).processConfig?.tty ?: false) - } val localVarResponse = requestFrames( localVariableConfig ) @@ -256,18 +252,32 @@ class ExecApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox val actualCallback = callback ?: LoggingCallback() when (localVarResponse.responseType) { - ResponseType.Success -> { - runBlocking { - launch { - withTimeoutOrNull(timeout.toMillis()) { - actualCallback.onStarting(this@launch::cancel) - (localVarResponse as SuccessStream).data.collect { actualCallback.onNext(it) } - actualCallback.onFinished() + ResponseType.Success, + ResponseType.Informational -> { + when (localVarResponse) { + is SuccessBidirectionalStream -> + runBlocking { + launch { + withTimeoutOrNull(timeout.toMillis()) { + actualCallback.onStarting(this@launch::cancel) + actualCallback.attachInput(localVarResponse.socket.sink) + localVarResponse.data.collect { actualCallback.onNext(it) } + actualCallback.onFinished() + } + } + } + else -> + runBlocking { + launch { + withTimeoutOrNull(timeout.toMillis()) { + actualCallback.onStarting(this@launch::cancel) + (localVarResponse as SuccessStream).data.collect { actualCallback.onNext(it) } + actualCallback.onFinished() + } + } } - } } } - ResponseType.Informational -> throw UnsupportedOperationException("Client does not support Informational responses.") ResponseType.Redirection -> throw UnsupportedOperationException("Client does not support Redirection responses.") ResponseType.ClientError -> { val localVarError = localVarResponse as ClientError<*> @@ -292,6 +302,18 @@ class ExecApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox val localVariableQuery: MultiValueMap = mutableMapOf() val localVariableHeaders: MutableMap = mutableMapOf() +// val expectMultiplexedResponse: Boolean = if (execStartConfig?.tty != null) { +// !(execStartConfig.tty ?: false) +// } else { +// !(execInspect(id).processConfig?.tty ?: false) +// } + val requiresConnectionUpgrade = execStartConfig?.tty != null && execStartConfig.tty!! + if (requiresConnectionUpgrade) + localVariableHeaders.apply { + put("Connection", "Upgrade") + put("Upgrade", "tcp") + } + return RequestConfig( method = POST, path = "/exec/{id}/start".replace("{" + "id" + "}", id), diff --git a/api-client/src/test/java/de/gesellix/docker/remote/api/client/ExecApiIntegrationTest.java b/api-client/src/test/java/de/gesellix/docker/remote/api/client/ExecApiIntegrationTest.java index f79edd87..70c4395f 100644 --- a/api-client/src/test/java/de/gesellix/docker/remote/api/client/ExecApiIntegrationTest.java +++ b/api-client/src/test/java/de/gesellix/docker/remote/api/client/ExecApiIntegrationTest.java @@ -6,19 +6,37 @@ import de.gesellix.docker.remote.api.ExecInspectResponse; import de.gesellix.docker.remote.api.ExecStartConfig; import de.gesellix.docker.remote.api.IdResponse; +import de.gesellix.docker.remote.api.core.Frame; import de.gesellix.docker.remote.api.testutil.DockerEngineAvailable; import de.gesellix.docker.remote.api.testutil.InjectDockerClient; import de.gesellix.docker.remote.api.testutil.TestImage; +import okio.BufferedSink; +import okio.Okio; +import okio.Sink; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static de.gesellix.docker.remote.api.testutil.Constants.LABEL_KEY; import static de.gesellix.docker.remote.api.testutil.Constants.LABEL_VALUE; +import static de.gesellix.docker.remote.api.testutil.Failsafe.perform; import static de.gesellix.docker.remote.api.testutil.Failsafe.removeContainer; +import static java.time.temporal.ChronoUnit.SECONDS; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.time.Duration; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; @DockerEngineAvailable class ExecApiIntegrationTest { @@ -83,4 +101,105 @@ public void containerExecNonInteractive() { removeContainer(engineApiClient, "container-exec-test"); } + + @Test + public void containerExecInteractive() { + removeContainer(engineApiClient, "container-exec-interactive-test"); + + imageApi.imageCreate(testImage.getImageName(), null, null, testImage.getImageTag(), null, null, null, null, null); + + ContainerCreateRequest containerCreateRequest = new ContainerCreateRequest( + null, null, null, + true, true, true, + null, + true, true, null, + null, + null, + null, + null, + testImage.getImageWithTag(), + null, null, null, + null, null, + null, + singletonMap(LABEL_KEY, LABEL_VALUE), + null, null, + null, + null, + null + ); + containerApi.containerCreate(containerCreateRequest, "container-exec-interactive-test"); + containerApi.containerStart("container-exec-interactive-test", null); + + IdResponse exec = execApi.containerExec( + "container-exec-interactive-test", + new ExecConfig(true, true, true, null, null, true, + null, + singletonList("/cat"), + null, null, null)); + assertNotNull(exec.getId()); + + Duration timeout = Duration.of(5, SECONDS); + LogFrameStreamCallback callback = new LogFrameStreamCallback() { + @Override + public void attachInput(Sink sink) { + System.out.println("attachInput, sending data..."); + new Thread(() -> { + BufferedSink buffer = Okio.buffer(sink); + try { + buffer.writeUtf8("hello echo\n"); + buffer.flush(); + System.out.println("... data sent"); + } catch (IOException e) { + e.printStackTrace(); + System.err.println("Failed to write to stdin: " + e.getMessage()); + } finally { + try { + Thread.sleep(100); + sink.close(); + } catch (Exception ignored) { + // ignore + } + } + }).start(); + } + }; + + new Thread(() -> execApi.execStart( + exec.getId(), + new ExecStartConfig(false, true, null), + callback, timeout.toMillis())).start(); + + CountDownLatch wait = new CountDownLatch(1); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + if (callback.job != null) { + callback.job.cancel(); + } + wait.countDown(); + } + }, 5000); + + try { + wait.await(); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + + ExecInspectResponse execInspect = execApi.execInspect(exec.getId()); + assertTrue(execInspect.getRunning()); + + assertSame(Frame.StreamType.RAW, callback.frames.stream().findAny().get().getStreamType()); + assertEquals( + "hello echo\nhello echo".replaceAll("[\\n\\r]", ""), + callback.frames.stream().map(Frame::getPayloadAsString).collect(Collectors.joining()).replaceAll("[\\n\\r]", "")); + + removeContainer(engineApiClient, "container-exec-interactive-test"); + + perform(() -> { + ExecInspectResponse execInspectAfterStop = execApi.execInspect(exec.getId()); + assertFalse(execInspectAfterStop.getRunning()); + }); + } } diff --git a/build.gradle.kts b/build.gradle.kts index 0ae983f1..44c49c3c 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -41,7 +41,7 @@ subprojects { repositories { // mavenLocal() // listOf( -//// "gesellix/okhttp", +// "gesellix/okhttp", //// "docker-client/*", // ).forEach { slug -> //// fun findProperty(s: String) = project.findProperty(s) as String? @@ -71,6 +71,8 @@ allprojects { } } // dependencySubstitution { +// substitute(module("com.squareup.okhttp3:okhttp")) +// .using(module("de.gesellix.okhttp3-forked:okhttp:${libs.versions.okhttp.get()}")) // substitute(module("com.squareup.okhttp3:okhttp-jvm")) // .using(module("de.gesellix.okhttp3-forked:okhttp-jvm:${libs.versions.okhttp.get()}")) // }