S3ABlockOutputStream.java:
@@ -232,7 +232,9 @@ private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded()
LOG.error("Number of partitions in stream exceeds limit for S3: "
+ Constants.MAX_MULTIPART_COUNT + " write may fail.");
}
-activeBlock = blockFactory.create(blockCount, this.blockSize, statistics);
+activeBlock = blockFactory.create(
+    writeOperationHelper.getAuditSpan().getSpanId(),
+    key, blockCount, this.blockSize, statistics);
}
return activeBlock;
}
@@ -728,8 +730,14 @@ public void hsync() throws IOException {

/**
* Shared processing of Syncable operation reporting/downgrade.
+*
+* The Syncable API is not supported, so calls to hsync/hflush will throw an
+* UnsupportedOperationException unless the stream was constructed with
+* {@link #downgradeSyncableExceptions} set to true, in which case the stream is flushed.
+* @throws IOException IO problem
+* @throws UnsupportedOperationException if downgradeSyncableExceptions is set to false
*/
-private void handleSyncableInvocation() {
+private void handleSyncableInvocation() throws IOException {
final UnsupportedOperationException ex
= new UnsupportedOperationException(E_NOT_SYNCABLE);
if (!downgradeSyncableExceptions) {
@@ -741,6 +749,7 @@ private void handleSyncableInvocation() {
key);
// and log at debug
LOG.debug("Downgrading Syncable call", ex);
+flush();
}

@Override
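For illustration, the downgrade path above means caller code like the following sketch flushes instead of failing. This is not part of the patch; the configuration key is assumed to be Constants.DOWNGRADE_SYNCABLE_EXCEPTIONS, i.e. "fs.s3a.downgrade.syncable.exceptions":

    // Sketch only -- bucket, path and config key are assumptions.
    Configuration conf = new Configuration();
    conf.setBoolean("fs.s3a.downgrade.syncable.exceptions", true);
    Path dest = new Path("s3a://example-bucket/demo.txt");
    try (FSDataOutputStream out = FileSystem.get(dest.toUri(), conf).create(dest)) {
      out.write(42);
      // with the option enabled this logs a warning and degrades to flush();
      // with it disabled, hsync()/hflush() throw UnsupportedOperationException
      out.hsync();
    }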
S3ADataBlocks.java:
@@ -175,12 +175,14 @@ protected BlockFactory(S3AFileSystem owner) {
/**
* Create a block.
*
+* @param spanId id of the audit span
+* @param key key of s3 object being written to
* @param index index of block
* @param limit limit of the block.
* @param statistics stats to work with
* @return a new block.
*/
-abstract DataBlock create(long index, long limit,
+abstract DataBlock create(String spanId, String key, long index, long limit,
BlockOutputStreamStatistics statistics)
throws IOException;

@@ -391,11 +393,11 @@ static class ArrayBlockFactory extends BlockFactory {
}

@Override
-DataBlock create(long index, long limit,
+DataBlock create(String spanId, String key, long index, long limit,
BlockOutputStreamStatistics statistics)
throws IOException {
Preconditions.checkArgument(limit > 0,
"Invalid block size: %d", limit);
"Invalid block size: %d [%s]", limit, key);
return new ByteArrayBlock(0, limit, statistics);
}

@@ -516,11 +518,11 @@ static class ByteBufferBlockFactory extends BlockFactory {
}

@Override
-ByteBufferBlock create(long index, long limit,
+ByteBufferBlock create(String spanId, String key, long index, long limit,
BlockOutputStreamStatistics statistics)
throws IOException {
Preconditions.checkArgument(limit > 0,
"Invalid block size: %d", limit);
"Invalid block size: %d [%s]", limit, key);
return new ByteBufferBlock(index, limit, statistics);
}

@@ -798,6 +800,8 @@ public String toString() {
* Buffer blocks to disk.
*/
static class DiskBlockFactory extends BlockFactory {
+private static final String ESCAPED_FORWARD_SLASH = "EFS";

[Contributor] think I might prefer something more distinguishable from text, e.g. "FS" and "BS", so it's easier to read in a dir listing

+private static final String ESCAPED_BACKSLASH = "EBS";

DiskBlockFactory(S3AFileSystem owner) {
super(owner);
@@ -806,24 +810,31 @@ static class DiskBlockFactory extends BlockFactory {
/**
* Create a temp file and a {@link DiskBlock} instance to manage it.
*
+* @param spanId id of the audit span
+* @param key key of the s3 object being written
* @param index block index
* @param limit limit of the block. -1 means "no limit"
* @param statistics statistics to update
* @return the new block
* @throws IOException IO problems
*/
@Override
-DataBlock create(long index,
+DataBlock create(String spanId, String key, long index,
long limit,
BlockOutputStreamStatistics statistics)
throws IOException {
Preconditions.checkArgument(limit != 0,
"Invalid block size: %d", limit);
File destFile = getOwner()
.createTmpFileForWrite(String.format("s3ablock-%04d-", index),
limit, getOwner().getConf());
"Invalid block size: %d [%s]", limit, key);
String prefix = String.format("s3ablock-%04d-%s-%s-", index, spanId, escapeS3Key(key));
File destFile = getOwner().createTmpFileForWrite(prefix, limit, getOwner().getConf());
return new DiskBlock(destFile, limit, index, statistics);
}

+protected static String escapeS3Key(String key) {
+  return key
+      .replace("\\", ESCAPED_BACKSLASH)
+      .replace("/", ESCAPED_FORWARD_SLASH);
+}
}
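To illustrate the prefix format introduced above (values invented, not from the patch):

    // Illustration only: block 3 of key "docs/2022\jan" under span "span-01".
    String prefix = String.format("s3ablock-%04d-%s-%s-",
        3, "span-01", escapeS3Key("docs/2022\\jan"));
    // escapeS3Key("docs/2022\\jan") == "docsEFS2022EBSjan"
    // prefix == "s3ablock-0003-span-01-docsEFS2022EBSjan-"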

/**
S3AFileSystem.java:
@@ -1369,12 +1369,69 @@ public S3AEncryptionMethods getS3EncryptionAlgorithm() {
File createTmpFileForWrite(String pathStr, long size,
Configuration conf) throws IOException {
initLocalDirAllocatorIfNotInitialized(conf);
-Path path = directoryAllocator.getLocalPathForWrite(pathStr,
-    size, conf);
+Path path = directoryAllocator.getLocalPathForWrite(pathStr, size, conf);
File dir = new File(path.getParent().toUri().getPath());
String prefix = path.getName();
// create a temp file on this directory
-return File.createTempFile(prefix, null, dir);
+return safeCreateTempFile(pathStr, null, dir);
}

+// TODO remove this method when hadoop upgrades to a newer version of java than 1.8
+/**
+ * Ensure that the temp file prefix and suffix don't exceed the maximum number of characters
+ * allowed by the underlying file system. This validation isn't required in Java 9+, since
+ * {@link java.io.File#createTempFile(String, String, File)} automatically truncates file names.
+ *
+ * @param prefix prefix for the temporary file
+ * @param suffix suffix for the temporary file
+ * @param dir directory to create the temporary file in
+ * @return a unique temporary file
+ * @throws IOException IO problems creating the temporary file
+ */
+static File safeCreateTempFile(String prefix, String suffix, File dir) throws IOException {
+  // avoid validating multiple times:
+  // if the running JVM is version 9+, defer to the java.io.File validation implementation
+  if (Float.parseFloat(System.getProperty("java.class.version")) >= 53) {

[Contributor] this should go in org.apache.hadoop.util.Shell; there's already something similar. will need a test somehow.

+    return File.createTempFile(prefix, suffix, dir);
+  }
+
+  // if no suffix was defined, assume the default
+  if (suffix == null) {
+    suffix = ".tmp";
+  }
+  // use only the file name from the supplied prefix
+  prefix = (new File(prefix)).getName();
+
+  int prefixLength = prefix.length();
+  int suffixLength = suffix.length();
+  int maxRandomSuffixLen = 19; // Long.toUnsignedString(Long.MAX_VALUE).length()
+
+  int nameMax = 255; // unable to access the underlying FS directly, so assume 255

[Contributor] make a constant, e.g. ASSUMED_MAX_FILENAME

+  int excess = prefixLength + maxRandomSuffixLen + suffixLength - nameMax;
+
+  // shorten the prefix if the file name exceeds 255 chars

[Contributor] and replace the explicit size with "too long"

+  if (excess > 0) {
+    // attempt to shorten the prefix length to no less than 3
+    prefixLength = shortenSubName(prefixLength, excess, 3);

[Contributor] again, make a constant and use in both places

+    prefix = prefix.substring(0, prefixLength);
+  }
+  // shorten the suffix if the file name still exceeds 255 chars
+  excess = prefixLength + maxRandomSuffixLen + suffixLength - nameMax;
+  if (excess > 0) {
+    // attempt to shorten the suffix length to no less than 3
+    suffixLength = shortenSubName(suffixLength, excess, 3);
+    suffix = suffix.substring(0, suffixLength);
+  }
+
+  return File.createTempFile(prefix, suffix, dir);
+}
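A worked example of the truncation arithmetic above (numbers invented):

    // Illustration only: a 300-char prefix with the default ".tmp" suffix.
    // excess = 300 + 19 + 4 - 255 = 68
    // prefix is shortened to max(3, 300 - 68) = 232 chars
    // recomputed excess = 232 + 19 + 4 - 255 = 0, so the suffix is left alone;
    // the final name is at most 232 + 19 + 4 = 255 chars, exactly at the assumed limit.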

+private static int shortenSubName(int subNameLength, int excess, int nameMin) {

[Contributor] add javadocs. a unit test would be good too and straightforward to add.

+  int newLength = Math.max(nameMin, subNameLength - excess);
+  if (newLength < subNameLength) {
+    return newLength;
+  }
+  return subNameLength;
+}
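A minimal sketch of the unit test asked for above, assuming shortenSubName is widened to package-private so a same-package test can call it:

    // Sketch only -- the visibility change and test location are assumptions.
    @Test
    public void testShortenSubName() {
      assertEquals(232, S3AFileSystem.shortenSubName(300, 68, 3)); // trims the excess
      assertEquals(3, S3AFileSystem.shortenSubName(10, 20, 3));    // clamps at the minimum
      assertEquals(10, S3AFileSystem.shortenSubName(10, 0, 3));    // no excess, unchanged
    }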

/**
WriteOperationHelper.java:
@@ -217,6 +217,7 @@ public <T> T retry(String action,
* Get the audit span this object was created with.
* @return the audit span
*/
+@Override
public AuditSpan getAuditSpan() {
return auditSpan;
}
WriteOperations.java:
@@ -43,6 +43,7 @@
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.util.functional.CallableRaisingIOE;

@@ -305,6 +306,12 @@ UploadPartResult uploadPart(UploadPartRequest request,
*/
Configuration getConf();

+/**
+ * Get the audit span this object was created with.
+ * @return the audit span
+ */
+AuditSpan getAuditSpan();

/**
* Create a S3 Select request for the destination path.
* This does not build the query.
ITestS3ABlockOutputStream.java:
@@ -29,9 +29,12 @@
import org.junit.BeforeClass;
import org.junit.Test;

+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.Arrays;
+import java.util.Objects;

import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM;
import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -79,6 +82,46 @@ public void testRegularUpload() throws IOException {
verifyUpload("regular", 1024);
}

+/**
+ * Test that the DiskBlock's local file doesn't result in an error when the S3 key exceeds the
+ * max char limit of the local file system. Currently
+ * {@link java.io.File#createTempFile(String, String, File)} is being relied on to handle the
+ * truncation.
+ * @throws IOException IO problems
+ */

[Contributor] this comment is out of date

+@Test
+public void testDiskBlockCreate() throws IOException {
+  String s3Key = // 1024 chars
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
+      "very_long_s3_key";

[Contributor]
1. you could do this with a shorter string and simply concatenate it a few times
2. it's duplicated in TestS3aFilesystem... it should be a constant in S3ATestConstants

+  long blockSize = getFileSystem().getDefaultBlockSize();
+  try (S3ADataBlocks.BlockFactory diskBlockFactory =
+           new S3ADataBlocks.DiskBlockFactory(getFileSystem());
+       S3ADataBlocks.DataBlock dataBlock =
+           diskBlockFactory.create("spanId", s3Key, 1, blockSize, null);
+  ) {
+    String tmpDir = getConfiguration().get("hadoop.tmp.dir");
+    boolean created = Arrays.stream(
+        Objects.requireNonNull(new File(tmpDir).listFiles()))
+        .anyMatch(f -> f.getName().contains("very_long_s3_key"));
+    assertTrue(String.format("tmp file should have been created locally in %s", tmpDir), created);

[Contributor]
- use AssertJ. it should actually be possible to add an assert that the array matches the requirement.
- should be exactly one; test suite setup should delete everything with "very_long_s3_key" in it; so should teardown too, but to support things like IDE debugging, it's best to do both.

+    LOG.info(dataBlock.toString()); // block file name/location can be viewed in failsafe-report
+  }
+}
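A sketch of the AssertJ form suggested above (assertj-core, which the hadoop-aws test suites already use):

    // Sketch only: asserts exactly one matching temp file, with a readable failure message.
    assertThat(new File(tmpDir).listFiles())
        .describedAs("temp block files in %s", tmpDir)
        .filteredOn(f -> f.getName().contains("very_long_s3_key"))
        .hasSize(1);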

@Test(expected = IOException.class)
public void testWriteAfterStreamClose() throws Throwable {
Path dest = path("testWriteAfterStreamClose");
@@ -136,7 +179,7 @@ private void markAndResetDatablock(S3ADataBlocks.BlockFactory factory)
new S3AInstrumentation(new URI("s3a://example"));
BlockOutputStreamStatistics outstats
= instrumentation.newOutputStreamStatistics(null);
-S3ADataBlocks.DataBlock block = factory.create(1, BLOCK_SIZE, outstats);
+S3ADataBlocks.DataBlock block = factory.create("spanId", "object/key", 1, BLOCK_SIZE, outstats);
block.write(dataset, 0, dataset.length);
S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
InputStream stream = uploadData.getUploadStream();
TestDataBlocks.java:
@@ -51,7 +51,7 @@ public void testByteBufferIO() throws Throwable {
new S3ADataBlocks.ByteBufferBlockFactory(null)) {
int limit = 128;
S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
-    = factory.create(1, limit, null);
+    = factory.create("spanId", "s3\\object/key", 1, limit, null);
assertOutstandingBuffers(factory, 1);

byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
TestS3ABlockOutputStream.java:
@@ -26,6 +26,7 @@
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.fs.s3a.test.MinimalWriteOperationHelperCallbacks;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.util.Progressable;
import org.junit.Before;
import org.junit.Test;
@@ -38,7 +39,10 @@
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
Expand All @@ -59,6 +63,9 @@ private S3ABlockOutputStream.BlockOutputStreamBuilder mockS3ABuilder() {
mock(S3ADataBlocks.BlockFactory.class);
long blockSize = Constants.DEFAULT_MULTIPART_SIZE;
WriteOperationHelper oHelper = mock(WriteOperationHelper.class);
+AuditSpan auditSpan = mock(AuditSpan.class);

[Contributor] you can just use org.apache.hadoop.fs.s3a.audit.impl.NoopSpan here; one less thing to mock.

+when(auditSpan.getSpanId()).thenReturn("spanId");
+when(oHelper.getAuditSpan()).thenReturn(auditSpan);
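The NoopSpan alternative would reduce the setup to roughly this (assuming NoopSpan exposes a reusable instance; the exact member name is an assumption, not confirmed by this patch):

    // Sketch only: replaces the two mock lines above with a real no-op span.
    when(oHelper.getAuditSpan()).thenReturn(NoopSpan.INSTANCE);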
PutTracker putTracker = mock(PutTracker.class);
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
S3ABlockOutputStream.builder()
@@ -156,6 +163,7 @@ public void testSyncableUnsupported() throws Exception {
stream = spy(new S3ABlockOutputStream(builder));
intercept(UnsupportedOperationException.class, () -> stream.hflush());
intercept(UnsupportedOperationException.class, () -> stream.hsync());
+verify(stream, never()).flush();
}

/**
@@ -169,8 +177,11 @@ public void testSyncableDowngrade() throws Exception {
builder.withDowngradeSyncableExceptions(true);
stream = spy(new S3ABlockOutputStream(builder));

+verify(stream, never()).flush();
stream.hflush();
+verify(stream, times(1)).flush();
stream.hsync();
+verify(stream, times(2)).flush();
}

}