Skip to content

Commit

Permalink
[FLINK-28899][table-planner] Fix LOOKUP hint with retry option on asy…
Browse files Browse the repository at this point in the history
…nc lookup mode

This closes apache#20531
  • Loading branch information
lincoln-lil authored and huangxiaofeng10047 committed Nov 3, 2022
1 parent cdbf5c0 commit 22230bb
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -542,12 +542,12 @@ private StreamOperatorFactory<RowData> createAsyncLookupJoin(
isLeftOuterJoin,
asyncLookupOptions.asyncBufferCapacity);
}
// TODO async retry to be supported, can not directly enable retry on 'AsyncWaitOperator'
// because of two reasons: 1. AsyncLookupJoinRunner has a 'stateful' resultFutureBuffer bind
// to each input record (it's non-reenter-able) 2. can not lookup new value if cache empty
// enabled when chained with the new AsyncCachingLookupFunction. This two issues should be
// resolved first before enable async retry.

// Retry is not enabled directly on 'AsyncWaitOperator' for two reasons:
// 1. AsyncLookupJoinRunner has a 'stateful' resultFutureBuffer bound to each input record
// (it is not re-entrant); 2. a new value cannot be looked up if caching of empty values is
// enabled when chained with the new AsyncCachingLookupFunction. So, similar to the sync lookup
// join with retry, a 'RetryableAsyncLookupFunctionDelegator' is used to support retry.
return new AsyncWaitOperatorFactory<>(
asyncFunc,
asyncLookupOptions.asyncTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
Expand All @@ -49,6 +50,7 @@
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputFormatCacheLoader;
import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy;
import org.apache.flink.table.runtime.operators.join.lookup.RetryableAsyncLookupFunctionDelegator;
import org.apache.flink.table.runtime.operators.join.lookup.RetryableLookupFunctionDelegator;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
Expand Down Expand Up @@ -196,9 +198,22 @@ public static int[] getOrderedLookupKeys(Collection<Integer> allLookupKeys) {
}

/**
* Gets LookupFunction from temporal table according to the given lookup keys with preference.
* Gets the lookup function (async or sync) from the temporal table according to the given lookup
* keys, taking {@link LookupJoinHintSpec} and the required upsertMaterialize into account. Note:
* if upsertMaterialize is required, only a synchronous lookup function is returned; otherwise an
* asynchronous lookup function is preferred unless the hint option 'async' = 'false' is set. An
* error is raised if neither candidate is found.
*
* @return the UserDefinedFunction chosen by the preferred lookup mode, as follows:
* <pre>{@code
* 1. if upsertMaterialize == true : require sync lookup or else error
*
* 2. preferAsync = except there is a hint option 'async' = 'false'
* if (preferAsync) {
* async lookup != null ? async : sync or else error
* } else {
* sync lookup != null ? sync : async or else error
* }
* }</pre>
*/
public static UserDefinedFunction getLookupFunction(
RelOptTable temporalTable,
Expand Down Expand Up @@ -272,6 +287,22 @@ private static LookupFunction wrapSyncRetryDelegator(
return provider.createLookupFunction();
}

/**
 * Wraps the {@link AsyncLookupFunction} created by the given provider into a {@link
 * RetryableAsyncLookupFunctionDelegator} when the join hint yields a retry strategy.
 *
 * <p>Note: only AsyncLookupFunction is supported.
 *
 * @param provider provider used to create the underlying async lookup function
 * @param joinHintSpec lookup join hint; when {@code null} no retry wrapping is applied
 * @return the retry-capable delegator, or the plain function created by the provider when
 *     {@code joinHintSpec} is {@code null} or its retry strategy is {@code NO_RETRY_STRATEGY}
 */
private static AsyncLookupFunction wrapASyncRetryDelegator(
        AsyncLookupFunctionProvider provider, LookupJoinHintSpec joinHintSpec) {
    if (joinHintSpec != null) {
        ResultRetryStrategy retryStrategy = joinHintSpec.toRetryStrategy();
        if (retryStrategy != NO_RETRY_STRATEGY) {
            // Reuse the strategy resolved above instead of converting the hint a second time,
            // so the instance that was checked is the one handed to the delegator.
            return new RetryableAsyncLookupFunctionDelegator(
                    provider.createAsyncLookupFunction(), retryStrategy);
        }
    }
    return provider.createAsyncLookupFunction();
}

private static void findLookupFunctionFromNewSource(
TableSourceTable temporalTable,
int[] lookupKeyIndicesInOrder,
Expand Down Expand Up @@ -312,6 +343,7 @@ private static void findLookupFunctionFromNewSource(
lookupKeyIndicesInOrder,
classLoader,
tableSourceRowType);
// retry on fullCachingLookupFunction is meaningless
syncLookupFunction =
new CachingLookupFunction(
fullCache, fullCachingLookupProvider.createLookupFunction());
Expand All @@ -327,10 +359,12 @@ private static void findLookupFunctionFromNewSource(
asyncLookupFunction =
new CachingAsyncLookupFunction(
partialCachingLookupProvider.getCache(),
partialCachingLookupProvider.createAsyncLookupFunction());
wrapASyncRetryDelegator(
partialCachingLookupProvider, joinHintSpec));
} else {
asyncLookupFunction =
((AsyncLookupFunctionProvider) provider).createAsyncLookupFunction();
wrapASyncRetryDelegator(
(AsyncLookupFunctionProvider) provider, joinHintSpec);
}
}
if (provider instanceof TableFunctionProvider) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ class AsyncLookupJoinITCase(

val expected = if (legacyTableSource) {
// test legacy lookup source do not support lookup threshold
// for real async lookup functions(both new and legacy api) do support retry
// also legacy lookup source do not support retry
Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
} else {
// the user_table_with_lookup_threshold3 will return null result before 3rd lookup
Expand Down Expand Up @@ -503,14 +503,7 @@ class AsyncLookupJoinITCase(
.addSink(sink)
env.execute()

val expected = if (legacyTableSource) {
// test legacy lookup source do not support lookup threshold
// for real async lookup functions(both new and legacy api) do support retry
Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
} else {
// TODO retry on async is not supported currently, this should be updated after supported
Seq()
}
val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.runtime.operators.join.lookup;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
Expand All @@ -36,7 +37,8 @@ public class ResultRetryStrategy implements AsyncRetryStrategy<RowData> {
new ResultRetryStrategy(AsyncRetryStrategies.NO_RETRY_STRATEGY);
private AsyncRetryStrategy retryStrategy;

private ResultRetryStrategy(AsyncRetryStrategy retryStrategy) {
@VisibleForTesting
public ResultRetryStrategy(AsyncRetryStrategy retryStrategy) {
this.retryStrategy = retryStrategy;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.join.lookup;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;

import javax.annotation.Nonnull;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A delegator holds user's {@link AsyncLookupFunction} to handle retries.
 *
 * <p>When the given {@link ResultRetryStrategy} carries a result predicate, each lookup whose
 * result matches that predicate is re-issued against the user function (after the strategy's
 * backoff) for as long as the strategy allows another attempt. Without a result predicate the
 * call is forwarded to the user function unchanged.
 */
public class RetryableAsyncLookupFunctionDelegator extends AsyncLookupFunction {

    /** The wrapped user function that performs the actual lookups. */
    private final AsyncLookupFunction userLookupFunction;

    /** Decides whether another attempt is allowed and how long to back off before it. */
    private final ResultRetryStrategy retryStrategy;

    /** True only when the strategy provides a result predicate; otherwise calls pass through. */
    private final boolean retryEnabled;

    /** Result predicate resolved in {@link #open(FunctionContext)}; not serialized. */
    private transient Predicate<Collection<RowData>> retryResultPredicate;

    /**
     * Creates a delegator around the given user function.
     *
     * @param userLookupFunction the function to delegate lookups to, must not be null
     * @param retryStrategy the strategy describing when and how to retry, must not be null
     */
    public RetryableAsyncLookupFunctionDelegator(
            @Nonnull AsyncLookupFunction userLookupFunction,
            @Nonnull ResultRetryStrategy retryStrategy) {
        this.userLookupFunction = checkNotNull(userLookupFunction);
        this.retryStrategy = checkNotNull(retryStrategy);
        this.retryEnabled = retryStrategy.getRetryPredicate().resultPredicate().isPresent();
    }

    /** Opens the wrapped user function and resolves the result predicate used for retries. */
    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        userLookupFunction.open(context);
        // Fall back to a predicate that never asks for a retry when none is configured.
        retryResultPredicate =
                retryStrategy.getRetryPredicate().resultPredicate().orElse(ignore -> false);
    }

    /**
     * Looks up the given key, retrying according to the configured strategy when enabled.
     *
     * @param keyRow the lookup key
     * @return a future completed with the final lookup result, or completed exceptionally when
     *     the user lookup (or the retry handling itself) fails
     */
    @Override
    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
        if (!retryEnabled) {
            return userLookupFunction.asyncLookup(keyRow);
        }
        CompletableFuture<Collection<RowData>> resultFuture = new CompletableFuture<>();
        lookupWithRetry(resultFuture, 1, keyRow);
        return resultFuture;
    }

    /**
     * Issues one lookup attempt and, depending on its outcome, either completes {@code
     * resultFuture} or schedules the next attempt.
     *
     * @param resultFuture the future handed out to the caller of {@link #asyncLookup(RowData)}
     * @param currentAttempts the 1-based number of the attempt being issued
     * @param keyRow the lookup key
     */
    private void lookupWithRetry(
            final CompletableFuture<Collection<RowData>> resultFuture,
            final int currentAttempts,
            final RowData keyRow) {
        CompletableFuture<Collection<RowData>> lookupFuture =
                userLookupFunction.asyncLookup(keyRow);

        // The callback runs on CompletableFuture's default asynchronous executor, so the
        // backoff sleep below does not block the thread that completed the lookup.
        lookupFuture.whenCompleteAsync(
                (result, throwable) -> {
                    if (throwable != null) {
                        // A failed lookup is not a retryable result: surface the failure to
                        // the caller instead of treating it as a (null) result.
                        resultFuture.completeExceptionally(throwable);
                        return;
                    }
                    try {
                        if (retryResultPredicate.test(result)
                                && retryStrategy.canRetry(currentAttempts)) {
                            long backoff = retryStrategy.getBackoffTimeMillis(currentAttempts);
                            try {
                                Thread.sleep(backoff);
                            } catch (InterruptedException e) {
                                // Do not raise an error when interrupted, just complete with
                                // the last result immediately and keep the interrupt flag set
                                // for the owner of this thread.
                                resultFuture.complete(result);
                                Thread.currentThread().interrupt();
                                return;
                            }
                            lookupWithRetry(resultFuture, currentAttempts + 1, keyRow);
                        } else {
                            resultFuture.complete(result);
                        }
                    } catch (Throwable t) {
                        // Anything thrown while evaluating the retry (predicate, backoff,
                        // re-issuing the lookup) would otherwise be lost on the executor
                        // thread and leave the caller's future uncompleted forever.
                        resultFuture.completeExceptionally(t);
                    }
                });
    }

    /** Closes the wrapped user function, then this function. */
    @Override
    public void close() throws Exception {
        try {
            userLookupFunction.close();
        } finally {
            // Always release the resources of the base function, even if the user function
            // fails to close.
            super.close();
        }
    }
}

0 comments on commit 22230bb

Please sign in to comment.