-
Notifications
You must be signed in to change notification settings - Fork 18.6k
/
service.go
292 lines (259 loc) · 8.9 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
package service // import "github.com/docker/docker/volume/service"
import (
"context"
"strconv"
"sync/atomic"
"github.com/containerd/log"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
volumetypes "github.com/docker/docker/api/types/volume"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/directory"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/volume"
"github.com/docker/docker/volume/drivers"
"github.com/docker/docker/volume/service/opts"
"github.com/pkg/errors"
)
type ds interface {
GetDriverList() []string
}
// VolumeEventLogger interface provides methods to log volume-related events
type VolumeEventLogger interface {
// LogVolumeEvent generates an event related to a volume.
LogVolumeEvent(volumeID string, action events.Action, attributes map[string]string)
}
// VolumesService manages access to volumes
// This is used as the main access point for volumes to higher level services and the API.
type VolumesService struct {
vs *VolumeStore
ds ds
pruneRunning int32
eventLogger VolumeEventLogger
}
// NewVolumeService creates a new volume service
func NewVolumeService(root string, pg plugingetter.PluginGetter, rootIDs idtools.Identity, logger VolumeEventLogger) (*VolumesService, error) {
ds := drivers.NewStore(pg)
if err := setupDefaultDriver(ds, root, rootIDs); err != nil {
return nil, err
}
vs, err := NewStore(root, ds, WithEventLogger(logger))
if err != nil {
return nil, err
}
return &VolumesService{vs: vs, ds: ds, eventLogger: logger}, nil
}
// GetDriverList gets the list of registered volume drivers
func (s *VolumesService) GetDriverList() []string {
return s.ds.GetDriverList()
}
// AnonymousLabel is the label used to indicate that a volume is anonymous
// This is set automatically on a volume when a volume is created without a name specified, and as such an id is generated for it.
const AnonymousLabel = "com.docker.volume.anonymous"
// Create creates a volume
// If the caller is creating this volume to be consumed immediately, it is
// expected that the caller specifies a reference ID.
// This reference ID will protect this volume from removal.
//
// A good example for a reference ID is a container's ID.
// When whatever is going to reference this volume is removed the caller should defeference the volume by calling `Release`.
func (s *VolumesService) Create(ctx context.Context, name, driverName string, options ...opts.CreateOption) (*volumetypes.Volume, error) {
if name == "" {
name = stringid.GenerateRandomID()
options = append(options, opts.WithCreateLabel(AnonymousLabel, ""))
}
v, err := s.vs.Create(ctx, name, driverName, options...)
if err != nil {
return nil, err
}
apiV := volumeToAPIType(v)
return &apiV, nil
}
// Get returns details about a volume
func (s *VolumesService) Get(ctx context.Context, name string, getOpts ...opts.GetOption) (*volumetypes.Volume, error) {
v, err := s.vs.Get(ctx, name, getOpts...)
if err != nil {
return nil, err
}
vol := volumeToAPIType(v)
var cfg opts.GetConfig
for _, o := range getOpts {
o(&cfg)
}
if cfg.ResolveStatus {
vol.Status = v.Status()
}
return &vol, nil
}
// Mount mounts the volume
// Callers should specify a uniqe reference for each Mount/Unmount pair.
//
// Example:
// ```go
// mountID := "randomString"
// s.Mount(ctx, vol, mountID)
// s.Unmount(ctx, vol, mountID)
// ```
func (s *VolumesService) Mount(ctx context.Context, vol *volumetypes.Volume, ref string) (string, error) {
v, err := s.vs.Get(ctx, vol.Name, opts.WithGetDriver(vol.Driver))
if err != nil {
if IsNotExist(err) {
err = errdefs.NotFound(err)
}
return "", err
}
return v.Mount(ref)
}
// Unmount unmounts the volume.
// Note that depending on the implementation, the volume may still be mounted due to other resources using it.
//
// The reference specified here should be the same reference specified during `Mount` and should be
// unique for each mount/unmount pair.
// See `Mount` documentation for an example.
func (s *VolumesService) Unmount(ctx context.Context, vol *volumetypes.Volume, ref string) error {
v, err := s.vs.Get(ctx, vol.Name, opts.WithGetDriver(vol.Driver))
if err != nil {
if IsNotExist(err) {
err = errdefs.NotFound(err)
}
return err
}
return v.Unmount(ref)
}
// Release releases a volume reference
func (s *VolumesService) Release(ctx context.Context, name string, ref string) error {
return s.vs.Release(ctx, name, ref)
}
// Remove removes a volume
// An error is returned if the volume is still referenced.
func (s *VolumesService) Remove(ctx context.Context, name string, rmOpts ...opts.RemoveOption) error {
var cfg opts.RemoveConfig
for _, o := range rmOpts {
o(&cfg)
}
v, err := s.vs.Get(ctx, name)
if err != nil {
if IsNotExist(err) && cfg.PurgeOnError {
return nil
}
return err
}
err = s.vs.Remove(ctx, v, rmOpts...)
if IsNotExist(err) {
err = nil
} else if IsInUse(err) {
err = errdefs.Conflict(err)
} else if IsNotExist(err) && cfg.PurgeOnError {
err = nil
}
return err
}
var acceptedPruneFilters = map[string]bool{
"label": true,
"label!": true,
// All tells the filter to consider all volumes not just anonymous ones.
"all": true,
}
var acceptedListFilters = map[string]bool{
"dangling": true,
"name": true,
"driver": true,
"label": true,
}
// LocalVolumesSize gets all local volumes and fetches their size on disk
// Note that this intentionally skips volumes which have mount options. Typically
// volumes with mount options are not really local even if they are using the
// local driver.
func (s *VolumesService) LocalVolumesSize(ctx context.Context) ([]*volumetypes.Volume, error) {
ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool {
dv, ok := v.(volume.DetailedVolume)
return ok && len(dv.Options()) == 0
})))
if err != nil {
return nil, err
}
return s.volumesToAPI(ctx, ls, calcSize(true)), nil
}
// Prune removes (local) volumes which match the past in filter arguments.
// Note that this intentionally skips volumes with mount options as there would
// be no space reclaimed in this case.
func (s *VolumesService) Prune(ctx context.Context, filter filters.Args) (*types.VolumesPruneReport, error) {
if !atomic.CompareAndSwapInt32(&s.pruneRunning, 0, 1) {
return nil, errdefs.Conflict(errors.New("a prune operation is already running"))
}
defer atomic.StoreInt32(&s.pruneRunning, 0)
if err := withPrune(filter); err != nil {
return nil, err
}
by, err := filtersToBy(filter, acceptedPruneFilters)
if err != nil {
return nil, err
}
ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), ByReferenced(false), by, CustomFilter(func(v volume.Volume) bool {
dv, ok := v.(volume.DetailedVolume)
return ok && len(dv.Options()) == 0
})))
if err != nil {
return nil, err
}
rep := &types.VolumesPruneReport{VolumesDeleted: make([]string, 0, len(ls))}
for _, v := range ls {
select {
case <-ctx.Done():
err := ctx.Err()
if err == context.Canceled {
err = nil
}
return rep, err
default:
}
vSize, err := directory.Size(ctx, v.Path())
if err != nil {
log.G(ctx).WithField("volume", v.Name()).WithError(err).Warn("could not determine size of volume")
}
if err := s.vs.Remove(ctx, v); err != nil {
log.G(ctx).WithError(err).WithField("volume", v.Name()).Warnf("Could not determine size of volume")
continue
}
rep.SpaceReclaimed += uint64(vSize)
rep.VolumesDeleted = append(rep.VolumesDeleted, v.Name())
}
s.eventLogger.LogVolumeEvent("", events.ActionPrune, map[string]string{
"reclaimed": strconv.FormatInt(int64(rep.SpaceReclaimed), 10),
})
return rep, nil
}
// List gets the list of volumes which match the past in filters
// If filters is nil or empty all volumes are returned.
func (s *VolumesService) List(ctx context.Context, filter filters.Args) (volumesOut []*volumetypes.Volume, warnings []string, err error) {
by, err := filtersToBy(filter, acceptedListFilters)
if err != nil {
return nil, nil, err
}
volumes, warnings, err := s.vs.Find(ctx, by)
if err != nil {
return nil, nil, err
}
return s.volumesToAPI(ctx, volumes, useCachedPath(true)), warnings, nil
}
// Shutdown shuts down the image service and dependencies
func (s *VolumesService) Shutdown() error {
return s.vs.Shutdown()
}
// LiveRestoreVolume passes through the LiveRestoreVolume call to the volume if it is implemented
// otherwise it is a no-op.
func (s *VolumesService) LiveRestoreVolume(ctx context.Context, vol *volumetypes.Volume, ref string) error {
v, err := s.vs.Get(ctx, vol.Name, opts.WithGetDriver(vol.Driver))
if err != nil {
return err
}
rlv, ok := v.(volume.LiveRestorer)
if !ok {
log.G(ctx).WithField("volume", vol.Name).Debugf("volume does not implement LiveRestoreVolume: %T", v)
return nil
}
return rlv.LiveRestoreVolume(ctx, ref)
}