/
utils.go
186 lines (154 loc) · 4.14 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package streams
import (
"context"
"io"
"github.com/alibaba/pouch/pkg/log"
"golang.org/x/sync/errgroup"
)
// Pipes is used to present any downstream pipe, for example, containerd's cio.
type Pipes struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
}
// AttachConfig is used to describe how to attach the client's stream to
// the process's stream.
type AttachConfig struct {
Detach bool
Terminal bool
// CloseStdin means if the stdin of client's stream is closed by the
// caller, the stdin of process's stream should be closed.
CloseStdin bool
// UseStdin/UseStdout/UseStderr can be used to check the client's stream
// is nil or not. It is hard to check io.Write/io.ReadCloser != nil
// directly, because they might be specific type, which means
// (typ != nil) always is true.
UseStdin, UseStdout, UseStderr bool
Stdin io.ReadCloser
Stdout, Stderr io.Writer
}
// CopyPipes will watchs the data pipe's channel, like sticked to the pipe.
//
// NOTE: don't assign the specific type to the Pipes because the Std* != nil
// always return true.
func (s *Stream) CopyPipes(p Pipes) {
copyfn := func(styp string, w io.WriteCloser, r io.ReadCloser) {
s.Add(1)
go func() {
log.With(nil).Debugf("start to copy %s from pipe", styp)
defer log.With(nil).Debugf("stop copy %s from pipe", styp)
defer s.Done()
defer r.Close()
if _, err := io.Copy(w, r); err != nil {
log.With(nil).WithError(err).Error("failed to copy pipe data")
}
}()
}
if p.Stdout != nil {
copyfn("stdout", s.Stdout(), p.Stdout)
}
if p.Stderr != nil {
copyfn("stderr", s.Stderr(), p.Stderr)
}
if s.stdin != nil && p.Stdin != nil {
go func() {
log.With(nil).Debug("start to copy stdin from pipe")
defer log.With(nil).Debug("stop copy stdin from pipe")
io.Copy(p.Stdin, s.stdin)
if err := p.Stdin.Close(); err != nil {
log.With(nil).WithError(err).Error("failed to close pipe stdin")
}
}()
}
}
// Attach will use stream defined by AttachConfig to attach the Stream.
func (s *Stream) Attach(ctx context.Context, cfg *AttachConfig) <-chan error {
var (
group errgroup.Group
stdout, stderr io.ReadCloser
)
if cfg.UseStdin {
group.Go(func() error {
log.With(nil).Debug("start to attach stdin to stream")
defer log.With(nil).Debug("stop attach stdin to stream")
defer func() {
if cfg.CloseStdin {
s.StdinPipe().Close()
}
}()
_, err := io.Copy(s.StdinPipe(), cfg.Stdin)
if err == io.ErrClosedPipe {
err = nil
}
return err
})
}
attachFn := func(styp string, w io.Writer, r io.ReadCloser) error {
log.With(nil).Debugf("start to attach %s to stream", styp)
defer log.With(nil).Debugf("stop attach %s to stream", styp)
defer func() {
// NOTE: when the stdout/stderr is closed, the stdin
// should be closed. for example, caller types the exit
// command, the stdout will be closed. in this case,
// the stdin should be closed. Otherwise, the caller
// will wait for close signal forever.
if cfg.UseStdin {
cfg.Stdin.Close()
}
r.Close()
}()
_, err := io.Copy(w, r)
if err == io.ErrClosedPipe {
err = nil
}
return err
}
if cfg.UseStdout {
stdout = s.NewStdoutPipe()
group.Go(func() error {
return attachFn("stdout", cfg.Stdout, stdout)
})
}
if cfg.UseStderr {
stderr = s.NewStderrPipe()
group.Go(func() error {
return attachFn("stderr", cfg.Stderr, stderr)
})
}
var (
errCh = make(chan error, 1)
groupErrCh = make(chan error, 1)
)
go func() {
defer close(groupErrCh)
groupErrCh <- group.Wait()
}()
go func() {
defer log.With(nil).Debug("the goroutine for attaching is done")
defer close(errCh)
select {
case <-ctx.Done():
if cfg.UseStdin {
cfg.Stdin.Close()
}
// NOTE: the stdout writer will be evicted from stream in
// next Write call.
if cfg.UseStdout {
stdout.Close()
}
// NOTE: the stderr writer will be evicted from stream in
// next Write call.
if cfg.UseStderr {
stderr.Close()
}
if err := group.Wait(); err != nil {
errCh <- err
return
}
errCh <- ctx.Err()
case err := <-groupErrCh:
errCh <- err
}
}()
return errCh
}