Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-18637:S3A to support upload of files greater than 2 GB using DiskBlocks #5481

Open
wants to merge 17 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public interface StreamCapabilities {
*/
String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported";

/**
HarshitGupta11 marked this conversation as resolved.
Show resolved Hide resolved
* Stream supports multipart uploads to the given path.
*/
String MULTIPART_SUPPORTED = "fs.capability.multipart.supported";

/**
* Capabilities that a stream can support and be queried for.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1255,4 +1255,18 @@ private Constants() {
*/
public static final String PREFETCH_BLOCK_COUNT_KEY = "fs.s3a.prefetch.block.count";
public static final int PREFETCH_BLOCK_DEFAULT_COUNT = 8;

/**
* Option to enable or disable the multipart uploads.
HarshitGupta11 marked this conversation as resolved.
Show resolved Hide resolved
* Value: {@value}.
* <p>
* Default is {@link #MULTIPART_UPLOAD_ENABLED_DEFAULT}.
*/
public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled";
HarshitGupta11 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Default value for multipart uploads.
* {@value}
*/
public static final boolean MULTIPART_UPLOAD_ENABLED_DEFAULT = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements
private final String key;

/** Size of all blocks. */
private final int blockSize;
private final long blockSize;

/** IO Statistics. */
private final IOStatistics iostatistics;
Expand Down Expand Up @@ -169,6 +169,9 @@ class S3ABlockOutputStream extends OutputStream implements
/** Thread level IOStatistics Aggregator. */
private final IOStatisticsAggregator threadIOStatisticsAggregator;

/** Is multipart upload enabled? */
private final boolean isMultipartEnabled;

/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
Expand All @@ -181,7 +184,7 @@ class S3ABlockOutputStream extends OutputStream implements
this.builder = builder;
this.key = builder.key;
this.blockFactory = builder.blockFactory;
this.blockSize = (int) builder.blockSize;
this.blockSize = builder.blockSize;
this.statistics = builder.statistics;
// test instantiations may not provide statistics;
this.iostatistics = statistics.getIOStatistics();
Expand All @@ -200,6 +203,7 @@ class S3ABlockOutputStream extends OutputStream implements
createBlockIfNeeded();
LOG.debug("Initialized S3ABlockOutputStream for {}" +
" output to {}", key, activeBlock);
this.isMultipartEnabled = builder.isMultipartEnabled;
if (putTracker.initialize()) {
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
Expand Down Expand Up @@ -318,7 +322,7 @@ public synchronized void write(byte[] source, int offset, int len)
statistics.writeBytes(len);
S3ADataBlocks.DataBlock block = createBlockIfNeeded();
int written = block.write(source, offset, len);
int remainingCapacity = block.remainingCapacity();
int remainingCapacity = (int) block.remainingCapacity();
if (written < len) {
// not everything was written —the block has run out
// of capacity
Expand Down Expand Up @@ -369,6 +373,8 @@ private synchronized void uploadCurrentBlock(boolean isLast)
*/
@Retries.RetryTranslated
private void initMultipartUpload() throws IOException {
Preconditions.checkState(!isMultipartEnabled,
"multipart upload is disabled");
if (multiPartUpload == null) {
LOG.debug("Initiating Multipart upload");
multiPartUpload = new MultiPartUpload(key);
Expand Down Expand Up @@ -558,19 +564,20 @@ public String toString() {
}

/**
* Upload the current block as a single PUT request; if the buffer
* is empty a 0-byte PUT will be invoked, as it is needed to create an
* entry at the far end.
* @throws IOException any problem.
* @return number of bytes uploaded. If thread was interrupted while
* waiting for upload to complete, returns zero with interrupted flag set
* on this thread.
* Upload the current block as a single PUT request; if the buffer is empty a
* 0-byte PUT will be invoked, as it is needed to create an entry at the far
steveloughran marked this conversation as resolved.
Show resolved Hide resolved
* end.
* @return number of bytes uploaded. If thread was interrupted while waiting
* for upload to complete, returns zero with interrupted flag set on this
* thread.
* @throws IOException
* any problem.
*/
private int putObject() throws IOException {
private long putObject() throws IOException {
LOG.debug("Executing regular upload for {}", writeOperationHelper);

final S3ADataBlocks.DataBlock block = getActiveBlock();
int size = block.dataSize();
long size = block.dataSize();
final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
writeOperationHelper.createPutObjectRequest(
Expand Down Expand Up @@ -683,6 +690,9 @@ public boolean hasCapability(String capability) {
case StreamCapabilities.IOSTATISTICS_CONTEXT:
return true;

case StreamCapabilities.MULTIPART_SUPPORTED:
return isMultipartEnabled;

default:
return false;
}
Expand Down Expand Up @@ -835,7 +845,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
Preconditions.checkNotNull(uploadId, "Null uploadId");
maybeRethrowUploadFailure();
partsSubmitted++;
final int size = block.dataSize();
final long size = block.dataSize();
bytesSubmitted += size;
final int currentPartNumber = partETagsFutures.size() + 1;
final UploadPartRequest request;
Expand Down Expand Up @@ -1011,7 +1021,7 @@ public void progressChanged(ProgressEvent progressEvent) {
ProgressEventType eventType = progressEvent.getEventType();
long bytesTransferred = progressEvent.getBytesTransferred();

int size = block.dataSize();
long size = block.dataSize();
switch (eventType) {

case REQUEST_BYTE_TRANSFER_EVENT:
Expand Down Expand Up @@ -1126,6 +1136,11 @@ public static final class BlockOutputStreamBuilder {
*/
private IOStatisticsAggregator ioStatisticsAggregator;

/**
* Are multipart uploads enabled for the given upload?
*/
private boolean isMultipartEnabled;

private BlockOutputStreamBuilder() {
}

Expand Down Expand Up @@ -1276,5 +1291,11 @@ public BlockOutputStreamBuilder withIOStatisticsAggregator(
ioStatisticsAggregator = value;
return this;
}

/**
 * Set whether multipart uploads are enabled for the stream being built.
 * @param value new value
 * @return the builder
 */
public BlockOutputStreamBuilder withMultipartEnabled(final boolean value) {
  this.isMultipartEnabled = value;
  return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ protected BlockFactory(S3AFileSystem owner) {
* @param statistics stats to work with
* @return a new block.
*/
abstract DataBlock create(long index, int limit,
abstract DataBlock create(long index, long limit,
BlockOutputStreamStatistics statistics)
throws IOException;

Expand Down Expand Up @@ -258,7 +258,7 @@ final DestState getState() {
* Return the current data size.
* @return the size of the data
*/
abstract int dataSize();
abstract long dataSize();

/**
* Predicate to verify that the block has the capacity to write
Expand All @@ -280,7 +280,7 @@ boolean hasData() {
* The remaining capacity in the block before it is full.
* @return the number of bytes remaining.
*/
abstract int remainingCapacity();
abstract long remainingCapacity();

/**
* Write a series of bytes from the buffer, from the offset.
Expand Down Expand Up @@ -391,7 +391,7 @@ static class ArrayBlockFactory extends BlockFactory {
}

@Override
DataBlock create(long index, int limit,
DataBlock create(long index, long limit,
BlockOutputStreamStatistics statistics)
throws IOException {
return new ByteArrayBlock(0, limit, statistics);
Expand Down Expand Up @@ -436,11 +436,11 @@ static class ByteArrayBlock extends DataBlock {
private Integer dataSize;

ByteArrayBlock(long index,
int limit,
long limit,
BlockOutputStreamStatistics statistics) {
super(index, statistics);
this.limit = limit;
buffer = new S3AByteArrayOutputStream(limit);
this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit;
buffer = new S3AByteArrayOutputStream(this.limit);
blockAllocated();
}

Expand All @@ -449,7 +449,7 @@ static class ByteArrayBlock extends DataBlock {
* @return the amount of data available to upload.
*/
@Override
int dataSize() {
long dataSize() {
return dataSize != null ? dataSize : buffer.size();
}

Expand All @@ -468,14 +468,14 @@ boolean hasCapacity(long bytes) {
}

@Override
int remainingCapacity() {
long remainingCapacity() {
return limit - dataSize();
}

@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = Math.min(remainingCapacity(), len);
int written = (int) Math.min(remainingCapacity(), len);
buffer.write(b, offset, written);
return written;
}
Expand Down Expand Up @@ -514,7 +514,7 @@ static class ByteBufferBlockFactory extends BlockFactory {
}

@Override
ByteBufferBlock create(long index, int limit,
ByteBufferBlock create(long index, long limit,
BlockOutputStreamStatistics statistics)
throws IOException {
return new ByteBufferBlock(index, limit, statistics);
Expand Down Expand Up @@ -564,11 +564,12 @@ class ByteBufferBlock extends DataBlock {
* @param statistics statistics to update
*/
ByteBufferBlock(long index,
int bufferSize,
long bufferSize,
BlockOutputStreamStatistics statistics) {
super(index, statistics);
this.bufferSize = bufferSize;
blockBuffer = requestBuffer(bufferSize);
this.bufferSize = bufferSize > Integer.MAX_VALUE ?
Integer.MAX_VALUE : (int) bufferSize;
blockBuffer = requestBuffer(this.bufferSize);
blockAllocated();
}

Expand All @@ -577,7 +578,7 @@ class ByteBufferBlock extends DataBlock {
* @return the amount of data available to upload.
*/
@Override
int dataSize() {
long dataSize() {
return dataSize != null ? dataSize : bufferCapacityUsed();
}

Expand All @@ -598,7 +599,7 @@ public boolean hasCapacity(long bytes) {
}

@Override
public int remainingCapacity() {
public long remainingCapacity() {
return blockBuffer != null ? blockBuffer.remaining() : 0;
}

Expand All @@ -609,7 +610,7 @@ private int bufferCapacityUsed() {
@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = Math.min(remainingCapacity(), len);
int written = (int) Math.min(remainingCapacity(), len);
blockBuffer.put(b, offset, written);
return written;
}
Expand Down Expand Up @@ -809,7 +810,7 @@ static class DiskBlockFactory extends BlockFactory {
*/
@Override
DataBlock create(long index,
int limit,
long limit,
BlockOutputStreamStatistics statistics)
throws IOException {
File destFile = getOwner()
Expand All @@ -825,14 +826,14 @@ DataBlock create(long index,
*/
static class DiskBlock extends DataBlock {

private int bytesWritten;
private long bytesWritten;
private final File bufferFile;
private final int limit;
private final long limit;
private BufferedOutputStream out;
private final AtomicBoolean closed = new AtomicBoolean(false);

DiskBlock(File bufferFile,
int limit,
long limit,
long index,
BlockOutputStreamStatistics statistics)
throws FileNotFoundException {
Expand All @@ -844,7 +845,7 @@ static class DiskBlock extends DataBlock {
}

@Override
int dataSize() {
long dataSize() {
return bytesWritten;
}

Expand All @@ -854,14 +855,14 @@ boolean hasCapacity(long bytes) {
}

@Override
int remainingCapacity() {
long remainingCapacity() {
return limit - bytesWritten;
}

@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = Math.min(remainingCapacity(), len);
int written = (int) Math.min(remainingCapacity(), len);
out.write(b, offset, written);
bytesWritten += written;
return written;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private ArnResource accessPoint;

/**
* Is multipart upload enabled for this S3A FS instance?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grammar nit
"is multipart upload enabled?"

*/
private boolean isMultipartEnabled;

/**
* A cache of files that should be deleted when the FileSystem is closed
* or the JVM is exited.
Expand Down Expand Up @@ -533,6 +538,8 @@ public void initialize(URI name, Configuration originalConf)
this.prefetchBlockSize = (int) prefetchBlockSizeLong;
this.prefetchBlockCount =
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
this.isMultipartEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
MULTIPART_UPLOAD_ENABLED_DEFAULT);

initThreadPools(conf);

Expand Down Expand Up @@ -595,7 +602,6 @@ public void initialize(URI name, Configuration originalConf)
}
blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
DEFAULT_FAST_UPLOAD_BUFFER);
partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
blockOutputActiveBlocks = intOption(conf,
FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
Expand Down Expand Up @@ -1831,6 +1837,11 @@ private FSDataOutputStream innerCreateFile(
final PutObjectOptions putOptions =
new PutObjectOptions(keep, null, options.getHeaders());

if(!checkDiskBuffer(getConf())){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just add a method validateOutputStreamConfiguration() and throw exception in the implementation only.

Copy link
Contributor

@mukund-thakur mukund-thakur Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just add a method validateOutputStreamConfiguration() and throw exception in the implementation only.

This is still pending. I don't really mind leaving it as it is but I think my suggestion is consistent with other parts of the code and is more readable.
CC @steveloughran

throw new IOException("Unable to create OutputStream with the given"
+ "multipart upload and buffer configuration.");
}

final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
S3ABlockOutputStream.builder()
.withKey(destKey)
Expand All @@ -1854,7 +1865,8 @@ private FSDataOutputStream innerCreateFile(
.withCSEEnabled(isCSEEnabled)
.withPutOptions(putOptions)
.withIOStatisticsAggregator(
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator());
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
.withMultipartEnabled(isMultipartEnabled);
return new FSDataOutputStream(
new S3ABlockOutputStream(builder),
null);
Expand Down
Loading