Skip to content

Commit

Permalink
Merge pull request #47 from Boxjan/master
Browse files Browse the repository at this point in the history
* use rclone 1.54.1
* if removeBucket or removePrefix failed, fsmeta is not deleted
* fix removeObjects error check
* add removeObjectsOneByOne as a fallback when removeObjects fails
* fix repeat in yaml
  • Loading branch information
ctrox committed Apr 10, 2021
2 parents ebffbeb + 8449c1e commit 0433ead
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 30 deletions.
2 changes: 1 addition & 1 deletion cmd/s3driver/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ RUN apt-get update && \
rm -rf /var/lib/apt/lists/*

# install rclone
ARG RCLONE_VERSION=v1.47.0
ARG RCLONE_VERSION=v1.54.1
RUN cd /tmp \
&& curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \
&& unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \
Expand Down
2 changes: 1 addition & 1 deletion cmd/s3driver/Dockerfile.full
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ RUN apt-get update && \
rm -rf /var/lib/apt/lists/*

# install rclone
ARG RCLONE_VERSION=v1.47.0
ARG RCLONE_VERSION=v1.54.1
RUN cd /tmp \
&& curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \
&& unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \
Expand Down
3 changes: 0 additions & 3 deletions deploy/kubernetes/attacher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ metadata:
name: csi-attacher-s3
namespace: kube-system
spec:
selector:
matchLabels:
app: "csi-attacher-s3"
serviceName: "csi-attacher-s3"
replicas: 1
selector:
Expand Down
3 changes: 1 addition & 2 deletions deploy/kubernetes/csi-s3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ spec:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
image: ctrox/csi-s3:v1.2.0-rc.1
imagePullPolicy: Never
imagePullPolicy: "Always"
args:
- "--endpoint=$(CSI_ENDPOINT)"
- "--nodeid=$(NODE_ID)"
Expand All @@ -94,7 +94,6 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
imagePullPolicy: "Always"
volumeMounts:
- name: plugin-dir
mountPath: /csi
Expand Down
3 changes: 0 additions & 3 deletions deploy/kubernetes/provisioner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ metadata:
name: csi-provisioner-s3
namespace: kube-system
spec:
selector:
matchLabels:
app: "csi-provisioner-s3"
serviceName: "csi-provisioner-s3"
replicas: 1
selector:
Expand Down
27 changes: 18 additions & 9 deletions pkg/driver/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
volumeID := req.GetVolumeId()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
var meta *s3.FSMeta

// Check arguments
if len(volumeID) == 0 {
Expand All @@ -146,25 +147,33 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}

if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
if meta, err = client.GetFSMeta(bucketName, prefix); err != nil {
glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID)
return &csi.DeleteVolumeResponse{}, nil
}

var deleteErr error
if prefix == "" {
// prefix is empty, we delete the whole bucket
if err := client.RemoveBucket(bucketName); err != nil {
glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
return nil, err
deleteErr = err
}
glog.V(4).Infof("Bucket %s removed", bucketName)
} else {
if err := client.RemovePrefix(bucketName, prefix); err != nil {
return nil, fmt.Errorf("unable to remove prefix: %w", err)
deleteErr = fmt.Errorf("unable to remove prefix: %w", err)
}
glog.V(4).Infof("Prefix %s removed", prefix)
}

if deleteErr != nil {
glog.Warning("remove volume failed, will ensure fsmeta exists to avoid losing control over volume")
if err := client.SetFSMeta(meta); err != nil {
glog.Error(err)
}
return nil, deleteErr
}

return &csi.DeleteVolumeResponse{}, nil
}

Expand All @@ -178,11 +187,11 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
}
bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId())

s3, err := s3.NewClientFromSecret(req.GetSecrets())
client, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := s3.BucketExists(bucketName)
exists, err := client.BucketExists(bucketName)
if err != nil {
return nil, err
}
Expand All @@ -192,7 +201,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId()))
}

if _, err := s3.GetFSMeta(bucketName, prefix); err != nil {
if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
// return an error if the fsmeta of the requested volume does not exist
return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId()))
}
Expand All @@ -202,8 +211,8 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
}

for _, cap := range req.VolumeCapabilities {
if cap.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
for _, capability := range req.VolumeCapabilities {
if capability.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil
}
}
Expand Down
90 changes: 79 additions & 11 deletions pkg/s3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,35 @@ func (client *s3Client) CreatePrefix(bucketName string, prefix string) error {
}

func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
if err := client.removeObjects(bucketName, prefix); err != nil {
return err
var err error

if err = client.removeObjects(bucketName, prefix); err == nil {
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
}

glog.Warningf("removeObjects failed with: %s, will try removeObjectsOneByOne", err)

if err = client.removeObjectsOneByOne(bucketName, prefix); err == nil {
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
}
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})

return err
}

func (client *s3Client) RemoveBucket(bucketName string) error {
if err := client.removeObjects(bucketName, ""); err != nil {
return err
var err error

if err = client.removeObjects(bucketName, ""); err == nil {
return client.minio.RemoveBucket(client.ctx, bucketName)
}

glog.Warningf("removeObjects failed with: %s, will try removeObjectsOneByOne", err)

if err = client.removeObjectsOneByOne(bucketName, ""); err == nil {
return client.minio.RemoveBucket(client.ctx, bucketName)
}
return client.minio.RemoveBucket(client.ctx, bucketName)

return err
}

func (client *s3Client) removeObjects(bucketName, prefix string) error {
Expand All @@ -114,10 +132,6 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
go func() {
defer close(objectsCh)

doneCh := make(chan struct{})

defer close(doneCh)

for object := range client.minio.ListObjects(
client.ctx,
bucketName,
Expand All @@ -141,17 +155,71 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
GovernanceBypass: true,
}
errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts)
haveErrWhenRemoveObjects := false
for e := range errorCh {
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
haveErrWhenRemoveObjects = true
}
if len(errorCh) != 0 {
if haveErrWhenRemoveObjects {
return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
}
}

return nil
}

// will delete files one by one without file lock
func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
objectsCh := make(chan minio.ObjectInfo, 1)
removeErrCh := make(chan minio.RemoveObjectError, 1)
var listErr error

go func() {
defer close(objectsCh)

for object := range client.minio.ListObjects(client.ctx, bucketName,
minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
if object.Err != nil {
listErr = object.Err
return
}
objectsCh <- object
}
}()

if listErr != nil {
glog.Error("Error listing objects", listErr)
return listErr
}

go func() {
defer close(removeErrCh)

for object := range objectsCh {
err := client.minio.RemoveObject(client.ctx, bucketName, object.Key,
minio.RemoveObjectOptions{VersionID: object.VersionID})
if err != nil {
removeErrCh <- minio.RemoveObjectError{
ObjectName: object.Key,
VersionID: object.VersionID,
Err: err,
}
}
}
}()

haveErrWhenRemoveObjects := false
for e := range removeErrCh {
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
haveErrWhenRemoveObjects = true
}
if haveErrWhenRemoveObjects {
return fmt.Errorf("Failed to remove all objects of path %s", bucketName)
}

return nil
}

func (client *s3Client) SetFSMeta(meta *FSMeta) error {
b := new(bytes.Buffer)
json.NewEncoder(b).Encode(meta)
Expand Down

0 comments on commit 0433ead

Please sign in to comment.