Conversation
sijie
left a comment
There was a problem hiding this comment.
it is unclear to me what is this PR for. please update the description with your motivation and what are the changes.
There was a problem hiding this comment.
I don't understand why the return value was changed to void?
There was a problem hiding this comment.
because there is no output that needs to be returned. If there is an error raising a RestException will trigger jetty to return the error message
There was a problem hiding this comment.
but this is a breaking change no? How can old pulsar admin clients talk to new broker?
There was a problem hiding this comment.
updateSink won't be a breaking change because the the response returned has no content just a 400 status code
There was a problem hiding this comment.
let's not couple fixing typos with other changes.
|
I didn't mark this ready for review yet. |
efcc2da to
dd4e12a
Compare
There was a problem hiding this comment.
This should return FunctionConfig
There was a problem hiding this comment.
this should return a list
There was a problem hiding this comment.
this should return something. Not sure if its String or byte[]
There was a problem hiding this comment.
This should not return void
There was a problem hiding this comment.
this should not return void
There was a problem hiding this comment.
We should just have another method called getSinkInfo and it should return a SinkConfig not object
There was a problem hiding this comment.
Same as my comment for Sink
There was a problem hiding this comment.
Can you rename the function to throwUnavailableException
There was a problem hiding this comment.
This should be FunctionConfig
...unctions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
Outdated
Show resolved
Hide resolved
...unctions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
Outdated
Show resolved
Hide resolved
...unctions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
should return this value
| when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false); | ||
|
|
||
| Response response = updateDefaultSource(); | ||
| assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); |
There was a problem hiding this comment.
we should still check status code
| when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); | ||
|
|
||
| Response response = updateDefaultSource(); | ||
| assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus()); |
There was a problem hiding this comment.
we should still check status code
| when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); | ||
|
|
||
| Response response = updateDefaultSource(); | ||
| assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); |
There was a problem hiding this comment.
we should still check status code
| when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult); | ||
|
|
||
| Response response = updateDefaultSource(); | ||
| assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus()); |
There was a problem hiding this comment.
we should still check status code
| function, | ||
| null); | ||
|
|
||
| assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); |
There was a problem hiding this comment.
we should still check status code
| when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false); | ||
|
|
||
| Response response = deregisterDefaultSource(); | ||
| assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); |
There was a problem hiding this comment.
we should still check status code
| public void testRegisterFunctionNonexistantNamespace() throws Exception { | ||
| this.namespaceList.clear(); | ||
| Response response = registerDefaultSource(); | ||
| assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); |
There was a problem hiding this comment.
we should still check status code
| new Gson().toJson(sinkConfig), | ||
| null); | ||
|
|
||
| assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); |
There was a problem hiding this comment.
we should still check status code
| if (expectedError == null) { | ||
| assertEquals(Status.OK.getStatusCode(), response.getStatus()); | ||
| } else { | ||
| assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); |
There was a problem hiding this comment.
we should check status code
| log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName); | ||
| return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) | ||
| .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build(); | ||
| throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't exist", componentType, componentName)); |
There was a problem hiding this comment.
the return code should be not found
|
|
||
| if (null == worker().getStateStoreAdminClient()) { | ||
| return getStateStoreUnvailableResponse(); | ||
| getStateStoreUnvailableResponse(); |
There was a problem hiding this comment.
this should be a return statement
There was a problem hiding this comment.
We are throwing an exception why would there be a return value
Rename getStateStoreUnvailableResponse
| public Response getFunctionInfo(final String tenant, | ||
| final String namespace, | ||
| final String componentName) { | ||
| public FunctionConfig getFunctionInfo(final String tenant, |
There was a problem hiding this comment.
This function should get moved out into org.apache.pulsar.functions.worker.rest.api.FunctionImpl
|
run integration tests |
|
@sijie does this pr look good to you now? |
sijie
left a comment
There was a problem hiding this comment.
since it is changing the rest api, please add an integration test to make sure old admin client can talk to new brokers.
if the old admin client can't talk to new brokers, it is breaking BC. then you should consider adding v3 api instead of changing the v2 api.
| @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), | ||
| @ApiResponse(code = 404, message = "The function does not exist"), | ||
| @ApiResponse(code = 500, message = "Internal server error") }) | ||
| @ApiResponse(code = 404, message = "The function does not exist"), |
There was a problem hiding this comment.
please revert the indent change here. because @apiresponse is not a separate annotation, but it is part of @ApiResponses
There was a problem hiding this comment.
but this is a breaking change no? How can old pulsar admin clients talk to new broker?
| ); | ||
| } catch (RestException re){ | ||
| assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST); | ||
| throw re; |
There was a problem hiding this comment.
keep all assertions in one place. you already catch the exception here, you should also validate @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided") rather than rethrowing the exception and use a different way to do assertions.
| topicsToSerDeClassName, | ||
| className, | ||
| parallelism, | ||
| @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided") |
There was a problem hiding this comment.
keep all assertions in one place.
| topicsToSerDeClassName, | ||
| className, | ||
| parallelism, | ||
| @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink Name is not provided") |
There was a problem hiding this comment.
keep all assertions in one place.
| @Test | ||
| public void testRegisterSinkZeroParallelism() throws IOException { | ||
| testRegisterSinkMissingArguments( | ||
| @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number") |
There was a problem hiding this comment.
keep all assertions in one place.
| @Test | ||
| public void testRegisterSinkHttpUrl() throws IOException { | ||
| testRegisterSinkMissingArguments( | ||
| @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Sink Jar") |
There was a problem hiding this comment.
keep all assertions in one place.
| } | ||
|
|
||
| @Test | ||
| @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink already exists") |
There was a problem hiding this comment.
keep all assertions in one place.
| } | ||
|
|
||
| @Test | ||
| @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure") |
There was a problem hiding this comment.
keep all assertions in one place.
| } | ||
|
|
||
| @Test | ||
| @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to register") |
There was a problem hiding this comment.
keep all assertions in one place.
|
@sijie I'm not sure if this pr breaks backwards compatibility. Its not changing any return types. |
|
@srkukarni I would prefer some forms of tests to ensure it doesn't really break BC, because breaking BC is a really bad behavior to users. |
sijie
left a comment
There was a problem hiding this comment.
@srkukarni @jerrypeng @aahmed-se I have commented a few examples that return types are changes. Are you guys sure those doesn't break BC?
| @Path("/{tenant}/{namespace}") | ||
| public Response listSink(final @PathParam("tenant") String tenant, | ||
| final @PathParam("namespace") String namespace) { | ||
| public List<String> listSink(final @PathParam("tenant") String tenant, |
There was a problem hiding this comment.
this is returning List<String> instead of Response. doesn't it break the BC?
| public Response getSinkInfo(final @PathParam("tenant") String tenant, | ||
| final @PathParam("namespace") String namespace, | ||
| final @PathParam("sinkName") String sinkName) | ||
| public SinkConfig getSinkInfo(final @PathParam("tenant") String tenant, |
There was a problem hiding this comment.
This is changing from Response to SinkConfig, doesn't it break the BC?
| final @PathParam("sourceName") String sourceName) throws IOException { | ||
| return source.getFunctionInfo( | ||
| tenant, namespace, sourceName); | ||
| public SourceConfig getSourceInfo(final @PathParam("tenant") String tenant, |
There was a problem hiding this comment.
This changes Response to SourceConfig, doesn't it break BC?
|
@sijie I think we have already broken backwards compatibility with getFunctionInfo, getStatus, getStats endpoints in previous PRs. To maintain backwards compatibility, I would suggest 1) merge this PR, 2) go back and move all the changes for those endpoints to V3, 3) add the old endpoints back to V2. Add tests to make sure old clients can still talk with V2 endpoints. |
|
I will make the changes for the assert statements not sure if we want to invest in BC tests at this time. |
can you gusy come up a list of endpoints that are added in 2.3.0 and those were changed in 2.3.0, before we make any decisions? If they are new endpoints added in 2.3.0, they are okay. |
|
There is low chance on BC breaking , I am directly returning the object to be serialized by jersey rather than doing it explicitly. |
|
@sijie The PR actually doesn’t change the output of any of the endpoints, its just returning the actual object to be serialized by jersey to json instead of doing that explicitly ourselves. This PR will also make it more clear what is actually returned so it will be easier not to break backwards compatibility since currently many endpoints are just returning a Response object which is unclear what is contained within. However, after this PR lets make sure that no changes have being introduced in the admin api so that 2.3 would break backwards compatibility for 2.2 |
We are deprecating the Response Object in favor data object responses.