From 05c4af9cc8179ea92586dd342bdd0f966fdb5fc7 Mon Sep 17 00:00:00 2001 From: Grace Chrstina Date: Fri, 20 Mar 2020 14:33:04 +0800 Subject: [PATCH 01/10] Add project service tests --- .../core/service/ProjectServiceTest.java | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 core/src/test/java/feast/core/service/ProjectServiceTest.java diff --git a/core/src/test/java/feast/core/service/ProjectServiceTest.java b/core/src/test/java/feast/core/service/ProjectServiceTest.java new file mode 100644 index 00000000000..a36bd322955 --- /dev/null +++ b/core/src/test/java/feast/core/service/ProjectServiceTest.java @@ -0,0 +1,104 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.service; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import feast.core.auth.authorization.AuthorizationProvider; +import feast.core.config.FeastProperties; +import feast.core.config.FeastProperties.SecurityProperties; +import feast.core.dao.ProjectRepository; +import feast.core.model.Project; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mock; + +public class ProjectServiceTest { + + @Mock private ProjectRepository projectRepository; + + @Rule public final ExpectedException expectedException = ExpectedException.none(); + + private ProjectService projectService; + + @Before + public void setUp() { + initMocks(this); + projectRepository = mock(ProjectRepository.class); + FeastProperties.SecurityProperties.AuthorizationProperties authProp = + new FeastProperties.SecurityProperties.AuthorizationProperties(); + authProp.setEnabled(false); + FeastProperties.SecurityProperties sp = new SecurityProperties(); + sp.setAuthorization(authProp); + FeastProperties feastProperties = new FeastProperties(); + feastProperties.setSecurity(sp); + projectService = + new ProjectService(feastProperties, projectRepository, mock(AuthorizationProvider.class)); + } + + @Test + public void shouldCreateProjectIfItDoesntExist() { + String projectName = "project1"; + Project project = new Project(projectName); + when(projectRepository.saveAndFlush(any(Project.class))).thenReturn(project); + projectService.createProject(projectName); + verify(projectRepository, times(1)).saveAndFlush(any()); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldNotCreateProjectIfItExist() { + String projectName = "project1"; + when(projectRepository.existsById(projectName)).thenReturn(true); + projectService.createProject(projectName); + } + + @Test + public void shouldArchiveProjectIfItExists() { + String projectName = "project1"; + when(projectRepository.findById(projectName)).thenReturn(Optional.of(new Project(projectName))); + projectService.archiveProject(projectName); + verify(projectRepository, times(1)).saveAndFlush(any(Project.class)); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldNotArchiveProjectIfItIsAlreadyArchived() { + String projectName = "project1"; + when(projectRepository.findById(projectName)).thenReturn(Optional.empty()); + projectService.archiveProject(projectName); + } + + @Test + public void shouldListProjects() { + String projectName = "project1"; + Project project = new Project(projectName); + List expected = Arrays.asList(project); + when(projectRepository.findAllByArchivedIsFalse()).thenReturn(expected); + List actual = projectService.listProjects(); + Assert.assertEquals(expected, actual); + } +} From 5991fd3cefcec64641a1ea2f5363a245e36814c2 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Fri, 20 Mar 2020 14:44:44 +0800 Subject: [PATCH 02/10] Add authentication and authorization --- core/pom.xml | 461 ++++++++++-------- .../GoogleOpenIDAuthenticationProvider.java | 66 +++ .../authorization/AuthorizationProvider.java | 34 ++ .../Keto/KetoAuthorizationProvider.java | 108 ++++ .../config/CoreGRpcServerBuilderConfig.java | 30 -- .../feast/core/config/FeastProperties.java | 37 ++ .../feast/core/config/SecurityConfig.java | 144 ++++++ .../java/feast/core/grpc/CoreServiceImpl.java | 26 +- ...gementService.java => ProjectService.java} | 43 +- core/src/main/resources/application.yml | 28 +- .../feast/core/grpc/CoreServiceAuthTest.java | 147 ++++++ sdk/python/feast/client.py | 196 ++++---- sdk/python/feast/config.py | 3 +- sdk/python/feast/constants.py | 18 +- sdk/python/feast/grpc/auth.py | 54 ++ sdk/python/feast/grpc/grpc.py | 53 ++ sdk/python/tests/feast_core_server.py | 31 ++ sdk/python/tests/test_client.py | 110 ++++- 18 files changed, 1217 insertions(+), 372 deletions(-) create mode 100644 core/src/main/java/feast/core/auth/authentication/GoogleOID/GoogleOpenIDAuthenticationProvider.java create mode 100644 core/src/main/java/feast/core/auth/authorization/AuthorizationProvider.java create mode 100644 core/src/main/java/feast/core/auth/authorization/Keto/KetoAuthorizationProvider.java delete mode 100644 core/src/main/java/feast/core/config/CoreGRpcServerBuilderConfig.java create mode 100644 core/src/main/java/feast/core/config/SecurityConfig.java rename core/src/main/java/feast/core/service/{AccessManagementService.java => ProjectService.java} (55%) create mode 100644 core/src/test/java/feast/core/grpc/CoreServiceAuthTest.java create mode 100644 sdk/python/feast/grpc/auth.py create mode 100644 sdk/python/feast/grpc/grpc.py diff --git a/core/pom.xml b/core/pom.xml index 7961b45074b..1fabf3ce4d1 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -18,194 +18,275 @@ - 4.0.0 - - - dev.feast - feast-parent - ${revision} - - - Feast Core - Feature registry and ingestion coordinator - feast-core - - - - - org.springframework.boot - spring-boot-maven-plugin - - false - - - - - - - - dev.feast - feast-ingestion - ${project.version} - - - - - org.springframework.boot - spring-boot-devtools - true - - - - javax.inject - javax.inject - 1 - - - - org.springframework.boot - spring-boot-starter-web - - - - org.springframework.boot - spring-boot-starter-log4j2 - - - org.apache.logging.log4j - log4j-web - - - - io.github.lognet - grpc-spring-boot-starter - - - - org.springframework.boot - spring-boot-starter-data-jpa - - - - org.springframework.boot - spring-boot-starter-actuator - - - - - org.springframework.boot - spring-boot-configuration-processor - - - - - io.grpc - grpc-services - - - - io.grpc - grpc-stub - - - - com.google.protobuf - protobuf-java-util - - - - - com.google.guava - guava - - - - com.google.code.gson - gson - 2.8.5 - - - com.google.api-client - google-api-client - 1.27.0 - - - com.google.apis - google-api-services-dataflow - v1b3-rev266-1.25.0 - - - - - org.hibernate - hibernate-core - 5.3.6.Final - - - - - org.postgresql - postgresql - provided - true - - - - org.apache.kafka - kafka-clients - - - - - org.projectlombok - lombok - - - - org.hamcrest - hamcrest-library - - - - io.prometheus - simpleclient - - - - io.prometheus - simpleclient_servlet - - - - - com.jayway.jsonpath - json-path-assert - 2.2.0 - test - - - org.mockito - mockito-core - 2.23.0 - test - - - - org.springframework.boot - spring-boot-test - test - - - org.springframework.boot - spring-boot-test-autoconfigure - test - - - - javax.xml.bind - jaxb-api - - - + 4.0.0 + + + dev.feast + feast-parent + ${revision} + + + Feast Core + Feature registry and ingestion coordinator + feast-core + + + + + org.springframework.boot + spring-boot-maven-plugin + + false + + + + + + + + dev.feast + feast-ingestion + ${project.version} + + + + + org.springframework.boot + spring-boot-devtools + true + + + + javax.inject + javax.inject + 1 + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-log4j2 + + + org.apache.logging.log4j + log4j-web + + + org.springframework.security + spring-security-core + 5.3.0.RELEASE + + + org.springframework.security + spring-security-config + 5.3.0.RELEASE + + + org.springframework.security.oauth + spring-security-oauth2 + 2.4.0.RELEASE + + + org.springframework.security + spring-security-oauth2-client + 5.3.0.RELEASE + + + org.springframework.security + spring-security-web + 5.3.0.RELEASE + + + org.springframework.security + spring-security-oauth2-resource-server + 5.3.0.RELEASE + + + org.springframework.security + spring-security-oauth2-jose + 5.3.0.RELEASE + + + net.devh + grpc-server-spring-boot-starter + 2.4.0.RELEASE + + + com.nimbusds + nimbus-jose-jwt + 8.2.1 + + + org.springframework.security + spring-security-oauth2-core + 5.3.0.RELEASE + + + + + org.springframework.boot + spring-boot-starter-data-jpa + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + org.springframework.boot + spring-boot-configuration-processor + + + + + io.grpc + grpc-services + + + + io.grpc + grpc-stub + + + + com.google.protobuf + protobuf-java-util + + + + + com.google.guava + guava + + + + com.google.code.gson + gson + 2.8.5 + + + com.google.api-client + google-api-client + 1.27.0 + + + com.google.apis + google-api-services-dataflow + v1b3-rev266-1.25.0 + + + + + org.hibernate + hibernate-core + 5.3.6.Final + + + + + org.postgresql + postgresql + provided + true + + + + org.apache.kafka + kafka-clients + + + + + org.projectlombok + lombok + + + + org.hamcrest + hamcrest-library + + + + io.prometheus + simpleclient + + + + io.prometheus + simpleclient_servlet + + + com.google.api.client + google-api-client-googleapis-auth-oauth + 1.2.3-alpha + + + + com.auth0 + jwks-rsa + 0.11.0 + + + + com.auth0 + java-jwt + 3.10.0 + + + + sh.ory.keto + keto-client + 0.4.4-alpha.1 + + + + + com.jayway.jsonpath + json-path-assert + 2.2.0 + test + + + org.mockito + mockito-core + 2.23.0 + test + + + + org.springframework.boot + spring-boot-test + test + + + org.springframework.boot + spring-boot-test-autoconfigure + test + + + + javax.xml.bind + jaxb-api + + + org.springframework + spring-test + 5.1.3.RELEASE + test + + + org.junit.jupiter + junit-jupiter + RELEASE + test + + + diff --git a/core/src/main/java/feast/core/auth/authentication/GoogleOID/GoogleOpenIDAuthenticationProvider.java b/core/src/main/java/feast/core/auth/authentication/GoogleOID/GoogleOpenIDAuthenticationProvider.java new file mode 100644 index 00000000000..1e7c1f1654e --- /dev/null +++ b/core/src/main/java/feast/core/auth/authentication/GoogleOID/GoogleOpenIDAuthenticationProvider.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.auth.authentication.GoogleOID; + +import java.util.Map; +import org.springframework.security.authentication.AuthenticationProvider; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.AuthenticationException; +import org.springframework.security.oauth2.jwt.NimbusJwtDecoder; +import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter; +import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationProvider; + +/** + * Google Open ID Authentication Provider. This provider is used to validate incoming requests to + * Feast Core. + */ +public class GoogleOpenIDAuthenticationProvider implements AuthenticationProvider { + + private JwtAuthenticationProvider authProvider; + + /** + * @param options String K/V pair of options to initialize the AuthenticationProvider with. Only + * one option is currently configurable, the jwkEndpointURI. + */ + public GoogleOpenIDAuthenticationProvider(Map options) { + + // Endpoint used to retrieve certificates to validate JWT token + String jwkEndpointURI = "https://www.googleapis.com/oauth2/v3/certs"; + + // Provide a custom endpoint to retrieve certificates + if (options != null) { + jwkEndpointURI = options.get("jwkEndpointURI"); + } + authProvider = + new JwtAuthenticationProvider(NimbusJwtDecoder.withJwkSetUri(jwkEndpointURI).build()); + authProvider.setJwtAuthenticationConverter(new JwtAuthenticationConverter()); + } + + /** + * @param authentication Authentication object which contains a JWT to validate + * @return Returns the same authentication object after authentication + */ + @Override + public Authentication authenticate(Authentication authentication) throws AuthenticationException { + return authProvider.authenticate(authentication); + } + + @Override + public boolean supports(Class aClass) { + return authProvider.supports(aClass); + } +} diff --git a/core/src/main/java/feast/core/auth/authorization/AuthorizationProvider.java b/core/src/main/java/feast/core/auth/authorization/AuthorizationProvider.java new file mode 100644 index 00000000000..14ef91c64d3 --- /dev/null +++ b/core/src/main/java/feast/core/auth/authorization/AuthorizationProvider.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.auth.authorization; + +import org.springframework.security.access.AccessDeniedException; +import org.springframework.security.core.Authentication; + +public interface AuthorizationProvider { + + /** + * Validates whether a user is within a project. Throws an AccessDeniedException if user is not + * within the project. + * + * @param project Name of the Feast project + * @param authentication Spring Security Authentication object + * @throws AccessDeniedException + */ + void checkIfProjectMember(String project, Authentication authentication) + throws AccessDeniedException; +} diff --git a/core/src/main/java/feast/core/auth/authorization/Keto/KetoAuthorizationProvider.java b/core/src/main/java/feast/core/auth/authorization/Keto/KetoAuthorizationProvider.java new file mode 100644 index 00000000000..7f744e5c79e --- /dev/null +++ b/core/src/main/java/feast/core/auth/authorization/Keto/KetoAuthorizationProvider.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.auth.authorization.Keto; + +import feast.core.auth.authorization.AuthorizationProvider; +import java.util.List; +import java.util.Map; +import org.hibernate.validator.internal.constraintvalidators.bv.EmailValidator; +import org.springframework.security.access.AccessDeniedException; +import org.springframework.security.core.Authentication; +import org.springframework.security.oauth2.jwt.Jwt; +import sh.ory.keto.ApiClient; +import sh.ory.keto.ApiException; +import sh.ory.keto.Configuration; +import sh.ory.keto.api.EnginesApi; +import sh.ory.keto.model.OryAccessControlPolicyRole; + +/** Authorization Provider implementation for Ory Keto */ +public class KetoAuthorizationProvider implements AuthorizationProvider { + + private final EnginesApi apiInstance; + + /** + * Initializes the KetoAuthorizationProvider + * + * @param options String K/V pair of options to initialize the provider with. Expects at least a + * "basePath" for the provider URL + */ + public KetoAuthorizationProvider(Map options) { + if (options == null) { + throw new IllegalArgumentException("Cannot pass empty or null options to KetoAuth"); + } + ApiClient defaultClient = Configuration.getDefaultApiClient(); + defaultClient.setBasePath(options.get("basePath")); + this.apiInstance = new EnginesApi(defaultClient); + } + + /** + * Validates whether a user is within a project. Throws an AccessDeniedException if user is not + * within the project. + * + * @param project Name of the Feast project + * @param authentication Spring Security Authentication object + * @throws AccessDeniedException + */ + public void checkIfProjectMember(String project, Authentication authentication) + throws AccessDeniedException { + String email = getEmailFromAuth(authentication); + try { + // Get all roles from Keto + List roles = + this.apiInstance.listOryAccessControlPolicyRoles("glob", 500L, 500L, email); + + // Loop through all roles the user has + for (OryAccessControlPolicyRole role : roles) { + // If the user has an admin or project specific role, return. + if (("roles:admin").equals(role.getId()) + || (String.format("roles:feast:%s-member", project)).equals(role.getId())) { + return; + } + } + } catch (ApiException e) { + System.err.println("Exception when calling EnginesApi#doOryAccessControlPoliciesAllow"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Reason: " + e.getResponseBody()); + System.err.println("Response headers: " + e.getResponseHeaders()); + e.printStackTrace(); + } + // Could not determine project membership, deny access. + throw new AccessDeniedException( + String.format("Access denied to project %s for user %s", project, email)); + } + + /** + * Get user email from their authentication object. + * + * @param authentication Spring Security Authentication object, used to extract user details + * @return String user email + */ + private String getEmailFromAuth(Authentication authentication) { + Jwt principle = ((Jwt) authentication.getPrincipal()); + Map claims = principle.getClaims(); + String email = (String) claims.get("email"); + + if (email.isEmpty()) { + throw new IllegalStateException("JWT does not have a valid email set."); + } + boolean validEmail = (new EmailValidator()).isValid(email, null); + if (!validEmail) { + throw new IllegalStateException("JWT contains an invalid email address"); + } + return email; + } +} diff --git a/core/src/main/java/feast/core/config/CoreGRpcServerBuilderConfig.java b/core/src/main/java/feast/core/config/CoreGRpcServerBuilderConfig.java deleted file mode 100644 index e025c7b2978..00000000000 --- a/core/src/main/java/feast/core/config/CoreGRpcServerBuilderConfig.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2019 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.core.config; - -import io.grpc.ServerBuilder; -import io.grpc.protobuf.services.ProtoReflectionService; -import org.lognet.springboot.grpc.GRpcServerBuilderConfigurer; -import org.springframework.stereotype.Component; - -@Component -public class CoreGRpcServerBuilderConfig extends GRpcServerBuilderConfigurer { - @Override - public void configure(ServerBuilder serverBuilder) { - serverBuilder.addService(ProtoReflectionService.newInstance()); - } -} diff --git a/core/src/main/java/feast/core/config/FeastProperties.java b/core/src/main/java/feast/core/config/FeastProperties.java index 1887caf5e64..a47abc4194e 100644 --- a/core/src/main/java/feast/core/config/FeastProperties.java +++ b/core/src/main/java/feast/core/config/FeastProperties.java @@ -29,6 +29,7 @@ public class FeastProperties { private String version; private JobProperties jobs; private StreamProperties stream; + private SecurityProperties security; @Getter @Setter @@ -64,4 +65,40 @@ public static class MetricsProperties { private String host; private int port; } + + @Getter + @Setter + public static class SecurityProperties { + + private AuthenticationProperties authentication; + private AuthorizationProperties authorization; + + @Getter + @Setter + public static class AuthenticationProperties { + + // Enable authentication + private boolean enabled; + + // Named authentication provider to use + private String provider; + + // K/V options to initialize the provider with + private Map options; + } + + @Getter + @Setter + public static class AuthorizationProperties { + + // Enable authorization. Authentication must be enabled if authorization is enabled. + private boolean enabled; + + // Named authorization provider to use. + private String provider; + + // K/V options to initialize the provider with + private Map options; + } + } } diff --git a/core/src/main/java/feast/core/config/SecurityConfig.java b/core/src/main/java/feast/core/config/SecurityConfig.java new file mode 100644 index 00000000000..855d1b1354c --- /dev/null +++ b/core/src/main/java/feast/core/config/SecurityConfig.java @@ -0,0 +1,144 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.config; + +import feast.core.CoreServiceGrpc; +import feast.core.auth.authentication.GoogleOID.GoogleOpenIDAuthenticationProvider; +import feast.core.auth.authorization.AuthorizationProvider; +import feast.core.auth.authorization.Keto.KetoAuthorizationProvider; +import feast.core.config.FeastProperties.SecurityProperties; +import java.util.ArrayList; +import java.util.List; +import net.devh.boot.grpc.server.security.authentication.BearerAuthenticationReader; +import net.devh.boot.grpc.server.security.authentication.GrpcAuthenticationReader; +import net.devh.boot.grpc.server.security.check.AccessPredicate; +import net.devh.boot.grpc.server.security.check.AccessPredicateVoter; +import net.devh.boot.grpc.server.security.check.GrpcSecurityMetadataSource; +import net.devh.boot.grpc.server.security.check.ManualGrpcSecurityMetadataSource; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.security.access.AccessDecisionManager; +import org.springframework.security.access.AccessDecisionVoter; +import org.springframework.security.access.vote.UnanimousBased; +import org.springframework.security.authentication.AuthenticationManager; +import org.springframework.security.authentication.AuthenticationProvider; +import org.springframework.security.authentication.ProviderManager; +import org.springframework.security.oauth2.server.resource.BearerTokenAuthenticationToken; + +@Configuration +public class SecurityConfig { + + private final SecurityProperties securityProperties; + + public SecurityConfig(FeastProperties feastProperties) { + this.securityProperties = feastProperties.getSecurity(); + } + + /** + * Initializes an AuthenticationManager if authentication has been enabled. + * + * @return AuthenticationManager + */ + @Bean + @ConditionalOnProperty(prefix = "feast.security.authentication", name = "enabled") + AuthenticationManager authenticationManager() { + final List providers = new ArrayList<>(); + + if (securityProperties.getAuthentication().isEnabled()) { + switch (securityProperties.getAuthentication().getProvider()) { + case "GoogleOpenID": + providers.add( + new GoogleOpenIDAuthenticationProvider( + securityProperties.getAuthentication().getOptions())); + break; + default: + throw new IllegalArgumentException( + "Please configure an Authentication Provider if you have enabled authentication."); + } + } + return new ProviderManager(providers); + } + + /** + * Creates an AuthenticationReader that the AuthenticationManager will use to authenticate + * requests + * + * @return GrpcAuthenticationReader + */ + @Bean + @ConditionalOnProperty(prefix = "feast.security.authentication", name = "enabled") + GrpcAuthenticationReader authenticationReader() { + return new BearerAuthenticationReader(BearerTokenAuthenticationToken::new); + } + + /** + * Creates a SecurityMetadataSource when authentication is enabled. This allows for the + * configuration of endpoint level security rules. + * + * @return GrpcSecurityMetadataSource + */ + @Bean + @ConditionalOnProperty(prefix = "feast.security.authentication", name = "enabled") + GrpcSecurityMetadataSource grpcSecurityMetadataSource() { + final ManualGrpcSecurityMetadataSource source = new ManualGrpcSecurityMetadataSource(); + + // Authentication is enabled for all gRPC endpoints + source.setDefault(AccessPredicate.authenticated()); + + // The following endpoints allow unauthenticated access + source.set(CoreServiceGrpc.getGetFeastCoreVersionMethod(), AccessPredicate.permitAll()); + + return source; + } + + /** + * Creates an AccessDecisionManager if authorization is enabled. This object determines the policy + * used to make authorization decisions. + * + * @return AccessDecisionManager + */ + @Bean + @ConditionalOnProperty(prefix = "feast.security.authorization", name = "enabled") + AccessDecisionManager accessDecisionManager() { + final List> voters = new ArrayList<>(); + voters.add(new AccessPredicateVoter()); + return new UnanimousBased(voters); + } + + /** + * Creates an AuthorizationProvider based on Feast configuration. This provider is available + * through the security service. + * + * @return AuthorizationProvider used to validate access to Feast resources. + */ + @Bean + @ConditionalOnProperty(prefix = "feast.security.authorization", name = "enabled") + AuthorizationProvider authorizationProvider() { + if (securityProperties.getAuthentication().isEnabled() + && securityProperties.getAuthorization().isEnabled()) { + switch (securityProperties.getAuthorization().getProvider()) { + case "KetoAuthorization": + return new KetoAuthorizationProvider(securityProperties.getAuthorization().getOptions()); + default: + throw new IllegalArgumentException( + "Please configure an Authorization Provider if you have enabled authorization."); + } + } + return null; + } +} diff --git a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java index 661bbe24039..7d8c2b4e409 100644 --- a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java @@ -39,7 +39,7 @@ import feast.core.exception.RetrievalException; import feast.core.grpc.interceptors.MonitoringInterceptor; import feast.core.model.Project; -import feast.core.service.AccessManagementService; +import feast.core.service.ProjectService; import feast.core.service.SpecService; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -47,21 +47,22 @@ import java.util.List; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.lognet.springboot.grpc.GRpcService; +import net.devh.boot.grpc.server.service.GrpcService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.core.context.SecurityContextHolder; /** Implementation of the feast core GRPC service. */ @Slf4j -@GRpcService(interceptors = {MonitoringInterceptor.class}) +@GrpcService(interceptors = {MonitoringInterceptor.class}) public class CoreServiceImpl extends CoreServiceImplBase { private SpecService specService; - private AccessManagementService accessManagementService; + private ProjectService projectService; @Autowired - public CoreServiceImpl(SpecService specService, AccessManagementService accessManagementService) { + public CoreServiceImpl(SpecService specService, ProjectService projectService) { this.specService = specService; - this.accessManagementService = accessManagementService; + this.projectService = projectService; } @Override @@ -116,6 +117,10 @@ public void listStores( @Override public void applyFeatureSet( ApplyFeatureSetRequest request, StreamObserver responseObserver) { + + projectService.checkIfProjectMember( + SecurityContextHolder.getContext(), request.getFeatureSet().getSpec().getProject()); + try { ApplyFeatureSetResponse response = specService.applyFeatureSet(request.getFeatureSet()); responseObserver.onNext(response); @@ -152,7 +157,7 @@ public void updateStore( public void createProject( CreateProjectRequest request, StreamObserver responseObserver) { try { - accessManagementService.createProject(request.getName()); + projectService.createProject(request.getName()); responseObserver.onNext(CreateProjectResponse.getDefaultInstance()); responseObserver.onCompleted(); } catch (Exception e) { @@ -165,8 +170,11 @@ public void createProject( @Override public void archiveProject( ArchiveProjectRequest request, StreamObserver responseObserver) { + + projectService.checkIfProjectMember(SecurityContextHolder.getContext(), request.getName()); + try { - accessManagementService.archiveProject(request.getName()); + projectService.archiveProject(request.getName()); responseObserver.onNext(ArchiveProjectResponse.getDefaultInstance()); responseObserver.onCompleted(); } catch (Exception e) { @@ -180,7 +188,7 @@ public void archiveProject( public void listProjects( ListProjectsRequest request, StreamObserver responseObserver) { try { - List projects = accessManagementService.listProjects(); + List projects = projectService.listProjects(); responseObserver.onNext( ListProjectsResponse.newBuilder() .addAllProjects(projects.stream().map(Project::getName).collect(Collectors.toList())) diff --git a/core/src/main/java/feast/core/service/AccessManagementService.java b/core/src/main/java/feast/core/service/ProjectService.java similarity index 55% rename from core/src/main/java/feast/core/service/AccessManagementService.java rename to core/src/main/java/feast/core/service/ProjectService.java index df92750e94f..64828f84a81 100644 --- a/core/src/main/java/feast/core/service/AccessManagementService.java +++ b/core/src/main/java/feast/core/service/ProjectService.java @@ -16,24 +16,45 @@ */ package feast.core.service; +import feast.core.auth.authorization.AuthorizationProvider; +import feast.core.config.FeastProperties; +import feast.core.config.FeastProperties.SecurityProperties; import feast.core.dao.ProjectRepository; import feast.core.model.Project; import java.util.List; import java.util.Optional; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContext; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; @Slf4j @Service -public class AccessManagementService { +public class ProjectService { + private SecurityProperties securityProperties; + private AuthorizationProvider authorizationProvider; private ProjectRepository projectRepository; + public ProjectService( + FeastProperties feastProperties, + ProjectRepository projectRepository, + AuthorizationProvider authorizationProvider) { + this.projectRepository = projectRepository; + this.authorizationProvider = authorizationProvider; + this.securityProperties = feastProperties.getSecurity(); + } + @Autowired - public AccessManagementService(ProjectRepository projectRepository) { + public ProjectService( + FeastProperties feastProperties, + ProjectRepository projectRepository, + ObjectProvider authorizationProvider) { this.projectRepository = projectRepository; + this.authorizationProvider = authorizationProvider.getIfUnique(); + this.securityProperties = feastProperties.getSecurity(); } /** @@ -41,7 +62,6 @@ public AccessManagementService(ProjectRepository projectRepository) { * * @param name Name of project to be created */ - @Transactional public void createProject(String name) { if (projectRepository.existsById(name)) { throw new IllegalArgumentException(String.format("Project already exists: %s", name)); @@ -55,7 +75,6 @@ public void createProject(String name) { * * @param name Name of the project to be archived */ - @Transactional public void archiveProject(String name) { Optional project = projectRepository.findById(name); if (!project.isPresent()) { @@ -74,4 +93,18 @@ public void archiveProject(String name) { public List listProjects() { return projectRepository.findAllByArchivedIsFalse(); } + + /** + * Determine whether a user belongs to a Project + * + * @param securityContext User's Spring Security Context. Used to identify user. + * @param project Name of the project for which membership should be tested. + */ + public void checkIfProjectMember(SecurityContext securityContext, String project) { + Authentication authentication = securityContext.getAuthentication(); + if (!this.securityProperties.getAuthorization().isEnabled()) { + return; + } + this.authorizationProvider.checkIfProjectMember(project, authentication); + } } diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml index dc78719f22e..214f97047ba 100644 --- a/core/src/main/resources/application.yml +++ b/core/src/main/resources/application.yml @@ -15,13 +15,6 @@ # # -grpc: - # The port number Feast Serving GRPC service should listen on - port: 6565 - # This allows client to discover GRPC endpoints easily - # https://github.com/grpc/grpc-java/blob/master/documentation/server-reflection-tutorial.md - enable-reflection: true - feast: # version: @project.version@ jobs: @@ -53,6 +46,25 @@ feast: replicationFactor: 1 partitions: 1 + security: + authentication: + enabled: false + provider: GoogleOpenID + authorization: + enabled: false + provider: KetoAuthorization + options: + basePath: http://localhost:3000 + +grpc: + server: + # The port that Feast Core gRPC service listens on + port: 6565 + security: + enabled: false + certificateChainPath: server.crt + privateKeyPath: server.key + spring: jpa: properties.hibernate: @@ -67,6 +79,8 @@ spring: url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_DATABASE:postgres} username: ${DB_USERNAME:postgres} password: ${DB_PASSWORD:password} + application: + name: local-grpc-server management: metrics: diff --git a/core/src/test/java/feast/core/grpc/CoreServiceAuthTest.java b/core/src/test/java/feast/core/grpc/CoreServiceAuthTest.java new file mode 100644 index 00000000000..6ca91c85733 --- /dev/null +++ b/core/src/test/java/feast/core/grpc/CoreServiceAuthTest.java @@ -0,0 +1,147 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.grpc; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.CoreServiceProto.ApplyFeatureSetRequest; +import feast.core.CoreServiceProto.ApplyFeatureSetResponse; +import feast.core.FeatureSetProto; +import feast.core.FeatureSetProto.FeatureSetStatus; +import feast.core.SourceProto.KafkaSourceConfig; +import feast.core.SourceProto.SourceType; +import feast.core.auth.authorization.AuthorizationProvider; +import feast.core.config.FeastProperties; +import feast.core.config.FeastProperties.SecurityProperties; +import feast.core.dao.ProjectRepository; +import feast.core.model.FeatureSet; +import feast.core.model.Field; +import feast.core.model.Source; +import feast.core.service.ProjectService; +import feast.core.service.SpecService; +import feast.types.ValueProto.ValueType.Enum; +import io.grpc.internal.testing.StreamRecorder; +import java.sql.Date; +import java.time.Instant; +import java.util.Arrays; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.security.access.AccessDeniedException; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContext; +import org.springframework.security.core.context.SecurityContextHolder; + +@SpringBootTest +class CoreServiceAuthTest { + + private CoreServiceImpl coreService; + private SpecService specService; + private ProjectService projectService; + private ProjectRepository projectRepository; + private AuthorizationProvider authProvider; + + CoreServiceAuthTest() { + specService = mock(SpecService.class); + projectRepository = mock(ProjectRepository.class); + authProvider = mock(AuthorizationProvider.class); + FeastProperties.SecurityProperties.AuthorizationProperties authProp = + new FeastProperties.SecurityProperties.AuthorizationProperties(); + authProp.setEnabled(true); + FeastProperties.SecurityProperties sp = new SecurityProperties(); + sp.setAuthorization(authProp); + FeastProperties feastProperties = new FeastProperties(); + feastProperties.setSecurity(sp); + projectService = new ProjectService(feastProperties, projectRepository, authProvider); + coreService = new CoreServiceImpl(specService, projectService); + } + + @Test + void cantApplyFeatureSetIfNotProjectMember() throws InvalidProtocolBufferException { + + String project = "project1"; + Authentication auth = mock(Authentication.class); + SecurityContext context = mock(SecurityContext.class); + SecurityContextHolder.setContext(context); + when(context.getAuthentication()).thenReturn(auth); + + doThrow(AccessDeniedException.class).when(authProvider).checkIfProjectMember(project, auth); + + StreamRecorder responseObserver = StreamRecorder.create(); + FeatureSetProto.FeatureSet incomingFeatureSet = newDummyFeatureSet("f2", 1, project).toProto(); + FeatureSetProto.FeatureSetSpec incomingFeatureSetSpec = + incomingFeatureSet.getSpec().toBuilder().clearVersion().build(); + FeatureSetProto.FeatureSet spec = + FeatureSetProto.FeatureSet.newBuilder().setSpec(incomingFeatureSetSpec).build(); + ApplyFeatureSetRequest request = + ApplyFeatureSetRequest.newBuilder().setFeatureSet(spec).build(); + + assertThrows( + AccessDeniedException.class, () -> coreService.applyFeatureSet(request, responseObserver)); + } + + @Test + void canApplyFeatureSetIfProjectMember() throws InvalidProtocolBufferException { + + String project = "project1"; + Authentication auth = mock(Authentication.class); + SecurityContext context = mock(SecurityContext.class); + SecurityContextHolder.setContext(context); + when(context.getAuthentication()).thenReturn(auth); + + StreamRecorder responseObserver = StreamRecorder.create(); + FeatureSetProto.FeatureSet incomingFeatureSet = newDummyFeatureSet("f2", 1, project).toProto(); + FeatureSetProto.FeatureSetSpec incomingFeatureSetSpec = + incomingFeatureSet.getSpec().toBuilder().clearVersion().build(); + FeatureSetProto.FeatureSet spec = + FeatureSetProto.FeatureSet.newBuilder().setSpec(incomingFeatureSetSpec).build(); + ApplyFeatureSetRequest request = + ApplyFeatureSetRequest.newBuilder().setFeatureSet(spec).build(); + + coreService.applyFeatureSet(request, responseObserver); + } + + private FeatureSet newDummyFeatureSet(String name, int version, String project) { + Field feature = new Field("feature", Enum.INT64); + Field entity = new Field("entity", Enum.STRING); + + Source defaultSource = + new Source( + SourceType.KAFKA, + KafkaSourceConfig.newBuilder() + .setBootstrapServers("kafka:9092") + .setTopic("my-topic") + .build(), + true); + + FeatureSet fs = + new FeatureSet( + name, + project, + version, + 100L, + Arrays.asList(entity), + Arrays.asList(feature), + defaultSource, + FeatureSetStatus.STATUS_READY); + fs.setCreated(Date.from(Instant.ofEpochSecond(10L))); + return fs; + } +} diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 2a0b636b373..16b01b86a51 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - import logging import os import shutil @@ -27,13 +26,18 @@ import pyarrow as pa import pyarrow.parquet as pq +import feast.grpc.auth as feast_auth from feast.config import Config from feast.constants import ( - CONFIG_CORE_SECURE_KEY, + CONFIG_CORE_ENABLE_AUTH_KEY, + CONFIG_CORE_ENABLE_SSL_KEY, + CONFIG_CORE_SERVER_SSL_CERT_KEY, CONFIG_CORE_URL_KEY, CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY, CONFIG_PROJECT_KEY, - CONFIG_SERVING_SECURE_KEY, + CONFIG_SERVING_ENABLE_AUTH_KEY, + CONFIG_SERVING_ENABLE_SSL_KEY, + CONFIG_SERVING_SERVER_SSL_CERT_KEY, CONFIG_SERVING_URL_KEY, ) from feast.core.CoreService_pb2 import ( @@ -54,6 +58,7 @@ from feast.core.CoreService_pb2_grpc import CoreServiceStub from feast.core.FeatureSet_pb2 import FeatureSetStatus from feast.feature_set import Entity, FeatureSet +from feast.grpc.grpc import create_grpc_channel from feast.job import Job from feast.loaders.abstract_producer import get_producer from feast.loaders.file import export_source_to_staging_location @@ -84,13 +89,15 @@ class Client: def __init__(self, options: Optional[Dict[str, str]] = None, **kwargs): """ The Feast Client should be initialized with at least one service url - - Args: + Please see constants.py for configuration options. Commonly used options + or arguments include: core_url: Feast Core URL. Used to manage features serving_url: Feast Serving URL. Used to retrieve features project: Sets the active project. This field is optional. core_secure: Use client-side SSL/TLS for Core gRPC API serving_secure: Use client-side SSL/TLS for Serving gRPC API + + Args: options: Configuration options to initialize client with **kwargs: Additional keyword arguments that will be used as configuration options along with "options" @@ -100,10 +107,51 @@ def __init__(self, options: Optional[Dict[str, str]] = None, **kwargs): options = dict() self._config = Config(options={**options, **kwargs}) - self.__core_channel: grpc.Channel = None - self.__serving_channel: grpc.Channel = None self._core_service_stub: CoreServiceStub = None self._serving_service_stub: ServingServiceStub = None + self._auth_metadata = None + if self._config.getboolean(CONFIG_CORE_ENABLE_AUTH_KEY): + self._auth_metadata = feast_auth.get_auth_metadata_plugin() + + @property + def _core_service(self): + """ + Creates or returns the gRPC Feast Core Service Stub + + Returns: CoreServiceStub + """ + if not self._core_service_stub: + channel = create_grpc_channel( + url=self._config.get(CONFIG_CORE_URL_KEY), + enable_ssl=self._config.getboolean(CONFIG_CORE_ENABLE_SSL_KEY), + enable_auth=self._config.getboolean(CONFIG_CORE_ENABLE_AUTH_KEY), + ssl_server_cert_path=self._config.get(CONFIG_CORE_SERVER_SSL_CERT_KEY), + auth_metadata_plugin=self._auth_metadata, + timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + ) + self._core_service_stub = CoreServiceStub(channel) + return self._core_service_stub + + @property + def _serving_service(self): + """ + Creates or returns the gRPC Feast Serving Service Stub + + Returns: ServingServiceStub + """ + if not self._serving_service_stub: + channel = create_grpc_channel( + url=self._config.get(CONFIG_SERVING_URL_KEY), + enable_ssl=self._config.getboolean(CONFIG_SERVING_ENABLE_SSL_KEY), + enable_auth=self._config.getboolean(CONFIG_SERVING_ENABLE_AUTH_KEY), + ssl_server_cert_path=self._config.get( + CONFIG_SERVING_SERVER_SSL_CERT_KEY + ), + auth_metadata_plugin=self._auth_metadata, + timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + ) + self._serving_service_stub = ServingServiceStub(channel) + return self._serving_service_stub @property def core_url(self) -> str: @@ -153,7 +201,7 @@ def core_secure(self) -> bool: Returns: Whether client-side SSL/TLS is enabled """ - return self._config.getboolean(CONFIG_CORE_SECURE_KEY) + return self._config.getboolean(CONFIG_CORE_ENABLE_SSL_KEY) @core_secure.setter def core_secure(self, value: bool): @@ -163,7 +211,7 @@ def core_secure(self, value: bool): Args: value: True to enable client-side SSL/TLS """ - self._config.set(CONFIG_CORE_SECURE_KEY, value) + self._config.set(CONFIG_CORE_ENABLE_SSL_KEY, value) @property def serving_secure(self) -> bool: @@ -173,7 +221,7 @@ def serving_secure(self) -> bool: Returns: Whether client-side SSL/TLS is enabled """ - return self._config.getboolean(CONFIG_SERVING_SECURE_KEY) + return self._config.getboolean(CONFIG_SERVING_ENABLE_SSL_KEY) @serving_secure.setter def serving_secure(self, value: bool): @@ -183,7 +231,7 @@ def serving_secure(self, value: bool): Args: value: True to enable client-side SSL/TLS """ - self._config.set(CONFIG_SERVING_SECURE_KEY, value) + self._config.set(CONFIG_SERVING_ENABLE_SSL_KEY, value) def version(self): """ @@ -192,90 +240,22 @@ def version(self): result = {} if self.serving_url: - self._connect_serving() - serving_version = self._serving_service_stub.GetFeastServingInfo( + serving_version = self._serving_service.GetFeastServingInfo( GetFeastServingInfoRequest(), timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), ).version result["serving"] = {"url": self.serving_url, "version": serving_version} if self.core_url: - self._connect_core() - core_version = self._core_service_stub.GetFeastCoreVersion( + core_version = self._core_service.GetFeastCoreVersion( GetFeastCoreVersionRequest(), timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + metadata=self._get_grpc_metadata(), ).version result["core"] = {"url": self.core_url, "version": core_version} return result - def _connect_core(self, skip_if_connected: bool = True): - """ - Connect to Core API - - Args: - skip_if_connected: Do not attempt to connect if already connected - """ - if skip_if_connected and self._core_service_stub: - return - - if not self.core_url: - raise ValueError("Please set Feast Core URL.") - - if self.__core_channel is None: - if self.core_secure or self.core_url.endswith(":443"): - self.__core_channel = grpc.secure_channel( - self.core_url, grpc.ssl_channel_credentials() - ) - else: - self.__core_channel = grpc.insecure_channel(self.core_url) - - try: - grpc.channel_ready_future(self.__core_channel).result( - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) - ) - except grpc.FutureTimeoutError: - raise ConnectionError( - f"Connection timed out while attempting to connect to Feast " - f"Core gRPC server {self.core_url} " - ) - else: - self._core_service_stub = CoreServiceStub(self.__core_channel) - - def _connect_serving(self, skip_if_connected=True): - """ - Connect to Serving API - - Args: - skip_if_connected: Do not attempt to connect if already connected - """ - - if skip_if_connected and self._serving_service_stub: - return - - if not self.serving_url: - raise ValueError("Please set Feast Serving URL.") - - if self.__serving_channel is None: - if self.serving_secure or self.serving_url.endswith(":443"): - self.__serving_channel = grpc.secure_channel( - self.serving_url, grpc.ssl_channel_credentials() - ) - else: - self.__serving_channel = grpc.insecure_channel(self.serving_url) - - try: - grpc.channel_ready_future(self.__serving_channel).result( - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) - ) - except grpc.FutureTimeoutError: - raise ConnectionError( - f"Connection timed out while attempting to connect to Feast " - f"Serving gRPC server {self.serving_url} " - ) - else: - self._serving_service_stub = ServingServiceStub(self.__serving_channel) - @property def project(self) -> Union[str, None]: """ @@ -303,10 +283,11 @@ def list_projects(self) -> List[str]: List of project names """ - self._connect_core() - response = self._core_service_stub.ListProjects( + + response = self._core_service.ListProjects( ListProjectsRequest(), timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + metadata=self._get_grpc_metadata(), ) # type: ListProjectsResponse return list(response.projects) @@ -318,10 +299,10 @@ def create_project(self, project: str): project: Name of project """ - self._connect_core() - self._core_service_stub.CreateProject( + self._core_service.CreateProject( CreateProjectRequest(name=project), timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + metadata=self._get_grpc_metadata(), ) # type: CreateProjectResponse def archive_project(self, project): @@ -334,10 +315,10 @@ def archive_project(self, project): project: Name of project to archive """ - self._connect_core() - self._core_service_stub.ArchiveProject( + self._core_service.ArchiveProject( ArchiveProjectRequest(name=project), timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + metadata=self._get_grpc_metadata(), ) # type: ArchiveProjectResponse if self._project == project: @@ -368,7 +349,6 @@ def _apply_feature_set(self, feature_set: FeatureSet): Args: feature_set: Feature set that will be registered """ - self._connect_core() feature_set.is_valid() feature_set_proto = feature_set.to_proto() @@ -384,9 +364,10 @@ def _apply_feature_set(self, feature_set: FeatureSet): # Convert the feature set to a request and send to Feast Core try: - apply_fs_response = self._core_service_stub.ApplyFeatureSet( + apply_fs_response = self._core_service.ApplyFeatureSet( ApplyFeatureSetRequest(feature_set=feature_set_proto), timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + metadata=self._get_grpc_metadata(), ) # type: ApplyFeatureSetResponse except grpc.RpcError as e: raise grpc.RpcError(e.details()) @@ -421,7 +402,6 @@ def list_feature_sets( Returns: List of feature sets """ - self._connect_core() if project is None: if self.project is not None: @@ -440,8 +420,8 @@ def list_feature_sets( ) # Get latest feature sets from Feast Core - feature_set_protos = self._core_service_stub.ListFeatureSets( - ListFeatureSetsRequest(filter=filter) + feature_set_protos = self._core_service.ListFeatureSets( + ListFeatureSetsRequest(filter=filter), metadata=self._get_grpc_metadata(), ) # type: ListFeatureSetsResponse # Extract feature sets and return @@ -468,7 +448,6 @@ def get_feature_set( Returns either the specified feature set, or raises an exception if none is found """ - self._connect_core() if project is None: if self.project is not None: @@ -480,10 +459,11 @@ def get_feature_set( version = 0 try: - get_feature_set_response = self._core_service_stub.GetFeatureSet( + get_feature_set_response = self._core_service.GetFeatureSet( GetFeatureSetRequest( project=project, name=name.strip(), version=int(version) - ) + ), + metadata=self._get_grpc_metadata(), ) # type: GetFeatureSetResponse except grpc.RpcError as e: raise grpc.RpcError(e.details()) @@ -548,22 +528,20 @@ def get_batch_features( >>> print(df) """ - self._connect_serving() - feature_references = _build_feature_references( feature_refs=feature_refs, default_project=default_project ) # Retrieve serving information to determine store type and # staging location - serving_info = self._serving_service_stub.GetFeastServingInfo( + serving_info = self._serving_service.GetFeastServingInfo( GetFeastServingInfoRequest(), timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), ) # type: GetFeastServingInfoResponse if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH: raise Exception( - f'You are connected to a store "{self._serving_url}" which ' + f'You are connected to a store "{self.serving_url}" which ' f"does not support batch retrieval " ) @@ -605,8 +583,8 @@ def get_batch_features( ) # Retrieve Feast Job object to manage life cycle of retrieval - response = self._serving_service_stub.GetBatchFeatures(request) - return Job(response.job, self._serving_service_stub) + response = self._serving_service.GetBatchFeatures(request) + return Job(response.job, self._serving_service) def get_online_features( self, @@ -634,9 +612,8 @@ def get_online_features( Returns a list of maps where each item in the list contains the latest feature values for the provided entities """ - self._connect_serving() - return self._serving_service_stub.GetOnlineFeatures( + return self._serving_service.GetOnlineFeatures( GetOnlineFeaturesRequest( features=_build_feature_references( feature_refs=feature_refs, @@ -784,6 +761,17 @@ def ingest( return None + def _get_grpc_metadata(self): + """ + Returns a metadata tuple to attach to gRPC requests. This is primarily + used when authentication is enabled but SSL/TLS is disabled. + + Returns: Tuple of metadata to attach to each gRPC call + """ + if self._config.getboolean(CONFIG_CORE_ENABLE_AUTH_KEY) and self._auth_metadata: + return self._auth_metadata.get_signed_meta() + return () + def _build_feature_references( feature_refs: List[str], default_project: str = None diff --git a/sdk/python/feast/config.py b/sdk/python/feast/config.py index fd35b6e5d87..71800e24a78 100644 --- a/sdk/python/feast/config.py +++ b/sdk/python/feast/config.py @@ -67,7 +67,8 @@ def _init_config(path: str): def _get_feast_env_vars(): """ - Get environmental variables that start with FEAST_ + Get environmental variables that start with "FEAST_" + Returns: Dict of Feast environmental variables (stripped of prefix) """ feast_env_vars = {} diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index c4bde75404a..4ee63248bbe 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -24,11 +24,15 @@ # Feast configuration options +CONFIG_PROJECT_KEY = "project" CONFIG_CORE_URL_KEY = "core_url" +CONFIG_CORE_ENABLE_SSL_KEY = "core_enable_ssl" +CONFIG_CORE_ENABLE_AUTH_KEY = "core_enable_auth" +CONFIG_CORE_SERVER_SSL_CERT_KEY = "core_server_ssl_cert" CONFIG_SERVING_URL_KEY = "serving_url" -CONFIG_PROJECT_KEY = "project" -CONFIG_CORE_SECURE_KEY = "core_secure" -CONFIG_SERVING_SECURE_KEY = "serving_secure" +CONFIG_SERVING_ENABLE_SSL_KEY = "serving_enable_ssl" +CONFIG_SERVING_ENABLE_AUTH_KEY = "serving_enable_auth" +CONFIG_SERVING_SERVER_SSL_CERT_KEY = "serving_server_ssl_cert" CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = "grpc_connection_timeout_default" CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = "grpc_connection_timeout_apply_key" CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = ( @@ -39,9 +43,13 @@ FEAST_DEFAULT_OPTIONS = { CONFIG_PROJECT_KEY: "default", CONFIG_CORE_URL_KEY: "localhost:6565", - CONFIG_CORE_SECURE_KEY: "False", + CONFIG_CORE_ENABLE_SSL_KEY: "False", + CONFIG_CORE_ENABLE_AUTH_KEY: "False", + CONFIG_CORE_SERVER_SSL_CERT_KEY: "", CONFIG_SERVING_URL_KEY: "localhost:6565", - CONFIG_SERVING_SECURE_KEY: "False", + CONFIG_SERVING_ENABLE_SSL_KEY: "False", + CONFIG_SERVING_ENABLE_AUTH_KEY: "False", + CONFIG_SERVING_SERVER_SSL_CERT_KEY: "", CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "3", CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY: "600", CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY: "600", diff --git a/sdk/python/feast/grpc/auth.py b/sdk/python/feast/grpc/auth.py new file mode 100644 index 00000000000..da8f3a4d0ee --- /dev/null +++ b/sdk/python/feast/grpc/auth.py @@ -0,0 +1,54 @@ +import grpc + + +def get_auth_metadata_plugin(): + """ + Get an Authentication Metadata Plugin. This plugin is used in gRPC to + sign requests. Please see the following URL for more details + https://grpc.github.io/grpc/python/_modules/grpc.html#AuthMetadataPlugin + + New plugins can be added to this function. For the time being we only + support Google Open ID authentication. + + Returns: Returns an implementation of grpc.AuthMetadataPlugin + """ + return GoogleOpenIDAuthMetadataPlugin() + + +class GoogleOpenIDAuthMetadataPlugin(grpc.AuthMetadataPlugin): + """A `gRPC AuthMetadataPlugin`_ that inserts the credentials into each + request. + + .. _gRPC AuthMetadataPlugin: + http://www.grpc.io/grpc/python/grpc.html#grpc.AuthMetadataPlugin + """ + + def __init__(self): + super(GoogleOpenIDAuthMetadataPlugin, self).__init__() + from google.auth.transport import requests + + self._request = requests.Request() + self._token = None + self._refresh_token() + + def get_signed_meta(self): + """ Creates a signed authorization metadata token.""" + return (("authorization", "Bearer {}".format(self._token)),) + + def _refresh_token(self): + """ Refreshes Google ID token and persists it in memory """ + from google import auth as google_auth + + credentials, _ = google_auth.default() + credentials.refresh(self._request) + self._token = credentials.id_token + + def __call__(self, context, callback): + """Passes authorization metadata into the given callback. + + Args: + context (grpc.AuthMetadataContext): The RPC context. + callback (grpc.AuthMetadataPluginCallback): The callback that will + be invoked to pass in the authorization metadata. + """ + callback(self.get_signed_meta(), None) diff --git a/sdk/python/feast/grpc/grpc.py b/sdk/python/feast/grpc/grpc.py new file mode 100644 index 00000000000..28b21437a37 --- /dev/null +++ b/sdk/python/feast/grpc/grpc.py @@ -0,0 +1,53 @@ +import grpc + + +def create_grpc_channel( + url: str, + enable_ssl: bool = False, + enable_auth: bool = False, + ssl_server_cert_path: str = None, + auth_metadata_plugin: grpc.AuthMetadataPlugin = None, + timeout: int = 3, +) -> grpc.Channel: + """ + Create a gRPC channel + Args: + url: gRPC URL to connect to + enable_ssl: Enable TLS/SSL, optionally provide a server side certificate + enable_auth: Enable user auth + ssl_server_cert_path: (optional) Path to certificate (used with + "enable SSL") + auth_metadata_plugin: Metadata plugin to use to sign requests, only used + with "enable auth" when SSL/TLS is enabled + timeout: Connection timeout to server + + Returns: Returns a grpc.Channel + """ + if not url: + raise ValueError("Unable to create gRPC channel. URL has not been defined.") + + if enable_ssl or url.endswith(":443"): + # User has provided a public key certificate + if ssl_server_cert_path: + with open(ssl_server_cert_path, "rb",) as f: + credentials = grpc.ssl_channel_credentials(f.read()) + # Guess the certificate location + else: + credentials = grpc.ssl_channel_credentials() + + # Authentication is enabled, add the metadata plugin in order to sign + # requests + if enable_auth: + credentials = grpc.composite_channel_credentials( + credentials, grpc.metadata_call_credentials(auth_metadata_plugin), + ) + channel = grpc.secure_channel(url, credentials=credentials) + else: + channel = grpc.insecure_channel(url) + try: + grpc.channel_ready_future(channel).result(timeout=timeout) + return channel + except grpc.FutureTimeoutError: + raise ConnectionError( + f"Connection timed out while attempting to connect to {url}" + ) diff --git a/sdk/python/tests/feast_core_server.py b/sdk/python/tests/feast_core_server.py index b6efe2cb6d1..aad30a62cb4 100644 --- a/sdk/python/tests/feast_core_server.py +++ b/sdk/python/tests/feast_core_server.py @@ -21,6 +21,37 @@ _logger = logging.getLogger(__name__) _ONE_DAY_IN_SECONDS = 60 * 60 * 24 +_SIGNATURE_HEADER_KEY = "authorization" + + +class DisallowAuthInterceptor(grpc.ServerInterceptor): + def __init__(self): + def abort(ignored_request, context): + context.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid signature") + + self._abortion = grpc.unary_unary_rpc_method_handler(abort) + + def intercept_service(self, continuation, handler_call_details): + print(handler_call_details.invocation_metadata) + if "Bearer" in handler_call_details.invocation_metadata[0][1]: + return self._abortion + else: + return continuation(handler_call_details) + + +class AllowAuthInterceptor(grpc.ServerInterceptor): + def __init__(self): + def abort(ignored_request, context): + context.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid signature") + + self._abortion = grpc.unary_unary_rpc_method_handler(abort) + + def intercept_service(self, continuation, handler_call_details): + print(handler_call_details.invocation_metadata) + if "Bearer" in handler_call_details.invocation_metadata[0][1]: + return continuation(handler_call_details) + else: + return self._abortion class CoreServicer(Core.CoreServiceServicer): diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index 3c1e8bef0f0..14b017f0a75 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -47,7 +47,11 @@ from feast.source import KafkaSource from feast.types import Value_pb2 as ValueProto from feast.value_type import ValueType -from feast_core_server import CoreServicer +from feast_core_server import ( + AllowAuthInterceptor, + CoreServicer, + DisallowAuthInterceptor, +) from feast_serving_server import ServingServicer CORE_URL = "core.feast.example.com" @@ -59,24 +63,20 @@ class TestClient: @pytest.fixture - def secure_mock_client(self, mocker): + def secure_mock_client(self): client = Client( core_url=CORE_URL, serving_url=SERVING_URL, - core_secure=True, - serving_secure=True, + core_enable_ssl=True, + serving_enable_ssl=True, ) - mocker.patch.object(client, "_connect_core") - mocker.patch.object(client, "_connect_serving") client._core_url = CORE_URL client._serving_url = SERVING_URL return client @pytest.fixture - def mock_client(self, mocker): + def mock_client(self): client = Client(core_url=CORE_URL, serving_url=SERVING_URL) - mocker.patch.object(client, "_connect_core") - mocker.patch.object(client, "_connect_serving") client._core_url = CORE_URL client._serving_url = SERVING_URL return client @@ -118,12 +118,47 @@ def secure_core_server(self, server_credentials): def secure_serving_server(self, server_credentials): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) Serving.add_ServingServiceServicer_to_server(ServingServicer(), server) - server.add_secure_port("[::]:50054", server_credentials) server.start() yield server server.stop(0) + @pytest.fixture + def secure_core_server_with_auth(self, server_credentials): + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=10), + interceptors=(AllowAuthInterceptor(),), + ) + Core.add_CoreServiceServicer_to_server(CoreServicer(), server) + server.add_secure_port("[::]:50055", server_credentials) + server.start() + yield server + server.stop(0) + + @pytest.fixture + def insecure_core_server_with_auth(self, server_credentials): + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=10), + interceptors=(AllowAuthInterceptor(),), + ) + Core.add_CoreServiceServicer_to_server(CoreServicer(), server) + server.add_insecure_port("[::]:50056") + server.start() + yield server + server.stop(0) + + @pytest.fixture + def insecure_core_server_that_blocks_auth(self, server_credentials): + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=10), + interceptors=(DisallowAuthInterceptor(),), + ) + Core.add_CoreServiceServicer_to_server(CoreServicer(), server) + server.add_insecure_port("[::]:50057") + server.start() + yield server + server.stop(0) + @pytest.fixture def secure_client(self, secure_core_server, secure_serving_server): root_certificate_credentials = pkgutil.get_data( @@ -140,8 +175,24 @@ def secure_client(self, secure_core_server, secure_serving_server): yield Client( core_url="localhost:50053", serving_url="localhost:50054", - core_secure=True, - serving_secure=True, + core_enable_ssl=True, + serving_enable_ssl=True, + ) + + @pytest.fixture + def secure_core_client_with_auth(self, secure_core_server_with_auth): + root_certificate_credentials = pkgutil.get_data( + __name__, _ROOT_CERTIFICATE_RESOURCE_PATH + ) + ssl_channel_credentials = grpc.ssl_channel_credentials( + root_certificates=root_certificate_credentials + ) + with mock.patch( + "grpc.ssl_channel_credentials", + MagicMock(return_value=ssl_channel_credentials), + ): + yield Client( + core_url="localhost:50055", core_enable_ssl=True, core_enable_auth=True ) @pytest.fixture @@ -603,7 +654,7 @@ def test_feature_set_types_success(self, test_client, dataframe, mocker): test_client.apply(all_types_fs) mocker.patch.object( - test_client._core_service_stub, + test_client._core_service, "GetFeatureSet", return_value=GetFeatureSetResponse(feature_set=all_types_fs.to_proto()), ) @@ -618,15 +669,15 @@ def test_secure_channel_creation_with_secure_client(self, _mocked_obj): client = Client( core_url="localhost:50051", serving_url="localhost:50052", - serving_secure=True, - core_secure=True, + serving_enable_ssl=True, + core_enable_ssl=True, ) with mock.patch("grpc.secure_channel") as _grpc_mock, mock.patch( "grpc.ssl_channel_credentials", MagicMock(return_value="test") ) as _mocked_credentials: - client._connect_serving() + _ = client._serving_service _grpc_mock.assert_called_with( - client.serving_url, _mocked_credentials.return_value + client.serving_url, credentials=_mocked_credentials.return_value ) @mock.patch("grpc.channel_ready_future") @@ -637,9 +688,9 @@ def test_secure_channel_creation_with_secure_serving_url( with mock.patch("grpc.secure_channel") as _grpc_mock, mock.patch( "grpc.ssl_channel_credentials", MagicMock(return_value="test") ) as _mocked_credentials: - client._connect_serving() + _ = client._serving_service _grpc_mock.assert_called_with( - client.serving_url, _mocked_credentials.return_value + client.serving_url, credentials=_mocked_credentials.return_value ) @patch("grpc.channel_ready_future") @@ -648,7 +699,24 @@ def test_secure_channel_creation_with_secure_core_url(self, _mocked_obj): with mock.patch("grpc.secure_channel") as _grpc_mock, mock.patch( "grpc.ssl_channel_credentials", MagicMock(return_value="test") ) as _mocked_credentials: - client._connect_core() + _ = client._core_service _grpc_mock.assert_called_with( - client.core_url, _mocked_credentials.return_value + client.core_url, credentials=_mocked_credentials.return_value ) + + def test_auth_success_with_secure_channel_on_core_url( + self, secure_core_client_with_auth + ): + secure_core_client_with_auth.list_feature_sets() + + def test_auth_success_with_insecure_channel_on_core_url( + self, insecure_core_server_with_auth + ): + client = Client(core_url="localhost:50056", core_enable_auth=True) + client.list_feature_sets() + + def test_no_auth_sent_when_auth_disabled( + self, insecure_core_server_that_blocks_auth + ): + client = Client(core_url="localhost:50057") + client.list_feature_sets() From 04b59799b7afa162b8c33ce8c6b482b96c58c1a8 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Fri, 20 Mar 2020 15:09:52 +0800 Subject: [PATCH 03/10] Add comments and remove unnecessary config --- .gitignore | 3 ++- .../GoogleOID/GoogleOpenIDAuthenticationProvider.java | 2 ++ .../feast/core/auth/authorization/AuthorizationProvider.java | 4 ++++ core/src/main/resources/application.yml | 2 -- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index d034c89dccf..fb71051feae 100644 --- a/.gitignore +++ b/.gitignore @@ -183,4 +183,5 @@ sdk/python/feast/core/ sdk/python/feast/serving/ sdk/python/feast/storage/ sdk/python/feast/types/ -sdk/python/tensorflow_metadata \ No newline at end of file +sdk/python/tensorflow_metadata +core/src/main/resources/certificates/ diff --git a/core/src/main/java/feast/core/auth/authentication/GoogleOID/GoogleOpenIDAuthenticationProvider.java b/core/src/main/java/feast/core/auth/authentication/GoogleOID/GoogleOpenIDAuthenticationProvider.java index 1e7c1f1654e..103ed5e2635 100644 --- a/core/src/main/java/feast/core/auth/authentication/GoogleOID/GoogleOpenIDAuthenticationProvider.java +++ b/core/src/main/java/feast/core/auth/authentication/GoogleOID/GoogleOpenIDAuthenticationProvider.java @@ -51,6 +51,8 @@ public GoogleOpenIDAuthenticationProvider(Map options) { } /** + * Authenticate a request based on its Spring Security Authentication object + * * @param authentication Authentication object which contains a JWT to validate * @return Returns the same authentication object after authentication */ diff --git a/core/src/main/java/feast/core/auth/authorization/AuthorizationProvider.java b/core/src/main/java/feast/core/auth/authorization/AuthorizationProvider.java index 14ef91c64d3..0209b90b9ff 100644 --- a/core/src/main/java/feast/core/auth/authorization/AuthorizationProvider.java +++ b/core/src/main/java/feast/core/auth/authorization/AuthorizationProvider.java @@ -19,6 +19,10 @@ import org.springframework.security.access.AccessDeniedException; import org.springframework.security.core.Authentication; +/** + * AuthorizationProvider is the base interface that each AuthorizationProvider needs to implement in + * order to authorize requests to Feast Core + */ public interface AuthorizationProvider { /** diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml index 214f97047ba..e99c263cf43 100644 --- a/core/src/main/resources/application.yml +++ b/core/src/main/resources/application.yml @@ -79,8 +79,6 @@ spring: url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_DATABASE:postgres} username: ${DB_USERNAME:postgres} password: ${DB_PASSWORD:password} - application: - name: local-grpc-server management: metrics: From c61cab160dd339b800dc3f9975e5811d48a97199 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Sat, 21 Mar 2020 11:37:38 +0800 Subject: [PATCH 04/10] Fix broken auth tests and allow for static token to be set --- sdk/python/feast/client.py | 8 +++++ sdk/python/feast/constants.py | 1 + sdk/python/feast/grpc/auth.py | 52 ++++++++++++++++++++++++++++----- sdk/python/tests/test_client.py | 18 ++++++++++-- 4 files changed, 68 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 16b01b86a51..c482b369630 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -30,6 +30,7 @@ from feast.config import Config from feast.constants import ( CONFIG_CORE_ENABLE_AUTH_KEY, + CONFIG_CORE_ENABLE_AUTH_TOKEN_KEY, CONFIG_CORE_ENABLE_SSL_KEY, CONFIG_CORE_SERVER_SSL_CERT_KEY, CONFIG_CORE_URL_KEY, @@ -110,8 +111,15 @@ def __init__(self, options: Optional[Dict[str, str]] = None, **kwargs): self._core_service_stub: CoreServiceStub = None self._serving_service_stub: ServingServiceStub = None self._auth_metadata = None + + # Configure Auth Metadata Plugin if auth is enabled if self._config.getboolean(CONFIG_CORE_ENABLE_AUTH_KEY): self._auth_metadata = feast_auth.get_auth_metadata_plugin() + # If provided, set a static token + if self._config.exists(CONFIG_CORE_ENABLE_AUTH_TOKEN_KEY): + self._auth_metadata.set_static_token( + self._config.get(CONFIG_CORE_ENABLE_AUTH_TOKEN_KEY) + ) @property def _core_service(self): diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 4ee63248bbe..00d55d5c1dd 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -28,6 +28,7 @@ CONFIG_CORE_URL_KEY = "core_url" CONFIG_CORE_ENABLE_SSL_KEY = "core_enable_ssl" CONFIG_CORE_ENABLE_AUTH_KEY = "core_enable_auth" +CONFIG_CORE_ENABLE_AUTH_TOKEN_KEY = "core_auth_token" CONFIG_CORE_SERVER_SSL_CERT_KEY = "core_server_ssl_cert" CONFIG_SERVING_URL_KEY = "serving_url" CONFIG_SERVING_ENABLE_SSL_KEY = "serving_enable_ssl" diff --git a/sdk/python/feast/grpc/auth.py b/sdk/python/feast/grpc/auth.py index da8f3a4d0ee..4f8b3e7f9c7 100644 --- a/sdk/python/feast/grpc/auth.py +++ b/sdk/python/feast/grpc/auth.py @@ -29,6 +29,7 @@ def __init__(self): self._request = requests.Request() self._token = None + self._static_token = None self._refresh_token() def get_signed_meta(self): @@ -37,18 +38,53 @@ def get_signed_meta(self): def _refresh_token(self): """ Refreshes Google ID token and persists it in memory """ + + # Use static token if available + if self._static_token: + self._token = self._static_token + return + + # Try to find ID Token from Gcloud SDK + from google.auth import jwt + import subprocess + + cli_output = subprocess.run(["printenv"], stdout=subprocess.PIPE) + token = cli_output.stdout.decode("utf-8").strip() + try: + jwt.decode(token, verify=False) # Ensure the token is valid + self._token = token + return + except ValueError: + pass # GCloud command not successful + + # Try to use Google Auth library to find ID Token from google import auth as google_auth - credentials, _ = google_auth.default() + credentials, _ = google_auth.default(["openid", "email"]) credentials.refresh(self._request) - self._token = credentials.id_token + if hasattr(credentials, "id_token"): + self._token = credentials.id_token + return + + # Raise exception otherwise + raise RuntimeError("Could not determine Google ID token") - def __call__(self, context, callback): - """Passes authorization metadata into the given callback. + def set_static_token(self, token): + """ + Define a static token to return Args: - context (grpc.AuthMetadataContext): The RPC context. - callback (grpc.AuthMetadataPluginCallback): The callback that will - be invoked to pass in the authorization metadata. + token: String token """ - callback(self.get_signed_meta(), None) + self._static_token = token + + +def __call__(self, context, callback): + """Passes authorization metadata into the given callback. + + Args: + context (grpc.AuthMetadataContext): The RPC context. + callback (grpc.AuthMetadataPluginCallback): The callback that will + be invoked to pass in the authorization metadata. + """ + callback(self.get_signed_meta(), None) diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index 14b017f0a75..bd2bc215f42 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -59,6 +59,11 @@ _PRIVATE_KEY_RESOURCE_PATH = "data/localhost.key" _CERTIFICATE_CHAIN_RESOURCE_PATH = "data/localhost.pem" _ROOT_CERTIFICATE_RESOURCE_PATH = "data/localhost.crt" +_FAKE_JWT_TOKEN = ( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0N" + "TY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDI" + "yfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c" +) class TestClient: @@ -164,7 +169,7 @@ def secure_client(self, secure_core_server, secure_serving_server): root_certificate_credentials = pkgutil.get_data( __name__, _ROOT_CERTIFICATE_RESOURCE_PATH ) - # this is needed to establish a secure connection using self-signed certificates, for the purpose of the test + ssl_channel_credentials = grpc.ssl_channel_credentials( root_certificates=root_certificate_credentials ) @@ -192,7 +197,10 @@ def secure_core_client_with_auth(self, secure_core_server_with_auth): MagicMock(return_value=ssl_channel_credentials), ): yield Client( - core_url="localhost:50055", core_enable_ssl=True, core_enable_auth=True + core_url="localhost:50055", + core_enable_ssl=True, + core_enable_auth=True, + core_auth_token=_FAKE_JWT_TOKEN, ) @pytest.fixture @@ -712,7 +720,11 @@ def test_auth_success_with_secure_channel_on_core_url( def test_auth_success_with_insecure_channel_on_core_url( self, insecure_core_server_with_auth ): - client = Client(core_url="localhost:50056", core_enable_auth=True) + client = Client( + core_url="localhost:50056", + core_enable_auth=True, + core_auth_token=_FAKE_JWT_TOKEN, + ) client.list_feature_sets() def test_no_auth_sent_when_auth_disabled( From 9c7ff64b04c9b26255fca598a0dc80242a0c9973 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 23 Mar 2020 16:53:29 +0800 Subject: [PATCH 05/10] Implement static token configuration and fix broken tests --- sdk/python/feast/client.py | 12 ++------ sdk/python/feast/grpc/auth.py | 51 +++++++++++++++++++++++++-------- sdk/python/tests/test_client.py | 1 + 3 files changed, 43 insertions(+), 21 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index c482b369630..1a5de3697d6 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -30,7 +30,6 @@ from feast.config import Config from feast.constants import ( CONFIG_CORE_ENABLE_AUTH_KEY, - CONFIG_CORE_ENABLE_AUTH_TOKEN_KEY, CONFIG_CORE_ENABLE_SSL_KEY, CONFIG_CORE_SERVER_SSL_CERT_KEY, CONFIG_CORE_URL_KEY, @@ -114,12 +113,7 @@ def __init__(self, options: Optional[Dict[str, str]] = None, **kwargs): # Configure Auth Metadata Plugin if auth is enabled if self._config.getboolean(CONFIG_CORE_ENABLE_AUTH_KEY): - self._auth_metadata = feast_auth.get_auth_metadata_plugin() - # If provided, set a static token - if self._config.exists(CONFIG_CORE_ENABLE_AUTH_TOKEN_KEY): - self._auth_metadata.set_static_token( - self._config.get(CONFIG_CORE_ENABLE_AUTH_TOKEN_KEY) - ) + self._auth_metadata = feast_auth.get_auth_metadata_plugin(self._config) @property def _core_service(self): @@ -426,12 +420,12 @@ def list_feature_sets( filter = ListFeatureSetsRequest.Filter( project=project, feature_set_name=name, feature_set_version=version ) - + print("woop") # Get latest feature sets from Feast Core feature_set_protos = self._core_service.ListFeatureSets( ListFeatureSetsRequest(filter=filter), metadata=self._get_grpc_metadata(), ) # type: ListFeatureSetsResponse - + print("woop") # Extract feature sets and return feature_sets = [] for feature_set_proto in feature_set_protos.feature_sets: diff --git a/sdk/python/feast/grpc/auth.py b/sdk/python/feast/grpc/auth.py index 4f8b3e7f9c7..cd4bb2de3f7 100644 --- a/sdk/python/feast/grpc/auth.py +++ b/sdk/python/feast/grpc/auth.py @@ -1,7 +1,11 @@ import grpc +from google.auth.exceptions import DefaultCredentialsError +from feast.config import Config +from feast.constants import CONFIG_CORE_ENABLE_AUTH_TOKEN_KEY -def get_auth_metadata_plugin(): + +def get_auth_metadata_plugin(config: Config): """ Get an Authentication Metadata Plugin. This plugin is used in gRPC to sign requests. Please see the following URL for more details @@ -11,8 +15,11 @@ def get_auth_metadata_plugin(): support Google Open ID authentication. Returns: Returns an implementation of grpc.AuthMetadataPlugin + + Args: + config: Feast Configuration object """ - return GoogleOpenIDAuthMetadataPlugin() + return GoogleOpenIDAuthMetadataPlugin(config) class GoogleOpenIDAuthMetadataPlugin(grpc.AuthMetadataPlugin): @@ -23,13 +30,23 @@ class GoogleOpenIDAuthMetadataPlugin(grpc.AuthMetadataPlugin): http://www.grpc.io/grpc/python/grpc.html#grpc.AuthMetadataPlugin """ - def __init__(self): + def __init__(self, config: Config): + """ + Initializes a GoogleOpenIDAuthMetadataPlugin, used to sign gRPC requests + Args: + config: Feast Configuration object + """ super(GoogleOpenIDAuthMetadataPlugin, self).__init__() from google.auth.transport import requests - self._request = requests.Request() - self._token = None self._static_token = None + self._token = None + + # If provided, set a static token + if config.exists(CONFIG_CORE_ENABLE_AUTH_TOKEN_KEY): + self._static_token = config.get(CONFIG_CORE_ENABLE_AUTH_TOKEN_KEY) + + self._request = requests.Request() self._refresh_token() def get_signed_meta(self): @@ -48,7 +65,9 @@ def _refresh_token(self): from google.auth import jwt import subprocess - cli_output = subprocess.run(["printenv"], stdout=subprocess.PIPE) + cli_output = subprocess.run( + ["gcloud", "auth", "print-identity-token"], stdout=subprocess.PIPE + ) token = cli_output.stdout.decode("utf-8").strip() try: jwt.decode(token, verify=False) # Ensure the token is valid @@ -60,14 +79,22 @@ def _refresh_token(self): # Try to use Google Auth library to find ID Token from google import auth as google_auth - credentials, _ = google_auth.default(["openid", "email"]) - credentials.refresh(self._request) - if hasattr(credentials, "id_token"): - self._token = credentials.id_token - return + try: + credentials, _ = google_auth.default(["openid", "email"]) + credentials.refresh(self._request) + if hasattr(credentials, "id_token"): + self._token = credentials.id_token + return + except DefaultCredentialsError: + pass # Could not determine credentials, skip # Raise exception otherwise - raise RuntimeError("Could not determine Google ID token") + raise RuntimeError( + "Could not determine Google ID token. Please ensure that the " + "Google Cloud SDK is installed or that a service account can be " + "found using the GOOGLE_APPLICATION_CREDENTIALS environmental " + "variable." + ) def set_static_token(self, token): """ diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index bd2bc215f42..49abafcd29c 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -712,6 +712,7 @@ def test_secure_channel_creation_with_secure_core_url(self, _mocked_obj): client.core_url, credentials=_mocked_credentials.return_value ) + @mock.patch("grpc.channel_ready_future") def test_auth_success_with_secure_channel_on_core_url( self, secure_core_client_with_auth ): From dc727bcfdc4af2363dfaf9cbb0b5a9abac640dcc Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 23 Mar 2020 18:44:18 +0800 Subject: [PATCH 06/10] Fix e2e tests --- infra/scripts/test-end-to-end-batch.sh | 14 ++++++++++++-- infra/scripts/test-end-to-end.sh | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/infra/scripts/test-end-to-end-batch.sh b/infra/scripts/test-end-to-end-batch.sh index 9bc17c2e757..a83c402e2b2 100755 --- a/infra/scripts/test-end-to-end-batch.sh +++ b/infra/scripts/test-end-to-end-batch.sh @@ -111,8 +111,18 @@ Starting Feast Core # Start Feast Core in background cat < /tmp/core.application.yml grpc: - port: 6565 - enable-reflection: true + server: + port: 6565 + security: + enabled: false + +security: + authentication: + enabled: false + provider: None + authorization: + enabled: false + provider: None feast: version: 0.3 diff --git a/infra/scripts/test-end-to-end.sh b/infra/scripts/test-end-to-end.sh index 3e6a5492a16..88981d6ac52 100755 --- a/infra/scripts/test-end-to-end.sh +++ b/infra/scripts/test-end-to-end.sh @@ -94,8 +94,18 @@ Starting Feast Core # Start Feast Core in background cat < /tmp/core.application.yml grpc: - port: 6565 - enable-reflection: true + server: + port: 6565 + security: + enabled: false + +security: + authentication: + enabled: false + provider: None + authorization: + enabled: false + provider: None feast: version: 0.3 From 4d40e9aa183831e62a43af3ca46e258e5ba49c25 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 23 Mar 2020 18:55:27 +0800 Subject: [PATCH 07/10] Fix e2e test indent --- infra/scripts/test-end-to-end-batch.sh | 12 ++++++------ infra/scripts/test-end-to-end.sh | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/infra/scripts/test-end-to-end-batch.sh b/infra/scripts/test-end-to-end-batch.sh index a83c402e2b2..0b7c44a826b 100755 --- a/infra/scripts/test-end-to-end-batch.sh +++ b/infra/scripts/test-end-to-end-batch.sh @@ -117,12 +117,12 @@ grpc: enabled: false security: - authentication: - enabled: false - provider: None - authorization: - enabled: false - provider: None + authentication: + enabled: false + provider: None + authorization: + enabled: false + provider: None feast: version: 0.3 diff --git a/infra/scripts/test-end-to-end.sh b/infra/scripts/test-end-to-end.sh index 88981d6ac52..e0e3db8c309 100755 --- a/infra/scripts/test-end-to-end.sh +++ b/infra/scripts/test-end-to-end.sh @@ -100,12 +100,12 @@ grpc: enabled: false security: - authentication: - enabled: false - provider: None - authorization: - enabled: false - provider: None + authentication: + enabled: false + provider: None + authorization: + enabled: false + provider: None feast: version: 0.3 From 89003500a112cac3cd3b48a307ca9420aa42f613 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 23 Mar 2020 18:56:46 +0800 Subject: [PATCH 08/10] Fix e2e test indent --- infra/scripts/test-end-to-end-batch.sh | 27 +++++++++++++------------- infra/scripts/test-end-to-end.sh | 27 +++++++++++++------------- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/infra/scripts/test-end-to-end-batch.sh b/infra/scripts/test-end-to-end-batch.sh index 0b7c44a826b..8b15b9b0954 100755 --- a/infra/scripts/test-end-to-end-batch.sh +++ b/infra/scripts/test-end-to-end-batch.sh @@ -110,19 +110,6 @@ Starting Feast Core " # Start Feast Core in background cat < /tmp/core.application.yml -grpc: - server: - port: 6565 - security: - enabled: false - -security: - authentication: - enabled: false - provider: None - authorization: - enabled: false - provider: None feast: version: 0.3 @@ -142,6 +129,20 @@ feast: replicationFactor: 1 partitions: 1 + security: + authentication: + enabled: false + provider: None + authorization: + enabled: false + provider: None + +grpc: +server: + port: 6565 + security: + enabled: false + spring: jpa: properties.hibernate: diff --git a/infra/scripts/test-end-to-end.sh b/infra/scripts/test-end-to-end.sh index e0e3db8c309..5128c17d820 100755 --- a/infra/scripts/test-end-to-end.sh +++ b/infra/scripts/test-end-to-end.sh @@ -93,19 +93,6 @@ Starting Feast Core " # Start Feast Core in background cat < /tmp/core.application.yml -grpc: - server: - port: 6565 - security: - enabled: false - -security: - authentication: - enabled: false - provider: None - authorization: - enabled: false - provider: None feast: version: 0.3 @@ -125,6 +112,20 @@ feast: replicationFactor: 1 partitions: 1 + security: + authentication: + enabled: false + provider: None + authorization: + enabled: false + provider: None + +grpc: + server: + port: 6565 + security: + enabled: false + spring: jpa: properties.hibernate: From 6254f79a8df6017944351b92c97cb04f41eff047 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 23 Mar 2020 19:11:28 +0800 Subject: [PATCH 09/10] Fix e2e test indent --- infra/scripts/test-end-to-end-batch.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/infra/scripts/test-end-to-end-batch.sh b/infra/scripts/test-end-to-end-batch.sh index 8b15b9b0954..10c2a2fecd9 100755 --- a/infra/scripts/test-end-to-end-batch.sh +++ b/infra/scripts/test-end-to-end-batch.sh @@ -138,10 +138,10 @@ feast: provider: None grpc: -server: - port: 6565 - security: - enabled: false + server: + port: 6565 + security: + enabled: false spring: jpa: From 49976945a1d208b2b0bc084034ccd1ba0231282e Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Sun, 29 Mar 2020 10:57:15 +0800 Subject: [PATCH 10/10] Update configuration comments and add disable option to enable auth for serving --- sdk/python/feast/client.py | 5 ++--- sdk/python/feast/constants.py | 30 ++++++++++++++++++++++++------ 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 1a5de3697d6..f20301ceb31 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -35,7 +35,6 @@ CONFIG_CORE_URL_KEY, CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY, CONFIG_PROJECT_KEY, - CONFIG_SERVING_ENABLE_AUTH_KEY, CONFIG_SERVING_ENABLE_SSL_KEY, CONFIG_SERVING_SERVER_SSL_CERT_KEY, CONFIG_SERVING_URL_KEY, @@ -145,11 +144,11 @@ def _serving_service(self): channel = create_grpc_channel( url=self._config.get(CONFIG_SERVING_URL_KEY), enable_ssl=self._config.getboolean(CONFIG_SERVING_ENABLE_SSL_KEY), - enable_auth=self._config.getboolean(CONFIG_SERVING_ENABLE_AUTH_KEY), + enable_auth=False, ssl_server_cert_path=self._config.get( CONFIG_SERVING_SERVER_SSL_CERT_KEY ), - auth_metadata_plugin=self._auth_metadata, + auth_metadata_plugin=None, timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), ) self._serving_service_stub = ServingServiceStub(channel) diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 00d55d5c1dd..990cdb6f392 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -14,16 +14,24 @@ # limitations under the License. # -# General constants DATETIME_COLUMN = "datetime" + +# Environmental variable to specify Feast configuration file location FEAST_CONFIG_FILE_ENV_KEY = "FEAST_CONFIG" + +# Default prefix to Feast environmental variables CONFIG_FEAST_ENV_VAR_PREFIX = "FEAST_" + +# Default directory to Feast configuration file CONFIG_FILE_DEFAULT_DIRECTORY = ".feast" + +# Default Feast configuration file name CONFIG_FILE_NAME = "config" -CONFIG_FILE_SECTION = "general" +# Default section in Feast configuration file to specify options +CONFIG_FILE_SECTION = "general" -# Feast configuration options +# Feast Configuration Options CONFIG_PROJECT_KEY = "project" CONFIG_CORE_URL_KEY = "core_url" CONFIG_CORE_ENABLE_SSL_KEY = "core_enable_ssl" @@ -32,26 +40,36 @@ CONFIG_CORE_SERVER_SSL_CERT_KEY = "core_server_ssl_cert" CONFIG_SERVING_URL_KEY = "serving_url" CONFIG_SERVING_ENABLE_SSL_KEY = "serving_enable_ssl" -CONFIG_SERVING_ENABLE_AUTH_KEY = "serving_enable_auth" CONFIG_SERVING_SERVER_SSL_CERT_KEY = "serving_server_ssl_cert" CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = "grpc_connection_timeout_default" -CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = "grpc_connection_timeout_apply_key" +CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = "grpc_connection_timeout_apply" CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = ( "batch_feature_request_wait_time_seconds" ) # Configuration option default values FEAST_DEFAULT_OPTIONS = { + # Default Feast project to use CONFIG_PROJECT_KEY: "default", + # Default Feast Core URL CONFIG_CORE_URL_KEY: "localhost:6565", + # Enable or disable TLS/SSL to Feast Core CONFIG_CORE_ENABLE_SSL_KEY: "False", + # Enable user authentication to Feast Core CONFIG_CORE_ENABLE_AUTH_KEY: "False", + # Path to certificate(s) to secure connection to Feast Core CONFIG_CORE_SERVER_SSL_CERT_KEY: "", + # Default Feast Serving URL CONFIG_SERVING_URL_KEY: "localhost:6565", + # Enable or disable TLS/SSL to Feast Serving CONFIG_SERVING_ENABLE_SSL_KEY: "False", - CONFIG_SERVING_ENABLE_AUTH_KEY: "False", + # Path to certificate(s) to secure connection to Feast Serving CONFIG_SERVING_SERVER_SSL_CERT_KEY: "", + # Default connection timeout to Feast Serving and Feast Core (in seconds) CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "3", + # Default gRPC connection timeout when sending an ApplyFeatureSet command to + # Feast Core (in seconds) CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY: "600", + # Time to wait for batch feature requests before timing out. CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY: "600", }