Skip to content

Commit

Permalink
Add a multiplexed stream decoder.
Browse files Browse the repository at this point in the history
See https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach for the stream format documentation.

Handling the .../logs endpoints would work without the custom interceptor, when the response content-type would be set correctly :-/
See #21 for details.
  • Loading branch information
gesellix committed Sep 24, 2021
1 parent d2eda58 commit cfe289b
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package de.gesellix.docker.engine.client.infrastructure;

import java.nio.charset.StandardCharsets;

public class Frame {

private final StreamType streamType;
private final byte[] payload;

public Frame(StreamType streamType, byte[] payload) {
this.streamType = streamType;
this.payload = payload;
}

public StreamType getStreamType() {
return streamType;
}

public byte[] getPayload() {
return payload;
}

public String getPayloadAsString() {
return new String(payload, StandardCharsets.UTF_8).trim();
}

@Override
public String toString() {
return "Frame{" +
"streamType=" + streamType +
", payload=" + getPayloadAsString() +
'}';
}

/**
* STREAM_TYPE can be:
* <ul>
* <li>0: stdin (will be written on stdout)</li>
* <li>1: stdout</li>
* <li>2: stderr</li>
* <li>3: systemerr</li>
* </ul>
* See the paragraph _Stream format_ at https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach.
* Reference implementation: https://github.com/moby/moby/blob/master/pkg/stdcopy/stdcopy.go.
* Docker client GoDoc: https://godoc.org/github.com/moby/moby/client#Client.ContainerAttach.
*/
public enum StreamType {

STDIN((byte) 0),
STDOUT((byte) 1),
STDERR((byte) 2),
SYSTEMERR((byte) 3);

StreamType(Object streamTypeId) {
this.streamTypeId = ((byte) (streamTypeId));
}

public static StreamType valueOf(final byte b) {
switch (b) {
case 0:
return STDIN;
case 1:
return STDOUT;
case 2:
return STDERR;
case 3:
return SYSTEMERR;
default:
throw new IllegalArgumentException("no enum value for " + String.valueOf(b) + " found.");
}
}

public byte getStreamTypeId() {
return streamTypeId;
}

private final byte streamTypeId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.gesellix.docker.engine.client.infrastructure

import okhttp3.Interceptor
import okhttp3.Response

data class MultiplexedStreamConfig(val expectMultiplexedStream: Boolean)

// This one would work automatically, when the response content-type would be set correctly :-/
// see https://github.com/gesellix/docker-client/issues/21
class EnsureRawStreamContentTypeInterceptor : Interceptor {

override fun intercept(chain: Interceptor.Chain): Response {
val response = chain.proceed(chain.request())
if (chain.request().tag(MultiplexedStreamConfig::class.java)?.expectMultiplexedStream == true) {
if (response.headers("Content-Type").isEmpty()) {
// TODO use a proper logger
println("Overriding Content-Type response header with application/vnd.docker.raw-stream")
return response.newBuilder().header("Content-Type", "application/vnd.docker.raw-stream").build()
}
}
return response
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package de.gesellix.docker.engine.client.infrastructure

import de.gesellix.docker.response.Reader
import okio.BufferedSource
import okio.Source
import okio.buffer

class FrameReader(source: Source) : Reader<Frame> {

private val buffer: BufferedSource = source.buffer()

override fun readNext(type: Class<Frame>?): Frame {
// Stream format: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach
// header := [8]byte{STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4}

val streamType = Frame.StreamType.valueOf(buffer.readByte())
buffer.skip(3)
val frameSize = buffer.readInt()

return Frame(streamType, buffer.readByteArray(frameSize.toLong()))
}

override fun hasNext(): Boolean {
return !Thread.currentThread().isInterrupted && !buffer.exhausted()
}
}

0 comments on commit cfe289b

Please sign in to comment.