Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flink-5734] code generation for normalizedkey sorter #3511

Conversation

@heytitle
Copy link
Contributor

heytitle commented Mar 10, 2017

This pull-request is the implementation of applying code generation to NormalizedKeySorter. It is built on top of FLINK-3722 and based on FLIP: Code Generation for NormalizedKeySorter.

image
Result from SortPerformance.java

@heytitle heytitle changed the title Flink 5734 code generation for normalizedkey sorter [Flink-5734] code generation for normalizedkey sorter Mar 10, 2017
@greghogan

This comment has been minimized.

Copy link
Contributor

greghogan commented Mar 12, 2017

Has the FLIP been posted and officially discussed on the mailing list?

@heytitle

This comment has been minimized.

Copy link
Contributor Author

heytitle commented Mar 12, 2017

I haven't posted it yet. I will do it next week.

@greghogan

This comment has been minimized.

Copy link
Contributor

greghogan commented Apr 26, 2017

@heytitle please remove the old FLINK-3722 commits and rebase to master.

@heytitle

This comment has been minimized.

Copy link
Contributor Author

heytitle commented Apr 26, 2017

@greghogan May I ask you how to remove FLINK-3722 commits?. Only way I can think of is git rebase -i, but this will rewrite history of this PR.

@greghogan

This comment has been minimized.

Copy link
Contributor

greghogan commented Apr 26, 2017

@heytitle, you can rebase -i dd20^ and then delete the first two lines which removes those commits from the history.

An initial thought from skimming the code: should we create an abstract NormalizedKeySorterBase with common code from the generated and non-generated implementations? This way the lines of code in the template would be minimized and we wouldn't need to synchronize changes. I don't see a reason why this would decrease performance.

@heytitle heytitle force-pushed the heytitle:FLINK-5734-code-generation-for-normalizedkey-sorter branch from 988b207 to e1f9822 Apr 26, 2017
@heytitle

This comment has been minimized.

Copy link
Contributor Author

heytitle commented Apr 26, 2017

I also think about the abstract class but I'm not sure how to do it properly.

@greghogan

This comment has been minimized.

Copy link
Contributor

greghogan commented May 2, 2017

@heytitle, the code generation is only for a few methods, right? So the other methods in the sorter template could be moved into a NormalizedKeySorterBase which would be subclasses by the template. Then NormalizedKeySorter could also subclass NormalizedKeySorterBase. This would reduce the duplicated code and also minimize the amount of code 'hidden' from IDEs in the template.

@heytitle

This comment has been minimized.

Copy link
Contributor Author

heytitle commented May 3, 2017

Hi @greghogan,

Thanks for the explanation. I like the idea. I also think we might not need NormalizedKeySorterBase, we can just extend generated sorters from NormalizedKeySorter and override those methods .

What do you think?

@greghogan

This comment has been minimized.

Copy link
Contributor

greghogan commented May 3, 2017

@heytitle yes, let's try that first.


private static final long POINTER_MASK = LARGE_RECORD_TAG - 1;
public static final long POINTER_MASK = LARGE_RECORD_TAG - 1;

This comment has been minimized.

Copy link
@heytitle

heytitle May 6, 2017

Author Contributor

The reason public is used here because Janino first check accessibility of these variables and it seems not able to access them when protected is used and it throws the error below.

org.codehaus.commons.compiler.CompileException: Field "LARGE_RECORD_THRESHOLD" is not accessible

	at org.codehaus.janino.ReflectionIClass$ReflectionIField.getConstantValue(ReflectionIClass.java:340)
	at org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:4433)
	at org.codehaus.janino.UnitCompiler.access$10000(UnitCompiler.java:182)
	at org.codehaus.janino.UnitCompiler$11.visitFieldAccess(UnitCompiler.java:4407)
	at org.codehaus.janino.Java$FieldAccess.accept(Java.java:3229)

This comment has been minimized.

Copy link
@pnowojski

pnowojski Sep 21, 2017

Contributor

Maybe put that into the comment inside the code?

@greghogan

This comment has been minimized.

Copy link
Contributor

greghogan commented Jul 10, 2017

@heytitle apologies for the long delay. I've been working to improve the Gelly examples to process with each of the standard data types (from byte to string). I think we can validate both the correctness and performance of this PR in time for the 1.4 release.

Are you able to continue working on this feature? There are several steps to proceed with:

  • rebase to master
  • fix checkstyle warnings against the new "strict checkstyle"
  • implement code generation for FixedLengthRecordSorter

Rather than extending NormalizedKeySorter, we could pull the compare and swap functions out of MemorySegment into an interface with a default implementation moved from NormalizedKeySorter and FixedLengthRecordSorter. There is overlap in that the normalizedKeyFullyDetermines case of NormalizedKeySorter results in the same operations as in FixedLengthRecordSorter.

@heytitle

This comment has been minimized.

Copy link
Contributor Author

heytitle commented Jul 10, 2017

Hi @greghogan,

Thank very much for the feedback.

Are you able to continue working on this feature?

Yes, I would like to complete the feature and will take a look into the issues you mentioned in next couple of weeks.

@heytitle

This comment has been minimized.

Is it possible that the class will be disappear after the first get() from if's condition?

@ggevay

This comment has been minimized.

Copy link
Contributor

ggevay commented Sep 24, 2017

Oh, you are right @heytitle, thanks! Fixed in 9016cce
These WeakReferences are scary :)

@ggevay

This comment has been minimized.

Copy link
Contributor

ggevay commented Sep 24, 2017

Thanks @fhueske and @heytitle!

Most of the comments have been addressed. The two outstanding issues are

  1. Addressing #3511 (comment)
  2. Making sure that #3511 (comment) indeed fixes the leak and doesn't introduce any new issues.

I think I can do both next week. Or, @heytitle , if you have time you could also work on them. (Please drop me an email if you start working on 1., to avoid both of us doing it. For 2., the more eyes on it, the better.)

@heytitle

This comment has been minimized.

Can we simplify it to the code below?

Class generatedClass = null
WeakReference<Class> fromCache = generatedClassCache.getOrDefault(cacheKey, null);

generatedClass = fromCache != null ? fromCache.get() : null;

if ( genenetedClass == null ) {
// cache miss
...
}

So, we don't need to introduce cacheHit variable.

@KurtYoung

This comment has been minimized.

Copy link
Contributor

KurtYoung commented Sep 29, 2017

@heytitle @ggevay Great work!
We are working on a project to fully code generate the algorithm Flink runtime used, and borrowed lots of idea of this PR, thanks! IMHO, in addition to these changes, there are still some potential improvements we can do about the sorter, like deserialization when comparing the real records. To achieve this, we need more type information control and flexible code generating supports, so we choose to do it through the Table API & SQL. How do you see this approach?

@ggevay

This comment has been minimized.

Copy link
Contributor

ggevay commented Sep 29, 2017

IMHO, in addition to these changes, there are still some potential improvements we can do about the sorter, like deserialization when comparing the real records.

Do you mean NormalizedKeySorter.compareRecords? This calls comparator.compareSerialized, which is currently implemented for most types by simply deserializing the records and then comparing the deserialized form. It would maybe bring some performance improvement if this had a real implementation, which would deserialize only the key fields (which are relevant to the comparison). The hard part here is how to handle nullable fields, which make the offset of individual fields unpredictable.

So I think a simpler and better approach is to just make sure that most types have a good implementation of putNormalizedKey, and then NormalizedKeySorter.compareRecords would be called only rarely, so its performance wouldn't really matter.

We are working on a project to fully code generate the algorithm Flink runtime used, and borrowed lots of idea of this PR, thanks!

I'm happy to hear that!

Btw. I think a large potential in code-generation is to eliminate the overheads of the very many virtual function calls throughout the runtime, by making the calls monomorphic [1,2]. For example, there could be a custom MapDriver generated for each map UDF, and then this custom class would always call the same UDF, making the call monomorphic, and thus easily optimizable by the JVM [1,2]. (I think @greghogan was also thinking about this.)

For example, the following virtual calls are on the per-record code path:

  • drivers (such as MapDriver) calling MutableObjectIterator.next
  • drivers calling the UDF
  • drivers calling output.collect
  • ReusingDeserializationDelegate or NonReusingDeserializationDelegate calling serializer.deserialize
  • SpillingResettableMutableObjectIterator calling serializer.serialize and serializer.deserialize
  • PojoSerializer, TupleSerializer, RowSerializer, etc. calling their field serializers
  • CountingCollector calling collector.collect
  • RecordWriter calling channelSelector.selectChannels
  • SerializationDelegate calling serializer.serialize
  • StreamElementSerializer calling methods on its typeSerializer
  • OutputEmitter calling comparator.hash
  • ComparableKeySelector calling comparator.extractKeys
  • assignToKeyGroup calling key.hashCode

I think most of the above calls are megamorphic (especially in larger Flink jobs with many operators), which makes them slow [1,2]. They could be made monomorphic by code-generating custom versions of these classes, where the customization would be to fix the type of the targets of these calls. (I think this could probably be done independently of the Table API.)

Another potential for code-generation is customizing OutputEmitter for the number of channels: there are places where a modulo operation is performed, which is much faster if the divisor is a compile-time constant, since then the compiler uses such tricks as described in this paper:
https://gmplib.org/~tege/divcnst-pldi94.pdf
And the same optimization could be made in KeyGroupRangeAssignment, where there are divisions by maxParallelism.

we need more type information control and flexible code generating supports, so we choose to do it through the Table API & SQL. How do you see this approach?

Having more info about the types, UDFs, etc. of your program certainly can help. Unfortunately I don't know too much about the Table API & SQL, but I have a few random thoughts:

  • Since you are generating the UDFs, it should be possible to make them have good object reuse behavior, without the users having to worry about the non-trivial object reuse rules.
  • For basic types (int, boolean, etc.) the generated code could use IntValue, BooleanValue, etc., facilitating more object reuse, again without making the users write ugly boilerplate (as they have to do in the DataSet/DataStream API when they want to use these classes).
  • Same thing with StringValue

Btw. have you seen this PR for code generation for POJO serializers and comparators? #2211
It has some issues, so it is not as close to merging as this PR, but maybe I'll try to tie that up as well some time.

[1] http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
[2] https://shipilev.net/blog/2015/black-magic-method-dispatch/

@KurtYoung

This comment has been minimized.

Copy link
Contributor

KurtYoung commented Sep 30, 2017

Hi @ggevay , thanks for the detailed response.

So I think a simpler and better approach is to just make sure that most types have a good implementation of putNormalizedKey, and then NormalizedKeySorter.compareRecords would be called only rarely, so its performance wouldn't really matter.

You are right when the sort keys are simple numeric types, but not with strings, which maybe the most popular choice in some ETL and data warehouse pipelines. But i agree that code generation can't help with this situation, so we investigate some binary data formats to represent our record and modify the interface of TypeSerializer & TypeComparator when doing ser/de. We don't have to consume the input/output view byte by byte, but has the ability to random access the underlying data, aka MemorySegment. It acts like spark's UnsafeRow: https://reviewable.io/reviews/apache/spark/5725, so we can eliminate the most deserialization cost such as read byte[] and then new String(byte[]). We combine this approach with some code generation to eliminate the virtual function call of the TypeComparator and see a 10x performance improvements with sorting on strings.

I think a large potential in code-generation is to eliminate the overheads of the very many virtual function calls throughout the runtime

Totally agreed, after we finish dealing with the code generation and improving the ser/de, we will investigate more about this. Good to see that you have a list of all the megamorphic calls. BTW, we are actually translating the batch jobs into the streaming runtime, i think there will be lots in common.

Having and control more type informations, and code generation the whole operator have lots of benefits, it can also help to making most of the calls monomorphic, such as:

  • fully control of the object reusing, yes
  • comparators
  • generating hash codes
  • potential improvements of some algorithm which finds out they only need to deal with fixed length data
  • Directly using primitive variables when dealing with simple type

And you are right this is orthogonal with runtime improvements, and we see the boundary is the Operator. The framework should provide the most efficient environment for operators to run, and we will code generating the most efficient operators to live in it.

Btw. have you seen this PR for code generation for POJO serializers and comparators? #2211

I didn't see it yet, will find some time to check it out.

@ggevay

This comment has been minimized.

Copy link
Contributor

ggevay commented Oct 1, 2017

Can we simplify it to the code below?
So, we don't need to introduce cacheHit variable.

Thanks @heytitle , done in ff3a35e .

@ggevay

This comment has been minimized.

Copy link
Contributor

ggevay commented Oct 1, 2017

The WeakHashMap with the WeakReferences seem to be working correctly. I've checked that the generated classes are reused from the cache inside a single job, but are not reused across jobs.

@fhueske , I think all the comments have been addressed.

@heytitle

This comment has been minimized.

Copy link
Contributor

heytitle commented on ff3a35e Oct 1, 2017

👍

@twalthr

This comment has been minimized.

Copy link
Contributor

twalthr commented Jan 30, 2019

This PR has not found support by the community for quite some time. I'm not sure if it is still mergeable. @KurtYoung you mentioned that Blink's implementation was inspired by this contribution, right? So I guess we can close this PR now. What do you guys think?

@ggevay

This comment has been minimized.

Copy link
Contributor

ggevay commented Jan 30, 2019

Hi @twalthr ,

This PR has a similar status as the serializer codegen PR, explained here:
#2211 (comment)

In the last paragraph of that comment I mentioned that there will be an MSc student working on an alternative approach, which would subsume both PRs. In the meantime, @mukrram-bajwa wrote a nice MSc thesis on this alternative approach, but unfortunately it's far from a PR-ready state, and I'm not sure at the moment whether we will push it further, or how feasible is the approach for pushing it to production-readiness.

Btw. I don't know the details about the Blink improvements. Maybe that subsumes both of these PRs and even the alternative approach that was pursued in the MSc thesis, but I don't know. Or it might win solely on the basis of being closer to production readiness.

I suggest to just close both of these PRs for now, and then maybe later get back to these performance issues with a fresh mind.

@twalthr

This comment has been minimized.

Copy link
Contributor

twalthr commented Jan 30, 2019

Hi @ggevay, thanks for letting us know the current status of both PRs. Currently, we are trying to perform a cleanup of stale PRs and issue before the big Blink reviewing/merging starts. It would be great if there is a solution that even beats Blink's performance. But as you said "with a fresh mind". Feel free to close both PRs for now.

@ggevay

This comment has been minimized.

Copy link
Contributor

ggevay commented Jan 30, 2019

OK, but could you please close them @twalthr ? I don't have a close button; I guess I don't have permission because I'm not a commiter.

@twalthr

This comment has been minimized.

Copy link
Contributor

twalthr commented Jan 30, 2019

Yes, sure. Sorry, I thought you were opening the PR. Will close it for now...

@twalthr twalthr closed this Jan 30, 2019
@KurtYoung

This comment has been minimized.

Copy link
Contributor

KurtYoung commented Feb 15, 2019

@twalthr Sorry for the late response, but yes we borrowed lots of idea from this PR in blink's sorter code generation. And i want to thank @heytitle for these cool ideas.

@heytitle

This comment has been minimized.

Copy link
Contributor Author

heytitle commented Feb 15, 2019

Actually, the original ideas were from @ggevay.

@KurtYoung

This comment has been minimized.

Copy link
Contributor

KurtYoung commented Feb 16, 2019

Big thanks to the original author @ggevay , hope you don't mind for us borrowing your ideas.

@ggevay

This comment has been minimized.

Copy link
Contributor

ggevay commented Feb 16, 2019

Of course! I'm actually happy that they are finally making it into Flink in some form. So thanks for borrowing them :)

@KurtYoung

This comment has been minimized.

Copy link
Contributor

KurtYoung commented Mar 12, 2019

As you may want to know, we have opened a PR to introduce code generated sorter, check out #7958 if you are interested. @ggevay @heytitle

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
9 participants
You can’t perform that action at this time.