Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format #24730

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.formats.common.Converter;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.jackson.JacksonMapperFactory;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonEncoding;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

Expand All @@ -40,7 +42,7 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
private final FSDataOutputStream stream;
private final Converter<T, R, C> converter;
@Nullable private final C converterContext;
private final ObjectWriter csvWriter;
private final JsonGenerator generator;

CsvBulkWriter(
CsvMapper mapper,
Expand All @@ -51,13 +53,18 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
checkNotNull(mapper);
checkNotNull(schema);

// Prevent Jackson's writeValue() method calls from closing the stream.
mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);

this.converter = checkNotNull(converter);
this.stream = checkNotNull(stream);
this.converterContext = converterContext;
this.csvWriter = mapper.writer(schema);

// Prevent Jackson's writeValue() method calls from closing the stream.
mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
try {
this.generator = mapper.writer(schema).createGenerator(stream, JsonEncoding.UTF8);
} catch (IOException e) {
throw new FlinkRuntimeException("Could not create CSV generator.", e);
}
}

/**
Expand Down Expand Up @@ -98,16 +105,17 @@ static <T> CsvBulkWriter<T, T, Void> forPojo(Class<T> pojoClass, FSDataOutputStr
@Override
public void addElement(T element) throws IOException {
final R r = converter.convert(element, converterContext);
csvWriter.writeValue(stream, r);
generator.writeObject(r);
}

@Override
public void flush() throws IOException {
stream.flush();
generator.flush();
}

@Override
public void finish() throws IOException {
generator.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CsvGenerator honours AUTO_CLOSE_TARGET so generator.close() will flush the underlying Writer but not close it.

It would be great to have unit tests proving this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add a unit test to verify the desired behavior.

Copy link
Contributor

@afedulov afedulov May 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@GOODBOY008 properly testing this behavior with a unit test might be tricky. I wrote a quick sketch of an integration test that you could consider making use of:
https://github.com/afedulov/flink/blob/fix-csv-flush-test/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
I did not spend much time coming up with proper assertions, you can surely come up with something more elegant.

stream.sync();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.csv;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.TestDataGenerators;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.commons.io.FileUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Integration test for {@link CsvBulkWriter} verifying that no per-record flushing happens and
 * that the writer does not close the underlying stream on {@code finish()} (FLINK-35240).
 */
public class CsvBulkWriterIT {

    @TempDir File outDir;

    /**
     * FLINK-35240 : Verifies that Jackson CSV writer does not flush per record but waits for a
     * flush signal from Flink.
     */
    @Test
    public void testNoDataIsWrittenBeforeFlinkFlush() throws Exception {

        Configuration config = new Configuration();
        config.set(
                RestartStrategyOptions.RESTART_STRATEGY,
                RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY.getMainValue());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.setParallelism(1);
        env.enableCheckpointing(100);

        // Workaround serialization limitations: capture a plain File instead of the
        // @TempDir-injected field so the sink factory lambda stays serializable.
        File outDirRef = new File(outDir.getAbsolutePath());

        FileSink<Pojo> sink =
                FileSink.forBulkFormat(
                                new org.apache.flink.core.fs.Path(outDir.getAbsolutePath()),
                                out -> {
                                    FSDataOutputStreamWrapper outputStreamWrapper =
                                            new FSDataOutputStreamWrapper(out);
                                    return new CsvBulkWriterWrapper<>(
                                            CsvBulkWriter.forPojo(Pojo.class, outputStreamWrapper),
                                            outputStreamWrapper,
                                            outDirRef);
                                })
                        .build();

        List<Pojo> integers = Arrays.asList(new Pojo(1), new Pojo(2));
        DataGeneratorSource<Pojo> generatorSource =
                TestDataGenerators.fromDataWithSnapshotsLatch(
                        integers, TypeInformation.of(Pojo.class));
        env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "").sinkTo(sink);
        env.execute();
        // The data generator emits the list twice (before and after the snapshot latch).
        assertThat(getResultsFromSinkFiles(outDir)).containsSequence("1", "2", "1", "2");
    }

    /**
     * Delegating {@link BulkWriter} that asserts, on every element and flush, that records only
     * become visible in the sink files after an explicit Flink flush — never per record.
     */
    private static class CsvBulkWriterWrapper<T> implements BulkWriter<T> {

        // Static so the counts survive across writer instances: the FileSink creates a new
        // BulkWriter per part file / checkpoint, but the assertions track file contents globally.
        private static int addedElements = 0;

        private static int expectedFlushedElements = 0;

        private final CsvBulkWriter<T, ?, ?> csvBulkWriter;

        private final File outDir;

        private final FSDataOutputStreamWrapper stream;

        CsvBulkWriterWrapper(
                CsvBulkWriter<T, ?, ?> csvBulkWriter,
                FSDataOutputStreamWrapper stream,
                File outDir) {
            this.csvBulkWriter = csvBulkWriter;
            this.stream = stream;
            this.outDir = outDir;
        }

        @Override
        public void addElement(T element) throws IOException {
            addedElements++;
            csvBulkWriter.addElement(element);
            // Adding an element must NOT make it visible on disk yet.
            assertThat(getResultsFromSinkFiles(outDir)).hasSize(expectedFlushedElements);
        }

        @Override
        public void flush() throws IOException {
            csvBulkWriter.flush();
            // After an explicit flush, everything added so far must be visible on disk.
            expectedFlushedElements = addedElements;
            assertThat(getResultsFromSinkFiles(outDir)).hasSize(expectedFlushedElements);
        }

        @Override
        public void finish() throws IOException {
            csvBulkWriter.finish();
            // The stream should not be closed by the CsvBulkWriter.finish() method
            assertThat(stream.closed).isFalse();
        }
    }

    /** Pass-through {@link FSDataOutputStream} that records whether {@code close()} was called. */
    private static class FSDataOutputStreamWrapper extends FSDataOutputStream {

        private boolean closed = false;

        private final FSDataOutputStream stream;

        FSDataOutputStreamWrapper(FSDataOutputStream stream) {
            this.stream = stream;
        }

        @Override
        public long getPos() throws IOException {
            return stream.getPos();
        }

        @Override
        public void write(int b) throws IOException {
            stream.write(b);
        }

        @Override
        public void write(byte @NotNull [] b) throws IOException {
            stream.write(b);
        }

        @Override
        public void write(byte @NotNull [] b, int off, int len) throws IOException {
            stream.write(b, off, len);
        }

        @Override
        public void flush() throws IOException {
            stream.flush();
        }

        @Override
        public void sync() throws IOException {
            stream.sync();
        }

        @Override
        public void close() throws IOException {
            // Record the close attempt before delegating so the flag is accurate even if the
            // underlying close throws.
            closed = true;
            stream.close();
        }
    }

    /** Simple serializable POJO written to the CSV sink. */
    public static class Pojo {
        public long x;

        public Pojo(long x) {
            this.x = x;
        }

        public Pojo() {}
    }

    /** Reads all non-empty CSV lines from every file currently present under {@code outDir}. */
    private static List<String> getResultsFromSinkFiles(File outDir) throws IOException {
        final Map<File, String> contents = getFileContentByPath(outDir);
        return contents.entrySet().stream()
                .flatMap(e -> Arrays.stream(e.getValue().split("\n")))
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    /** Recursively reads every file under {@code directory} into a map of file to its content. */
    private static Map<File, String> getFileContentByPath(File directory) throws IOException {
        Map<File, String> contents = new HashMap<>();

        final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);
        for (File file : filesInBucket) {
            // Explicit charset: the no-charset overload is deprecated and platform-dependent.
            contents.put(file, FileUtils.readFileToString(file, StandardCharsets.UTF_8));
        }
        return contents;
    }
}
28 changes: 28 additions & 0 deletions flink-formats/flink-csv/src/test/resources/log4j2-test.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger

appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n