From a49bb0b07e65768949c6c351ce677358363ca6de Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Fri, 7 Jul 2023 13:50:51 +0100 Subject: [PATCH] KREST-10989: Handle ThrottlingQuotaExceededException exception as 429 --- .../io/confluent/rest/exceptions/KafkaExceptionMapper.java | 4 ++++ .../test/java/io/confluent/rest/KafkaExceptionMapperTest.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/core/src/main/java/io/confluent/rest/exceptions/KafkaExceptionMapper.java b/core/src/main/java/io/confluent/rest/exceptions/KafkaExceptionMapper.java index c8959c6252..0472d8f17c 100644 --- a/core/src/main/java/io/confluent/rest/exceptions/KafkaExceptionMapper.java +++ b/core/src/main/java/io/confluent/rest/exceptions/KafkaExceptionMapper.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.errors.PolicyViolationException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.SecurityDisabledException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicDeletionDisabledException; import org.apache.kafka.common.errors.TopicExistsException; @@ -61,6 +62,7 @@ public class KafkaExceptionMapper extends GenericExceptionMapper { public static final int KAFKA_ERROR_ERROR_CODE = 50002; public static final int KAFKA_RETRIABLE_ERROR_ERROR_CODE = 50003; public static final int BROKER_NOT_AVAILABLE_ERROR_CODE = 50302; + public static final int TOO_MANY_REQUESTS_ERROR_CODE = 42901; private static final String TOPIC_NOT_PRESENT_MESSAGE_PATTERN = "not present in metadata"; @@ -98,6 +100,8 @@ private static Map, ResponsePair> errorMap() { KAFKA_BAD_REQUEST_ERROR_CODE)); errorMap.put(TopicDeletionDisabledException.class, new ResponsePair(Status.BAD_REQUEST, KAFKA_BAD_REQUEST_ERROR_CODE)); + errorMap.put(ThrottlingQuotaExceededException.class, new ResponsePair(Status.TOO_MANY_REQUESTS, + TOO_MANY_REQUESTS_ERROR_CODE)); return errorMap; } diff --git a/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java b/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java index c713a4412f..9d74e1df56 100644 --- a/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java +++ b/core/src/test/java/io/confluent/rest/KafkaExceptionMapperTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.SecurityDisabledException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicDeletionDisabledException; @@ -59,6 +60,7 @@ import static io.confluent.rest.exceptions.KafkaExceptionMapper.KAFKA_ERROR_ERROR_CODE; import static io.confluent.rest.exceptions.KafkaExceptionMapper.KAFKA_RETRIABLE_ERROR_ERROR_CODE; import static io.confluent.rest.exceptions.KafkaExceptionMapper.KAFKA_UNKNOWN_TOPIC_PARTITION_CODE; +import static io.confluent.rest.exceptions.KafkaExceptionMapper.TOO_MANY_REQUESTS_ERROR_CODE; import static io.confluent.rest.exceptions.KafkaExceptionMapper.TOPIC_NOT_FOUND_ERROR_CODE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -126,6 +128,8 @@ public void testKafkaExceptions() { KAFKA_BAD_REQUEST_ERROR_CODE); verifyMapperResponse(new InvalidTopicException("some message"), Status.BAD_REQUEST, KAFKA_BAD_REQUEST_ERROR_CODE); + verifyMapperResponse(new ThrottlingQuotaExceededException("some message"), Status.TOO_MANY_REQUESTS, + TOO_MANY_REQUESTS_ERROR_CODE); //test couple of retriable exceptions verifyMapperResponse(new NotCoordinatorException("some message"), Status.INTERNAL_SERVER_ERROR,