[FLINK-19981][core][table] Add name-based field mode for Row #14420

Closed
wants to merge 3 commits into from


twalthr
Contributor

@twalthr twalthr commented Dec 18, 2020

What is the purpose of the change

This adds a name-based field mode to the Row class. It simplifies the handling of large rows (possibly with hundreds of fields) and will make it easier to switch between DataStream API and Table API.

See the documentation of Row:

 * <p>Fields of a row can be accessed either position-based or name-based. An implementer can decide
 * in which field mode a row should operate during creation. Rows that were produced by the framework
 * support a hybrid of both field modes (i.e. named positions):
 *
 * <h1>Position-based field mode</h1>
 *
 * <p>{@link Row#withPositions(int)} creates a fixed-length row. The fields can be accessed by position
 * (zero-based) using {@link #getField(int)} and {@link #setField(int, Object)}. Every field is initialized
 * with {@code null} by default.
 *
 * <h1>Name-based field mode</h1>
 *
 * <p>{@link Row#withNames()} creates a variable-length row. The fields can be accessed by name using
 * {@link #getField(String)} and {@link #setField(String, Object)}. Every field is initialized during
 * the first call to {@link #setField(String, Object)} for the given name. However, the framework will
 * initialize missing fields with {@code null} and reorder all fields once more type information is
 * available during serialization or input conversion. Thus, even name-based rows eventually become
 * fixed-length composite types with a deterministic field order.
 *
 * <h1>Hybrid / named-position field mode</h1>
 *
 * <p>Rows that were produced by the framework (after deserialization or output conversion) are fixed-length
 * rows with a deterministic field order that can map static field names to field positions. Thus, fields
 * can be accessed both via {@link #getField(int)} and {@link #getField(String)}. Both {@link #setField(int, Object)}
 * and {@link #setField(String, Object)} are supported for existing fields. However, adding new field
 * names via {@link #setField(String, Object)} is not allowed. A hybrid row's {@link #equals(Object)}
 * supports comparing to all kinds of rows. A hybrid row's {@link #hashCode()} is only valid for position-based
 * rows.
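
The three field modes described above can be illustrated with a small, self-contained sketch. This is not Flink's `Row` implementation: `SketchRow`, its factory method `withNamedPositions`, and its internal fields are hypothetical names chosen for illustration, and details such as `equals`/`hashCode` and serialization are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for illustration only; this is not Flink's Row implementation. */
final class SketchRow {
    private final Object[] byPosition;                  // set in position-based and hybrid mode
    private final Map<String, Object> byName;           // set in name-based mode only
    private final Map<String, Integer> positionByName;  // set in hybrid mode only

    private SketchRow(
            Object[] byPosition,
            Map<String, Object> byName,
            Map<String, Integer> positionByName) {
        this.byPosition = byPosition;
        this.byName = byName;
        this.positionByName = positionByName;
    }

    /** Position-based mode: fixed length, every field starts as null. */
    static SketchRow withPositions(int arity) {
        return new SketchRow(new Object[arity], null, null);
    }

    /** Name-based mode: variable length, a field exists after its first setField call. */
    static SketchRow withNames() {
        return new SketchRow(null, new LinkedHashMap<>(), null);
    }

    /** Hybrid mode: fixed length with a static name-to-position mapping (positions 0..n-1). */
    static SketchRow withNamedPositions(LinkedHashMap<String, Integer> positionByName) {
        return new SketchRow(new Object[positionByName.size()], null, positionByName);
    }

    Object getField(int pos) {
        requirePositions();
        return byPosition[pos];
    }

    void setField(int pos, Object value) {
        requirePositions();
        byPosition[pos] = value;
    }

    Object getField(String name) {
        return byName != null ? byName.get(name) : byPosition[positionOf(name)];
    }

    void setField(String name, Object value) {
        if (byName != null) {
            byName.put(name, value); // name-based rows may grow
        } else {
            byPosition[positionOf(name)] = value; // hybrid rows reject unknown names
        }
    }

    private void requirePositions() {
        if (byPosition == null) {
            throw new IllegalArgumentException("Name-based rows have no positions yet.");
        }
    }

    private int positionOf(String name) {
        Integer pos = positionByName == null ? null : positionByName.get(name);
        if (pos == null) {
            throw new IllegalArgumentException("Unknown field name: " + name);
        }
        return pos;
    }

    public static void main(String[] args) {
        SketchRow named = SketchRow.withNames();
        named.setField("id", 42);
        System.out.println(named.getField("id"));
    }
}
```

In this sketch, a purely position-based row throws on name access, while a hybrid row accepts both access styles but refuses names that are not part of its static mapping.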

Brief change log

  • Update Row
  • Update RowSerializer
  • Update RowRowConverter

Verifying this change

This change added tests and can be verified as follows:

  • RowTest
  • RowSerializerTest
  • DataStructureConverterTest
  • RowFunctionTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: yes
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Collaborator

flinkbot commented Dec 18, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit d5d48b9 (Fri May 28 07:00:33 UTC 2021)

Warnings:

  • 1 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Dec 18, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

Contributor

@aljoscha aljoscha left a comment

In general, this looks very good! I had some inline questions.

And we should also wait until we have some benchmarking results.

public RowSerializer(
        TypeSerializer<?>[] fieldSerializers,
        @Nullable LinkedHashMap<String, Integer> positionByName,
        boolean legacyModeEnabled) {
Contributor

I was wondering if it wouldn't make sense to drop legacy mode now, for Flink 1.13.

Contributor Author

+1 I can open a PR for it

@@ -90,8 +88,6 @@ private boolean deepEquals0(Object e1, Object e2) {
            return deepEqualsArray(e1, e2);
        } else if (e1 instanceof Tuple && e2 instanceof Tuple) {
            return deepEqualsTuple((Tuple) e1, (Tuple) e2);
        } else if (e1 instanceof Row && e2 instanceof Row) {
Contributor

This is not needed anymore because the Row itself now does the deep equals, right?

Contributor Author

Yes, the Row implementation supports that now. If a custom implementation is still needed, the checker allows overriding it.

final Object value = external.getField(pos);
genericRow.setField(pos, fieldConverters[pos].toInternalOrNull(value));

final Set<String> fieldNames = external.getFieldNames(false);
Contributor

It would be interesting to see in benchmarks what the impact of this is, and of name-based field access in general.
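
As a rough illustration of where the extra cost of name-based access could come from, the sketch below copies name-based values into a fixed-length array using a name-to-position map. It is an assumption-laden simplification, not the actual `RowRowConverter` code: the class `NameToPositionSketch` and the method `toPositions` are hypothetical, and only the `positionByName` idea is borrowed from the `RowSerializer` constructor shown earlier in this review.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Flink. */
final class NameToPositionSketch {

    /**
     * Copies name-based values into a fixed-length array ordered by positionByName.
     * Fields that were never set stay null; unknown names are rejected.
     * Assumes the mapping covers positions 0..n-1.
     */
    static Object[] toPositions(
            Map<String, Object> valuesByName,
            LinkedHashMap<String, Integer> positionByName) {
        Object[] out = new Object[positionByName.size()];
        for (Map.Entry<String, Object> entry : valuesByName.entrySet()) {
            // One hash lookup per field: this is the overhead compared to plain index access.
            Integer pos = positionByName.get(entry.getKey());
            if (pos == null) {
                throw new IllegalArgumentException("Unknown field name: " + entry.getKey());
            }
            out[pos] = entry.getValue();
        }
        return out;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Integer> positionByName = new LinkedHashMap<>();
        positionByName.put("id", 0);
        positionByName.put("name", 1);

        Map<String, Object> values = new LinkedHashMap<>();
        values.put("name", "Alice");

        Object[] fields = toPositions(values, positionByName);
        System.out.println(fields.length);
    }
}
```

Each field costs one map lookup in this sketch, whereas a position-based copy only needs an array index, which is the kind of difference a benchmark would have to quantify.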

@twalthr
Contributor Author

twalthr commented Dec 18, 2020

Thanks for the review @aljoscha. I will prepare some benchmarks as the next step.

@twalthr
Contributor Author

twalthr commented Jan 4, 2021

I added a commit with benchmarks to this PR. I will remove it again during merging. As expected, name-based rows perform about 35% worse than position-based rows. But I think this is acceptable for the sake of code readability and not having to deal with indices. Users can always fall back to position-based fields. I also added a comment to the JavaDoc about this.

Here are the benchmark results:

RowBenchmark.testBefore                   thrpt   45  2.559 ± 0.020  ops/s        // before this PR
RowBenchmark.testNamed                    thrpt   45  1.632 ± 0.014  ops/s
RowBenchmark.testPositioned               thrpt   45  2.653 ± 0.014  ops/s
RowBenchmark.testNamedNotCopy             thrpt   45  2.886 ± 0.042  ops/s
RowBenchmark.testPositionedNotCopy        thrpt   45  4.441 ± 0.013  ops/s
RowBenchmark.testPojo                     thrpt   45  2.904 ± 0.027  ops/s
RowBenchmark.testPositionedSerialization  thrpt   45  4.591 ± 0.030  ops/s
RowBenchmark.testNamedSerialization       thrpt   45  3.396 ± 0.020  ops/s

Interestingly, the new refactoring of the serializer seems to slightly improve the position-based rows as well. This might be due to a modified RowSerializer.deserialize.
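
The relative slowdown can be recomputed from the throughput scores above. The sketch below assumes that each `Named` benchmark is meant to be compared against the `Positioned` benchmark with the matching suffix; that pairing is inferred from the benchmark names and is not stated explicitly. The class and method names are hypothetical.

```java
/** Recomputes relative throughput drops from the JMH scores listed in this comment. */
final class ThroughputDrop {

    /** Fraction of throughput lost when going from baseline to candidate (both in ops/s). */
    static double relativeDrop(double baselineOpsPerSec, double candidateOpsPerSec) {
        return 1.0 - candidateOpsPerSec / baselineOpsPerSec;
    }

    public static void main(String[] args) {
        // Scores copied from the results above; the pairing is an assumption.
        double copy = relativeDrop(2.653, 1.632);          // testPositioned vs. testNamed
        double notCopy = relativeDrop(4.441, 2.886);       // testPositionedNotCopy vs. testNamedNotCopy
        double serialization = relativeDrop(4.591, 3.396); // serialization variants

        System.out.printf("with copy:     %.1f%%%n", 100 * copy);
        System.out.printf("without copy:  %.1f%%%n", 100 * notCopy);
        System.out.printf("serialization: %.1f%%%n", 100 * serialization);
    }
}
```

Under that pairing, the drop is roughly 38% with copying, 35% without copying, and 26% for serialization, so the quoted figure of about 35% sits in the middle of the range.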

@aljoscha aljoscha self-assigned this Jan 4, 2021
@aljoscha
Contributor

aljoscha commented Jan 4, 2021

Changes still look good to me! And thanks for taking the time to do the benchmark. And yes, it's really interesting that this also increases perf for positional rows... 😅

twalthr added a commit to twalthr/flink that referenced this pull request Jan 4, 2021
This adds a name-based field mode to the Row class. A row can
operate in 3 different modes: name-based, position-based, or
a hybrid of both when leaving the Flink runtime. It simplifies
the handling of large rows (possibly with hundreds of fields)
and will make it easier to switch between DataStream API and
Table API.

See the documentation of the Row class for more information.

This closes apache#14420.
@twalthr twalthr closed this in d2e9aeb Jan 5, 2021