-
Notifications
You must be signed in to change notification settings - Fork 374
/
daemon_dest.go
144 lines (126 loc) · 5.46 KB
/
daemon_dest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package daemon
import (
"context"
"io"
"github.com/containers/image/docker/reference"
"github.com/containers/image/docker/tarfile"
"github.com/containers/image/types"
"github.com/docker/docker/client"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type daemonImageDestination struct {
ref daemonReference
mustMatchRuntimeOS bool
*tarfile.Destination // Implements most of types.ImageDestination
// For talking to imageLoadGoroutine
goroutineCancel context.CancelFunc
statusChannel <-chan error
writer *io.PipeWriter
// Other state
committed bool // writer has been closed
}
// newImageDestination returns a types.ImageDestination for the specified image reference.
func newImageDestination(ctx context.Context, sys *types.SystemContext, ref daemonReference) (types.ImageDestination, error) {
if ref.ref == nil {
return nil, errors.Errorf("Invalid destination docker-daemon:%s: a destination must be a name:tag", ref.StringWithinTransport())
}
namedTaggedRef, ok := ref.ref.(reference.NamedTagged)
if !ok {
return nil, errors.Errorf("Invalid destination docker-daemon:%s: a destination must be a name:tag", ref.StringWithinTransport())
}
var mustMatchRuntimeOS = true
if sys != nil && sys.DockerDaemonHost != client.DefaultDockerHost {
mustMatchRuntimeOS = false
}
c, err := newDockerClient(sys)
if err != nil {
return nil, errors.Wrap(err, "Error initializing docker engine client")
}
reader, writer := io.Pipe()
// Commit() may never be called, so we may never read from this channel; so, make this buffered to allow imageLoadGoroutine to write status and terminate even if we never read it.
statusChannel := make(chan error, 1)
goroutineContext, goroutineCancel := context.WithCancel(ctx)
go imageLoadGoroutine(goroutineContext, c, reader, statusChannel)
return &daemonImageDestination{
ref: ref,
mustMatchRuntimeOS: mustMatchRuntimeOS,
Destination: tarfile.NewDestination(writer, namedTaggedRef),
goroutineCancel: goroutineCancel,
statusChannel: statusChannel,
writer: writer,
committed: false,
}, nil
}
// imageLoadGoroutine accepts tar stream on reader, sends it to c, and reports error or success by writing to statusChannel
func imageLoadGoroutine(ctx context.Context, c *client.Client, reader *io.PipeReader, statusChannel chan<- error) {
err := errors.New("Internal error: unexpected panic in imageLoadGoroutine")
defer func() {
logrus.Debugf("docker-daemon: sending done, status %v", err)
statusChannel <- err
}()
defer func() {
if err == nil {
reader.Close()
} else {
reader.CloseWithError(err)
}
}()
resp, err := c.ImageLoad(ctx, reader, true)
if err != nil {
err = errors.Wrap(err, "Error saving image to docker engine")
return
}
defer resp.Body.Close()
}
// DesiredLayerCompression indicates if layers must be compressed, decompressed or preserved
func (d *daemonImageDestination) DesiredLayerCompression() types.LayerCompression {
return types.PreserveOriginal
}
// MustMatchRuntimeOS returns true iff the destination can store only images targeted for the current runtime OS. False otherwise.
func (d *daemonImageDestination) MustMatchRuntimeOS() bool {
return d.mustMatchRuntimeOS
}
// Close removes resources associated with an initialized ImageDestination, if any.
func (d *daemonImageDestination) Close() error {
if !d.committed {
logrus.Debugf("docker-daemon: Closing tar stream to abort loading")
// In principle, goroutineCancel() should abort the HTTP request and stop the process from continuing.
// In practice, though, various HTTP implementations used by client.Client.ImageLoad() (including
// https://github.com/golang/net/blob/master/context/ctxhttp/ctxhttp_pre17.go and the
// net/http version with native Context support in Go 1.7) do not always actually immediately cancel
// the operation: they may process the HTTP request, or a part of it, to completion in a goroutine, and
// return early if the context is canceled without terminating the goroutine at all.
// So we need this CloseWithError to terminate sending the HTTP request Body
// immediately, and hopefully, through terminating the sending which uses "Transfer-Encoding: chunked"" without sending
// the terminating zero-length chunk, prevent the docker daemon from processing the tar stream at all.
// Whether that works or not, closing the PipeWriter seems desirable in any case.
d.writer.CloseWithError(errors.New("Aborting upload, daemonImageDestination closed without a previous .Commit()"))
}
d.goroutineCancel()
return nil
}
func (d *daemonImageDestination) Reference() types.ImageReference {
return d.ref
}
// Commit marks the process of storing the image as successful and asks for the image to be persisted.
// WARNING: This does not have any transactional semantics:
// - Uploaded data MAY be visible to others before Commit() is called
// - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed)
func (d *daemonImageDestination) Commit(ctx context.Context) error {
logrus.Debugf("docker-daemon: Closing tar stream")
if err := d.Destination.Commit(ctx); err != nil {
return err
}
if err := d.writer.Close(); err != nil {
return err
}
d.committed = true // We may still fail, but we are done sending to imageLoadGoroutine.
logrus.Debugf("docker-daemon: Waiting for status")
select {
case <-ctx.Done():
return ctx.Err()
case err := <-d.statusChannel:
return err
}
}