Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jjkr committed Nov 3, 2023
1 parent e26f759 commit e418072
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 27 deletions.
25 changes: 25 additions & 0 deletions charts/aws-s3-csi-driver/templates/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ spec:
spec:
nodeSelector:
kubernetes.io/os: linux
hostPID: true
serviceAccountName: {{ .Values.node.serviceAccount.name }}
priorityClassName: system-node-critical
tolerations:
Expand All @@ -34,6 +35,17 @@ spec:
{{- with .Values.node.tolerations }}
{{- toYaml . | nindent 8 }}
{{- end }}
initContainers:
- name: install-mountpoint
securityContext:
privileged: true
image: {{ printf "%s%s:%s" (default "" .Values.image.containerRegistry) .Values.image.repository (default (printf "v%s" .Chart.AppVersion) (toString .Values.image.tag)) }}
imagePullPolicy: IfNotPresent
command:
- "/install-mp.sh"
volumeMounts:
- name: host-usr
mountPath: /host/usr
containers:
- name: s3-plugin
image: {{ printf "%s%s:%s" (default "" .Values.image.containerRegistry) .Values.image.repository (default (printf "v%s" .Chart.AppVersion) (toString .Values.image.tag)) }}
Expand All @@ -55,6 +67,10 @@ spec:
mountPropagation: "Bidirectional"
- name: plugin-dir
mountPath: /csi
- name: host-dbus
mountPath: /var/run/dbus
- name: host-dev
mountPath: /hostdev
ports:
- name: healthz
containerPort: 9808
Expand Down Expand Up @@ -107,6 +123,15 @@ spec:
- name: plugin-dir
mountPath: /csi
volumes:
- name: host-usr
hostPath:
path: /usr
- name: host-dev
hostPath:
path: /dev
- name: host-dbus
hostPath:
path: /var/run/dbus
- name: kubelet-dir
hostPath:
path: /var/lib/kubelet
Expand Down
1 change: 1 addition & 0 deletions deploy/kubernetes/base/node-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ spec:
spec:
nodeSelector:
kubernetes.io/os: linux
hostPID: true
serviceAccountName: s3-csi-driver-sa
priorityClassName: system-node-critical
tolerations:
Expand Down
7 changes: 6 additions & 1 deletion pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
}

//Checking if the target directory is already mounted with a volume.
mounted, err := d.Mounter.IsMountPoint(target)
mounted, err := d.isMounted(target)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not check if %q is mounted: %v", target, err)
}
Expand Down Expand Up @@ -125,6 +125,11 @@ func compileMountOptions(currentOptions []string, newOptions []string) []string
}

for _, mountOption := range newOptions {
// disallow options that don't make sense in CSI
switch mountOption {
case "--foreground", "-f", "--help", "-h", "--version", "-v":
continue
}
allMountOptions.Insert(mountOption)
}

Expand Down
38 changes: 35 additions & 3 deletions pkg/driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestNodePublishVolume(t *testing.T) {
}

nodeTestEnv.mockMounter.EXPECT().MakeDir(gomock.Eq(targetPath)).Return(nil)
nodeTestEnv.mockMounter.EXPECT().IsMountPoint(gomock.Eq(targetPath)).Return(false, nil)
nodeTestEnv.mockMounter.EXPECT().IsLikelyNotMountPoint(gomock.Eq(targetPath)).Return(false, nil)
nodeTestEnv.mockMounter.EXPECT().Mount(gomock.Eq(volumeId), gomock.Eq(targetPath), gomock.Eq("unused"), gomock.Any())
_, err := nodeTestEnv.driver.NodePublishVolume(ctx, req)
if err != nil {
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestNodePublishVolume(t *testing.T) {
}

nodeTestEnv.mockMounter.EXPECT().MakeDir(gomock.Eq(targetPath)).Return(nil)
nodeTestEnv.mockMounter.EXPECT().IsMountPoint(gomock.Eq(targetPath)).Return(false, nil)
nodeTestEnv.mockMounter.EXPECT().IsLikelyNotMountPoint(gomock.Eq(targetPath)).Return(false, nil)
nodeTestEnv.mockMounter.EXPECT().Mount(gomock.Eq(volumeId), gomock.Eq(targetPath), gomock.Eq("unused"), gomock.Eq([]string{"--read-only"}))
_, err := nodeTestEnv.driver.NodePublishVolume(ctx, req)
if err != nil {
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestNodePublishVolume(t *testing.T) {
}

nodeTestEnv.mockMounter.EXPECT().MakeDir(gomock.Eq(targetPath)).Return(nil)
nodeTestEnv.mockMounter.EXPECT().IsMountPoint(gomock.Eq(targetPath)).Return(false, nil)
nodeTestEnv.mockMounter.EXPECT().IsLikelyNotMountPoint(gomock.Eq(targetPath)).Return(false, nil)
nodeTestEnv.mockMounter.EXPECT().Mount(gomock.Eq(volumeId), gomock.Eq(targetPath), gomock.Eq("unused"), gomock.Eq([]string{"--bar", "--foo", "--read-only", "--test=123"}))
_, err := nodeTestEnv.driver.NodePublishVolume(ctx, req)
if err != nil {
Expand All @@ -134,6 +134,38 @@ func TestNodePublishVolume(t *testing.T) {
nodeTestEnv.mockCtl.Finish()
},
},
{
name: "success: foreground option is removed",
testFunc: func(t *testing.T) {
nodeTestEnv := initNodeServerTestEnv(t)
ctx := context.Background()
req := &csi.NodePublishVolumeRequest{
VolumeId: volumeId,
VolumeCapability: &csi.VolumeCapability{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{
MountFlags: []string{"--foreground", "-f", "--test 123"},
},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
},
TargetPath: targetPath,
Readonly: true,
}

nodeTestEnv.mockMounter.EXPECT().MakeDir(gomock.Eq(targetPath)).Return(nil)
nodeTestEnv.mockMounter.EXPECT().IsLikelyNotMountPoint(gomock.Eq(targetPath)).Return(false, nil)
nodeTestEnv.mockMounter.EXPECT().Mount(gomock.Eq(volumeId), gomock.Eq(targetPath), gomock.Eq("unused"), gomock.Eq([]string{"--test=123", "--foo"}))
_, err := nodeTestEnv.driver.NodePublishVolume(ctx, req)
if err != nil {
t.Fatalf("NodePublishVolume is failed: %v", err)
}

nodeTestEnv.mockCtl.Finish()
},
},
{
name: "fail: missing volume id",
testFunc: func(t *testing.T) {
Expand Down
48 changes: 27 additions & 21 deletions pkg/driver/systemd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

// Interface to wrap the external go-systemd dbus connection
// https://pkg.go.dev/github.com/coreos/go-systemd/dbus
type SystemdConnection interface {
Close()
SubscribeUnitsCustom(interval time.Duration, buffer int,
Expand All @@ -30,6 +31,7 @@ type SystemdConnection interface {
ResetFailedUnitContext(ctx context.Context, name string) error
}

// Factory interface for connections, needed for mocking
type SystemdConnector interface {
Connect(ctx context.Context) (SystemdConnection, error)
}
Expand All @@ -49,13 +51,15 @@ type SystemdRunner struct {
Pts Pts
}

// SystemdRunner that talks to the real system dbus
func NewSystemdRunner() SystemdRunner {
return SystemdRunner{
Connector: NewOsSystemd(),
Pts: &OsPts{},
}
}

// Run a given command in a transient systemd service. Will wait for the service to become active
func (sr *SystemdRunner) Run(ctx context.Context, cmd string, args []string) (string, error) {
systemdConn, err := sr.Connector.Connect(ctx)
if err != nil {
Expand Down Expand Up @@ -105,55 +109,57 @@ func (sr *SystemdRunner) Run(ctx context.Context, cmd string, args []string) (st
}

// Wait for systemd dbus response
resp := <-respChan
switch resp {
case "done":
// Success, don't return
case "cancelled", "timeout", "failed", "dependency", "skipped":
systemdConn.ResetFailedUnitContext(ctx, serviceName)
return "", fmt.Errorf("Failed to create systemd service %s, resp: %s", serviceName, resp)
default:
systemdConn.ResetFailedUnitContext(ctx, serviceName)
return "", fmt.Errorf("Unknown status starting systemd service %s, resp: %s", serviceName, resp)
select {
case resp := <-respChan:
switch resp {
case "done":
// Success, continue
case "cancelled", "timeout", "failed", "dependency", "skipped":
systemdConn.ResetFailedUnitContext(ctx, serviceName)
return readOutput(), fmt.Errorf("Failed to create systemd service %s, resp: %s", serviceName, resp)
default:
systemdConn.ResetFailedUnitContext(ctx, serviceName)
return readOutput(), fmt.Errorf("Unknown status starting systemd service %s, resp: %s", serviceName, resp)
}
case <-ctx.Done():
return readOutput(), fmt.Errorf("Context cancelled starting systemd service %s", serviceName)
}

starting := true
// Wait 1 second for the status to reach active. The status can briefly go inactive before becoming active
startTimeout := time.After(time.Second)
for starting {
select {
case update := <-updates:
for k, v := range update {
klog.V(5).Infof("Systemd service update [%s]: %v", k, v)
if k == serviceName {
if v == nil {
mountOutput := readOutput()
return mountOutput, fmt.Errorf("%s failed to launch, mount-s3 output: %s",
serviceName, mountOutput)
return readOutput(), fmt.Errorf("%s failed to launch", serviceName)
} else if v.ActiveState == "active" {
starting = false
}
}
}
case <-startTimeout:
return "", fmt.Errorf("%s failed to launch, mount-s3 output: %s",
serviceName, readOutput())
case <-ctx.Done():
return readOutput(), fmt.Errorf("Timed out launching %s service",
serviceName)
case err = <-errChan:
return "", fmt.Errorf("Failed to start systemd service %s err: %w, mount-s3 output: %s",
serviceName, err, readOutput())
return readOutput(), fmt.Errorf("Failed to start systemd service %s err: %w",
serviceName, err)
}
}

return readOutput(), nil
}

// Interface for creating new private terminal session. See pts(4)
// Interface for creating new private terminal session. See man pts(4)
type Pts interface {
NewPts() (io.ReadCloser, int, error)
}

// Real os implementation of the Pts interface
type OsPts struct{}

// Create a new pseduo terminal slave (pts). Returns a ReaderCloser for the master device and a pts number
func (p *OsPts) NewPts() (io.ReadCloser, int, error) {
ptsMaster, err := os.Open("/hostdev/ptmx")
if err != nil {
Expand Down
37 changes: 35 additions & 2 deletions pkg/driver/systemd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,39 @@ func TestSystemdStartUnitFailure(t *testing.T) {
}
}

func TestSystemdRunCanceledContext(t *testing.T) {
mockCtl := gomock.NewController(t)
mockConnector := mock_driver.NewMockSystemdConnector(mockCtl)
mockConnection := mock_driver.NewMockSystemdConnection(mockCtl)
mockPts := mock_driver.NewMockPts(mockCtl)
mockConnection.EXPECT().Close()
mockConnector.EXPECT().Connect(gomock.Any()).Return(mockConnection, nil)
testOutput := "testoutputdata"
mockPts.EXPECT().NewPts().Return(io.NopCloser(strings.NewReader(testOutput)), 0, nil)
ctx, cancel := context.WithCancel(context.Background())
cancel() // immediately cancel context

updates := make(chan map[string]*systemd.UnitStatus)
errChan := make(chan error)
mockConnection.EXPECT().SubscribeUnitsCustom(
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(updates, errChan)

mockConnection.EXPECT().StartTransientUnitContext(
gomock.Eq(ctx), gomock.Any(), gomock.Eq("fail"), gomock.Any(), gomock.Any()).Return(0, nil)

runner := driver.SystemdRunner{
Connector: mockConnector,
Pts: mockPts,
}
out, err := runner.Run(ctx, "", nil)
if err == nil {
t.Fatalf("Expected error on connection failure")
}
if out != testOutput {
t.Fatalf("Unexpected output, expected: %s got: %s", testOutput, out)
}
}

func TestSystemdRunSuccess(t *testing.T) {
mockCtl := gomock.NewController(t)
mockConnector := mock_driver.NewMockSystemdConnector(mockCtl)
Expand All @@ -115,9 +148,9 @@ func TestSystemdRunSuccess(t *testing.T) {
ctx := context.Background()

updates := make(chan map[string]*systemd.UnitStatus)
errors := make(chan error)
errChan := make(chan error)
mockConnection.EXPECT().SubscribeUnitsCustom(
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(updates, errors)
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(updates, errChan)

var startUnitResp chan<- string
var serviceName string
Expand Down

0 comments on commit e418072

Please sign in to comment.