Skip to content

Commit

Permalink
Add authorization to function worker REST endpoints (#4628)
Browse files Browse the repository at this point in the history
(cherry picked from commit c0a57d0)
  • Loading branch information
jerrypeng authored and jiazhai committed Aug 27, 2019
1 parent e7d4f29 commit 3b0c3ec
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public WorkerService get() {
@Path("/cluster")
@Produces(MediaType.APPLICATION_JSON)
public List<WorkerInfo> getCluster() {
return worker.getCluster();
return worker.getCluster(clientAppId());
}

@GET
Expand All @@ -81,7 +81,7 @@ public List<WorkerInfo> getCluster() {
@Path("/cluster/leader")
@Produces(MediaType.APPLICATION_JSON)
public WorkerInfo getClusterLeader() {
return worker.getClusterLeader();
return worker.getClusterLeader(clientAppId());
}

@GET
Expand All @@ -96,7 +96,7 @@ public WorkerInfo getClusterLeader() {
@Path("/assignments")
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Collection<String>> getAssignments() {
return worker.getAssignments();
return worker.getAssignments(clientAppId());
}

@GET
Expand All @@ -112,6 +112,6 @@ public Map<String, Collection<String>> getAssignments() {
@Path("/connectors")
@Produces(MediaType.APPLICATION_JSON)
public List<ConnectorDefinition> getConnectorsList() throws IOException {
return worker.getListOfConnectors();
return worker.getListOfConnectors(clientAppId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public void deregisterFunction(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to deregister {}", tenant, namespace,
componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand Down Expand Up @@ -424,7 +424,7 @@ public FunctionConfig getFunctionInfo(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to get {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to get {}", tenant, namespace,
componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand Down Expand Up @@ -490,7 +490,7 @@ public void changeFunctionInstanceStatus(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to start/stop {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to start/stop {}", tenant, namespace,
componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand Down Expand Up @@ -548,7 +548,7 @@ public void restartFunctionInstance(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to restart {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to restart {}", tenant, namespace,
componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand Down Expand Up @@ -618,7 +618,7 @@ public void changeFunctionStatusAllInstances(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to start/stop {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to start/stop {}", tenant, namespace,
componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand Down Expand Up @@ -673,7 +673,7 @@ public void restartFunctionInstances(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to restart {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to restart {}", tenant, namespace,
componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand Down Expand Up @@ -725,7 +725,7 @@ public FunctionStats getFunctionStats(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to get stats for {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to get stats for {}", tenant, namespace,
componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand Down Expand Up @@ -781,7 +781,7 @@ public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunction

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to get stats for {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to get stats for {}", tenant, namespace,
componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand Down Expand Up @@ -842,7 +842,7 @@ public List<String> listFunctions(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{} Client [{}] is not admin and authorized to list {}", tenant, namespace, clientRole, ComponentTypeUtils.toString(componentType));
log.error("{}/{} Client [{}] is not authorized to list {}", tenant, namespace, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
Expand Down Expand Up @@ -914,7 +914,7 @@ public String triggerFunction(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to trigger {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to trigger {}", tenant, namespace,
functionName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand Down Expand Up @@ -1029,7 +1029,7 @@ public FunctionState getFunctionState(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to get state for {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to get state for {}", tenant, namespace,
functionName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand Down Expand Up @@ -1115,7 +1115,7 @@ public void putFunctionState(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to put state for {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to put state for {}", tenant, namespace,
functionName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand Down Expand Up @@ -1380,7 +1380,7 @@ protected void componentStatusRequestValidate (final String tenant, final String

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized get status for {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized get status for {}", tenant, namespace,
componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void registerFunction(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to register {}", tenant, namespace,
functionName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand All @@ -119,7 +119,7 @@ public void registerFunction(final String tenant,
}
}
} catch (PulsarAdminException.NotAuthorizedException e) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to operate {} on tenant", tenant, namespace,
functionName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
} catch (PulsarAdminException.NotFoundException e) {
Expand Down Expand Up @@ -259,7 +259,7 @@ public void updateFunction(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to update {}", tenant, namespace,
functionName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void registerSink(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to register {}", tenant, namespace,
sinkName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand All @@ -121,7 +121,7 @@ public void registerSink(final String tenant,
}
}
} catch (PulsarAdminException.NotAuthorizedException e) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to operate {} on tenant", tenant, namespace,
sinkName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
} catch (PulsarAdminException.NotFoundException e) {
Expand Down Expand Up @@ -261,7 +261,7 @@ public void updateSink(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to update {}", tenant, namespace,
sinkName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void registerSource(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to register {}", tenant, namespace,
sourceName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
Expand All @@ -121,7 +121,7 @@ public void registerSource(final String tenant,
}
}
} catch (PulsarAdminException.NotAuthorizedException e) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to operate {} on tenant", tenant, namespace,
sourceName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
} catch (PulsarAdminException.NotFoundException e) {
Expand Down Expand Up @@ -261,7 +261,7 @@ public void updateSource(final String tenant,

try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
log.error("{}/{}/{} Client [{}] is not authorized to update {}", tenant, namespace,
sourceName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");

Expand Down
Loading

0 comments on commit 3b0c3ec

Please sign in to comment.