Skip to content

Commit

Permalink
Add fuse-manager
Browse files Browse the repository at this point in the history
Signed-off-by: Zuti He <ilyeeelihe@gmail.com>
  • Loading branch information
ilyee committed May 19, 2021
1 parent 2f7f3ea commit ccd8af2
Show file tree
Hide file tree
Showing 14 changed files with 2,399 additions and 56 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ VERSION=$(shell git describe --match 'v[0-9]*' --dirty='.m' --always --tags)
REVISION=$(shell git rev-parse HEAD)$(shell if ! git diff --no-ext-diff --quiet --exit-code; then echo .m; fi)
GO_LD_FLAGS=-ldflags '-s -w -X $(PKG)/version.Version=$(VERSION) -X $(PKG)/version.Revision=$(REVISION) $(GO_EXTRA_LDFLAGS)'

CMD=containerd-stargz-grpc ctr-remote stargz-store
CMD=containerd-stargz-grpc ctr-remote stargz-store fuse-manager

CMD_BINARIES=$(addprefix $(PREFIX),$(CMD))

Expand All @@ -44,6 +44,9 @@ ctr-remote: FORCE
stargz-store: FORCE
GO111MODULE=$(GO111MODULE_VALUE) go build -o $(PREFIX)$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) -v ./cmd/stargz-store

fuse-manager: FORCE
GO111MODULE=$(GO111MODULE_VALUE) go build -o $(PREFIX)$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) -v ./cmd/fuse-manager

check:
@echo "$@"
@GO111MODULE=$(GO111MODULE_VALUE) golangci-lint run
Expand Down
41 changes: 29 additions & 12 deletions cmd/containerd-stargz-grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (
"github.com/containerd/containerd/contrib/snapshotservice"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/stargz-snapshotter/fusemanager"
"github.com/containerd/stargz-snapshotter/service"
snbase "github.com/containerd/stargz-snapshotter/snapshot"
"github.com/containerd/stargz-snapshotter/version"
sddaemon "github.com/coreos/go-systemd/v22/daemon"
metrics "github.com/docker/go-metrics"
Expand All @@ -43,18 +45,21 @@ import (
)

const (
defaultAddress = "/run/containerd-stargz-grpc/containerd-stargz-grpc.sock"
defaultConfigPath = "/etc/containerd-stargz-grpc/config.toml"
defaultLogLevel = logrus.InfoLevel
defaultRootDir = "/var/lib/containerd-stargz-grpc"
defaultAddress = "/run/containerd-stargz-grpc/containerd-stargz-grpc.sock"
defaultConfigPath = "/etc/containerd-stargz-grpc/config.toml"
defaultLogLevel = logrus.InfoLevel
defaultRootDir = "/var/lib/containerd-stargz-grpc"
defaultFuseManagerAddr = "unix:///run/containerd-stargz-grpc/fuse-manager.sock"
)

var (
address = flag.String("address", defaultAddress, "address for the snapshotter's GRPC server")
configPath = flag.String("config", defaultConfigPath, "path to the configuration file")
logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]")
rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter")
printVersion = flag.Bool("version", false, "print the version")
address = flag.String("address", defaultAddress, "address for the snapshotter's GRPC server")
configPath = flag.String("config", defaultConfigPath, "path to the configuration file")
logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]")
rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter")
fuseManagerAddr = flag.String("fusemanager-address", defaultFuseManagerAddr, "address for the fuse manager's GRPC server")
enableFuseManager = flag.Bool("enable-fusemanager", false, "whether enable fuse manager")
printVersion = flag.Bool("version", false, "print the version")
)

type snapshotterConfig struct {
Expand Down Expand Up @@ -97,9 +102,21 @@ func main() {
log.G(ctx).WithError(err).Fatalf("snapshotter is not supported")
}

rs, err := service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter")
var rs snapshots.Snapshotter
if *enableFuseManager {
fs, err := fusemanager.NewManagerClient(ctx, *rootDir, *fuseManagerAddr, &config.Config)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure fuse manager")
}
rs, err = snbase.NewSnapshotter(ctx, *rootDir, fs, snbase.AsynchronousRemove)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter")
}
} else {
rs, err = service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter")
}
}

cleanup, err := serve(ctx, *address, rs, config)
Expand Down
128 changes: 128 additions & 0 deletions cmd/fuse-manager/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"flag"
golog "log"
"net"
"os"
"os/signal"
"path/filepath"

"github.com/containerd/containerd/log"
sddaemon "github.com/coreos/go-systemd/v22/daemon"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
"google.golang.org/grpc"

"github.com/containerd/stargz-snapshotter/fusemanager"
pb "github.com/containerd/stargz-snapshotter/fusemanager/api"
)

const (
defaultStoreAddr = "/var/lib/containerd-stargz-grpc/fusestore.db"
defaultSockerAddr = "/run/containerd-stargz-grpc/fuse-manager.sock"
defaultLogLevel = logrus.InfoLevel
)

var (
fuseStoreAddr = flag.String("fusestore", defaultStoreAddr, "address for the fusemanager's store")
socketAddr = flag.String("socket", defaultSockerAddr, "address for the fusemanager's gRPC socket")
logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]")
)

func main() {
flag.Parse()

lvl, err := logrus.ParseLevel(*logLevel)
if err != nil {
log.L.WithError(err).Fatal("failed to prepare logger")
}

logrus.SetLevel(lvl)
logrus.SetFormatter(&logrus.JSONFormatter{
TimestampFormat: log.RFC3339NanoFixed,
})

ctx := log.WithLogger(context.Background(), log.L)

golog.SetOutput(log.G(ctx).WriterLevel(logrus.DebugLevel))

sigCh := make(chan os.Signal, 1)
errCh := make(chan error, 1)
server := grpc.NewServer()

signal.Notify(sigCh, unix.SIGINT, unix.SIGTERM)
go func() {
select {
case sig := <-sigCh:
log.G(ctx).Infof("Got %v", sig)
server.Stop()
case err := <-errCh:
log.G(ctx).WithError(err).Fatal("failed to run stargz fuse manager")
}
}()

// Prepare the directory for the socket
if err := os.MkdirAll(filepath.Dir(*socketAddr), 0700); err != nil {
log.G(ctx).WithError(err).Errorf("failed to create directory %s", filepath.Dir(*socketAddr))
errCh <- &net.DNSConfigError{}
}

// Try to remove the socket file to avoid EADDRINUSE
if err := os.Remove(*socketAddr); err != nil && !os.IsNotExist(err) {
log.G(ctx).WithError(err).Error("failed to remove old socket file")
errCh <- err
}

l, err := net.Listen("unix", *socketAddr)
if err != nil {
log.G(ctx).WithError(err).Error("failed to listen socket")
errCh <- err
}

if os.Getenv("NOTIFY_SOCKET") != "" {
notified, notifyErr := sddaemon.SdNotify(false, sddaemon.SdNotifyReady)
log.G(ctx).Debugf("SdNotifyReady notified=%v, err=%v", notified, notifyErr)
}
defer func() {
if os.Getenv("NOTIFY_SOCKET") != "" {
notified, notifyErr := sddaemon.SdNotify(false, sddaemon.SdNotifyStopping)
log.G(ctx).Debugf("SdNotifyStopping notified=%v, err=%v", notified, notifyErr)
}
}()

fm, err := fusemanager.NewFuseManager(ctx, *fuseStoreAddr)
if err != nil {
log.G(ctx).WithError(err).Error("failed to configure manager server")
errCh <- err
}

pb.RegisterFileSystemServiceServer(server, fm)

if err := server.Serve(l); err != nil {
log.G(ctx).WithError(err).Error("failed to serve fuse manager")
errCh <- err
}

err = fm.Close(ctx)
if err != nil {
log.G(ctx).WithError(err).Fatal("failed to close fusemanager")
}
}
27 changes: 20 additions & 7 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ import (

const defaultMaxConcurrency = 2

var (
nsLock = sync.Mutex{}

ns *metrics.Namespace
metricsCtr *fsmetrics.Controller
)

type Option func(*options)

type options struct {
Expand Down Expand Up @@ -95,13 +102,19 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
return nil, errors.Wrapf(err, "failed to setup resolver")
}

var ns *metrics.Namespace
if !cfg.NoPrometheus {
ns = metrics.NewNamespace("stargz", "fs", nil)
}
c := fsmetrics.NewLayerMetrics(ns)
if ns != nil {
metrics.Register(ns)
nsLock.Lock()
defer nsLock.Unlock()

var c *fsmetrics.Controller
if cfg.NoPrometheus {
c = fsmetrics.NewLayerMetrics(nil)
} else {
if ns == nil {
ns = metrics.NewNamespace("stargz", "fs", nil)
metrics.Register(ns)
metricsCtr = fsmetrics.NewLayerMetrics(ns)
}
c = metricsCtr
}

return &filesystem{
Expand Down

0 comments on commit ccd8af2

Please sign in to comment.