Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix problem #47

Merged
merged 3 commits into from
Apr 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/s3driver/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ RUN apt-get update && \
rm -rf /var/lib/apt/lists/*

# install rclone
ARG RCLONE_VERSION=v1.47.0
ARG RCLONE_VERSION=v1.54.1
RUN cd /tmp \
&& curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \
&& unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \
Expand Down
2 changes: 1 addition & 1 deletion cmd/s3driver/Dockerfile.full
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ RUN apt-get update && \
rm -rf /var/lib/apt/lists/*

# install rclone
ARG RCLONE_VERSION=v1.47.0
ARG RCLONE_VERSION=v1.54.1
RUN cd /tmp \
&& curl -O https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-amd64.zip \
&& unzip /tmp/rclone-${RCLONE_VERSION}-linux-amd64.zip \
Expand Down
3 changes: 0 additions & 3 deletions deploy/kubernetes/attacher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ metadata:
name: csi-attacher-s3
namespace: kube-system
spec:
selector:
matchLabels:
app: "csi-attacher-s3"
serviceName: "csi-attacher-s3"
replicas: 1
selector:
Expand Down
3 changes: 1 addition & 2 deletions deploy/kubernetes/csi-s3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ spec:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
image: ctrox/csi-s3:v1.2.0-rc.1
imagePullPolicy: Never
imagePullPolicy: "Always"
args:
- "--endpoint=$(CSI_ENDPOINT)"
- "--nodeid=$(NODE_ID)"
Expand All @@ -94,7 +94,6 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
imagePullPolicy: "Always"
volumeMounts:
- name: plugin-dir
mountPath: /csi
Expand Down
3 changes: 0 additions & 3 deletions deploy/kubernetes/provisioner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ metadata:
name: csi-provisioner-s3
namespace: kube-system
spec:
selector:
matchLabels:
app: "csi-provisioner-s3"
serviceName: "csi-provisioner-s3"
replicas: 1
selector:
Expand Down
27 changes: 18 additions & 9 deletions pkg/driver/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
volumeID := req.GetVolumeId()
bucketName, prefix := volumeIDToBucketPrefix(volumeID)
var meta *s3.FSMeta

// Check arguments
if len(volumeID) == 0 {
Expand All @@ -146,25 +147,33 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}

if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
if meta, err = client.GetFSMeta(bucketName, prefix); err != nil {
glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID)
return &csi.DeleteVolumeResponse{}, nil
}

var deleteErr error
if prefix == "" {
// prefix is empty, we delete the whole bucket
if err := client.RemoveBucket(bucketName); err != nil {
glog.V(3).Infof("Failed to remove volume %s: %v", volumeID, err)
return nil, err
deleteErr = err
}
glog.V(4).Infof("Bucket %s removed", bucketName)
} else {
if err := client.RemovePrefix(bucketName, prefix); err != nil {
return nil, fmt.Errorf("unable to remove prefix: %w", err)
deleteErr = fmt.Errorf("unable to remove prefix: %w", err)
}
glog.V(4).Infof("Prefix %s removed", prefix)
}

if deleteErr != nil {
glog.Warning("remove volume failed, will ensure fsmeta exists to avoid losing control over volume")
if err := client.SetFSMeta(meta); err != nil {
glog.Error(err)
}
return nil, deleteErr
}

return &csi.DeleteVolumeResponse{}, nil
}

Expand All @@ -178,11 +187,11 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
}
bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId())

s3, err := s3.NewClientFromSecret(req.GetSecrets())
client, err := s3.NewClientFromSecret(req.GetSecrets())
if err != nil {
return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
}
exists, err := s3.BucketExists(bucketName)
exists, err := client.BucketExists(bucketName)
if err != nil {
return nil, err
}
Expand All @@ -192,7 +201,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId()))
}

if _, err := s3.GetFSMeta(bucketName, prefix); err != nil {
if _, err := client.GetFSMeta(bucketName, prefix); err != nil {
// return an error if the fsmeta of the requested volume does not exist
return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId()))
}
Expand All @@ -202,8 +211,8 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
}

for _, cap := range req.VolumeCapabilities {
if cap.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
for _, capability := range req.VolumeCapabilities {
if capability.GetAccessMode().GetMode() != supportedAccessMode.GetMode() {
return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil
}
}
Expand Down
90 changes: 79 additions & 11 deletions pkg/s3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,35 @@ func (client *s3Client) CreatePrefix(bucketName string, prefix string) error {
}

func (client *s3Client) RemovePrefix(bucketName string, prefix string) error {
if err := client.removeObjects(bucketName, prefix); err != nil {
return err
var err error

if err = client.removeObjects(bucketName, prefix); err == nil {
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
}

glog.Warningf("removeObjects failed with: %s, will try removeObjectsOneByOne", err)

if err = client.removeObjectsOneByOne(bucketName, prefix); err == nil {
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})
}
return client.minio.RemoveObject(client.ctx, bucketName, prefix, minio.RemoveObjectOptions{})

return err
}

func (client *s3Client) RemoveBucket(bucketName string) error {
if err := client.removeObjects(bucketName, ""); err != nil {
return err
var err error

if err = client.removeObjects(bucketName, ""); err == nil {
return client.minio.RemoveBucket(client.ctx, bucketName)
}

glog.Warningf("removeObjects failed with: %s, will try removeObjectsOneByOne", err)

if err = client.removeObjectsOneByOne(bucketName, ""); err == nil {
return client.minio.RemoveBucket(client.ctx, bucketName)
}
return client.minio.RemoveBucket(client.ctx, bucketName)

return err
}

func (client *s3Client) removeObjects(bucketName, prefix string) error {
Expand All @@ -114,10 +132,6 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
go func() {
defer close(objectsCh)

doneCh := make(chan struct{})

defer close(doneCh)

for object := range client.minio.ListObjects(
client.ctx,
bucketName,
Expand All @@ -141,17 +155,71 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
GovernanceBypass: true,
}
errorCh := client.minio.RemoveObjects(client.ctx, bucketName, objectsCh, opts)
haveErrWhenRemoveObjects := false
for e := range errorCh {
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
haveErrWhenRemoveObjects = true
}
if len(errorCh) != 0 {
if haveErrWhenRemoveObjects {
return fmt.Errorf("Failed to remove all objects of bucket %s", bucketName)
}
}

return nil
}

// will delete files one by one without file lock
func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
objectsCh := make(chan minio.ObjectInfo, 1)
removeErrCh := make(chan minio.RemoveObjectError, 1)
var listErr error

go func() {
defer close(objectsCh)

for object := range client.minio.ListObjects(client.ctx, bucketName,
minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
if object.Err != nil {
listErr = object.Err
return
}
objectsCh <- object
}
}()

if listErr != nil {
glog.Error("Error listing objects", listErr)
return listErr
}

go func() {
defer close(removeErrCh)

for object := range objectsCh {
err := client.minio.RemoveObject(client.ctx, bucketName, object.Key,
minio.RemoveObjectOptions{VersionID: object.VersionID})
if err != nil {
removeErrCh <- minio.RemoveObjectError{
ObjectName: object.Key,
VersionID: object.VersionID,
Err: err,
}
}
}
}()

haveErrWhenRemoveObjects := false
for e := range removeErrCh {
glog.Errorf("Failed to remove object %s, error: %s", e.ObjectName, e.Err)
haveErrWhenRemoveObjects = true
}
if haveErrWhenRemoveObjects {
return fmt.Errorf("Failed to remove all objects of path %s", bucketName)
}

return nil
}

func (client *s3Client) SetFSMeta(meta *FSMeta) error {
b := new(bytes.Buffer)
json.NewEncoder(b).Encode(meta)
Expand Down