Skip to content

Commit

Permalink
Add missing Swagger annotations for Pulsar Functions REST API endpoin…
Browse files Browse the repository at this point in the history
…ts (#1661)

* begin adding swagger annotations to pulsar functions endpoints

* annotations for more endpoints

* fix usages of 'exist' and property (vs tenant)

* fix syntax of @ApiResponses annotations

* add missing annotations
  • Loading branch information
lucperkins authored and merlimat committed Apr 30, 2018
1 parent 42c2f37 commit 1cac0ba
Show file tree
Hide file tree
Showing 7 changed files with 3,240 additions and 1,168 deletions.
Expand Up @@ -118,7 +118,7 @@ public void createCluster(@PathParam("cluster") String cluster, ClusterData clus
log.info("[{}] Created cluster {}", clientAppId(), cluster);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing cluster {}", clientAppId(), cluster);
throw new RestException(Status.CONFLICT, "Cluster already exist");
throw new RestException(Status.CONFLICT, "Cluster already exists");
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to create cluster with invalid name {}", clientAppId(), cluster, e);
throw new RestException(Status.PRECONDITION_FAILED, "Cluster name is not valid");
Expand Down Expand Up @@ -593,7 +593,7 @@ public void deleteNamespaceIsolationPolicy(@PathParam("cluster") String cluster,
@Path("/{cluster}/failureDomains/{domainName}")
@ApiOperation(value = "Set cluster's failure Domain")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 409, message = "Broker already exist into other domain"),
@ApiResponse(code = 409, message = "Broker already exists in another domain"),
@ApiResponse(code = 404, message = "Cluster doesn't exist") })
public void setFailureDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName,
FailureDomain domain) throws Exception {
Expand Down Expand Up @@ -719,7 +719,7 @@ private void validateBrokerExistsInOtherDomain(final String cluster, final Strin
.filter(inputDomain.brokers::contains).collect(Collectors.toList());
if (!duplicateBrokers.isEmpty()) {
throw new RestException(Status.CONFLICT,
duplicateBrokers + " already exist into " + domainName);
duplicateBrokers + " already exists in " + domainName);
}
}
} catch (Exception e) {
Expand Down
Expand Up @@ -31,7 +31,16 @@
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
Expand All @@ -51,6 +60,13 @@ public WorkerService get() {
}

@POST
@ApiOperation(value = "Creates a new Pulsar Function in cluster mode")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
@ApiResponse(code = 408, message = "Request timeout"),
@ApiResponse(code = 200, message = "Pulsar Function successfully created")
})
@Path("/{tenant}/{namespace}/{functionName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response registerFunction(final @PathParam("tenant") String tenant,
Expand All @@ -62,10 +78,15 @@ public Response registerFunction(final @PathParam("tenant") String tenant,

return functions.registerFunction(
tenant, namespace, functionName, uploadedInputStream, fileDetail, functionDetailsJson);

}

@PUT
@ApiOperation(value = "Updates a Pulsar Function currently running in cluster mode")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
@ApiResponse(code = 200, message = "Pulsar Function successfully updated")
})
@Path("/{tenant}/{namespace}/{functionName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response updateFunction(final @PathParam("tenant") String tenant,
Expand All @@ -82,6 +103,14 @@ public Response updateFunction(final @PathParam("tenant") String tenant,


@DELETE
@ApiOperation(value = "Deletes a Pulsar Function currently running in cluster mode")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function doesn't exist"),
@ApiResponse(code = 408, message = "Request timeout"),
@ApiResponse(code = 200, message = "The function was successfully deleted")
})
@Path("/{tenant}/{namespace}/{functionName}")
public Response deregisterFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
Expand All @@ -91,16 +120,34 @@ public Response deregisterFunction(final @PathParam("tenant") String tenant,
}

@GET
@ApiOperation(
value = "Fetches information about a Pulsar Function currently running in cluster mode",
response = FunctionMetaData.class
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 408, message = "Request timeout"),
@ApiResponse(code = 404, message = "The function doesn't exist")
})
@Path("/{tenant}/{namespace}/{functionName}")
public Response getFunctionInfo(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName)
throws IOException {
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionInfo(
tenant, namespace, functionName);
}

@GET
@ApiOperation(
value = "Displays the status of a Pulsar Function instance",
response = FunctionStatus.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 404, message = "The function doesn't exist")
})
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
public Response getFunctionInstanceStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
Expand All @@ -111,6 +158,14 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tena
}

@GET
@ApiOperation(
value = "Displays the status of a Pulsar Function running in cluster mode",
response = FunctionStatus.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/{tenant}/{namespace}/{functionName}/status")
public Response getFunctionStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
Expand All @@ -120,6 +175,15 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
}

@GET
@ApiOperation(
value = "Lists all Pulsar Functions currently deployed in a given namespace",
response = String.class,
responseContainer = "Collection"
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/{tenant}/{namespace}")
public Response listFunctions(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
Expand All @@ -129,18 +193,45 @@ public Response listFunctions(final @PathParam("tenant") String tenant,
}

@GET
@ApiOperation(
value = "Fetches information about the Pulsar cluster running Pulsar Functions",
response = MembershipManager.WorkerInfo.class,
responseContainer = "List"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

})
@Path("/cluster")
public Response getCluster() {
return functions.getCluster();
}

@GET
@ApiOperation(
value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters",
response = Assignment.class,
responseContainer = "Map"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/assignments")
public Response getAssignments() {
return functions.getAssignments();
}

@POST
@ApiOperation(
value = "Triggers a Pulsar Function with a user-specified value or file data",
response = Message.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 408, message = "Request timeout"),
@ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/trigger")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response triggerFunction(final @PathParam("tenant") String tenant,
Expand All @@ -155,6 +246,10 @@ public Response triggerFunction(final @PathParam("tenant") String tenant,
}

@POST
@ApiOperation(
value = "Uploads Pulsar Function file data",
hidden = true
)
@Path("/upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
Expand All @@ -163,6 +258,10 @@ public Response uploadFunction(final @FormDataParam("data") InputStream uploaded
}

@GET
@ApiOperation(
value = "Downloads Pulsar Function file data",
hidden = true
)
@Path("/download")
public Response downloadFunction(final @QueryParam("path") String path) {
return functions.downloadFunction(path);
Expand Down
Expand Up @@ -346,15 +346,15 @@ protected void internalCreatePartitionedTopic(int numPartitions, boolean authori
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "Partitioned topic already exist");
throw new RestException(Status.CONFLICT, "Partitioned topic already exists");
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}

/**
* It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be
* It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to
* already exist and number of new partitions must be greater than existing number of partitions. Decrementing
* number of partitions requires deletion of topic which is not supported.
*
Expand Down

0 comments on commit 1cac0ba

Please sign in to comment.