Skip to content

Commit

Permalink
KREST-10989: Handle ThrottlingQuotaExceededException exception as 429
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorvmittal10 committed Jul 7, 2023
1 parent a78856d commit a49bb0b
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class KafkaExceptionMapper extends GenericExceptionMapper {
public static final int KAFKA_ERROR_ERROR_CODE = 50002;
public static final int KAFKA_RETRIABLE_ERROR_ERROR_CODE = 50003;
public static final int BROKER_NOT_AVAILABLE_ERROR_CODE = 50302;
public static final int TOO_MANY_REQUESTS_ERROR_CODE = 42901;

private static final String TOPIC_NOT_PRESENT_MESSAGE_PATTERN = "not present in metadata";

Expand Down Expand Up @@ -98,6 +100,8 @@ private static Map<Class<? extends ApiException>, ResponsePair> errorMap() {
KAFKA_BAD_REQUEST_ERROR_CODE));
errorMap.put(TopicDeletionDisabledException.class, new ResponsePair(Status.BAD_REQUEST,
KAFKA_BAD_REQUEST_ERROR_CODE));
errorMap.put(ThrottlingQuotaExceededException.class, new ResponsePair(Status.TOO_MANY_REQUESTS,
TOO_MANY_REQUESTS_ERROR_CODE));
return errorMap;
}

Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
Expand All @@ -59,6 +60,7 @@
import static io.confluent.rest.exceptions.KafkaExceptionMapper.KAFKA_ERROR_ERROR_CODE;
import static io.confluent.rest.exceptions.KafkaExceptionMapper.KAFKA_RETRIABLE_ERROR_ERROR_CODE;
import static io.confluent.rest.exceptions.KafkaExceptionMapper.KAFKA_UNKNOWN_TOPIC_PARTITION_CODE;
import static io.confluent.rest.exceptions.KafkaExceptionMapper.TOO_MANY_REQUESTS_ERROR_CODE;
import static io.confluent.rest.exceptions.KafkaExceptionMapper.TOPIC_NOT_FOUND_ERROR_CODE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -126,6 +128,8 @@ public void testKafkaExceptions() {
KAFKA_BAD_REQUEST_ERROR_CODE);
verifyMapperResponse(new InvalidTopicException("some message"), Status.BAD_REQUEST,
KAFKA_BAD_REQUEST_ERROR_CODE);
verifyMapperResponse(new ThrottlingQuotaExceededException("some message"), Status.TOO_MANY_REQUESTS,
TOO_MANY_REQUESTS_ERROR_CODE);

//test couple of retriable exceptions
verifyMapperResponse(new NotCoordinatorException("some message"), Status.INTERNAL_SERVER_ERROR,
Expand Down

0 comments on commit a49bb0b

Please sign in to comment.