Skip to content

Commit

Permalink
fix: /query-stream endpoint should serialize Struct (MINOR) (#5205)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed Apr 30, 2020
1 parent d10390f commit 12b092b
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ private ErrorCodes() {
}

public static final int ERROR_CODE_MISSING_PARAM = 1;
public static final int ERROR_CODE_UNKNOWN_PARAM = 2;
public static final int ERROR_CODE_UNKNOWN_QUERY_ID = 3;
public static final int ERROR_CODE_MALFORMED_REQUEST = 4;
public static final int ERROR_CODE_INVALID_QUERY = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_MALFORMED_REQUEST;
import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_MISSING_PARAM;
import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_UNKNOWN_PARAM;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;

import io.confluent.ksql.api.server.protocol.PojoCodec;
Expand Down Expand Up @@ -94,13 +93,6 @@ public void onMissingParam(final String paramName) {
ERROR_CODE_MISSING_PARAM));
}

@Override
public void onExtraParam(final String paramName) {
routingContext
.fail(BAD_REQUEST.code(), new KsqlApiException("Unknown arg " + paramName,
ERROR_CODE_UNKNOWN_PARAM));
}

@Override
public void onInvalidJson() {
routingContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.jackson.DatabindCodec;
import java.io.IOException;
import java.util.Optional;

Expand All @@ -31,18 +30,16 @@
*/
public final class PojoCodec {

private static final ObjectMapper OBJECT_MAPPER = ApiJsonMapper.INSTANCE.get();

private PojoCodec() {
}

public static <T> Optional<T> deserialiseObject(final Buffer buffer,
final PojoDeserializerErrorHandler errorHandler,
final Class<T> clazz) {
final ObjectMapper objectMapper = DatabindCodec.mapper();
try {
return Optional.of(objectMapper.readValue(buffer.getBytes(), clazz));
} catch (UnrecognizedPropertyException e) {
errorHandler.onExtraParam(e.getPropertyName());
return Optional.empty();
return Optional.of(OBJECT_MAPPER.readValue(buffer.getBytes(), clazz));
} catch (MismatchedInputException e) {
// This is super ugly but I can't see how else to extract the property name
final int startIndex = e.getMessage().indexOf('\'');
Expand All @@ -59,9 +56,8 @@ public static <T> Optional<T> deserialiseObject(final Buffer buffer,
}

public static <T> Buffer serializeObject(final T t) {
final ObjectMapper objectMapper = DatabindCodec.mapper();
try {
final byte[] bytes = objectMapper.writeValueAsBytes(t);
final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(t);
return Buffer.buffer(bytes);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize buffer", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ public interface PojoDeserializerErrorHandler {
*/
void onMissingParam(String paramName);

/**
* Called when a POJO fails to deserialise because the JSON contains an extra (unknown) param
*
* @param paramName the name of the param
*/
void onExtraParam(String paramName);

/**
* Called when a POJO fails to deserialise because the JSON is not well formed
*/
Expand Down
61 changes: 0 additions & 61 deletions ksqldb-rest-app/src/test/java/io/confluent/ksql/api/ApiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_INVALID_QUERY;
import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_MALFORMED_REQUEST;
import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_MISSING_PARAM;
import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_UNKNOWN_PARAM;
import static io.confluent.ksql.api.server.ErrorCodes.ERROR_CODE_UNKNOWN_QUERY_ID;
import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static org.hamcrest.CoreMatchers.hasItem;
Expand Down Expand Up @@ -232,26 +231,6 @@ public void shouldHandleQueryWithMissingSql() throws Exception {
validateError(ERROR_CODE_MISSING_PARAM, "No sql in arguments", queryResponse.responseObject);
}

@Test
public void shouldHandleExtraArgInQuery() throws Exception {

// Given
JsonObject requestBody = new JsonObject().put("sql", DEFAULT_PULL_QUERY)
.put("badarg", 213);

// When
HttpResponse<Buffer> response = sendRequest("/query-stream",
requestBody.toBuffer().appendString("\n"));

// Then
assertThat(response.statusCode(), is(400));
assertThat(response.statusMessage(), is("Bad Request"));

QueryResponse queryResponse = new QueryResponse(response.bodyAsString());
validateError(ERROR_CODE_UNKNOWN_PARAM, "Unknown arg badarg",
queryResponse.responseObject);
}

@Test
public void shouldHandleErrorInProcessingQuery() throws Exception {

Expand Down Expand Up @@ -368,26 +347,6 @@ public void shouldHandleMissingQueryIDInCloseQuery() throws Exception {
queryResponse.responseObject);
}

@Test
public void shouldHandleExtraArgInCloseQuery() throws Exception {

// Given
JsonObject requestBody = new JsonObject().put("queryId", "qwydguygwd")
.put("badarg", 213);

// When
HttpResponse<Buffer> response = sendRequest("/close-query",
requestBody.toBuffer().appendString("\n"));

// Then
assertThat(response.statusCode(), is(400));
assertThat(response.statusMessage(), is("Bad Request"));

QueryResponse queryResponse = new QueryResponse(response.bodyAsString());
validateError(ERROR_CODE_UNKNOWN_PARAM, "Unknown arg badarg",
queryResponse.responseObject);
}

@Test
public void shouldHandleUnknownQueryIDInCloseQuery() throws Exception {

Expand Down Expand Up @@ -515,26 +474,6 @@ public void shouldHandleMissingTargetInInserts() throws Exception {
validateError(ERROR_CODE_MISSING_PARAM, "No target in arguments", queryResponse.responseObject);
}

@Test
public void shouldHandleExtraArgInInserts() throws Exception {

// Given
JsonObject requestBody = new JsonObject().put("target", "some-stream")
.put("badarg", 213);

// When
HttpResponse<Buffer> response = sendRequest("/inserts-stream",
requestBody.toBuffer().appendString("\n"));

// Then
assertThat(response.statusCode(), is(400));
assertThat(response.statusMessage(), is("Bad Request"));

QueryResponse queryResponse = new QueryResponse(response.bodyAsString());
validateError(ERROR_CODE_UNKNOWN_PARAM, "Unknown arg badarg",
queryResponse.responseObject);
}

@Test
public void shouldHandleErrorInProcessingInserts() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import java.util.Optional;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
Expand All @@ -34,11 +37,18 @@
@RunWith(MockitoJUnitRunner.class)
public class PojoCodecTest {

private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
.field("foo", Schema.OPTIONAL_INT32_SCHEMA)
.field("bar", Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.build();
private static final Struct STRUCT = new Struct(STRUCT_SCHEMA);

@Mock
private PojoDeserializerErrorHandler errorHandler;

@Test
public void testDeserializePojo() {
public void shouldDeserializePojo() {
JsonObject jsonObject = new JsonObject().put("field1", 123)
.put("field2", "foobar")
.put("field3", true);
Expand All @@ -51,27 +61,34 @@ public void testDeserializePojo() {
}

@Test
public void testDeserializeInvalidJson() {
public void shouldFailToDeserializeInvalidJson() {
Buffer buff = Buffer.buffer("{\"foo\":123");
Optional<TestPojo> testPojo = PojoCodec.deserialiseObject(buff, errorHandler, TestPojo.class);
assertThat(testPojo.isPresent(), is(false));
verify(errorHandler).onInvalidJson();
}

@Test
public void testDeserializeMissingField() {
public void shouldFailToDeserializeMissingField() {
Buffer buff = Buffer.buffer("{\"field1\":123,\"field2\":\"foo\"}");
Optional<TestPojo> testPojo = PojoCodec.deserialiseObject(buff, errorHandler, TestPojo.class);
assertThat(testPojo.isPresent(), is(false));
verify(errorHandler).onMissingParam("field3");
}

@Test
public void testDeserializeUnknownField() {
public void shouldDeserializeWithUnknownField() {
Buffer buff = Buffer.buffer("{\"field1\":123,\"field2\":\"foo\",\"field3\":true,\"blah\":432}");
Optional<TestPojo> testPojo = PojoCodec.deserialiseObject(buff, errorHandler, TestPojo.class);
assertThat(testPojo.isPresent(), is(false));
verify(errorHandler).onExtraParam("blah");
assertThat(testPojo.isPresent(), is(true));
assertThat(testPojo.get().field1, is(123));
assertThat(testPojo.get().field2, is("foo"));
assertThat(testPojo.get().field3, is(true));
}

@Test
public void shouldSerializeStruct() {
PojoCodec.serializeObject(STRUCT);
}

public static class TestPojo {
Expand Down

0 comments on commit 12b092b

Please sign in to comment.