Skip to content

Commit

Permalink
KREST 9773 Handle InvalidTopicException correctly
Browse files Browse the repository at this point in the history
This is kafka exception is raised due to bad user input, hence should
be returned as a 400, instead of a 500(server error).
  • Loading branch information
msn-tldr committed Mar 30, 2023
1 parent bdf15ab commit 1ec7602
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SecurityDisabledException;
Expand Down Expand Up @@ -126,6 +127,8 @@ private Response handleException(final Throwable exception) {
}
return getResponse(exception, Status.INTERNAL_SERVER_ERROR,
KAFKA_RETRIABLE_ERROR_ERROR_CODE);
} else if (exception instanceof InvalidTopicException) {
return getResponse(exception, Status.BAD_REQUEST, KAFKA_BAD_REQUEST_ERROR_CODE);
} else if (exception instanceof KafkaException) {
log.error("Kafka exception", exception);
return getResponse(exception, Status.INTERNAL_SERVER_ERROR,
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.PolicyViolationException;
Expand Down Expand Up @@ -110,7 +111,7 @@ public void testKafkaExceptions() {
KAFKA_BAD_REQUEST_ERROR_CODE);
verifyMapperResponse(new InvalidRequestException("some message"), Status.BAD_REQUEST,
KAFKA_BAD_REQUEST_ERROR_CODE);
verifyMapperResponse(new UnknownServerException("some message"),Status.BAD_REQUEST,
verifyMapperResponse(new UnknownServerException("some message"), Status.BAD_REQUEST,
KAFKA_BAD_REQUEST_ERROR_CODE);
verifyMapperResponse(new UnknownTopicOrPartitionException("some message"), Status.NOT_FOUND,
KAFKA_UNKNOWN_TOPIC_PARTITION_CODE);
Expand All @@ -120,11 +121,14 @@ public void testKafkaExceptions() {
KAFKA_BAD_REQUEST_ERROR_CODE);
verifyMapperResponse(new InvalidConfigurationException("some message"), Status.BAD_REQUEST,
KAFKA_BAD_REQUEST_ERROR_CODE);
verifyMapperResponse(new InvalidTopicException("some message"), Status.BAD_REQUEST,
KAFKA_BAD_REQUEST_ERROR_CODE);

//test couple of retriable exceptions
verifyMapperResponse(new NotCoordinatorException("some message"), Status.INTERNAL_SERVER_ERROR,
KAFKA_RETRIABLE_ERROR_ERROR_CODE);
verifyMapperResponse(new NotEnoughReplicasException("some message"), Status.INTERNAL_SERVER_ERROR,
verifyMapperResponse(new NotEnoughReplicasException("some message"),
Status.INTERNAL_SERVER_ERROR,
KAFKA_RETRIABLE_ERROR_ERROR_CODE);
//Including the special case of a topic not being present (eg because it's not been defined yet)
//not returning a 500 error
Expand Down

0 comments on commit 1ec7602

Please sign in to comment.