From 33eaed869198775f9fb65257eb0d87a627addfa7 Mon Sep 17 00:00:00 2001 From: ilyee <493647673@qq.com> Date: Tue, 1 Jun 2021 19:51:14 +0800 Subject: [PATCH] Add fuse-manager Signed-off-by: Zuti He --- Makefile | 5 +- cmd/containerd-stargz-grpc/main.go | 65 +++- cmd/stargz-fuse-manager/main.go | 25 ++ fs/fs.go | 27 +- fusemanager/api/api.pb.go | 566 +++++++++++++++++++++++++++++ fusemanager/api/api.proto | 58 +++ fusemanager/api/generate.go | 19 + fusemanager/client.go | 144 ++++++++ fusemanager/fusemanager.go | 238 ++++++++++++ fusemanager/fusestore.go | 123 +++++++ fusemanager/service.go | 330 +++++++++++++++++ go.mod | 2 + service/service.go | 50 ++- snapshot/snapshot.go | 33 +- snapshot/snapshot_test.go | 11 +- 15 files changed, 1665 insertions(+), 31 deletions(-) create mode 100644 cmd/stargz-fuse-manager/main.go create mode 100644 fusemanager/api/api.pb.go create mode 100644 fusemanager/api/api.proto create mode 100644 fusemanager/api/generate.go create mode 100644 fusemanager/client.go create mode 100644 fusemanager/fusemanager.go create mode 100644 fusemanager/fusestore.go create mode 100644 fusemanager/service.go diff --git a/Makefile b/Makefile index 07316e5b2..6eb0ffc85 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,7 @@ VERSION=$(shell git describe --match 'v[0-9]*' --dirty='.m' --always --tags) REVISION=$(shell git rev-parse HEAD)$(shell if ! git diff --no-ext-diff --quiet --exit-code; then echo .m; fi) GO_LD_FLAGS=-ldflags '-s -w -X $(PKG)/version.Version=$(VERSION) -X $(PKG)/version.Revision=$(REVISION) $(GO_EXTRA_LDFLAGS)' -CMD=containerd-stargz-grpc ctr-remote stargz-store +CMD=containerd-stargz-grpc ctr-remote stargz-store stargz-fuse-manager CMD_BINARIES=$(addprefix $(PREFIX),$(CMD)) @@ -44,6 +44,9 @@ ctr-remote: FORCE stargz-store: FORCE GO111MODULE=$(GO111MODULE_VALUE) go build -o $(PREFIX)$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) -v ./cmd/stargz-store +stargz-fuse-manager: FORCE + GO111MODULE=$(GO111MODULE_VALUE) go build -o $(PREFIX)$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) -v ./cmd/stargz-fuse-manager + check: @echo "$@" @GO111MODULE=$(GO111MODULE_VALUE) golangci-lint run diff --git a/cmd/containerd-stargz-grpc/main.go b/cmd/containerd-stargz-grpc/main.go index 16c07dd54..36b573c59 100644 --- a/cmd/containerd-stargz-grpc/main.go +++ b/cmd/containerd-stargz-grpc/main.go @@ -24,6 +24,7 @@ import ( "net" "net/http" "os" + "os/exec" "os/signal" "path/filepath" "time" @@ -34,11 +35,13 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/pkg/dialer" "github.com/containerd/containerd/snapshots" + "github.com/containerd/stargz-snapshotter/fusemanager" "github.com/containerd/stargz-snapshotter/service" "github.com/containerd/stargz-snapshotter/service/keychain/cri" "github.com/containerd/stargz-snapshotter/service/keychain/dockerconfig" "github.com/containerd/stargz-snapshotter/service/keychain/kubeconfig" "github.com/containerd/stargz-snapshotter/service/resolver" + snbase "github.com/containerd/stargz-snapshotter/snapshot" "github.com/containerd/stargz-snapshotter/version" sddaemon "github.com/coreos/go-systemd/v22/daemon" metrics "github.com/docker/go-metrics" @@ -57,14 +60,19 @@ const ( defaultLogLevel = logrus.InfoLevel defaultRootDir = "/var/lib/containerd-stargz-grpc" defaultImageServiceAddress = "/run/containerd/containerd.sock" + defaultFuseManagerAddress = "/run/containerd-stargz-grpc/fuse-namanger.sock" + + fuseManagerBin = "stargz-fuse-manager" + fuseManagerAddress = "fuse-mananger.sock" ) var ( - address = flag.String("address", defaultAddress, "address for the snapshotter's GRPC server") - configPath = flag.String("config", defaultConfigPath, "path to the configuration file") - logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]") - rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter") - printVersion = flag.Bool("version", false, "print the version") + address = flag.String("address", defaultAddress, "address for the snapshotter's GRPC server") + configPath = flag.String("config", defaultConfigPath, "path to the configuration file") + logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]") + rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter") + detachFuseManager = flag.Bool("detach-fuse-manager", false, "whether detach fusemanager or not") + printVersion = flag.Bool("version", false, "print the version") ) type snapshotterConfig struct { @@ -72,6 +80,12 @@ type snapshotterConfig struct { // MetricsAddress is address for the metrics API MetricsAddress string `toml:"metrics_address"` + + // FuseManagerAddress is address for the fusemanager's GRPC server + FuseManagerAddress string `toml:"fusemanager_address"` + + // FuseManagerPath is path to the fusemanager's executable + FuseManagerPath string `toml:"fusemanager_path"` } func main() { @@ -153,9 +167,44 @@ func main() { runtime.RegisterImageServiceServer(rpc, criServer) credsFuncs = append(credsFuncs, f) } - rs, err := service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config, service.WithCredsFuncs(credsFuncs...)) - if err != nil { - log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter") + + var rs snapshots.Snapshotter + if *detachFuseManager { + fmPath := config.FuseManagerPath + if fmPath == "" { + var err error + fmPath, err = exec.LookPath(fuseManagerBin) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to find fusemanager bin") + } + } + fmAddr := config.FuseManagerAddress + if fmAddr == "" { + var err error + fmAddr, err = exec.LookPath(fuseManagerAddress) + if err != nil { + fmAddr = defaultFuseManagerAddress + } + } + err := service.StartFuseManager(ctx, fmPath, fmAddr, filepath.Join(*rootDir, "fusestore.db"), *logLevel, filepath.Join(*rootDir, "stargz-fuse-manager.log")) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to start fusemanager") + } + fs, err := fusemanager.NewManagerClient(ctx, *rootDir, fmAddr, &config.Config) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to configure fusemanager") + } + rs, err = snbase.NewSnapshotter(ctx, filepath.Join(*rootDir, "snapshotter"), fs, snbase.AsynchronousRemove) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter") + } + log.G(ctx).Infof("Start snapshotter with fusemanager mode") + } else { + rs, err = service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config, service.WithCredsFuncs(credsFuncs...)) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter") + } + log.G(ctx).Infof("Start snapshotter with builtin mode") } cleanup, err := serve(ctx, rpc, *address, rs, config) diff --git a/cmd/stargz-fuse-manager/main.go b/cmd/stargz-fuse-manager/main.go new file mode 100644 index 000000000..f0ef0c7a5 --- /dev/null +++ b/cmd/stargz-fuse-manager/main.go @@ -0,0 +1,25 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package main + +import ( + fusemanager "github.com/containerd/stargz-snapshotter/fusemanager" +) + +func main() { + fusemanager.Run() +} diff --git a/fs/fs.go b/fs/fs.go index cb85c244e..707701a27 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -68,6 +68,13 @@ const ( fusermountBin = "fusermount" ) +var ( + nsLock = sync.Mutex{} + + ns *metrics.Namespace + metricsCtr *fsmetrics.Controller +) + type Option func(*options) type options struct { @@ -101,13 +108,19 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F return nil, errors.Wrapf(err, "failed to setup resolver") } - var ns *metrics.Namespace - if !cfg.NoPrometheus { - ns = metrics.NewNamespace("stargz", "fs", nil) - } - c := fsmetrics.NewLayerMetrics(ns) - if ns != nil { - metrics.Register(ns) + nsLock.Lock() + defer nsLock.Unlock() + + var c *fsmetrics.Controller + if cfg.NoPrometheus { + c = fsmetrics.NewLayerMetrics(nil) + } else { + if ns == nil { + ns = metrics.NewNamespace("stargz", "fs", nil) + metrics.Register(ns) + metricsCtr = fsmetrics.NewLayerMetrics(ns) + } + c = metricsCtr } return &filesystem{ diff --git a/fusemanager/api/api.pb.go b/fusemanager/api/api.pb.go new file mode 100644 index 000000000..66017f1e1 --- /dev/null +++ b/fusemanager/api/api.pb.go @@ -0,0 +1,566 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: api.proto + +package api + +import ( + context "context" + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type StatusRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatusRequest) Reset() { *m = StatusRequest{} } +func (m *StatusRequest) String() string { return proto.CompactTextString(m) } +func (*StatusRequest) ProtoMessage() {} +func (*StatusRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{0} +} +func (m *StatusRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StatusRequest.Unmarshal(m, b) +} +func (m *StatusRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StatusRequest.Marshal(b, m, deterministic) +} +func (m *StatusRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_StatusRequest.Merge(m, src) +} +func (m *StatusRequest) XXX_Size() int { + return xxx_messageInfo_StatusRequest.Size(m) +} +func (m *StatusRequest) XXX_DiscardUnknown() { + xxx_messageInfo_StatusRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_StatusRequest proto.InternalMessageInfo + +type InitRequest struct { + Root string `protobuf:"bytes,1,opt,name=root,proto3" json:"root,omitempty"` + Config []byte `protobuf:"bytes,2,opt,name=config,proto3" json:"config,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *InitRequest) Reset() { *m = InitRequest{} } +func (m *InitRequest) String() string { return proto.CompactTextString(m) } +func (*InitRequest) ProtoMessage() {} +func (*InitRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{1} +} +func (m *InitRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_InitRequest.Unmarshal(m, b) +} +func (m *InitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_InitRequest.Marshal(b, m, deterministic) +} +func (m *InitRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_InitRequest.Merge(m, src) +} +func (m *InitRequest) XXX_Size() int { + return xxx_messageInfo_InitRequest.Size(m) +} +func (m *InitRequest) XXX_DiscardUnknown() { + xxx_messageInfo_InitRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_InitRequest proto.InternalMessageInfo + +func (m *InitRequest) GetRoot() string { + if m != nil { + return m.Root + } + return "" +} + +func (m *InitRequest) GetConfig() []byte { + if m != nil { + return m.Config + } + return nil +} + +type MountRequest struct { + Mountpoint string `protobuf:"bytes,1,opt,name=mountpoint,proto3" json:"mountpoint,omitempty"` + Labels map[string]string `protobuf:"bytes,2,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MountRequest) Reset() { *m = MountRequest{} } +func (m *MountRequest) String() string { return proto.CompactTextString(m) } +func (*MountRequest) ProtoMessage() {} +func (*MountRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{2} +} +func (m *MountRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_MountRequest.Unmarshal(m, b) +} +func (m *MountRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_MountRequest.Marshal(b, m, deterministic) +} +func (m *MountRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_MountRequest.Merge(m, src) +} +func (m *MountRequest) XXX_Size() int { + return xxx_messageInfo_MountRequest.Size(m) +} +func (m *MountRequest) XXX_DiscardUnknown() { + xxx_messageInfo_MountRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_MountRequest proto.InternalMessageInfo + +func (m *MountRequest) GetMountpoint() string { + if m != nil { + return m.Mountpoint + } + return "" +} + +func (m *MountRequest) GetLabels() map[string]string { + if m != nil { + return m.Labels + } + return nil +} + +type CheckRequest struct { + Mountpoint string `protobuf:"bytes,1,opt,name=mountpoint,proto3" json:"mountpoint,omitempty"` + Labels map[string]string `protobuf:"bytes,2,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CheckRequest) Reset() { *m = CheckRequest{} } +func (m *CheckRequest) String() string { return proto.CompactTextString(m) } +func (*CheckRequest) ProtoMessage() {} +func (*CheckRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{3} +} +func (m *CheckRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_CheckRequest.Unmarshal(m, b) +} +func (m *CheckRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_CheckRequest.Marshal(b, m, deterministic) +} +func (m *CheckRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_CheckRequest.Merge(m, src) +} +func (m *CheckRequest) XXX_Size() int { + return xxx_messageInfo_CheckRequest.Size(m) +} +func (m *CheckRequest) XXX_DiscardUnknown() { + xxx_messageInfo_CheckRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_CheckRequest proto.InternalMessageInfo + +func (m *CheckRequest) GetMountpoint() string { + if m != nil { + return m.Mountpoint + } + return "" +} + +func (m *CheckRequest) GetLabels() map[string]string { + if m != nil { + return m.Labels + } + return nil +} + +type UnmountRequest struct { + Mountpoint string `protobuf:"bytes,1,opt,name=mountpoint,proto3" json:"mountpoint,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UnmountRequest) Reset() { *m = UnmountRequest{} } +func (m *UnmountRequest) String() string { return proto.CompactTextString(m) } +func (*UnmountRequest) ProtoMessage() {} +func (*UnmountRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{4} +} +func (m *UnmountRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UnmountRequest.Unmarshal(m, b) +} +func (m *UnmountRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UnmountRequest.Marshal(b, m, deterministic) +} +func (m *UnmountRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnmountRequest.Merge(m, src) +} +func (m *UnmountRequest) XXX_Size() int { + return xxx_messageInfo_UnmountRequest.Size(m) +} +func (m *UnmountRequest) XXX_DiscardUnknown() { + xxx_messageInfo_UnmountRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_UnmountRequest proto.InternalMessageInfo + +func (m *UnmountRequest) GetMountpoint() string { + if m != nil { + return m.Mountpoint + } + return "" +} + +type StatusResponse struct { + Status int32 `protobuf:"varint,1,opt,name=status,proto3" json:"status,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatusResponse) Reset() { *m = StatusResponse{} } +func (m *StatusResponse) String() string { return proto.CompactTextString(m) } +func (*StatusResponse) ProtoMessage() {} +func (*StatusResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{5} +} +func (m *StatusResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StatusResponse.Unmarshal(m, b) +} +func (m *StatusResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StatusResponse.Marshal(b, m, deterministic) +} +func (m *StatusResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_StatusResponse.Merge(m, src) +} +func (m *StatusResponse) XXX_Size() int { + return xxx_messageInfo_StatusResponse.Size(m) +} +func (m *StatusResponse) XXX_DiscardUnknown() { + xxx_messageInfo_StatusResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_StatusResponse proto.InternalMessageInfo + +func (m *StatusResponse) GetStatus() int32 { + if m != nil { + return m.Status + } + return 0 +} + +type Response struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} +func (*Response) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{6} +} +func (m *Response) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Response.Unmarshal(m, b) +} +func (m *Response) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Response.Marshal(b, m, deterministic) +} +func (m *Response) XXX_Merge(src proto.Message) { + xxx_messageInfo_Response.Merge(m, src) +} +func (m *Response) XXX_Size() int { + return xxx_messageInfo_Response.Size(m) +} +func (m *Response) XXX_DiscardUnknown() { + xxx_messageInfo_Response.DiscardUnknown(m) +} + +var xxx_messageInfo_Response proto.InternalMessageInfo + +func init() { + proto.RegisterType((*StatusRequest)(nil), "fusemanager.StatusRequest") + proto.RegisterType((*InitRequest)(nil), "fusemanager.InitRequest") + proto.RegisterType((*MountRequest)(nil), "fusemanager.MountRequest") + proto.RegisterMapType((map[string]string)(nil), "fusemanager.MountRequest.LabelsEntry") + proto.RegisterType((*CheckRequest)(nil), "fusemanager.CheckRequest") + proto.RegisterMapType((map[string]string)(nil), "fusemanager.CheckRequest.LabelsEntry") + proto.RegisterType((*UnmountRequest)(nil), "fusemanager.UnmountRequest") + proto.RegisterType((*StatusResponse)(nil), "fusemanager.StatusResponse") + proto.RegisterType((*Response)(nil), "fusemanager.Response") +} + +func init() { proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) } + +var fileDescriptor_00212fb1f9d3bf1c = []byte{ + // 386 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x53, 0x51, 0x4b, 0xf3, 0x30, + 0x14, 0xa5, 0xdd, 0xd6, 0xef, 0xdb, 0xed, 0x9c, 0x12, 0x54, 0x6a, 0x05, 0x19, 0x05, 0xa1, 0x2f, + 0x6b, 0x65, 0x3e, 0xe8, 0x84, 0x3d, 0xa8, 0x28, 0x08, 0xee, 0xa5, 0xc3, 0x17, 0xdf, 0xb2, 0x92, + 0x75, 0x65, 0x6b, 0x52, 0x9b, 0x74, 0x30, 0x7f, 0x91, 0xff, 0xc5, 0x3f, 0x25, 0xcd, 0xba, 0x91, + 0x8a, 0x13, 0x84, 0xbd, 0xe5, 0x24, 0xf7, 0xdc, 0x9e, 0x7b, 0xcf, 0x29, 0x34, 0x71, 0x1a, 0x7b, + 0x69, 0xc6, 0x04, 0x43, 0xe6, 0x24, 0xe7, 0x24, 0xc1, 0x14, 0x47, 0x24, 0x73, 0xf6, 0x61, 0x6f, + 0x24, 0xb0, 0xc8, 0x79, 0x40, 0xde, 0x72, 0xc2, 0x85, 0xd3, 0x07, 0xf3, 0x89, 0xc6, 0xa2, 0x84, + 0x08, 0x41, 0x3d, 0x63, 0x4c, 0x58, 0x5a, 0x47, 0x73, 0x9b, 0x81, 0x3c, 0xa3, 0x63, 0x30, 0x42, + 0x46, 0x27, 0x71, 0x64, 0xe9, 0x1d, 0xcd, 0x6d, 0x05, 0x25, 0x72, 0x3e, 0x34, 0x68, 0x0d, 0x59, + 0x4e, 0x37, 0xe4, 0x33, 0x80, 0xa4, 0xc0, 0x29, 0x8b, 0xe9, 0xba, 0x85, 0x72, 0x83, 0x06, 0x60, + 0xcc, 0xf1, 0x98, 0xcc, 0xb9, 0xa5, 0x77, 0x6a, 0xae, 0xd9, 0x3b, 0xf7, 0x14, 0x69, 0x9e, 0xda, + 0xca, 0x7b, 0x96, 0x75, 0x0f, 0x54, 0x64, 0xcb, 0xa0, 0x24, 0xd9, 0x7d, 0x30, 0x95, 0x6b, 0x74, + 0x00, 0xb5, 0x19, 0x59, 0x96, 0x9f, 0x29, 0x8e, 0xe8, 0x10, 0x1a, 0x0b, 0x3c, 0xcf, 0x89, 0xd4, + 0xd9, 0x0c, 0x56, 0xe0, 0x46, 0xbf, 0xd6, 0xa4, 0xd4, 0xfb, 0x29, 0x09, 0x67, 0xbb, 0x91, 0xaa, + 0xb6, 0xda, 0xb5, 0xd4, 0x0b, 0x68, 0xbf, 0xd0, 0xe4, 0x0f, 0x6b, 0x75, 0x5c, 0x68, 0xaf, 0x3d, + 0xe5, 0x29, 0xa3, 0x9c, 0x14, 0x8e, 0x71, 0x79, 0x23, 0xab, 0x1b, 0x41, 0x89, 0x1c, 0x80, 0xff, + 0xeb, 0x9a, 0xde, 0xa7, 0x0e, 0xd6, 0x48, 0xe0, 0x2c, 0x7a, 0x7f, 0xcc, 0x39, 0x19, 0xae, 0x26, + 0x1b, 0x91, 0x6c, 0x11, 0x87, 0x04, 0xdd, 0x82, 0xb1, 0x6a, 0x89, 0xec, 0xca, 0xe0, 0x95, 0xec, + 0xd8, 0xa7, 0x3f, 0xbe, 0x95, 0x1a, 0xae, 0xa0, 0x5e, 0x04, 0x0b, 0x59, 0x95, 0x22, 0x25, 0x6b, + 0xf6, 0x51, 0xe5, 0x65, 0x43, 0xec, 0x43, 0x43, 0x46, 0x01, 0x9d, 0x6c, 0x8d, 0xc7, 0x2f, 0x54, + 0x69, 0xcd, 0x37, 0xaa, 0x6a, 0xd7, 0x36, 0xea, 0x00, 0xfe, 0x95, 0x6b, 0x47, 0xd5, 0xb1, 0xaa, + 0x66, 0x6c, 0xa1, 0xdf, 0xf9, 0xaf, 0xdd, 0x28, 0x16, 0xd3, 0x7c, 0xec, 0x85, 0x2c, 0xf1, 0xb9, + 0xdc, 0x6b, 0x97, 0x53, 0x9c, 0xf2, 0x29, 0x13, 0x82, 0x64, 0xbe, 0xc2, 0xf2, 0x71, 0x1a, 0x8f, + 0x0d, 0xf9, 0x73, 0x5e, 0x7e, 0x05, 0x00, 0x00, 0xff, 0xff, 0x9d, 0x24, 0xe1, 0x41, 0xa9, 0x03, + 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// StargzFuseManagerServiceClient is the client API for StargzFuseManagerService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type StargzFuseManagerServiceClient interface { + Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*Response, error) + Mount(ctx context.Context, in *MountRequest, opts ...grpc.CallOption) (*Response, error) + Check(ctx context.Context, in *CheckRequest, opts ...grpc.CallOption) (*Response, error) + Unmount(ctx context.Context, in *UnmountRequest, opts ...grpc.CallOption) (*Response, error) +} + +type stargzFuseManagerServiceClient struct { + cc *grpc.ClientConn +} + +func NewStargzFuseManagerServiceClient(cc *grpc.ClientConn) StargzFuseManagerServiceClient { + return &stargzFuseManagerServiceClient{cc} +} + +func (c *stargzFuseManagerServiceClient) Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) { + out := new(StatusResponse) + err := c.cc.Invoke(ctx, "/fusemanager.StargzFuseManagerService/Status", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *stargzFuseManagerServiceClient) Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/fusemanager.StargzFuseManagerService/Init", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *stargzFuseManagerServiceClient) Mount(ctx context.Context, in *MountRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/fusemanager.StargzFuseManagerService/Mount", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *stargzFuseManagerServiceClient) Check(ctx context.Context, in *CheckRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/fusemanager.StargzFuseManagerService/Check", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *stargzFuseManagerServiceClient) Unmount(ctx context.Context, in *UnmountRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/fusemanager.StargzFuseManagerService/Unmount", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// StargzFuseManagerServiceServer is the server API for StargzFuseManagerService service. +type StargzFuseManagerServiceServer interface { + Status(context.Context, *StatusRequest) (*StatusResponse, error) + Init(context.Context, *InitRequest) (*Response, error) + Mount(context.Context, *MountRequest) (*Response, error) + Check(context.Context, *CheckRequest) (*Response, error) + Unmount(context.Context, *UnmountRequest) (*Response, error) +} + +// UnimplementedStargzFuseManagerServiceServer can be embedded to have forward compatible implementations. +type UnimplementedStargzFuseManagerServiceServer struct { +} + +func (*UnimplementedStargzFuseManagerServiceServer) Status(ctx context.Context, req *StatusRequest) (*StatusResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Status not implemented") +} +func (*UnimplementedStargzFuseManagerServiceServer) Init(ctx context.Context, req *InitRequest) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method Init not implemented") +} +func (*UnimplementedStargzFuseManagerServiceServer) Mount(ctx context.Context, req *MountRequest) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method Mount not implemented") +} +func (*UnimplementedStargzFuseManagerServiceServer) Check(ctx context.Context, req *CheckRequest) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method Check not implemented") +} +func (*UnimplementedStargzFuseManagerServiceServer) Unmount(ctx context.Context, req *UnmountRequest) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method Unmount not implemented") +} + +func RegisterStargzFuseManagerServiceServer(s *grpc.Server, srv StargzFuseManagerServiceServer) { + s.RegisterService(&_StargzFuseManagerService_serviceDesc, srv) +} + +func _StargzFuseManagerService_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StatusRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StargzFuseManagerServiceServer).Status(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fusemanager.StargzFuseManagerService/Status", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StargzFuseManagerServiceServer).Status(ctx, req.(*StatusRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _StargzFuseManagerService_Init_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(InitRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StargzFuseManagerServiceServer).Init(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fusemanager.StargzFuseManagerService/Init", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StargzFuseManagerServiceServer).Init(ctx, req.(*InitRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _StargzFuseManagerService_Mount_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(MountRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StargzFuseManagerServiceServer).Mount(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fusemanager.StargzFuseManagerService/Mount", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StargzFuseManagerServiceServer).Mount(ctx, req.(*MountRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _StargzFuseManagerService_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CheckRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StargzFuseManagerServiceServer).Check(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fusemanager.StargzFuseManagerService/Check", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StargzFuseManagerServiceServer).Check(ctx, req.(*CheckRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _StargzFuseManagerService_Unmount_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UnmountRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StargzFuseManagerServiceServer).Unmount(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fusemanager.StargzFuseManagerService/Unmount", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StargzFuseManagerServiceServer).Unmount(ctx, req.(*UnmountRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _StargzFuseManagerService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "fusemanager.StargzFuseManagerService", + HandlerType: (*StargzFuseManagerServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Status", + Handler: _StargzFuseManagerService_Status_Handler, + }, + { + MethodName: "Init", + Handler: _StargzFuseManagerService_Init_Handler, + }, + { + MethodName: "Mount", + Handler: _StargzFuseManagerService_Mount_Handler, + }, + { + MethodName: "Check", + Handler: _StargzFuseManagerService_Check_Handler, + }, + { + MethodName: "Unmount", + Handler: _StargzFuseManagerService_Unmount_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "api.proto", +} diff --git a/fusemanager/api/api.proto b/fusemanager/api/api.proto new file mode 100644 index 000000000..3e13ca67a --- /dev/null +++ b/fusemanager/api/api.proto @@ -0,0 +1,58 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +syntax = "proto3"; + +option go_package = "github.com/stargz-snapshotter/fusemanager/api"; + +package fusemanager; + +service StargzFuseManagerService { + rpc Status (StatusRequest) returns (StatusResponse); + rpc Init (InitRequest) returns (Response); + rpc Mount (MountRequest) returns (Response); + rpc Check (CheckRequest) returns (Response); + rpc Unmount (UnmountRequest) returns (Response); +} + +message StatusRequest { +} + +message InitRequest { + string root = 1; + bytes config = 2; +} + +message MountRequest { + string mountpoint = 1; + map labels = 2; +} + +message CheckRequest { + string mountpoint = 1; + map labels = 2; +} + +message UnmountRequest { + string mountpoint = 1; +} + +message StatusResponse { + int32 status = 1; +} + +message Response { +} \ No newline at end of file diff --git a/fusemanager/api/generate.go b/fusemanager/api/generate.go new file mode 100644 index 000000000..2f61d5eee --- /dev/null +++ b/fusemanager/api/generate.go @@ -0,0 +1,19 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package api + +//go:generate protoc --gogo_out=paths=source_relative,plugins=grpc:. api.proto diff --git a/fusemanager/client.go b/fusemanager/client.go new file mode 100644 index 000000000..c6a5138d8 --- /dev/null +++ b/fusemanager/client.go @@ -0,0 +1,144 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package fusemanager + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/containerd/containerd/defaults" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/pkg/dialer" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + + pb "github.com/containerd/stargz-snapshotter/fusemanager/api" + "github.com/containerd/stargz-snapshotter/service" + "github.com/containerd/stargz-snapshotter/snapshot" +) + +type Client struct { + client pb.StargzFuseManagerServiceClient +} + +func NewManagerClient(ctx context.Context, root, socket string, config *service.Config) (snapshot.FileSystem, error) { + timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + grpcCli, err := newClient(timeoutCtx, socket) + if err != nil { + return nil, err + } + + client := &Client{ + client: grpcCli, + } + + err = client.init(ctx, root, config) + if err != nil { + return nil, err + } + + return client, nil +} + +func newClient(ctx context.Context, socket string) (pb.StargzFuseManagerServiceClient, error) { + connParams := grpc.ConnectParams{ + Backoff: backoff.DefaultConfig, + } + gopts := []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.FailOnNonTempDialError(true), + grpc.WithConnectParams(connParams), + grpc.WithContextDialer(dialer.ContextDialer), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)), + } + + conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix://%s", socket), gopts...) + if err != nil { + return nil, err + } + + return pb.NewStargzFuseManagerServiceClient(conn), nil +} + +func (cli *Client) init(ctx context.Context, root string, config *service.Config) error { + configBytes, err := json.Marshal(config) + if err != nil { + return err + } + + req := &pb.InitRequest{ + Root: root, + Config: configBytes, + } + + _, err = cli.client.Init(ctx, req) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to call Init") + return err + } + + return nil +} + +func (cli *Client) Mount(ctx context.Context, mountpoint string, labels map[string]string) error { + req := &pb.MountRequest{ + Mountpoint: mountpoint, + Labels: labels, + } + + _, err := cli.client.Mount(ctx, req) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to call Mount") + return err + } + + return nil +} + +func (cli *Client) Check(ctx context.Context, mountpoint string, labels map[string]string) error { + req := &pb.CheckRequest{ + Mountpoint: mountpoint, + Labels: labels, + } + + _, err := cli.client.Check(ctx, req) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to call Check") + return err + } + + return nil +} + +func (cli *Client) Unmount(ctx context.Context, mountpoint string) error { + req := &pb.UnmountRequest{ + Mountpoint: mountpoint, + } + + _, err := cli.client.Unmount(ctx, req) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to call Unmount") + return err + } + + return nil +} diff --git a/fusemanager/fusemanager.go b/fusemanager/fusemanager.go new file mode 100644 index 000000000..09cc67b98 --- /dev/null +++ b/fusemanager/fusemanager.go @@ -0,0 +1,238 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package fusemanager + +import ( + "context" + "flag" + "fmt" + golog "log" + "net" + "os" + "os/exec" + "os/signal" + "path/filepath" + "syscall" + "time" + + "github.com/containerd/containerd/log" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" + "google.golang.org/grpc" + + pb "github.com/containerd/stargz-snapshotter/fusemanager/api" + "github.com/containerd/stargz-snapshotter/version" +) + +var ( + debugFlag bool + versionFlag bool + fuseStoreAddr string + address string + logLevel string + logPath string + action string +) + +func parseFlags() { + flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs") + flag.BoolVar(&versionFlag, "v", false, "show the fusemanager version and exit") + flag.StringVar(&action, "action", "", "action of fusemanager") + flag.StringVar(&fuseStoreAddr, "fusestore-path", "/var/lib/containerd-stargz-grpc/fusestore.db", "address for the fusemanager's store") + flag.StringVar(&address, "address", "/run/containerd-stargz-grpc/fuse-manager.sock", "address for the fusemanager's gRPC socket") + flag.StringVar(&logLevel, "log-level", logrus.InfoLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]") + flag.StringVar(&logPath, "log-path", "", "path to fusemanager's logs, no log recorded if empty") + + flag.Parse() +} + +func Run() { + if err := run(); err != nil { + fmt.Fprintf(os.Stderr, "failed to run fusemanager: %v", err) + os.Exit(1) + } +} + +func run() error { + parseFlags() + if versionFlag { + fmt.Printf("%s:\n", os.Args[0]) + fmt.Println(" Version: ", version.Version) + fmt.Println(" Revision:", version.Revision) + fmt.Println("") + return nil + } + + if fuseStoreAddr == "" || address == "" { + return fmt.Errorf("fusemanager fusestore and socket path cannot be empty") + } + + ctx := log.WithLogger(context.Background(), log.L) + + switch action { + case "start": + return startNew(ctx, logPath, address, fuseStoreAddr, logLevel) + default: + return runFuseManager(ctx) + } +} + +func startNew(ctx context.Context, logPath, address, fusestore, logLevel string) error { + self, err := os.Executable() + if err != nil { + return err + } + + cwd, err := os.Getwd() + if err != nil { + return err + } + + args := []string{ + "-address", address, + "-fusestore-path", fusestore, + "-log-level", logLevel, + } + + // we use shim-like approach to start new fusemanager process by self-invoking in the background + // and detach it from parent + cmd := exec.CommandContext(ctx, self, args...) + cmd.Dir = cwd + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + + if logPath != "" { + err := os.Remove(logPath) + if err != nil && !os.IsNotExist(err) { + return err + } + file, err := os.Create(logPath) + if err != nil { + return err + } + cmd.Stdout = file + cmd.Stderr = file + } + + if err := cmd.Start(); err != nil { + return err + } + go cmd.Wait() + + if ready, err := waitUntilReady(ctx, 10); err != nil || !ready { + if err != nil { + return errors.Wrapf(err, "failed to start new fusemanager") + } + if !ready { + return errors.Errorf("failed to start new fusemanager, fusemanager not ready") + } + } + + return nil +} + +// waitUntilReady waits until fusemanager is ready to accept requests with timeout +func waitUntilReady(ctx context.Context, timeout int) (bool, error) { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second) + defer cancel() + grpcCli, err := newClient(timeoutCtx, address) + if err != nil { + return false, err + } + + resp, err := grpcCli.Status(ctx, &pb.StatusRequest{}) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to call Status") + return false, err + } + + if resp.Status == FuseManagerNotReady { + return false, nil + } + + return true, nil +} + +func runFuseManager(ctx context.Context) error { + lvl, err := logrus.ParseLevel(logLevel) + if err != nil { + log.L.WithError(err).Fatal("failed to prepare logger") + } + + logrus.SetLevel(lvl) + logrus.SetFormatter(&logrus.JSONFormatter{ + TimestampFormat: log.RFC3339NanoFixed, + }) + + golog.SetOutput(log.G(ctx).WriterLevel(logrus.DebugLevel)) + + // Prepare the directory for the socket + if err := os.MkdirAll(filepath.Dir(address), 0700); err != nil { + log.G(ctx).WithError(err).Errorf("failed to create directory %s", filepath.Dir(address)) + return err + } + + // Try to remove the socket file to avoid EADDRINUSE + if err := os.Remove(address); err != nil && !os.IsNotExist(err) { + log.G(ctx).WithError(err).Error("failed to remove old socket file") + return err + } + + l, err := net.Listen("unix", address) + if err != nil { + log.G(ctx).WithError(err).Error("failed to listen socket") + return err + } + + server := grpc.NewServer() + fm, err := NewFuseManager(ctx, l, server, fuseStoreAddr) + if err != nil { + log.G(ctx).WithError(err).Error("failed to configure manager server") + return err + } + + err = fm.clearMounts(ctx) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to clear mounts") + return err + } + + pb.RegisterStargzFuseManagerServiceServer(server, fm) + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, unix.SIGINT, unix.SIGTERM) + go func() { + sig := <-sigCh + log.G(ctx).Infof("Got %v", sig) + fm.server.Stop() + }() + + if err = server.Serve(l); err != nil { + log.G(ctx).WithError(err).Error("failed to serve fuse manager") + return err + } + + err = fm.Close(ctx) + if err != nil { + log.G(ctx).WithError(err).Error("failed to close fusemanager") + return err + } + + return err +} diff --git a/fusemanager/fusestore.go b/fusemanager/fusestore.go new file mode 100644 index 000000000..ebef19d88 --- /dev/null +++ b/fusemanager/fusestore.go @@ -0,0 +1,123 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package fusemanager + +import ( + "context" + "encoding/json" + + bolt "go.etcd.io/bbolt" + + "github.com/containerd/stargz-snapshotter/service" +) + +var ( + fuseInfoBucket = []byte("fuse-info-bucket") +) + +type fuseInfo struct { + Root string + Mountpoint string + Labels map[string]string + Config service.Config +} + +func (fm *Server) storeFuseInfo(fuseInfo *fuseInfo) error { + return fm.ms.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(fuseInfoBucket) + if err != nil { + return err + } + + key := []byte(fuseInfo.Mountpoint) + + val, err := json.Marshal(fuseInfo) + if err != nil { + return err + } + + err = bucket.Put(key, val) + if err != nil { + return err + } + + return nil + }) +} + +func (fm *Server) removeFuseInfo(fuseInfo *fuseInfo) error { + return fm.ms.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(fuseInfoBucket) + if err != nil { + return err + } + + key := []byte(fuseInfo.Mountpoint) + + err = bucket.Delete(key) + if err != nil { + return err + } + + return nil + }) +} + +// restoreFuseInfo restores fuseInfo when Init is called, it will skip mounted +// layers whose mountpoint can be found in fsMap +func (fm *Server) restoreFuseInfo(ctx context.Context) error { + return fm.ms.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(fuseInfoBucket) + if bucket == nil { + return nil + } + + return bucket.ForEach(func(_, v []byte) error { + mi := &fuseInfo{} + err := json.Unmarshal(v, mi) + if err != nil { + return err + } + + return fm.mount(ctx, mi.Mountpoint, mi.Labels) + }) + }) +} + +func (fm *Server) listMountpoints(ctx context.Context) ([]string, error) { + mountpoints := make([]string, 0) + err := fm.ms.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(fuseInfoBucket) + if bucket == nil { + return nil + } + return bucket.ForEach(func(_, v []byte) error { + mi := &fuseInfo{} + err := json.Unmarshal(v, mi) + if err != nil { + return err + } + mountpoints = append(mountpoints, mi.Mountpoint) + return nil + }) + }) + if err != nil { + return nil, err + } + + return mountpoints, nil +} diff --git a/fusemanager/service.go b/fusemanager/service.go new file mode 100644 index 000000000..7f2592336 --- /dev/null +++ b/fusemanager/service.go @@ -0,0 +1,330 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package fusemanager + +import ( + "context" + "encoding/json" + "fmt" + "net" + "os" + "path/filepath" + "sync" + "syscall" + "time" + + "github.com/containerd/containerd/defaults" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/pkg/dialer" + "github.com/moby/sys/mountinfo" + "github.com/pkg/errors" + bolt "go.etcd.io/bbolt" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + + pb "github.com/containerd/stargz-snapshotter/fusemanager/api" + "github.com/containerd/stargz-snapshotter/service" + "github.com/containerd/stargz-snapshotter/service/keychain/cri" + "github.com/containerd/stargz-snapshotter/service/keychain/dockerconfig" + "github.com/containerd/stargz-snapshotter/service/keychain/kubeconfig" + "github.com/containerd/stargz-snapshotter/service/resolver" + "github.com/containerd/stargz-snapshotter/snapshot" +) + +const ( + FuseManagerNotReady = iota + FuseManagerWaitInit + FuseManagerReady + + defaultImageServiceAddress = "/run/containerd/containerd.sock" +) + +type Server struct { + pb.UnimplementedStargzFuseManagerServiceServer + + lock sync.RWMutex + status int32 + + listener net.Listener + server *grpc.Server + + // root is the latest root passed from containerd-stargz-grpc + root string + // config is the latest config passed from containerd-stargz-grpc + config *service.Config + // fsMap maps mountpoint to its filesystem instance to ensure Mount/Check/Unmount + // call the proper filesystem + fsMap sync.Map + // curFs is filesystem created by latest config + curFs snapshot.FileSystem + ms *bolt.DB +} + +func NewFuseManager(ctx context.Context, listener net.Listener, server *grpc.Server, fuseStoreAddr string) (*Server, error) { + if err := os.MkdirAll(filepath.Dir(fuseStoreAddr), 0700); err != nil { + return nil, errors.Wrapf(err, "failed to create directory %q", filepath.Dir(fuseStoreAddr)) + } + + db, err := bolt.Open(fuseStoreAddr, 0666, &bolt.Options{Timeout: 10 * time.Second, ReadOnly: false}) + if err != nil { + return nil, errors.Wrap(err, "failed to configure fusestore") + } + + fm := &Server{ + status: FuseManagerWaitInit, + lock: sync.RWMutex{}, + fsMap: sync.Map{}, + ms: db, + listener: listener, + server: server, + } + + return fm, nil +} + +func (fm *Server) Status(ctx context.Context, _ *pb.StatusRequest) (*pb.StatusResponse, error) { + fm.lock.RLock() + defer fm.lock.RUnlock() + + return &pb.StatusResponse{ + Status: fm.status, + }, nil +} + +func (fm *Server) Init(ctx context.Context, req *pb.InitRequest) (*pb.Response, error) { + fm.lock.Lock() + fm.status = FuseManagerWaitInit + defer func() { + fm.status = FuseManagerReady + fm.lock.Unlock() + }() + + ctx = log.WithLogger(ctx, log.G(ctx)) + + config := &service.Config{} + err := json.Unmarshal(req.Config, config) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to get config") + return &pb.Response{}, err + } + fm.root = req.Root + fm.config = config + + // Configure keychain + credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)} + if config.KubeconfigKeychainConfig.EnableKeychain { + var opts []kubeconfig.Option + if kcp := config.KubeconfigKeychainConfig.KubeconfigPath; kcp != "" { + opts = append(opts, kubeconfig.WithKubeconfigPath(kcp)) + } + credsFuncs = append(credsFuncs, kubeconfig.NewKubeconfigKeychain(ctx, opts...)) + } + if config.CRIKeychainConfig.EnableKeychain { + // connects to the backend CRI service (defaults to containerd socket) + criAddr := defaultImageServiceAddress + if cp := config.CRIKeychainConfig.ImageServicePath; cp != "" { + criAddr = cp + } + connectCRI := func() (runtime.ImageServiceClient, error) { + // TODO: make gRPC options configurable from config.toml + backoffConfig := backoff.DefaultConfig + backoffConfig.MaxDelay = 3 * time.Second + connParams := grpc.ConnectParams{ + Backoff: backoffConfig, + } + gopts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithConnectParams(connParams), + grpc.WithContextDialer(dialer.ContextDialer), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)), + } + conn, err := grpc.Dial(dialer.DialAddress(criAddr), gopts...) + if err != nil { + return nil, err + } + return runtime.NewImageServiceClient(conn), nil + } + f, _ := cri.NewCRIKeychain(ctx, connectCRI) + credsFuncs = append(credsFuncs, f) + } + + opts := []service.Option{service.WithCredsFuncs(credsFuncs...)} + + fs, err := service.NewFileSystem(ctx, fm.root, fm.config, opts...) + if err != nil { + return &pb.Response{}, err + } + fm.curFs = fs + + err = fm.restoreFuseInfo(ctx) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to restore fuse info") + return &pb.Response{}, err + } + + return &pb.Response{}, nil +} + +func (fm *Server) Mount(ctx context.Context, req *pb.MountRequest) (*pb.Response, error) { + fm.lock.RLock() + defer fm.lock.RUnlock() + if fm.status != FuseManagerReady { + return &pb.Response{}, fmt.Errorf("fuse manager not ready") + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithField("mountpoint", req.Mountpoint)) + + err := fm.mount(ctx, req.Mountpoint, req.Labels) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to mount stargz") + return &pb.Response{}, err + } + + fm.storeFuseInfo(&fuseInfo{ + Root: fm.root, + Mountpoint: req.Mountpoint, + Labels: req.Labels, + Config: *fm.config, + }) + + return &pb.Response{}, nil +} + +func (fm *Server) Check(ctx context.Context, req *pb.CheckRequest) (*pb.Response, error) { + fm.lock.RLock() + defer fm.lock.RUnlock() + if fm.status != FuseManagerReady { + return &pb.Response{}, fmt.Errorf("fuse manager not ready") + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithField("mountpoint", req.Mountpoint)) + + obj, found := fm.fsMap.Load(req.Mountpoint) + if !found { + err := fmt.Errorf("failed to find filesystem of mountpoint %s", req.Mountpoint) + log.G(ctx).WithError(err).Errorf("failed to check filesystem") + return &pb.Response{}, err + } + + fs := obj.(snapshot.FileSystem) + err := fs.Check(ctx, req.Mountpoint, req.Labels) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to check filesystem") + return &pb.Response{}, err + } + + return &pb.Response{}, nil +} + +func (fm *Server) Unmount(ctx context.Context, req *pb.UnmountRequest) (*pb.Response, error) { + fm.lock.RLock() + defer fm.lock.RUnlock() + if fm.status != FuseManagerReady { + return &pb.Response{}, fmt.Errorf("fuse manager not ready") + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithField("mountpoint", req.Mountpoint)) + + obj, found := fm.fsMap.Load(req.Mountpoint) + if !found { + // check whether already unmounted + mounts, err := mountinfo.GetMounts(func(info *mountinfo.Info) (skip, stop bool) { + if info.Mountpoint == req.Mountpoint { + return false, true + } + return true, false + }) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to get mount info") + return &pb.Response{}, err + } + + if len(mounts) <= 0 { + return &pb.Response{}, nil + } + err = fmt.Errorf("failed to find filesystem of mountpoint %s", req.Mountpoint) + log.G(ctx).WithError(err).Errorf("failed to unmount filesystem") + return &pb.Response{}, err + } + + fs := obj.(snapshot.FileSystem) + err := fs.Unmount(ctx, req.Mountpoint) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to unmount filesystem") + return &pb.Response{}, err + } + + fm.fsMap.Delete(req.Mountpoint) + fm.removeFuseInfo(&fuseInfo{ + Mountpoint: req.Mountpoint, + }) + + return &pb.Response{}, nil +} + +func (fm *Server) Close(ctx context.Context) error { + fm.lock.Lock() + defer fm.lock.Unlock() + fm.status = FuseManagerNotReady + + err := fm.clearMounts(ctx) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to clear mounts") + return err + } + + err = fm.ms.Close() + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to close fusestore") + return err + } + + return nil +} + +func (fm *Server) mount(ctx context.Context, mountpoint string, labels map[string]string) error { + // mountpoint in fsMap means layer is already mounted, skip it + if _, found := fm.fsMap.Load(mountpoint); found { + return nil + } + + err := fm.curFs.Mount(ctx, mountpoint, labels) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to mount stargz") + return err + } + + fm.fsMap.Store(mountpoint, fm.curFs) + return nil +} + +func (fm *Server) clearMounts(ctx context.Context) error { + mountpoints, err := fm.listMountpoints(ctx) + if err != nil { + return err + } + + for _, mp := range mountpoints { + if err := syscall.Unmount(mp, syscall.MNT_FORCE); err != nil { + return err + } + } + + return nil +} diff --git a/go.mod b/go.mod index 2b201367a..be7799e9d 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/docker/docker v20.10.7+incompatible // indirect github.com/docker/docker-credential-helpers v0.6.4 // indirect github.com/docker/go-metrics v0.0.1 + github.com/gogo/protobuf v1.3.2 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e github.com/hanwen/go-fuse/v2 v2.1.0 github.com/hashicorp/go-multierror v1.1.1 @@ -28,6 +29,7 @@ require ( github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.0 // indirect github.com/urfave/cli v1.22.2 + go.etcd.io/bbolt v1.3.5 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a golang.org/x/sys v0.0.0-20210324051608-47abb6519492 google.golang.org/grpc v1.35.0 diff --git a/service/service.go b/service/service.go index 90b26d72f..341f1ccb7 100644 --- a/service/service.go +++ b/service/service.go @@ -18,6 +18,8 @@ package service import ( "context" + "os" + "os/exec" "path/filepath" "github.com/containerd/containerd/log" @@ -25,6 +27,7 @@ import ( stargzfs "github.com/containerd/stargz-snapshotter/fs" "github.com/containerd/stargz-snapshotter/fs/source" "github.com/containerd/stargz-snapshotter/service/resolver" + "github.com/containerd/stargz-snapshotter/snapshot" snbase "github.com/containerd/stargz-snapshotter/snapshot" "github.com/containerd/stargz-snapshotter/snapshot/overlayutils" "github.com/hashicorp/go-multierror" @@ -53,6 +56,16 @@ func WithCustomRegistryHosts(hosts source.RegistryHosts) Option { // NewStargzSnapshotterService returns stargz snapshotter. func NewStargzSnapshotterService(ctx context.Context, root string, config *Config, opts ...Option) (snapshots.Snapshotter, error) { + fs, err := NewFileSystem(ctx, root, config, opts...) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to configure filesystem") + } + + return snbase.NewSnapshotter(ctx, snapshotterRoot(root), + fs, snbase.AsynchronousRemove, snbase.RestoreSnapshots, snbase.CleanupCommitted) +} + +func NewFileSystem(ctx context.Context, root string, config *Config, opts ...Option) (snapshot.FileSystem, error) { var sOpts options for _, o := range opts { o(&sOpts) @@ -73,10 +86,43 @@ func NewStargzSnapshotterService(ctx context.Context, root string, config *Confi )), ) if err != nil { - log.G(ctx).WithError(err).Fatalf("failed to configure filesystem") + return nil, err + } + + return fs, nil +} + +func StartFuseManager(ctx context.Context, executable, address, fusestore, logLevel, logPath string) error { + // if socket exists, do not start it + if _, err := os.Stat(address); err == nil { + return nil + } else if !os.IsNotExist(err) { + return err + } + + if _, err := os.Stat(executable); err != nil { + log.G(ctx).WithError(err).Errorf("failed to stat fusemanager binary: %s", executable) + return err + } + + args := []string{ + "-action", "start", + "-address", address, + "-fusestore-path", fusestore, + "-log-level", logLevel, + "-log-path", logPath, + } + + cmd := exec.Command(executable, args...) + if err := cmd.Start(); err != nil { + return err + } + + if err := cmd.Wait(); err != nil { + return err } - return snbase.NewSnapshotter(ctx, snapshotterRoot(root), fs, snbase.AsynchronousRemove) + return nil } func snapshotterRoot(root string) string { diff --git a/snapshot/snapshot.go b/snapshot/snapshot.go index 3f6baca2d..440679f8e 100644 --- a/snapshot/snapshot.go +++ b/snapshot/snapshot.go @@ -73,7 +73,9 @@ type FileSystem interface { // SnapshotterConfig is used to configure the remote snapshotter instance type SnapshotterConfig struct { - asyncRemove bool + asyncRemove bool + cleanupCommitted bool + restoreSnapshots bool } // Opt is an option to configure the remote snapshotter @@ -88,10 +90,24 @@ func AsynchronousRemove(config *SnapshotterConfig) error { return nil } +// CleanupCommitted cleans committed snapshots when closing snapshotter +func CleanupCommitted(config *SnapshotterConfig) error { + config.cleanupCommitted = true + return nil +} + +// RestoreSnapshots indicates whether restore snapshots when +// starting snapshotter +func RestoreSnapshots(config *SnapshotterConfig) error { + config.restoreSnapshots = true + return nil +} + type snapshotter struct { - root string - ms *storage.MetaStore - asyncRemove bool + root string + ms *storage.MetaStore + asyncRemove bool + cleanupCommitted bool // fs is a filesystem that this snapshotter recognizes. fs FileSystem @@ -146,8 +162,10 @@ func NewSnapshotter(ctx context.Context, root string, targetFs FileSystem, opts userxattr: userxattr, } - if err := o.restoreRemoteSnapshot(ctx); err != nil { - return nil, errors.Wrap(err, "failed to restore remote snapshot") + if config.restoreSnapshots { + if err := o.restoreRemoteSnapshot(ctx); err != nil { + return nil, errors.Wrap(err, "failed to restore remote snapshot") + } } return o, nil @@ -632,9 +650,8 @@ func (o *snapshotter) workPath(id string) string { // Close closes the snapshotter func (o *snapshotter) Close() error { // unmount all mounts including Committed - const cleanupCommitted = true ctx := context.Background() - if err := o.cleanup(ctx, cleanupCommitted); err != nil { + if err := o.cleanup(ctx, o.cleanupCommitted); err != nil { log.G(ctx).WithError(err).Warn("failed to cleanup") } return o.ms.Close() diff --git a/snapshot/snapshot_test.go b/snapshot/snapshot_test.go index 902122f44..448f6dc00 100644 --- a/snapshot/snapshot_test.go +++ b/snapshot/snapshot_test.go @@ -60,7 +60,7 @@ func TestRemotePrepare(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(root) - sn, err := NewSnapshotter(context.TODO(), root, bindFileSystem(t)) + sn, err := NewSnapshotter(context.TODO(), root, bindFileSystem(t), CleanupCommitted, RestoreSnapshots) if err != nil { t.Fatalf("failed to make new remote snapshotter: %q", err) } @@ -111,7 +111,7 @@ func TestRemoteOverlay(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(root) - sn, err := NewSnapshotter(context.TODO(), root, bindFileSystem(t)) + sn, err := NewSnapshotter(context.TODO(), root, bindFileSystem(t), CleanupCommitted, RestoreSnapshots) if err != nil { t.Fatalf("failed to make new remote snapshotter: %q", err) } @@ -170,7 +170,7 @@ func TestRemoteCommit(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(root) - sn, err := NewSnapshotter(context.TODO(), root, bindFileSystem(t)) + sn, err := NewSnapshotter(context.TODO(), root, bindFileSystem(t), CleanupCommitted, RestoreSnapshots) if err != nil { t.Fatalf("failed to make new remote snapshotter: %q", err) } @@ -313,7 +313,7 @@ func TestFailureDetection(t *testing.T) { } defer os.RemoveAll(root) fi := bindFileSystem(t) - sn, err := NewSnapshotter(context.TODO(), root, fi) + sn, err := NewSnapshotter(context.TODO(), root, fi, CleanupCommitted, RestoreSnapshots) if err != nil { t.Fatalf("failed to make new Snapshotter: %q", err) } @@ -439,7 +439,8 @@ func (fs *dummyFs) Unmount(ctx context.Context, mountpoint string) error { // Tests backword-comaptibility of overlayfs snapshotter. func newSnapshotter(ctx context.Context, root string) (snapshots.Snapshotter, func() error, error) { - snapshotter, err := NewSnapshotter(context.TODO(), root, dummyFileSystem()) + snapshotter, err := NewSnapshotter(context.TODO(), root, dummyFileSystem(), + CleanupCommitted, RestoreSnapshots) if err != nil { return nil, nil, err }