From a9b7f247276a0f6ca8fc620642f9320a2d955d22 Mon Sep 17 00:00:00 2001 From: Bill Date: Tue, 7 Oct 2025 19:36:52 -0400 Subject: [PATCH 1/3] Update `ClientTelemetryReporter` to handle failures by differentiating between retryable and fatal exceptions and add test cases --- checkstyle/suppressions.xml | 2 +- .../internals/ClientTelemetryReporter.java | 29 ++- .../ClientTelemetryReporterTest.java | 177 ++++++++++++++++++ 3 files changed, 205 insertions(+), 3 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index dc9ac09a9b25c..d363b9ed9c0d0 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -88,7 +88,7 @@ files="ClientUtils.java"/> + files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest|ClientTelemetryReporterTest).java"/> diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java index bef65977be4c6..763bc765c0639 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; import org.apache.kafka.common.message.PushTelemetryRequestData; @@ -524,17 +525,41 @@ public void handleResponse(PushTelemetryResponse response) { lock.writeLock().unlock(); } } + + private boolean isRetryable(final KafkaException maybeFatalException) { + if (maybeFatalException == null) { + return true; + } + + Throwable cause; + if (maybeFatalException.getClass() == KafkaException.class) { + if (maybeFatalException.getCause() == null) { + return false; + } else { + cause = maybeFatalException.getCause(); + } + } else { + cause = maybeFatalException; + } + while (cause != null) { + if (!(cause instanceof RetriableException)) { + return false; + } + cause = cause.getCause(); + } + return true; + } @Override public void handleFailedGetTelemetrySubscriptionsRequest(KafkaException maybeFatalException) { log.debug("The broker generated an error for the get telemetry network API request", maybeFatalException); - handleFailedRequest(maybeFatalException != null); + handleFailedRequest(isRetryable(maybeFatalException)); } @Override public void handleFailedPushTelemetryRequest(KafkaException maybeFatalException) { log.debug("The broker generated an error for the push telemetry network API request", maybeFatalException); - handleFailedRequest(maybeFatalException != null); + handleFailedRequest(isRetryable(maybeFatalException)); } @Override diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java index 935c02dbf833f..093ec04d1e16b 100644 --- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java @@ -21,6 +21,11 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.NetworkException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; import org.apache.kafka.common.message.PushTelemetryRequestData; @@ -901,6 +906,178 @@ public void testTelemetryReporterInitiateCloseAlreadyInTerminatedStates() { .telemetrySender()).state()); } + @Test + public void testHandleFailedGetTelemetrySubscriptionsRequestWithRetriableException() { + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + + KafkaException retriableException = new TimeoutException("Request timed out"); + telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(retriableException); + + assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); + assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs()); + assertTrue(telemetrySender.enabled()); + } + + @Test + public void testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedRetriableException() { + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + + KafkaException wrappedException = new KafkaException(new DisconnectException("Connection lost")); + telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(wrappedException); + + assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); + assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs()); + assertTrue(telemetrySender.enabled()); + } + + @Test + public void testHandleFailedGetTelemetrySubscriptionsRequestWithFatalException() { + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + + KafkaException fatalException = new AuthorizationException("Not authorized for telemetry"); + telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(fatalException); + + assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); + assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs()); + assertFalse(telemetrySender.enabled()); + } + + @Test + public void testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedFatalException() { + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + + KafkaException wrappedException = new KafkaException("Version check failed", + new UnsupportedVersionException("Broker doesn't support telemetry")); + telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(wrappedException); + + assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); + assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs()); + assertFalse(telemetrySender.enabled()); + } + + @Test + public void testHandleFailedPushTelemetryRequestWithRetriableException() { + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED)); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS)); + + KafkaException networkException = new NetworkException("Network failure"); + telemetrySender.handleFailedPushTelemetryRequest(networkException); + + assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); + assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs()); + assertTrue(telemetrySender.enabled()); + } + + @Test + public void testHandleFailedPushTelemetryRequestWithFatalException() { + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED)); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS)); + + KafkaException authException = new AuthorizationException("Not authorized to push telemetry"); + telemetrySender.handleFailedPushTelemetryRequest(authException); + + assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); + assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs()); + assertFalse(telemetrySender.enabled()); + } + + @Test + public void testHandleFailedRequestWithMultipleRetriableExceptionsInChain() { + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + + KafkaException chainedException = new TimeoutException("Outer timeout", + new DisconnectException("Inner disconnect")); + telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(chainedException); + + assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); + assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs()); + assertTrue(telemetrySender.enabled()); + } + + @Test + public void testHandleFailedRequestWithMixedExceptionChain() { + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + + KafkaException mixedException = new TimeoutException("Timeout during auth", + new AuthorizationException("Auth failed")); + telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(mixedException); + + assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); + assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs()); + assertFalse(telemetrySender.enabled()); + } + + @Test + public void testHandleFailedRequestWithGenericKafkaException() { + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + + KafkaException genericException = new KafkaException("Unknown error"); + telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(genericException); + + assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); + assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs()); + assertFalse(telemetrySender.enabled()); + } + + @Test + public void testHandleFailedRequestDuringTermination() { + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED)); + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED)); + + KafkaException exception = new TimeoutException("Timeout"); + telemetrySender.handleFailedPushTelemetryRequest(exception); + + assertEquals(ClientTelemetryState.TERMINATING_PUSH_NEEDED, telemetrySender.state()); + assertTrue(telemetrySender.enabled()); + } + + @Test + public void testSequentialFailuresWithDifferentExceptionTypes() { + ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); + telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); + + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + telemetrySender.handleFailedGetTelemetrySubscriptionsRequest( + new TimeoutException("Timeout 1")); + assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); + assertTrue(telemetrySender.enabled()); + + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + telemetrySender.handleFailedGetTelemetrySubscriptionsRequest( + new DisconnectException("Disconnect")); + assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); + assertTrue(telemetrySender.enabled()); + + assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); + telemetrySender.handleFailedGetTelemetrySubscriptionsRequest( + new UnsupportedVersionException("Version not supported")); + assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); + assertFalse(telemetrySender.enabled()); + } + @AfterEach public void tearDown() { clientTelemetryReporter.close(); From 4e729a9e0db90d4051af30f7d2f4fe93754cff7b Mon Sep 17 00:00:00 2001 From: Bill Date: Wed, 8 Oct 2025 18:19:03 -0400 Subject: [PATCH 2/3] Move private method to later in class --- .../internals/ClientTelemetryReporter.java | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java index 763bc765c0639..bbaa7130cc2b9 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java @@ -525,30 +525,6 @@ public void handleResponse(PushTelemetryResponse response) { lock.writeLock().unlock(); } } - - private boolean isRetryable(final KafkaException maybeFatalException) { - if (maybeFatalException == null) { - return true; - } - - Throwable cause; - if (maybeFatalException.getClass() == KafkaException.class) { - if (maybeFatalException.getCause() == null) { - return false; - } else { - cause = maybeFatalException.getCause(); - } - } else { - cause = maybeFatalException; - } - while (cause != null) { - if (!(cause instanceof RetriableException)) { - return false; - } - cause = cause.getCause(); - } - return true; - } @Override public void handleFailedGetTelemetrySubscriptionsRequest(KafkaException maybeFatalException) { @@ -653,6 +629,30 @@ public void initiateClose() { } } + private boolean isRetryable(final KafkaException maybeFatalException) { + if (maybeFatalException == null) { + return true; + } + + Throwable cause; + if (maybeFatalException.getClass() == KafkaException.class) { + if (maybeFatalException.getCause() == null) { + return false; + } else { + cause = maybeFatalException.getCause(); + } + } else { + cause = maybeFatalException; + } + while (cause != null) { + if (!(cause instanceof RetriableException)) { + return false; + } + cause = cause.getCause(); + } + return true; + } + private Optional> createSubscriptionRequest(ClientTelemetrySubscription localSubscription) { /* If we've previously retrieved a subscription, it will contain the client instance ID From 900be0124c55f3d07ee8031cfeba7d34ca0b1e45 Mon Sep 17 00:00:00 2001 From: Bill Date: Thu, 9 Oct 2025 09:11:37 -0400 Subject: [PATCH 3/3] Simplified logic for retriable --- .../internals/ClientTelemetryReporter.java | 24 +++---------------- .../ClientTelemetryReporterTest.java | 17 +------------ 2 files changed, 4 insertions(+), 37 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java index bbaa7130cc2b9..88bedf0dcaa23 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java @@ -630,27 +630,9 @@ public void initiateClose() { } private boolean isRetryable(final KafkaException maybeFatalException) { - if (maybeFatalException == null) { - return true; - } - - Throwable cause; - if (maybeFatalException.getClass() == KafkaException.class) { - if (maybeFatalException.getCause() == null) { - return false; - } else { - cause = maybeFatalException.getCause(); - } - } else { - cause = maybeFatalException; - } - while (cause != null) { - if (!(cause instanceof RetriableException)) { - return false; - } - cause = cause.getCause(); - } - return true; + return maybeFatalException == null || + (maybeFatalException instanceof RetriableException) || + (maybeFatalException.getCause() != null && maybeFatalException.getCause() instanceof RetriableException); } private Optional> createSubscriptionRequest(ClientTelemetrySubscription localSubscription) { diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java index 093ec04d1e16b..f693912f6616e 100644 --- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java @@ -1009,22 +1009,7 @@ public void testHandleFailedRequestWithMultipleRetriableExceptionsInChain() { assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs()); assertTrue(telemetrySender.enabled()); } - - @Test - public void testHandleFailedRequestWithMixedExceptionChain() { - ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); - telemetrySender.updateSubscriptionResult(subscription, time.milliseconds()); - assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS)); - - KafkaException mixedException = new TimeoutException("Timeout during auth", - new AuthorizationException("Auth failed")); - telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(mixedException); - - assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state()); - assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs()); - assertFalse(telemetrySender.enabled()); - } - + @Test public void testHandleFailedRequestWithGenericKafkaException() { ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();