Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-18452][table-runtime-blink] Fix StateMigrationException because RetractableTopNFunction#ComparatorWrapper might be incompatible #14150

Merged
merged 1 commit into from Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -29,7 +29,7 @@ import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, RelExplainUtil, SortUtil}
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector
import org.apache.flink.table.runtime.operators.rank.{AppendOnlyTopNFunction, ConstantRankRange, RankType, RetractableTopNFunction}
import org.apache.flink.table.runtime.operators.rank.{AppendOnlyTopNFunction, ConstantRankRange, RankType, RetractableTopNFunction, ComparableRecordComparator}
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
Expand Down Expand Up @@ -133,11 +133,17 @@ class StreamExecLimit(
} else {
val equaliserCodeGen = new EqualiserCodeGenerator(inputRowTypeInfo.toRowFieldTypes)
val generatedEqualiser = equaliserCodeGen.generateRecordEqualiser("LimitValueEqualiser")
val comparator = new ComparableRecordComparator(
sortKeyComparator,
Array(),
Array(),
Array(),
Array())
new RetractableTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
sortKeyComparator,
comparator,
sortKeySelector,
rankType,
rankRange,
Expand Down
Expand Up @@ -185,12 +185,17 @@ class StreamExecRank(
case RetractStrategy =>
val equaliserCodeGen = new EqualiserCodeGenerator(inputRowTypeInfo.toRowFieldTypes)
val generatedEqualiser = equaliserCodeGen.generateRecordEqualiser("RankValueEqualiser")

val comparator = new ComparableRecordComparator(
sortKeyComparator,
sortFields.indices.toArray,
sortKeyType.toRowFieldTypes,
sortDirections,
nullsIsLast)
new RetractableTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
sortKeyComparator,
comparator,
sortKeySelector,
rankType,
rankRange,
Expand Down
Expand Up @@ -177,11 +177,17 @@ class StreamExecSortLimit(
case RetractStrategy =>
val equaliserCodeGen = new EqualiserCodeGenerator(inputRowTypeInfo.toRowFieldTypes)
val generatedEqualiser = equaliserCodeGen.generateRecordEqualiser("RankValueEqualiser")
val comparator = new ComparableRecordComparator(
sortKeyComparator,
sortFields.indices.toArray,
sortKeyType.toRowFieldTypes,
sortDirections,
nullsIsLast)
new RetractableTopNFunction(
minIdleStateRetentionTime,
maxIdleStateRetentionTime,
inputRowTypeInfo,
sortKeyComparator,
comparator,
sortKeySelector,
rankType,
rankRange,
Expand Down
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.rank;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.types.logical.LogicalType;

import java.util.Arrays;
import java.util.Comparator;

/**
* Because it's impossible to restore a {@link RecordComparator} instance generated by
* {@link GeneratedRecordComparator} from checkpoint snapshot. Hence, we introduce
* {@link ComparableRecordComparator} class to wrap the {@link GeneratedRecordComparator}.
* A {@link ComparableRecordComparator} instance is serializable and can restore the {@link RecordComparator}
* from the serialized {@link ComparableRecordComparator}. Besides, the {@link ComparableRecordComparator#equals(Object)}
* doesn't take {@link GeneratedRecordComparator} into account, because the code is not deterministic
* across different client. Therefore, {@link ComparableRecordComparator#equals(Object)} only compares the
* meta information used for generating code of {@link RecordComparator}.
*
* <p>Note: currently, this class is only used for {@link RetractableTopNFunction}.
*
* @see RetractableTopNFunction
*/
public final class ComparableRecordComparator implements RecordComparator {
private static final long serialVersionUID = 4386377835781068140L;

private transient Comparator<RowData> comparator;
private final GeneratedRecordComparator generatedRecordComparator;

// used for compare equals for instances of RowDataComparator
private final int[] compareKeyPositions;
private final LogicalType[] compareKeyTypes;
private final boolean[] compareOrders;
private final boolean[] nullsIsLast;

public ComparableRecordComparator(
GeneratedRecordComparator generatedRecordComparator,
int[] compareKeyPositions,
LogicalType[] compareKeyTypes,
boolean[] compareOrders,
boolean[] nullsIsLast) {
this.generatedRecordComparator = generatedRecordComparator;
this.compareKeyPositions = compareKeyPositions;
this.compareKeyTypes = compareKeyTypes;
this.compareOrders = compareOrders;
this.nullsIsLast = nullsIsLast;
}

public GeneratedRecordComparator getGeneratedRecordComparator() {
return generatedRecordComparator;
}

@Override
public int compare(RowData o1, RowData o2) {
if (comparator == null) {
comparator = generatedRecordComparator.newInstance(Thread.currentThread().getContextClassLoader());
}
return comparator.compare(o1, o2);
}

@Override
public int hashCode() {
int result = Arrays.hashCode(compareKeyPositions);
result = 31 * result + Arrays.hashCode(compareKeyTypes);
result = 31 * result + Arrays.hashCode(compareOrders);
result = 31 * result + Arrays.hashCode(nullsIsLast);
return result;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ComparableRecordComparator that = (ComparableRecordComparator) o;
return Arrays.equals(compareKeyPositions, that.compareKeyPositions) &&
Arrays.equals(compareKeyTypes, that.compareKeyTypes) &&
Arrays.equals(compareOrders, that.compareOrders) &&
Arrays.equals(nullsIsLast, that.nullsIsLast);
}
}
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
Expand All @@ -39,10 +38,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -79,23 +75,31 @@ public class RetractableTopNFunction extends AbstractTopNFunction {
private GeneratedRecordEqualiser generatedEqualiser;
private RecordEqualiser equaliser;

private Comparator<RowData> serializableComparator;
private final ComparableRecordComparator serializableComparator;

public RetractableTopNFunction(
long minRetentionTime,
long maxRetentionTime,
InternalTypeInfo<RowData> inputRowType,
GeneratedRecordComparator generatedRecordComparator,
ComparableRecordComparator comparableRecordComparator,
RowDataKeySelector sortKeySelector,
RankType rankType,
RankRange rankRange,
GeneratedRecordEqualiser generatedEqualiser,
boolean generateUpdateBefore,
boolean outputRankNumber) {
super(minRetentionTime, maxRetentionTime, inputRowType, generatedRecordComparator, sortKeySelector, rankType,
rankRange, generateUpdateBefore, outputRankNumber);
super(
minRetentionTime,
maxRetentionTime,
inputRowType,
comparableRecordComparator.getGeneratedRecordComparator(),
sortKeySelector,
rankType,
rankRange,
generateUpdateBefore,
outputRankNumber);
this.sortKeyType = sortKeySelector.getProducedType();
this.serializableComparator = new ComparatorWrapper(generatedRecordComparator);
this.serializableComparator = comparableRecordComparator;
this.generatedEqualiser = generatedEqualiser;
}

Expand Down Expand Up @@ -416,43 +420,4 @@ private void retractRecordWithoutRowNumber(
}
}
}

/**
* Note: Because it's impossible to restore a RecordComparator instance generated by GeneratedRecordComparator from
* snapshot, We introduce ComparatorWrapper class to wrap the GeneratedRecordComparator, a ComparatorWrapper
* instance is serializable, and a RecordComparator instance could be restored based on the deserialized
* ComparatorWrapper instance.
*/
private static class ComparatorWrapper implements Comparator<RowData>, Serializable {

private static final long serialVersionUID = 4386377835781068140L;

private transient Comparator<RowData> comparator;
private GeneratedRecordComparator generatedRecordComparator;

private ComparatorWrapper(GeneratedRecordComparator generatedRecordComparator) {
this.generatedRecordComparator = generatedRecordComparator;
}

@Override
public int compare(RowData o1, RowData o2) {
if (comparator == null) {
comparator = generatedRecordComparator.newInstance(Thread.currentThread().getContextClassLoader());
}
return comparator.compare(o1, o2);
}

@Override
public boolean equals(Object obj) {
if (obj instanceof ComparatorWrapper) {
ComparatorWrapper o = (ComparatorWrapper) obj;
GeneratedRecordComparator oGeneratedComparator = o.generatedRecordComparator;
return generatedRecordComparator.getClassName().equals(oGeneratedComparator.getClassName()) &&
generatedRecordComparator.getCode().equals(oGeneratedComparator.getCode()) &&
Arrays.equals(generatedRecordComparator.getReferences(), oGeneratedComparator.getReferences());
} else {
return false;
}
}
}
}
Expand Up @@ -38,7 +38,7 @@ public class AppendOnlyTopNFunctionTest extends TopNFunctionTestBase {
protected AbstractTopNFunction createFunction(RankType rankType, RankRange rankRange,
boolean generateUpdateBefore, boolean outputRankNumber) {
return new AppendOnlyTopNFunction(minTime.toMilliseconds(), maxTime.toMilliseconds(), inputRowType,
sortKeyComparator, sortKeySelector, rankType, rankRange, generateUpdateBefore, outputRankNumber,
generatedSortKeyComparator, sortKeySelector, rankType, rankRange, generateUpdateBefore, outputRankNumber,
cacheSize);
}

Expand Down
Expand Up @@ -44,7 +44,7 @@ protected AbstractTopNFunction createFunction(RankType rankType, RankRange rankR
minTime.toMilliseconds(),
maxTime.toMilliseconds(),
inputRowType,
sortKeyComparator,
comparableRecordComparator,
sortKeySelector,
rankType,
rankRange,
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.table.runtime.util.RowDataRecordEqualiser;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

import org.junit.Test;
Expand All @@ -62,17 +63,24 @@ abstract class TopNFunctionTestBase {
new BigIntType(),
new IntType());

static GeneratedRecordComparator sortKeyComparator = new GeneratedRecordComparator("", "", new Object[0]) {
static GeneratedRecordComparator generatedSortKeyComparator = new GeneratedRecordComparator("", "", new Object[0]) {

private static final long serialVersionUID = 1434685115916728955L;

@Override
public RecordComparator newInstance(ClassLoader classLoader) {

return IntRecordComparator.INSTANCE;
}
};

static ComparableRecordComparator comparableRecordComparator = new ComparableRecordComparator(
generatedSortKeyComparator,
new int[]{0},
new LogicalType[]{new IntType()},
new boolean[]{true},
new boolean[]{true}
);

private int sortKeyIdx = 2;

BinaryRowDataKeySelector sortKeySelector = new BinaryRowDataKeySelector(
Expand Down
Expand Up @@ -47,7 +47,7 @@ protected AbstractTopNFunction createFunction(
maxTime.toMilliseconds(),
inputRowType,
rowKeySelector,
sortKeyComparator,
generatedSortKeyComparator,
sortKeySelector,
rankType,
rankRange,
Expand Down