diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java index 8cce4ce44c47c..49130aeecdd84 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java @@ -126,7 +126,16 @@ public long getPos() throws IOException { @Override public void sync() throws IOException { - fileStream.sync(); + lock(); + try { + fileStream.flush(); + openNewPartIfNecessary(userDefinedMinPartSize); + Committer committer = upload.snapshotAndGetCommitter(); + committer.commitAfterRecovery(); + closeForCommit(); + } finally { + unlock(); + } } @Override diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java index 5c7156513b567..29193dceb6b88 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java @@ -254,6 +254,16 @@ public void closeForCommitOnClosedStreamShouldFail() throws IOException { streamUnderTest.closeForCommit().commit(); } + @Test(expected = Exception.class) + public void testSync() throws IOException { + streamUnderTest.write(bytesOf("hello")); + streamUnderTest.write(bytesOf(" world")); + streamUnderTest.sync(); + assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello world"))); + streamUnderTest.write(randomBuffer(RefCountedBufferingFileStream.BUFFER_SIZE + 1)); + assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello world"))); + } + // ------------------------------------------------------------------------------------------------------------ // Utils // ------------------------------------------------------------------------------------------------------------