
[FLINK-14266][table] Introduce RowCsvInputFormat to new CSV module #9884

Merged

merged 5 commits into apache:master on Apr 7, 2020

Conversation

JingsongLi
Contributor

What is the purpose of the change

Currently we have an old CSV format, but it is not standard CSV support. We should support the RFC-compliant CSV format for Table/SQL.

Brief change log

Add RowCsvInputFormat and use Jackson's ObjectReader.readValues(InputStream). We need to deal with half-line reads when splitting large files into multiple splits. The difficulties are:

  1. ObjectReader does not know the current read offset, because it keeps a buffer to cache more bytes, but we need to stop at the right place when reading a FileSplit. We use a BoundedInputStream for this.
  2. For the half-line reads: in open, we look for the next line delimiter after the split start and discard the first partial line; at the split end, we look for the next delimiter past the boundary to complete the last line.
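As an illustration of point 2, the scan for the next line start can be sketched like this. This is a standalone, hypothetical sketch over an in-memory byte array, not the actual Flink code; the name findNextLineStartOffset is only illustrative:

```java
// Hypothetical sketch of the half-line handling described above.
public class SplitStartSketch {

    // Scan forward from 'from' in 'data' and return the offset of the first
    // byte AFTER the next line delimiter ('\n', '\r', or "\r\n").
    static int findNextLineStartOffset(byte[] data, int from) {
        for (int i = from; i < data.length; i++) {
            if (data[i] == '\n') {
                return i + 1;
            }
            if (data[i] == '\r') {
                // deal with "\r\n": the next byte may be '\n', so skip it too
                if (i + 1 < data.length && data[i + 1] == '\n') {
                    return i + 2;
                }
                return i + 1;
            }
        }
        // no delimiter found: the next line starts past the end of the data
        return data.length;
    }
}
```

In the real input format the scan would run over the (bounded) stream rather than a byte array, and a split starting at offset 0 skips the search entirely because the file begins at a line start.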

Verifying this change

RowCsvInputFormatTest and RowCsvInputFormatSplitTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Collaborator

flinkbot commented Oct 12, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit a15a359 (Wed Dec 04 14:56:42 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Oct 12, 2019

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

Contributor

@twalthr twalthr left a comment

Thanks @JingsongLi for the PR. I added an initial set of comments. It would be great if we could further reduce the number of limitations. The CSV format is one of the most important batch connectors and should have a feature set similar to the (de)serialization schema. Otherwise we need to document a lot of limitations in descriptors and docs.

private int[] selectedFields;

/**
* Creates a CSV deserialization schema for the given {@link TypeInformation} and file paths
Contributor

Please make sure to update all comments if you copy code.

/**
* Test split logic for {@link RowCsvInputFormat}.
*/
public class RowCsvInputFormatSplitTest {
Contributor

I'm wondering if these tests are sufficient. What about strings surrounded by " or escaped delimiters? Could you copy more tests around escaping from the deserialization schema tests?
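For reference, these are the RFC 4180 escaping cases the comment is asking about: a field surrounded by `"` may contain the field delimiter, and a doubled quote `""` inside a quoted field denotes a literal quote character. The following is a minimal, hypothetical sketch of those quoting rules for a single line, not Flink or Jackson code:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of RFC 4180 quoting: quoted fields, delimiters
// inside quotes, and "" as an escaped quote.
public class CsvEscapingSketch {

    static List<String> parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        cur.append('"'); // "" inside quotes is an escaped quote
                        i++;
                    } else {
                        inQuotes = false; // closing quote of the field
                    }
                } else {
                    cur.append(c); // delimiters are literal inside quotes
                }
            } else if (c == '"') {
                inQuotes = true; // opening quote of the field
            } else if (c == ',') {
                fields.add(cur.toString()); // unquoted delimiter ends the field
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        fields.add(cur.toString());
        return fields;
    }
}
```

For example, parsing the line a,"b,c","say ""hi""" yields the three fields a, b,c, and say "hi" — the kind of inputs the review suggests covering in the split tests.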

assertTrue(format.reachedEnd());
}

// NOTE: the new CSV format does not support configuring a multi-character field delimiter.
Contributor

All limitations mentioned here should also be mentioned in the input format class as well as in the descriptor.

@twalthr
Contributor

twalthr commented Oct 17, 2019

Btw, did you have a look at #4660? Isn't this issue a duplicate?

@JingsongLi
Contributor Author

Thanks @JingsongLi for the PR. I added an initial set of comments. It would be great if we could further reduce the number of limitations. The CSV format is one of the most important batch connectors and should have a feature set similar to the (de)serialization schema. Otherwise we need to document a lot of limitations in descriptors and docs.

Thanks @twalthr for your review.
These limitations are compared with the previous CsvInputFormat in flink-java, not the RFC (de)serialization schema in flink-csv. Some can be improved further; some are more difficult (they rely on Jackson).
You are right, we need to document a lot of limitations in docs.

@JingsongLi
Contributor Author

Btw, did you have a look at #4660? Isn't this issue a duplicate?

The differences are:

  • This PR is in flink-csv instead of flink-table.
  • This PR is consistent with the existing (de)serialization schema.
  • This PR deals with escaping characters with line delimiter.

There are some other reasons why I put forward this PR:

@JingsongLi
Contributor Author

Hi @twalthr , fixed comments, please take a look~

Contributor

@leonardBang leonardBang left a comment

@JingsongLi, thanks for your great contribution. I left some minor comments.


long realStart = splitStart;
if (splitStart != 0) {
    realStart = findNextLegalSeparator();
Contributor

maybe findLegalSplitStart will be more meaningful?

Contributor Author

I think we can use findNextLineStartOffset.


if (splitLength != READ_WHOLE_SPLIT_FLAG) {
    stream.seek(splitStart + splitLength);
    long firstByteOfNextLine = findNextLegalSeparator();
Contributor

how about using startOfNextSplit?

Contributor Author

nextLineStartOffset

*/
public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> {

private static final long serialVersionUID = 1L;
Contributor

Would generating the serialVersionUID using the IDE be better?

Contributor Author

According to the Flink code style, 1 is the first uid.


long pos = stream.getPos();

// deal with "\r\n": the next char may be '\n', so we need to skip it.
Contributor

Are there other similar special character combinations in CSV?

Contributor Author

Jackson/standard CSV only supports '\r', '\n', and '\r\n'. There is no '\n\r'.

OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8);
wrt.write(content);
wrt.close();
return new FileInputSplit(0, new Path(tempFile.toURI().toString()), start,
Contributor

We should add tests where the input splits come from a file with more than one split, so that findNextLegalSeparator is covered.
E.g.:
FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames());
FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());

Contributor Author

RowCsvInputFormatSplitTest?

Contributor

RowCsvInputFormatSplitTest?
ok

@JingsongLi
Contributor Author

Hi @twalthr , do you have other concerns?

@JingsongLi
Contributor Author

@JingsongLi JingsongLi merged commit 04096fc into apache:master Apr 7, 2020
KarmaGYZ pushed a commit to KarmaGYZ/flink that referenced this pull request Apr 10, 2020
leonardBang pushed a commit to leonardBang/flink that referenced this pull request Apr 10, 2020
@JingsongLi JingsongLi deleted the csv branch April 26, 2020 05:28