From 22230bb91e5ec72a889be78755d11e516038cb4b Mon Sep 17 00:00:00 2001 From: "lincoln.lil" Date: Wed, 10 Aug 2022 13:33:24 +0800 Subject: [PATCH] [FLINK-28899][table-planner] Fix LOOKUP hint with retry option on async lookup mode This closes #20531 --- .../exec/common/CommonExecLookupJoin.java | 10 +- .../planner/plan/utils/LookupJoinUtil.java | 42 ++++- .../stream/sql/AsyncLookupJoinITCase.scala | 11 +- .../join/lookup/ResultRetryStrategy.java | 4 +- ...RetryableAsyncLookupFunctionDelegator.java | 102 +++++++++++ ...yableAsyncLookupFunctionDelegatorTest.java | 168 ++++++++++++++++++ 6 files changed, 318 insertions(+), 19 deletions(-) create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/RetryableAsyncLookupFunctionDelegator.java create mode 100644 flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RetryableAsyncLookupFunctionDelegatorTest.java diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java index d00c888e1fa6d..b68f0309326a4 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java @@ -542,12 +542,12 @@ private StreamOperatorFactory createAsyncLookupJoin( isLeftOuterJoin, asyncLookupOptions.asyncBufferCapacity); } - // TODO async retry to be supported, can not directly enable retry on 'AsyncWaitOperator' - // because of two reasons: 1. AsyncLookupJoinRunner has a 'stateful' resultFutureBuffer bind - // to each input record (it's non-reenter-able) 2. can not lookup new value if cache empty - // enabled when chained with the new AsyncCachingLookupFunction. This two issues should be - // resolved first before enable async retry. + // Why not directly enable retry on 'AsyncWaitOperator'? because of two reasons: + // 1. AsyncLookupJoinRunner has a 'stateful' resultFutureBuffer bind to each input record + // (it's non-reenter-able) 2. can not lookup new value if cache empty values enabled when + // chained with the new AsyncCachingLookupFunction. So similar to sync lookup join with + // retry, use a 'RetryableAsyncLookupFunctionDelegator' to support retry. return new AsyncWaitOperatorFactory<>( asyncFunc, asyncLookupOptions.asyncTimeout, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java index 529f79b706e64..ad4769072ac84 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java @@ -35,6 +35,7 @@ import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.AsyncLookupFunction; import org.apache.flink.table.functions.LookupFunction; import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; @@ -49,6 +50,7 @@ import org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputFormatCacheLoader; import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector; import org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy; +import org.apache.flink.table.runtime.operators.join.lookup.RetryableAsyncLookupFunctionDelegator; import org.apache.flink.table.runtime.operators.join.lookup.RetryableLookupFunctionDelegator; import org.apache.flink.table.runtime.typeutils.InternalSerializers; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -196,9 +198,22 @@ public static int[] getOrderedLookupKeys(Collection allLookupKeys) { } /** - * Gets LookupFunction from temporal table according to the given lookup keys with preference. + * Gets lookup function (async or sync) from temporal table according to the given lookup keys + * with considering {@link LookupJoinHintSpec} and required upsertMaterialize. Note: if required + * upsertMaterialize is true, will return synchronous lookup function only, otherwise prefers + * asynchronous lookup function except there's a hint option 'async' = 'false', will raise an + * error if both candidates not found. * - * @return the UserDefinedFunction by preferable lookup mode, if require + *
{@code
+     * 1. if upsertMaterialize == true : require sync lookup or else error
+     *
+     * 2. preferAsync = except there is a hint option 'async' = 'false'
+     *  if (preferAsync) {
+     *    async lookup != null ? async : sync or else error
+     *  } else {
+     *    sync lookup != null ? sync : async or else error
+     *  }
+     * }
*/ public static UserDefinedFunction getLookupFunction( RelOptTable temporalTable, @@ -272,6 +287,22 @@ private static LookupFunction wrapSyncRetryDelegator( return provider.createLookupFunction(); } + /** + * Wraps AsyncLookupFunction into a RetryableAsyncLookupFunctionDelegator to support retry. + * Note: only AsyncLookupFunction is supported. + */ + private static AsyncLookupFunction wrapASyncRetryDelegator( + AsyncLookupFunctionProvider provider, LookupJoinHintSpec joinHintSpec) { + if (joinHintSpec != null) { + ResultRetryStrategy retryStrategy = joinHintSpec.toRetryStrategy(); + if (retryStrategy != NO_RETRY_STRATEGY) { + return new RetryableAsyncLookupFunctionDelegator( + provider.createAsyncLookupFunction(), joinHintSpec.toRetryStrategy()); + } + } + return provider.createAsyncLookupFunction(); + } + private static void findLookupFunctionFromNewSource( TableSourceTable temporalTable, int[] lookupKeyIndicesInOrder, @@ -312,6 +343,7 @@ private static void findLookupFunctionFromNewSource( lookupKeyIndicesInOrder, classLoader, tableSourceRowType); + // retry on fullCachingLookupFunction is meaningless syncLookupFunction = new CachingLookupFunction( fullCache, fullCachingLookupProvider.createLookupFunction()); @@ -327,10 +359,12 @@ private static void findLookupFunctionFromNewSource( asyncLookupFunction = new CachingAsyncLookupFunction( partialCachingLookupProvider.getCache(), - partialCachingLookupProvider.createAsyncLookupFunction()); + wrapASyncRetryDelegator( + partialCachingLookupProvider, joinHintSpec)); } else { asyncLookupFunction = - ((AsyncLookupFunctionProvider) provider).createAsyncLookupFunction(); + wrapASyncRetryDelegator( + (AsyncLookupFunctionProvider) provider, joinHintSpec); } } if (provider instanceof TableFunctionProvider) { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala index cb66ed0aff766..06b6d5daa27ce 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala @@ -475,7 +475,7 @@ class AsyncLookupJoinITCase( val expected = if (legacyTableSource) { // test legacy lookup source do not support lookup threshold - // for real async lookup functions(both new and legacy api) do support retry + // also legacy lookup source do not support retry Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian") } else { // the user_table_with_lookup_threshold3 will return null result before 3rd lookup @@ -503,14 +503,7 @@ class AsyncLookupJoinITCase( .addSink(sink) env.execute() - val expected = if (legacyTableSource) { - // test legacy lookup source do not support lookup threshold - // for real async lookup functions(both new and legacy api) do support retry - Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian") - } else { - // TODO retry on async is not supported currently, this should be updated after supported - Seq() - } + val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian") assertEquals(expected.sorted, sink.getAppendResults.sorted) } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/ResultRetryStrategy.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/ResultRetryStrategy.java index 33e8ab564a786..67358ed869860 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/ResultRetryStrategy.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/ResultRetryStrategy.java @@ -18,6 +18,7 @@ package org.apache.flink.table.runtime.operators.join.lookup; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate; import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; @@ -36,7 +37,8 @@ public class ResultRetryStrategy implements AsyncRetryStrategy { new ResultRetryStrategy(AsyncRetryStrategies.NO_RETRY_STRATEGY); private AsyncRetryStrategy retryStrategy; - private ResultRetryStrategy(AsyncRetryStrategy retryStrategy) { + @VisibleForTesting + public ResultRetryStrategy(AsyncRetryStrategy retryStrategy) { this.retryStrategy = retryStrategy; } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/RetryableAsyncLookupFunctionDelegator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/RetryableAsyncLookupFunctionDelegator.java new file mode 100644 index 0000000000000..6aee85802ecf2 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/RetryableAsyncLookupFunctionDelegator.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.lookup; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.AsyncLookupFunction; +import org.apache.flink.table.functions.FunctionContext; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A delegator holds user's {@link AsyncLookupFunction} to handle retries. */ +public class RetryableAsyncLookupFunctionDelegator extends AsyncLookupFunction { + + private final AsyncLookupFunction userLookupFunction; + + private final ResultRetryStrategy retryStrategy; + + private final boolean retryEnabled; + + private transient Predicate> retryResultPredicate; + + public RetryableAsyncLookupFunctionDelegator( + @Nonnull AsyncLookupFunction userLookupFunction, + @Nonnull ResultRetryStrategy retryStrategy) { + this.userLookupFunction = checkNotNull(userLookupFunction); + this.retryStrategy = checkNotNull(retryStrategy); + this.retryEnabled = retryStrategy.getRetryPredicate().resultPredicate().isPresent(); + } + + @Override + public void open(FunctionContext context) throws Exception { + super.open(context); + userLookupFunction.open(context); + retryResultPredicate = + retryStrategy.getRetryPredicate().resultPredicate().orElse(ignore -> false); + } + + @Override + public CompletableFuture> asyncLookup(RowData keyRow) { + if (!retryEnabled) { + return userLookupFunction.asyncLookup(keyRow); + } + CompletableFuture> resultFuture = new CompletableFuture<>(); + lookupWithRetry(resultFuture, 1, keyRow); + return resultFuture; + } + + private void lookupWithRetry( + final CompletableFuture> resultFuture, + final int currentAttempts, + final RowData keyRow) { + CompletableFuture> lookupFuture = + userLookupFunction.asyncLookup(keyRow); + + lookupFuture.whenCompleteAsync( + (result, throwable) -> { + if (retryResultPredicate.test(result) + && retryStrategy.canRetry(currentAttempts)) { + long backoff = retryStrategy.getBackoffTimeMillis(currentAttempts); + try { + Thread.sleep(backoff); + } catch (InterruptedException e) { + // Do not raise an error when interrupted, just complete with last + // result intermediately. + resultFuture.complete(result); + return; + } + lookupWithRetry(resultFuture, currentAttempts + 1, keyRow); + } else { + resultFuture.complete(result); + } + }); + } + + @Override + public void close() throws Exception { + userLookupFunction.close(); + super.close(); + } +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RetryableAsyncLookupFunctionDelegatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RetryableAsyncLookupFunctionDelegatorTest.java new file mode 100644 index 0000000000000..249d90db8b90b --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RetryableAsyncLookupFunctionDelegatorTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join; + +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.flink.streaming.util.retryable.RetryPredicates; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.AsyncLookupFunction; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy; +import org.apache.flink.table.runtime.operators.join.lookup.RetryableAsyncLookupFunctionDelegator; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; +import org.apache.flink.table.types.logical.LogicalType; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.flink.table.data.StringData.fromString; + +/** Harness tests for {@link RetryableAsyncLookupFunctionDelegator}. */ +public class RetryableAsyncLookupFunctionDelegatorTest { + + private final AsyncLookupFunction userLookupFunc = new TestingAsyncLookupFunction(); + + private final ResultRetryStrategy retryStrategy = + ResultRetryStrategy.fixedDelayRetry(3, 10, RetryPredicates.EMPTY_RESULT_PREDICATE); + + private static final Map> data = new HashMap<>(); + + static { + data.put( + GenericRowData.of(1), + Collections.singletonList(GenericRowData.of(1, fromString("Julian")))); + data.put( + GenericRowData.of(3), + Arrays.asList( + GenericRowData.of(3, fromString("Jark")), + GenericRowData.of(3, fromString("Jackson")))); + data.put( + GenericRowData.of(4), + Collections.singletonList(GenericRowData.of(4, fromString("Fabian")))); + } + + private RetryableAsyncLookupFunctionDelegator createDelegator( + ResultRetryStrategy retryStrategy) { + return new RetryableAsyncLookupFunctionDelegator(userLookupFunc, retryStrategy); + } + + private final RowDataHarnessAssertor assertor = + new RowDataHarnessAssertor( + new LogicalType[] { + DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType() + }); + + @Test + public void testLookupWithRetry() throws Exception { + final RetryableAsyncLookupFunctionDelegator delegator = createDelegator(retryStrategy); + delegator.open(new FunctionContext(new MockStreamingRuntimeContext(false, 1, 1))); + for (int i = 1; i <= 5; i++) { + RowData key = GenericRowData.of(i); + assertor.assertOutputEquals( + "output wrong", + Collections.singleton(data.get(key)), + Collections.singleton(delegator.asyncLookup(key))); + } + delegator.close(); + } + + @Test + public void testLookupWithRetryDisabled() throws Exception { + final RetryableAsyncLookupFunctionDelegator delegator = + createDelegator(ResultRetryStrategy.NO_RETRY_STRATEGY); + delegator.open(new FunctionContext(new MockStreamingRuntimeContext(false, 1, 1))); + for (int i = 1; i <= 5; i++) { + RowData key = GenericRowData.of(i); + assertor.assertOutputEquals( + "output wrong", + Collections.singleton(data.get(key)), + Collections.singleton(delegator.asyncLookup(key))); + } + delegator.close(); + } + + @Test + public void testLookupWithCustomRetry() throws Exception { + AsyncRetryStrategy retryStrategy = + new AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder<>( + 3, 1, 100, 1.1d) + .build(); + final RetryableAsyncLookupFunctionDelegator delegator = + createDelegator(new ResultRetryStrategy(retryStrategy)); + delegator.open(new FunctionContext(new MockStreamingRuntimeContext(false, 1, 1))); + for (int i = 1; i <= 5; i++) { + RowData key = GenericRowData.of(i); + assertor.assertOutputEquals( + "output wrong", + Collections.singleton(data.get(key)), + Collections.singleton(delegator.asyncLookup(key))); + } + delegator.close(); + } + + /** The {@link TestingAsyncLookupFunction} is a {@link AsyncLookupFunction} for testing. */ + private static final class TestingAsyncLookupFunction extends AsyncLookupFunction { + + private static final long serialVersionUID = 1L; + + private final Random random = new Random(); + private transient ExecutorService executor; + + @Override + public void open(FunctionContext context) throws Exception { + super.open(context); + this.executor = Executors.newFixedThreadPool(2); + } + + @Override + public CompletableFuture> asyncLookup(RowData keyRow) { + return CompletableFuture.supplyAsync( + () -> { + try { + Thread.sleep(random.nextInt(5)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return data.get(keyRow); + }, + executor); + } + + @Override + public void close() throws Exception { + if (null != executor && !executor.isShutdown()) { + executor.shutdown(); + } + super.close(); + } + } +}