diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dc9ac09a9b25c..d363b9ed9c0d0 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -88,7 +88,7 @@
files="ClientUtils.java"/>
+ files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest|ClientTelemetryReporterTest).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index bef65977be4c6..88bedf0dcaa23 100644
--- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -20,6 +20,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
@@ -528,13 +529,13 @@ public void handleResponse(PushTelemetryResponse response) {
@Override
public void handleFailedGetTelemetrySubscriptionsRequest(KafkaException maybeFatalException) {
log.debug("The broker generated an error for the get telemetry network API request", maybeFatalException);
- handleFailedRequest(maybeFatalException != null);
+ handleFailedRequest(isRetryable(maybeFatalException));
}
@Override
public void handleFailedPushTelemetryRequest(KafkaException maybeFatalException) {
log.debug("The broker generated an error for the push telemetry network API request", maybeFatalException);
- handleFailedRequest(maybeFatalException != null);
+ handleFailedRequest(isRetryable(maybeFatalException));
}
@Override
@@ -628,6 +629,12 @@ public void initiateClose() {
}
}
+ private boolean isRetryable(final KafkaException maybeFatalException) {
+ return maybeFatalException == null ||
+ (maybeFatalException instanceof RetriableException) ||
+ (maybeFatalException.getCause() != null && maybeFatalException.getCause() instanceof RetriableException);
+ }
+
private Optional> createSubscriptionRequest(ClientTelemetrySubscription localSubscription) {
/*
If we've previously retrieved a subscription, it will contain the client instance ID
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
index 935c02dbf833f..f693912f6616e 100644
--- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -21,6 +21,11 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
@@ -901,6 +906,163 @@ public void testTelemetryReporterInitiateCloseAlreadyInTerminatedStates() {
.telemetrySender()).state());
}
+ @Test
+ public void testHandleFailedGetTelemetrySubscriptionsRequestWithRetriableException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ KafkaException retriableException = new TimeoutException("Request timed out");
+ telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(retriableException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedRetriableException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ KafkaException wrappedException = new KafkaException(new DisconnectException("Connection lost"));
+ telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(wrappedException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleFailedGetTelemetrySubscriptionsRequestWithFatalException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ KafkaException fatalException = new AuthorizationException("Not authorized for telemetry");
+ telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(fatalException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedFatalException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ KafkaException wrappedException = new KafkaException("Version check failed",
+ new UnsupportedVersionException("Broker doesn't support telemetry"));
+ telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(wrappedException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleFailedPushTelemetryRequestWithRetriableException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ KafkaException networkException = new NetworkException("Network failure");
+ telemetrySender.handleFailedPushTelemetryRequest(networkException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleFailedPushTelemetryRequestWithFatalException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ KafkaException authException = new AuthorizationException("Not authorized to push telemetry");
+ telemetrySender.handleFailedPushTelemetryRequest(authException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleFailedRequestWithMultipleRetriableExceptionsInChain() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ KafkaException chainedException = new TimeoutException("Outer timeout",
+ new DisconnectException("Inner disconnect"));
+ telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(chainedException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleFailedRequestWithGenericKafkaException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ KafkaException genericException = new KafkaException("Unknown error");
+ telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(genericException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleFailedRequestDuringTermination() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
+
+ KafkaException exception = new TimeoutException("Timeout");
+ telemetrySender.handleFailedPushTelemetryRequest(exception);
+
+ assertEquals(ClientTelemetryState.TERMINATING_PUSH_NEEDED, telemetrySender.state());
+ assertTrue(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testSequentialFailuresWithDifferentExceptionTypes() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
+ new TimeoutException("Timeout 1"));
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertTrue(telemetrySender.enabled());
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
+ new DisconnectException("Disconnect"));
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertTrue(telemetrySender.enabled());
+
+ assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
+ new UnsupportedVersionException("Version not supported"));
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
+ assertFalse(telemetrySender.enabled());
+ }
+
@AfterEach
public void tearDown() {
clientTelemetryReporter.close();