Skip to content

Commit

Permalink
Fixing response and cli (#117)
Browse files Browse the repository at this point in the history
* fixing response codes and cli

* cleaning up

* fixing unit tests
  • Loading branch information
jerrypeng authored and sijie committed Mar 4, 2018
1 parent 693ae3b commit 2cb75ba
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 43 deletions.
Expand Up @@ -30,10 +30,12 @@
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;

import javax.ws.rs.ClientErrorException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.File;
import java.util.List;

Expand All @@ -50,7 +52,11 @@ public FunctionsImpl(WebTarget web, Authentication auth) {
@Override
public List<String> getFunctions(String tenant, String namespace) throws PulsarAdminException {
try {
return request(functions.path(tenant).path(namespace)).get(new GenericType<List<String>>() {
Response response = request(functions.path(tenant).path(namespace)).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
return response.readEntity(new GenericType<List<String>>() {
});
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -60,7 +66,11 @@ public List<String> getFunctions(String tenant, String namespace) throws PulsarA
@Override
public FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException {
try {
String jsonResponse = request(functions.path(tenant).path(namespace).path(function)).get().readEntity(String.class);
Response response = request(functions.path(tenant).path(namespace).path(function)).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
String jsonResponse = response.readEntity(String.class);
FunctionConfig.Builder functionConfigBuilder = FunctionConfig.newBuilder();
JsonFormat.parser().merge(jsonResponse, functionConfigBuilder);
return functionConfigBuilder.build();
Expand All @@ -72,7 +82,11 @@ public FunctionConfig getFunction(String tenant, String namespace, String functi
@Override
public FunctionStatus getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException {
try {
String jsonResponse = request(functions.path(tenant).path(namespace).path(function).path("status")).get().readEntity(String.class);
Response response = request(functions.path(tenant).path(namespace).path(function).path("status")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
String jsonResponse = response.readEntity(String.class);
FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
JsonFormat.parser().merge(jsonResponse, functionStatusBuilder);
return functionStatusBuilder.build();
Expand Down
Expand Up @@ -25,6 +25,7 @@
import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
Expand All @@ -35,7 +36,6 @@
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.BaseApiResource;
import org.apache.pulsar.functions.worker.rest.RestUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
Expand Down Expand Up @@ -81,7 +81,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
tenant, namespace, functionName, e);
return Response.status(Response.Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(e.getMessage())).build();
.entity(new ErrorData(e.getMessage())).build();
}

FunctionRuntimeManager functionRuntimeManager = getWorkerFunctionStateManager();
Expand All @@ -90,7 +90,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
log.error("Function {}/{}/{} already exists", tenant, namespace, functionName);
return Response.status(Response.Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(String.format("Function %s already exist", functionName))).build();
.entity(new ErrorData(String.format("Function %s already exist", functionName))).build();
}

// function state
Expand Down Expand Up @@ -134,15 +134,15 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
tenant, namespace, functionName, e);
return Response.status(Response.Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(e.getMessage())).build();
.entity(new ErrorData(e.getMessage())).build();
}

FunctionRuntimeManager functionRuntimeManager = getWorkerFunctionStateManager();

if (!functionRuntimeManager.containsFunction(tenant, namespace, functionName)) {
return Response.status(Response.Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(String.format("Function %s doesn't exist", functionName))).build();
.entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
}

// function state
Expand Down Expand Up @@ -181,7 +181,7 @@ public Response deregisterFunction(final @PathParam("tenant") String tenant,
tenant, namespace, functionName, e);
return Response.status(Response.Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(e.getMessage())).build();
.entity(new ErrorData(e.getMessage())).build();
}

FunctionRuntimeManager functionRuntimeManager = getWorkerFunctionStateManager();
Expand All @@ -190,7 +190,7 @@ public Response deregisterFunction(final @PathParam("tenant") String tenant,
tenant, namespace, functionName);
return Response.status(Status.NOT_FOUND)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(String.format("Function %s doesn't exist", functionName))).build();
.entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
}

CompletableFuture<RequestResult> completableFuture
Expand All @@ -202,22 +202,21 @@ public Response deregisterFunction(final @PathParam("tenant") String tenant,
if (!requestResult.isSuccess()) {
return Response.status(Response.Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(requestResult.toJson())
.entity(new ErrorData(requestResult.getMessage()))
.build();
}
} catch (ExecutionException e) {
log.error("Execution Exception while deregistering function @ /{}/{}/{}",
tenant, namespace, functionName, e);
return Response.serverError()
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(e.getCause().getMessage()))
.entity(new ErrorData(e.getCause().getMessage()))
.build();
} catch (InterruptedException e) {
log.error("Interrupted Exception while deregistering function @ /{}/{}/{}",
tenant, namespace, functionName, e);
return Response.status(Status.REQUEST_TIMEOUT)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(e.getMessage()))
.build();
}

Expand All @@ -238,7 +237,7 @@ public Response getFunctionInfo(final @PathParam("tenant") String tenant,
tenant, namespace, functionName, e);
return Response.status(Response.Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(e.getMessage())).build();
.entity(new ErrorData(e.getMessage())).build();
}

FunctionRuntimeManager functionRuntimeManager = getWorkerFunctionStateManager();
Expand All @@ -247,7 +246,7 @@ public Response getFunctionInfo(final @PathParam("tenant") String tenant,
tenant, namespace, functionName);
return Response.status(Status.NOT_FOUND)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(String.format("Function %s doesn't exist", functionName))).build();
.entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
}

FunctionMetaData functionMetaData = functionRuntimeManager.getFunction(tenant, namespace, functionName).getFunctionMetaData();
Expand All @@ -269,7 +268,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
tenant, namespace, functionName, e);
return Response.status(Response.Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(e.getMessage())).build();
.entity(new ErrorData(e.getMessage())).build();
}

FunctionRuntimeManager functionRuntimeManager = getWorkerFunctionStateManager();
Expand All @@ -278,7 +277,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
tenant, namespace, functionName);
return Response.status(Status.NOT_FOUND)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(String.format("Function %s doesn't exist", functionName))).build();
.entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
}

FunctionRuntimeInfo functionRuntimeInfo = functionRuntimeManager.getFunction(tenant, namespace, functionName);
Expand All @@ -291,7 +290,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
log.error("Got Exception Getting Status from Spawner", ex);
return Response.status(Status.INTERNAL_SERVER_ERROR)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(ex.getMessage())).build();
.entity(new ErrorData(ex.getMessage())).build();
}
} else {
FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
Expand All @@ -318,7 +317,7 @@ public Response listFunctions(final @PathParam("tenant") String tenant,
tenant, namespace, e);
return Response.status(Response.Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(e.getMessage())).build();
.entity(new ErrorData(e.getMessage())).build();
}

FunctionRuntimeManager functionRuntimeManager = getWorkerFunctionStateManager();
Expand All @@ -342,7 +341,7 @@ private Response updateRequest(FunctionMetaData functionMetaData,
log.error("Error uploading file {}", functionMetaData.getPackageLocation(), e);
return Response.serverError()
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(e.getMessage()))
.entity(new ErrorData(e.getMessage()))
.build();
}

Expand All @@ -358,22 +357,22 @@ private Response updateRequest(FunctionMetaData functionMetaData,
if (!requestResult.isSuccess()) {
return Response.status(Response.Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(requestResult.toJson())
.entity(new ErrorData(requestResult.getMessage()))
.build();
}
} catch (ExecutionException e) {
return Response.serverError()
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(e.getCause().getMessage()))
.entity(new ErrorData(e.getCause().getMessage()))
.build();
} catch (InterruptedException e) {
return Response.status(Status.REQUEST_TIMEOUT)
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(e.getCause().getMessage()))
.entity(new ErrorData(e.getCause().getMessage()))
.build();
}

return Response.status(Response.Status.OK).entity(requestResult.toJson()).build();
return Response.status(Response.Status.OK).build();
}

private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException {
Expand Down

0 comments on commit 2cb75ba

Please sign in to comment.