diff --git a/cmd/s3driver/Dockerfile b/cmd/s3driver/Dockerfile index 87fea7f..ec9e2ea 100644 --- a/cmd/s3driver/Dockerfile +++ b/cmd/s3driver/Dockerfile @@ -17,7 +17,7 @@ RUN apt-get update && \ rm -rf /var/lib/apt/lists/* # install rclone -ARG RCLONE_VERSION=v1.47.0 +ARG RCLONE_VERSION=v1.54.1 RUN cd /tmp \ && curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \ && unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \ diff --git a/cmd/s3driver/Dockerfile.full b/cmd/s3driver/Dockerfile.full index a8da20d..7dae6ab 100644 --- a/cmd/s3driver/Dockerfile.full +++ b/cmd/s3driver/Dockerfile.full @@ -45,7 +45,7 @@ RUN apt-get update && \ rm -rf /var/lib/apt/lists/* # install rclone -ARG RCLONE_VERSION=v1.47.0 +ARG RCLONE_VERSION=v1.54.1 RUN cd /tmp \ && curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \ && unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \ diff --git a/deploy/kubernetes/attacher.yaml b/deploy/kubernetes/attacher.yaml index bccc7f8..e587d21 100644 --- a/deploy/kubernetes/attacher.yaml +++ b/deploy/kubernetes/attacher.yaml @@ -59,9 +59,6 @@ metadata: name: csi-attacher-s3 namespace: kube-system spec: - selector: - matchLabels: - app: "csi-attacher-s3" serviceName: "csi-attacher-s3" replicas: 1 selector: diff --git a/deploy/kubernetes/csi-s3.yaml b/deploy/kubernetes/csi-s3.yaml index 8f2b376..1482cac 100644 --- a/deploy/kubernetes/csi-s3.yaml +++ b/deploy/kubernetes/csi-s3.yaml @@ -82,7 +82,7 @@ spec: add: ["SYS_ADMIN"] allowPrivilegeEscalation: true image: ctrox/csi-s3:v1.2.0-rc.1 - imagePullPolicy: Never + imagePullPolicy: "Always" args: - "--endpoint=$(CSI_ENDPOINT)" - "--nodeid=$(NODE_ID)" @@ -94,7 +94,6 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - imagePullPolicy: "Always" volumeMounts: - name: plugin-dir mountPath: /csi diff --git a/deploy/kubernetes/provisioner.yaml b/deploy/kubernetes/provisioner.yaml index b543458..40c9e71 100644 --- a/deploy/kubernetes/provisioner.yaml +++ b/deploy/kubernetes/provisioner.yaml @@ -58,9 +58,6 @@ metadata: name: csi-provisioner-s3 namespace: kube-system spec: - selector: - matchLabels: - app: "csi-provisioner-s3" serviceName: "csi-provisioner-s3" replicas: 1 selector: diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go index a93160c..319c444 100644 --- a/pkg/driver/controllerserver.go +++ b/pkg/driver/controllerserver.go @@ -129,6 +129,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { volumeID := req.GetVolumeId() bucketName, prefix := volumeIDToBucketPrefix(volumeID) + var meta *s3.FSMeta // Check arguments if len(volumeID) == 0 { @@ -146,25 +147,33 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } - if _, err := client.GetFSMeta(bucketName, prefix); err != nil { + if meta, err = client.GetFSMeta(bucketName, prefix); err != nil { glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID) return &csi.DeleteVolumeResponse{}, nil } + var deleteErr error if prefix == "" { // prefix is empty, we delete the whole bucket if err := client.RemoveBucket(bucketName); err != nil { - glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err) - return nil, err + deleteErr = err } glog.V(4).Infof("Bucket %s removed", bucketName) } else { if err := client.RemovePrefix(bucketName, prefix); err != nil { - return nil, fmt.Errorf("unable to remove prefix: %w", err) + deleteErr = fmt.Errorf("unable to remove prefix: %w", err) } glog.V(4).Infof("Prefix %s removed", prefix) } + if deleteErr != nil { + glog.Warning("remove volume failed, will ensure fsmeta exists to avoid losing control over volume") + if err := client.SetFSMeta(meta); err != nil { + glog.Error(err) + } + return nil, deleteErr + } + return &csi.DeleteVolumeResponse{}, nil } @@ -178,11 +187,11 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req } bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId()) - s3, err := s3.NewClientFromSecret(req.GetSecrets()) + client, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } - exists, err := s3.BucketExists(bucketName) + exists, err := client.BucketExists(bucketName) if err != nil { return nil, err } @@ -192,7 +201,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId())) } - if _, err := s3.GetFSMeta(bucketName, prefix); err != nil { + if _, err := client.GetFSMeta(bucketName, prefix); err != nil { // return an error if the fsmeta of the requested volume does not exist return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId())) } @@ -202,8 +211,8 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, } - for _, cap := range req.VolumeCapabilities { - if cap.GetAccessMode().GetMode() != supportedAccessMode.GetMode() { + for _, capability := range req.VolumeCapabilities { + if capability.GetAccessMode().GetMode() != supportedAccessMode.GetMode() { return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil } } diff --git a/pkg/s3/client.go b/pkg/s3/client.go index 6e7dadb..87d5df1 100644 --- a/pkg/s3/client.go +++ b/pkg/s3/client.go @@ -94,17 +94,35 @@ func (client *s3Client) CreatePrefix(bucketName string, prefix string) error { } func (client *s3Client) RemovePrefix(bucketName string, prefix string) error { - if err := client.removeObjects(bucketName, prefix); err != nil { - return err + var err error + + if err = client.removeObjects(bucketName, prefix); err == nil { + return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{}) + } + + glog.Warningf("removeObjects failed with: %s, will try removeObjectsOneByOne", err) + + if err = client.removeObjectsOneByOne(bucketName, prefix); err == nil { + return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{}) } - return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{}) + + return err } func (client *s3Client) RemoveBucket(bucketName string) error { - if err := client.removeObjects(bucketName, ""); err != nil { - return err + var err error + + if err = client.removeObjects(bucketName, ""); err == nil { + return client.minio.RemoveBucket(client.ctx, bucketName) + } + + glog.Warningf("removeObjects failed with: %s, will try removeObjectsOneByOne", err) + + if err = client.removeObjectsOneByOne(bucketName, ""); err == nil { + return client.minio.RemoveBucket(client.ctx, bucketName) } - return client.minio.RemoveBucket(client.ctx, bucketName) + + return err } func (client *s3Client) removeObjects(bucketName, prefix string) error { @@ -114,10 +132,6 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error { go func() { defer close(objectsCh) - doneCh := make(chan struct{}) - - defer close(doneCh) - for object := range client.minio.ListObjects( client.ctx, bucketName, @@ -141,10 +155,12 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error { GovernanceBypass: true, } errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts) + haveErrWhenRemoveObjects := false for e := range errorCh { glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err) + haveErrWhenRemoveObjects = true } - if len(errorCh) != 0 { + if haveErrWhenRemoveObjects { return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName) } } @@ -152,6 +168,58 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error { return nil } +// will delete files one by one without file lock +func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { + objectsCh := make(chan minio.ObjectInfo, 1) + removeErrCh := make(chan minio.RemoveObjectError, 1) + var listErr error + + go func() { + defer close(objectsCh) + + for object := range client.minio.ListObjects(client.ctx, bucketName, + minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) { + if object.Err != nil { + listErr = object.Err + return + } + objectsCh <- object + } + }() + + if listErr != nil { + glog.Error("Error listing objects", listErr) + return listErr + } + + go func() { + defer close(removeErrCh) + + for object := range objectsCh { + err := client.minio.RemoveObject(client.ctx, bucketName, object.Key, + minio.RemoveObjectOptions{VersionID: object.VersionID}) + if err != nil { + removeErrCh <- minio.RemoveObjectError{ + ObjectName: object.Key, + VersionID: object.VersionID, + Err: err, + } + } + } + }() + + haveErrWhenRemoveObjects := false + for e := range removeErrCh { + glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err) + haveErrWhenRemoveObjects = true + } + if haveErrWhenRemoveObjects { + return fmt.Errorf("Failed to remove all objects of path %s", bucketName) + } + + return nil +} + func (client *s3Client) SetFSMeta(meta *FSMeta) error { b := new(bytes.Buffer) json.NewEncoder(b).Encode(meta)