From cdb1cf3b9fbd3d6ad0d601174d79b6e4e2aa6c5e Mon Sep 17 00:00:00 2001 From: zhilingc Date: Fri, 29 Nov 2019 15:56:59 +0800 Subject: [PATCH 1/3] Filter out feature sets that dont share the same source --- .../java/feast/core/grpc/CoreServiceImpl.java | 34 ++--- .../feast/core/grpc/CoreServiceImplTest.java | 137 ++++++++++++++++++ 2 files changed, 154 insertions(+), 17 deletions(-) create mode 100644 core/src/test/java/feast/core/grpc/CoreServiceImplTest.java diff --git a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java index 5861742e00c..5abb4212537 100644 --- a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java @@ -51,13 +51,23 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.annotation.Transactional; -/** Implementation of the feast core GRPC service. */ +/** + * Implementation of the feast core GRPC service. + */ @Slf4j @GRpcService public class CoreServiceImpl extends CoreServiceImplBase { - @Autowired private SpecService specService; - @Autowired private JobCoordinatorService jobCoordinatorService; + private SpecService specService; + private JobCoordinatorService jobCoordinatorService; + + @Autowired + public CoreServiceImpl( + SpecService specService, + JobCoordinatorService jobCoordinatorService) { + this.specService = specService; + this.jobCoordinatorService = jobCoordinatorService; + } @Override public void getFeastCoreVersion( @@ -114,23 +124,10 @@ public void applyFeatureSet( ApplyFeatureSetRequest request, StreamObserver responseObserver) { try { ApplyFeatureSetResponse response = specService.applyFeatureSet(request.getFeatureSet()); - String featureSetName = response.getFeatureSet().getName(); ListStoresResponse stores = specService.listStores(Filter.newBuilder().build()); for (Store store : stores.getStoreList()) { - List relevantSubscriptions = - store.getSubscriptionsList().stream() - .filter( - sub -> { - String subString = sub.getName(); - if (!subString.contains(".*")) { - subString = subString.replace("*", ".*"); - } - Pattern p = Pattern.compile(subString); - return p.matcher(featureSetName).matches(); - }) - .collect(Collectors.toList()); Set featureSetSpecs = new HashSet<>(); - for (Subscription subscription : relevantSubscriptions) { + for (Subscription subscription : store.getSubscriptionsList()) { featureSetSpecs.addAll( specService .listFeatureSets( @@ -144,6 +141,9 @@ public void applyFeatureSet( // We use the request featureSet source because it contains the information // about whether to default to the default feature stream or not SourceProto.Source source = response.getFeatureSet().getSource(); + featureSetSpecs = featureSetSpecs.stream().filter( + fs -> fs.getSource().equals(source) + ).collect(Collectors.toSet()); jobCoordinatorService.startOrUpdateJob( Lists.newArrayList(featureSetSpecs), source, store); } diff --git a/core/src/test/java/feast/core/grpc/CoreServiceImplTest.java b/core/src/test/java/feast/core/grpc/CoreServiceImplTest.java new file mode 100644 index 00000000000..f969e6a6915 --- /dev/null +++ b/core/src/test/java/feast/core/grpc/CoreServiceImplTest.java @@ -0,0 +1,137 @@ +package feast.core.grpc; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.CoreServiceProto.ApplyFeatureSetRequest; +import feast.core.CoreServiceProto.ApplyFeatureSetResponse; +import feast.core.CoreServiceProto.ApplyFeatureSetResponse.Status; +import feast.core.CoreServiceProto.ListFeatureSetsRequest; +import feast.core.CoreServiceProto.ListFeatureSetsResponse; +import feast.core.CoreServiceProto.ListStoresRequest.Filter; +import feast.core.CoreServiceProto.ListStoresResponse; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.SourceProto.KafkaSourceConfig; +import feast.core.SourceProto.Source; +import feast.core.SourceProto.SourceType; +import feast.core.StoreProto.Store; +import feast.core.StoreProto.Store.RedisConfig; +import feast.core.StoreProto.Store.StoreType; +import feast.core.StoreProto.Store.Subscription; +import feast.core.service.JobCoordinatorService; +import feast.core.service.SpecService; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Captor; +import org.mockito.Mock; + +public class CoreServiceImplTest { + + @Mock + private JobCoordinatorService jobCoordinatorService; + + @Mock + private SpecService specService; + + @Captor + private ArgumentCaptor> fsListArgCaptor; + + @Before + public void setUp() { + initMocks(this); + } + + @Test + public void shouldPassCorrectListOfFeatureSetsToJobService() + throws InvalidProtocolBufferException { + CoreServiceImpl coreService = new CoreServiceImpl(specService, jobCoordinatorService); + Store store = Store.newBuilder() + .setType(StoreType.REDIS) + .setRedisConfig(RedisConfig.newBuilder() + .setHost("localhost").setPort(6379).build()) + .addSubscriptions(Subscription.newBuilder().setName("*").setVersion(">0")) + .build(); + FeatureSetSpec fs1Sc1 = FeatureSetSpec.newBuilder() + .setName("feature_set") + .setVersion(1) + .setSource(Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setBootstrapServers("kafka:9092") + .setTopic("topic1") + .build())) + .build(); + FeatureSetSpec fs2Sc1 = FeatureSetSpec.newBuilder() + .setName("feature_set_other") + .setVersion(1) + .setSource(Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setBootstrapServers("kafka:9092") + .setTopic("topic1") + .build())) + .build(); + FeatureSetSpec fs3Sc2 = FeatureSetSpec.newBuilder() + .setName("feature_set") + .setVersion(2) + .setSource(Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setBootstrapServers("kafka:9092") + .setTopic("topic2") + .build())) + .build(); + when(specService.applyFeatureSet(fs1Sc1)) + .thenReturn(ApplyFeatureSetResponse.newBuilder() + .setStatus(Status.CREATED) + .setFeatureSet(fs1Sc1) + .build()); + when(specService.listStores(ArgumentMatchers.any())) + .thenReturn(ListStoresResponse.newBuilder() + .addStore(store).build()); + when(specService.listFeatureSets( + ListFeatureSetsRequest.Filter + .newBuilder() + .setFeatureSetName("*") + .setFeatureSetVersion(">0").build())) + .thenReturn(ListFeatureSetsResponse.newBuilder() + .addFeatureSets(fs1Sc1) + .addFeatureSets(fs3Sc2) + .addFeatureSets(fs2Sc1).build()); + + coreService.applyFeatureSet(ApplyFeatureSetRequest.newBuilder() + .setFeatureSet(fs1Sc1).build(), new StreamObserver() { + @Override + public void onNext(ApplyFeatureSetResponse applyFeatureSetResponse) { + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }); + + verify(jobCoordinatorService, times(1)) + .startOrUpdateJob(fsListArgCaptor.capture(), eq(fs1Sc1.getSource()), eq(store)); + + assertThat(fsListArgCaptor.getValue(), containsInAnyOrder(fs1Sc1, fs2Sc1)); + } + + +} \ No newline at end of file From ac5088c98b273cfa6259c25ad2ba4a247b420920 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Fri, 29 Nov 2019 16:00:32 +0800 Subject: [PATCH 2/3] Apply spotless --- .../java/feast/core/grpc/CoreServiceImpl.java | 17 +- .../feast/core/grpc/CoreServiceImplTest.java | 166 ++++++++++-------- 2 files changed, 97 insertions(+), 86 deletions(-) diff --git a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java index 5abb4212537..704eb922462 100644 --- a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java @@ -42,18 +42,14 @@ import feast.core.service.SpecService; import io.grpc.stub.StreamObserver; import java.util.HashSet; -import java.util.List; import java.util.Set; -import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.lognet.springboot.grpc.GRpcService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.annotation.Transactional; -/** - * Implementation of the feast core GRPC service. - */ +/** Implementation of the feast core GRPC service. */ @Slf4j @GRpcService public class CoreServiceImpl extends CoreServiceImplBase { @@ -62,9 +58,7 @@ public class CoreServiceImpl extends CoreServiceImplBase { private JobCoordinatorService jobCoordinatorService; @Autowired - public CoreServiceImpl( - SpecService specService, - JobCoordinatorService jobCoordinatorService) { + public CoreServiceImpl(SpecService specService, JobCoordinatorService jobCoordinatorService) { this.specService = specService; this.jobCoordinatorService = jobCoordinatorService; } @@ -141,9 +135,10 @@ public void applyFeatureSet( // We use the request featureSet source because it contains the information // about whether to default to the default feature stream or not SourceProto.Source source = response.getFeatureSet().getSource(); - featureSetSpecs = featureSetSpecs.stream().filter( - fs -> fs.getSource().equals(source) - ).collect(Collectors.toSet()); + featureSetSpecs = + featureSetSpecs.stream() + .filter(fs -> fs.getSource().equals(source)) + .collect(Collectors.toSet()); jobCoordinatorService.startOrUpdateJob( Lists.newArrayList(featureSetSpecs), source, store); } diff --git a/core/src/test/java/feast/core/grpc/CoreServiceImplTest.java b/core/src/test/java/feast/core/grpc/CoreServiceImplTest.java index f969e6a6915..5a3794cc654 100644 --- a/core/src/test/java/feast/core/grpc/CoreServiceImplTest.java +++ b/core/src/test/java/feast/core/grpc/CoreServiceImplTest.java @@ -1,3 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package feast.core.grpc; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -14,7 +30,6 @@ import feast.core.CoreServiceProto.ApplyFeatureSetResponse.Status; import feast.core.CoreServiceProto.ListFeatureSetsRequest; import feast.core.CoreServiceProto.ListFeatureSetsResponse; -import feast.core.CoreServiceProto.ListStoresRequest.Filter; import feast.core.CoreServiceProto.ListStoresResponse; import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.SourceProto.KafkaSourceConfig; @@ -37,14 +52,11 @@ public class CoreServiceImplTest { - @Mock - private JobCoordinatorService jobCoordinatorService; + @Mock private JobCoordinatorService jobCoordinatorService; - @Mock - private SpecService specService; + @Mock private SpecService specService; - @Captor - private ArgumentCaptor> fsListArgCaptor; + @Captor private ArgumentCaptor> fsListArgCaptor; @Before public void setUp() { @@ -55,83 +67,87 @@ public void setUp() { public void shouldPassCorrectListOfFeatureSetsToJobService() throws InvalidProtocolBufferException { CoreServiceImpl coreService = new CoreServiceImpl(specService, jobCoordinatorService); - Store store = Store.newBuilder() - .setType(StoreType.REDIS) - .setRedisConfig(RedisConfig.newBuilder() - .setHost("localhost").setPort(6379).build()) - .addSubscriptions(Subscription.newBuilder().setName("*").setVersion(">0")) - .build(); - FeatureSetSpec fs1Sc1 = FeatureSetSpec.newBuilder() - .setName("feature_set") - .setVersion(1) - .setSource(Source.newBuilder() - .setType(SourceType.KAFKA) - .setKafkaSourceConfig( - KafkaSourceConfig.newBuilder() - .setBootstrapServers("kafka:9092") - .setTopic("topic1") - .build())) - .build(); - FeatureSetSpec fs2Sc1 = FeatureSetSpec.newBuilder() - .setName("feature_set_other") - .setVersion(1) - .setSource(Source.newBuilder() - .setType(SourceType.KAFKA) - .setKafkaSourceConfig( - KafkaSourceConfig.newBuilder() - .setBootstrapServers("kafka:9092") - .setTopic("topic1") - .build())) - .build(); - FeatureSetSpec fs3Sc2 = FeatureSetSpec.newBuilder() - .setName("feature_set") - .setVersion(2) - .setSource(Source.newBuilder() - .setType(SourceType.KAFKA) - .setKafkaSourceConfig( - KafkaSourceConfig.newBuilder() - .setBootstrapServers("kafka:9092") - .setTopic("topic2") - .build())) - .build(); + Store store = + Store.newBuilder() + .setType(StoreType.REDIS) + .setRedisConfig(RedisConfig.newBuilder().setHost("localhost").setPort(6379).build()) + .addSubscriptions(Subscription.newBuilder().setName("*").setVersion(">0")) + .build(); + FeatureSetSpec fs1Sc1 = + FeatureSetSpec.newBuilder() + .setName("feature_set") + .setVersion(1) + .setSource( + Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setBootstrapServers("kafka:9092") + .setTopic("topic1") + .build())) + .build(); + FeatureSetSpec fs2Sc1 = + FeatureSetSpec.newBuilder() + .setName("feature_set_other") + .setVersion(1) + .setSource( + Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setBootstrapServers("kafka:9092") + .setTopic("topic1") + .build())) + .build(); + FeatureSetSpec fs3Sc2 = + FeatureSetSpec.newBuilder() + .setName("feature_set") + .setVersion(2) + .setSource( + Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setBootstrapServers("kafka:9092") + .setTopic("topic2") + .build())) + .build(); when(specService.applyFeatureSet(fs1Sc1)) - .thenReturn(ApplyFeatureSetResponse.newBuilder() - .setStatus(Status.CREATED) - .setFeatureSet(fs1Sc1) - .build()); + .thenReturn( + ApplyFeatureSetResponse.newBuilder() + .setStatus(Status.CREATED) + .setFeatureSet(fs1Sc1) + .build()); when(specService.listStores(ArgumentMatchers.any())) - .thenReturn(ListStoresResponse.newBuilder() - .addStore(store).build()); + .thenReturn(ListStoresResponse.newBuilder().addStore(store).build()); when(specService.listFeatureSets( - ListFeatureSetsRequest.Filter - .newBuilder() - .setFeatureSetName("*") - .setFeatureSetVersion(">0").build())) - .thenReturn(ListFeatureSetsResponse.newBuilder() - .addFeatureSets(fs1Sc1) - .addFeatureSets(fs3Sc2) - .addFeatureSets(fs2Sc1).build()); + ListFeatureSetsRequest.Filter.newBuilder() + .setFeatureSetName("*") + .setFeatureSetVersion(">0") + .build())) + .thenReturn( + ListFeatureSetsResponse.newBuilder() + .addFeatureSets(fs1Sc1) + .addFeatureSets(fs3Sc2) + .addFeatureSets(fs2Sc1) + .build()); - coreService.applyFeatureSet(ApplyFeatureSetRequest.newBuilder() - .setFeatureSet(fs1Sc1).build(), new StreamObserver() { - @Override - public void onNext(ApplyFeatureSetResponse applyFeatureSetResponse) { - } + coreService.applyFeatureSet( + ApplyFeatureSetRequest.newBuilder().setFeatureSet(fs1Sc1).build(), + new StreamObserver() { + @Override + public void onNext(ApplyFeatureSetResponse applyFeatureSetResponse) {} - @Override - public void onError(Throwable throwable) { - } + @Override + public void onError(Throwable throwable) {} - @Override - public void onCompleted() { - } - }); + @Override + public void onCompleted() {} + }); verify(jobCoordinatorService, times(1)) .startOrUpdateJob(fsListArgCaptor.capture(), eq(fs1Sc1.getSource()), eq(store)); assertThat(fsListArgCaptor.getValue(), containsInAnyOrder(fs1Sc1, fs2Sc1)); } - - -} \ No newline at end of file +} From c34eaea7508cc416b9a390efd929ec9141f76469 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Fri, 29 Nov 2019 16:13:56 +0800 Subject: [PATCH 3/3] Change typo --- core/src/main/java/feast/core/grpc/CoreServiceImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java index 704eb922462..a057789df7a 100644 --- a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java @@ -132,7 +132,7 @@ public void applyFeatureSet( .getFeatureSetsList()); } if (!featureSetSpecs.isEmpty() && featureSetSpecs.contains(response.getFeatureSet())) { - // We use the request featureSet source because it contains the information + // We use the response featureSet source because it contains the information // about whether to default to the default feature stream or not SourceProto.Source source = response.getFeatureSet().getSource(); featureSetSpecs =