From 2cb75ba5737d6336ffe71ca0cc619ba32e911a7c Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Tue, 16 Jan 2018 07:49:24 -0800 Subject: [PATCH] Fixing response and cli (#117) * fixing response codes and cli * cleaning up * fixing unit tests --- .../client/admin/internal/FunctionsImpl.java | 20 +++++++-- .../worker/rest/api/v1/ApiV1Resource.java | 41 +++++++++---------- .../worker/rest/api/v1/ApiV1ResourceTest.java | 37 ++++++++--------- 3 files changed, 55 insertions(+), 43 deletions(-) diff --git a/pulsar-functions/cli/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-functions/cli/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index f4dd4505731dd..a4450d4c78d5f 100644 --- a/pulsar-functions/cli/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-functions/cli/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -30,10 +30,12 @@ import org.glassfish.jersey.media.multipart.FormDataMultiPart; import org.glassfish.jersey.media.multipart.file.FileDataBodyPart; +import javax.ws.rs.ClientErrorException; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import java.io.File; import java.util.List; @@ -50,7 +52,11 @@ public FunctionsImpl(WebTarget web, Authentication auth) { @Override public List getFunctions(String tenant, String namespace) throws PulsarAdminException { try { - return request(functions.path(tenant).path(namespace)).get(new GenericType>() { + Response response = request(functions.path(tenant).path(namespace)).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + return response.readEntity(new GenericType>() { }); } catch (Exception e) { throw getApiException(e); @@ -60,7 +66,11 @@ public List getFunctions(String tenant, String namespace) throws PulsarA @Override public FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException { try { - String jsonResponse = request(functions.path(tenant).path(namespace).path(function)).get().readEntity(String.class); + Response response = request(functions.path(tenant).path(namespace).path(function)).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + String jsonResponse = response.readEntity(String.class); FunctionConfig.Builder functionConfigBuilder = FunctionConfig.newBuilder(); JsonFormat.parser().merge(jsonResponse, functionConfigBuilder); return functionConfigBuilder.build(); @@ -72,7 +82,11 @@ public FunctionConfig getFunction(String tenant, String namespace, String functi @Override public FunctionStatus getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException { try { - String jsonResponse = request(functions.path(tenant).path(namespace).path(function).path("status")).get().readEntity(String.class); + Response response = request(functions.path(tenant).path(namespace).path(function).path("status")).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + String jsonResponse = response.readEntity(String.class); FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder(); JsonFormat.parser().merge(jsonResponse, functionStatusBuilder); return functionStatusBuilder.build(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v1/ApiV1Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v1/ApiV1Resource.java index b88126482b39f..b9f2db8d2c67c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v1/ApiV1Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v1/ApiV1Resource.java @@ -25,6 +25,7 @@ import com.google.protobuf.util.JsonFormat; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.functions.proto.Function.FunctionConfig; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; @@ -35,7 +36,6 @@ import org.apache.pulsar.functions.worker.Utils; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.BaseApiResource; -import org.apache.pulsar.functions.worker.rest.RestUtils; import org.apache.pulsar.functions.worker.WorkerConfig; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; @@ -81,7 +81,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant, tenant, namespace, functionName, e); return Response.status(Response.Status.BAD_REQUEST) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(e.getMessage())).build(); + .entity(new ErrorData(e.getMessage())).build(); } FunctionRuntimeManager functionRuntimeManager = getWorkerFunctionStateManager(); @@ -90,7 +90,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant, log.error("Function {}/{}/{} already exists", tenant, namespace, functionName); return Response.status(Response.Status.BAD_REQUEST) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(String.format("Function %s already exist", functionName))).build(); + .entity(new ErrorData(String.format("Function %s already exist", functionName))).build(); } // function state @@ -134,7 +134,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant, tenant, namespace, functionName, e); return Response.status(Response.Status.BAD_REQUEST) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(e.getMessage())).build(); + .entity(new ErrorData(e.getMessage())).build(); } FunctionRuntimeManager functionRuntimeManager = getWorkerFunctionStateManager(); @@ -142,7 +142,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant, if (!functionRuntimeManager.containsFunction(tenant, namespace, functionName)) { return Response.status(Response.Status.BAD_REQUEST) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(String.format("Function %s doesn't exist", functionName))).build(); + .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); } // function state @@ -181,7 +181,7 @@ public Response deregisterFunction(final @PathParam("tenant") String tenant, tenant, namespace, functionName, e); return Response.status(Response.Status.BAD_REQUEST) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(e.getMessage())).build(); + .entity(new ErrorData(e.getMessage())).build(); } FunctionRuntimeManager functionRuntimeManager = getWorkerFunctionStateManager(); @@ -190,7 +190,7 @@ public Response deregisterFunction(final @PathParam("tenant") String tenant, tenant, namespace, functionName); return Response.status(Status.NOT_FOUND) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(String.format("Function %s doesn't exist", functionName))).build(); + .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); } CompletableFuture completableFuture @@ -202,7 +202,7 @@ public Response deregisterFunction(final @PathParam("tenant") String tenant, if (!requestResult.isSuccess()) { return Response.status(Response.Status.BAD_REQUEST) .type(MediaType.APPLICATION_JSON) - .entity(requestResult.toJson()) + .entity(new ErrorData(requestResult.getMessage())) .build(); } } catch (ExecutionException e) { @@ -210,14 +210,13 @@ public Response deregisterFunction(final @PathParam("tenant") String tenant, tenant, namespace, functionName, e); return Response.serverError() .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(e.getCause().getMessage())) + .entity(new ErrorData(e.getCause().getMessage())) .build(); } catch (InterruptedException e) { log.error("Interrupted Exception while deregistering function @ /{}/{}/{}", tenant, namespace, functionName, e); return Response.status(Status.REQUEST_TIMEOUT) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(e.getMessage())) .build(); } @@ -238,7 +237,7 @@ public Response getFunctionInfo(final @PathParam("tenant") String tenant, tenant, namespace, functionName, e); return Response.status(Response.Status.BAD_REQUEST) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(e.getMessage())).build(); + .entity(new ErrorData(e.getMessage())).build(); } FunctionRuntimeManager functionRuntimeManager = getWorkerFunctionStateManager(); @@ -247,7 +246,7 @@ public Response getFunctionInfo(final @PathParam("tenant") String tenant, tenant, namespace, functionName); return Response.status(Status.NOT_FOUND) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(String.format("Function %s doesn't exist", functionName))).build(); + .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); } FunctionMetaData functionMetaData = functionRuntimeManager.getFunction(tenant, namespace, functionName).getFunctionMetaData(); @@ -269,7 +268,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant, tenant, namespace, functionName, e); return Response.status(Response.Status.BAD_REQUEST) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(e.getMessage())).build(); + .entity(new ErrorData(e.getMessage())).build(); } FunctionRuntimeManager functionRuntimeManager = getWorkerFunctionStateManager(); @@ -278,7 +277,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant, tenant, namespace, functionName); return Response.status(Status.NOT_FOUND) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(String.format("Function %s doesn't exist", functionName))).build(); + .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); } FunctionRuntimeInfo functionRuntimeInfo = functionRuntimeManager.getFunction(tenant, namespace, functionName); @@ -291,7 +290,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant, log.error("Got Exception Getting Status from Spawner", ex); return Response.status(Status.INTERNAL_SERVER_ERROR) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(ex.getMessage())).build(); + .entity(new ErrorData(ex.getMessage())).build(); } } else { FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder(); @@ -318,7 +317,7 @@ public Response listFunctions(final @PathParam("tenant") String tenant, tenant, namespace, e); return Response.status(Response.Status.BAD_REQUEST) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(e.getMessage())).build(); + .entity(new ErrorData(e.getMessage())).build(); } FunctionRuntimeManager functionRuntimeManager = getWorkerFunctionStateManager(); @@ -342,7 +341,7 @@ private Response updateRequest(FunctionMetaData functionMetaData, log.error("Error uploading file {}", functionMetaData.getPackageLocation(), e); return Response.serverError() .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(e.getMessage())) + .entity(new ErrorData(e.getMessage())) .build(); } @@ -358,22 +357,22 @@ private Response updateRequest(FunctionMetaData functionMetaData, if (!requestResult.isSuccess()) { return Response.status(Response.Status.BAD_REQUEST) .type(MediaType.APPLICATION_JSON) - .entity(requestResult.toJson()) + .entity(new ErrorData(requestResult.getMessage())) .build(); } } catch (ExecutionException e) { return Response.serverError() .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(e.getCause().getMessage())) + .entity(new ErrorData(e.getCause().getMessage())) .build(); } catch (InterruptedException e) { return Response.status(Status.REQUEST_TIMEOUT) .type(MediaType.APPLICATION_JSON) - .entity(RestUtils.createMessage(e.getCause().getMessage())) + .entity(new ErrorData(e.getCause().getMessage())) .build(); } - return Response.status(Response.Status.OK).entity(requestResult.toJson()).build(); + return Response.status(Response.Status.OK).build(); } private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v1/ApiV1ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v1/ApiV1ResourceTest.java index e7a8c65fb8b93..e0ac3a5eca24a 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v1/ApiV1ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v1/ApiV1ResourceTest.java @@ -45,6 +45,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.RequestHandler; import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; @@ -310,7 +311,7 @@ private void testRegisterFunctionMissingArguments( JsonFormat.printer().print(functionConfig)); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - Assert.assertEquals(RestUtils.createMessage(missingFieldName + " is not provided"), response.getEntity()); + Assert.assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); } private Response registerDefaultFunction() throws InvalidProtocolBufferException { @@ -335,7 +336,7 @@ public void testRegisterExistedFunction() throws InvalidProtocolBufferException Response response = registerDefaultFunction(); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage("Function " + function + " already exist"), response.getEntity()); + assertEquals(new ErrorData("Function " + function + " already exist").reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -351,7 +352,7 @@ public void testRegisterFunctionUploadFailure() throws Exception { Response response = registerDefaultFunction(); assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage("upload failure"), response.getEntity()); + assertEquals(new ErrorData("upload failure").reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -373,7 +374,6 @@ public void testRegisterFunctionSuccess() throws Exception { Response response = registerDefaultFunction(); assertEquals(Status.OK.getStatusCode(), response.getStatus()); - assertEquals(rr.toJson(), response.getEntity()); } @Test @@ -395,7 +395,7 @@ public void testRegisterFunctionFailure() throws Exception { Response response = registerDefaultFunction(); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(rr.toJson(), response.getEntity()); + assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -415,7 +415,7 @@ public void testRegisterFunctionInterrupted() throws Exception { Response response = registerDefaultFunction(); assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage("java.io.IOException: Function registeration interrupted"), response.getEntity()); + assertEquals(new ErrorData("java.io.IOException: Function registeration interrupted").reason, ((ErrorData) response.getEntity()).reason); } // @@ -599,7 +599,7 @@ private void testUpdateFunctionMissingArguments( JsonFormat.printer().print(functionConfig)); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage(missingFieldName + " is not provided"), response.getEntity()); + assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); } private Response updateDefaultFunction() throws InvalidProtocolBufferException { @@ -622,7 +622,7 @@ public void testUpdateNotExistedFunction() throws InvalidProtocolBufferException Response response = updateDefaultFunction(); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage("Function " + function + " doesn't exist"), response.getEntity()); + assertEquals(new ErrorData("Function " + function + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -638,7 +638,7 @@ public void testUpdateFunctionUploadFailure() throws Exception { Response response = updateDefaultFunction(); assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage("upload failure"), response.getEntity()); + assertEquals(new ErrorData("upload failure").reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -660,7 +660,6 @@ public void testUpdateFunctionSuccess() throws Exception { Response response = updateDefaultFunction(); assertEquals(Status.OK.getStatusCode(), response.getStatus()); - assertEquals(rr.toJson(), response.getEntity()); } @Test @@ -682,7 +681,7 @@ public void testUpdateFunctionFailure() throws Exception { Response response = updateDefaultFunction(); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(rr.toJson(), response.getEntity()); + assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -702,7 +701,7 @@ public void testUpdateFunctionInterrupted() throws Exception { Response response = updateDefaultFunction(); assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage("java.io.IOException: Function registeration interrupted"), response.getEntity()); + assertEquals(new ErrorData("java.io.IOException: Function registeration interrupted").reason, ((ErrorData) response.getEntity()).reason); } // @@ -748,7 +747,7 @@ private void testDeregisterFunctionMissingArguments( function); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage(missingFieldName + " is not provided"), response.getEntity()); + assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); } private Response deregisterDefaultFunction() { @@ -764,7 +763,7 @@ public void testDeregisterNotExistedFunction() { Response response = deregisterDefaultFunction(); assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage("Function " + function + " doesn't exist"), response.getEntity()); + assertEquals(new ErrorData("Function " + function + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -794,7 +793,7 @@ public void testDeregisterFunctionFailure() throws Exception { Response response = deregisterDefaultFunction(); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(rr.toJson(), response.getEntity()); + assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -807,7 +806,7 @@ public void testDeregisterFunctionInterrupted() throws Exception { Response response = deregisterDefaultFunction(); assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage("java.io.IOException: Function deregisteration interrupted"), response.getEntity()); + assertEquals(new ErrorData("java.io.IOException: Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason); } // @@ -853,7 +852,7 @@ private void testGetFunctionMissingArguments( function); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage(missingFieldName + " is not provided"), response.getEntity()); + assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); } private Response getDefaultFunctionInfo() throws InvalidProtocolBufferException { @@ -869,7 +868,7 @@ public void testGetNotExistedFunction() throws InvalidProtocolBufferException { Response response = getDefaultFunctionInfo(); assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage("Function " + function + " doesn't exist"), response.getEntity()); + assertEquals(new ErrorData("Function " + function + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason); } @Test @@ -932,7 +931,7 @@ private void testListFunctionsMissingArguments( namespace); assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); - assertEquals(RestUtils.createMessage(missingFieldName + " is not provided"), response.getEntity()); + assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason); } private Response listDefaultFunctions() {