Skip to content
Permalink
Browse files
JCLOUDS-457: BlobStore MultiPartUpload strategy
The code related to the MultiPartUpload strategy has been added.
MultiPart uploads use an upload strategy (e.g. sequential vs parallel)
and also a slicing strategy to split the payload in different parts.
  • Loading branch information
rcoedo authored and gaul committed Jul 20, 2014
1 parent f7d2319 commit 244f50edc251153f4233125498d4c3ca06193354
Showing 9 changed files with 517 additions and 2 deletions.
@@ -35,19 +35,24 @@
import org.jclouds.crypto.Crypto;
import org.jclouds.domain.Location;
import org.jclouds.glacier.GlacierClient;
import org.jclouds.glacier.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.javax.annotation.Nullable;

import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.google.inject.Provider;

public class GlacierBlobStore extends BaseBlobStore {
private final GlacierClient sync;
private final Crypto crypto;
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;

@Inject
GlacierBlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
@Memoized Supplier<Set<? extends Location>> locations, GlacierClient sync, Crypto crypto) {
@Memoized Supplier<Set<? extends Location>> locations, GlacierClient sync, Crypto crypto,
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
super(context, blobUtils, defaultLocation, locations);
this.multipartUploadStrategy = checkNotNull(multipartUploadStrategy, "multipartUploadStrategy");
this.sync = checkNotNull(sync, "sync");
this.crypto = checkNotNull(crypto, "crypto");
}
@@ -95,7 +100,10 @@ public String putBlob(String container, Blob blob) {

@Override
public String putBlob(String container, Blob blob, PutOptions options) {
throw new UnsupportedOperationException();
if (options.isMultipart()) {
return multipartUploadStrategy.get().execute(container, blob);
}
return putBlob(container, blob);
}

@Override
@@ -21,6 +21,10 @@
import org.jclouds.blobstore.attr.ConsistencyModel;
import org.jclouds.glacier.blobstore.GlacierAsyncBlobStore;
import org.jclouds.glacier.blobstore.GlacierBlobStore;
import org.jclouds.glacier.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.glacier.blobstore.strategy.SlicingStrategy;
import org.jclouds.glacier.blobstore.strategy.internal.BaseSlicingStrategy;
import org.jclouds.glacier.blobstore.strategy.internal.SequentialMultipartUploadStrategy;

import com.google.inject.AbstractModule;

@@ -30,5 +34,7 @@ protected void configure() {
bind(ConsistencyModel.class).toInstance(ConsistencyModel.EVENTUAL);
bind(BlobStore.class).to(GlacierBlobStore.class);
bind(AsyncBlobStore.class).to(GlacierAsyncBlobStore.class);
bind(MultipartUploadStrategy.class).to(SequentialMultipartUploadStrategy.class);
bind(SlicingStrategy.class).to(BaseSlicingStrategy.class);
}
}
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.glacier.blobstore.strategy;

import org.jclouds.blobstore.domain.Blob;

public interface MultipartUploadStrategy {
String execute(String container, Blob blob);
}
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.glacier.blobstore.strategy;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import org.jclouds.glacier.util.ContentRange;
import org.jclouds.io.Payload;

public class PayloadSlice {
private final Payload payload;
private final ContentRange range;
private final int part;

public PayloadSlice(Payload payload, ContentRange range, int part) {
this.payload = checkNotNull(payload, "payload");
this.range = checkNotNull(range, "range");
checkArgument(part >= 0, "The part number cannot be negative");
this.part = part;
}

public Payload getPayload() {
return payload;
}

public ContentRange getRange() {
return range;
}

public int getPart() {
return part;
}
}
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.glacier.blobstore.strategy;

import org.jclouds.io.Payload;

public interface SlicingStrategy {
public static final int MAX_LIST_PARTS_RETURNED = 1000;
public static final int MAX_LIST_MPU_RETURNED = 1000;
public static final int MAX_NUMBER_OF_PARTS = 10000;

public static final long MIN_PART_SIZE = 1L << 20; //1 MB, last part can be < 1 MB
public static final long MAX_PART_SIZE = 1L << 32; //4 GB

void startSlicing(Payload payload);
PayloadSlice nextSlice();
boolean hasNext();
long getPartSizeInMB();
}
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.glacier.blobstore.strategy.internal;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.Math.sqrt;

import org.jclouds.glacier.blobstore.strategy.PayloadSlice;
import org.jclouds.glacier.blobstore.strategy.SlicingStrategy;
import org.jclouds.glacier.util.ContentRange;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;

import com.google.inject.Inject;
import com.google.inject.name.Named;

public class BaseSlicingStrategy implements SlicingStrategy {

public static final double DEFAULT_RATIO = 0.32; // (part size/number of parts) ratio

@Inject(optional = true)
@Named("jclouds.mpu.part.ratio")
private final double ratio = DEFAULT_RATIO;

private final PayloadSlicer slicer;
private Payload payload;
private volatile long partSizeInMB;
private volatile long total;
private volatile long copied;
private volatile int part;

@Inject
public BaseSlicingStrategy(PayloadSlicer slicer) {
this.slicer = checkNotNull(slicer, "slicer");
this.total = 0;
this.copied = 0;
this.partSizeInMB = 0;
this.part = 0;
}

protected long calculatePartSize(long length) {
long lengthInMB = (long) (length / (1L << 20)) + 1;
double fpPartSizeInMB = sqrt(ratio * lengthInMB); //Get the part size which matches the given ratio
long partSizeInMB = Long.highestOneBit((long) fpPartSizeInMB - 1) << 1;
if (partSizeInMB < 1) return 1;
else if (partSizeInMB > MAX_PART_SIZE) return MAX_PART_SIZE;
return partSizeInMB;
}

public long getRemaining() {
return total - copied;
}

@Override
public void startSlicing(Payload payload) {
this.payload = checkNotNull(payload, "payload");
this.copied = 0;
this.total = checkNotNull(payload.getContentMetadata().getContentLength(), "contentLength");
this.partSizeInMB = calculatePartSize(total);
this.part = 0;
}

@Override
public PayloadSlice nextSlice() {
checkNotNull(this.payload, "payload");
long sliceLength = Math.min(getRemaining(), partSizeInMB << 20);
Payload slicedPayload = slicer.slice(payload, copied, sliceLength);
ContentRange range = ContentRange.build(copied, copied + sliceLength - 1);
copied += sliceLength;
part++;
return new PayloadSlice(slicedPayload, range, part);
}

@Override
public boolean hasNext() {
return this.getRemaining() != 0;
}

@Override
public long getPartSizeInMB() {
return partSizeInMB;
}
}
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.glacier.blobstore.strategy.internal;

import static com.google.common.base.Preconditions.checkNotNull;

import org.jclouds.blobstore.domain.Blob;
import org.jclouds.glacier.GlacierClient;
import org.jclouds.glacier.blobstore.strategy.MultipartUploadStrategy;
import org.jclouds.glacier.blobstore.strategy.PayloadSlice;
import org.jclouds.glacier.blobstore.strategy.SlicingStrategy;

import com.google.common.collect.ImmutableMap;
import com.google.common.hash.HashCode;
import com.google.inject.Inject;

public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy {
private final GlacierClient client;
private final SlicingStrategy slicer;

@Inject
public SequentialMultipartUploadStrategy(GlacierClient client, SlicingStrategy slicer) {
this.client = checkNotNull(client, "client");
this.slicer = checkNotNull(slicer, "slicer");
}

@Override
public String execute(String container, Blob blob) {
slicer.startSlicing(blob.getPayload());
String uploadId = client.initiateMultipartUpload(container, slicer.getPartSizeInMB(),
blob.getMetadata().getName());
try {
ImmutableMap.Builder<Integer, HashCode> hashes = ImmutableMap.builder();
while (slicer.hasNext()) {
PayloadSlice slice = slicer.nextSlice();
hashes.put(slice.getPart(),
client.uploadPart(container, uploadId, slice.getRange(), slice.getPayload()));
}
return client.completeMultipartUpload(container, uploadId, hashes.build(),
blob.getPayload().getContentMetadata().getContentLength());
} catch (RuntimeException exception) {
client.abortMultipartUpload(container, uploadId);
throw exception;
}
}
}

0 comments on commit 244f50e

Please sign in to comment.