diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 9c1575fc8ed4d..6eb130189b08b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -667,7 +667,7 @@ public void startFunction( @Consumes(MediaType.MULTIPART_FORM_DATA) public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("path") String path) { - functions().uploadFunction(uploadedInputStream, path, clientAppId()); + functions().uploadFunction(uploadedInputStream, path, clientAppId(), clientAuthData()); } @GET @@ -735,6 +735,6 @@ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenan final @FormDataParam("delete") boolean delete) { functions().updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream, - delete, uri.getRequestUri(), clientAppId()); + delete, uri.getRequestUri(), clientAppId(), clientAuthData()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java index d79eeeef97249..5d57723eb03a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java @@ -554,6 +554,6 @@ public List getSinkConfigDefinition( }) @Path("/reloadBuiltInSinks") public void reloadSinks() { - sinks().reloadConnectors(clientAppId()); + sinks().reloadConnectors(clientAppId(), clientAuthData()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java index 1d0f555cec26d..2d951b0e77d72 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java @@ -506,6 +506,6 @@ public List getSourceConfigDefinition( }) @Path("/reloadBuiltInSources") public void reloadSources() { - sources().reloadConnectors(clientAppId()); + sources().reloadConnectors(clientAppId(), clientAuthData()); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index aff7d608214e4..1c442a49a40f4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -901,13 +901,13 @@ public List getListOfConnectors() { } @Override - public void reloadConnectors(String clientRole) { + public void reloadConnectors(String clientRole, AuthenticationDataSource authenticationData) { if (!isWorkerServiceAvailable()) { throwUnavailableException(); } if (worker().getWorkerConfig().isAuthorizationEnabled()) { // Only superuser has permission to do this operation. - if (!isSuperUser(clientRole)) { + if (!isSuperUser(clientRole, authenticationData)) { throw new RestException(Status.UNAUTHORIZED, "This operation requires super-user access"); } } @@ -1205,13 +1205,14 @@ public void putFunctionState(final String tenant, } @Override - public void uploadFunction(final InputStream uploadedInputStream, final String path, String clientRole) { + public void uploadFunction(final InputStream uploadedInputStream, final String path, String clientRole, + AuthenticationDataSource authenticationData) { if (!isWorkerServiceAvailable()) { throwUnavailableException(); } - if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { + if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole, authenticationData)) { throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation"); } @@ -1309,7 +1310,7 @@ public StreamingOutput downloadFunction(final String path, String clientRole, Au throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); } } else { - if (!isSuperUser(clientRole)) { + if (!isSuperUser(clientRole, clientAuthenticationDataHttps)) { throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation"); } } @@ -1441,7 +1442,7 @@ public boolean isAuthorizedRole(String tenant, String namespace, String clientRo AuthenticationDataSource authenticationData) throws PulsarAdminException { if (worker().getWorkerConfig().isAuthorizationEnabled()) { // skip authorization if client role is super-user - if (isSuperUser(clientRole)) { + if (isSuperUser(clientRole, authenticationData)) { return true; } @@ -1524,14 +1525,14 @@ protected void componentInstanceStatusRequestValidate (final String tenant, } } - public boolean isSuperUser(String clientRole) { + public boolean isSuperUser(String clientRole, AuthenticationDataSource authenticationData) { if (clientRole != null) { try { if ((worker().getWorkerConfig().getSuperUserRoles() != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole))) { return true; } - return worker().getAuthorizationService().isSuperUser(clientRole, null) + return worker().getAuthorizationService().isSuperUser(clientRole, authenticationData) .get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS); } catch (InterruptedException e) { log.warn("Time-out {} sec while checking the role {} is a super user role ", diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 5d2f8ee378631..823605213ac46 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -670,14 +670,15 @@ public void updateFunctionOnWorkerLeader(final String tenant, final InputStream uploadedInputStream, final boolean delete, URI uri, - final String clientRole) { + final String clientRole, + AuthenticationDataSource authenticationData) { if (!isWorkerServiceAvailable()) { throwUnavailableException(); } if (worker().getWorkerConfig().isAuthorizationEnabled()) { - if (!isSuperUser(clientRole)) { + if (!isSuperUser(clientRole, authenticationData)) { log.error("{}/{}/{} Client [{}] is not superuser to update on worker leader {}", tenant, namespace, functionName, clientRole, ComponentTypeUtils.toString(componentType)); throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation"); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java index 9ed6afb96943c..26479baea38b4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java @@ -195,7 +195,7 @@ public Response stopFunctionInstances(String tenant, String namespace, String fu @Override public Response uploadFunction(InputStream uploadedInputStream, String path, String clientRole) { - delegate.uploadFunction(uploadedInputStream, path, clientRole); + delegate.uploadFunction(uploadedInputStream, path, clientRole, null); return Response.ok().build(); } @@ -248,4 +248,4 @@ private InstanceCommunication.FunctionStatus toProto( return functionStatus; } -} \ No newline at end of file +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java index 6d233a84df216..6d037c9855d00 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java @@ -312,7 +312,7 @@ public void startFunction(final @PathParam("tenant") String tenant, @Consumes(MediaType.MULTIPART_FORM_DATA) public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("path") String path) { - functions().uploadFunction(uploadedInputStream, path, clientAppId()); + functions().uploadFunction(uploadedInputStream, path, clientAppId(), clientAuthData()); } @GET @@ -386,6 +386,6 @@ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenan final @FormDataParam("delete") boolean delete) { functions().updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream, - delete, uri.getRequestUri(), clientAppId()); + delete, uri.getRequestUri(), clientAppId(), clientAuthData()); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java index af56d247966aa..1b3811e1379b1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java @@ -274,6 +274,6 @@ public List getSinkConfigDefinition( }) @Path("/reloadBuiltInSinks") public void reloadSinks() { - sinks().reloadConnectors(clientAppId()); + sinks().reloadConnectors(clientAppId(), clientAuthData()); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java index db8eb2460c3e3..29fed9a4af3c9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java @@ -290,6 +290,6 @@ public List getSourceConfigDefinition( }) @Path("/reloadBuiltInSources") public void reloadSources() { - sources().reloadConnectors(clientAppId()); + sources().reloadConnectors(clientAppId(), clientAuthData()); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java index 6139a5c6e1067..94d2a49734339 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java @@ -139,7 +139,8 @@ void putFunctionState(final String tenant, void uploadFunction(final InputStream uploadedInputStream, final String path, - String clientRole); + String clientRole, + final AuthenticationDataSource clientAuthenticationDataHttps); StreamingOutput downloadFunction(String path, String clientRole, @@ -154,5 +155,5 @@ StreamingOutput downloadFunction(String tenant, List getListOfConnectors(); - void reloadConnectors(String clientRole); + void reloadConnectors(String clientRole, AuthenticationDataSource clientAuthenticationDataHttps); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java index 9f68439919035..aa874ab205f1d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java @@ -61,7 +61,8 @@ void updateFunctionOnWorkerLeader(final String tenant, final InputStream uploadedInputStream, final boolean delete, URI uri, - final String clientRole); + final String clientRole, + final AuthenticationDataSource clientAuthenticationDataHttps); FunctionStatus getFunctionStatus(final String tenant, final String namespace, diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java index 03c1a149ed8c8..76586b7a7ab5d 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.worker.rest.api; import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.pulsar.broker.authentication.AuthenticationDataHttp; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.client.admin.Namespaces; @@ -55,6 +56,7 @@ import org.testng.annotations.ObjectFactory; import org.testng.annotations.Test; +import javax.servlet.http.HttpServletRequest; import java.io.InputStream; import java.util.Collections; import java.util.HashMap; @@ -261,9 +263,10 @@ public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedExcep // test pulsar super user final String pulsarSuperUser = "pulsarSuperUser"; - when(authorizationService.isSuperUser(pulsarSuperUser, null)).thenReturn(CompletableFuture.completedFuture(true)); + when(authorizationService.isSuperUser(eq(pulsarSuperUser), any())) + .thenReturn(CompletableFuture.completedFuture(true)); assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", pulsarSuperUser, authenticationDataSource)); - assertTrue(functionImpl.isSuperUser(pulsarSuperUser)); + assertTrue(functionImpl.isSuperUser(pulsarSuperUser, null)); // test normal user functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService)); @@ -274,7 +277,7 @@ public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedExcep when(admin.tenants()).thenReturn(tenants); when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin); when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false)); - when(authorizationService.isSuperUser("test-user", null)).thenReturn(CompletableFuture.completedFuture(false)); + when(authorizationService.isSuperUser(eq("test-user"), any())).thenReturn(CompletableFuture.completedFuture(false)); assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource)); // if user is tenant admin @@ -333,10 +336,10 @@ public void testIsSuperUser() throws PulsarAdminException { }); AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class); - assertTrue(functionImpl.isSuperUser(superUser)); + assertTrue(functionImpl.isSuperUser(superUser, null)); - assertFalse(functionImpl.isSuperUser("normal-user")); - assertFalse(functionImpl.isSuperUser( null)); + assertFalse(functionImpl.isSuperUser("normal-user", null)); + assertFalse(functionImpl.isSuperUser( null, null)); // test super roles is null and it's not a pulsar super user when(authorizationService.isSuperUser(superUser, null)) @@ -345,7 +348,22 @@ public void testIsSuperUser() throws PulsarAdminException { workerConfig = new WorkerConfig(); workerConfig.setAuthorizationEnabled(true); doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig(); - assertFalse(functionImpl.isSuperUser(superUser)); + assertFalse(functionImpl.isSuperUser(superUser, null)); + + // test super role is null but the auth datasource contains superuser + when(authorizationService.isSuperUser(anyString(), any(AuthenticationDataSource.class))) + .thenAnswer((invocationOnMock -> { + AuthenticationDataSource authData = invocationOnMock.getArgument(1, AuthenticationDataSource.class); + String user = authData.getHttpHeader("mockedUser"); + return CompletableFuture.completedFuture(superUser.equals(user)); + })); + AuthenticationDataSource authData = mock(AuthenticationDataSource.class); + when(authData.getHttpHeader("mockedUser")).thenReturn(superUser); + assertTrue(functionImpl.isSuperUser("non-superuser", authData)); + + AuthenticationDataSource nonSuperuserAuthData = mock(AuthenticationDataSource.class); + when(nonSuperuserAuthData.getHttpHeader("mockedUser")).thenReturn("non-superuser"); + assertFalse(functionImpl.isSuperUser("non-superuser", nonSuperuserAuthData)); } public static FunctionConfig createDefaultFunctionConfig() {