diff --git a/core/src/main/java/feast/core/model/FeatureSet.java b/core/src/main/java/feast/core/model/FeatureSet.java index 232a5f67d14..ec8da77c5f9 100644 --- a/core/src/main/java/feast/core/model/FeatureSet.java +++ b/core/src/main/java/feast/core/model/FeatureSet.java @@ -25,6 +25,7 @@ import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.FeatureSetProto.FeatureSetStatus; import feast.core.FeatureSetProto.FeatureSpec; +import feast.core.util.TypeConversion; import feast.types.ValueProto.ValueType.Enum; import java.util.ArrayList; import java.util.HashMap; @@ -116,6 +117,10 @@ public class FeatureSet extends AbstractTimestampEntity implements Comparable entities, List features, Source source, + Map labels, FeatureSetStatus status) { this.maxAgeSeconds = maxAgeSeconds; this.source = source; @@ -137,6 +143,7 @@ public FeatureSet( this.name = name; this.project = new Project(project); this.version = version; + this.labels = TypeConversion.convertMapToJsonString(labels); this.setId(project, name, version); addEntities(entities); addFeatures(features); @@ -191,6 +198,7 @@ public static FeatureSet fromProto(FeatureSetProto.FeatureSet featureSetProto) { entitySpecs, featureSpecs, source, + featureSetProto.getSpec().getLabelsMap(), featureSetProto.getMeta().getStatus()); } @@ -247,6 +255,7 @@ public FeatureSetProto.FeatureSet toProto() throws InvalidProtocolBufferExceptio .setMaxAge(Duration.newBuilder().setSeconds(maxAgeSeconds)) .addAllEntities(entitySpecs) .addAllFeatures(featureSpecs) + .putAllLabels(TypeConversion.convertJsonStringToMap(labels)) .setSource(source.toProto()); return FeatureSetProto.FeatureSet.newBuilder().setMeta(meta).setSpec(spec).build(); @@ -352,6 +361,10 @@ private void setFeatureSpecFields(FeatureSpec.Builder featureSpecBuilder, Field featureSpecBuilder.setTimeOfDayDomain( TimeOfDayDomain.parseFrom(featureField.getTimeOfDayDomain())); } + + if (featureField.getLabels() != null) { + 
featureSpecBuilder.putAllLabels(featureField.getLabels()); + } } /** diff --git a/core/src/main/java/feast/core/model/Field.java b/core/src/main/java/feast/core/model/Field.java index cb23e4eceb7..213c17f954a 100644 --- a/core/src/main/java/feast/core/model/Field.java +++ b/core/src/main/java/feast/core/model/Field.java @@ -18,8 +18,9 @@ import feast.core.FeatureSetProto.EntitySpec; import feast.core.FeatureSetProto.FeatureSpec; -import feast.types.ValueProto.ValueType; +import feast.core.util.TypeConversion; import java.util.Arrays; +import java.util.Map; import java.util.Objects; import javax.persistence.Column; import javax.persistence.Embeddable; @@ -47,6 +48,10 @@ public class Field { @Column(name = "project") private String project; + // Labels that this field belongs to + @Column(name = "labels", columnDefinition = "text") + private String labels; + // Presence constraints (refer to proto feast.core.FeatureSet.FeatureSpec) // Only one of them can be set. private byte[] presence; @@ -74,14 +79,10 @@ public class Field { public Field() {} - public Field(String name, ValueType.Enum type) { - this.name = name; - this.type = type.toString(); - } - public Field(FeatureSpec featureSpec) { this.name = featureSpec.getName(); this.type = featureSpec.getValueType().toString(); + this.labels = TypeConversion.convertMapToJsonString(featureSpec.getLabelsMap()); switch (featureSpec.getPresenceConstraintsCase()) { case PRESENCE: @@ -215,6 +216,10 @@ public Field(EntitySpec entitySpec) { } } + public Map getLabels() { + return TypeConversion.convertJsonStringToMap(this.labels); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -227,6 +232,7 @@ public boolean equals(Object o) { return Objects.equals(name, field.name) && Objects.equals(type, field.type) && Objects.equals(project, field.project) + && Objects.equals(labels, field.labels) && Arrays.equals(presence, field.presence) && Arrays.equals(groupPresence, field.groupPresence) && Arrays.equals(shape, 
field.shape) @@ -247,6 +253,6 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(super.hashCode(), name, type); + return Objects.hash(name, type, project, labels); } } diff --git a/core/src/main/java/feast/core/util/TypeConversion.java b/core/src/main/java/feast/core/util/TypeConversion.java index e01a5511359..6ee990fc1c5 100644 --- a/core/src/main/java/feast/core/util/TypeConversion.java +++ b/core/src/main/java/feast/core/util/TypeConversion.java @@ -70,9 +70,6 @@ public static Map convertJsonStringToMap(String jsonString) { * @return json string corresponding to given map */ public static String convertMapToJsonString(Map map) { - if (map.isEmpty()) { - return "{}"; - } return gson.toJson(map); } diff --git a/core/src/main/java/feast/core/validators/FeatureSetValidator.java b/core/src/main/java/feast/core/validators/FeatureSetValidator.java index 213e3898d51..ca0f1ec035d 100644 --- a/core/src/main/java/feast/core/validators/FeatureSetValidator.java +++ b/core/src/main/java/feast/core/validators/FeatureSetValidator.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; public class FeatureSetValidator { + public static void validateSpec(FeatureSet featureSet) { if (featureSet.getSpec().getProject().isEmpty()) { throw new IllegalArgumentException("Project name must be provided"); @@ -34,6 +35,9 @@ public static void validateSpec(FeatureSet featureSet) { if (featureSet.getSpec().getName().isEmpty()) { throw new IllegalArgumentException("Feature set name must be provided"); } + if (featureSet.getSpec().getLabelsMap().containsKey("")) { + throw new IllegalArgumentException("Feature set label keys must not be empty"); + } checkValidCharacters(featureSet.getSpec().getProject(), "project"); checkValidCharacters(featureSet.getSpec().getName(), "name"); @@ -44,6 +48,9 @@ public static void validateSpec(FeatureSet featureSet) { } for (FeatureSpec featureSpec : featureSet.getSpec().getFeaturesList()) {
checkValidCharacters(featureSpec.getName(), "features::name"); + if (featureSpec.getLabelsMap().containsKey("")) { + throw new IllegalArgumentException("Feature label keys must not be empty"); + } } } diff --git a/core/src/test/java/feast/core/service/JobServiceTest.java b/core/src/test/java/feast/core/service/JobServiceTest.java index c0e90ca43f4..e98d66c3b7e 100644 --- a/core/src/test/java/feast/core/service/JobServiceTest.java +++ b/core/src/test/java/feast/core/service/JobServiceTest.java @@ -34,11 +34,8 @@ import feast.core.CoreServiceProto.RestartIngestionJobResponse; import feast.core.CoreServiceProto.StopIngestionJobRequest; import feast.core.CoreServiceProto.StopIngestionJobResponse; -import feast.core.FeatureSetProto.FeatureSetStatus; import feast.core.FeatureSetReferenceProto.FeatureSetReference; import feast.core.IngestionJobProto.IngestionJob; -import feast.core.SourceProto.KafkaSourceConfig; -import feast.core.SourceProto.SourceType; import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; import feast.core.dao.JobRepository; @@ -84,14 +81,7 @@ public void setup() { // create mock objects for testing // fake data source - this.dataSource = - new Source( - SourceType.KAFKA, - KafkaSourceConfig.newBuilder() - .setBootstrapServers("kafka:9092") - .setTopic("my-topic") - .build(), - true); + this.dataSource = TestObjectFactory.defaultSource; // fake data store this.dataStore = new Store( @@ -158,19 +148,12 @@ public void setupJobManager() { // dummy model constructorss private FeatureSet newDummyFeatureSet(String name, int version, String project) { - Field feature = new Field(name + "_feature", Enum.INT64); - Field entity = new Field(name + "_entity", Enum.STRING); + Field feature = TestObjectFactory.CreateFeatureField(name + "_feature", Enum.INT64); + Field entity = TestObjectFactory.CreateEntityField(name + "_entity", Enum.STRING); FeatureSet fs = - new FeatureSet( - name, - project, - version, - 100L, - 
Arrays.asList(entity), - Arrays.asList(feature), - this.dataSource, - FeatureSetStatus.STATUS_READY); + TestObjectFactory.CreateFeatureSet( + name, project, version, Arrays.asList(entity), Arrays.asList(feature)); fs.setCreated(Date.from(Instant.ofEpochSecond(10L))); return fs; } diff --git a/core/src/test/java/feast/core/service/SpecServiceTest.java b/core/src/test/java/feast/core/service/SpecServiceTest.java index 43a66135dce..bb9f832bd7f 100644 --- a/core/src/test/java/feast/core/service/SpecServiceTest.java +++ b/core/src/test/java/feast/core/service/SpecServiceTest.java @@ -39,10 +39,7 @@ import feast.core.FeatureSetProto; import feast.core.FeatureSetProto.EntitySpec; import feast.core.FeatureSetProto.FeatureSetSpec; -import feast.core.FeatureSetProto.FeatureSetStatus; import feast.core.FeatureSetProto.FeatureSpec; -import feast.core.SourceProto.KafkaSourceConfig; -import feast.core.SourceProto.SourceType; import feast.core.StoreProto; import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; @@ -110,39 +107,24 @@ public class SpecServiceTest { @Before public void setUp() { initMocks(this); - defaultSource = - new Source( - SourceType.KAFKA, - KafkaSourceConfig.newBuilder() - .setBootstrapServers("kafka:9092") - .setTopic("my-topic") - .build(), - true); + defaultSource = TestObjectFactory.defaultSource; FeatureSet featureSet1v1 = newDummyFeatureSet("f1", 1, "project1"); FeatureSet featureSet1v2 = newDummyFeatureSet("f1", 2, "project1"); FeatureSet featureSet1v3 = newDummyFeatureSet("f1", 3, "project1"); FeatureSet featureSet2v1 = newDummyFeatureSet("f2", 1, "project1"); - Field f3f1 = new Field("f3f1", Enum.INT64); - Field f3f2 = new Field("f3f2", Enum.INT64); - Field f3e1 = new Field("f3e1", Enum.STRING); + Field f3f1 = TestObjectFactory.CreateFeatureField("f3f1", Enum.INT64); + Field f3f2 = TestObjectFactory.CreateFeatureField("f3f2", Enum.INT64); + Field f3e1 = TestObjectFactory.CreateEntityField("f3e1", Enum.STRING); 
FeatureSet featureSet3v1 = - new FeatureSet( - "f3", - "project1", - 1, - 100L, - Arrays.asList(f3e1), - Arrays.asList(f3f2, f3f1), - defaultSource, - FeatureSetStatus.STATUS_READY); + TestObjectFactory.CreateFeatureSet( + "f3", "project1", 1, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1)); featureSets = Arrays.asList(featureSet1v1, featureSet1v2, featureSet1v3, featureSet2v1, featureSet3v1); when(featureSetRepository.findAll()).thenReturn(featureSets); when(featureSetRepository.findAllByOrderByNameAscVersionAsc()).thenReturn(featureSets); - when(featureSetRepository.findFeatureSetByNameAndProject_NameAndVersion("f1", "project1", 1)) .thenReturn(featureSets.get(0)); when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( @@ -490,19 +472,12 @@ public void applyFeatureSetShouldIncrementFeatureSetVersionIfAlreadyExists() public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered() throws InvalidProtocolBufferException { - Field f3f1 = new Field("f3f1", Enum.INT64); - Field f3f2 = new Field("f3f2", Enum.INT64); - Field f3e1 = new Field("f3e1", Enum.STRING); + Field f3f1 = TestObjectFactory.CreateFeatureField("f3f1", Enum.INT64); + Field f3f2 = TestObjectFactory.CreateFeatureField("f3f2", Enum.INT64); + Field f3e1 = TestObjectFactory.CreateEntityField("f3e1", Enum.STRING); FeatureSetProto.FeatureSet incomingFeatureSet = - (new FeatureSet( - "f3", - "project1", - 5, - 100L, - Arrays.asList(f3e1), - Arrays.asList(f3f2, f3f1), - defaultSource, - FeatureSetStatus.STATUS_READY)) + (TestObjectFactory.CreateFeatureSet( + "f3", "project1", 5, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1))) .toProto(); ApplyFeatureSetResponse applyFeatureSetResponse = @@ -630,16 +605,8 @@ public void applyFeatureSetShouldAcceptPresenceShapeAndDomainConstraints() new ArrayList<>(appliedFeatureSetSpec.getFeaturesList()); appliedFeatureSpecs.sort(Comparator.comparing(FeatureSpec::getName)); - assertEquals(appliedEntitySpecs.size(), entitySpecs.size()); - 
assertEquals(appliedFeatureSpecs.size(), featureSpecs.size()); - - for (int i = 0; i < appliedEntitySpecs.size(); i++) { - assertEquals(entitySpecs.get(i), appliedEntitySpecs.get(i)); - } - - for (int i = 0; i < appliedFeatureSpecs.size(); i++) { - assertEquals(featureSpecs.get(i), appliedFeatureSpecs.get(i)); - } + assertEquals(appliedEntitySpecs, entitySpecs); + assertEquals(appliedFeatureSpecs, featureSpecs); } @Test @@ -713,19 +680,12 @@ public void applyFeatureSetShouldUpdateFeatureSetWhenConstraintsAreUpdated() @Test public void applyFeatureSetShouldCreateProjectWhenNotAlreadyExists() throws InvalidProtocolBufferException { - Field f3f1 = new Field("f3f1", Enum.INT64); - Field f3f2 = new Field("f3f2", Enum.INT64); - Field f3e1 = new Field("f3e1", Enum.STRING); + Field f3f1 = TestObjectFactory.CreateFeatureField("f3f1", Enum.INT64); + Field f3f2 = TestObjectFactory.CreateFeatureField("f3f2", Enum.INT64); + Field f3e1 = TestObjectFactory.CreateEntityField("f3e1", Enum.STRING); FeatureSetProto.FeatureSet incomingFeatureSet = - (new FeatureSet( - "f3", - "newproject", - 5, - 100L, - Arrays.asList(f3e1), - Arrays.asList(f3f2, f3f1), - defaultSource, - FeatureSetStatus.STATUS_READY)) + (TestObjectFactory.CreateFeatureSet( + "f3", "newproject", 5, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1))) .toProto(); ApplyFeatureSetResponse applyFeatureSetResponse = @@ -739,19 +699,12 @@ public void applyFeatureSetShouldCreateProjectWhenNotAlreadyExists() @Test public void applyFeatureSetShouldFailWhenProjectIsArchived() throws InvalidProtocolBufferException { - Field f3f1 = new Field("f3f1", Enum.INT64); - Field f3f2 = new Field("f3f2", Enum.INT64); - Field f3e1 = new Field("f3e1", Enum.STRING); + Field f3f1 = TestObjectFactory.CreateFeatureField("f3f1", Enum.INT64); + Field f3f2 = TestObjectFactory.CreateFeatureField("f3f2", Enum.INT64); + Field f3e1 = TestObjectFactory.CreateEntityField("f3e1", Enum.STRING); FeatureSetProto.FeatureSet incomingFeatureSet = - (new 
FeatureSet( - "f3", - "archivedproject", - 5, - 100L, - Arrays.asList(f3e1), - Arrays.asList(f3f2, f3f1), - defaultSource, - FeatureSetStatus.STATUS_READY)) + (TestObjectFactory.CreateFeatureSet( + "f3", "archivedproject", 5, Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1))) .toProto(); expectedException.expect(IllegalArgumentException.class); @@ -759,6 +712,101 @@ public void applyFeatureSetShouldFailWhenProjectIsArchived() specService.applyFeatureSet(incomingFeatureSet); } + @Test + public void applyFeatureSetShouldAcceptFeatureLabels() throws InvalidProtocolBufferException { + List entitySpecs = new ArrayList<>(); + entitySpecs.add(EntitySpec.newBuilder().setName("entity1").setValueType(Enum.INT64).build()); + + Map featureLabels0 = + new HashMap<>() { + { + put("label1", "feast1"); + } + }; + + Map featureLabels1 = + new HashMap<>() { + { + put("label1", "feast1"); + put("label2", "feast2"); + } + }; + + List> featureLabels = new ArrayList<>(); + featureLabels.add(featureLabels0); + featureLabels.add(featureLabels1); + + List featureSpecs = new ArrayList<>(); + featureSpecs.add( + FeatureSpec.newBuilder() + .setName("feature1") + .setValueType(Enum.INT64) + .putAllLabels(featureLabels.get(0)) + .build()); + featureSpecs.add( + FeatureSpec.newBuilder() + .setName("feature2") + .setValueType(Enum.INT64) + .putAllLabels(featureLabels.get(1)) + .build()); + + FeatureSetSpec featureSetSpec = + FeatureSetSpec.newBuilder() + .setProject("project1") + .setName("featureSetWithConstraints") + .addAllEntities(entitySpecs) + .addAllFeatures(featureSpecs) + .build(); + FeatureSetProto.FeatureSet featureSet = + FeatureSetProto.FeatureSet.newBuilder().setSpec(featureSetSpec).build(); + + ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet(featureSet); + FeatureSetSpec appliedFeatureSetSpec = applyFeatureSetResponse.getFeatureSet().getSpec(); + + // appliedEntitySpecs needs to be sorted because the list returned by specService may not + // follow the 
order in the request + List appliedEntitySpecs = new ArrayList<>(appliedFeatureSetSpec.getEntitiesList()); + appliedEntitySpecs.sort(Comparator.comparing(EntitySpec::getName)); + + // appliedFeatureSpecs needs to be sorted because the list returned by specService may not + // follow the order in the request + List appliedFeatureSpecs = + new ArrayList<>(appliedFeatureSetSpec.getFeaturesList()); + appliedFeatureSpecs.sort(Comparator.comparing(FeatureSpec::getName)); + + var featureSpecsLabels = + featureSpecs.stream().map(e -> e.getLabelsMap()).collect(Collectors.toList()); + assertEquals(appliedEntitySpecs, entitySpecs); + assertEquals(appliedFeatureSpecs, featureSpecs); + assertEquals(featureSpecsLabels, featureLabels); + } + + @Test + public void applyFeatureSetShouldAcceptFeatureSetLabels() throws InvalidProtocolBufferException { + Map featureSetLabels = + new HashMap<>() { + { + put("description", "My precious feature set"); + } + }; + + FeatureSetSpec featureSetSpec = + FeatureSetSpec.newBuilder() + .setProject("project1") + .setName("preciousFeatureSet") + .putAllLabels(featureSetLabels) + .build(); + FeatureSetProto.FeatureSet featureSet = + FeatureSetProto.FeatureSet.newBuilder().setSpec(featureSetSpec).build(); + + ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet(featureSet); + FeatureSetSpec appliedFeatureSetSpec = applyFeatureSetResponse.getFeatureSet().getSpec(); + + var appliedLabels = appliedFeatureSetSpec.getLabelsMap(); + + assertEquals(featureSetLabels, appliedLabels); + } + @Test public void shouldUpdateStoreIfConfigChanges() throws InvalidProtocolBufferException { when(storeRepository.findById("SERVING")).thenReturn(Optional.of(stores.get(0))); @@ -806,19 +854,18 @@ public void shouldFailIfGetFeatureSetWithoutProject() throws InvalidProtocolBuff } private FeatureSet newDummyFeatureSet(String name, int version, String project) { - Field feature = new Field("feature", Enum.INT64); - Field entity = new Field("entity", 
Enum.STRING); + FeatureSpec f1 = + FeatureSpec.newBuilder() + .setName("feature") + .setValueType(Enum.STRING) + .putLabels("key", "value") + .build(); + Field feature = new Field(f1); + Field entity = TestObjectFactory.CreateEntityField("entity", Enum.STRING); FeatureSet fs = - new FeatureSet( - name, - project, - version, - 100L, - Arrays.asList(entity), - Arrays.asList(feature), - defaultSource, - FeatureSetStatus.STATUS_READY); + TestObjectFactory.CreateFeatureSet( + name, project, version, Arrays.asList(entity), Arrays.asList(feature)); fs.setCreated(Date.from(Instant.ofEpochSecond(10L))); return fs; } diff --git a/core/src/test/java/feast/core/service/TestObjectFactory.java b/core/src/test/java/feast/core/service/TestObjectFactory.java new file mode 100644 index 00000000000..966cb8d8163 --- /dev/null +++ b/core/src/test/java/feast/core/service/TestObjectFactory.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.core.service; + +import feast.core.FeatureSetProto; +import feast.core.SourceProto; +import feast.core.model.FeatureSet; +import feast.core.model.Field; +import feast.core.model.Source; +import feast.types.ValueProto; +import java.util.HashMap; +import java.util.List; + +public class TestObjectFactory { + + public static Source defaultSource = + new Source( + SourceProto.SourceType.KAFKA, + SourceProto.KafkaSourceConfig.newBuilder() + .setBootstrapServers("kafka:9092") + .setTopic("my-topic") + .build(), + true); + + public static FeatureSet CreateFeatureSet( + String name, String project, int version, List entities, List features) { + return new FeatureSet( + name, + project, + version, + 100L, + entities, + features, + defaultSource, + new HashMap<>(), + FeatureSetProto.FeatureSetStatus.STATUS_READY); + } + + public static Field CreateFeatureField(String name, ValueProto.ValueType.Enum valueType) { + return new Field( + FeatureSetProto.FeatureSpec.newBuilder().setName(name).setValueType(valueType).build()); + } + + public static Field CreateEntityField(String name, ValueProto.ValueType.Enum valueType) { + return new Field( + FeatureSetProto.EntitySpec.newBuilder().setName(name).setValueType(valueType).build()); + } +} diff --git a/core/src/test/java/feast/core/util/TypeConversionTest.java b/core/src/test/java/feast/core/util/TypeConversionTest.java index 75548f34653..02f0a7cee45 100644 --- a/core/src/test/java/feast/core/util/TypeConversionTest.java +++ b/core/src/test/java/feast/core/util/TypeConversionTest.java @@ -18,8 +18,7 @@ import static com.jayway.jsonpath.matchers.JsonPathMatchers.hasJsonPath; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import com.google.protobuf.Timestamp; import java.util.*; @@ -70,6 +69,12 @@ public void convertMapToJsonStringShouldReturnJsonStringForGivenMap() { 
TypeConversion.convertMapToJsonString(input), hasJsonPath("$.key", equalTo("value"))); } + @Test + public void convertMapToJsonStringShouldReturnEmptyJsonForAnEmptyMap() { + Map input = new HashMap<>(); + assertThat(TypeConversion.convertMapToJsonString(input), equalTo("{}")); + } + @Test public void convertJsonStringToArgsShouldReturnCorrectListOfArgs() { Map input = new HashMap<>(); diff --git a/core/src/test/java/feast/core/validators/FeatureSetValidatorTest.java b/core/src/test/java/feast/core/validators/FeatureSetValidatorTest.java new file mode 100644 index 00000000000..2e1e4e381ae --- /dev/null +++ b/core/src/test/java/feast/core/validators/FeatureSetValidatorTest.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.core.validators; + +import feast.core.FeatureSetProto; +import feast.types.ValueProto; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class FeatureSetValidatorTest { + + @Rule public final ExpectedException expectedException = ExpectedException.none(); + + @Test + public void shouldThrowExceptionForFeatureLabelsWithAnEmptyKey() { + Map featureLabels = + new HashMap<>() { + { + put("", "empty_key"); + } + }; + + List featureSpecs = new ArrayList<>(); + featureSpecs.add( + FeatureSetProto.FeatureSpec.newBuilder() + .setName("feature1") + .setValueType(ValueProto.ValueType.Enum.INT64) + .putAllLabels(featureLabels) + .build()); + + FeatureSetProto.FeatureSetSpec featureSetSpec = + FeatureSetProto.FeatureSetSpec.newBuilder() + .setProject("project1") + .setName("featureSetWithConstraints") + .addAllFeatures(featureSpecs) + .build(); + FeatureSetProto.FeatureSet featureSet = + FeatureSetProto.FeatureSet.newBuilder().setSpec(featureSetSpec).build(); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Feature label keys must not be empty"); + FeatureSetValidator.validateSpec(featureSet); + } + + @Test + public void shouldThrowExceptionForFeatureSetLabelsWithAnEmptyKey() { + + Map featureSetLabels = + new HashMap<>() { + { + put("", "empty_key"); + } + }; + + FeatureSetProto.FeatureSetSpec featureSetSpec = + FeatureSetProto.FeatureSetSpec.newBuilder() + .setProject("project1") + .setName("featureSetWithConstraints") + .putAllLabels(featureSetLabels) + .build(); + FeatureSetProto.FeatureSet featureSet = + FeatureSetProto.FeatureSet.newBuilder().setSpec(featureSetSpec).build(); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Feature set label keys must not be empty"); + 
FeatureSetValidator.validateSpec(featureSet); + } +} diff --git a/protos/feast/core/FeatureSet.proto b/protos/feast/core/FeatureSet.proto index 429d99c8547..73173f09914 100644 --- a/protos/feast/core/FeatureSet.proto +++ b/protos/feast/core/FeatureSet.proto @@ -60,6 +60,9 @@ message FeatureSetSpec { // Optional. Source on which feature rows can be found. // If not set, source will be set to the default value configured in Feast Core. Source source = 6; + + // User defined metadata + map labels = 8; } message EntitySpec { @@ -156,6 +159,9 @@ message FeatureSpec { tensorflow.metadata.v0.TimeDomain time_domain = 17; tensorflow.metadata.v0.TimeOfDayDomain time_of_day_domain = 18; } + + // Labels for user defined metadata on a feature + map labels = 19; } message FeatureSetMeta { diff --git a/sdk/python/feast/feature.py b/sdk/python/feast/feature.py index 9c7ff20f9e2..f5c07070b09 100644 --- a/sdk/python/feast/feature.py +++ b/sdk/python/feast/feature.py @@ -56,7 +56,7 @@ def from_proto(cls, feature_proto: FeatureProto): Feature object """ feature = cls( - name=feature_proto.name, dtype=ValueType(feature_proto.value_type) + name=feature_proto.name, dtype=ValueType(feature_proto.value_type), ) feature.update_presence_constraints(feature_proto) feature.update_shape_type(feature_proto) diff --git a/sdk/python/feast/loaders/ingest.py b/sdk/python/feast/loaders/ingest.py index b4490f025c5..4d215cc9901 100644 --- a/sdk/python/feast/loaders/ingest.py +++ b/sdk/python/feast/loaders/ingest.py @@ -25,7 +25,9 @@ KAFKA_CHUNK_PRODUCTION_TIMEOUT = 120 # type: int -def _encode_pa_tables(file: str, fs: FeatureSet, row_group_idx: int) -> List[bytes]: +def _encode_pa_tables( + file: str, feature_set: str, fields: dict, row_group_idx: int +) -> List[bytes]: """ Helper function to encode a PyArrow table(s) read from parquet file(s) into FeatureRows. 
@@ -41,8 +43,11 @@ def _encode_pa_tables(file: str, fs: FeatureSet, row_group_idx: int) -> List[byt File directory of all the parquet file to encode. Parquet file must have more than one row group. - fs (feast.feature_set.FeatureSet): - FeatureSet describing parquet files. + feature_set (str): + Feature set reference in the format f"{project}/{name}:{version}". + + fields (dict[str, enum.Enum.ValueType]): + A mapping of field names to their value types. row_group_idx(int): Row group index to read and encode into byte like FeatureRow @@ -61,12 +66,10 @@ def _encode_pa_tables(file: str, fs: FeatureSet, row_group_idx: int) -> List[byt # Preprocess the columns by converting all its values to Proto values proto_columns = { - field_name: pa_column_to_proto_column(field.dtype, table.column(field_name)) - for field_name, field in fs.fields.items() + field_name: pa_column_to_proto_column(dtype, table.column(field_name)) + for field_name, dtype in fields.items() } - feature_set = f"{fs.project}/{fs.name}:{fs.version}" - # List to store result feature_rows = [] @@ -120,8 +123,12 @@ def get_feature_row_chunks( Iterable list of byte encoded FeatureRow(s). 
""" + feature_set = f"{fs.project}/{fs.name}:{fs.version}" + + field_map = {field.name: field.dtype for field in fs.fields.values()} + pool = Pool(max_workers) - func = partial(_encode_pa_tables, file, fs) + func = partial(_encode_pa_tables, file, feature_set, field_map) for chunk in pool.imap(func, row_groups): yield chunk return diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index ed0426b2f6a..3082265eccf 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -559,7 +559,16 @@ def test_apply_feature_set_success(self, test_client): and feature_sets[0].name == "my-feature-set-1" and feature_sets[0].features[0].name == "fs1-my-feature-1" and feature_sets[0].features[0].dtype == ValueType.INT64 + and feature_sets[0].features[1].name == "fs1-my-feature-2" + and feature_sets[0].features[1].dtype == ValueType.STRING + and feature_sets[0].entities[0].name == "fs1-my-entity-1" + and feature_sets[0].entities[0].dtype == ValueType.INT64 + and feature_sets[1].features[0].name == "fs2-my-feature-1" + and feature_sets[1].features[0].dtype == ValueType.STRING_LIST + and feature_sets[1].features[1].name == "fs2-my-feature-2" and feature_sets[1].features[1].dtype == ValueType.BYTES_LIST + and feature_sets[1].entities[0].name == "fs2-my-entity-1" + and feature_sets[1].entities[0].dtype == ValueType.INT64 ) @pytest.mark.parametrize( diff --git a/tests/e2e/basic-ingest-redis-serving.py b/tests/e2e/basic-ingest-redis-serving.py index 8e40794344e..e80c5b7af61 100644 --- a/tests/e2e/basic-ingest-redis-serving.py +++ b/tests/e2e/basic-ingest-redis-serving.py @@ -2,12 +2,15 @@ import math import random import time +import grpc from feast.entity import Entity from feast.serving.ServingService_pb2 import ( GetOnlineFeaturesRequest, GetOnlineFeaturesResponse, ) from feast.core.IngestionJob_pb2 import IngestionJobStatus +from feast.core.CoreService_pb2_grpc import CoreServiceStub +from feast.core import CoreService_pb2 from 
feast.types.Value_pb2 import Value as Value from feast.client import Client from feast.feature_set import FeatureSet, FeatureSetRef @@ -26,6 +29,7 @@ FLOAT_TOLERANCE = 0.00001 PROJECT_NAME = 'basic_' + uuid.uuid4().hex.upper()[0:6] + @pytest.fixture(scope='module') def core_url(pytestconfig): return pytestconfig.getoption("core_url") @@ -109,6 +113,7 @@ def test_basic_ingest_success(client, basic_dataframe): client.ingest(cust_trans_fs, basic_dataframe) time.sleep(5) + @pytest.mark.timeout(45) @pytest.mark.run(order=12) def test_basic_retrieve_online_success(client, basic_dataframe): @@ -146,12 +151,13 @@ def test_basic_retrieve_online_success(client, basic_dataframe): basic_dataframe.iloc[0]["daily_transactions"]) if math.isclose( - sent_daily_transactions, - returned_daily_transactions, - abs_tol=FLOAT_TOLERANCE, + sent_daily_transactions, + returned_daily_transactions, + abs_tol=FLOAT_TOLERANCE, ): break + @pytest.mark.timeout(300) @pytest.mark.run(order=19) def test_basic_ingest_jobs(client, basic_dataframe): @@ -319,20 +325,20 @@ def test_all_types_retrieve_online_success(client, all_types_dataframe): if response is None: continue - returned_float_list = ( response.field_values[0] - .fields[PROJECT_NAME+"/float_list_feature"] + .fields[PROJECT_NAME + "/float_list_feature"] .float_list_val.val ) sent_float_list = all_types_dataframe.iloc[0]["float_list_feature"] if math.isclose( - returned_float_list[0], sent_float_list[0], abs_tol=FLOAT_TOLERANCE + returned_float_list[0], sent_float_list[0], abs_tol=FLOAT_TOLERANCE ): break + @pytest.mark.timeout(300) @pytest.mark.run(order=29) def test_all_types_ingest_jobs(client, all_types_dataframe): @@ -355,6 +361,7 @@ def test_all_types_ingest_jobs(client, all_types_dataframe): ingest_job.wait(IngestionJobStatus.ABORTED) assert ingest_job.status == IngestionJobStatus.ABORTED + @pytest.fixture(scope='module') def large_volume_dataframe(): ROW_COUNT = 100000 @@ -445,9 +452,9 @@ def 
test_large_volume_retrieve_online_success(client, large_volume_dataframe): large_volume_dataframe.iloc[0]["daily_transactions_large"]) if math.isclose( - sent_daily_transactions, - returned_daily_transactions, - abs_tol=FLOAT_TOLERANCE, + sent_daily_transactions, + returned_daily_transactions, + abs_tol=FLOAT_TOLERANCE, ): break @@ -462,14 +469,14 @@ def all_types_parquet_file(): "customer_id": [np.int32(random.randint(0, 10000)) for _ in range(COUNT)], "int32_feature_parquet": [np.int32(random.randint(0, 10000)) for _ in - range(COUNT)], + range(COUNT)], "int64_feature_parquet": [np.int64(random.randint(0, 10000)) for _ in - range(COUNT)], + range(COUNT)], "float_feature_parquet": [np.float(random.random()) for _ in range(COUNT)], "double_feature_parquet": [np.float64(random.random()) for _ in - range(COUNT)], + range(COUNT)], "string_feature_parquet": ["one" + str(random.random()) for _ in - range(COUNT)], + range(COUNT)], "bytes_feature_parquet": [b"one" for _ in range(COUNT)], "int32_list_feature_parquet": [ np.array([1, 2, 3, random.randint(0, 10000)], dtype=np.int32) @@ -509,6 +516,7 @@ def all_types_parquet_file(): df.to_parquet(file_path, allow_truncated_timestamps=True) return file_path + @pytest.mark.timeout(300) @pytest.mark.run(order=40) def test_all_types_parquet_register_feature_set_success(client): @@ -539,10 +547,86 @@ def test_all_types_parquet_register_feature_set_success(client): @pytest.mark.timeout(600) @pytest.mark.run(order=41) def test_all_types_infer_register_ingest_file_success(client, - all_types_parquet_file): + all_types_parquet_file): # Get feature set all_types_fs = client.get_feature_set(name="all_types_parquet") # Ingest user embedding data client.ingest(feature_set=all_types_fs, source=all_types_parquet_file, force_update=True) + + +# TODO: rewrite these using python SDK once the labels are implemented there +class TestsBasedOnGrpc: + LAST_VERSION = 0 + GRPC_CONNECTION_TIMEOUT = 3 + LABEL_KEY = "my" + LABEL_VALUE = "label" + + 
@pytest.fixture(scope="module")
def core_service_stub(self, core_url):
    """Module-scoped fixture returning a Feast Core gRPC stub for ``core_url``.

    A TLS channel is opened when ``core_url`` ends with ``:443``; otherwise an
    insecure channel is used. The fixture blocks until the channel reports
    ready or ``GRPC_CONNECTION_TIMEOUT`` seconds have passed.

    Raises:
        ConnectionError: the channel did not become ready within the timeout.
    """
    if core_url.endswith(":443"):
        core_channel = grpc.secure_channel(
            core_url, grpc.ssl_channel_credentials()
        )
    else:
        core_channel = grpc.insecure_channel(core_url)

    try:
        grpc.channel_ready_future(core_channel).result(
            timeout=self.GRPC_CONNECTION_TIMEOUT
        )
    except grpc.FutureTimeoutError as e:
        # Chain explicitly so the gRPC timeout is reported as the direct cause.
        raise ConnectionError(
            f"Connection timed out while attempting to connect to Feast "
            f"Core gRPC server {core_url} "
        ) from e
    return CoreServiceStub(core_channel)


def apply_feature_set(self, core_service_stub, feature_set_proto):
    """Send ``feature_set_proto`` through the ApplyFeatureSet RPC.

    Args:
        core_service_stub: stub produced by the ``core_service_stub`` fixture.
        feature_set_proto: feature set message placed in the request.

    Returns:
        The ``feature_set`` field of the RPC response.

    Raises:
        grpc.RpcError: re-raised with only the ``details()`` text of the
            original error; the original error is attached as ``__cause__``.
    """
    try:
        apply_fs_response = core_service_stub.ApplyFeatureSet(
            CoreService_pb2.ApplyFeatureSetRequest(feature_set=feature_set_proto),
            timeout=self.GRPC_CONNECTION_TIMEOUT,
        )  # type: ApplyFeatureSetResponse
    except grpc.RpcError as e:
        # Keep the original error (status code etc.) reachable via __cause__.
        raise grpc.RpcError(e.details()) from e
    return apply_fs_response.feature_set


def get_feature_set(self, core_service_stub, name, project):
    """Fetch a feature set through the GetFeatureSet RPC.

    The request uses ``name`` with surrounding whitespace stripped and
    ``version=self.LAST_VERSION``.
    NOTE(review): presumably version 0 asks the server for the latest
    version -- confirm against the Core service.

    Returns:
        The ``feature_set`` field of the RPC response.

    Raises:
        grpc.RpcError: re-raised with only the ``details()`` text of the
            original error; the original error is attached as ``__cause__``.
    """
    try:
        get_feature_set_response = core_service_stub.GetFeatureSet(
            CoreService_pb2.GetFeatureSetRequest(
                project=project, name=name.strip(), version=self.LAST_VERSION
            )
        )  # type: GetFeatureSetResponse
    except grpc.RpcError as e:
        # Keep the original error (status code etc.) reachable via __cause__.
        raise grpc.RpcError(e.details()) from e
    return get_feature_set_response.feature_set


@pytest.mark.timeout(45)
@pytest.mark.run(order=51)
def test_register_feature_set_with_labels(self, core_service_stub):
    """A label set on the feature set spec is present after apply + get."""
    feature_set_name = "test_feature_set_labels"
    feature_set_proto = FeatureSet(feature_set_name, PROJECT_NAME).to_proto()
    feature_set_proto.spec.labels[self.LABEL_KEY] = self.LABEL_VALUE
    self.apply_feature_set(core_service_stub, feature_set_proto)

    retrieved_feature_set = self.get_feature_set(
        core_service_stub, feature_set_name, PROJECT_NAME
    )

    assert self.LABEL_KEY in retrieved_feature_set.spec.labels
    assert retrieved_feature_set.spec.labels[self.LABEL_KEY] == self.LABEL_VALUE

@pytest.mark.timeout(45)
@pytest.mark.run(order=52)
def test_register_feature_with_labels(self, core_service_stub):
    """A label set on an individual feature is present after apply + get.

    Builds a feature set with a single INT64 feature named "rating", attaches
    the class-level label to that feature, applies it, then fetches the
    feature set back and checks the first feature's labels.
    """
    fs_name = "test_feature_labels"
    rating = Feature("rating", ValueType.INT64)
    proto = FeatureSet(fs_name, PROJECT_NAME, features=[rating]).to_proto()
    proto.spec.features[0].labels[self.LABEL_KEY] = self.LABEL_VALUE
    self.apply_feature_set(core_service_stub, proto)

    fetched = self.get_feature_set(core_service_stub, fs_name, PROJECT_NAME)
    first_feature = fetched.spec.features[0]

    assert self.LABEL_KEY in first_feature.labels
    assert first_feature.labels[self.LABEL_KEY] == self.LABEL_VALUE