From b5fcf9568abf24d3e8a7cf8127cfee811533c24f Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Wed, 20 May 2026 08:07:28 +0200 Subject: [PATCH 1/4] HDDS-15328. Fix sleep time in RetryInvocationHandler --- .../org/apache/hadoop/io_/retry/RetryInvocationHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryInvocationHandler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryInvocationHandler.java index 789f8357d24..171e582b9b6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryInvocationHandler.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryInvocationHandler.java @@ -133,7 +133,7 @@ CallReturn processWaitTimeAndRetryInfo() throws InterruptedIOException { callId, retryInfo, waitTime); if (waitTime != null && waitTime > 0) { try { - Thread.sleep(retryInfo.delay); + Thread.sleep(waitTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); if (LOG.isDebugEnabled()) { From 0eb60b776a4b8d7b12b6d8b3af977aa67bf2bb12 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Wed, 20 May 2026 08:09:34 +0200 Subject: [PATCH 2/4] remove unused WAIT_RETRY --- .../apache/hadoop/io_/retry/CallReturn.java | 3 --- .../io_/retry/RetryInvocationHandler.java | 19 ++----------------- 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/CallReturn.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/CallReturn.java index 514320f346f..c765d26b375 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/CallReturn.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/CallReturn.java @@ -30,12 +30,9 @@ enum State { EXCEPTION, /** Call should be retried according to the {@link RetryPolicy}. */ RETRY, - /** Call should wait and then retry according to the {@link RetryPolicy}. */ - WAIT_RETRY, } static final CallReturn RETRY = new CallReturn(State.RETRY); - static final CallReturn WAIT_RETRY = new CallReturn(State.WAIT_RETRY); private final Object returnValue; private final Throwable thrown; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryInvocationHandler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryInvocationHandler.java index 171e582b9b6..604b82ea2a6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryInvocationHandler.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryInvocationHandler.java @@ -74,14 +74,6 @@ static class Call { this.retryInvocationHandler = retryInvocationHandler; } - int getCallId() { - return callId; - } - - Counters getCounters() { - return counters; - } - synchronized Long getWaitTime(final long now) { return retryInfo == null? null: retryInfo.retryTime - now; } @@ -120,12 +112,9 @@ synchronized CallReturn invokeOnce() { /** * It first processes the wait time, if there is any, * and then invokes {@link #processRetryInfo()}. + * If the wait time is positive, it sleeps. * - * If the wait time is positive, it either sleeps for synchronous calls - * or immediately returns for asynchronous calls. - * - * @return {@link CallReturn#RETRY} if the retryInfo is processed; - * otherwise, return {@link CallReturn#WAIT_RETRY}. + * @return {@link CallReturn#RETRY} */ CallReturn processWaitTimeAndRetryInfo() throws InterruptedIOException { final Long waitTime = getWaitTime(Time.monotonicNow()); @@ -184,10 +173,6 @@ static class Counters { private int retries; /** Counter for method invocation has been failed over. */ private int failovers; - - boolean isZeros() { - return retries == 0 && failovers == 0; - } } private static class ProxyDescriptor { From 17a7aaf0c297f44b917cc7dc119a24e47b0be11c Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Wed, 20 May 2026 09:00:45 +0200 Subject: [PATCH 3/4] port TestRetryProxy from Hadoop --- hadoop-hdds/common/pom.xml | 6 + .../hadoop/io_/retry/TestRetryProxy.java | 304 ++++++++++++++++++ .../io_/retry/UnreliableImplementation.java | 83 +++++ .../hadoop/io_/retry/UnreliableInterface.java | 66 ++++ 4 files changed, 459 insertions(+) create mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/io_/retry/TestRetryProxy.java create mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/io_/retry/UnreliableImplementation.java create mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/io_/retry/UnreliableInterface.java diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml index daf7008fa83..0308398496e 100644 --- a/hadoop-hdds/common/pom.xml +++ b/hadoop-hdds/common/pom.xml @@ -174,6 +174,12 @@ commons-io test + + org.apache.hadoop + hadoop-common + test-jar + test + org.apache.ozone hdds-config diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/io_/retry/TestRetryProxy.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/io_/retry/TestRetryProxy.java new file mode 100644 index 00000000000..4239d49a02c --- /dev/null +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/io_/retry/TestRetryProxy.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io_.retry; + +import static org.apache.hadoop.io_.retry.RetryPolicies.RETRY_FOREVER; +import static org.apache.hadoop.io_.retry.RetryPolicies.TRY_ONCE_THEN_FAIL; +import static org.apache.hadoop.io_.retry.RetryPolicies.exponentialBackoffRetry; +import static org.apache.hadoop.io_.retry.RetryPolicies.retryForeverWithFixedSleep; +import static org.apache.hadoop.io_.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import javax.security.sasl.SaslException; +import org.apache.hadoop.io.retry.Idempotent; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; +import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; +import org.apache.hadoop.io_.retry.UnreliableInterface.UnreliableException; +import org.apache.hadoop.ipc_.ProtocolTranslator; +import org.apache.hadoop.security.AccessControlException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * TestRetryProxy tests the behaviour of the {@link RetryPolicy} class using + * a certain method of {@link UnreliableInterface} implemented by + * {@link UnreliableImplementation}. + * + * Some methods may be sensitive to the {@link Idempotent} annotation + * (annotated in {@link UnreliableInterface}). + */ +public class TestRetryProxy { + + private UnreliableImplementation unreliableImpl; + private RetryAction caughtRetryAction = null; + + @BeforeEach + public void setUp() throws Exception { + unreliableImpl = new UnreliableImplementation(); + } + + // answer mockPolicy's method with realPolicy, caught method's return value + private void setupMockPolicy(RetryPolicy mockPolicy, + final RetryPolicy realPolicy) throws Exception { + when(mockPolicy.shouldRetry(any(Exception.class), anyInt(), anyInt(), anyBoolean())) + .thenAnswer(invocation -> { + Object[] args = invocation.getArguments(); + Exception e = (Exception) args[0]; + int retries = (int) args[1]; + int failovers = (int) args[2]; + boolean isIdempotentOrAtMostOnce = (boolean) args[3]; + caughtRetryAction = realPolicy.shouldRetry(e, retries, failovers, + isIdempotentOrAtMostOnce); + return caughtRetryAction; + }); + } + + @Test + public void testTryOnceThenFail() throws Exception { + RetryPolicy policy = mock(RetryPolicies.TryOnceThenFail.class); + RetryPolicy realPolicy = TRY_ONCE_THEN_FAIL; + setupMockPolicy(policy, realPolicy); + + UnreliableInterface unreliable = (UnreliableInterface) + RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy); + unreliable.alwaysSucceeds(); + try { + unreliable.failsOnceThenSucceeds(); + fail("Should fail"); + } catch (UnreliableException e) { + // expected + verify(policy, times(1)).shouldRetry(any(Exception.class), anyInt(), + anyInt(), anyBoolean()); + assertEquals(RetryDecision.FAIL, caughtRetryAction.action); + assertEquals("try once and fail.", caughtRetryAction.reason); + } catch (Exception e) { + fail("Other exception other than UnreliableException should also get " + + "failed."); + } + } + + /** + * Test for {@link RetryInvocationHandler#isRpcInvocation(Object)}. + */ + @Test + public void testRpcInvocation() throws Exception { + // For a proxy method should return true + final UnreliableInterface unreliable = (UnreliableInterface) + RetryProxy.create(UnreliableInterface.class, unreliableImpl, RETRY_FOREVER); + assertTrue(RetryInvocationHandler.isRpcInvocation(unreliable)); + + final AtomicInteger count = new AtomicInteger(); + // Embed the proxy in ProtocolTranslator + ProtocolTranslator xlator = new ProtocolTranslator() { + @Override + public Object getUnderlyingProxyObject() { + count.getAndIncrement(); + return unreliable; + } + }; + + // For a proxy wrapped in ProtocolTranslator method should return true + assertTrue(RetryInvocationHandler.isRpcInvocation(xlator)); + // Ensure underlying proxy was looked at + assertEquals(1, count.get()); + + // For non-proxy the method must return false + assertFalse(RetryInvocationHandler.isRpcInvocation(new Object())); + } + + @Test + public void testRetryForever() throws UnreliableException { + UnreliableInterface unreliable = (UnreliableInterface) + RetryProxy.create(UnreliableInterface.class, unreliableImpl, RETRY_FOREVER); + unreliable.alwaysSucceeds(); + unreliable.failsOnceThenSucceeds(); + unreliable.failsTenTimesThenSucceeds(); + } + + @Test + public void testRetryForeverWithFixedSleep() throws UnreliableException { + UnreliableInterface unreliable = (UnreliableInterface) RetryProxy.create( + UnreliableInterface.class, unreliableImpl, + retryForeverWithFixedSleep(1, TimeUnit.MILLISECONDS)); + unreliable.alwaysSucceeds(); + unreliable.failsOnceThenSucceeds(); + unreliable.failsTenTimesThenSucceeds(); + } + + @Test + public void testRetryUpToMaximumCountWithFixedSleep() throws + Exception { + + RetryPolicy policy = mock(RetryPolicies.RetryUpToMaximumCountWithFixedSleep.class); + int maxRetries = 8; + RetryPolicy realPolicy = retryUpToMaximumCountWithFixedSleep(maxRetries, 1, TimeUnit.NANOSECONDS); + setupMockPolicy(policy, realPolicy); + + UnreliableInterface unreliable = (UnreliableInterface) + RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy); + // shouldRetry += 1 + unreliable.alwaysSucceeds(); + // shouldRetry += 2 + unreliable.failsOnceThenSucceeds(); + try { + // shouldRetry += (maxRetries -1) (just failed once above) + unreliable.failsTenTimesThenSucceeds(); + fail("Should fail"); + } catch (UnreliableException e) { + // expected + verify(policy, times(maxRetries + 2)).shouldRetry(any(Exception.class), + anyInt(), anyInt(), anyBoolean()); + assertEquals(RetryDecision.FAIL, caughtRetryAction.action); + assertEquals(RetryPolicies.RetryUpToMaximumCountWithFixedSleep.constructReasonString( + maxRetries), caughtRetryAction.reason); + } catch (Exception e) { + fail("Other exception other than UnreliableException should also get " + + "failed."); + } + } + + @Test + public void testExponentialRetry() throws UnreliableException { + UnreliableInterface unreliable = (UnreliableInterface) RetryProxy.create(UnreliableInterface.class, unreliableImpl, + exponentialBackoffRetry(5, 1L, TimeUnit.NANOSECONDS)); + unreliable.alwaysSucceeds(); + unreliable.failsOnceThenSucceeds(); + try { + unreliable.failsTenTimesThenSucceeds(); + fail("Should fail"); + } catch (UnreliableException e) { + // expected + } + } + + @Test + public void testRetryInterruptible() throws Throwable { + final UnreliableInterface unreliable = (UnreliableInterface) + RetryProxy.create(UnreliableInterface.class, unreliableImpl, + retryUpToMaximumCountWithFixedSleep(10, 10, TimeUnit.SECONDS)); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference futureThread = new AtomicReference(); + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + Future future = exec.submit(() -> { + futureThread.set(Thread.currentThread()); + latch.countDown(); + try { + unreliable.alwaysFailsWithFatalException(); + } catch (UndeclaredThrowableException ute) { + return ute.getCause(); + } + return null; + }); + latch.await(); + Thread.sleep(1000); // time to fail and sleep + assertTrue(futureThread.get().isAlive()); + futureThread.get().interrupt(); + Throwable e = future.get(1, TimeUnit.SECONDS); // should return immediately + assertNotNull(e); + assertEquals(InterruptedIOException.class, e.getClass()); + assertEquals("Retry interrupted", e.getMessage()); + assertEquals(InterruptedException.class, e.getCause().getClass()); + assertEquals("sleep interrupted", e.getCause().getMessage()); + } finally { + exec.shutdown(); + } + } + + @Test + public void testNoRetryOnSaslError() throws Exception { + RetryPolicy policy = mock(RetryPolicy.class); + RetryPolicy realPolicy = RetryPolicies.failoverOnNetworkException(5); + setupMockPolicy(policy, realPolicy); + + UnreliableInterface unreliable = (UnreliableInterface) RetryProxy.create( + UnreliableInterface.class, unreliableImpl, policy); + + try { + unreliable.failsWithSASLExceptionTenTimes(); + fail("Should fail"); + } catch (SaslException e) { + // expected + verify(policy, times(1)).shouldRetry(any(Exception.class), anyInt(), + anyInt(), anyBoolean()); + assertEquals(RetryDecision.FAIL, caughtRetryAction.action); + } + } + + @Test + public void testNoRetryOnAccessControlException() throws Exception { + RetryPolicy policy = mock(RetryPolicy.class); + RetryPolicy realPolicy = RetryPolicies.failoverOnNetworkException(5); + setupMockPolicy(policy, realPolicy); + + UnreliableInterface unreliable = (UnreliableInterface) RetryProxy.create( + UnreliableInterface.class, unreliableImpl, policy); + + try { + unreliable.failsWithAccessControlExceptionEightTimes(); + fail("Should fail"); + } catch (AccessControlException e) { + // expected + verify(policy, times(1)).shouldRetry(any(Exception.class), anyInt(), + anyInt(), anyBoolean()); + assertEquals(RetryDecision.FAIL, caughtRetryAction.action); + } + } + + @Test + public void testWrappedAccessControlException() throws Exception { + RetryPolicy policy = mock(RetryPolicy.class); + RetryPolicy realPolicy = RetryPolicies.failoverOnNetworkException(5); + setupMockPolicy(policy, realPolicy); + + UnreliableInterface unreliable = (UnreliableInterface) RetryProxy.create( + UnreliableInterface.class, unreliableImpl, policy); + + try { + unreliable.failsWithWrappedAccessControlException(); + fail("Should fail"); + } catch (IOException expected) { + verify(policy, times(1)).shouldRetry(any(Exception.class), anyInt(), + anyInt(), anyBoolean()); + assertEquals(RetryDecision.FAIL, caughtRetryAction.action); + } + } +} diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/io_/retry/UnreliableImplementation.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/io_/retry/UnreliableImplementation.java new file mode 100644 index 00000000000..71fdc8956b9 --- /dev/null +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/io_/retry/UnreliableImplementation.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io_.retry; + +import java.io.IOException; +import javax.security.sasl.SaslException; +import org.apache.hadoop.security.AccessControlException; + +/** + * For the usage and purpose of this class see {@link UnreliableInterface} + * which this class implements. + * + * @see UnreliableInterface + */ +class UnreliableImplementation implements UnreliableInterface { + + private int failsOnceInvocationCount; + private int failsTenTimesInvocationCount; + private int failsWithSASLExceptionTenTimesInvocationCount; + private int failsWithAccessControlExceptionInvocationCount; + + @Override + public void alwaysSucceeds() { + // do nothing + } + + @Override + public void alwaysFailsWithFatalException() throws FatalException { + throw new FatalException(); + } + + @Override + public void failsOnceThenSucceeds() throws UnreliableException { + if (failsOnceInvocationCount++ == 0) { + throw new UnreliableException(); + } + } + + @Override + public void failsTenTimesThenSucceeds() throws UnreliableException { + if (failsTenTimesInvocationCount++ < 10) { + throw new UnreliableException(); + } + } + + @Override + public void failsWithSASLExceptionTenTimes() throws SaslException { + if (failsWithSASLExceptionTenTimesInvocationCount++ < 10) { + throw new SaslException(); + } + } + + @Override + public void failsWithAccessControlExceptionEightTimes() + throws AccessControlException { + if (failsWithAccessControlExceptionInvocationCount++ < 8) { + throw new AccessControlException(); + } + } + + @Override + public void failsWithWrappedAccessControlException() + throws IOException { + AccessControlException ace = new AccessControlException(); + IOException ioe = new IOException(ace); + throw new IOException(ioe); + } +} diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/io_/retry/UnreliableInterface.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/io_/retry/UnreliableInterface.java new file mode 100644 index 00000000000..1879250d060 --- /dev/null +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/io_/retry/UnreliableInterface.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io_.retry; + +import java.io.IOException; +import javax.security.sasl.SaslException; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.io.retry.Idempotent; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.security.AccessControlException; + +/** + * The methods of UnreliableInterface could throw exceptions in a + * predefined way. It is currently used for testing {@link RetryPolicy} + * and {@link FailoverProxyProvider} classes, but can be potentially used + * to test any class's behaviour where an underlying interface or class + * may throw exceptions. + *

+ * Some methods may be annotated with the {@link Idempotent} annotation. + * In order to test those some methods of UnreliableInterface are annotated, + * but they are not actually Idempotent functions. + * + */ +interface UnreliableInterface { + + class UnreliableException extends Exception { + // no body + } + + class FatalException extends UnreliableException { + // no body + } + + void alwaysSucceeds() throws UnreliableException; + + void alwaysFailsWithFatalException() throws FatalException; + + void failsOnceThenSucceeds() throws UnreliableException; + + void failsTenTimesThenSucceeds() throws UnreliableException; + + void failsWithSASLExceptionTenTimes() throws SaslException; + + @Idempotent + void failsWithAccessControlExceptionEightTimes() + throws AccessControlException; + + @Idempotent + void failsWithWrappedAccessControlException() + throws IOException; +} From 1c562f00635d4f2c09d38736b20fb66486c849d2 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Wed, 20 May 2026 10:25:40 +0200 Subject: [PATCH 4/4] add to test profile --- pom.xml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pom.xml b/pom.xml index ca3b761c49e..5820d8f8558 100644 --- a/pom.xml +++ b/pom.xml @@ -2703,7 +2703,10 @@ maven-surefire-plugin + org.apache.hadoop.io_.** + org.apache.hadoop.ipc_.** org.apache.hadoop.ozone.client.** + org.apache.hadoop.security_.** ${unstable-test-groups}