From 7b5a3f6f4d5f5cb27cbed81cdb0fae045ab45baf Mon Sep 17 00:00:00 2001 From: boxjan Date: Thu, 8 Apr 2021 00:06:56 +0800 Subject: [PATCH 1/3] fix problem * rclone use 1.54.1 * add EnsureFSMetaExist func * fix removeObjects have err check * add removeObjectsOneByOne, when removeObjects failed, will try to use it * fix repeat in yaml --- cmd/s3driver/Dockerfile | 2 +- cmd/s3driver/Dockerfile.full | 2 +- deploy/kubernetes/attacher.yaml | 3 - deploy/kubernetes/csi-s3.yaml | 3 +- deploy/kubernetes/provisioner.yaml | 3 - pkg/driver/controllerserver.go | 27 +++++--- pkg/s3/client.go | 106 +++++++++++++++++++++++++---- 7 files changed, 115 insertions(+), 31 deletions(-) diff --git a/cmd/s3driver/Dockerfile b/cmd/s3driver/Dockerfile index 87fea7f..ec9e2ea 100644 --- a/cmd/s3driver/Dockerfile +++ b/cmd/s3driver/Dockerfile @@ -17,7 +17,7 @@ RUN apt-get update && \ rm -rf /var/lib/apt/lists/* # install rclone -ARG RCLONE_VERSION=v1.47.0 +ARG RCLONE_VERSION=v1.54.1 RUN cd /tmp \ && curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \ && unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \ diff --git a/cmd/s3driver/Dockerfile.full b/cmd/s3driver/Dockerfile.full index a8da20d..7dae6ab 100644 --- a/cmd/s3driver/Dockerfile.full +++ b/cmd/s3driver/Dockerfile.full @@ -45,7 +45,7 @@ RUN apt-get update && \ rm -rf /var/lib/apt/lists/* # install rclone -ARG RCLONE_VERSION=v1.47.0 +ARG RCLONE_VERSION=v1.54.1 RUN cd /tmp \ && curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \ && unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \ diff --git a/deploy/kubernetes/attacher.yaml b/deploy/kubernetes/attacher.yaml index bccc7f8..e587d21 100644 --- a/deploy/kubernetes/attacher.yaml +++ b/deploy/kubernetes/attacher.yaml @@ -59,9 +59,6 @@ metadata: name: csi-attacher-s3 namespace: kube-system spec: - selector: - matchLabels: - app: "csi-attacher-s3" serviceName: "csi-attacher-s3" replicas: 1 selector: diff --git a/deploy/kubernetes/csi-s3.yaml b/deploy/kubernetes/csi-s3.yaml index 8f2b376..1482cac 100644 --- a/deploy/kubernetes/csi-s3.yaml +++ b/deploy/kubernetes/csi-s3.yaml @@ -82,7 +82,7 @@ spec: add: ["SYS_ADMIN"] allowPrivilegeEscalation: true image: ctrox/csi-s3:v1.2.0-rc.1 - imagePullPolicy: Never + imagePullPolicy: "Always" args: - "--endpoint=$(CSI_ENDPOINT)" - "--nodeid=$(NODE_ID)" @@ -94,7 +94,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - imagePullPolicy: "Always" volumeMounts: - name: plugin-dir mountPath: /csi diff --git a/deploy/kubernetes/provisioner.yaml b/deploy/kubernetes/provisioner.yaml index b543458..40c9e71 100644 --- a/deploy/kubernetes/provisioner.yaml +++ b/deploy/kubernetes/provisioner.yaml @@ -58,9 +58,6 @@ metadata: name: csi-provisioner-s3 namespace: kube-system spec: - selector: - matchLabels: - app: "csi-provisioner-s3" serviceName: "csi-provisioner-s3" replicas: 1 selector: diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go index a93160c..19a3f30 100644 --- a/pkg/driver/controllerserver.go +++ b/pkg/driver/controllerserver.go @@ -129,6 +129,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { volumeID := req.GetVolumeId() bucketName, prefix := volumeIDToBucketPrefix(volumeID) + var meta *s3.FSMeta // Check arguments if len(volumeID) == 0 { @@ -146,25 +147,33 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } - if _, err := client.GetFSMeta(bucketName, prefix); err != nil { + if meta, err = client.GetFSMeta(bucketName, prefix); err != nil { glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID) return &csi.DeleteVolumeResponse{}, nil } + var deleteErr error if prefix == "" { // prefix is empty, we delete the whole bucket if err := client.RemoveBucket(bucketName); err != nil { - glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err) - return nil, err + deleteErr = err } glog.V(4).Infof("Bucket %s removed", bucketName) } else { if err := client.RemovePrefix(bucketName, prefix); err != nil { - return nil, fmt.Errorf("unable to remove prefix: %w", err) + deleteErr = fmt.Errorf("unable to remove prefix: %w", err) } glog.V(4).Infof("Prefix %s removed", prefix) } + if deleteErr != nil { + glog.Warning("remove volume failed, will ensure fsmeta exist, or may will lost control") + if err := client.EnsureFSMetaExist(meta, bucketName, prefix); err != nil { + glog.Error(err) + } + return nil, deleteErr + } + return &csi.DeleteVolumeResponse{}, nil } @@ -178,11 +187,11 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req } bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId()) - s3, err := s3.NewClientFromSecret(req.GetSecrets()) + client, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } - exists, err := s3.BucketExists(bucketName) + exists, err := client.BucketExists(bucketName) if err != nil { return nil, err } @@ -192,7 +201,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId())) } - if _, err := s3.GetFSMeta(bucketName, prefix); err != nil { + if _, err := client.GetFSMeta(bucketName, prefix); err != nil { // return an error if the fsmeta of the requested volume does not exist return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId())) } @@ -202,8 +211,8 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, } - for _, cap := range req.VolumeCapabilities { - if cap.GetAccessMode().GetMode() != supportedAccessMode.GetMode() { + for _, capability := range req.VolumeCapabilities { + if capability.GetAccessMode().GetMode() != supportedAccessMode.GetMode() { return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil } } diff --git a/pkg/s3/client.go b/pkg/s3/client.go index 6e7dadb..a6a0302 100644 --- a/pkg/s3/client.go +++ b/pkg/s3/client.go @@ -94,17 +94,35 @@ func (client *s3Client) CreatePrefix(bucketName string, prefix string) error { } func (client *s3Client) RemovePrefix(bucketName string, prefix string) error { - if err := client.removeObjects(bucketName, prefix); err != nil { - return err + var err error + + if err = client.removeObjects(bucketName, prefix); err == nil { + return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{}) } - return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{}) + + glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne") + + if err = client.removeObjectsOneByOne(bucketName, prefix); err == nil { + return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{}) + } + + return err } func (client *s3Client) RemoveBucket(bucketName string) error { - if err := client.removeObjects(bucketName, ""); err != nil { - return err + var err error + + if err = client.removeObjects(bucketName, ""); err == nil { + return client.minio.RemoveBucket(client.ctx, bucketName) } - return client.minio.RemoveBucket(client.ctx, bucketName) + + glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne") + + if err = client.removeObjectsOneByOne(bucketName, ""); err == nil { + return client.minio.RemoveBucket(client.ctx, bucketName) + } + + return err } func (client *s3Client) removeObjects(bucketName, prefix string) error { @@ -114,10 +132,6 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error { go func() { defer close(objectsCh) - doneCh := make(chan struct{}) - - defer close(doneCh) - for object := range client.minio.ListObjects( client.ctx, bucketName, @@ -141,10 +155,14 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error { GovernanceBypass: true, } errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts) + haveErrWhenRemoveObjects := false for e := range errorCh { - glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err) + if e.Err.Error() != "EOF" { + glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err) + haveErrWhenRemoveObjects = true + } } - if len(errorCh) != 0 { + if haveErrWhenRemoveObjects { return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName) } } @@ -152,6 +170,60 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error { return nil } +// will delete files one by one without file lock +func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { + objectsCh := make(chan minio.ObjectInfo, 1) + removeErrCh := make(chan minio.RemoveObjectError, 1) + var listErr error + + go func() { + defer close(objectsCh) + + for object := range client.minio.ListObjects(client.ctx, bucketName, + minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) { + if object.Err != nil { + listErr = object.Err + return + } + objectsCh <- object + } + }() + + if listErr != nil { + glog.Error("Error listing objects", listErr) + return listErr + } + + go func() { + defer close(removeErrCh) + + for object := range objectsCh { + err := client.minio.RemoveObject(client.ctx, bucketName, object.Key, + minio.RemoveObjectOptions{VersionID: object.VersionID}) + if err != nil { + removeErrCh <- minio.RemoveObjectError{ + ObjectName: object.Key, + VersionID: object.VersionID, + Err: err, + } + } + } + }() + + haveErrWhenRemoveObjects := false + for e := range removeErrCh { + if e.Err.Error() != "EOF" { + glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err) + haveErrWhenRemoveObjects = true + } + } + if haveErrWhenRemoveObjects { + return fmt.Errorf("Failed to remove all objects of path %s", bucketName) + } + + return nil +} + func (client *s3Client) SetFSMeta(meta *FSMeta) error { b := new(bytes.Buffer) json.NewEncoder(b).Encode(meta) @@ -182,3 +254,13 @@ func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) { err = json.Unmarshal(b, &meta) return &meta, err } + +func (client *s3Client) EnsureFSMetaExist(meta *FSMeta, bucketName, prefix string) error { + if _, err := client.GetFSMeta(bucketName, prefix); err != nil { + glog.Warningf("%s/%s get meta failed with: %s, will set meta of it", bucketName, prefix, err) + if err := client.SetFSMeta(meta); err != nil { + return fmt.Errorf("%s/%s get meta failed with: %s, may will lost control of it", bucketName, prefix, err) + } + } + return nil +} From fa9ef783623cc6c63754b3c4464d031f05f55ef8 Mon Sep 17 00:00:00 2001 From: boxjan Date: Thu, 8 Apr 2021 22:35:31 +0800 Subject: [PATCH 2/3] fix problem: log format; fs meta; --- pkg/driver/controllerserver.go | 4 ++-- pkg/s3/client.go | 14 ++------------ 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go index 19a3f30..319c444 100644 --- a/pkg/driver/controllerserver.go +++ b/pkg/driver/controllerserver.go @@ -167,8 +167,8 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } if deleteErr != nil { - glog.Warning("remove volume failed, will ensure fsmeta exist, or may will lost control") - if err := client.EnsureFSMetaExist(meta, bucketName, prefix); err != nil { + glog.Warning("remove volume failed, will ensure fsmeta exists to avoid losing control over volume") + if err := client.SetFSMeta(meta); err != nil { glog.Error(err) } return nil, deleteErr diff --git a/pkg/s3/client.go b/pkg/s3/client.go index a6a0302..5d06196 100644 --- a/pkg/s3/client.go +++ b/pkg/s3/client.go @@ -100,7 +100,7 @@ func (client *s3Client) RemovePrefix(bucketName string, prefix string) error { return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{}) } - glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne") + glog.Warningf("removeObjects failed with: %s, will try removeObjectsOneByOne", err) if err = client.removeObjectsOneByOne(bucketName, prefix); err == nil { return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{}) @@ -116,7 +116,7 @@ func (client *s3Client) RemoveBucket(bucketName string) error { return client.minio.RemoveBucket(client.ctx, bucketName) } - glog.Warning("removeObjects failed with: ", err, ", will try removeObjectsOneByOne") + glog.Warningf("removeObjects failed with: %s, will try removeObjectsOneByOne", err) if err = client.removeObjectsOneByOne(bucketName, ""); err == nil { return client.minio.RemoveBucket(client.ctx, bucketName) @@ -254,13 +254,3 @@ func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) { err = json.Unmarshal(b, &meta) return &meta, err } - -func (client *s3Client) EnsureFSMetaExist(meta *FSMeta, bucketName, prefix string) error { - if _, err := client.GetFSMeta(bucketName, prefix); err != nil { - glog.Warningf("%s/%s get meta failed with: %s, will set meta of it", bucketName, prefix, err) - if err := client.SetFSMeta(meta); err != nil { - return fmt.Errorf("%s/%s get meta failed with: %s, may will lost control of it", bucketName, prefix, err) - } - } - return nil -} From 8449c1e2e91562fbadbc15f087ed1cfc7fa4c2a8 Mon Sep 17 00:00:00 2001 From: boxjan Date: Fri, 9 Apr 2021 21:45:52 +0800 Subject: [PATCH 3/3] Although get a EOF error, we should still know it --- pkg/s3/client.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pkg/s3/client.go b/pkg/s3/client.go index 5d06196..87d5df1 100644 --- a/pkg/s3/client.go +++ b/pkg/s3/client.go @@ -157,10 +157,8 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error { errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts) haveErrWhenRemoveObjects := false for e := range errorCh { - if e.Err.Error() != "EOF" { - glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err) - haveErrWhenRemoveObjects = true - } + glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err) + haveErrWhenRemoveObjects = true } if haveErrWhenRemoveObjects { return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName) @@ -212,10 +210,8 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { haveErrWhenRemoveObjects := false for e := range removeErrCh { - if e.Err.Error() != "EOF" { - glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err) - haveErrWhenRemoveObjects = true - } + glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err) + haveErrWhenRemoveObjects = true } if haveErrWhenRemoveObjects { return fmt.Errorf("Failed to remove all objects of path %s", bucketName)