[FLINK-23531][table] Allow skipping all change log for row-time deduplicate mini-batch #16630

Merged 4 commits on Aug 4, 2021
@@ -40,13 +40,15 @@
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepFirstRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchLatestChangeDeduplicateFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.RowType;
@@ -89,6 +91,17 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
+ "but there will be additional overhead."
+ "Default is true.");

@Experimental
public static final ConfigOption<Boolean> TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES =
ConfigOptions.key("table.exec.deduplicate.mini-batch.compact-changes")
.booleanType()
.defaultValue(false)
.withDescription(
"Set whether to compact the changes sent downstream in row-time mini-batch. "
+ "If true, Flink will compact changes, only send the latest change to downstream. "
+ "Notes: If the downstream needs the details of versioned data, this optimization cannot be opened. "
+ "If false, Flink will send all changes to downstream just like when the mini-batch is not on.");

@JsonProperty(FIELD_NAME_UNIQUE_KEYS)
private final int[] uniqueKeys;
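
For context, enabling the new option from user code looks roughly like the following — a minimal sketch, assuming a TableEnvironment named tEnv; the mini-batch keys are Flink's existing options, and only the compact-changes option is introduced by this PR:

// Minimal sketch; "tEnv" is an assumed TableEnvironment.
Configuration conf = tEnv.getConfig().getConfiguration();
// Mini-batch must be enabled, otherwise the compact-changes option has no effect.
conf.setString("table.exec.mini-batch.enabled", "true");
conf.setString("table.exec.mini-batch.allow-latency", "5 s");
conf.setString("table.exec.mini-batch.size", "5000");
// New in this PR: compact the changelog emitted by row-time deduplication per bundle.
conf.setBoolean(StreamExecDeduplicate.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES, true);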

@@ -223,6 +236,12 @@ protected boolean isMiniBatchEnabled() {
.getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
}

protected boolean isCompactChanges() {
return tableConfig
.getConfiguration()
.getBoolean(TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES);
}

protected long getMinRetentionTime() {
return tableConfig.getMinIdleStateRetentionTime();
}
@@ -275,15 +294,28 @@ OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
checkArgument(rowtimeIndex >= 0);
if (isMiniBatchEnabled()) {
CountBundleTrigger<RowData> trigger = new CountBundleTrigger<>(getMiniBatchSize());
RowTimeMiniBatchDeduplicateFunction processFunction =
new RowTimeMiniBatchDeduplicateFunction(
rowTypeInfo,
typeSerializer,
getMinRetentionTime(),
rowtimeIndex,
generateUpdateBefore,
generateInsert(),
keepLastRow);
MapBundleFunction processFunction;
if (isCompactChanges()) {
processFunction =
new RowTimeMiniBatchLatestChangeDeduplicateFunction(
rowTypeInfo,
typeSerializer,
getMinRetentionTime(),
rowtimeIndex,
generateUpdateBefore,
generateInsert(),
keepLastRow);
} else {
processFunction =
new RowTimeMiniBatchDeduplicateFunction(
rowTypeInfo,
typeSerializer,
getMinRetentionTime(),
rowtimeIndex,
generateUpdateBefore,
generateInsert(),
keepLastRow);
}
return new KeyedMapBundleOperator<>(processFunction, trigger);
} else {
RowTimeDeduplicateFunction processFunction =
@@ -23,14 +23,14 @@ import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExt
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchMode
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate
import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOn}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils._
import org.apache.flink.table.utils.LegacyRowResource
import org.apache.flink.types.Row

import org.junit.Assert._
import org.junit.{Rule, Test}
import org.junit.{Assume, Rule, Test}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

@@ -189,6 +189,40 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
assertEquals(expected.sorted, rawResult.sorted)
}

@Test
def testFirstRowWithoutAllChangelogOnRowtime(): Unit = {
Assume.assumeTrue("Compacting changes is only supported in mini-batch mode.", miniBatch == MiniBatchOn)
tEnv.getConfig.getConfiguration.setBoolean(
StreamExecDeduplicate.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES, true)
val t = env.fromCollection(rowtimeTestData)
.assignTimestampsAndWatermarks(new RowtimeExtractor)
.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
tEnv.registerTable("T", t)
createSinkTable("rowtime_sink")

val sql =
"""
|INSERT INTO rowtime_sink
| SELECT a, b, c, rowtime
| FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime) as rowNum
| FROM T
| )
| WHERE rowNum = 1
""".stripMargin

tEnv.executeSql(sql).await()
val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")

val expected = List(
"+I(1,1,Hi,1970-01-01T00:00:00.001)",
"+I(2,3,I am fine.,1970-01-01T00:00:00.003)",
"+I(3,4,Comment#2,1970-01-01T00:00:00.004)",
"+I(4,4,Comment#3,1970-01-01T00:00:00.004)")
assertEquals(expected.sorted, rawResult.sorted)
}

@Test
def testFirstRowOnRowTimeFollowedByUnboundedAgg(): Unit = {
val t = env.fromCollection(rowtimeTestData)
@@ -262,6 +296,42 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
assertEquals(expected.sorted, rawResult.sorted)
}

@Test
def testLastRowWithoutAllChangelogOnRowtime(): Unit = {
Assume.assumeTrue("Compacting changes is only supported in mini-batch mode.", miniBatch == MiniBatchOn)
tEnv.getConfig.getConfiguration.setBoolean(
StreamExecDeduplicate.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES, true)
val t = env.fromCollection(rowtimeTestData)
.assignTimestampsAndWatermarks(new RowtimeExtractor)
.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
tEnv.registerTable("T", t)
createSinkTable("rowtime_sink")

val sql =
"""
|INSERT INTO rowtime_sink
| SELECT a, b, c, rowtime
| FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
| FROM T
| )
| WHERE rowNum = 1
""".stripMargin

tEnv.executeSql(sql).await()
val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")

val expected = List(
"+I(1,1,Hi,1970-01-01T00:00:00.001)",
"+I(1,2,Hello world,1970-01-01T00:00:00.002)",
"+I(2,3,I am fine.,1970-01-01T00:00:00.003)",
"+I(2,6,Comment#1,1970-01-01T00:00:00.006)",
"+I(3,5,Comment#2,1970-01-01T00:00:00.005)",
"+I(4,4,Comment#3,1970-01-01T00:00:00.004)")
assertEquals(expected.sorted, rawResult.sorted)
}

@Test
def testLastRowOnRowTimeFollowedByUnboundedAgg(): Unit = {
val t = env.fromCollection(rowtimeTestData)
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.deduplicate;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;

import java.util.Map;

import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.checkInsertOnly;
import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.updateDeduplicateResult;

/**
* This function is used to get the first or last row for every key partition in mini-batch mode,
* sending only the latest change downstream.
*/
public class RowTimeMiniBatchLatestChangeDeduplicateFunction
extends MiniBatchDeduplicateFunctionBase<RowData, RowData, RowData, RowData, RowData> {

private static final long serialVersionUID = 1L;

private final TypeSerializer<RowData> serializer;
private final boolean generateUpdateBefore;
private final boolean generateInsert;
private final int rowtimeIndex;
private final boolean keepLastRow;

public RowTimeMiniBatchLatestChangeDeduplicateFunction(
InternalTypeInfo<RowData> typeInfo,
TypeSerializer<RowData> serializer,
long minRetentionTime,
int rowtimeIndex,
boolean generateUpdateBefore,
boolean generateInsert,
boolean keepLastRow) {
super(typeInfo, minRetentionTime);
this.serializer = serializer;
this.generateUpdateBefore = generateUpdateBefore;
this.generateInsert = generateInsert;
this.rowtimeIndex = rowtimeIndex;
this.keepLastRow = keepLastRow;
}

@Override
public RowData addInput(@Nullable RowData value, RowData input) throws Exception {
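// Keep only the winning row (first or last by rowtime) for each key; earlier
// buffered changes within the bundle are overwritten rather than retained.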
if (isDuplicate(value, input, rowtimeIndex, keepLastRow)) {
return serializer.copy(input);
}
return value;
}

@Override
public void finishBundle(Map<RowData, RowData> buffer, Collector<RowData> out)
throws Exception {
for (Map.Entry<RowData, RowData> entry : buffer.entrySet()) {
RowData currentKey = entry.getKey();
RowData bufferedRow = entry.getValue();
ctx.setCurrentKey(currentKey);
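// The buffer holds a single (latest) row per key, so at most one change per
// key is emitted downstream for each bundle.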
RowData preRow = state.value();
checkInsertOnly(bufferedRow);
if (isDuplicate(preRow, bufferedRow, rowtimeIndex, keepLastRow)) {
updateDeduplicateResult(
generateUpdateBefore, generateInsert, preRow, bufferedRow, out);
state.update(bufferedRow);
}
}
}
}
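
To see what this means for the emitted changelog, the following standalone sketch (plain Java, not Flink code; the -U/+U pairing is stylized after updateDeduplicateResult with generateUpdateBefore enabled) contrasts the two mini-batch functions for one key whose three insert-only rows fall into a single bundle, assuming keep-last-row and no prior row in state:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone illustration of mini-batch changelog compaction (not Flink code).
public class CompactChangesDemo {

    public static void main(String[] args) {
        // Three insert-only rows for the same key, ordered by rowtime, in one bundle.
        List<String> bundle = Arrays.asList("a@1", "a@2", "a@3");
        System.out.println("all changes: " + emit(bundle, false));
        System.out.println("compacted:   " + emit(bundle, true));
    }

    // Keep-last-row semantics with no prior row in state for this key.
    static List<String> emit(List<String> rows, boolean compact) {
        List<String> out = new ArrayList<>();
        String prev = null; // mimics the row stored in keyed state
        if (compact) {
            // With compaction, addInput keeps only the latest duplicate per key,
            // so finishBundle emits at most one change per key and bundle.
            flush(out, prev, rows.get(rows.size() - 1));
        } else {
            // Without compaction, every accepted duplicate produces a change,
            // just as when mini-batch is not enabled.
            for (String row : rows) {
                prev = flush(out, prev, row);
            }
        }
        return out;
    }

    static String flush(List<String> out, String prev, String row) {
        if (prev == null) {
            out.add("+I(" + row + ")");
        } else {
            out.add("-U(" + prev + ")"); // retract the previous version
            out.add("+U(" + row + ")"); // emit the new version
        }
        return row;
    }
}

Running it prints +I(a@1), -U(a@1), +U(a@2), -U(a@2), +U(a@3) for the non-compacted path, but only +I(a@3) when changes are compacted.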
@@ -18,24 +18,13 @@

package org.apache.flink.table.runtime.operators.deduplicate;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.types.RowKind;

import org.junit.Test;
@@ -55,24 +44,7 @@
* RowTimeMiniBatchDeduplicateFunction}.
*/
@RunWith(Parameterized.class)
public class RowTimeDeduplicateFunctionTest {

private final long miniBatchSize = 4L;
private Time minTtlTime = Time.milliseconds(10);
private InternalTypeInfo inputRowType =
InternalTypeInfo.ofFields(
new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType());
private TypeSerializer<RowData> serializer = inputRowType.toSerializer();
private int rowTimeIndex = 2;
private int rowKeyIndex = 0;
private RowDataKeySelector rowKeySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {rowKeyIndex}, inputRowType.toRowFieldTypes());
private RowDataHarnessAssertor assertor =
new RowDataHarnessAssertor(
inputRowType.toRowFieldTypes(),
new GenericRowRecordSortComparator(
rowKeyIndex, inputRowType.toRowFieldTypes()[rowKeyIndex]));
public class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase {

private final boolean miniBatchEnable;

@@ -328,18 +300,6 @@ private void testRowTimeDeduplicateKeepLastRow(
testHarness.close();
}

private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
return new KeyedOneInputStreamOperatorTestHarness<>(
operator, rowKeySelector, rowKeySelector.getProducedType());
}

private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
KeyedMapBundleOperator<RowData, RowData, RowData, RowData> operator) throws Exception {
return new KeyedOneInputStreamOperatorTestHarness<>(
operator, rowKeySelector, rowKeySelector.getProducedType());
}

@Parameterized.Parameters(name = "miniBatchEnable = {0}")
public static Collection<Boolean[]> runMode() {
return Arrays.asList(new Boolean[] {false}, new Boolean[] {true});
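
The fields and createTestHarness overloads deleted above were evidently hoisted into RowTimeDeduplicateFunctionTestBase, which this diff does not show. A plausible reconstruction of that base class, assembled purely from the removed lines (imports as in the deleted block above):

// Plausible reconstruction of the extracted base class; the actual file is not
// part of this diff, and the members below are copied from the lines removed above.
public abstract class RowTimeDeduplicateFunctionTestBase {

    protected final long miniBatchSize = 4L;
    protected Time minTtlTime = Time.milliseconds(10);
    protected InternalTypeInfo<RowData> inputRowType =
            InternalTypeInfo.ofFields(
                    new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType());
    protected TypeSerializer<RowData> serializer = inputRowType.toSerializer();
    protected int rowTimeIndex = 2;
    protected int rowKeyIndex = 0;
    protected RowDataKeySelector rowKeySelector =
            HandwrittenSelectorUtil.getRowDataSelector(
                    new int[] {rowKeyIndex}, inputRowType.toRowFieldTypes());
    protected RowDataHarnessAssertor assertor =
            new RowDataHarnessAssertor(
                    inputRowType.toRowFieldTypes(),
                    new GenericRowRecordSortComparator(
                            rowKeyIndex, inputRowType.toRowFieldTypes()[rowKeyIndex]));

    protected OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
            KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness<>(
                operator, rowKeySelector, rowKeySelector.getProducedType());
    }

    protected OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
            KeyedMapBundleOperator<RowData, RowData, RowData, RowData> operator)
            throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness<>(
                operator, rowKeySelector, rowKeySelector.getProducedType());
    }
}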