Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-11146] Add fasterCopy option to Flink runner #13240

Merged
merged 3 commits into from
Nov 3, 2020

Conversation

rHermes
Copy link
Contributor

@rHermes rHermes commented Oct 31, 2020

The fasterCopy option, removes an unnecessary deep copy in the Flink runner between each operator. This should lead to improved performance in all cases. The Jira issue has more info.

This is my first contribution to Beam and I just did what I thought was right. The biggest issue here was to get pipeline options into the CoderTypeSerializer. I tried my best, but welcome any feedback!

One thing I am not quite sure how to do is test this. Should all flink tests be ran twice with this option toggled? All tests pass with the change in CoderTypeSerializer applied, but I've not tried all of them with the flag enabled. How would one go about this?


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Dataflow Flink Samza Spark Twister2
Go Build Status --- Build Status --- Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status --- Build Status --- Build Status ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status --- --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@rHermes
Copy link
Contributor Author

rHermes commented Oct 31, 2020

R: @angoenka
R: @mxm
R: @tweise

(Hope I'm doing this correct)

The copyFaster option, removes an unnecessary deep copy in the Flink
runner between each operator. This should lead to improved performance
in all cases.
Copy link
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rHermes! Very solid change. I like that we are always passing the pipeline options now.

I don't think we have to re-run the tests with the option enabled, but it would make sense to add a dedicated integration test with the option enabled. Other than that, the changes look good to me.

@rHermes
Copy link
Contributor Author

rHermes commented Nov 3, 2020

Very glad to hear that I did it the right way 😄

I don't think we have to re-run the tests with the option enabled, but it would make sense to add a dedicated integration test with the option enabled.

I get the idea here, but where do you think this test would fit in, as in which file and what setup? The way I tested that this worked for me was to create a pipeline with a custom coder, and check that the coder wasn't called when running a simple pipeline. I don't have a great overview of the test methodology yet, could you point me in the right direction?

This was requested in the pull request, to make things neater.
Copy link
Contributor

@je-ik je-ik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice change Teodor! I have a minor question about naming of the flag. Thanks!

this.coder = coder;
this.pipelineOptions = null;
}
private final boolean fasterCopy;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small suggestion, can we name this zeroCopy, as that is actually what it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about zeroCopy as a name, but for me this is associated with the lower level concept and I thought it might create confusion. The name fasterCopy came from the idea that it is making the copying faster, but I am very much open to other names!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, what about disableValueClone, I think that fasterCopy is a little misleading as well. Faster than what? And why it was slower before? :) It would be good for the flag to describe direct effect (disable, enable something), not a consequence (being faster).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very much agree with the name being descriptive. How about we go more opinionated, something like disableExcessCopy? There where some back and forth on the mailing list about this point, so I don't want to sneak in my opinion if that is not right. What do you think @mxm ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Folks, this is an internal boolean flag which is only used at a single place. If in doubt, one can clearly see what it does. We could add a /** */ Javadoc field comment for it if you will. IMHO the name is really personal preference and I have nothing against fasterCopy as it clearly indicates that the copy() method will return faster. Nothing semantically wrong about that. zeroCopy is not 100% true because we will perform a copy for primitive types but please let's not argue about that. DisableExcessCopy is too opinionated IMHO 😇

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with DisableExcessCopy being too opinionated. My vote is for fasterCopy, but it is not a hill I will die on :P

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make it clear - although I put my comment to the internal field, I was having in mind mostly the flag in PipelineOptions. These should be aligned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that this is not 100% important, but if possible, we should make options passed in PipelineOptions the most self-explanatory as possible, because these options are user-facing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been talking about the user facing flag from the beginning, so I understood you.

@mxm
Copy link
Contributor

mxm commented Nov 3, 2020

Run Flink ValidatesRunner

@mxm
Copy link
Contributor

mxm commented Nov 3, 2020

Run Java Flink PortableValidatesRunner Streaming

@mxm
Copy link
Contributor

mxm commented Nov 3, 2020

Run Java Flink PortableValidatesRunner Batch

@mxm
Copy link
Contributor

mxm commented Nov 3, 2020

Run Nexmark Flink

@rHermes
Copy link
Contributor Author

rHermes commented Nov 3, 2020

Where do I see the result of the Nexmark flink run?

@mxm
Copy link
Contributor

mxm commented Nov 3, 2020

The phrase has changed. Running it now.

@mxm
Copy link
Contributor

mxm commented Nov 3, 2020

Run Flink Runner Nexmark Tests

@mxm
Copy link
Contributor

mxm commented Nov 3, 2020

FYI, the test failure in https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch_PR/189/ is known.

@rHermes
Copy link
Contributor Author

rHermes commented Nov 3, 2020

I'm not sure how this works, but what is left now before this is merged? Will it make it before the release window tomorrow?

@rHermes rHermes changed the title [BEAM-11146] Add copyFaster option to Flink runner [BEAM-11146] Add fasterCopy option to Flink runner Nov 3, 2020
@mxm
Copy link
Contributor

mxm commented Nov 3, 2020

@mxm
Copy link
Contributor

mxm commented Nov 3, 2020

There is nothing blocking this. Merging.

@rHermes
Copy link
Contributor Author

rHermes commented Nov 3, 2020

But those runs don't include when the option is on, so why run them? Did they not pass the first time?

@mxm mxm merged commit 4836937 into apache:master Nov 3, 2020
@mxm
Copy link
Contributor

mxm commented Nov 3, 2020

The changes touch a lot of other code paths which also get executed when the option is disabled. Just making sure (learned this the hard way).

@rHermes rHermes deleted the BEAM-11146-faster-copy branch November 3, 2020 13:14
@je-ik
Copy link
Contributor

je-ik commented Nov 4, 2020

@mxm one more question regarding the flag - could we reuse the existing objectReuse flag? I'm aware that this would be backwards incompatible change, but - when you enable this flag, would it sound reasonable to mutate data that has been output? For me, semantically, enabling objectReuse would be declaring that my pipeline is using immutable objects.

@rHermes
Copy link
Contributor Author

rHermes commented Nov 4, 2020

I think reusing the objectReuse flag is a poor choice. The flink runner might not use it today, but it might do sometime in the future. It is also good to avoid overlap with the Flink options to avoid confusing people.

@je-ik
Copy link
Contributor

je-ik commented Nov 4, 2020

I see it from a different perspective - every option we add makes is exponentially harder for users to setup the runner 'correctly'. Mostly when a configuration option is not strictly orthogonal with all other options.

@dmvk
Copy link
Member

dmvk commented Nov 5, 2020

@rHermes good job ;)

@je-ik -1 for reusing the flag

Anyway, I think it's safe to change default value of objectReuse to true and maybe deprecate it?

Assumption: Beam has full control of all used TypeSerializer instances. The only one user can "tweak" is CoderTypeSerializer, where copy(T, T) is basically no-op.

Quick verification of used serializers on two of our production pipelines...

First:

$ kubectl exec pipelines-runner-identity-8123e61d-tm-d9f85785-nhv78 -- jmap -histo 1 | grep flink | grep Serializer | grep -v '\$'
  36:        563802       13531248  org.apache.beam.runners.flink.translation.types.CoderTypeSerializer
 633:            68           4896  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 945:            74           1184  org.apache.flink.runtime.state.ArrayListSerializer
 977:            44           1056  org.apache.flink.runtime.types.PriorityQueueSerializer
1029:            36            864  [Lorg.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
1053:            26            832  org.apache.flink.streaming.api.operators.TimerSerializer
1057:            34            816  org.apache.flink.api.common.typeutils.base.MapSerializer
1184:            44            704  org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer
1187:            44            704  org.apache.flink.runtime.types.JavaIterableWrapperSerializer
1218:            20            640  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData
1272:            36            576  org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate
1412:            26            416  org.apache.flink.runtime.state.JavaSerializer
1443:            16            384  org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot
1538:            20            320  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot
1594:            12            288  org.apache.flink.streaming.api.operators.TimerSerializerSnapshot
1653:            16            256  org.apache.flink.api.common.typeutils.base.ListSerializer
1820:             8            192  org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot
1822:             6            192  org.apache.flink.core.memory.DataOutputSerializer
2872:             4             64  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer
3155:             3             48  org.apache.flink.api.common.typeutils.base.StringSerializer
3162:             2             48  org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer
5768:             1             16  org.apache.flink.runtime.state.VoidNamespaceSerializer

Second:

$ kubectl exec pipelines-runner-default-39c08fc2-tm-6474fb97c5-64s8q -- jmap -histo 1 | grep flink | grep Serializer | grep -v '\$'
 244:         18950         454800  org.apache.beam.runners.flink.translation.types.CoderTypeSerializer
 688:          2031          32496  org.apache.flink.runtime.state.ArrayListSerializer
 791:          1307          20912  org.apache.flink.api.common.typeutils.base.ListSerializer
 884:           682          16368  [Lorg.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 976:           682          10912  org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate
1014:           320          10240  org.apache.flink.core.memory.DataOutputSerializer
1072:           324           7776  org.apache.flink.api.common.typeutils.base.MapSerializer
1120:           212           6784  org.apache.flink.streaming.api.operators.TimerSerializer
1139:           253           6072  org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot
1156:           241           5784  org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot
1159:           360           5760  org.apache.flink.runtime.state.JavaSerializer
1236:           188           4512  org.apache.flink.streaming.api.operators.TimerSerializerSnapshot
1267:           168           4032  [Lorg.apache.flink.api.common.typeutils.TypeSerializer;
1473:            80           1920  org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer
1480:           120           1920  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer
2025:            29            464  org.apache.flink.api.common.typeutils.base.StringSerializer
5926:             1             16  org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer
5992:             1             16  org.apache.flink.runtime.state.VoidNamespaceSerializer

WDYT?

@rHermes
Copy link
Contributor Author

rHermes commented Nov 5, 2020

@rHermes good job ;)

Thanks! It's my first change to a big open source project!

Anyway, I think it's safe to change default value of objectReuse to true and maybe deprecate it?

As it stands now, I don't think Beam is in a position to utilize it for its intended purpose. I have no thoughts on deprecating it, as I don't know if the type system is something Beam will change in the future.

Quick verification of used serializers on two of our production pipelines...

First:

$ kubectl exec pipelines-runner-identity-8123e61d-tm-d9f85785-nhv78 -- jmap -histo 1 | grep flink | grep Serializer | grep -v '\$'
  36:        563802       13531248  org.apache.beam.runners.flink.translation.types.CoderTypeSerializer
 633:            68           4896  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 945:            74           1184  org.apache.flink.runtime.state.ArrayListSerializer
 977:            44           1056  org.apache.flink.runtime.types.PriorityQueueSerializer
1029:            36            864  [Lorg.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
1053:            26            832  org.apache.flink.streaming.api.operators.TimerSerializer
1057:            34            816  org.apache.flink.api.common.typeutils.base.MapSerializer
1184:            44            704  org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer
1187:            44            704  org.apache.flink.runtime.types.JavaIterableWrapperSerializer
1218:            20            640  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData
1272:            36            576  org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate
1412:            26            416  org.apache.flink.runtime.state.JavaSerializer
1443:            16            384  org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot
1538:            20            320  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot
1594:            12            288  org.apache.flink.streaming.api.operators.TimerSerializerSnapshot
1653:            16            256  org.apache.flink.api.common.typeutils.base.ListSerializer
1820:             8            192  org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot
1822:             6            192  org.apache.flink.core.memory.DataOutputSerializer
2872:             4             64  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer
3155:             3             48  org.apache.flink.api.common.typeutils.base.StringSerializer
3162:             2             48  org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer
5768:             1             16  org.apache.flink.runtime.state.VoidNamespaceSerializer

Second:

$ kubectl exec pipelines-runner-default-39c08fc2-tm-6474fb97c5-64s8q -- jmap -histo 1 | grep flink | grep Serializer | grep -v '\$'
 244:         18950         454800  org.apache.beam.runners.flink.translation.types.CoderTypeSerializer
 688:          2031          32496  org.apache.flink.runtime.state.ArrayListSerializer
 791:          1307          20912  org.apache.flink.api.common.typeutils.base.ListSerializer
 884:           682          16368  [Lorg.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 976:           682          10912  org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate
1014:           320          10240  org.apache.flink.core.memory.DataOutputSerializer
1072:           324           7776  org.apache.flink.api.common.typeutils.base.MapSerializer
1120:           212           6784  org.apache.flink.streaming.api.operators.TimerSerializer
1139:           253           6072  org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot
1156:           241           5784  org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot
1159:           360           5760  org.apache.flink.runtime.state.JavaSerializer
1236:           188           4512  org.apache.flink.streaming.api.operators.TimerSerializerSnapshot
1267:           168           4032  [Lorg.apache.flink.api.common.typeutils.TypeSerializer;
1473:            80           1920  org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer
1480:           120           1920  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer
2025:            29            464  org.apache.flink.api.common.typeutils.base.StringSerializer
5926:             1             16  org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer
5992:             1             16  org.apache.flink.runtime.state.VoidNamespaceSerializer

WDYT?

Is this with and without my change enabled? If yes, would you mind sending me an email with the setup and the two pipelines? It would be very useful data for my thesis!

If no, I'm kind of confused 😅

@je-ik
Copy link
Contributor

je-ik commented Nov 5, 2020

If we have reasons to believe, that reusing the objectReuse could result in breaking existing pipelines (which I think is low probability, but is admittedly non-zero), I'd suggest the following:

  • deprecate objectReuse
  • introduce usingImmutableTypes (or similar), which would have the meaning of both objectReuse and fasterCopy
  • remove fasterCopy and use only usingImmutableTypes in place of it
    Using of objectReuse would result in deprecation warning, suggesting to validate pipeline against DirectRunner with immutability enforced and then to enable usingImmutableTypes.

@dmvk
Copy link
Member

dmvk commented Nov 5, 2020

introduce usingImmutableTypes (or similar), which would have the meaning of both objectReuse and fasterCopy

I think you got me wrong, what I'm trying to suggest is that there is basically no chance objectReuse can affect user code. Only think it affects in existing pipelines is runner code, which we have full control of. So it makes sense get rid of this flag completely as there is absolutely no benefit of exposing it to the user. Hiding it behind another knob is even worse than current state as it would be hard to understand what it actually does.

Is this with and without my change enabled? If yes, would you mind sending me an email with the setup and the two pipelines? It would be very useful data for my thesis!

It's without your change. Just to note, this patch should have no effect on distribution of used Serializers as it only affects internal behavior of CoderTypeSerializer.

We can do some basic profiling with & without your patch within next few weeks. I'll let you know once we have results ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants