diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
index f9a8e01bb0704..1d7ea6a6b0dd0 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
@@ -21,10 +21,12 @@
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.common.Converter;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonEncoding;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
 
@@ -40,7 +42,7 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
     private final FSDataOutputStream stream;
     private final Converter<T, R, C> converter;
     @Nullable private final C converterContext;
-    private final ObjectWriter csvWriter;
+    private final JsonGenerator generator;
 
     CsvBulkWriter(
             CsvMapper mapper,
@@ -51,13 +53,18 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
         checkNotNull(mapper);
         checkNotNull(schema);
 
+        // Prevent Jackson's writeValue() method calls from closing the stream.
+        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+        mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);
+
         this.converter = checkNotNull(converter);
         this.stream = checkNotNull(stream);
         this.converterContext = converterContext;
-        this.csvWriter = mapper.writer(schema);
-
-        // Prevent Jackson's writeValue() method calls from closing the stream.
-        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+        try {
+            this.generator = mapper.writer(schema).createGenerator(stream, JsonEncoding.UTF8);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException("Could not create CSV generator.", e);
+        }
     }
 
     /**
@@ -98,16 +105,17 @@ static <T> CsvBulkWriter<T, T, Void> forPojo(Class<T> pojoClass, FSDataOutputStr
     @Override
     public void addElement(T element) throws IOException {
         final R r = converter.convert(element, converterContext);
-        csvWriter.writeValue(stream, r);
+        generator.writeObject(r);
     }
 
     @Override
     public void flush() throws IOException {
-        stream.flush();
+        generator.flush();
     }
 
     @Override
     public void finish() throws IOException {
+        generator.close();
         stream.sync();
     }
 }
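The core of the fix: instead of calling ObjectWriter.writeValue() per record, which flushed to the output stream on every call, the writer now feeds all records through one long-lived JsonGenerator with FLUSH_AFTER_WRITE_VALUE disabled, so rows accumulate in Jackson's internal buffer until Flink calls flush(), which the FileSink does on checkpoints. The following standalone sketch illustrates that buffering behavior; it uses plain (unshaded) Jackson coordinates rather than Flink's flink-shaded-jackson, and CsvFlushDemo with its nested Pojo are illustrative names, not part of the patch.

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

import java.io.ByteArrayOutputStream;

public class CsvFlushDemo {

    public static class Pojo {
        public long x;

        public Pojo(long x) {
            this.x = x;
        }
    }

    public static void main(String[] args) throws Exception {
        CsvMapper mapper = new CsvMapper();
        CsvSchema schema = mapper.schemaFor(Pojo.class);

        // The same two settings the patch applies: keep the target stream open,
        // and do not flush after every writeValue()/writeObject() call.
        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
        mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);

        ByteArrayOutputStream target = new ByteArrayOutputStream();
        JsonGenerator generator =
                mapper.writer(schema).createGenerator(target, JsonEncoding.UTF8);

        generator.writeObject(new Pojo(1));
        // The row is still in the generator's buffer; nothing reached the target.
        System.out.println("after write: " + target.size() + " bytes"); // prints 0

        generator.flush();
        // An explicit flush is what moves buffered bytes to the target stream.
        System.out.println("after flush: " + target.size() + " bytes"); // prints > 0
    }
}

Disabling AUTO_CLOSE_TARGET is the complementary half of the change: finish() can close the generator to release its buffer without closing the underlying stream, which the FileSink owns.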
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
new file mode 100644
index 0000000000000..aa070b22abad5
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.TestDataGenerators;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CsvBulkWriterIT {
+
+    @TempDir File outDir;
+
+    /**
+     * FLINK-35240: Verifies that the Jackson CSV writer does not flush per record but waits for a
+     * flush signal from Flink.
+     */
+    @Test
+    public void testNoDataIsWrittenBeforeFlinkFlush() throws Exception {
+
+        Configuration config = new Configuration();
+        config.set(
+                RestartStrategyOptions.RESTART_STRATEGY,
+                RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY.getMainValue());
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+
+        // Workaround serialization limitations
+        File outDirRef = new File(outDir.getAbsolutePath());
+
+        FileSink<Pojo> sink =
+                FileSink.forBulkFormat(
+                                new org.apache.flink.core.fs.Path(outDir.getAbsolutePath()),
+                                out -> {
+                                    FSDataOutputStreamWrapper outputStreamWrapper =
+                                            new FSDataOutputStreamWrapper(out);
+                                    return new CsvBulkWriterWrapper<>(
+                                            CsvBulkWriter.forPojo(Pojo.class, outputStreamWrapper),
+                                            outputStreamWrapper,
+                                            outDirRef);
+                                })
+                        .build();
+
+        List<Pojo> integers = Arrays.asList(new Pojo(1), new Pojo(2));
+        DataGeneratorSource<Pojo> generatorSource =
+                TestDataGenerators.fromDataWithSnapshotsLatch(
+                        integers, TypeInformation.of(Pojo.class));
+        env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "").sinkTo(sink);
+        env.execute();
+        assertThat(getResultsFromSinkFiles(outDir)).containsSequence("1", "2", "1", "2");
+    }
+
+    private static class CsvBulkWriterWrapper<T> implements BulkWriter<T> {
+
+        private static int addedElements = 0;
+
+        private static int expectedFlushedElements = 0;
+
+        private final CsvBulkWriter<T, ?, ?> csvBulkWriter;
+
+        private final File outDir;
+
+        private final FSDataOutputStreamWrapper stream;
+
+        CsvBulkWriterWrapper(
+                CsvBulkWriter<T, ?, ?> csvBulkWriter,
+                FSDataOutputStreamWrapper stream,
+                File outDir) {
+            this.csvBulkWriter = csvBulkWriter;
+            this.stream = stream;
+            this.outDir = outDir;
+        }
+
+        @Override
+        public void addElement(T element) throws IOException {
+            addedElements++;
+            csvBulkWriter.addElement(element);
+            assertThat(getResultsFromSinkFiles(outDir)).hasSize(expectedFlushedElements);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            csvBulkWriter.flush();
+            expectedFlushedElements = addedElements;
+            assertThat(getResultsFromSinkFiles(outDir)).hasSize(expectedFlushedElements);
+        }
+
+        @Override
+        public void finish() throws IOException {
+            csvBulkWriter.finish();
+            // The stream should not be closed by the CsvBulkWriter.finish() method
+            assertThat(stream.closed).isFalse();
+        }
+    }
+
+    private static class FSDataOutputStreamWrapper extends FSDataOutputStream {
+
+        private boolean closed = false;
+
+        private final FSDataOutputStream stream;
+
+        FSDataOutputStreamWrapper(FSDataOutputStream stream) {
+            this.stream = stream;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return stream.getPos();
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            stream.write(b);
+        }
+
+        @Override
+        public void write(byte[] b) throws IOException {
+            stream.write(b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            stream.write(b, off, len);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            stream.flush();
+        }
+
+        @Override
+        public void sync() throws IOException {
+            stream.sync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            stream.close();
+            closed = true;
+        }
+    }
+
+    public static class Pojo {
+        public long x;
+
+        public Pojo(long x) {
+            this.x = x;
+        }
+
+        public Pojo() {}
+    }
+
+    private static List<String> getResultsFromSinkFiles(File outDir) throws IOException {
+        final Map<File, String> contents = getFileContentByPath(outDir);
+        return contents.entrySet().stream()
+                .flatMap(e -> Arrays.stream(e.getValue().split("\n")))
+                .filter(s -> !s.isEmpty())
+                .collect(Collectors.toList());
+    }
+
+    private static Map<File, String> getFileContentByPath(File directory) throws IOException {
+        Map<File, String> contents = new HashMap<>();
+
+        final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);
+        for (File file : filesInBucket) {
+            contents.put(file, FileUtils.readFileToString(file));
+        }
+        return contents;
+    }
+}
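The integration test above exercises the writer through a full Flink job with checkpointing; the BulkWriter contract it pins down can also be shown directly. Below is a hypothetical sketch, not part of the patch, that drives CsvBulkWriter by hand. Since the class is package-private, such code would need to live in org.apache.flink.formats.csv, and the Pojo and output path are illustrative.

package org.apache.flink.formats.csv;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class CsvBulkWriterContractSketch {

    public static class Pojo {
        public long x;

        public Pojo() {}

        public Pojo(long x) {
            this.x = x;
        }
    }

    public static void main(String[] args) throws Exception {
        // Illustrative target path; any Flink FileSystem works here.
        Path target = new Path("/tmp/csv-contract-sketch/part-0.csv");
        FileSystem fs = target.getFileSystem();

        try (FSDataOutputStream out = fs.create(target, FileSystem.WriteMode.OVERWRITE)) {
            BulkWriter<Pojo> writer = CsvBulkWriter.forPojo(Pojo.class, out);

            writer.addElement(new Pojo(1)); // buffered by Jackson, not yet visible in the file
            writer.flush(); // called by Flink on checkpoint; the row becomes visible

            // finish() closes the generator but must leave 'out' open: the
            // FileSink owns the stream and closes it when the part file rolls.
            writer.finish();
        }
    }
}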
diff --git a/flink-formats/flink-csv/src/test/resources/log4j2-test.properties b/flink-formats/flink-csv/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000000000..835c2ec9a3d02
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n