Skip to content

Commit

Permalink
runc/adapter: volumes can be marked as shared
Browse files Browse the repository at this point in the history
If a volume is marked as shared then any updates to mounts inside it
will be reflected in all mount namespaces rather than just reflecting
the state of the mounts at job start time.

In order to avoid race conditions when creating and mounting the volume
host-level locking wraps all sharing operations.

The syntax for enabling shared volumes is:

  processes:
  - name : ...
    ...
    additional_volumes:
    - path: /var/vcap/data/shared
      shared: true
  • Loading branch information
Christopher Brown committed May 22, 2019
1 parent cb91ffe commit 8140ecb
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 17 deletions.
1 change: 1 addition & 0 deletions packages/bpm/spec
Expand Up @@ -22,6 +22,7 @@ files:
- bpm/runc/client/*.go # gosub
- bpm/runc/lifecycle/*.go # gosub
- bpm/runc/specbuilder/*.go # gosub
- bpm/sharedvolume/*.go # gosub
- bpm/sysfeat/*.go # gosub
- bpm/usertools/*.go # gosub
- bpm/vendor/code.cloudfoundry.org/bytefmt/*.go # gosub
Expand Down
19 changes: 11 additions & 8 deletions src/bpm/commands/root.go
Expand Up @@ -33,6 +33,7 @@ import (
"bpm/runc/adapter"
"bpm/runc/client"
"bpm/runc/lifecycle"
"bpm/sharedvolume"
"bpm/sysfeat"
"bpm/usertools"
)
Expand All @@ -46,6 +47,7 @@ var (
userFinder = usertools.NewUserFinder()
boshEnv = bosh.NewEnv(os.Getenv("BPM_BOSH_ROOT"))

locks *hostlock.Handle
lifecycleLock hostlock.LockedLock
)

Expand Down Expand Up @@ -78,6 +80,13 @@ func rootPre(cmd *cobra.Command, _ []string) error {
return errors.New("bpm must be run as root. Please run 'sudo -i' to become the root user.")
}

lockDir := filepath.Dir(config.LocksPath(boshEnv))
if err := os.MkdirAll(lockDir, 0700); err != nil {
return err
}

locks = hostlock.NewHandle(lockDir)

if !isRunningSystemd() {
return cgroups.Setup()
}
Expand Down Expand Up @@ -141,13 +150,6 @@ func acquireLifecycleLock() error {
l.Info("starting")
defer l.Info("complete")

lockDir := filepath.Dir(config.LocksPath(boshEnv))
if err := os.MkdirAll(lockDir, 0700); err != nil {
l.Error("failed-to-create-lock-dir", err)
return err
}

locks := hostlock.NewHandle(lockDir)
var err error
lifecycleLock, err = locks.LockJob(bpmCfg.JobName(), bpmCfg.ProcName())
if err != nil {
Expand Down Expand Up @@ -181,7 +183,8 @@ func newRuncLifecycle() (*lifecycle.RuncLifecycle, error) {
if err != nil {
return nil, fmt.Errorf("failed to fetch system features: %q", err)
}
runcAdapter := adapter.NewRuncAdapter(*features, filepath.Glob)

runcAdapter := adapter.NewRuncAdapter(*features, filepath.Glob, sharedvolume.MakeShared, locks)
clock := clock.NewClock()

return lifecycle.NewRuncLifecycle(
Expand Down
3 changes: 3 additions & 0 deletions src/bpm/config/job_config.go
Expand Up @@ -61,6 +61,7 @@ type Volume struct {
Writable bool `yaml:"writable"`
AllowExecutions bool `yaml:"allow_executions"`
MountOnly bool `yaml:"mount_only"`
Shared bool `yaml:"shared"`
}

type Unsafe struct {
Expand Down Expand Up @@ -160,6 +161,8 @@ func (c *ProcessConfig) AddVolumes(
v.MountOnly = true
case "allow_executions":
v.AllowExecutions = true
case "shared":
v.Shared = true
default:
return fmt.Errorf("invalid volume option: %s", option)
}
Expand Down
8 changes: 7 additions & 1 deletion src/bpm/config/job_config_test.go
Expand Up @@ -57,6 +57,7 @@ var _ = Describe("Config", func() {
config.Volume{Path: "/var/vcap/data/program/foobar", Writable: true},
config.Volume{Path: "/var/vcap/data/alternate-program"},
config.Volume{Path: "/var/vcap/data/jna-tmp", Writable: true, AllowExecutions: true},
config.Volume{Path: "/var/vcap/data/shared", Shared: true},
))
Expect(cfg.Processes[0].Hooks.PreStart).To(Equal("/var/vcap/jobs/program/bin/pre"))
Expect(cfg.Processes[0].Capabilities).To(ConsistOf("NET_BIND_SERVICE", "SYS_TIME"))
Expand Down Expand Up @@ -206,13 +207,14 @@ var _ = Describe("Config", func() {
"/path/to/data/volume3:mount_only",
"/path/to/store/volume4:allow_executions",
"/path/to/data/volume5:writable,mount_only,allow_executions",
"/path/to/data/volume6:shared",
},
boshEnv,
[]string{},
)
Expect(err).NotTo(HaveOccurred())

Expect(cfg.AdditionalVolumes).To(HaveLen(5))
Expect(cfg.AdditionalVolumes).To(HaveLen(6))
Expect(cfg.AdditionalVolumes).To(ContainElement(config.Volume{
Path: "/path/to/data/volume1",
}))
Expand All @@ -234,6 +236,10 @@ var _ = Describe("Config", func() {
MountOnly: true,
AllowExecutions: true,
}))
Expect(cfg.AdditionalVolumes).To(ContainElement(config.Volume{
Path: "/path/to/data/volume6",
Shared: true,
}))
})

Context("when the volume definition contains an invalid option", func() {
Expand Down
4 changes: 4 additions & 0 deletions src/bpm/config/testdata/example.yml
Expand Up @@ -19,6 +19,8 @@ processes:
- path: /var/vcap/data/jna-tmp
writable: true
allow_executions: true
- path: /var/vcap/data/shared
shared: true
hooks:
pre_start: /var/vcap/jobs/program/bin/pre
capabilities:
Expand All @@ -37,8 +39,10 @@ processes:
- path: /foobar
writable: true
allow_executions: true

- name: second-process
executable: /I/AM/A/SECOND-EXECUTABLE

- name: third-process
executable: /I/AM/A/THIRD-EXECUTABLE
hooks: {}
44 changes: 39 additions & 5 deletions src/bpm/runc/adapter/adapter.go
Expand Up @@ -26,6 +26,7 @@ import (
specs "github.com/opencontainers/runtime-spec/specs-go"

"bpm/config"
"bpm/hostlock"
"bpm/runc/specbuilder"
"bpm/sysfeat"
)
Expand All @@ -39,15 +40,25 @@ const (
// of paths or an error if the search failed.
type GlobFunc func(string) ([]string, error)

type MountShare func(string) error

type VolumeLocker interface {
LockVolume(string) (hostlock.LockedLock, error)
}

type RuncAdapter struct {
features sysfeat.Features
glob GlobFunc
features sysfeat.Features
glob GlobFunc
shareMount MountShare
locker VolumeLocker
}

func NewRuncAdapter(features sysfeat.Features, glob GlobFunc) *RuncAdapter {
func NewRuncAdapter(features sysfeat.Features, glob GlobFunc, mountSharer MountShare, locker VolumeLocker) *RuncAdapter {
return &RuncAdapter{
features: features,
glob: glob,
features: features,
glob: glob,
shareMount: mountSharer,
locker: locker,
}
}

Expand All @@ -67,6 +78,12 @@ func (a *RuncAdapter) CreateJobPrerequisites(
continue
}

if vol.Shared {
if err := a.makeShared(vol); err != nil {
return nil, nil, err
}
}

_, err = os.Stat(vol.Path)
if os.IsNotExist(err) {
dirsToCreate = append(dirsToCreate, vol.Path)
Expand Down Expand Up @@ -118,6 +135,23 @@ func (a *RuncAdapter) CreateJobPrerequisites(
return createLogFiles(bpmCfg, user)
}

func (a *RuncAdapter) makeShared(volume config.Volume) error {
held, err := a.locker.LockVolume(volume.Path)
if err != nil {
return err
}

if err := a.shareMount(volume.Path); err != nil {
return err
}

if err := held.Unlock(); err != nil {
return err
}

return nil
}

func createDirs(dirs []string, user specs.User) error {
for _, dir := range dirs {
err := createDirFor(dir, int(user.UID), int(user.GID))
Expand Down
93 changes: 90 additions & 3 deletions src/bpm/runc/adapter/adapter_test.go
Expand Up @@ -35,6 +35,7 @@ import (

"bpm/bosh"
"bpm/config"
"bpm/hostlock"
"bpm/runc/specbuilder"
"bpm/sysfeat"
)
Expand All @@ -52,6 +53,9 @@ var _ = Describe("RuncAdapter", func() {
bpmCfg *config.BPMConfig
procCfg *config.ProcessConfig
logger *lagertest.TestLogger

mountSharer *fakeMountSharer
volumeLocker *fakeVolumeLocker
)

BeforeEach(func() {
Expand All @@ -76,6 +80,9 @@ var _ = Describe("RuncAdapter", func() {
}

Expect(os.MkdirAll(filepath.Join(systemRoot, "store"), 0700)).To(Succeed())

mountSharer = &fakeMountSharer{}
volumeLocker = &fakeVolumeLocker{}
})

JustBeforeEach(func() {
Expand All @@ -84,7 +91,7 @@ var _ = Describe("RuncAdapter", func() {
identityGlob := func(pattern string) ([]string, error) {
return []string{pattern}, nil
}
runcAdapter = NewRuncAdapter(features, identityGlob)
runcAdapter = NewRuncAdapter(features, identityGlob, mountSharer.MakeShared, volumeLocker)
})

AfterEach(func() {
Expand Down Expand Up @@ -212,6 +219,54 @@ var _ = Describe("RuncAdapter", func() {
})
})

Context("when a volume should be shared", func() {
var sharedPath string

BeforeEach(func() {
sharedPath = filepath.Join(systemRoot, "share", "me")
procCfg.AdditionalVolumes = append(procCfg.AdditionalVolumes, config.Volume{
Path: sharedPath,
Shared: true,
})
})

It("makes the directory shared after", func() {
_, _, err := runcAdapter.CreateJobPrerequisites(bpmCfg, procCfg, user)
Expect(err).NotTo(HaveOccurred())

Expect(mountSharer.sharedMounts).To(ConsistOf(sharedPath))
})

It("locks the volume while it's creating it and making it shared", func() {
_, _, err := runcAdapter.CreateJobPrerequisites(bpmCfg, procCfg, user)
Expect(err).NotTo(HaveOccurred())

Expect(volumeLocker.lockedPaths).To(ConsistOf(sharedPath))
})

Context("when the mount sharing fails", func() {
BeforeEach(func() {
mountSharer.err = errors.New("disaster")
})

It("returns an error", func() {
_, _, err := runcAdapter.CreateJobPrerequisites(bpmCfg, procCfg, user)
Expect(err).To(HaveOccurred())
})
})

Context("when the locking fails", func() {
BeforeEach(func() {
volumeLocker.err = errors.New("disaster")
})

It("returns an error", func() {
_, _, err := runcAdapter.CreateJobPrerequisites(bpmCfg, procCfg, user)
Expect(err).To(HaveOccurred())
})
})
})

Context("when the user requests a persistent disk", func() {
BeforeEach(func() {
procCfg.PersistentDisk = true
Expand Down Expand Up @@ -833,7 +888,7 @@ var _ = Describe("RuncAdapter", func() {
return []string{pattern}, nil
}
}
runcAdapter = NewRuncAdapter(features, fakeGlob)
runcAdapter = NewRuncAdapter(features, fakeGlob, mountSharer.MakeShared, volumeLocker)
})

It("adds volumes for whatever the volume matches", func() {
Expand All @@ -859,7 +914,7 @@ var _ = Describe("RuncAdapter", func() {
fail := func(path string) ([]string, error) {
return nil, errors.New("doomed from the start")
}
runcAdapter = NewRuncAdapter(features, fail)
runcAdapter = NewRuncAdapter(features, fail, mountSharer.MakeShared, volumeLocker)
})

It("returns an error", func() {
Expand Down Expand Up @@ -923,3 +978,35 @@ func (matcher *beMountMatcher) FailureMessage(actual interface{}) (message strin
func (matcher *beMountMatcher) NegatedFailureMessage(actual interface{}) (message string) {
return fmt.Sprintf("Expected\n\t%#v\nnot to be the same mount as\n\t%#v", actual, matcher.expected)
}

type fakeLock struct{}

func (l *fakeLock) Unlock() error {
return nil
}

type fakeVolumeLocker struct {
lockedPaths []string
err error
}

func (l *fakeVolumeLocker) LockVolume(path string) (hostlock.LockedLock, error) {
if l.err != nil {
return nil, l.err
}
l.lockedPaths = append(l.lockedPaths, path)
return &fakeLock{}, nil
}

type fakeMountSharer struct {
sharedMounts []string
err error
}

func (ms *fakeMountSharer) MakeShared(path string) error {
if ms.err != nil {
return ms.err
}
ms.sharedMounts = append(ms.sharedMounts, path)
return nil
}

0 comments on commit 8140ecb

Please sign in to comment.