From 6487f0fb03870a74b948105ec685462e7b00cbc2 Mon Sep 17 00:00:00 2001 From: Jelmer Kuperus Date: Wed, 28 Feb 2018 21:34:08 +0100 Subject: [PATCH] [FLINK-8814] [file system sinks] Control over the extension of part files created by BucketingSink. --- .../fs/bucketing/BucketingSink.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 6e7f460a6dc63..faf3c566803e8 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -226,7 +226,12 @@ public class BucketingSink /** * The default prefix for part files. */ - private static final String DEFAULT_PART_REFIX = "part"; + private static final String DEFAULT_PART_PREFIX = "part"; + + /** + * The default suffix for part files. + */ + private static final String DEFAULT_PART_SUFFIX = null; /** * The default timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}). @@ -263,7 +268,8 @@ public class BucketingSink private String validLengthSuffix = DEFAULT_VALID_SUFFIX; private String validLengthPrefix = DEFAULT_VALID_PREFIX; - private String partPrefix = DEFAULT_PART_REFIX; + private String partPrefix = DEFAULT_PART_PREFIX; + private String partSuffix = DEFAULT_PART_SUFFIX; private boolean useTruncate = true; @@ -530,6 +536,10 @@ private void openNewPartFile(Path bucketPath, BucketState bucketState) throws partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter); } + if (partSuffix != null) { + partPath = partPath.suffix(partSuffix); + } + // increase, so we don't have to check for this name next time bucketState.partCounter++; @@ -986,6 +996,14 @@ public BucketingSink setValidLengthPrefix(String validLengthPrefix) { return this; } + /** + * Sets the prefix of part files. The default is no suffix. + */ + public BucketingSink setPartSuffix(String partSuffix) { + this.partSuffix = partSuffix; + return this; + } + /** * Sets the prefix of part files. The default is {@code "part"}. */