[Flink-5734] code generation for normalizedkey sorter #3511

Closed
Changes from 50 commits
61 commits
6fa431b
[FLINK-5734] prepare project structure
p16i Feb 19, 2017
e302747
[FLINK-5734] implement basic functionalities for code generation
p16i Feb 20, 2017
321acdc
[FLINK-5734] use NormaliKeysorter's appoach to compute numKeyByte
p16i Feb 24, 2017
8675db8
[FLINK-5734] use synchronized block for SorterFactory and TemplateModel
p16i Mar 4, 2017
e1abaad
[FLINK-5734] replace string concat with stringbuilder
p16i Mar 5, 2017
0c765b1
[FLINK-5734] add testcase for variable-length string
p16i Mar 5, 2017
fd67e14
[FLINK-5734] user proper logger and also add comments
p16i Mar 5, 2017
2f0e082
[FLINK-5734] checking endianness should not be involved in generating…
p16i Mar 5, 2017
64d73c9
[FLINK-5734] move byte-operator-mapping to be a constant of class scope
p16i Mar 5, 2017
ade1d60
[FLINK-5734] add enable/disable flag in ExecutionConfig
p16i Mar 5, 2017
f1398ba
[FLINK-5734] rename variable names in old compare/swap functions to m…
p16i Mar 5, 2017
850930f
[FLINK-5734] improve tests
p16i Mar 5, 2017
ccc0ed0
[FLINK-5734] add timestamp to generated sorter
p16i Mar 6, 2017
6c28a63
[FLINK-5734] RESOURCE_PATH exhibits to a Temporary directory
SerkanAli Mar 7, 2017
2e0fc90
[FLINK-5734] integrate code generation to flink code
p16i Mar 7, 2017
5cb9945
[FLINK-5734] prevent generating same sorter at the same time
p16i Mar 8, 2017
576bcb0
[FLINK-5734] fix sorting in desc order failed
p16i Mar 8, 2017
6ad0669
[FLINK-5734] also cache constructor to save cooking time
p16i Mar 8, 2017
dc3bf6e
[FLINK-5734] refactor integration tests
p16i Mar 8, 2017
5037cfb
[FLINK-5734] add benchmarking code
p16i Mar 8, 2017
602543c
[FLINK-5734] get temporary directory for generated code from task con…
SerkanAli Mar 9, 2017
60c776c
[FLINK-5734] remove cache mechanism from TemplateManager
p16i Mar 10, 2017
5ec5197
[FLINK-5734] Inherit generated sorters from NormalizedKeySorter
p16i May 6, 2017
3265aef
[FLINK-5734] Update sorter.ftlh template to match with the newest code
p16i May 6, 2017
a643eb9
[FLINK-5734] Fix coding style
p16i Aug 26, 2017
48f46ad
[FLINK-5734] Instantiate TestEnvironment with proper constructor
p16i Aug 27, 2017
5b0bae0
[FLINK-5734] Fix the switch in DriverBaseITCase
ggevay Aug 28, 2017
41b244f
[FLINK-5734] Narrow down some access modifiers and fix some minor typos
ggevay Aug 28, 2017
af35766
[FLINK-5734] Eliminate some warnings about generics
ggevay Aug 28, 2017
b6d6462
[FLINK-5734] Fix synchronized in getInstance
ggevay Aug 29, 2017
c723ffd
[FLINK-5734] Write generated code to a string instead of a temp file
ggevay Aug 29, 2017
fd0a6f7
[FLINK-5734] Remove testSorterIsGeneratedOnlyOnceForSameComparator
ggevay Aug 29, 2017
530a6ff
[FLINK-5734] Fix codestyle
ggevay Aug 29, 2017
79c4b76
[FLINK-5734] Remove unused methods from TemplateManager.
p16i Aug 29, 2017
1028f5a
[FLINK-5734] We don't need to pass the TaskConfig to SorterFactory
ggevay Aug 30, 2017
5ccb169
[FLINK-5734] Move decision between FixedLengthRecordSorter and Normal…
ggevay Aug 31, 2017
196f42f
[FLINK-5734] Make sizes in CodeGenerationSorterBaseTest to speed up t…
ggevay Aug 31, 2017
7346ae8
[FLINK-5734] Add condition for isNormalizedKeyPrefixOnly to sorter ch…
ggevay Aug 31, 2017
82ef38e
[FLINK-5734] Minor changes
ggevay Aug 31, 2017
77d5e06
[FLINK-5734] Some renaming and commenting around fixedByteChunks
ggevay Aug 31, 2017
2275538
[FLINK-5734] Commenting
ggevay Aug 31, 2017
7341e9e
[FLINK-5734] Call getTemplate only once
ggevay Aug 31, 2017
b41f71f
[FLINK-5734] Remove unused method
ggevay Sep 14, 2017
fec0278
[FLINK-5734] Clean up exceptions
ggevay Sep 14, 2017
33cbc36
[FLINK-5734] Codestyle
ggevay Sep 15, 2017
54b3fcb
[FLINK-5734] Break long lines in SorterFactory
ggevay Sep 20, 2017
b8f1e53
[FLINK-5734] Move TemplateManager's functionality into SorterFactory
ggevay Sep 20, 2017
36419be
[FLINK-5734] Consolidate code generation setting in ExecutionConfig, …
ggevay Sep 20, 2017
84cb9a8
[FLINK-5734] Fix versions of Janino and Freemarker
ggevay Sep 20, 2017
db14ac5
[FLINK-5734] Make protected methods in NormalizedKeySorter final
ggevay Sep 20, 2017
41fd844
[FLINK-5734] Fix coding style errors
p16i Sep 24, 2017
d12c55f
[FLINK-5734] Set complier's classloader as parent classloader of user…
p16i Sep 24, 2017
1c9830f
[FLINK-5734] Update logging description for createCodegenSorter method
p16i Sep 24, 2017
909b59e
[FLINK-5734] Set parent classloader to the user code classloader
ggevay Sep 24, 2017
cce40c5
[FLINK-5734] Prevent the cache from keeping alive generated classes f…
ggevay Sep 24, 2017
5e31cf0
[FLINK-5734] Add comment to the protected final methods in Normalized…
ggevay Sep 24, 2017
b2bf565
[FLINK-5734] Fix condition of choosing between NormalizedKeySorter an…
ggevay Sep 24, 2017
9016cce
[FLINK-5734] Fix cache lookup to call get only once on the WeakReference
ggevay Sep 24, 2017
ff3a35e
[FLINK-5734] Simplify the cache lookup
ggevay Oct 1, 2017
157cd4c
[FLINK-5734] Use CLUSTER_SORTER_CODEGEN TestExecutionMode only in cer…
ggevay Oct 1, 2017
c9705fa
[FLINK-5734] checkstyle
ggevay Oct 1, 2017
4 changes: 3 additions & 1 deletion docs/dev/execution_configuration.md
@@ -55,12 +55,14 @@ With the closure cleaner disabled, it might happen that an anonymous user functi

- `getExecutionMode()` / `setExecutionMode()`. The default execution mode is PIPELINED. Sets the execution mode to execute the program. The execution mode defines whether data exchanges are performed in a batch or on a pipelined manner.

- `enableForceKryo()` / **`disableForceKryo`**. Kryo is not forced by default. Forces the GenericTypeInformation to use the Kryo serializer for POJOS even though we could analyze them as a POJO. In some cases this might be preferable. For example, when Flink's internal serializers fail to handle a POJO properly.
- `enableForceKryo()` / **`disableForceKryo()`**. Kryo is not forced by default. Forces the GenericTypeInformation to use the Kryo serializer for POJOS even though we could analyze them as a POJO. In some cases this might be preferable. For example, when Flink's internal serializers fail to handle a POJO properly.

- `enableForceAvro()` / **`disableForceAvro()`**. Avro is not forced by default. Forces the Flink AvroTypeInformation to use the Avro serializer instead of Kryo for serializing Avro POJOs.

- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.

- `enableCodeGenerationForSorters()` / **`disableCodeGenerationForSorters()`**. This setting can increase the performance of batch jobs by instructing Flink to generate specialized code for every sorting situation (i.e., type and comparator).

- **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status updates are printed to `System.out` by default. This setting allows to disable this behavior.

- `getGlobalJobParameters()` / `setGlobalJobParameters()` This method allows users to set custom objects as a global configuration for the job. Since the `ExecutionConfig` is accessible in all user defined functions, this is an easy method for making configuration globally available in a job.
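
A minimal sketch of how the new flag behaves, assuming the fluent enable/disable/is methods that this PR adds to `ExecutionConfig`. `SorterCodegenConfig` is a hypothetical stand-in written only for illustration, not Flink's class; it also shows why the new field has to take part in `equals` and `hashCode`.

```java
import java.util.Objects;

/** Hypothetical stand-in for the sorter-related part of ExecutionConfig (not Flink code). */
final class SorterCodegenConfig {
    // Disabled by default, as stated in the Javadoc of the PR.
    private boolean codeGenerationForSortersEnabled = false;

    /** Enables code generation for sorters and returns this config for chaining. */
    SorterCodegenConfig enableCodeGenerationForSorters() {
        this.codeGenerationForSortersEnabled = true;
        return this;
    }

    /** Disables code generation for sorters and returns this config for chaining. */
    SorterCodegenConfig disableCodeGenerationForSorters() {
        this.codeGenerationForSortersEnabled = false;
        return this;
    }

    boolean isCodeGenerationForSortersEnabled() {
        return codeGenerationForSortersEnabled;
    }

    // Two configs that differ only in this flag must not compare equal.
    @Override
    public boolean equals(Object obj) {
        return obj instanceof SorterCodegenConfig
            && ((SorterCodegenConfig) obj).codeGenerationForSortersEnabled == codeGenerationForSortersEnabled;
    }

    @Override
    public int hashCode() {
        return Objects.hash(codeGenerationForSortersEnabled);
    }
}
```

In an actual job the same calls would presumably be made on the job's `ExecutionConfig`, under the assumption that the API from this PR is available.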
@@ -126,6 +126,12 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut

private long autoWatermarkInterval = 0;

/**
* The flag determines whether a custom NormalizedKeySorter will be dynamically created
* for underlying data
*/
private boolean codeGenerationForSortersEnabled = false;

/**
* Interval in milliseconds for sending latency tracking marks from the sources to the sinks.
*/
@@ -610,7 +616,33 @@ public ExecutionConfig disableObjectReuse() {
public boolean isObjectReuseEnabled() {
return objectReuse;
}


/**
* Enables code generation for sorters. This can increase the performance of batch jobs, as
* Flink generates specialized sorting code for every type and comparator.
*
* <p>This is disabled by default.
*/
public ExecutionConfig enableCodeGenerationForSorters() {
this.codeGenerationForSortersEnabled = true;
return this;
}

/**
* Disables code generation for sorters. @see #enableCodeGenerationForSorters()
*/
public ExecutionConfig disableCodeGenerationForSorters() {
this.codeGenerationForSortersEnabled = false;
return this;
}

/**
* Returns whether code generation for sorters has been enabled or disabled. @see #enableCodeGenerationForSorters()
*/
public boolean isCodeGenerationForSortersEnabled() {
return codeGenerationForSortersEnabled;
}

/**
* Sets the {@link CodeAnalysisMode} of the program. Specifies to which extent user-defined
* functions are analyzed in order to give the Flink optimizer an insight of UDF internals
@@ -877,7 +909,8 @@ public boolean equals(Object obj) {
registeredKryoTypes.equals(other.registeredKryoTypes) &&
registeredPojoTypes.equals(other.registeredPojoTypes) &&
taskCancellationIntervalMillis == other.taskCancellationIntervalMillis &&
useSnapshotCompression == other.useSnapshotCompression;
useSnapshotCompression == other.useSnapshotCompression &&
codeGenerationForSortersEnabled == other.codeGenerationForSortersEnabled;

} else {
return false;
@@ -905,7 +938,9 @@ public int hashCode() {
registeredKryoTypes,
registeredPojoTypes,
taskCancellationIntervalMillis,
useSnapshotCompression);
useSnapshotCompression,
codeGenerationForSortersEnabled
);
}

public boolean canEqual(Object obj) {
@@ -918,7 +953,6 @@ public ArchivedExecutionConfig archive() {
return new ArchivedExecutionConfig(this);
}


// ------------------------------ Utilities ----------------------------------

public static class SerializableSerializer<T extends Serializer<?> & Serializable> implements Serializable {
@@ -157,7 +157,7 @@ public boolean supportsCompareAgainstReference() {
*
* @param first The first record.
* @param second The second record.
* @return An integer defining the oder among the objects in the same way as {@link java.util.Comparator#compare(Object, Object)}.
* @return An integer defining the order among the objects in the same way as {@link java.util.Comparator#compare(Object, Object)}.
*
* @see java.util.Comparator#compare(Object, Object)
*/
@@ -1290,4 +1290,13 @@ else if (seg2.address > seg2.addressLimit) {
String.format("offset1=%d, offset2=%d, len=%d, bufferSize=%d, address1=%d, address2=%d",
offset1, offset2, len, tempBuffer.length, this.address, seg2.address));
}

public final byte getByte(int index){
return get(index);
}

public final void putByte(int index, byte b){
put(index,b);
}

}
@@ -190,6 +190,7 @@ private String getSystemOutput(String[] args) throws Exception {
switch (mode) {
case CLUSTER:
case COLLECTION:
case CLUSTER_WITH_CODEGENERATION_ENABLED:
Contributor:

I think it is good to enable this here, but just out of curiosity: how much does this increase the build time for flink-gelly?

Contributor Author:

First of all, I'm not sure whether this is a good way to estimate this.

I estimated the build time by running all tests from flink-gelly-examples inside IntelliJ IDEA.

With CLUSTER_WITH_CODEGENERATION_ENABLED: 2m 20s
Without CLUSTER_WITH_CODEGENERATION_ENABLED: 1m 27s

Patch for disabling the CLUSTER_WITH_CODEGENERATION_ENABLED case: https://gist.github.com/heytitle/89961fcaabcf326eadee190b9d6085a6

args = ArrayUtils.add(args, "--__disable_object_reuse");
break;

2 changes: 1 addition & 1 deletion flink-libraries/flink-table/pom.xml
@@ -46,7 +46,7 @@ under the License.
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.7</version>
<version>${janino.version}</version>
</dependency>

<dependency>
12 changes: 12 additions & 0 deletions flink-runtime/pom.xml
@@ -204,6 +204,18 @@ under the License.
<artifactId>reflections</artifactId>
</dependency>

<dependency>
<groupId>org.freemarker</groupId>
Contributor:

Use the latest version?

<dependency>
    <groupId>org.freemarker</groupId>
    <artifactId>freemarker</artifactId>
    <version>2.3.26-incubating</version>
</dependency>

<artifactId>freemarker</artifactId>
<version>2.3.26-incubating</version>
</dependency>

<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>${janino.version}</version>
</dependency>

</dependencies>

<build>
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.codegeneration;

import freemarker.template.Version;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;

import freemarker.template.Configuration;
import freemarker.template.Template;
import freemarker.template.TemplateException;
import freemarker.template.TemplateExceptionHandler;
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.janino.SimpleCompiler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.StringWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.List;

/**
* {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter
* for underlying data based on {@link TypeComparator}.
* Note: the generated code can be inspected by configuring Janino to write the code that is being compiled
* to a file, see http://janino-compiler.github.io/janino/#debugging
*/
public class SorterFactory {
// ------------------------------------------------------------------------
// Constants
// ------------------------------------------------------------------------
private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);

/** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

// ------------------------------------------------------------------------
// Singleton Attribute
// ------------------------------------------------------------------------
private static SorterFactory sorterFactory;

// ------------------------------------------------------------------------
// Attributes
// ------------------------------------------------------------------------
private SimpleCompiler classCompiler;
private HashMap<String, Constructor> constructorCache;
private final Template template;

/**
* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
* to a non-generated sorter.
*/
public boolean forceCodeGeneration = false;

/**
* Constructor.
*/
private SorterFactory() {
this.classCompiler = new SimpleCompiler();
Contributor:

We should set the parent classloader of the compiler to the user code classloader, to ensure that the generated classes are cleaned up when the job terminates:
this.classCompiler.setParentClassLoader(cl)

Contributor:

Thanks for raising this issue. Actually it's a bit more complicated than just setting the parent classloader, because of the caching of the generated classes. The problem is that the constructorCache would keep the old user code classloader alive. I'm currently thinking of the following solution:

  1. include also the user code classloader in the key of the cache, and
  2. use WeakReference for both the values and keys of the cache.

This will ensure that we don't try to reuse generated classes from a previous job, since the classloader will be different across jobs, so we will have different keys. And the cache won't keep anything alive, since it will only have WeakReferences.
(For 2., I will use the Guava CacheBuilder.)

Contributor:

909b59e
cce40c5

This was a bit tricky, I hope I haven't messed it up. I will think about this a bit more in the next few days, and maybe do some more testing, to see that we are not keeping alive anything from past jobs. Unfortunately, I don't know how this could be tested in an automated way. I think I will test it manually by just submitting hundreds of jobs, and watching in a profiler that object counts are not growing.
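
The idea discussed in this thread can be sketched roughly as follows. This is not the code from 909b59e or cce40c5; `GeneratedClassCache` and its method names are hypothetical, and it uses a plain `WeakHashMap` from the JDK instead of the Guava `CacheBuilder` mentioned above. The cache is keyed by the user code classloader, and the cached values are held only through `WeakReference`s, so the cache itself keeps neither a classloader nor a generated class alive.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;

/** Hypothetical cache of generated classes, partitioned by user code classloader. */
final class GeneratedClassCache {
    // Weak keys: an entry can go away once its classloader is no longer referenced elsewhere.
    // Weak values: a generated class references its classloader, so a strong
    // reference here would keep the key reachable and defeat the weak keys.
    private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> cache = new WeakHashMap<>();

    /** Returns the cached class, or null if there is none (or it was collected). */
    synchronized Class<?> lookup(ClassLoader userCodeClassLoader, String className) {
        Map<String, WeakReference<Class<?>>> perLoader = cache.get(userCodeClassLoader);
        if (perLoader == null) {
            return null;
        }
        WeakReference<Class<?>> ref = perLoader.get(className);
        // Dereference exactly once; the referent may be cleared at any time.
        return ref == null ? null : ref.get();
    }

    /** Stores a generated class under the classloader of the job that requested it. */
    synchronized void store(ClassLoader userCodeClassLoader, String className, Class<?> generated) {
        cache.computeIfAbsent(userCodeClassLoader, k -> new HashMap<>())
            .put(className, new WeakReference<>(generated));
    }
}
```

Because the classloader is part of the key, a lookup from a later job with a different user code classloader misses the cache instead of reusing a class from an earlier job.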

this.constructorCache = new HashMap<>();
Configuration templateConf;
templateConf = new Configuration(new Version(2,3,26));
templateConf.setClassForTemplateLoading(SorterFactory.class, "/templates");
templateConf.setDefaultEncoding("UTF-8");
templateConf.setTemplateExceptionHandler(TemplateExceptionHandler.RETHROW_HANDLER);
try {
template = templateConf.getTemplate(SorterTemplateModel.TEMPLATE_NAME);
} catch (IOException e) {
throw new RuntimeException("Couldn't read sorter template.", e);
}
}

/**
* A method to get a singleton instance
* or create one if it hasn't been created yet.
* @return
*/
public static synchronized SorterFactory getInstance() {
if (sorterFactory == null){
sorterFactory = new SorterFactory();
}

return sorterFactory;
}


/**
* Create a sorter for the given type comparator and
* assign serializer, comparator and memory to the sorter.
* @param serializer
* @param comparator
* @param memory
* @return
*/
public <T> InMemorySorter<T> createSorter(ExecutionConfig config, TypeSerializer<T> serializer,
TypeComparator<T> comparator, List<MemorySegment> memory) {
if (config.isCodeGenerationForSortersEnabled()){
try {
return createCodegenSorter(serializer, comparator, memory);
} catch (IOException | TemplateException | ClassNotFoundException | IllegalAccessException |
InstantiationException | NoSuchMethodException | InvocationTargetException | CompileException e) {

String msg = "Serializer: [" + serializer +
"], comparator: [" + comparator + "], exception: " + e.toString();
if (!forceCodeGeneration) {
LOG.warn("An error occurred while trying to create a code-generated sorter. " +
"Using non-codegen sorter instead. " + msg);
return createNonCodegenSorter(serializer, comparator, memory);
} else {
// Keep the original exception as the cause so that its stack trace is not lost.
throw new RuntimeException("An error occurred while trying to create a code-generated sorter. " +
"Failing the job, because forceCodeGeneration is true. " + msg, e);
}
}
} else {
return createNonCodegenSorter(serializer, comparator, memory);
}
}

private <T> InMemorySorter<T> createCodegenSorter(TypeSerializer<T> serializer, TypeComparator<T> comparator,
List<MemorySegment> memory)
throws IOException, TemplateException, ClassNotFoundException, IllegalAccessException,
InstantiationException, NoSuchMethodException, InvocationTargetException, CompileException {
SorterTemplateModel sorterModel = new SorterTemplateModel(comparator);

Constructor sorterConstructor;

synchronized (this){
if (constructorCache.getOrDefault(sorterModel.getSorterName(), null) != null) {
Contributor:

constructorCache.containsKey(sorterModel.getSorterName())?

Contributor:

Now that we use a WeakHashMap, it has to be getOrDefault; see the comment in cce40c5.
The issue with calling containsKey and then get is that elements might disappear from a WeakHashMap at any time, e.g., between these two calls.
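
A small sketch of the single-lookup pattern described in this reply. `SingleLookup.getOrCreate` is a hypothetical helper, not a method from the PR: it reads the map exactly once and works with the returned value, so an entry that vanishes from a `WeakHashMap` after the read cannot lead to an unexpected `null`.

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper illustrating a single read against a cache with weak keys. */
final class SingleLookup {
    /**
     * Returns the cached value for the key, creating and caching it on a miss.
     * Not thread-safe on its own; callers are assumed to synchronize externally.
     */
    static <K, V> V getOrCreate(Map<K, V> cache, K key, Function<K, V> factory) {
        // One read only. A containsKey check followed by get could observe the
        // entry in the first call and null in the second.
        V value = cache.get(key);
        if (value == null) {
            value = factory.apply(key);
            cache.put(key, value);
        }
        return value;
    }
}
```

The local variable `value` holds a strong reference for the rest of the method, which is what makes the pattern safe regardless of when the garbage collector clears the map entry.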

sorterConstructor = constructorCache.get(sorterModel.getSorterName());
} else {
String sorterName = sorterModel.getSorterName();

StringWriter generatedCodeWriter = new StringWriter();
template.process(sorterModel.getTemplateVariables(), generatedCodeWriter);

this.classCompiler.cook(generatedCodeWriter.toString());

sorterConstructor = this.classCompiler.getClassLoader().loadClass(sorterName).getConstructor(
TypeSerializer.class, TypeComparator.class, List.class
);

constructorCache.put(sorterName, sorterConstructor);
}
}

@SuppressWarnings("unchecked")
InMemorySorter<T> sorter = (InMemorySorter<T>) sorterConstructor.newInstance(serializer, comparator, memory);

if (LOG.isInfoEnabled()){
LOG.info("Using a custom sorter : " + sorter.toString());
Contributor:

-> Using a code-generated sorter: ?

}

return sorter;
}

private <T> InMemorySorter<T> createNonCodegenSorter(TypeSerializer<T> serializer, TypeComparator<T> comparator,
List<MemorySegment> memory) {
InMemorySorter<T> sorter;
// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
if (comparator.supportsSerializationWithKeyNormalization() &&
serializer.getLength() > 0 && serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING &&
comparator.isNormalizedKeyPrefixOnly(comparator.getNormalizeKeyLen())) {
Contributor:

So the additional condition is fixing a bug?

Shouldn't the condition be inverted, i.e., !comparator.isNormalizedKeyPrefixOnly(comparator.getNormalizeKeyLen())?
Otherwise we would use a FixedLengthRecordSorter (which only compares normalized keys) if the normalized keys are only a prefix.

Contributor:

> So the additional condition is fixing a bug?

Yes, but currently the FixedLengthRecordSorter is never used, which is why this hasn't manifested yet (see https://issues.apache.org/jira/browse/FLINK-4705).

> Shouldn't the condition be inverted

Yes, sorry, I've fixed the condition in b2bf565.
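
The corrected decision, as agreed in this thread, can be sketched as a pure function. `SorterChoice` and its parameter names are hypothetical; the three inputs stand in for `comparator.supportsSerializationWithKeyNormalization()`, `serializer.getLength()` and `comparator.isNormalizedKeyPrefixOnly(...)` from the diff, and the threshold of 32 is the one defined in `SorterFactory`.

```java
/** Hypothetical, dependency-free restatement of the sorter choice with the inverted condition. */
final class SorterChoice {
    /** Same value as THRESHOLD_FOR_IN_PLACE_SORTING in the diff. */
    static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

    /**
     * Returns true if the fixed-length in-place sorter may be used,
     * false if the out-of-place normalized key sorter should be used.
     */
    static boolean useFixedLengthSorter(
            boolean supportsSerializationWithKeyNormalization,
            int recordLength,
            boolean normalizedKeyIsPrefixOnly) {
        return supportsSerializationWithKeyNormalization
            // A non-positive length means the records are not fixed-length.
            && recordLength > 0
            && recordLength <= THRESHOLD_FOR_IN_PLACE_SORTING
            // The fixed-length sorter only compares normalized keys, so the
            // normalized key must fully determine the order.
            && !normalizedKeyIsPrefixOnly;
    }
}
```

For example, a 16-byte record whose normalized key is only a prefix of the real key falls back to the normalized key sorter, which performs the additional full comparison.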

// Note about the last part of the condition:
// FixedLengthRecordSorter doesn't do an additional check after the bytewise comparison, so
// we cannot choose that if the normalized key doesn't always determine the order.
// (cf. the part of NormalizedKeySorter.compare after the if)
sorter = new FixedLengthRecordSorter<>(serializer, comparator.duplicate(), memory);
} else {
sorter = new NormalizedKeySorter<>(serializer, comparator.duplicate(), memory);
}
return sorter;
}
}
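
The control flow of `createSorter` above (try the code-generated path, fall back to the regular sorter on failure, or fail hard when generation is forced) can be sketched without any Flink types. `SorterFallback.createWithFallback` is a hypothetical helper; the two boolean parameters correspond to `isCodeGenerationForSortersEnabled()` and `forceCodeGeneration` in the diff.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

/** Hypothetical sketch of the "generate, else fall back" pattern used by createSorter. */
final class SorterFallback {
    static <T> T createWithFallback(
            boolean codegenEnabled,
            boolean forceCodegen,
            Callable<T> generated,
            Supplier<T> fallback) {
        if (!codegenEnabled) {
            return fallback.get();
        }
        try {
            return generated.call();
        } catch (Exception e) {
            if (forceCodegen) {
                // Intended for tests: surface the failure instead of hiding it.
                throw new RuntimeException("Code generation failed and fallback is disabled.", e);
            }
            // Production path: a failed generation must not fail the job.
            return fallback.get();
        }
    }
}
```

Passing the two creation paths in as callbacks is only a device to keep the sketch self-contained; the PR calls `createCodegenSorter` and `createNonCodegenSorter` directly.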