From e62f412e90da6161611e1ab15ba32c197966edbd Mon Sep 17 00:00:00 2001
From: gongzhongqiang
Date: Fri, 26 Apr 2024 14:32:48 +0800
Subject: [PATCH 1/6] [FLINK-35240][Connectors][format]Disable
 FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format

---
 .../flink/formats/csv/CsvBulkWriter.java      | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
index f9a8e01bb0704..5684222015a66 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
@@ -23,8 +23,10 @@
 import org.apache.flink.formats.common.Converter;
 import org.apache.flink.util.jackson.JacksonMapperFactory;

+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonEncoding;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

@@ -41,6 +43,7 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
     private final Converter<T, R, C> converter;
     @Nullable private final C converterContext;
     private final ObjectWriter csvWriter;
+    private final JsonGenerator generator;

     CsvBulkWriter(
             CsvMapper mapper,
@@ -51,13 +54,19 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
         checkNotNull(mapper);
         checkNotNull(schema);

+        // Prevent Jackson's writeValue() method calls from closing the stream.
+        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+        mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);
+
         this.converter = checkNotNull(converter);
         this.stream = checkNotNull(stream);
         this.converterContext = converterContext;
         this.csvWriter = mapper.writer(schema);
-
-        // Prevent Jackson's writeValue() method calls from closing the stream.
-        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+        try {
+            this.generator = csvWriter.createGenerator(stream, JsonEncoding.UTF8);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }

     /**
@@ -98,11 +107,12 @@ static <T> CsvBulkWriter<T, T, Void> forPojo(Class<T> pojoClass, FSDataOutputStr
     @Override
     public void addElement(T element) throws IOException {
         final R r = converter.convert(element, converterContext);
-        csvWriter.writeValue(stream, r);
+        csvWriter.writeValue(generator, r);
     }

     @Override
     public void flush() throws IOException {
+        generator.flush();
         stream.flush();
     }


From 478c42d32284abed1f7eb805a252d28569c94348 Mon Sep 17 00:00:00 2001
From: gongzhongqiang
Date: Mon, 29 Apr 2024 13:33:51 +0800
Subject: [PATCH 2/6] Address comment

---
 .../java/org/apache/flink/formats/csv/CsvBulkWriter.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
index 5684222015a66..784a08e9042b3 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.common.Converter;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.jackson.JacksonMapperFactory;

 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonEncoding;
@@ -65,7 +66,7 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
         try {
             this.generator = csvWriter.createGenerator(stream, JsonEncoding.UTF8);
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            throw new FlinkRuntimeException("Could not create CSV generator.", e);
         }
     }

@@ -113,11 +114,11 @@ public void addElement(T element) throws IOException {
     @Override
     public void flush() throws IOException {
         generator.flush();
-        stream.flush();
     }

     @Override
     public void finish() throws IOException {
+        generator.close();
         stream.sync();
     }
 }

From 4922de5b9ca3b2e70c5f7ff64bdd9d255cb18ee5 Mon Sep 17 00:00:00 2001
From: gongzhongqiang
Date: Tue, 28 May 2024 12:05:23 +0800
Subject: [PATCH 3/6] Add unit test

---
 .../flink/formats/csv/CsvBulkWriter.java      |   7 +-
 .../flink/formats/csv/CsvBulkWriterIT.java    | 205 ++++++++++++++++++
 .../src/test/resources/log4j2-test.properties |  28 +++
 3 files changed, 235 insertions(+), 5 deletions(-)
 create mode 100644 flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
 create mode 100644 flink-formats/flink-csv/src/test/resources/log4j2-test.properties

diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
index 784a08e9042b3..1d7ea6a6b0dd0 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
@@ -26,7 +26,6 @@
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonEncoding;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

@@ -43,7 +42,6 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
     private final FSDataOutputStream stream;
     private final Converter<T, R, C> converter;
     @Nullable private final C converterContext;
-    private final ObjectWriter csvWriter;
     private final JsonGenerator generator;

     CsvBulkWriter(
@@ -62,9 +60,8 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
         this.converter = checkNotNull(converter);
         this.stream = checkNotNull(stream);
         this.converterContext = converterContext;
-        this.csvWriter = mapper.writer(schema);
         try {
-            this.generator = csvWriter.createGenerator(stream, JsonEncoding.UTF8);
+            this.generator = mapper.writer(schema).createGenerator(stream, JsonEncoding.UTF8);
         } catch (IOException e) {
             throw new FlinkRuntimeException("Could not create CSV generator.", e);
         }
@@ -108,7 +105,7 @@ static <T> CsvBulkWriter<T, T, Void> forPojo(Class<T> pojoClass, FSDataOutputStr
     @Override
     public void addElement(T element) throws IOException {
         final R r = converter.convert(element, converterContext);
-        csvWriter.writeValue(generator, r);
+        generator.writeObject(r);
     }

     @Override

diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
new file mode 100644
index 0000000000000..52f8f9bc8462f
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.TestDataGenerators;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.io.FileUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CsvBulkWriterIT {
+
+    @TempDir File outDir;
+
+    @Test
+    public void testNoDataIsWrittenBeforeFlush() throws Exception {
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        // Workaround serialization limitations
+        File outDirRef = new File(outDir.getAbsolutePath());
+
+        FileSink<Pojo> sink =
+                FileSink.forBulkFormat(
+                                new org.apache.flink.core.fs.Path(outDir.getAbsolutePath()),
+                                out -> {
+                                    FSDataOutputStreamWrapper outputStreamWrapper =
+                                            new FSDataOutputStreamWrapper(out);
+                                    return new CsvBulkWriterWrapper<>(
+                                            CsvBulkWriter.forPojo(Pojo.class, outputStreamWrapper),
+                                            outputStreamWrapper,
+                                            outDirRef);
+                                })
+                        .build();
+
+        List<Pojo> integers = Arrays.asList(new Pojo(1), new Pojo(2));
+        DataGeneratorSource<Pojo> generatorSource =
+                TestDataGenerators.fromDataWithSnapshotsLatch(
+                        integers, TypeInformation.of(Pojo.class));
+        env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "").sinkTo(sink);
+        env.execute();
+        assertThat(getResultsFromSinkFiles(outDir)).containsSequence("1", "2", "1", "2");
+    }
+
+    private static class CsvBulkWriterWrapper<T> implements BulkWriter<T> {
+
+        private static int callCounter = 0;
+
+        private static int unFlushedCounter = 0;
+
+        private final CsvBulkWriter<T, ?, ?> csvBulkWriter;
+
+        private final File outDir;
+
+        private final FSDataOutputStreamWrapper stream;
+
+        CsvBulkWriterWrapper(
+                CsvBulkWriter<T, ?, ?> csvBulkWriter,
+                FSDataOutputStreamWrapper stream,
+                File outDir) {
+            this.csvBulkWriter = csvBulkWriter;
+            this.stream = stream;
+            this.outDir = outDir;
+        }
+
+        @Override
+        public void addElement(T element) throws IOException {
+
+            callCounter++;
+            unFlushedCounter++;
+
+            csvBulkWriter.addElement(element);
+
+            assertThat(getResultsFromSinkFiles(outDir)).hasSize(callCounter - unFlushedCounter);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            csvBulkWriter.flush();
+            assertThat(getResultsFromSinkFiles(outDir)).hasSize(callCounter);
+            unFlushedCounter = 0;
+        }
+
+        @Override
+        public void finish() throws IOException {
+            csvBulkWriter.finish();
+            // The stream should not be closed by the CsvBulkWriter.finish() method
+            assertThat(stream.closed).isFalse();
+        }
+    }
+
+    private static class FSDataOutputStreamWrapper extends FSDataOutputStream {
+
+        private boolean closed = false;
+
+        private final FSDataOutputStream stream;
+
+        FSDataOutputStreamWrapper(FSDataOutputStream stream) {
+            this.stream = stream;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return stream.getPos();
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            stream.write(b);
+        }
+
+        @Override
+        public void write(byte @NotNull [] b) throws IOException {
+            stream.write(b);
+        }
+
+        @Override
+        public void write(byte @NotNull [] b, int off, int len) throws IOException {
+            stream.write(b, off, len);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            stream.flush();
+        }
+
+        @Override
+        public void sync() throws IOException {
+            stream.sync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            stream.close();
+            closed = true;
+        }
+    }
+
+    public static class Pojo {
+        public long x;
+
+        public Pojo(long x) {
+            this.x = x;
+        }
+
+        public Pojo() {}
+    }
+
+    private static List<String> getResultsFromSinkFiles(File outDir) throws IOException {
+        final Map<File, String> contents = getFileContentByPath(outDir);
+        return contents.entrySet().stream()
+                .flatMap(e -> Arrays.stream(e.getValue().split("\n")))
+                .filter(s -> !s.isEmpty())
+                .collect(Collectors.toList());
+    }
+
+    private static Map<File, String> getFileContentByPath(File directory) throws IOException {
+        Map<File, String> contents = new HashMap<>();
+
+        final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);
+        for (File file : filesInBucket) {
+            contents.put(file, FileUtils.readFileToString(file));
+        }
+        return contents;
+    }
+}

diff --git a/flink-formats/flink-csv/src/test/resources/log4j2-test.properties b/flink-formats/flink-csv/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000000000..e463a0e1a088c
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

From 9958af34d87b8915f13e7e3875c415b11c75e04f Mon Sep 17 00:00:00 2001
From: gongzhongqiang <764629910@qq.com>
Date: Wed, 29 May 2024 19:30:17 +0800
Subject: [PATCH 4/6] address comment

---
 .../flink/formats/csv/CsvBulkWriterIT.java    | 26 +++++++++----------
 .../src/test/resources/log4j2-test.properties |  2 +-
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
index 52f8f9bc8462f..4d383aa3e14f5 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
@@ -19,9 +19,10 @@
 package org.apache.flink.formats.csv;

 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import org.apache.flink.connector.datagen.source.TestDataGenerators;
 import org.apache.flink.connector.file.sink.FileSink;
@@ -51,10 +52,13 @@ public class CsvBulkWriterIT {
     @Test
     public void testNoDataIsWrittenBeforeFlush() throws Exception {

-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration config = new Configuration();
+        config.set(
+                RestartStrategyOptions.RESTART_STRATEGY,
+                RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY.getMainValue());
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
         env.setParallelism(1);
         env.enableCheckpointing(100);
-        env.setRestartStrategy(RestartStrategies.noRestart());

         // Workaround serialization limitations
         File outDirRef = new File(outDir.getAbsolutePath());
@@ -83,9 +87,9 @@ public void testNoDataIsWrittenBeforeFlush() throws Exception {

     private static class CsvBulkWriterWrapper<T> implements BulkWriter<T> {

-        private static int callCounter = 0;
+        private static int addedElements = 0;

-        private static int unFlushedCounter = 0;
+        private static int expectedFlushedElements = 0;

         private final CsvBulkWriter<T, ?, ?> csvBulkWriter;
@@ -104,20 +108,16 @@ private static class CsvBulkWriterWrapper<T> implements BulkWriter<T> {

         @Override
         public void addElement(T element) throws IOException {
-
-            callCounter++;
-            unFlushedCounter++;
-
+            addedElements++;
             csvBulkWriter.addElement(element);
-
-            assertThat(getResultsFromSinkFiles(outDir)).hasSize(callCounter - unFlushedCounter);
+            assertThat(getResultsFromSinkFiles(outDir)).hasSize(expectedFlushedElements);
         }

         @Override
         public void flush() throws IOException {
             csvBulkWriter.flush();
-            assertThat(getResultsFromSinkFiles(outDir)).hasSize(callCounter);
-            unFlushedCounter = 0;
+            expectedFlushedElements = addedElements;
+            assertThat(getResultsFromSinkFiles(outDir)).hasSize(expectedFlushedElements);
         }

         @Override

diff --git a/flink-formats/flink-csv/src/test/resources/log4j2-test.properties b/flink-formats/flink-csv/src/test/resources/log4j2-test.properties
index e463a0e1a088c..835c2ec9a3d02 100644
--- a/flink-formats/flink-csv/src/test/resources/log4j2-test.properties
+++ b/flink-formats/flink-csv/src/test/resources/log4j2-test.properties
@@ -18,7 +18,7 @@

 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = INFO
+rootLogger.level = OFF
 rootLogger.appenderRef.test.ref = TestLogger

 appender.testlogger.name = TestLogger

From 53b341dd445cbeef1f730981c7a4c41074676a85 Mon Sep 17 00:00:00 2001
From: gongzhongqiang
Date: Thu, 20 Jun 2024 11:45:19 +0800
Subject: [PATCH 5/6] address comment

---
 .../java/org/apache/flink/formats/csv/CsvBulkWriterIT.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
index 4d383aa3e14f5..84857030d01c6 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
@@ -49,8 +49,12 @@ public class CsvBulkWriterIT {

     @TempDir File outDir;

+    /**
+     * FLINK-35240 : Verifies that Jackson CSV writer does not flush per record but waits for a
+     * flush signal from Flink.
+     */
     @Test
-    public void testNoDataIsWrittenBeforeFlush() throws Exception {
+    public void testNoDataIsWrittenBeforeFlinkFlush() throws Exception {

         Configuration config = new Configuration();
         config.set(

From 4d835f9272304bd5535f1d0ac96261e8e92eece9 Mon Sep 17 00:00:00 2001
From: gongzhongqiang
Date: Thu, 11 Jul 2024 16:32:47 +0800
Subject: [PATCH 6/6] rebase to master

---
 .../java/org/apache/flink/formats/csv/CsvBulkWriterIT.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
index 84857030d01c6..aa070b22abad5 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
@@ -30,7 +30,6 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

 import org.apache.commons.io.FileUtils;
-import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;

@@ -153,12 +152,12 @@ public void write(int b) throws IOException {
         }

         @Override
-        public void write(byte @NotNull [] b) throws IOException {
+        public void write(byte[] b) throws IOException {
             stream.write(b);
         }

         @Override
-        public void write(byte @NotNull [] b, int off, int len) throws IOException {
+        public void write(byte[] b, int off, int len) throws IOException {
             stream.write(b, off, len);
         }
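
A note on what the series adds up to, since the change is spread over six commits: patch 1 stops Jackson from flushing on every writeValue() by disabling FLUSH_AFTER_WRITE_VALUE (enabled by default in Jackson) alongside the existing AUTO_CLOSE_TARGET workaround; patch 2 makes flush()/finish() operate on the generator; patch 3 drops the ObjectWriter field so every record goes through one long-lived JsonGenerator; the remaining commits harden the test. The effect is easy to reproduce outside Flink. The sketch below is illustrative only: it uses the plain jackson-dataformat-csv artifact rather than Flink's shaded jackson2 copy, the CsvFlushDemo and Pojo names are invented for the example, and it assumes a Jackson 2.x version that has the ObjectWriter.createGenerator overload the patch itself relies on.

    import com.fasterxml.jackson.core.JsonEncoding;
    import com.fasterxml.jackson.core.JsonGenerator;
    import com.fasterxml.jackson.databind.SerializationFeature;
    import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    import com.fasterxml.jackson.dataformat.csv.CsvSchema;

    import java.io.ByteArrayOutputStream;

    public class CsvFlushDemo {

        public static class Pojo {
            public long x;

            public Pojo(long x) {
                this.x = x;
            }

            public Pojo() {}
        }

        public static void main(String[] args) throws Exception {
            CsvMapper mapper = new CsvMapper();
            CsvSchema schema = mapper.schemaFor(Pojo.class);

            // The two settings patch 1 applies before building the writer: keep
            // writeValue()/writeObject() from closing or flushing the target stream.
            mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
            mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);

            ByteArrayOutputStream out = new ByteArrayOutputStream();

            // One generator reused for every record, as in the patched CsvBulkWriter.
            JsonGenerator generator =
                    mapper.writer(schema).createGenerator(out, JsonEncoding.UTF8);

            generator.writeObject(new Pojo(1)); // what addElement() now does per record
            generator.writeObject(new Pojo(2));

            // Both records are still buffered inside the generator, not in `out`.
            System.out.println("before flush: " + out.size() + " bytes");

            generator.flush(); // the patched BulkWriter#flush()
            System.out.println("after flush:  " + out.size() + " bytes");
        }
    }

For records this small, the "before flush" line should report 0 bytes because everything still sits in the generator's internal buffer; a large enough batch would spill to the stream regardless of these settings. That is the point of FLINK-35240: the underlying FSDataOutputStream (potentially a remote filesystem stream, where each flush is expensive) is no longer flushed once per record, only when Flink drives BulkWriter#flush() at a checkpoint, which is what CsvBulkWriterIT#testNoDataIsWrittenBeforeFlinkFlush asserts end to end.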