forked from kevpar/cri
-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
269 lines (241 loc) · 9.48 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"fmt"
"io"
"net/http"
"path/filepath"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/plugin"
cni "github.com/containerd/go-cni"
runcapparmor "github.com/opencontainers/runc/libcontainer/apparmor"
runcseccomp "github.com/opencontainers/runc/libcontainer/seccomp"
"github.com/opencontainers/selinux/go-selinux"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
api "github.com/containerd/cri/pkg/api/v1"
"github.com/containerd/cri/pkg/atomic"
criconfig "github.com/containerd/cri/pkg/config"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
osinterface "github.com/containerd/cri/pkg/os"
"github.com/containerd/cri/pkg/registrar"
containerstore "github.com/containerd/cri/pkg/store/container"
imagestore "github.com/containerd/cri/pkg/store/image"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
snapshotstore "github.com/containerd/cri/pkg/store/snapshot"
)
// grpcServices are all the grpc services provided by cri containerd.
type grpcServices interface {
runtime.RuntimeServiceServer
runtime.ImageServiceServer
api.CRIPluginServiceServer
}
// CRIService is the interface implemented by the CRI remote service server.
// Besides the gRPC services themselves it can be started (Run), registered
// as a containerd plugin service, and shut down (Close).
type CRIService interface {
	// io.Closer is used by containerd to gracefully stop cri service.
	io.Closer
	plugin.Service
	grpcServices
	// Run starts the service and reports the error it stopped with, if any.
	Run() error
}
// criService implements CRIService.
// Every field is populated by NewCRIService; the service is not considered
// ready to serve until Run marks it as initialized.
type criService struct {
// config is the CRI plugin configuration handed to NewCRIService.
config criconfig.Config
// imageFSPath is the path to the image filesystem. It is computed by the
// imageFSPath helper from the containerd root directory and the configured
// snapshotter name.
imageFSPath string
// apparmorEnabled indicates whether apparmor is enabled, as reported by
// runc's libcontainer apparmor package when the service is created.
apparmorEnabled bool
// seccompEnabled indicates whether seccomp is enabled, as reported by
// runc's libcontainer seccomp package when the service is created.
seccompEnabled bool
// os is an interface for all required os operations.
os osinterface.OS
// sandboxStore stores all resources associated with sandboxes.
sandboxStore *sandboxstore.Store
// sandboxNameIndex stores all sandbox names and makes sure each name
// is unique.
sandboxNameIndex *registrar.Registrar
// containerStore stores all resources associated with containers.
containerStore *containerstore.Store
// containerNameIndex stores all container names and makes sure each
// name is unique.
containerNameIndex *registrar.Registrar
// imageStore stores all resources associated with images.
imageStore *imagestore.Store
// snapshotStore stores information about all snapshots.
snapshotStore *snapshotstore.Store
// netPlugin is used to set up and tear down the network when a pod sandbox
// is run or stopped.
netPlugin cni.CNI
// client is an instance of the containerd client.
client *containerd.Client
// streamServer is the streaming server that serves container streaming
// requests.
streamServer streaming.Server
// eventMonitor monitors containerd events.
eventMonitor *eventMonitor
// initialized indicates whether the server is initialized. All GRPC services
// should return an error before the server is initialized.
initialized atomic.Bool
}
// NewCRIService returns a new instance of CRIService built from the given
// CRI plugin configuration and containerd client.
//
// It returns an error when the configured snapshotter cannot be found, when
// the CNI plugin cannot be initialized, or when the stream server cannot be
// created. A failure to load the CNI configuration is only logged.
func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIService, error) {
	svc := &criService{
		config:             config,
		client:             client,
		apparmorEnabled:    runcapparmor.IsEnabled(),
		seccompEnabled:     runcseccomp.IsEnabled(),
		os:                 osinterface.RealOS{},
		sandboxStore:       sandboxstore.NewStore(),
		containerStore:     containerstore.NewStore(),
		imageStore:         imagestore.NewStore(),
		snapshotStore:      snapshotstore.NewStore(),
		sandboxNameIndex:   registrar.NewRegistrar(),
		containerNameIndex: registrar.NewRegistrar(),
		initialized:        atomic.NewBool(false),
	}

	// Either turn selinux off explicitly, or warn when it was requested but
	// is not available on this host.
	switch {
	case !svc.config.EnableSelinux:
		selinux.SetDisabled()
	case !selinux.GetEnabled():
		logrus.Warn("Selinux is not supported")
	}

	// The configured snapshotter must be known to containerd.
	snapshotterName := svc.config.ContainerdConfig.Snapshotter
	if client.SnapshotService(snapshotterName) == nil {
		return nil, errors.Errorf("failed to find snapshotter %q", snapshotterName)
	}

	svc.imageFSPath = imageFSPath(config.ContainerdRootDir, config.ContainerdConfig.Snapshotter)
	logrus.Infof("Get image filesystem path %q", svc.imageFSPath)

	// A pod has to be attached to at least the loopback network and one
	// non-host network, which is why the minimum network count is
	// networkAttachCount. When more network configs exist the pod is attached
	// to all of them, but only the IP of the default network interface is
	// used as the pod IP.
	netPlugin, err := cni.New(
		cni.WithMinNetworkCount(networkAttachCount),
		cni.WithPluginConfDir(config.NetworkPluginConfDir),
		cni.WithPluginDir([]string{config.NetworkPluginBinDir}),
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to initialize cni")
	}
	svc.netPlugin = netPlugin

	// Load the CNI config if one exists. A load failure is not serious enough
	// to abort containerd, so it is only logged.
	if loadErr := svc.netPlugin.Load(cni.WithLoNetwork, cni.WithDefaultConf); loadErr != nil {
		logrus.WithError(loadErr).Error("Failed to load cni during init, please check CRI plugin status before setting up network for pods")
	}

	// Prepare the streaming server.
	streamServer, err := newStreamServer(svc, config.StreamServerAddress, config.StreamServerPort)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create stream server")
	}
	svc.streamServer = streamServer

	svc.eventMonitor = newEventMonitor(svc.containerStore, svc.sandboxStore)

	return svc, nil
}
// Register attaches the runtime, image and CRI plugin gRPC services to the
// given grpc server, each backed by an instrumented wrapper around c.
// It is called by the containerd cri plugin and always returns nil.
func (c *criService) Register(s *grpc.Server) error {
	wrapped := newInstrumentedService(c)

	runtime.RegisterRuntimeServiceServer(s, wrapped)
	runtime.RegisterImageServiceServer(s, wrapped)
	api.RegisterCRIPluginServiceServer(s, wrapped)

	return nil
}
// Run starts the CRI service and blocks until one of its critical
// components, the event monitor or the streaming server, exits.
//
// It subscribes to containerd events, recovers state, starts the event
// monitor, the snapshots syncer and the streaming server, and then marks the
// service as initialized. Once either critical component exits, the whole
// service is closed and Run waits for the other one to stop as well.
//
// It returns an error when state recovery fails, when Close fails, or when
// the event monitor or the stream server reported an error (the event
// monitor error takes precedence). Otherwise it returns nil.
func (c *criService) Run() error {
	logrus.Info("Start subscribing containerd event")
	c.eventMonitor.subscribe(c.client)

	logrus.Infof("Start recovering state")
	if err := c.recover(ctrdutil.NamespacedContext()); err != nil {
		return errors.Wrap(err, "failed to recover state")
	}

	// Start event handler.
	logrus.Info("Start event monitor")
	eventMonitorErrCh := c.eventMonitor.start()

	// Start snapshot stats syncer, it doesn't need to be stopped.
	logrus.Info("Start snapshots syncer")
	snapshotsSyncer := newSnapshotsSyncer(
		c.snapshotStore,
		c.client.SnapshotService(c.config.ContainerdConfig.Snapshotter),
		time.Duration(c.config.StatsCollectPeriod)*time.Second,
	)
	snapshotsSyncer.start()

	// Start streaming server.
	logrus.Info("Start streaming server")
	// The channel is buffered with capacity 1 because the goroutine sends at
	// most one error. With an unbuffered channel the goroutine would block
	// forever on the send whenever Run has stopped receiving, i.e. after the
	// stop timeout below fires or after Run returns early on a Close failure.
	streamServerErrCh := make(chan error, 1)
	go func() {
		defer close(streamServerErrCh)
		if err := c.streamServer.Start(true); err != nil && err != http.ErrServerClosed {
			logrus.WithError(err).Error("Failed to start streaming server")
			streamServerErrCh <- err
		}
	}()

	// Set the server as initialized. GRPC services could start serving traffic.
	c.initialized.Set()

	var eventMonitorErr, streamServerErr error
	// Stop the whole CRI service if any of the critical service exits.
	select {
	case eventMonitorErr = <-eventMonitorErrCh:
	case streamServerErr = <-streamServerErrCh:
	}
	if err := c.Close(); err != nil {
		return errors.Wrap(err, "failed to stop cri service")
	}

	// If the error is set above, err from channel must be nil here, because
	// the channel is supposed to be closed. Or else, we wait and set it.
	if err := <-eventMonitorErrCh; err != nil {
		eventMonitorErr = err
	}
	logrus.Info("Event monitor stopped")

	// There is a race condition with http.Server.Serve.
	// When `Close` is called at the same time with `Serve`, `Close`
	// may finish first, and `Serve` may still block.
	// See https://github.com/golang/go/issues/20239.
	// Here we set a 2 second timeout for the stream server wait,
	// if it times out, an error log is generated.
	// TODO(random-liu): Get rid of this after https://github.com/golang/go/issues/20239
	// is fixed.
	const streamServerStopTimeout = 2 * time.Second
	select {
	case err := <-streamServerErrCh:
		// A nil value here means the channel was closed without a new error.
		if err != nil {
			streamServerErr = err
		}
		logrus.Info("Stream server stopped")
	case <-time.After(streamServerStopTimeout):
		logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout)
	}

	if eventMonitorErr != nil {
		return errors.Wrap(eventMonitorErr, "event monitor error")
	}
	if streamServerErr != nil {
		return errors.Wrap(streamServerErr, "stream server error")
	}
	return nil
}
// Close stops the CRI service: it stops the event monitor and then the
// stream server, returning a wrapped error if the stream server fails to
// stop.
// TODO(random-liu): Make close synchronous.
func (c *criService) Close() error {
	logrus.Info("Stop CRI service")
	c.eventMonitor.stop()

	stopErr := c.streamServer.Stop()
	if stopErr == nil {
		return nil
	}
	return errors.Wrap(stopErr, "failed to stop stream server")
}
// imageFSPath returns the containerd image filesystem path, i.e. the
// "<snapshot plugin type>.<snapshotter>" directory under rootDir.
// Note that if containerd changes its directory layout, this needs to change
// as well.
func imageFSPath(rootDir, snapshotter string) string {
	snapshotterDir := fmt.Sprintf("%s.%s", plugin.SnapshotPlugin, snapshotter)
	return filepath.Join(rootDir, snapshotterDir)
}