-
Notifications
You must be signed in to change notification settings - Fork 18.7k
/
streams.go
170 lines (147 loc) · 4.33 KB
/
streams.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package stream // import "github.com/docker/docker/container/stream"
import (
"context"
"fmt"
"io"
"strings"
"sync"
"github.com/containerd/containerd/cio"
"github.com/containerd/log"
"github.com/docker/docker/pkg/broadcaster"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/pools"
)
// Config holds information about I/O streams managed together.
//
// StdinPipe returns a WriteCloser which can be used to feed data
// to the standard input of the container's active process.
// StdoutPipe and StderrPipe each return a ReadCloser
// which can be used to retrieve the standard output (and error) generated
// by the container's active process. The output (and error) are actually
// copied and delivered to all StdoutPipe and StderrPipe consumers, using
// a kind of "broadcaster".
type Config struct {
// wg counts the stdout/stderr copy goroutines started by CopyToPipe;
// Wait blocks on it.
wg sync.WaitGroup
// stdout is the broadcaster that StdoutPipe registers new pipes with.
stdout *broadcaster.Unbuffered
// stderr is the broadcaster that StderrPipe registers new pipes with.
stderr *broadcaster.Unbuffered
// stdin is the read side of the pipe created by NewInputPipes. It is not
// set by NewConfig, so it stays nil until NewInputPipes is called.
stdin io.ReadCloser
// stdinPipe is the write side of the pipe created by NewInputPipes, or
// the discarding writer installed by NewNopInputPipe.
stdinPipe io.WriteCloser
// dio is the DirectIO handed to CopyToPipe. Wait cancels, waits on and
// closes it when its context is done before the copies finish.
dio *cio.DirectIO
}
// NewConfig returns a Config whose stdout and stderr are each backed by a
// fresh, empty unbuffered broadcaster. The stdin fields are left unset; use
// NewInputPipes or NewNopInputPipe to populate them.
func NewConfig() *Config {
	cfg := new(Config)
	cfg.stdout = &broadcaster.Unbuffered{}
	cfg.stderr = &broadcaster.Unbuffered{}
	return cfg
}
// Stdout exposes the broadcaster that carries standard output for this
// Config. Writers registered on it (see StdoutPipe) each receive the output.
func (c *Config) Stdout() *broadcaster.Unbuffered {
	return c.stdout
}
// Stderr exposes the broadcaster that carries standard error for this
// Config. Writers registered on it (see StderrPipe) each receive the output.
func (c *Config) Stderr() *broadcaster.Unbuffered {
	return c.stderr
}
// Stdin exposes the reader side of the configured standard input.
// It is nil unless NewInputPipes has been called.
func (c *Config) Stdin() io.ReadCloser {
	return c.stdin
}
// StdinPipe exposes the writer side of the configured standard input as an
// io.WriteCloser. It is whatever NewInputPipes or NewNopInputPipe last
// installed, and nil if neither has been called.
func (c *Config) StdinPipe() io.WriteCloser {
	return c.stdinPipe
}
// StdoutPipe allocates a new, empty bytes pipe, registers it with the stdout
// broadcaster and hands it back as an io.ReadCloser.
//
// Callers should drain and close the returned reader; an unconsumed pipe may
// stall stdout delivery.
func (c *Config) StdoutPipe() io.ReadCloser {
	pipe := ioutils.NewBytesPipe()
	c.stdout.Add(pipe)
	return pipe
}
// StderrPipe allocates a new, empty bytes pipe, registers it with the stderr
// broadcaster and hands it back as an io.ReadCloser.
//
// Callers should drain and close the returned reader; an unconsumed pipe may
// stall stderr delivery.
func (c *Config) StderrPipe() io.ReadCloser {
	pipe := ioutils.NewBytesPipe()
	c.stderr.Add(pipe)
	return pipe
}
// NewInputPipes replaces the Config's standard input with a new in-memory
// pipe: the read end becomes Stdin and the write end becomes StdinPipe.
func (c *Config) NewInputPipes() {
	reader, writer := io.Pipe()
	c.stdin = reader
	c.stdinPipe = writer
}
// NewNopInputPipe installs a StdinPipe that writes to io.Discard, so anything
// sent to it is silently dropped. Stdin itself is not touched.
func (c *Config) NewNopInputPipe() {
	sink := ioutils.NopWriteCloser(io.Discard)
	c.stdinPipe = sink
}
// CloseStreams closes stdin (when one is configured) and cleans the stdout
// and stderr broadcasters.
//
// All three steps are always attempted, even if an earlier one fails. When
// one or more of them fail, the returned error carries one line per failure,
// separated by newlines; otherwise nil is returned.
func (c *Config) CloseStreams() error {
	// Named errs rather than errors so the standard library package name is
	// not shadowed inside this function.
	var errs []string

	if c.stdin != nil {
		if err := c.stdin.Close(); err != nil {
			errs = append(errs, fmt.Sprintf("error close stdin: %s", err))
		}
	}

	if err := c.stdout.Clean(); err != nil {
		errs = append(errs, fmt.Sprintf("error close stdout: %s", err))
	}

	if err := c.stderr.Clean(); err != nil {
		errs = append(errs, fmt.Sprintf("error close stderr: %s", err))
	}

	if len(errs) == 0 {
		return nil
	}

	// The collected text must be passed as an argument, never as the format
	// string itself: an underlying error containing '%' would otherwise be
	// interpreted as a formatting verb and corrupt the message.
	return fmt.Errorf("%s", strings.Join(errs, "\n"))
}
// CopyToPipe wires this Config to the given containerd DirectIO.
//
// The DirectIO is remembered on the Config so that Wait can cancel it later.
// For each of iop.Stdout and iop.Stderr that is non-nil, a goroutine is
// started that copies from it into the matching broadcaster and closes the
// reader once the copy ends; these goroutines are tracked by the Config's
// wait group. If both the Config's stdin and iop.Stdin are non-nil, one more
// goroutine (not tracked by the wait group) copies stdin into iop.Stdin and
// closes iop.Stdin afterwards.
func (c *Config) CopyToPipe(iop *cio.DirectIO) {
	c.dio = iop
	ctx := context.TODO()

	// pump copies src into dst in the background and registers the goroutine
	// with c.wg so Wait can observe its completion.
	pump := func(dst io.Writer, src io.ReadCloser) {
		c.wg.Add(1)
		go func() {
			_, err := pools.Copy(dst, src)
			if err != nil {
				log.G(ctx).Errorf("stream copy error: %v", err)
			}
			src.Close()
			c.wg.Done()
		}()
	}

	if out := iop.Stdout; out != nil {
		pump(c.Stdout(), out)
	}
	if errOut := iop.Stderr; errOut != nil {
		pump(c.Stderr(), errOut)
	}

	stdin := c.Stdin()
	if stdin == nil || iop.Stdin == nil {
		return
	}
	go func() {
		// The copy result is intentionally not inspected here, matching the
		// existing behavior; only a failure to close is reported.
		pools.Copy(iop.Stdin, stdin)
		if err := iop.Stdin.Close(); err != nil {
			log.G(ctx).Warnf("failed to close stdin: %v", err)
		}
	}()
}
// Wait blocks until every copy goroutine tracked by the Config's wait group
// has finished, or until ctx is done, whichever happens first.
//
// If ctx is done first and a DirectIO was attached via CopyToPipe, the
// DirectIO is cancelled, waited on and closed to force the streams shut.
func (c *Config) Wait(ctx context.Context) {
	finished := make(chan struct{}, 1)
	go func() {
		c.wg.Wait()
		close(finished)
	}()

	select {
	case <-finished:
		return
	case <-ctx.Done():
	}

	// The context expired before the copies completed: tear down the IO.
	if c.dio == nil {
		return
	}
	c.dio.Cancel()
	c.dio.Wait()
	c.dio.Close()
}