diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index a1d429c99a..583717a149 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -48,6 +48,7 @@
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metadata.TableStats;
import org.apache.fluss.rpc.GatewayClientProxy;
+import org.apache.fluss.rpc.RetryableGatewayClientProxy;
import org.apache.fluss.rpc.RpcClient;
import org.apache.fluss.rpc.gateway.AdminGateway;
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
@@ -139,13 +140,21 @@ public class FlussAdmin implements Admin {
private final AdminReadOnlyGateway readOnlyGateway;
private final MetadataUpdater metadataUpdater;
+ private static final int READ_ONLY_GATEWAY_MAX_RETRIES = 3;
+
public FlussAdmin(RpcClient client, MetadataUpdater metadataUpdater) {
this.gateway =
GatewayClientProxy.createGatewayProxy(
metadataUpdater::getCoordinatorServer, client, AdminGateway.class);
- this.readOnlyGateway =
+ AdminGateway rawReadOnlyGateway =
GatewayClientProxy.createGatewayProxy(
metadataUpdater::getRandomTabletServer, client, AdminGateway.class);
+ this.readOnlyGateway =
+ RetryableGatewayClientProxy.createRetryableGatewayProxy(
+ rawReadOnlyGateway,
+ () -> metadataUpdater.updateMetadata(null, null, null),
+ READ_ONLY_GATEWAY_MAX_RETRIES,
+ AdminGateway.class);
this.metadataUpdater = metadataUpdater;
}
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/RetryableGatewayClientProxy.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/RetryableGatewayClientProxy.java
new file mode 100644
index 0000000000..7965567ff8
--- /dev/null
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/RetryableGatewayClientProxy.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.exception.RetriableException;
+import org.apache.fluss.utils.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A proxy that wraps an existing {@link RpcGateway} proxy and adds automatic retry with metadata
+ * refresh on retriable (network) errors.
+ *
+ *
This is designed to solve the stale metadata problem where cached server addresses become
+ * invalid (e.g., during rolling upgrades in Kubernetes). When an RPC call fails with a {@link
+ * RetriableException}, this proxy triggers a metadata refresh callback and retries the request with
+ * potentially updated server addresses.
+ *
+ *
The retry flow for a cluster with N stale tablet servers:
+ *
+ *
+ * - RPC fails with {@link RetriableException} (e.g., connection refused to stale IP)
+ *
- Metadata refresh is triggered, which marks the failed server as unavailable
+ *
- After N failed refreshes, all servers are marked unavailable, triggering re-initialization
+ * from bootstrap servers
+ *
- The next retry succeeds with the refreshed server addresses
+ *
+ */
+@Internal
+public class RetryableGatewayClientProxy implements InvocationHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RetryableGatewayClientProxy.class);
+
+ private final Object delegate;
+ private final Runnable metadataRefreshAction;
+ private final int maxRetries;
+
+ RetryableGatewayClientProxy(Object delegate, Runnable metadataRefreshAction, int maxRetries) {
+ this.delegate = delegate;
+ this.metadataRefreshAction = metadataRefreshAction;
+ this.maxRetries = maxRetries;
+ }
+
+ /**
+ * Creates a retryable proxy wrapping an existing gateway proxy. On {@link RetriableException},
+ * the proxy will invoke {@code metadataRefreshAction} and retry the failed RPC call.
+ *
+ * @param delegate the underlying gateway proxy to wrap
+ * @param metadataRefreshAction callback to refresh metadata (e.g., update cluster info)
+ * @param maxRetries maximum number of retries before propagating the error
+ * @param gatewayClass the gateway interface class
+ * @param the gateway type
+ * @return a retryable gateway proxy
+ */
+ public static T createRetryableGatewayProxy(
+ T delegate, Runnable metadataRefreshAction, int maxRetries, Class gatewayClass) {
+ ClassLoader classLoader = gatewayClass.getClassLoader();
+
+ @SuppressWarnings("unchecked")
+ T proxy =
+ (T)
+ Proxy.newProxyInstance(
+ classLoader,
+ new Class>[] {gatewayClass},
+ new RetryableGatewayClientProxy(
+ delegate, metadataRefreshAction, maxRetries));
+ return proxy;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ return invokeWithRetry(method, args, 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ private CompletableFuture invokeWithRetry(Method method, Object[] args, int attempt) {
+ CompletableFuture future;
+ try {
+ future = (CompletableFuture) method.invoke(delegate, args);
+ } catch (InvocationTargetException e) {
+ CompletableFuture failed = new CompletableFuture<>();
+ failed.completeExceptionally(e.getCause());
+ return failed;
+ } catch (Exception e) {
+ CompletableFuture failed = new CompletableFuture<>();
+ failed.completeExceptionally(e);
+ return failed;
+ }
+
+ CompletableFuture resultFuture = new CompletableFuture<>();
+ future.whenComplete(
+ (result, throwable) -> {
+ if (throwable == null) {
+ resultFuture.complete(result);
+ return;
+ }
+ Throwable cause = ExceptionUtils.stripCompletionException(throwable);
+ if (!(cause instanceof RetriableException) || attempt >= maxRetries) {
+ resultFuture.completeExceptionally(cause);
+ return;
+ }
+ LOG.warn(
+ "RPC call {} failed with retriable error (attempt {}/{}), "
+ + "refreshing metadata and retrying.",
+ method.getName(),
+ attempt + 1,
+ maxRetries,
+ cause);
+ // Run metadata refresh and retry on a separate thread to avoid
+ // blocking Netty IO threads that may complete the failed future.
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ metadataRefreshAction.run();
+ } catch (Exception e) {
+ LOG.warn("Failed to refresh metadata during retry", e);
+ }
+ })
+ .thenCompose(
+ ignored ->
+ RetryableGatewayClientProxy.this.invokeWithRetry(
+ method, args, attempt + 1))
+ .whenComplete(
+ (retryResult, retryError) -> {
+ if (retryError != null) {
+ resultFuture.completeExceptionally(
+ ExceptionUtils.stripCompletionException(
+ retryError));
+ } else {
+ resultFuture.complete(retryResult);
+ }
+ });
+ });
+ return resultFuture;
+ }
+}
diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/RetryableGatewayClientProxyTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/RetryableGatewayClientProxyTest.java
new file mode 100644
index 0000000000..d11d5bfbb7
--- /dev/null
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/RetryableGatewayClientProxyTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc;
+
+import org.apache.fluss.exception.NetworkException;
+import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.rpc.messages.ApiVersionsRequest;
+import org.apache.fluss.rpc.messages.ApiVersionsResponse;
+import org.apache.fluss.rpc.messages.AuthenticateRequest;
+import org.apache.fluss.rpc.messages.AuthenticateResponse;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests for {@link RetryableGatewayClientProxy}. */
+class RetryableGatewayClientProxyTest {
+
+ @Test
+ void testSuccessfulCallWithoutRetry() throws Exception {
+ AtomicInteger callCount = new AtomicInteger(0);
+ AtomicInteger refreshCount = new AtomicInteger(0);
+
+ RpcGateway delegate = createGateway(callCount, 0);
+ RpcGateway proxy =
+ RetryableGatewayClientProxy.createRetryableGatewayProxy(
+ delegate, refreshCount::incrementAndGet, 3, RpcGateway.class);
+
+ CompletableFuture result = proxy.apiVersions(new ApiVersionsRequest());
+ assertThat(result.get()).isNotNull();
+ assertThat(callCount.get()).isEqualTo(1);
+ assertThat(refreshCount.get()).isEqualTo(0);
+ }
+
+ @Test
+ void testRetryOnNetworkExceptionThenSuccess() throws Exception {
+ AtomicInteger callCount = new AtomicInteger(0);
+ AtomicInteger refreshCount = new AtomicInteger(0);
+
+ // Fail with NetworkException for the first 2 calls, then succeed
+ RpcGateway delegate = createGateway(callCount, 2);
+ RpcGateway proxy =
+ RetryableGatewayClientProxy.createRetryableGatewayProxy(
+ delegate, refreshCount::incrementAndGet, 3, RpcGateway.class);
+
+ CompletableFuture result = proxy.apiVersions(new ApiVersionsRequest());
+ assertThat(result.get()).isNotNull();
+ // Initial call + 2 retries = 3 total calls
+ assertThat(callCount.get()).isEqualTo(3);
+ // Metadata refresh should be called before each retry
+ assertThat(refreshCount.get()).isEqualTo(2);
+ }
+
+ @Test
+ void testExhaustsRetriesAndPropagatesError() {
+ AtomicInteger callCount = new AtomicInteger(0);
+ AtomicInteger refreshCount = new AtomicInteger(0);
+
+ // Always fail with NetworkException (more failures than max retries)
+ RpcGateway delegate = createGateway(callCount, Integer.MAX_VALUE);
+ RpcGateway proxy =
+ RetryableGatewayClientProxy.createRetryableGatewayProxy(
+ delegate, refreshCount::incrementAndGet, 3, RpcGateway.class);
+
+ CompletableFuture result = proxy.apiVersions(new ApiVersionsRequest());
+ assertThatThrownBy(result::get)
+ .isInstanceOf(ExecutionException.class)
+ .rootCause()
+ .isInstanceOf(NetworkException.class)
+ .hasMessageContaining("Simulated network error");
+ // Initial call + 3 retries = 4 total calls
+ assertThat(callCount.get()).isEqualTo(4);
+ // Metadata refresh should be called for each retry attempt
+ assertThat(refreshCount.get()).isEqualTo(3);
+ }
+
+ @Test
+ void testNonRetriableExceptionNotRetried() {
+ AtomicInteger callCount = new AtomicInteger(0);
+ AtomicInteger refreshCount = new AtomicInteger(0);
+
+ // Fail with a non-retriable exception
+ RpcGateway delegate =
+ new TestRpcGateway() {
+ @Override
+ public CompletableFuture apiVersions(
+ ApiVersionsRequest request) {
+ callCount.incrementAndGet();
+ CompletableFuture future = new CompletableFuture<>();
+ future.completeExceptionally(
+ new TableNotExistException("table does not exist"));
+ return future;
+ }
+ };
+
+ RpcGateway proxy =
+ RetryableGatewayClientProxy.createRetryableGatewayProxy(
+ delegate, refreshCount::incrementAndGet, 3, RpcGateway.class);
+
+ CompletableFuture result = proxy.apiVersions(new ApiVersionsRequest());
+ assertThatThrownBy(result::get)
+ .isInstanceOf(ExecutionException.class)
+ .rootCause()
+ .isInstanceOf(TableNotExistException.class)
+ .hasMessageContaining("table does not exist");
+ // Should only be called once - no retries for non-retriable exceptions
+ assertThat(callCount.get()).isEqualTo(1);
+ assertThat(refreshCount.get()).isEqualTo(0);
+ }
+
+ @Test
+ void testMetadataRefreshFailureDoesNotPreventRetry() throws Exception {
+ AtomicInteger callCount = new AtomicInteger(0);
+ AtomicInteger refreshCount = new AtomicInteger(0);
+
+ // Fail first call, succeed second call
+ RpcGateway delegate = createGateway(callCount, 1);
+
+ // Metadata refresh throws exception
+ Runnable failingRefresh =
+ () -> {
+ refreshCount.incrementAndGet();
+ throw new RuntimeException("Simulated refresh failure");
+ };
+
+ RpcGateway proxy =
+ RetryableGatewayClientProxy.createRetryableGatewayProxy(
+ delegate, failingRefresh, 3, RpcGateway.class);
+
+ // Should still succeed because the retry goes through even if refresh fails
+ CompletableFuture result = proxy.apiVersions(new ApiVersionsRequest());
+ assertThat(result.get()).isNotNull();
+ assertThat(callCount.get()).isEqualTo(2);
+ assertThat(refreshCount.get()).isEqualTo(1);
+ }
+
+ @Test
+ void testRetryWithMaxRetriesZeroDoesNotRetry() {
+ AtomicInteger callCount = new AtomicInteger(0);
+ AtomicInteger refreshCount = new AtomicInteger(0);
+
+ RpcGateway delegate = createGateway(callCount, Integer.MAX_VALUE);
+ RpcGateway proxy =
+ RetryableGatewayClientProxy.createRetryableGatewayProxy(
+ delegate, refreshCount::incrementAndGet, 0, RpcGateway.class);
+
+ CompletableFuture result = proxy.apiVersions(new ApiVersionsRequest());
+ assertThatThrownBy(result::get)
+ .isInstanceOf(ExecutionException.class)
+ .rootCause()
+ .isInstanceOf(NetworkException.class);
+ // Only the initial call, no retries
+ assertThat(callCount.get()).isEqualTo(1);
+ assertThat(refreshCount.get()).isEqualTo(0);
+ }
+
+ /**
+ * Creates a test gateway that fails with {@link NetworkException} for the first {@code
+ * failCount} invocations, then returns a successful response.
+ */
+ private static RpcGateway createGateway(AtomicInteger callCount, int failCount) {
+ return new TestRpcGateway() {
+ @Override
+ public CompletableFuture apiVersions(ApiVersionsRequest request) {
+ int count = callCount.incrementAndGet();
+ CompletableFuture future = new CompletableFuture<>();
+ if (count <= failCount) {
+ future.completeExceptionally(
+ new NetworkException("Simulated network error on call " + count));
+ } else {
+ future.complete(new ApiVersionsResponse());
+ }
+ return future;
+ }
+ };
+ }
+
+ /** Base test implementation of {@link RpcGateway} that throws on unimplemented methods. */
+ private abstract static class TestRpcGateway implements RpcGateway {
+
+ @Override
+ public CompletableFuture apiVersions(ApiVersionsRequest request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture authenticate(AuthenticateRequest request) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}