Skip to content

Commit

Permalink
[FLINK-9920] Only check part files in BucketingSinkFaultToleranceITCase
Browse files Browse the repository at this point in the history
Before, it could happen that other files are in the same directory. Then
the verification logic in the test would pick up those files and the
test would fail. Now we ignore all files that don't match our expected
pattern.
  • Loading branch information
aljoscha committed Feb 4, 2019
1 parent f303b98 commit 24c2e17
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 39 deletions.
Expand Up @@ -533,26 +533,33 @@ private Method reflectTruncate(FileSystem fs) {
}

// verify that truncate actually works
Path testPath = new Path(UUID.randomUUID().toString());
try (FSDataOutputStream outputStream = fs.create(testPath)) {
outputStream.writeUTF("hello");
} catch (IOException e) {
LOG.error("Could not create file for checking if truncate works.", e);
throw new RuntimeException("Could not create file for checking if truncate works.", e);
}

Path testPath = new Path(basePath, UUID.randomUUID().toString());
try {
m.invoke(fs, testPath, 2);
} catch (IllegalAccessException | InvocationTargetException e) {
LOG.debug("Truncate is not supported.", e);
m = null;
}
try (FSDataOutputStream outputStream = fs.create(testPath)) {
outputStream.writeUTF("hello");
} catch (IOException e) {
LOG.error("Could not create file for checking if truncate works.", e);
throw new RuntimeException(
"Could not create file for checking if truncate works. " +
"You can disable support for truncate() completely via " +
"BucketingSink.setUseTruncate(false).", e);
}

try {
fs.delete(testPath, false);
} catch (IOException e) {
LOG.error("Could not delete truncate test file.", e);
throw new RuntimeException("Could not delete truncate test file.", e);
try {
m.invoke(fs, testPath, 2);
} catch (IllegalAccessException | InvocationTargetException e) {
LOG.debug("Truncate is not supported.", e);
m = null;
}
} finally {
try {
fs.delete(testPath, false);
} catch (IOException e) {
LOG.error("Could not delete truncate test file.", e);
throw new RuntimeException("Could not delete truncate test file. " +
"You can disable support for truncate() completely via " +
"BucketingSink.setUseTruncate(false).", e);
}
}
}
return m;
Expand Down
Expand Up @@ -632,29 +632,32 @@ private Method reflectTruncate(FileSystem fs) {

// verify that truncate actually works
Path testPath = new Path(basePath, UUID.randomUUID().toString());
try (FSDataOutputStream outputStream = fs.create(testPath)) {
outputStream.writeUTF("hello");
} catch (IOException e) {
LOG.error("Could not create file for checking if truncate works.", e);
throw new RuntimeException("Could not create file for checking if truncate works. " +
"You can disable support for truncate() completely via " +
"BucketingSink.setUseTruncate(false).", e);
}

try {
m.invoke(fs, testPath, 2);
} catch (IllegalAccessException | InvocationTargetException e) {
LOG.debug("Truncate is not supported.", e);
m = null;
}
try (FSDataOutputStream outputStream = fs.create(testPath)) {
outputStream.writeUTF("hello");
} catch (IOException e) {
LOG.error("Could not create file for checking if truncate works.", e);
throw new RuntimeException(
"Could not create file for checking if truncate works. " +
"You can disable support for truncate() completely via " +
"BucketingSink.setUseTruncate(false).", e);
}

try {
fs.delete(testPath, false);
} catch (IOException e) {
LOG.error("Could not delete truncate test file.", e);
throw new RuntimeException("Could not delete truncate test file. " +
"You can disable support for truncate() completely via " +
"BucketingSink.setUseTruncate(false).", e);
try {
m.invoke(fs, testPath, 2);
} catch (IllegalAccessException | InvocationTargetException e) {
LOG.debug("Truncate is not supported.", e);
m = null;
}
} finally {
try {
fs.delete(testPath, false);
} catch (IOException e) {
LOG.error("Could not delete truncate test file.", e);
throw new RuntimeException("Could not delete truncate test file. " +
"You can disable support for truncate() completely via " +
"BucketingSink.setUseTruncate(false).", e);
}
}
}
return m;
Expand Down
Expand Up @@ -66,6 +66,9 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB

static final long NUM_STRINGS = 16_000;

// this is already the default, but we explicitly set it to make the test explicit
static final String PART_PREFIX = "part";

@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();

Expand Down Expand Up @@ -115,6 +118,7 @@ public void testProgram(StreamExecutionEnvironment env) {
.setBucketer(new BasePathBucketer<String>())
.setBatchSize(10000)
.setValidLengthPrefix("")
.setPartPrefix(PART_PREFIX)
.setPendingPrefix("")
.setPendingSuffix(PENDING_SUFFIX)
.setInProgressSuffix(IN_PROGRESS_SUFFIX);
Expand Down Expand Up @@ -144,6 +148,10 @@ public void postSubmit() throws Exception {

while (files.hasNext()) {
LocatedFileStatus file = files.next();
if (!file.getPath().getName().startsWith(PART_PREFIX)) {
// ignore files that don't match with our expected part prefix
continue;
}

if (!file.getPath().toString().endsWith(".valid-length")) {
int validLength = (int) file.getLen();
Expand Down

0 comments on commit 24c2e17

Please sign in to comment.