Skip to content

Commit

Permalink
KREST-10790 Handle TopicDeletionDisabledException as 400
Browse files Browse the repository at this point in the history
  • Loading branch information
msn-tldr committed Jun 29, 2023
1 parent 6a72cba commit 9bcadb7
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
Expand Up @@ -19,7 +19,9 @@
import io.confluent.rest.entities.ErrorMessage;
import io.confluent.rest.exceptions.KafkaExceptionMapper;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;

public final class KafkaRestExceptionMapper extends KafkaExceptionMapper {

Expand All @@ -32,7 +34,9 @@ public Response toResponse(Throwable exception) {
if (exception instanceof SerializationException) {
// CPKAFKA-3412: FIXME We should return more specific error codes (unavailable,
// registration failed, authorization etc).
return getResponse(exception, Response.Status.REQUEST_TIMEOUT, 40801);
return getResponse(exception, Status.REQUEST_TIMEOUT, 40801);
} else if (exception instanceof TopicDeletionDisabledException) {
return getResponse(exception, Status.BAD_REQUEST, KAFKA_BAD_REQUEST_ERROR_CODE);
} else {
return super.toResponse(exception);
}
Expand Down
@@ -0,0 +1,62 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafkarest.exceptions;

import static io.confluent.rest.exceptions.KafkaExceptionMapper.KAFKA_BAD_REQUEST_ERROR_CODE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import io.confluent.rest.entities.ErrorMessage;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class KafkaRestExceptionMapperTest {

private KafkaRestExceptionMapper exceptionMapper;

@BeforeEach
public void setUp() {
exceptionMapper = new KafkaRestExceptionMapper(null);
}

@Test
public void test_whenTopicDeletionDisabledExceptionThrown_thenCorrectResponseStatusIsSet() {
verifyMapperResponse(
new TopicDeletionDisabledException("some message"), Status.BAD_REQUEST,
KAFKA_BAD_REQUEST_ERROR_CODE);
}

@Test
public void test_whenSerializationExceptionThrown_thenCorrectResponseStatusIsSet() {
verifyMapperResponse(
new SerializationException("some message"), Status.REQUEST_TIMEOUT,
40801);
}

private void verifyMapperResponse(Throwable throwable, Status status, int errorCode) {
Response response =
exceptionMapper.toResponse(throwable);
assertNotNull(response);
assertEquals(status.getStatusCode(), response.getStatus());
ErrorMessage errorMessage = (ErrorMessage) response.getEntity();
assertEquals(errorCode, errorMessage.getErrorCode());
}
}

0 comments on commit 9bcadb7

Please sign in to comment.