/
streaming.go
58 lines (50 loc) · 1.49 KB
/
streaming.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package docker
import (
"context"
"io"
"github.com/aliyun/aliyun_assist_client/agent/log"
"github.com/docker/docker/api/types"
dockerstdcopy "github.com/docker/docker/pkg/stdcopy"
)
// TODO: FIXME: Support context, i.e., support canceling, for reading and writing
func streamStdoutFromHijacked(ctx context.Context, hijackedResponse *types.HijackedResponse,
stdoutWriter io.Writer, stderrWriter io.Writer) error {
if stdoutWriter == nil {
stdoutWriter = io.Discard
}
if stderrWriter == nil {
stderrWriter = io.Discard
}
_, err := dockerstdcopy.StdCopy(stdoutWriter, stderrWriter, hijackedResponse.Reader)
return err
}
func streamHijacked(ctx context.Context, hijackedResponse *types.HijackedResponse,
stdoutWriter io.Writer, stderrWriter io.Writer, stdinReader io.Reader) error {
stdoutReceived := make(chan error)
if stdoutWriter != nil || stderrWriter != nil {
go func() {
stdoutReceived <- streamStdoutFromHijacked(ctx, hijackedResponse, stdoutWriter, stderrWriter)
}()
}
stdinSent := make(chan error)
go func() {
if stdinReader != nil {
_, err := io.Copy(hijackedResponse.Conn, stdinReader)
stdinSent <- err
}
hijackedResponse.CloseWrite()
close(stdinSent)
}()
select {
case err := <- stdoutReceived:
return err
case err := <- stdinSent:
if err != nil {
log.GetLogger().WithError(err).Warnln("Error encountered during sending stdin data, which is ignored")
}
if stdoutWriter != nil || stderrWriter != nil {
return <- stdoutReceived
}
}
return nil
}