From 76c93a3865db84287d0efca851e6ea27d148e4d0 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 4 May 2021 10:32:39 +0800 Subject: [PATCH] Fixes function api can not use authdata to check superuser (#10364) --- Fixes #10332 *Motivation* Sometimes, the superuser is not only needed the client role but also needs some data in the authentication data to check whether the role is the superuser. The changed interface is introduced from https://github.com/apache/pulsar/pull/8560, it's safe to change it since there is no release for that. --- .../broker/admin/impl/FunctionsBase.java | 4 +-- .../pulsar/broker/admin/impl/SinksBase.java | 2 +- .../pulsar/broker/admin/impl/SourcesBase.java | 2 +- .../worker/rest/api/ComponentImpl.java | 17 +++++----- .../worker/rest/api/FunctionsImpl.java | 5 +-- .../worker/rest/api/FunctionsImplV2.java | 4 +-- .../rest/api/v3/FunctionsApiV3Resource.java | 4 +-- .../rest/api/v3/SinksApiV3Resource.java | 2 +- .../rest/api/v3/SourcesApiV3Resource.java | 2 +- .../worker/service/api/Component.java | 5 +-- .../worker/service/api/Functions.java | 3 +- .../worker/rest/api/FunctionsImplTest.java | 32 +++++++++++++++---- 12 files changed, 52 insertions(+), 30 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 9c1575fc8ed4d..6eb130189b08b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -667,7 +667,7 @@ public void startFunction( @Consumes(MediaType.MULTIPART_FORM_DATA) public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("path") String path) { - functions().uploadFunction(uploadedInputStream, path, clientAppId()); + functions().uploadFunction(uploadedInputStream, path, clientAppId(), clientAuthData()); } @GET @@ -735,6 +735,6 @@ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenan final @FormDataParam("delete") boolean delete) { functions().updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream, - delete, uri.getRequestUri(), clientAppId()); + delete, uri.getRequestUri(), clientAppId(), clientAuthData()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java index d79eeeef97249..5d57723eb03a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java @@ -554,6 +554,6 @@ public List getSinkConfigDefinition( }) @Path("/reloadBuiltInSinks") public void reloadSinks() { - sinks().reloadConnectors(clientAppId()); + sinks().reloadConnectors(clientAppId(), clientAuthData()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java index 1d0f555cec26d..2d951b0e77d72 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java @@ -506,6 +506,6 @@ public List getSourceConfigDefinition( }) @Path("/reloadBuiltInSources") public void reloadSources() { - sources().reloadConnectors(clientAppId()); + sources().reloadConnectors(clientAppId(), clientAuthData()); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index aff7d608214e4..1c442a49a40f4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -901,13 +901,13 @@ public List getListOfConnectors() { } @Override - public void reloadConnectors(String clientRole) { + public void reloadConnectors(String clientRole, AuthenticationDataSource authenticationData) { if (!isWorkerServiceAvailable()) { throwUnavailableException(); } if (worker().getWorkerConfig().isAuthorizationEnabled()) { // Only superuser has permission to do this operation. - if (!isSuperUser(clientRole)) { + if (!isSuperUser(clientRole, authenticationData)) { throw new RestException(Status.UNAUTHORIZED, "This operation requires super-user access"); } } @@ -1205,13 +1205,14 @@ public void putFunctionState(final String tenant, } @Override - public void uploadFunction(final InputStream uploadedInputStream, final String path, String clientRole) { + public void uploadFunction(final InputStream uploadedInputStream, final String path, String clientRole, + AuthenticationDataSource authenticationData) { if (!isWorkerServiceAvailable()) { throwUnavailableException(); } - if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { + if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole, authenticationData)) { throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation"); } @@ -1309,7 +1310,7 @@ public StreamingOutput downloadFunction(final String path, String clientRole, Au throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); } } else { - if (!isSuperUser(clientRole)) { + if (!isSuperUser(clientRole, clientAuthenticationDataHttps)) { throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation"); } } @@ -1441,7 +1442,7 @@ public boolean isAuthorizedRole(String tenant, String namespace, String clientRo AuthenticationDataSource authenticationData) throws PulsarAdminException { if (worker().getWorkerConfig().isAuthorizationEnabled()) { // skip authorization if client role is super-user - if (isSuperUser(clientRole)) { + if (isSuperUser(clientRole, authenticationData)) { return true; } @@ -1524,14 +1525,14 @@ protected void componentInstanceStatusRequestValidate (final String tenant, } } - public boolean isSuperUser(String clientRole) { + public boolean isSuperUser(String clientRole, AuthenticationDataSource authenticationData) { if (clientRole != null) { try { if ((worker().getWorkerConfig().getSuperUserRoles() != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole))) { return true; } - return worker().getAuthorizationService().isSuperUser(clientRole, null) + return worker().getAuthorizationService().isSuperUser(clientRole, authenticationData) .get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS); } catch (InterruptedException e) { log.warn("Time-out {} sec while checking the role {} is a super user role ", diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 5d2f8ee378631..823605213ac46 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -670,14 +670,15 @@ public void updateFunctionOnWorkerLeader(final String tenant, final InputStream uploadedInputStream, final boolean delete, URI uri, - final String clientRole) { + final String clientRole, + AuthenticationDataSource authenticationData) { if (!isWorkerServiceAvailable()) { throwUnavailableException(); } if (worker().getWorkerConfig().isAuthorizationEnabled()) { - if (!isSuperUser(clientRole)) { + if (!isSuperUser(clientRole, authenticationData)) { log.error("{}/{}/{} Client [{}] is not superuser to update on worker leader {}", tenant, namespace, functionName, clientRole, ComponentTypeUtils.toString(componentType)); throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation"); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java index 9ed6afb96943c..26479baea38b4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java @@ -195,7 +195,7 @@ public Response stopFunctionInstances(String tenant, String namespace, String fu @Override public Response uploadFunction(InputStream uploadedInputStream, String path, String clientRole) { - delegate.uploadFunction(uploadedInputStream, path, clientRole); + delegate.uploadFunction(uploadedInputStream, path, clientRole, null); return Response.ok().build(); } @@ -248,4 +248,4 @@ private InstanceCommunication.FunctionStatus toProto( return functionStatus; } -} \ No newline at end of file +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java index 6d233a84df216..6d037c9855d00 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java @@ -312,7 +312,7 @@ public void startFunction(final @PathParam("tenant") String tenant, @Consumes(MediaType.MULTIPART_FORM_DATA) public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("path") String path) { - functions().uploadFunction(uploadedInputStream, path, clientAppId()); + functions().uploadFunction(uploadedInputStream, path, clientAppId(), clientAuthData()); } @GET @@ -386,6 +386,6 @@ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenan final @FormDataParam("delete") boolean delete) { functions().updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream, - delete, uri.getRequestUri(), clientAppId()); + delete, uri.getRequestUri(), clientAppId(), clientAuthData()); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java index af56d247966aa..1b3811e1379b1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java @@ -274,6 +274,6 @@ public List getSinkConfigDefinition( }) @Path("/reloadBuiltInSinks") public void reloadSinks() { - sinks().reloadConnectors(clientAppId()); + sinks().reloadConnectors(clientAppId(), clientAuthData()); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java index db8eb2460c3e3..29fed9a4af3c9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java @@ -290,6 +290,6 @@ public List getSourceConfigDefinition( }) @Path("/reloadBuiltInSources") public void reloadSources() { - sources().reloadConnectors(clientAppId()); + sources().reloadConnectors(clientAppId(), clientAuthData()); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java index 6139a5c6e1067..94d2a49734339 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java @@ -139,7 +139,8 @@ void putFunctionState(final String tenant, void uploadFunction(final InputStream uploadedInputStream, final String path, - String clientRole); + String clientRole, + final AuthenticationDataSource clientAuthenticationDataHttps); StreamingOutput downloadFunction(String path, String clientRole, @@ -154,5 +155,5 @@ StreamingOutput downloadFunction(String tenant, List getListOfConnectors(); - void reloadConnectors(String clientRole); + void reloadConnectors(String clientRole, AuthenticationDataSource clientAuthenticationDataHttps); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java index 9f68439919035..aa874ab205f1d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java @@ -61,7 +61,8 @@ void updateFunctionOnWorkerLeader(final String tenant, final InputStream uploadedInputStream, final boolean delete, URI uri, - final String clientRole); + final String clientRole, + final AuthenticationDataSource clientAuthenticationDataHttps); FunctionStatus getFunctionStatus(final String tenant, final String namespace, diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java index 03c1a149ed8c8..76586b7a7ab5d 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.worker.rest.api; import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.pulsar.broker.authentication.AuthenticationDataHttp; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.client.admin.Namespaces; @@ -55,6 +56,7 @@ import org.testng.annotations.ObjectFactory; import org.testng.annotations.Test; +import javax.servlet.http.HttpServletRequest; import java.io.InputStream; import java.util.Collections; import java.util.HashMap; @@ -261,9 +263,10 @@ public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedExcep // test pulsar super user final String pulsarSuperUser = "pulsarSuperUser"; - when(authorizationService.isSuperUser(pulsarSuperUser, null)).thenReturn(CompletableFuture.completedFuture(true)); + when(authorizationService.isSuperUser(eq(pulsarSuperUser), any())) + .thenReturn(CompletableFuture.completedFuture(true)); assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", pulsarSuperUser, authenticationDataSource)); - assertTrue(functionImpl.isSuperUser(pulsarSuperUser)); + assertTrue(functionImpl.isSuperUser(pulsarSuperUser, null)); // test normal user functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService)); @@ -274,7 +277,7 @@ public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedExcep when(admin.tenants()).thenReturn(tenants); when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin); when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false)); - when(authorizationService.isSuperUser("test-user", null)).thenReturn(CompletableFuture.completedFuture(false)); + when(authorizationService.isSuperUser(eq("test-user"), any())).thenReturn(CompletableFuture.completedFuture(false)); assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource)); // if user is tenant admin @@ -333,10 +336,10 @@ public void testIsSuperUser() throws PulsarAdminException { }); AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class); - assertTrue(functionImpl.isSuperUser(superUser)); + assertTrue(functionImpl.isSuperUser(superUser, null)); - assertFalse(functionImpl.isSuperUser("normal-user")); - assertFalse(functionImpl.isSuperUser( null)); + assertFalse(functionImpl.isSuperUser("normal-user", null)); + assertFalse(functionImpl.isSuperUser( null, null)); // test super roles is null and it's not a pulsar super user when(authorizationService.isSuperUser(superUser, null)) @@ -345,7 +348,22 @@ public void testIsSuperUser() throws PulsarAdminException { workerConfig = new WorkerConfig(); workerConfig.setAuthorizationEnabled(true); doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig(); - assertFalse(functionImpl.isSuperUser(superUser)); + assertFalse(functionImpl.isSuperUser(superUser, null)); + + // test super role is null but the auth datasource contains superuser + when(authorizationService.isSuperUser(anyString(), any(AuthenticationDataSource.class))) + .thenAnswer((invocationOnMock -> { + AuthenticationDataSource authData = invocationOnMock.getArgument(1, AuthenticationDataSource.class); + String user = authData.getHttpHeader("mockedUser"); + return CompletableFuture.completedFuture(superUser.equals(user)); + })); + AuthenticationDataSource authData = mock(AuthenticationDataSource.class); + when(authData.getHttpHeader("mockedUser")).thenReturn(superUser); + assertTrue(functionImpl.isSuperUser("non-superuser", authData)); + + AuthenticationDataSource nonSuperuserAuthData = mock(AuthenticationDataSource.class); + when(nonSuperuserAuthData.getHttpHeader("mockedUser")).thenReturn("non-superuser"); + assertFalse(functionImpl.isSuperUser("non-superuser", nonSuperuserAuthData)); } public static FunctionConfig createDefaultFunctionConfig() {