From cfe289b4a385c626582e7ab2818d0b264bcf653a Mon Sep 17 00:00:00 2001 From: Tobias Gesellchen Date: Sun, 18 Jul 2021 01:02:56 +0200 Subject: [PATCH] Add a multiplexed stream decoder. See https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach for the stream format documentation. Handling the .../logs endpoints would work without the custom interceptor, when the response content-type would be set correctly :-/ See https://github.com/gesellix/docker-client/issues/21 for details. --- .../engine/client/infrastructure/Frame.java | 79 +++++++++++++++++++ .../EnsureRawStreamContentTypeInterceptor.kt | 23 ++++++ .../client/infrastructure/FrameReader.kt | 26 ++++++ 3 files changed, 128 insertions(+) create mode 100644 engine-api/src/main/java/de/gesellix/docker/engine/client/infrastructure/Frame.java create mode 100644 engine-api/src/main/kotlin/de/gesellix/docker/engine/client/infrastructure/EnsureRawStreamContentTypeInterceptor.kt create mode 100644 engine-api/src/main/kotlin/de/gesellix/docker/engine/client/infrastructure/FrameReader.kt diff --git a/engine-api/src/main/java/de/gesellix/docker/engine/client/infrastructure/Frame.java b/engine-api/src/main/java/de/gesellix/docker/engine/client/infrastructure/Frame.java new file mode 100644 index 000000000..9a926a824 --- /dev/null +++ b/engine-api/src/main/java/de/gesellix/docker/engine/client/infrastructure/Frame.java @@ -0,0 +1,79 @@ +package de.gesellix.docker.engine.client.infrastructure; + +import java.nio.charset.StandardCharsets; + +public class Frame { + + private final StreamType streamType; + private final byte[] payload; + + public Frame(StreamType streamType, byte[] payload) { + this.streamType = streamType; + this.payload = payload; + } + + public StreamType getStreamType() { + return streamType; + } + + public byte[] getPayload() { + return payload; + } + + public String getPayloadAsString() { + return new String(payload, StandardCharsets.UTF_8).trim(); + } + + @Override + public String toString() { + return "Frame{" + + "streamType=" + streamType + + ", payload=" + getPayloadAsString() + + '}'; + } + + /** + * STREAM_TYPE can be: + * + * See the paragraph _Stream format_ at https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach. + * Reference implementation: https://github.com/moby/moby/blob/master/pkg/stdcopy/stdcopy.go. + * Docker client GoDoc: https://godoc.org/github.com/moby/moby/client#Client.ContainerAttach. + */ + public enum StreamType { + + STDIN((byte) 0), + STDOUT((byte) 1), + STDERR((byte) 2), + SYSTEMERR((byte) 3); + + StreamType(Object streamTypeId) { + this.streamTypeId = ((byte) (streamTypeId)); + } + + public static StreamType valueOf(final byte b) { + switch (b) { + case 0: + return STDIN; + case 1: + return STDOUT; + case 2: + return STDERR; + case 3: + return SYSTEMERR; + default: + throw new IllegalArgumentException("no enum value for " + String.valueOf(b) + " found."); + } + } + + public byte getStreamTypeId() { + return streamTypeId; + } + + private final byte streamTypeId; + } +} diff --git a/engine-api/src/main/kotlin/de/gesellix/docker/engine/client/infrastructure/EnsureRawStreamContentTypeInterceptor.kt b/engine-api/src/main/kotlin/de/gesellix/docker/engine/client/infrastructure/EnsureRawStreamContentTypeInterceptor.kt new file mode 100644 index 000000000..5f55f778a --- /dev/null +++ b/engine-api/src/main/kotlin/de/gesellix/docker/engine/client/infrastructure/EnsureRawStreamContentTypeInterceptor.kt @@ -0,0 +1,23 @@ +package de.gesellix.docker.engine.client.infrastructure + +import okhttp3.Interceptor +import okhttp3.Response + +data class MultiplexedStreamConfig(val expectMultiplexedStream: Boolean) + +// This one would work automatically, when the response content-type would be set correctly :-/ +// see https://github.com/gesellix/docker-client/issues/21 +class EnsureRawStreamContentTypeInterceptor : Interceptor { + + override fun intercept(chain: Interceptor.Chain): Response { + val response = chain.proceed(chain.request()) + if (chain.request().tag(MultiplexedStreamConfig::class.java)?.expectMultiplexedStream == true) { + if (response.headers("Content-Type").isEmpty()) { + // TODO use a proper logger + println("Overriding Content-Type response header with application/vnd.docker.raw-stream") + return response.newBuilder().header("Content-Type", "application/vnd.docker.raw-stream").build() + } + } + return response + } +} diff --git a/engine-api/src/main/kotlin/de/gesellix/docker/engine/client/infrastructure/FrameReader.kt b/engine-api/src/main/kotlin/de/gesellix/docker/engine/client/infrastructure/FrameReader.kt new file mode 100644 index 000000000..70e8fd66d --- /dev/null +++ b/engine-api/src/main/kotlin/de/gesellix/docker/engine/client/infrastructure/FrameReader.kt @@ -0,0 +1,26 @@ +package de.gesellix.docker.engine.client.infrastructure + +import de.gesellix.docker.response.Reader +import okio.BufferedSource +import okio.Source +import okio.buffer + +class FrameReader(source: Source) : Reader { + + private val buffer: BufferedSource = source.buffer() + + override fun readNext(type: Class?): Frame { + // Stream format: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach + // header := [8]byte{STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4} + + val streamType = Frame.StreamType.valueOf(buffer.readByte()) + buffer.skip(3) + val frameSize = buffer.readInt() + + return Frame(streamType, buffer.readByteArray(frameSize.toLong())) + } + + override fun hasNext(): Boolean { + return !Thread.currentThread().isInterrupted && !buffer.exhausted() + } +}