diff --git a/ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java b/ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java index 7e9e235b5b..81dadaf38e 100644 --- a/ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java +++ b/ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java @@ -28,12 +28,12 @@ import feast.ingestion.service.SpecService.Builder; import feast.ingestion.service.SpecService.UnsupportedBuilder; import feast.specs.ImportSpecProto.ImportSpec; -import feast.storage.ErrorsStore; -import feast.storage.ServingStore; -import feast.storage.WarehouseStore; -import feast.storage.service.ErrorsStoreService; -import feast.storage.service.ServingStoreService; -import feast.storage.service.WarehouseStoreService; +import feast.store.errors.FeatureErrorsFactory; +import feast.store.errors.FeatureErrorsFactoryService; +import feast.store.serving.FeatureServingFactory; +import feast.store.serving.FeatureServingFactoryService; +import feast.store.warehouse.FeatureWarehouseFactory; +import feast.store.warehouse.FeatureWarehouseFactoryService; import java.util.List; import org.apache.beam.sdk.options.PipelineOptions; @@ -78,19 +78,19 @@ Specs provideSpecs(SpecService.Builder specService) { @Provides @Singleton - List provideWarehouseStores() { - return WarehouseStoreService.getAll(); + List provideWarehouseStores() { + return FeatureWarehouseFactoryService.getAll(); } @Provides @Singleton - List provideServingStores() { - return ServingStoreService.getAll(); + List provideServingStores() { + return FeatureServingFactoryService.getAll(); } @Provides @Singleton - List provideErrorsStores() { - return ErrorsStoreService.getAll(); + List provideErrorsStores() { + return FeatureErrorsFactoryService.getAll(); } } diff --git a/ingestion/src/main/java/feast/ingestion/transform/ErrorsStoreTransform.java b/ingestion/src/main/java/feast/ingestion/transform/ErrorsStoreTransform.java index 7b8e41030e..92590b91df 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/ErrorsStoreTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/ErrorsStoreTransform.java @@ -23,10 +23,10 @@ import com.google.inject.Inject; import feast.ingestion.model.Specs; import feast.ingestion.options.ImportJobPipelineOptions; -import feast.ingestion.transform.FeatureIO.Write; import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.ErrorsStore; -import feast.storage.noop.NoOpIO; +import feast.store.FeatureStoreWrite; +import feast.store.NoOpIO; +import feast.store.errors.FeatureErrorsFactory; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -35,18 +35,19 @@ import org.apache.hadoop.hbase.util.Strings; @Slf4j -public class ErrorsStoreTransform extends FeatureIO.Write { +public class ErrorsStoreTransform extends FeatureStoreWrite { private String errorsStoreType; private StorageSpec errorsStoreSpec; private Specs specs; - private List errorsStores; + private List errorsStoreFactories; @Inject public ErrorsStoreTransform( - ImportJobPipelineOptions options, Specs specs, List errorsStores) { + ImportJobPipelineOptions options, Specs specs, + List errorsStoreFactories) { this.specs = specs; - this.errorsStores = errorsStores; + this.errorsStoreFactories = errorsStoreFactories; this.errorsStoreType = options.getErrorsStoreType(); if (!Strings.isEmpty(errorsStoreType)) { @@ -60,7 +61,7 @@ public ErrorsStoreTransform( @Override public PDone expand(PCollection input) { - Write write; + FeatureStoreWrite write; if (Strings.isEmpty(errorsStoreType)) { write = new NoOpIO.Write(); } else { @@ -70,11 +71,11 @@ public PDone expand(PCollection input) { return PDone.in(input.getPipeline()); } - ErrorsStore getErrorStore() { + FeatureErrorsFactory getErrorStore() { checkArgument(!errorsStoreType.isEmpty(), "Errors store type not provided"); - for (ErrorsStore errorsStore : errorsStores) { - if (errorsStore.getType().equals(errorsStoreType)) { - return errorsStore; + for (FeatureErrorsFactory errorsStoreFactory : errorsStoreFactories) { + if (errorsStoreFactory.getType().equals(errorsStoreType)) { + return errorsStoreFactory; } } throw new IllegalArgumentException("Errors store type not found"); diff --git a/ingestion/src/main/java/feast/ingestion/transform/FeatureIO.java b/ingestion/src/main/java/feast/ingestion/transform/FeatureIO.java deleted file mode 100644 index c6d9a3b1c0..0000000000 --- a/ingestion/src/main/java/feast/ingestion/transform/FeatureIO.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.ingestion.transform; - -import feast.types.FeatureRowExtendedProto.FeatureRowExtended; -import feast.types.FeatureRowProto.FeatureRow; -import lombok.AllArgsConstructor; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; - -public class FeatureIO { - - public abstract static class Read extends PTransform> {} - - public abstract static class Write extends PTransform, PDone> {} - - /** Used during setup if read transform can not be determined. */ - @AllArgsConstructor - public static class UnknownRead extends Read { - private String message; - - @Override - public PCollection expand(PInput input) { - throw new IllegalArgumentException(message); - } - } -} diff --git a/ingestion/src/main/java/feast/ingestion/transform/ServingStoreTransform.java b/ingestion/src/main/java/feast/ingestion/transform/ServingStoreTransform.java index feb712da0d..024687bdfa 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/ServingStoreTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/ServingStoreTransform.java @@ -21,21 +21,20 @@ import feast.ingestion.metrics.FeastMetrics; import feast.ingestion.model.Specs; import feast.ingestion.values.PFeatureRows; -import feast.storage.ServingStore; +import feast.store.serving.FeatureServingFactory; +import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import java.util.List; - @Slf4j public class ServingStoreTransform extends PTransform { - private List stores; + private List stores; private Specs specs; @Inject - public ServingStoreTransform(List stores, Specs specs) { + public ServingStoreTransform(List stores, Specs specs) { this.stores = stores; this.specs = specs; } @@ -50,7 +49,8 @@ public PFeatureRows expand(PFeatureRows input) { output.getMain().apply("metrics.store.lag", ParDo.of(FeastMetrics.lagUpdateDoFn())); output.getMain().apply("metrics.store.main", ParDo.of(FeastMetrics.incrDoFn("serving_stored"))); - output.getErrors().apply("metrics.store.errors", ParDo.of(FeastMetrics.incrDoFn("serving_errors"))); + output.getErrors() + .apply("metrics.store.errors", ParDo.of(FeastMetrics.incrDoFn("serving_errors"))); return output; } } diff --git a/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java b/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java index cb11490dd3..c7e58892f0 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java +++ b/ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java @@ -20,12 +20,12 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO.Write; import feast.ingestion.transform.SplitFeatures.MultiOutputSplit; import feast.ingestion.values.PFeatureRows; import feast.specs.FeatureSpecProto.FeatureSpec; import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.FeatureStore; +import feast.store.FeatureStoreWrite; +import feast.store.FeatureStoreFactory; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import java.util.Collection; import java.util.HashMap; @@ -46,20 +46,20 @@ @Slf4j public class SplitOutputByStore extends PTransform { - private Collection stores; + private Collection stores; private SerializableFunction selector; private Specs specs; @Override public PFeatureRows expand(PFeatureRows input) { - Map transforms = getFeatureStoreTransforms(); + Map transforms = getFeatureStoreTransforms(); Set keys = transforms.keySet(); log.info(String.format("Splitting on keys = [%s]", String.join(",", keys))); MultiOutputSplit splitter = new MultiOutputSplit<>(selector, keys, specs); PCollectionTuple splits = input.getMain().apply(splitter); - Map, Write> taggedTransforms = new HashMap<>(); + Map, FeatureStoreWrite> taggedTransforms = new HashMap<>(); for (String key : transforms.keySet()) { TupleTag tag = splitter.getStrategy().getTag(key); taggedTransforms.put(tag, transforms.get(key)); @@ -72,17 +72,17 @@ public PFeatureRows expand(PFeatureRows input) { input.getErrors()); } - private Map getStoresMap() { - Map storesMap = new HashMap<>(); - for (FeatureStore servingStore : stores) { + private Map getStoresMap() { + Map storesMap = new HashMap<>(); + for (FeatureStoreFactory servingStore : stores) { storesMap.put(servingStore.getType(), servingStore); } return storesMap; } - private Map getFeatureStoreTransforms() { - Map storesMap = getStoresMap(); - Map transforms = new HashMap<>(); + private Map getFeatureStoreTransforms() { + Map storesMap = getStoresMap(); + Map transforms = new HashMap<>(); Map storageSpecs = specs.getStorageSpecs(); for (String storeId : storageSpecs.keySet()) { StorageSpec storageSpec = storageSpecs.get(storeId); @@ -109,14 +109,14 @@ private Map getFeatureStoreTransforms() { public static class WriteTags extends PTransform> { - private Map, Write> transforms; + private Map, FeatureStoreWrite> transforms; private TupleTag mainTag; @Override public PCollection expand(PCollectionTuple tuple) { List> outputList = Lists.newArrayList(); for (TupleTag tag : transforms.keySet()) { - Write write = transforms.get(tag); + FeatureStoreWrite write = transforms.get(tag); Preconditions.checkNotNull(write, String.format("Null transform for tag=%s", tag.getId())); PCollection input = tuple.get(tag); input.apply(String.format("Write to %s", tag.getId()), write); diff --git a/ingestion/src/main/java/feast/ingestion/transform/WarehouseStoreTransform.java b/ingestion/src/main/java/feast/ingestion/transform/WarehouseStoreTransform.java index dcfb9c4433..de70541961 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/WarehouseStoreTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/WarehouseStoreTransform.java @@ -18,23 +18,23 @@ package feast.ingestion.transform; import com.google.inject.Inject; +import feast.ingestion.metrics.FeastMetrics; +import feast.ingestion.model.Specs; +import feast.ingestion.values.PFeatureRows; +import feast.store.warehouse.FeatureWarehouseFactory; import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import feast.ingestion.metrics.FeastMetrics; -import feast.ingestion.model.Specs; -import feast.ingestion.values.PFeatureRows; -import feast.storage.WarehouseStore; @Slf4j public class WarehouseStoreTransform extends PTransform { - private List stores; + private List stores; private Specs specs; @Inject - public WarehouseStoreTransform(List stores, Specs specs) { + public WarehouseStoreTransform(List stores, Specs specs) { this.stores = stores; this.specs = specs; } @@ -48,8 +48,10 @@ public PFeatureRows expand(PFeatureRows input) { stores, (featureSpec) -> featureSpec.getDataStores().getWarehouse().getId(), specs)); - output.getMain().apply("metrics.store.main", ParDo.of(FeastMetrics.incrDoFn("warehouse_stored"))); - output.getErrors().apply("metrics.store.errors", ParDo.of(FeastMetrics.incrDoFn("warehouse_errors"))); + output.getMain() + .apply("metrics.store.main", ParDo.of(FeastMetrics.incrDoFn("warehouse_stored"))); + output.getErrors() + .apply("metrics.store.errors", ParDo.of(FeastMetrics.incrDoFn("warehouse_errors"))); return output; } } diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowsDoFn.java index 40fc6054fd..2704c5e766 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowsDoFn.java @@ -31,10 +31,10 @@ import feast.specs.ImportSpecProto.Field; import feast.specs.ImportSpecProto.ImportSpec; import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.ServingStore; -import feast.storage.WarehouseStore; -import feast.storage.service.ServingStoreService; -import feast.storage.service.WarehouseStoreService; +import feast.store.serving.FeatureServingFactory; +import feast.store.serving.FeatureServingFactoryService; +import feast.store.warehouse.FeatureWarehouseFactory; +import feast.store.warehouse.FeatureWarehouseFactoryService; import feast.types.FeatureProto.Feature; import feast.types.FeatureRowProto.FeatureRow; import feast.types.GranularityProto.Granularity.Enum; @@ -66,10 +66,10 @@ public void setup() { featureIds.add(field.getFeatureId()); } } - for (ServingStore store : ServingStoreService.getAll()) { + for (FeatureServingFactory store : FeatureServingFactoryService.getAll()) { supportedServingTypes.add(store.getType()); } - for (WarehouseStore store : WarehouseStoreService.getAll()) { + for (FeatureWarehouseFactory store : FeatureWarehouseFactoryService.getAll()) { supportedWarehouseTypes.add(store.getType()); } } diff --git a/ingestion/src/main/java/feast/source/bigquery/BigQueryToFeatureRowFn.java b/ingestion/src/main/java/feast/source/bigquery/BigQueryToFeatureRowFn.java index 795735410d..dfe26c45b7 100644 --- a/ingestion/src/main/java/feast/source/bigquery/BigQueryToFeatureRowFn.java +++ b/ingestion/src/main/java/feast/source/bigquery/BigQueryToFeatureRowFn.java @@ -23,7 +23,7 @@ import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.common.collect.Maps; import com.google.protobuf.Timestamp; -import feast.storage.bigquery.ValueBigQueryBuilder; +import feast.store.warehouse.bigquery.ValueBigQueryBuilder; import java.util.Map; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; @@ -34,8 +34,6 @@ import feast.types.FeatureProto.Feature; import feast.types.FeatureRowProto.FeatureRow; import feast.types.ValueProto.Value; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This is a serializable function used with the BigQueryIO for fetching feature rows directly from diff --git a/ingestion/src/main/java/feast/storage/StoreClient.java b/ingestion/src/main/java/feast/storage/StoreClient.java deleted file mode 100644 index 9f1302187b..0000000000 --- a/ingestion/src/main/java/feast/storage/StoreClient.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.storage; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import feast.serving.ServingAPIProto.TimestampRange; -import feast.specs.FeatureSpecProto.FeatureSpec; -import feast.types.FeatureRowProto.FeatureRow; - -/** Abstraction of Client for Feast Storage. */ -public interface StoreClient { - - /** Get a the latest FeatureRow containing features for a single entity id */ - FeatureRow getLatest(String entityName, String entityId, List featureSpecs); - - /** Get a the latest FeatureRow for containing features for each entity id */ - List getLatest( - String entityName, Collection entityIds, List featureSpecs); - - /** - * Get a the N most recent FeatureRows for single entity id. Each featureRow will contain at least - * one of the features provided - */ - List getLatest( - String entityName, String entityId, List featureSpecs, int limit); - - /** - * Get up to N most recent FeatureRows for each entity id. Each featureRow will contain at least - * one of the features provided - * @return A map of feature ids to lists of FeatureRows - */ - Map> getLatestRange( - String entityName, Collection entityId, List featureSpecs, int limit, TimestampRange tsRange); -} diff --git a/ingestion/src/main/java/feast/storage/file/json/JsonFileFeatureIO.java b/ingestion/src/main/java/feast/storage/file/json/JsonFileFeatureIO.java deleted file mode 100644 index 0ebbbcb540..0000000000 --- a/ingestion/src/main/java/feast/storage/file/json/JsonFileFeatureIO.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.storage.file.json; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.util.JsonFormat; -import feast.ingestion.transform.FeatureIO; -import feast.storage.file.FileStoreOptions; -import feast.storage.file.TextFileFeatureIO; -import feast.types.FeatureRowExtendedProto.FeatureRowExtended; -import lombok.AllArgsConstructor; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; - -public class JsonFileFeatureIO { - - public static Write writeRowExtended(FileStoreOptions options) { - return new Write(options, (rowExtended) -> rowExtended); - } - - public static Write writeRow(FileStoreOptions options) { - return new Write(options, (rowExtended) -> rowExtended.getRow()); - } - - @AllArgsConstructor - public static class Write extends FeatureIO.Write { - - private FileStoreOptions options; - private SerializableFunction messageFunction; - - @Override - public PDone expand(PCollection input) { - return input.apply( - "Write Text Files", - new TextFileFeatureIO.Write( - options, - (rowExtended) -> { - try { - return JsonFormat.printer() - .omittingInsignificantWhitespace() - .print(messageFunction.apply(rowExtended)); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - }, - ".json")); - } - } -} diff --git a/ingestion/src/main/java/feast/storage/file/json/JsonFileStores.java b/ingestion/src/main/java/feast/storage/file/json/JsonFileStores.java deleted file mode 100644 index 11dda1e3b7..0000000000 --- a/ingestion/src/main/java/feast/storage/file/json/JsonFileStores.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.storage.file.json; - -import com.google.auto.service.AutoService; -import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO.Write; -import feast.options.OptionsParser; -import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.ErrorsStore; -import feast.storage.ServingStore; -import feast.storage.WarehouseStore; -import feast.storage.file.FileStoreOptions; -import lombok.AllArgsConstructor; - -public class JsonFileStores { - - private static final String JSON_FILES_TYPE = "file.json"; - - @AutoService(WarehouseStore.class) - @AllArgsConstructor - public static class JsonFileWarehouseStore implements WarehouseStore { - - @Override - public Write create(StorageSpec storageSpec, Specs specs) { - FileStoreOptions options = - OptionsParser.parse(storageSpec.getOptionsMap(), FileStoreOptions.class); - options.jobName = specs.getJobName(); - return JsonFileFeatureIO.writeRow(options); - } - - @Override - public String getType() { - return JSON_FILES_TYPE; - } - } - - @AutoService(ServingStore.class) - @AllArgsConstructor - public static class JsonFileServingStore implements ServingStore { - - @Override - public Write create(StorageSpec storageSpec, Specs specs) { - FileStoreOptions options = - OptionsParser.parse(storageSpec.getOptionsMap(), FileStoreOptions.class); - options.jobName = specs.getJobName(); - return JsonFileFeatureIO.writeRow(options); - } - - @Override - public String getType() { - return JSON_FILES_TYPE; - } - } - - @AutoService(ErrorsStore.class) - @AllArgsConstructor - public static class JsonFileErrorsStore implements ErrorsStore { - - @Override - public Write create(StorageSpec storageSpec, Specs specs) { - FileStoreOptions options = - OptionsParser.parse(storageSpec.getOptionsMap(), FileStoreOptions.class); - options.jobName = specs.getJobName(); - return JsonFileFeatureIO.writeRowExtended(options); - } - - @Override - public String getType() { - return JSON_FILES_TYPE; - } - } -} diff --git a/ingestion/src/main/java/feast/storage/noop/NoOpServingStore.java b/ingestion/src/main/java/feast/storage/noop/NoOpServingStore.java deleted file mode 100644 index 15de592144..0000000000 --- a/ingestion/src/main/java/feast/storage/noop/NoOpServingStore.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.storage.noop; - -import com.google.auto.service.AutoService; -import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO.Write; -import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.ServingStore; - -@AutoService(ServingStore.class) -public class NoOpServingStore implements ServingStore { - - @Override - public Write create(StorageSpec storageSpec, Specs specs) { - return new NoOpIO.Write(); - } - - @Override - public String getType() { - return "noop"; - } -} diff --git a/ingestion/src/main/java/feast/storage/noop/NoOpWarehouseStore.java b/ingestion/src/main/java/feast/storage/noop/NoOpWarehouseStore.java deleted file mode 100644 index f69e7c9fbb..0000000000 --- a/ingestion/src/main/java/feast/storage/noop/NoOpWarehouseStore.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.storage.noop; - -import com.google.auto.service.AutoService; -import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO.Write; -import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.ServingStore; -import feast.storage.WarehouseStore; - -@AutoService(WarehouseStore.class) -public class NoOpWarehouseStore implements WarehouseStore { - - @Override - public Write create(StorageSpec storageSpec, Specs specs) { - return new NoOpIO.Write(); - } - - @Override - public String getType() { - return "noop"; - } -} diff --git a/ingestion/src/main/java/feast/storage/FeatureStore.java b/ingestion/src/main/java/feast/store/FeatureStoreFactory.java similarity index 81% rename from ingestion/src/main/java/feast/storage/FeatureStore.java rename to ingestion/src/main/java/feast/store/FeatureStoreFactory.java index a969c1476a..998df98b92 100644 --- a/ingestion/src/main/java/feast/storage/FeatureStore.java +++ b/ingestion/src/main/java/feast/store/FeatureStoreFactory.java @@ -15,14 +15,13 @@ * */ -package feast.storage; +package feast.store; import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO; import feast.specs.StorageSpecProto.StorageSpec; -public interface FeatureStore { - FeatureIO.Write create(StorageSpec storageSpec, Specs specs); +public interface FeatureStoreFactory { + FeatureStoreWrite create(StorageSpec storageSpec, Specs specs); String getType(); } diff --git a/ingestion/src/main/java/feast/store/FeatureStoreWrite.java b/ingestion/src/main/java/feast/store/FeatureStoreWrite.java new file mode 100644 index 0000000000..759ff3bfa2 --- /dev/null +++ b/ingestion/src/main/java/feast/store/FeatureStoreWrite.java @@ -0,0 +1,25 @@ +/* + * Copyright 2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package feast.store; + +import feast.types.FeatureRowExtendedProto.FeatureRowExtended; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +public abstract class FeatureStoreWrite extends PTransform, PDone> {} diff --git a/ingestion/src/main/java/feast/storage/file/FileStoreOptions.java b/ingestion/src/main/java/feast/store/FileStoreOptions.java similarity index 97% rename from ingestion/src/main/java/feast/storage/file/FileStoreOptions.java rename to ingestion/src/main/java/feast/store/FileStoreOptions.java index 84a5caa137..5df669f0e9 100644 --- a/ingestion/src/main/java/feast/storage/file/FileStoreOptions.java +++ b/ingestion/src/main/java/feast/store/FileStoreOptions.java @@ -15,7 +15,7 @@ * */ -package feast.storage.file; +package feast.store; import com.fasterxml.jackson.annotation.JsonIgnore; import javax.validation.constraints.NotEmpty; diff --git a/ingestion/src/main/java/feast/storage/noop/NoOpIO.java b/ingestion/src/main/java/feast/store/NoOpIO.java similarity index 89% rename from ingestion/src/main/java/feast/storage/noop/NoOpIO.java rename to ingestion/src/main/java/feast/store/NoOpIO.java index 45e06e5801..65a68118c9 100644 --- a/ingestion/src/main/java/feast/storage/noop/NoOpIO.java +++ b/ingestion/src/main/java/feast/store/NoOpIO.java @@ -15,9 +15,8 @@ * */ -package feast.storage.noop; +package feast.store; -import feast.ingestion.transform.FeatureIO; import feast.ingestion.transform.fn.Identity; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import org.apache.beam.sdk.transforms.ParDo; @@ -26,7 +25,7 @@ public class NoOpIO { - public static class Write extends FeatureIO.Write { + public static class Write extends FeatureStoreWrite { @Override public PDone expand(PCollection input) { diff --git a/ingestion/src/main/java/feast/storage/file/TextFileFeatureIO.java b/ingestion/src/main/java/feast/store/TextFileDynamicIO.java similarity index 65% rename from ingestion/src/main/java/feast/storage/file/TextFileFeatureIO.java rename to ingestion/src/main/java/feast/store/TextFileDynamicIO.java index 740a269212..ee7ed10751 100644 --- a/ingestion/src/main/java/feast/storage/file/TextFileFeatureIO.java +++ b/ingestion/src/main/java/feast/store/TextFileDynamicIO.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,7 @@ * */ -package feast.storage.file; +package feast.store; import lombok.AllArgsConstructor; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -23,43 +23,46 @@ import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.WriteFilesResult; import org.apache.beam.sdk.transforms.Contextful; -import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PDone; import org.joda.time.Duration; -import feast.ingestion.transform.FeatureIO; -import feast.types.FeatureRowExtendedProto.FeatureRowExtended; -public class TextFileFeatureIO { - private TextFileFeatureIO() {} +public class TextFileDynamicIO { + + private TextFileDynamicIO() { + } @AllArgsConstructor - public static class Write extends FeatureIO.Write { + public static class Write extends PTransform>, PDone> { + private final FileStoreOptions options; - private final SerializableFunction toTextFunction; private final String suffix; - /** Writes to different file sinks based on a */ + /** + * Writes to different file sinks based on a + */ @Override - public PDone expand(PCollection input) { + public PDone expand(PCollection> input) { final String folderName = options.jobName != null ? options.jobName : "unknown-jobs"; - FileIO.Write write = - FileIO.writeDynamic() - .by((rowExtended) -> rowExtended.getRow().getEntityName()) + FileIO.Write> write = + FileIO.>writeDynamic() + .by(KV::getKey) .withDestinationCoder(StringUtf8Coder.of()) .withNaming( Contextful.fn( - (entityName) -> FileIO.Write.defaultNaming(folderName + "/" + entityName, suffix))) - .via(Contextful.fn(toTextFunction), Contextful.fn((entityName) -> TextIO.sink())) + (key) -> FileIO.Write.defaultNaming(folderName + "/" + key + "/part-", suffix))) + .via(Contextful.fn(KV::getValue), Contextful.fn((entityName) -> TextIO.sink())) .to(options.path); if (input.isBounded().equals(IsBounded.UNBOUNDED)) { - Window minuteWindow = - Window.into(FixedWindows.of(options.getWindowDuration())) + Window> minuteWindow = + Window.>into(FixedWindows.of(options.getWindowDuration())) .triggering(AfterWatermark.pastEndOfWindow()) .discardingFiredPanes() .withAllowedLateness(Duration.ZERO); diff --git a/ingestion/src/main/java/feast/storage/ServingStore.java b/ingestion/src/main/java/feast/store/errors/FeatureErrorsFactory.java similarity index 81% rename from ingestion/src/main/java/feast/storage/ServingStore.java rename to ingestion/src/main/java/feast/store/errors/FeatureErrorsFactory.java index aa08181381..6047e4c088 100644 --- a/ingestion/src/main/java/feast/storage/ServingStore.java +++ b/ingestion/src/main/java/feast/store/errors/FeatureErrorsFactory.java @@ -15,6 +15,8 @@ * */ -package feast.storage; +package feast.store.errors; -public interface ServingStore extends FeatureStore {} +import feast.store.FeatureStoreFactory; + +public interface FeatureErrorsFactory extends FeatureStoreFactory {} diff --git a/ingestion/src/main/java/feast/storage/service/ErrorsStoreService.java b/ingestion/src/main/java/feast/store/errors/FeatureErrorsFactoryService.java similarity index 59% rename from ingestion/src/main/java/feast/storage/service/ErrorsStoreService.java rename to ingestion/src/main/java/feast/store/errors/FeatureErrorsFactoryService.java index 0f2e8a30e0..4f5ad2c50a 100644 --- a/ingestion/src/main/java/feast/storage/service/ErrorsStoreService.java +++ b/ingestion/src/main/java/feast/store/errors/FeatureErrorsFactoryService.java @@ -15,37 +15,41 @@ * */ -package feast.storage.service; +package feast.store.errors; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import feast.storage.ErrorsStore; -import lombok.extern.slf4j.Slf4j; - import java.util.ArrayList; import java.util.List; import java.util.ServiceLoader; +import lombok.extern.slf4j.Slf4j; +/** + * Service class for fetching all the FeatureErrorsFactory instances available + */ @Slf4j -public class ErrorsStoreService { +public class FeatureErrorsFactoryService { - private static ServiceLoader serviceLoader = ServiceLoader.load(ErrorsStore.class); - private static List manuallyRegistered = new ArrayList<>(); + private static ServiceLoader serviceLoader = ServiceLoader + .load(FeatureErrorsFactory.class); + private static List manuallyRegistered = new ArrayList<>(); static { - for (ErrorsStore store : getAll()) { - log.info("ErrorsStore type found: " + store.getType()); + for (FeatureErrorsFactory store : getAll()) { + log.info("FeatureErrorsFactory type found: " + store.getType()); } } - public static List getAll() { + public static List getAll() { return Lists.newArrayList( Iterators.concat(manuallyRegistered.iterator(), serviceLoader.iterator())); } - /** Get store of the given subclass. */ - public static T get(Class clazz) { - for (ErrorsStore store : getAll()) { + /** + * Get store of the given subclass. + */ + public static T get(Class clazz) { + for (FeatureErrorsFactory store : getAll()) { if (clazz.isInstance(store)) { //noinspection unchecked return (T) store; @@ -54,7 +58,7 @@ public static T get(Class clazz) { return null; } - public static void register(ErrorsStore store) { + public static void register(FeatureErrorsFactory store) { manuallyRegistered.add(store); } } diff --git a/ingestion/src/main/java/feast/store/errors/json/JsonFileErrorsFactory.java b/ingestion/src/main/java/feast/store/errors/json/JsonFileErrorsFactory.java new file mode 100644 index 0000000000..3a56382b1f --- /dev/null +++ b/ingestion/src/main/java/feast/store/errors/json/JsonFileErrorsFactory.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package feast.store.errors.json; + +import com.google.auto.service.AutoService; +import feast.ingestion.model.Specs; +import feast.store.FeatureStoreWrite; +import feast.options.OptionsParser; +import feast.specs.StorageSpecProto.StorageSpec; +import feast.store.FileStoreOptions; +import feast.store.errors.FeatureErrorsFactory; +import lombok.AllArgsConstructor; + +@AutoService(FeatureErrorsFactory.class) +@AllArgsConstructor + +public class JsonFileErrorsFactory implements FeatureErrorsFactory { + + private static final String JSON_FILES_TYPE = "file.json"; + + @Override + public FeatureStoreWrite create(StorageSpec storageSpec, Specs specs) { + FileStoreOptions options = + OptionsParser.parse(storageSpec.getOptionsMap(), FileStoreOptions.class); + options.jobName = specs.getJobName(); + return new JsonFileErrorsWrite(options); + } + + @Override + public String getType() { + return JSON_FILES_TYPE; + } +} diff --git a/ingestion/src/main/java/feast/store/errors/json/JsonFileErrorsWrite.java b/ingestion/src/main/java/feast/store/errors/json/JsonFileErrorsWrite.java new file mode 100644 index 0000000000..ef95bf634b --- /dev/null +++ b/ingestion/src/main/java/feast/store/errors/json/JsonFileErrorsWrite.java @@ -0,0 +1,56 @@ +/* + * Copyright 2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package feast.store.errors.json; + +import static org.apache.beam.sdk.values.TypeDescriptors.kvs; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; +import feast.store.FeatureStoreWrite; +import feast.store.FileStoreOptions; +import feast.store.TextFileDynamicIO; +import feast.types.FeatureRowExtendedProto.FeatureRowExtended; +import lombok.AllArgsConstructor; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +@AllArgsConstructor +public class JsonFileErrorsWrite extends FeatureStoreWrite { + + private FileStoreOptions options; + + @Override + public PDone expand(PCollection input) { + return input.apply("Map to strings", MapElements.into(kvs(strings(), strings())).via( + (rowExtended) -> { + try { + return KV.of( + rowExtended.getRow().getEntityName(), + JsonFormat.printer().omittingInsignificantWhitespace() + .print(rowExtended) + ); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + )).apply("Write Error Json Files", new TextFileDynamicIO.Write(options, ".json")); + } +} diff --git a/ingestion/src/main/java/feast/storage/stderr/LogIO.java b/ingestion/src/main/java/feast/store/errors/logging/LogIO.java similarity index 83% rename from ingestion/src/main/java/feast/storage/stderr/LogIO.java rename to ingestion/src/main/java/feast/store/errors/logging/LogIO.java index abd6f7c9ff..45a6c6826a 100644 --- a/ingestion/src/main/java/feast/storage/stderr/LogIO.java +++ b/ingestion/src/main/java/feast/store/errors/logging/LogIO.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,9 +15,9 @@ * */ -package feast.storage.stderr; +package feast.store.errors.logging; -import feast.ingestion.transform.FeatureIO; +import feast.store.FeatureStoreWrite; import feast.ingestion.transform.fn.LoggerDoFn; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import lombok.AllArgsConstructor; @@ -29,7 +29,7 @@ public class LogIO { @AllArgsConstructor - public static class Write extends FeatureIO.Write { + public static class Write extends FeatureStoreWrite { private Level level; diff --git a/ingestion/src/main/java/feast/storage/stderr/StderrErrorsStore.java b/ingestion/src/main/java/feast/store/errors/logging/StderrFeatureErrorsFactory.java similarity index 68% rename from ingestion/src/main/java/feast/storage/stderr/StderrErrorsStore.java rename to ingestion/src/main/java/feast/store/errors/logging/StderrFeatureErrorsFactory.java index 623b7383aa..0e904928db 100644 --- a/ingestion/src/main/java/feast/storage/stderr/StderrErrorsStore.java +++ b/ingestion/src/main/java/feast/store/errors/logging/StderrFeatureErrorsFactory.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,22 +15,22 @@ * */ -package feast.storage.stderr; +package feast.store.errors.logging; import com.google.auto.service.AutoService; import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO.Write; +import feast.store.FeatureStoreWrite; import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.ErrorsStore; +import feast.store.errors.FeatureErrorsFactory; import org.slf4j.event.Level; -@AutoService(ErrorsStore.class) -public class StderrErrorsStore implements ErrorsStore { +@AutoService(FeatureErrorsFactory.class) +public class StderrFeatureErrorsFactory implements FeatureErrorsFactory { public static final String TYPE_STDERR = "stderr"; - + @Override - public Write create(StorageSpec storageSpec, Specs specs) { + public FeatureStoreWrite create(StorageSpec storageSpec, Specs specs) { return new LogIO.Write(Level.ERROR); } diff --git a/ingestion/src/main/java/feast/storage/stderr/StdoutErrorsStore.java b/ingestion/src/main/java/feast/store/errors/logging/StdoutFeatureErrorsFactory.java similarity index 68% rename from ingestion/src/main/java/feast/storage/stderr/StdoutErrorsStore.java rename to ingestion/src/main/java/feast/store/errors/logging/StdoutFeatureErrorsFactory.java index 7c5cc46437..afc3a35d1b 100644 --- a/ingestion/src/main/java/feast/storage/stderr/StdoutErrorsStore.java +++ b/ingestion/src/main/java/feast/store/errors/logging/StdoutFeatureErrorsFactory.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,22 +15,22 @@ * */ -package feast.storage.stderr; +package feast.store.errors.logging; import com.google.auto.service.AutoService; import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO.Write; +import feast.store.FeatureStoreWrite; import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.ErrorsStore; +import feast.store.errors.FeatureErrorsFactory; import org.slf4j.event.Level; -@AutoService(ErrorsStore.class) -public class StdoutErrorsStore implements ErrorsStore { +@AutoService(FeatureErrorsFactory.class) +public class StdoutFeatureErrorsFactory implements FeatureErrorsFactory { public static final String TYPE_STDOUT = "stdout"; @Override - public Write create(StorageSpec storageSpec, Specs specs) { + public FeatureStoreWrite create(StorageSpec storageSpec, Specs specs) { return new LogIO.Write(Level.INFO); } diff --git a/ingestion/src/main/java/feast/storage/ErrorsStore.java b/ingestion/src/main/java/feast/store/serving/FeatureServingFactory.java similarity index 81% rename from ingestion/src/main/java/feast/storage/ErrorsStore.java rename to ingestion/src/main/java/feast/store/serving/FeatureServingFactory.java index 4fde4ec00f..ca9491973e 100644 --- a/ingestion/src/main/java/feast/storage/ErrorsStore.java +++ b/ingestion/src/main/java/feast/store/serving/FeatureServingFactory.java @@ -15,6 +15,8 @@ * */ -package feast.storage; +package feast.store.serving; -public interface ErrorsStore extends FeatureStore {} +import feast.store.FeatureStoreFactory; + +public interface FeatureServingFactory extends FeatureStoreFactory {} diff --git a/ingestion/src/main/java/feast/storage/service/ServingStoreService.java b/ingestion/src/main/java/feast/store/serving/FeatureServingFactoryService.java similarity index 59% rename from ingestion/src/main/java/feast/storage/service/ServingStoreService.java rename to ingestion/src/main/java/feast/store/serving/FeatureServingFactoryService.java index a1ab609594..d720c0dca1 100644 --- a/ingestion/src/main/java/feast/storage/service/ServingStoreService.java +++ b/ingestion/src/main/java/feast/store/serving/FeatureServingFactoryService.java @@ -15,7 +15,7 @@ * */ -package feast.storage.service; +package feast.store.serving; import avro.shaded.com.google.common.collect.Lists; import com.google.common.collect.Iterators; @@ -23,27 +23,33 @@ import java.util.List; import java.util.ServiceLoader; import lombok.extern.slf4j.Slf4j; -import feast.storage.ServingStore; +/** + * Service class for fetching all the FeatureServingFactory instances available + */ @Slf4j -public class ServingStoreService { - private static ServiceLoader serviceLoader = ServiceLoader.load(ServingStore.class); - private static List manuallyRegistered = new ArrayList<>(); +public class FeatureServingFactoryService { + + private static ServiceLoader serviceLoader = ServiceLoader + .load(FeatureServingFactory.class); + private static List manuallyRegistered = new ArrayList<>(); static { - for (ServingStore store : getAll()) { - log.info("ServingStore type found: " + store.getType()); + for (FeatureServingFactory store : getAll()) { + log.info("FeatureServingFactory type found: " + store.getType()); } } - public static List getAll() { + public static List getAll() { return Lists.newArrayList( Iterators.concat(manuallyRegistered.iterator(), serviceLoader.iterator())); } - /** Get store of the given subclass. */ - public static T get(Class clazz) { - for (ServingStore store : getAll()) { + /** + * Get store of the given subclass. + */ + public static T get(Class clazz) { + for (FeatureServingFactory store : getAll()) { if (clazz.isInstance(store)) { //noinspection unchecked return (T) store; @@ -52,7 +58,7 @@ public static T get(Class clazz) { return null; } - public static void register(ServingStore store) { + public static void register(FeatureServingFactory store) { manuallyRegistered.add(store); } } diff --git a/ingestion/src/main/java/feast/storage/FeatureStoreMigration.java b/ingestion/src/main/java/feast/store/serving/FeatureServingStoreClient.java similarity index 54% rename from ingestion/src/main/java/feast/storage/FeatureStoreMigration.java rename to ingestion/src/main/java/feast/store/serving/FeatureServingStoreClient.java index a35e7db3fc..5aa7251b89 100644 --- a/ingestion/src/main/java/feast/storage/FeatureStoreMigration.java +++ b/ingestion/src/main/java/feast/store/serving/FeatureServingStoreClient.java @@ -15,19 +15,27 @@ * */ -package feast.storage; +package feast.store.serving; import feast.specs.FeatureSpecProto.FeatureSpec; +import feast.types.FeatureRowProto.FeatureRow; +import java.util.Collection; import java.util.List; -import lombok.Value; /** - * This class is for notifying storage plugins about new or changed FeatureSpecs so they can update - * appropriate schemas. + * Abstraction of Client for Feast Serving Store */ -@Value -public class FeatureStoreMigration { - List create; - List drop; - List update; -} +public interface FeatureServingStoreClient { + + /** + * Get a the latest FeatureRow containing features for a single entity id + */ + FeatureRow get(String entityName, String entityId, List featureSpecs); + + /** + * Get a the latest FeatureRow for containing features for each entity id + */ + List get( + String entityName, Collection entityIds, List featureSpecs); + +} \ No newline at end of file diff --git a/ingestion/src/main/java/feast/storage/bigtable/BigTableFeatureOptions.java b/ingestion/src/main/java/feast/store/serving/bigtable/BigTableFeatureOptions.java similarity index 95% rename from ingestion/src/main/java/feast/storage/bigtable/BigTableFeatureOptions.java rename to ingestion/src/main/java/feast/store/serving/bigtable/BigTableFeatureOptions.java index 6997ae4397..68bb7e6ec5 100644 --- a/ingestion/src/main/java/feast/storage/bigtable/BigTableFeatureOptions.java +++ b/ingestion/src/main/java/feast/store/serving/bigtable/BigTableFeatureOptions.java @@ -15,7 +15,7 @@ * */ -package feast.storage.bigtable; +package feast.store.serving.bigtable; import feast.options.Options; diff --git a/ingestion/src/main/java/feast/storage/bigtable/BigTableServingStoreFactory.java b/ingestion/src/main/java/feast/store/serving/bigtable/BigTableServingStoreFactory.java similarity index 79% rename from ingestion/src/main/java/feast/storage/bigtable/BigTableServingStoreFactory.java rename to ingestion/src/main/java/feast/store/serving/bigtable/BigTableServingStoreFactory.java index e6aa7f3232..9583a5d431 100644 --- a/ingestion/src/main/java/feast/storage/bigtable/BigTableServingStoreFactory.java +++ b/ingestion/src/main/java/feast/store/serving/bigtable/BigTableServingStoreFactory.java @@ -15,22 +15,23 @@ * */ -package feast.storage.bigtable; +package feast.store.serving.bigtable; import com.google.auto.service.AutoService; import com.google.common.base.Preconditions; import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO.Write; +import feast.store.FeatureStoreWrite; import feast.options.OptionsParser; import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.ServingStore; +import feast.store.serving.FeatureServingFactory; + +@AutoService(FeatureServingFactory.class) +public class BigTableServingStoreFactory implements FeatureServingFactory { -@AutoService(ServingStore.class) -public class BigTableServingStoreFactory implements ServingStore { public static String TYPE_BIGTABLE = "bigtable"; @Override - public Write create(StorageSpec storageSpec, Specs specs) { + public FeatureStoreWrite create(StorageSpec storageSpec, Specs specs) { Preconditions.checkArgument(storageSpec.getType().equals(getType())); BigTableStoreOptions options = diff --git a/ingestion/src/main/java/feast/storage/bigtable/BigTableStoreOptions.java b/ingestion/src/main/java/feast/store/serving/bigtable/BigTableStoreOptions.java similarity index 95% rename from ingestion/src/main/java/feast/storage/bigtable/BigTableStoreOptions.java rename to ingestion/src/main/java/feast/store/serving/bigtable/BigTableStoreOptions.java index fcfcace69b..9e1bbfb8b4 100644 --- a/ingestion/src/main/java/feast/storage/bigtable/BigTableStoreOptions.java +++ b/ingestion/src/main/java/feast/store/serving/bigtable/BigTableStoreOptions.java @@ -15,7 +15,7 @@ * */ -package feast.storage.bigtable; +package feast.store.serving.bigtable; import java.io.Serializable; import javax.validation.constraints.NotEmpty; diff --git a/ingestion/src/main/java/feast/storage/bigtable/FeatureRowBigTableIO.java b/ingestion/src/main/java/feast/store/serving/bigtable/FeatureRowBigTableIO.java similarity index 95% rename from ingestion/src/main/java/feast/storage/bigtable/FeatureRowBigTableIO.java rename to ingestion/src/main/java/feast/store/serving/bigtable/FeatureRowBigTableIO.java index 7437f59c55..4dc289aeb2 100644 --- a/ingestion/src/main/java/feast/storage/bigtable/FeatureRowBigTableIO.java +++ b/ingestion/src/main/java/feast/store/serving/bigtable/FeatureRowBigTableIO.java @@ -15,12 +15,12 @@ * */ -package feast.storage.bigtable; +package feast.store.serving.bigtable; import com.google.cloud.bigtable.beam.CloudBigtableConfiguration; import com.google.cloud.bigtable.beam.CloudBigtableIO; import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO; +import feast.store.FeatureStoreWrite; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import java.util.Collections; import lombok.extern.slf4j.Slf4j; @@ -34,7 +34,7 @@ @Slf4j public class FeatureRowBigTableIO { - public static class Write extends FeatureIO.Write { + public static class Write extends FeatureStoreWrite { private BigTableStoreOptions bigTableOptions; private Specs specs; diff --git a/ingestion/src/main/java/feast/storage/bigtable/FeatureRowToBigTableMutationDoFn.java b/ingestion/src/main/java/feast/store/serving/bigtable/FeatureRowToBigTableMutationDoFn.java similarity index 99% rename from ingestion/src/main/java/feast/storage/bigtable/FeatureRowToBigTableMutationDoFn.java rename to ingestion/src/main/java/feast/store/serving/bigtable/FeatureRowToBigTableMutationDoFn.java index bfd44ceda8..1d14b1201e 100644 --- a/ingestion/src/main/java/feast/storage/bigtable/FeatureRowToBigTableMutationDoFn.java +++ b/ingestion/src/main/java/feast/store/serving/bigtable/FeatureRowToBigTableMutationDoFn.java @@ -15,7 +15,7 @@ * */ -package feast.storage.bigtable; +package feast.store.serving.bigtable; import com.google.common.base.Charsets; import feast.SerializableCache; diff --git a/ingestion/src/main/java/feast/storage/redis/FeatureRowRedisIO.java b/ingestion/src/main/java/feast/store/serving/redis/FeatureRowRedisIO.java similarity index 88% rename from ingestion/src/main/java/feast/storage/redis/FeatureRowRedisIO.java rename to ingestion/src/main/java/feast/store/serving/redis/FeatureRowRedisIO.java index d58abdc0f5..9c5772140b 100644 --- a/ingestion/src/main/java/feast/storage/redis/FeatureRowRedisIO.java +++ b/ingestion/src/main/java/feast/store/serving/redis/FeatureRowRedisIO.java @@ -15,11 +15,11 @@ * */ -package feast.storage.redis; +package feast.store.serving.redis; import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO; -import feast.storage.redis.RedisCustomIO.RedisMutation; +import feast.store.FeatureStoreWrite; +import feast.store.serving.redis.RedisCustomIO.RedisMutation; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; @@ -27,7 +27,8 @@ public class FeatureRowRedisIO { - public static class Write extends FeatureIO.Write { + public static class Write extends FeatureStoreWrite { + private final RedisStoreOptions options; private final Specs specs; diff --git a/ingestion/src/main/java/feast/storage/redis/FeatureRowToRedisMutationDoFn.java b/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java similarity index 96% rename from ingestion/src/main/java/feast/storage/redis/FeatureRowToRedisMutationDoFn.java rename to ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java index dad225d7ba..dbe57bff9f 100644 --- a/ingestion/src/main/java/feast/storage/redis/FeatureRowToRedisMutationDoFn.java +++ b/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java @@ -15,7 +15,7 @@ * */ -package feast.storage.redis; +package feast.store.serving.redis; import static com.google.common.base.Preconditions.checkArgument; @@ -26,8 +26,8 @@ import feast.specs.FeatureSpecProto.FeatureSpec; import feast.storage.RedisProto.RedisBucketKey; import feast.storage.RedisProto.RedisBucketValue; -import feast.storage.redis.RedisCustomIO.Method; -import feast.storage.redis.RedisCustomIO.RedisMutation; +import feast.store.serving.redis.RedisCustomIO.Method; +import feast.store.serving.redis.RedisCustomIO.RedisMutation; import feast.types.FeatureProto.Feature; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import feast.types.FeatureRowProto.FeatureRow; diff --git a/ingestion/src/main/java/feast/storage/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java similarity index 99% rename from ingestion/src/main/java/feast/storage/redis/RedisCustomIO.java rename to ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java index d9b8261093..4dc53467c7 100644 --- a/ingestion/src/main/java/feast/storage/redis/RedisCustomIO.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java @@ -16,7 +16,7 @@ */ // package io.suryawirawan.henry.beam.redis.io; -package feast.storage.redis; +package feast.store.serving.redis; import lombok.Builder; import lombok.Data; diff --git a/ingestion/src/main/java/feast/storage/redis/RedisFeatureOptions.java b/ingestion/src/main/java/feast/store/serving/redis/RedisFeatureOptions.java similarity index 97% rename from ingestion/src/main/java/feast/storage/redis/RedisFeatureOptions.java rename to ingestion/src/main/java/feast/store/serving/redis/RedisFeatureOptions.java index e4654e2a43..cf868cc47b 100644 --- a/ingestion/src/main/java/feast/storage/redis/RedisFeatureOptions.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisFeatureOptions.java @@ -15,7 +15,7 @@ * */ -package feast.storage.redis; +package feast.store.serving.redis; import com.fasterxml.jackson.annotation.JsonIgnore; import org.joda.time.Duration; diff --git a/ingestion/src/main/java/feast/storage/redis/RedisServingStore.java b/ingestion/src/main/java/feast/store/serving/redis/RedisServingFactory.java similarity index 80% rename from ingestion/src/main/java/feast/storage/redis/RedisServingStore.java rename to ingestion/src/main/java/feast/store/serving/redis/RedisServingFactory.java index 6b5fc23ce5..ca023c616b 100644 --- a/ingestion/src/main/java/feast/storage/redis/RedisServingStore.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisServingFactory.java @@ -15,22 +15,23 @@ * */ -package feast.storage.redis; +package feast.store.serving.redis; import com.google.auto.service.AutoService; import com.google.common.base.Preconditions; import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO.Write; +import feast.store.FeatureStoreWrite; import feast.options.OptionsParser; import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.ServingStore; +import feast.store.serving.FeatureServingFactory; + +@AutoService(FeatureServingFactory.class) +public class RedisServingFactory implements FeatureServingFactory { -@AutoService(ServingStore.class) -public class RedisServingStore implements ServingStore { public static final String TYPE_REDIS = "redis"; @Override - public Write create(StorageSpec storageSpec, Specs specs) { + public FeatureStoreWrite create(StorageSpec storageSpec, Specs specs) { Preconditions.checkArgument( storageSpec.getType().equals(TYPE_REDIS), "Storage spec type was not " + TYPE_REDIS); RedisStoreOptions options = diff --git a/ingestion/src/main/java/feast/storage/redis/RedisStoreOptions.java b/ingestion/src/main/java/feast/store/serving/redis/RedisStoreOptions.java similarity index 97% rename from ingestion/src/main/java/feast/storage/redis/RedisStoreOptions.java rename to ingestion/src/main/java/feast/store/serving/redis/RedisStoreOptions.java index eecffac5b2..9b257d40f0 100644 --- a/ingestion/src/main/java/feast/storage/redis/RedisStoreOptions.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisStoreOptions.java @@ -15,7 +15,7 @@ * */ -package feast.storage.redis; +package feast.store.serving.redis; import feast.options.Options; import javax.validation.constraints.NotEmpty; diff --git a/ingestion/src/main/java/feast/storage/WarehouseStore.java b/ingestion/src/main/java/feast/store/warehouse/FeatureWarehouseFactory.java similarity index 80% rename from ingestion/src/main/java/feast/storage/WarehouseStore.java rename to ingestion/src/main/java/feast/store/warehouse/FeatureWarehouseFactory.java index b4bda0a33d..72b5617962 100644 --- a/ingestion/src/main/java/feast/storage/WarehouseStore.java +++ b/ingestion/src/main/java/feast/store/warehouse/FeatureWarehouseFactory.java @@ -15,6 +15,10 @@ * */ -package feast.storage; +package feast.store.warehouse; -public interface WarehouseStore extends FeatureStore {} +import feast.store.FeatureStoreFactory; + +public interface FeatureWarehouseFactory extends FeatureStoreFactory { + +} diff --git a/ingestion/src/main/java/feast/storage/service/WarehouseStoreService.java b/ingestion/src/main/java/feast/store/warehouse/FeatureWarehouseFactoryService.java similarity index 60% rename from ingestion/src/main/java/feast/storage/service/WarehouseStoreService.java rename to ingestion/src/main/java/feast/store/warehouse/FeatureWarehouseFactoryService.java index 6236be0ab0..06ae723613 100644 --- a/ingestion/src/main/java/feast/storage/service/WarehouseStoreService.java +++ b/ingestion/src/main/java/feast/store/warehouse/FeatureWarehouseFactoryService.java @@ -15,7 +15,7 @@ * */ -package feast.storage.service; +package feast.store.warehouse; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -23,28 +23,30 @@ import java.util.List; import java.util.ServiceLoader; import lombok.extern.slf4j.Slf4j; -import feast.storage.WarehouseStore; +/** + * Service class for fetching all the FeatureWarehouseFactory instances available + */ @Slf4j -public class WarehouseStoreService { - private static ServiceLoader serviceLoader = - ServiceLoader.load(WarehouseStore.class); - private static List manuallyRegistered = new ArrayList<>(); +public class FeatureWarehouseFactoryService { + private static ServiceLoader serviceLoader = + ServiceLoader.load(FeatureWarehouseFactory.class); + private static List manuallyRegistered = new ArrayList<>(); static { - for (WarehouseStore store : getAll()) { - log.info("WarehouseStore type found: " + store.getType()); + for (FeatureWarehouseFactory store : getAll()) { + log.info("FeatureWarehouseFactory type found: " + store.getType()); } } - public static List getAll() { + public static List getAll() { return Lists.newArrayList( Iterators.concat(manuallyRegistered.iterator(), serviceLoader.iterator())); } /** Get store of the given subclass. */ - public static T get(Class clazz) { - for (WarehouseStore store : getAll()) { + public static T get(Class clazz) { + for (FeatureWarehouseFactory store : getAll()) { if (clazz.isInstance(store)) { //noinspection unchecked return (T) store; @@ -53,7 +55,7 @@ public static T get(Class clazz) { return null; } - public static void register(WarehouseStore store) { + public static void register(FeatureWarehouseFactory store) { manuallyRegistered.add(store); } } diff --git a/ingestion/src/main/java/feast/storage/bigquery/BigQueryStoreOptions.java b/ingestion/src/main/java/feast/store/warehouse/bigquery/BigQueryStoreOptions.java similarity index 93% rename from ingestion/src/main/java/feast/storage/bigquery/BigQueryStoreOptions.java rename to ingestion/src/main/java/feast/store/warehouse/bigquery/BigQueryStoreOptions.java index 95c70be24b..f82efb58e8 100644 --- a/ingestion/src/main/java/feast/storage/bigquery/BigQueryStoreOptions.java +++ b/ingestion/src/main/java/feast/store/warehouse/bigquery/BigQueryStoreOptions.java @@ -15,9 +15,8 @@ * */ -package feast.storage.bigquery; +package feast.store.warehouse.bigquery; -import java.io.Serializable; import javax.validation.constraints.NotEmpty; import feast.options.Options; diff --git a/ingestion/src/main/java/feast/storage/bigquery/BigQueryWarehouseStore.java b/ingestion/src/main/java/feast/store/warehouse/bigquery/BigQueryWarehouseFactory.java similarity index 79% rename from ingestion/src/main/java/feast/storage/bigquery/BigQueryWarehouseStore.java rename to ingestion/src/main/java/feast/store/warehouse/bigquery/BigQueryWarehouseFactory.java index a7682908bd..33ad75a32b 100644 --- a/ingestion/src/main/java/feast/storage/bigquery/BigQueryWarehouseStore.java +++ b/ingestion/src/main/java/feast/store/warehouse/bigquery/BigQueryWarehouseFactory.java @@ -15,22 +15,22 @@ * */ -package feast.storage.bigquery; +package feast.store.warehouse.bigquery; import com.google.auto.service.AutoService; import com.google.common.base.Preconditions; import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO.Write; +import feast.store.FeatureStoreWrite; import feast.options.OptionsParser; import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.WarehouseStore; +import feast.store.warehouse.FeatureWarehouseFactory; -@AutoService(WarehouseStore.class) -public class BigQueryWarehouseStore implements WarehouseStore { +@AutoService(FeatureWarehouseFactory.class) +public class BigQueryWarehouseFactory implements FeatureWarehouseFactory { public static String TYPE_BIGQUERY = "bigquery"; @Override - public Write create(StorageSpec storageSpec, Specs specs) { + public FeatureStoreWrite create(StorageSpec storageSpec, Specs specs) { Preconditions.checkArgument( storageSpec.getType().equals(TYPE_BIGQUERY), "Storage spec type was not " + TYPE_BIGQUERY); diff --git a/ingestion/src/main/java/feast/storage/bigquery/FeatureRowBigQueryIO.java b/ingestion/src/main/java/feast/store/warehouse/bigquery/FeatureRowBigQueryIO.java similarity index 97% rename from ingestion/src/main/java/feast/storage/bigquery/FeatureRowBigQueryIO.java rename to ingestion/src/main/java/feast/store/warehouse/bigquery/FeatureRowBigQueryIO.java index b5e039cc9e..c8ec8a7825 100644 --- a/ingestion/src/main/java/feast/storage/bigquery/FeatureRowBigQueryIO.java +++ b/ingestion/src/main/java/feast/store/warehouse/bigquery/FeatureRowBigQueryIO.java @@ -15,13 +15,13 @@ * */ -package feast.storage.bigquery; +package feast.store.warehouse.bigquery; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.base.Strings; import com.google.inject.Inject; import feast.ingestion.model.Specs; -import feast.ingestion.transform.FeatureIO; +import feast.store.FeatureStoreWrite; import feast.ingestion.transform.SplitFeatures.SingleOutputSplit; import feast.specs.EntitySpecProto.EntitySpec; import feast.specs.FeatureSpecProto.FeatureSpec; @@ -45,7 +45,7 @@ @Slf4j public class FeatureRowBigQueryIO { - public static class Write extends FeatureIO.Write { + public static class Write extends FeatureStoreWrite { private final BigQueryStoreOptions bigQueryOptions; private final Specs specs; diff --git a/ingestion/src/main/java/feast/storage/bigquery/FeatureRowToBigQueryTableRowDoFn.java b/ingestion/src/main/java/feast/store/warehouse/bigquery/FeatureRowToBigQueryTableRowDoFn.java similarity index 96% rename from ingestion/src/main/java/feast/storage/bigquery/FeatureRowToBigQueryTableRowDoFn.java rename to ingestion/src/main/java/feast/store/warehouse/bigquery/FeatureRowToBigQueryTableRowDoFn.java index a05569f61d..c666d4d71a 100644 --- a/ingestion/src/main/java/feast/storage/bigquery/FeatureRowToBigQueryTableRowDoFn.java +++ b/ingestion/src/main/java/feast/store/warehouse/bigquery/FeatureRowToBigQueryTableRowDoFn.java @@ -15,13 +15,11 @@ * */ -package feast.storage.bigquery; +package feast.store.warehouse.bigquery; import com.google.api.services.bigquery.model.TableRow; import feast.ingestion.util.DateUtil; import feast.ingestion.model.Specs; -import feast.ingestion.model.Specs; -import feast.ingestion.util.DateUtil; import feast.specs.FeatureSpecProto.FeatureSpec; import feast.types.FeatureProto.Feature; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; diff --git a/ingestion/src/main/java/feast/storage/bigquery/ValueBigQueryBuilder.java b/ingestion/src/main/java/feast/store/warehouse/bigquery/ValueBigQueryBuilder.java similarity index 99% rename from ingestion/src/main/java/feast/storage/bigquery/ValueBigQueryBuilder.java rename to ingestion/src/main/java/feast/store/warehouse/bigquery/ValueBigQueryBuilder.java index ab292e3e0d..34700dc96b 100644 --- a/ingestion/src/main/java/feast/storage/bigquery/ValueBigQueryBuilder.java +++ b/ingestion/src/main/java/feast/store/warehouse/bigquery/ValueBigQueryBuilder.java @@ -15,7 +15,7 @@ * */ -package feast.storage.bigquery; +package feast.store.warehouse.bigquery; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.protobuf.ByteString; diff --git a/ingestion/src/main/java/feast/store/warehouse/json/JsonFileWarehouseFactory.java b/ingestion/src/main/java/feast/store/warehouse/json/JsonFileWarehouseFactory.java new file mode 100644 index 0000000000..5572772fcc --- /dev/null +++ b/ingestion/src/main/java/feast/store/warehouse/json/JsonFileWarehouseFactory.java @@ -0,0 +1,47 @@ +/* + * Copyright 2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package feast.store.warehouse.json; + +import com.google.auto.service.AutoService; +import feast.ingestion.model.Specs; +import feast.store.FeatureStoreWrite; +import feast.options.OptionsParser; +import feast.specs.StorageSpecProto.StorageSpec; +import feast.store.FileStoreOptions; +import feast.store.warehouse.FeatureWarehouseFactory; +import lombok.AllArgsConstructor; + +@AutoService(FeatureWarehouseFactory.class) +@AllArgsConstructor +public class JsonFileWarehouseFactory implements FeatureWarehouseFactory { + + private static final String JSON_FILES_TYPE = "file.json"; + + @Override + public FeatureStoreWrite create(StorageSpec storageSpec, Specs specs) { + FileStoreOptions options = + OptionsParser.parse(storageSpec.getOptionsMap(), FileStoreOptions.class); + options.jobName = specs.getJobName(); + return new JsonFileWarehouseWrite(options); + } + + @Override + public String getType() { + return JSON_FILES_TYPE; + } +} diff --git a/ingestion/src/main/java/feast/store/warehouse/json/JsonFileWarehouseWrite.java b/ingestion/src/main/java/feast/store/warehouse/json/JsonFileWarehouseWrite.java new file mode 100644 index 0000000000..fd3050a049 --- /dev/null +++ b/ingestion/src/main/java/feast/store/warehouse/json/JsonFileWarehouseWrite.java @@ -0,0 +1,56 @@ +/* + * Copyright 2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package feast.store.warehouse.json; + +import static org.apache.beam.sdk.values.TypeDescriptors.kvs; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; +import feast.store.FeatureStoreWrite; +import feast.store.FileStoreOptions; +import feast.store.TextFileDynamicIO; +import feast.types.FeatureRowExtendedProto.FeatureRowExtended; +import lombok.AllArgsConstructor; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +@AllArgsConstructor +public class JsonFileWarehouseWrite extends FeatureStoreWrite { + + private FileStoreOptions options; + + @Override + public PDone expand(PCollection input) { + return input.apply("Map to strings", MapElements.into(kvs(strings(), strings())).via( + (rowExtended) -> { + try { + return KV.of( + rowExtended.getRow().getEntityName(), + JsonFormat.printer().omittingInsignificantWhitespace() + .print(rowExtended.getRow()) + ); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + )).apply("Write Warehouse Json Files", new TextFileDynamicIO.Write(options, ".json")); + } +} diff --git a/ingestion/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/FeastPubsubHelper.java b/ingestion/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/FeastPubsubHelper.java deleted file mode 100644 index 9f2853bc47..0000000000 --- a/ingestion/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/FeastPubsubHelper.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.beam.sdk.io.gcp.pubsub; - -import com.google.common.collect.Lists; -import com.google.protobuf.Message; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; -import org.apache.beam.sdk.options.PipelineOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This PubSub helper exists because including the canonical client libraries can result in - * dependency problems. We place it in the beam package because we need protected methods. - */ -public class FeastPubsubHelper { - - private final PipelineOptions options; - private transient PubsubClient client; - - public FeastPubsubHelper(PipelineOptions options) { - this.options = options; - } - - private PubsubClient getClient() throws IOException { - if (client == null) { - client = PubsubJsonClient.FACTORY.newClient(null, null, options.as(PubsubOptions.class)); - } - return client; - } - - public int publish(TopicPath topicPath, Message message) throws IOException { - List messages = - Lists.newArrayList( - new OutgoingMessage( - message.toByteArray(), new HashMap<>(), System.currentTimeMillis(), null)); - return getClient().publish(topicPath, messages); - } -} diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java index bd7460e637..6379e8dfad 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java @@ -19,7 +19,7 @@ import static feast.FeastMatchers.hasCount; import static feast.NormalizeFeatureRows.normalize; -import static feast.storage.MockErrorsStore.MOCK_ERRORS_STORE_TYPE; +import static feast.store.MockFeatureErrorsFactory.MOCK_ERRORS_STORE_TYPE; import static org.junit.Assert.assertEquals; import com.google.common.base.Charsets; @@ -38,12 +38,12 @@ import feast.ingestion.service.SpecRetrievalException; import feast.ingestion.util.ProtoUtil; import feast.specs.ImportSpecProto.ImportSpec; -import feast.storage.MockErrorsStore; -import feast.storage.MockServingStore; -import feast.storage.MockWarehouseStore; -import feast.storage.service.ErrorsStoreService; -import feast.storage.service.ServingStoreService; -import feast.storage.service.WarehouseStoreService; +import feast.store.MockFeatureErrorsFactory; +import feast.store.MockServingFactory; +import feast.store.MockWarehouseFactory; +import feast.store.errors.FeatureErrorsFactoryService; +import feast.store.serving.FeatureServingFactoryService; +import feast.store.warehouse.FeatureWarehouseFactoryService; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import feast.types.FeatureRowProto.FeatureRow; import feast.types.GranularityProto.Granularity; @@ -121,16 +121,21 @@ public void testImportCSV() throws IOException { job.expand(); PCollection writtenToServing = - PCollectionList.of(ServingStoreService.get(MockServingStore.class).getWrite().getInputs()) + PCollectionList + .of(FeatureServingFactoryService.get(MockServingFactory.class).getWrite() + .getInputs()) .apply("flatten serving input", Flatten.pCollections()); PCollection writtenToWarehouse = PCollectionList.of( - WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs()) + FeatureWarehouseFactoryService.get(MockWarehouseFactory.class).getWrite() + .getInputs()) .apply("flatten warehouse input", Flatten.pCollections()); PCollection writtenToErrors = - PCollectionList.of(ErrorsStoreService.get(MockErrorsStore.class).getWrite().getInputs()) + PCollectionList + .of(FeatureErrorsFactoryService.get(MockFeatureErrorsFactory.class).getWrite() + .getInputs()) .apply("flatten errors input", Flatten.pCollections()); List expectedRows = @@ -212,16 +217,21 @@ public void testImportCSV_withSample1() throws IOException { job.expand(); PCollection writtenToServing = - PCollectionList.of(ServingStoreService.get(MockServingStore.class).getWrite().getInputs()) + PCollectionList + .of(FeatureServingFactoryService.get(MockServingFactory.class).getWrite() + .getInputs()) .apply("flatten serving input", Flatten.pCollections()); PCollection writtenToWarehouse = PCollectionList.of( - WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs()) + FeatureWarehouseFactoryService.get(MockWarehouseFactory.class).getWrite() + .getInputs()) .apply("flatten warehouse input", Flatten.pCollections()); PCollection writtenToErrors = - PCollectionList.of(ErrorsStoreService.get(MockErrorsStore.class).getWrite().getInputs()) + PCollectionList + .of(FeatureErrorsFactoryService.get(MockFeatureErrorsFactory.class).getWrite() + .getInputs()) .apply("flatten errors input", Flatten.pCollections()); PAssert.that(writtenToServing).satisfies(hasCount(1)); @@ -271,16 +281,21 @@ public void testImportCSV_withCoalesceRows() throws IOException { job.expand(); PCollection writtenToServing = - PCollectionList.of(ServingStoreService.get(MockServingStore.class).getWrite().getInputs()) + PCollectionList + .of(FeatureServingFactoryService.get(MockServingFactory.class).getWrite() + .getInputs()) .apply("flatten serving input", Flatten.pCollections()); PCollection writtenToWarehouse = PCollectionList.of( - WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs()) + FeatureWarehouseFactoryService.get(MockWarehouseFactory.class).getWrite() + .getInputs()) .apply("flatten warehouse input", Flatten.pCollections()); PCollection writtenToErrors = - PCollectionList.of(ErrorsStoreService.get(MockErrorsStore.class).getWrite().getInputs()) + PCollectionList + .of(FeatureErrorsFactoryService.get(MockFeatureErrorsFactory.class).getWrite() + .getInputs()) .apply("flatten errors input", Flatten.pCollections()); PAssert.that(writtenToErrors).satisfies(hasCount(0)); @@ -394,11 +409,15 @@ public void testImportWithErrors() throws IOException { job.expand(); PCollection writtenToServing = - PCollectionList.of(ServingStoreService.get(MockServingStore.class).getWrite().getInputs()) + PCollectionList + .of(FeatureServingFactoryService.get(MockServingFactory.class).getWrite() + .getInputs()) .apply("flatten serving input", Flatten.pCollections()); PCollection writtenToErrors = - PCollectionList.of(ErrorsStoreService.get(MockErrorsStore.class).getWrite().getInputs()) + PCollectionList + .of(FeatureErrorsFactoryService.get(MockFeatureErrorsFactory.class).getWrite() + .getInputs()) .apply("flatten errors input", Flatten.pCollections()); PAssert.that(writtenToErrors) @@ -459,11 +478,15 @@ public void testImportWithoutWarehouseStore() throws IOException { job.expand(); PCollection writtenToServing = - PCollectionList.of(ServingStoreService.get(MockServingStore.class).getWrite().getInputs()) + PCollectionList + .of(FeatureServingFactoryService.get(MockServingFactory.class).getWrite() + .getInputs()) .apply("flatten serving input", Flatten.pCollections()); PCollection writtenToErrors = - PCollectionList.of(ErrorsStoreService.get(MockErrorsStore.class).getWrite().getInputs()) + PCollectionList + .of(FeatureErrorsFactoryService.get(MockFeatureErrorsFactory.class).getWrite() + .getInputs()) .apply("flatten errors input", Flatten.pCollections()); PAssert.that(writtenToErrors) diff --git a/ingestion/src/test/java/feast/ingestion/transform/ErrorsStoreTransformTest.java b/ingestion/src/test/java/feast/ingestion/transform/ErrorsStoreTransformTest.java index 83c0ef3e02..bddb79515b 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/ErrorsStoreTransformTest.java +++ b/ingestion/src/test/java/feast/ingestion/transform/ErrorsStoreTransformTest.java @@ -18,14 +18,14 @@ package feast.ingestion.transform; import static feast.ingestion.model.Errors.toError; -import static feast.storage.MockErrorsStore.MOCK_ERRORS_STORE_TYPE; +import static feast.store.MockFeatureErrorsFactory.MOCK_ERRORS_STORE_TYPE; import feast.ingestion.model.Specs; import feast.ingestion.options.ImportJobPipelineOptions; -import feast.storage.MockErrorsStore; -import feast.storage.service.ErrorsStoreService; -import feast.storage.stderr.StderrErrorsStore; -import feast.storage.stderr.StdoutErrorsStore; +import feast.store.MockFeatureErrorsFactory; +import feast.store.errors.FeatureErrorsFactoryService; +import feast.store.errors.logging.StderrFeatureErrorsFactory; +import feast.store.errors.logging.StdoutFeatureErrorsFactory; import feast.types.FeatureRowExtendedProto.Attempt; import feast.types.FeatureRowExtendedProto.Error; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; @@ -75,7 +75,7 @@ private FeatureRowExtended errorOf(String transform, Throwable cause) { @Test public void shouldWriteToGivenErrorsStore() { - MockErrorsStore mockStore = new MockErrorsStore(); + MockFeatureErrorsFactory mockStore = new MockFeatureErrorsFactory(); options.setErrorsStoreType(MOCK_ERRORS_STORE_TYPE); ErrorsStoreTransform transform = new ErrorsStoreTransform(options, specs, Lists.newArrayList(mockStore)); @@ -91,9 +91,9 @@ public void shouldWriteToGivenErrorsStore() { @Test public void logErrorsToStdErr() { - options.setErrorsStoreType(StderrErrorsStore.TYPE_STDERR); + options.setErrorsStoreType(StderrFeatureErrorsFactory.TYPE_STDERR); ErrorsStoreTransform transform = - new ErrorsStoreTransform(options, specs, ErrorsStoreService.getAll()); + new ErrorsStoreTransform(options, specs, FeatureErrorsFactoryService.getAll()); inputs.apply(transform); pipeline.run(); } @@ -101,9 +101,9 @@ public void logErrorsToStdErr() { @Test public void logErrorsToStdOut() { - options.setErrorsStoreType(StdoutErrorsStore.TYPE_STDOUT); + options.setErrorsStoreType(StdoutFeatureErrorsFactory.TYPE_STDOUT); ErrorsStoreTransform transform = - new ErrorsStoreTransform(options, specs, ErrorsStoreService.getAll()); + new ErrorsStoreTransform(options, specs, FeatureErrorsFactoryService.getAll()); inputs.apply(transform); pipeline.run(); } @@ -112,7 +112,7 @@ public void logErrorsToStdOut() { public void logToNull() { //options.setErrorsStoreType(...); // no errors store type set ErrorsStoreTransform transform = - new ErrorsStoreTransform(options, specs, ErrorsStoreService.getAll()); + new ErrorsStoreTransform(options, specs, FeatureErrorsFactoryService.getAll()); inputs.apply(transform); pipeline.run(); } diff --git a/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java b/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java index 7758a77489..183f117156 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java +++ b/ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java @@ -26,7 +26,6 @@ import feast.ingestion.model.Specs; import feast.ingestion.model.Values; import feast.ingestion.service.MockSpecService; -import feast.ingestion.transform.FeatureIO.Write; import feast.ingestion.transform.SplitOutputByStore.WriteTags; import feast.ingestion.values.PFeatureRows; import feast.specs.EntitySpecProto.EntitySpec; @@ -37,9 +36,10 @@ import feast.specs.ImportSpecProto.ImportSpec; import feast.specs.ImportSpecProto.Schema; import feast.specs.StorageSpecProto.StorageSpec; -import feast.storage.FeatureStore; -import feast.storage.MockFeatureStore; -import feast.storage.MockTransforms; +import feast.store.FeatureStoreWrite; +import feast.store.FeatureStoreFactory; +import feast.store.MockFeatureStore; +import feast.store.MockTransforms; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import feast.types.FeatureRowProto.FeatureRow; import java.util.Collections; @@ -78,7 +78,7 @@ public void testSplit() { DataStores.newBuilder().setServing(DataStore.newBuilder().setId("store2"))).build()) .addStorage(StorageSpec.newBuilder().setId("store1").setType("type1").build()) .addStorage(StorageSpec.newBuilder().setId("store2").setType("type2").build()); - List stores = + List stores = Lists.newArrayList(new MockFeatureStore("type1"), new MockFeatureStore("type2")); Specs specs = Specs.of( @@ -186,7 +186,7 @@ public void testSplitWhereFeature2HasNoStoreId() { "store1", StorageSpec.newBuilder().setId("store1").setType("type1").build()); specService.storageSpecs.put( "store2", StorageSpec.newBuilder().setId("store2").setType("type2").build()); - List stores = + List stores = Lists.newArrayList(new MockFeatureStore("type1"), new MockFeatureStore("type2")); Specs specs = Specs.of( @@ -284,7 +284,7 @@ public void testSplitWhereNoStorageSpec() { .build(), specService); assertNull(specs.getError()); - List stores = Collections.emptyList(); + List stores = Collections.emptyList(); SplitOutputByStore split = new SplitOutputByStore(stores, selector, specs); PCollection input = @@ -346,7 +346,7 @@ public void testSplitWhereNoStorageSpecForAFeature() { .build(), specService); assertNull(specs.getError()); - List stores = Collections.emptyList(); + List stores = Collections.emptyList(); SplitOutputByStore split = new SplitOutputByStore(stores, selector, specs); PCollection input = @@ -385,7 +385,7 @@ public void testWriteTags() { TupleTag tag3 = new TupleTag<>("TAG3"); TupleTag mainTag = new TupleTag<>("TAG4"); - Map, Write> transforms = ImmutableMap., Write>builder() + Map, FeatureStoreWrite> transforms = ImmutableMap., FeatureStoreWrite>builder() .put(tag1, new MockTransforms.Write()) .put(tag2, new MockTransforms.Write()) // tag3 and mainTag do not have write transforms. @@ -418,7 +418,7 @@ public void testWriteTags() { // input 4 is is returned in the output because it is the main tag. PAssert.that(output).containsInAnyOrder(rowex1, rowex2, rowex4); - // Each non main tagged input should be written to corresponding Write transform + // Each non main tagged input should be written to corresponding FeatureStoreWrite transform PAssert.that(((MockTransforms.Write) transforms.get(tag1)).getInputs().get(0)) .containsInAnyOrder(rowex1); PAssert.that(((MockTransforms.Write) transforms.get(tag2)).getInputs().get(0)) diff --git a/ingestion/src/test/java/feast/storage/file/FileStoreOptionsTest.java b/ingestion/src/test/java/feast/store/FileStoreOptionsTest.java similarity index 93% rename from ingestion/src/test/java/feast/storage/file/FileStoreOptionsTest.java rename to ingestion/src/test/java/feast/store/FileStoreOptionsTest.java index 4a2be27a70..0d8e06182c 100644 --- a/ingestion/src/test/java/feast/storage/file/FileStoreOptionsTest.java +++ b/ingestion/src/test/java/feast/store/FileStoreOptionsTest.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,10 +15,11 @@ * */ -package feast.storage.file; +package feast.store; import static org.junit.Assert.assertEquals; +import feast.store.FileStoreOptions; import org.joda.time.Duration; import org.junit.Test; import feast.options.OptionsParser; diff --git a/ingestion/src/test/java/feast/storage/MockErrorsStore.java b/ingestion/src/test/java/feast/store/MockFeatureErrorsFactory.java similarity index 67% rename from ingestion/src/test/java/feast/storage/MockErrorsStore.java rename to ingestion/src/test/java/feast/store/MockFeatureErrorsFactory.java index e83ba5641e..49721147a6 100644 --- a/ingestion/src/test/java/feast/storage/MockErrorsStore.java +++ b/ingestion/src/test/java/feast/store/MockFeatureErrorsFactory.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,15 +15,17 @@ * */ -package feast.storage; +package feast.store; import com.google.auto.service.AutoService; +import feast.store.errors.FeatureErrorsFactory; -@AutoService(ErrorsStore.class) -public class MockErrorsStore extends MockFeatureStore implements ErrorsStore { +@AutoService(FeatureErrorsFactory.class) +public class MockFeatureErrorsFactory extends MockFeatureStore implements + FeatureErrorsFactory { public static final String MOCK_ERRORS_STORE_TYPE = "errors.mock"; - public MockErrorsStore() { + public MockFeatureErrorsFactory() { super(MOCK_ERRORS_STORE_TYPE); } } diff --git a/ingestion/src/test/java/feast/storage/MockFeatureStore.java b/ingestion/src/test/java/feast/store/MockFeatureStore.java similarity index 85% rename from ingestion/src/test/java/feast/storage/MockFeatureStore.java rename to ingestion/src/test/java/feast/store/MockFeatureStore.java index 2e957d4de2..cdde9e9740 100644 --- a/ingestion/src/test/java/feast/storage/MockFeatureStore.java +++ b/ingestion/src/test/java/feast/store/MockFeatureStore.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,13 +15,13 @@ * */ -package feast.storage; +package feast.store; import feast.ingestion.model.Specs; import feast.specs.StorageSpecProto.StorageSpec; import lombok.Getter; -public class MockFeatureStore implements FeatureStore { +public class MockFeatureStore implements FeatureStoreFactory { private final String type; @Getter private MockTransforms.Write write; diff --git a/ingestion/src/test/java/feast/storage/MockServingStore.java b/ingestion/src/test/java/feast/store/MockServingFactory.java similarity index 67% rename from ingestion/src/test/java/feast/storage/MockServingStore.java rename to ingestion/src/test/java/feast/store/MockServingFactory.java index 2c60bcbf78..11195c1385 100644 --- a/ingestion/src/test/java/feast/storage/MockServingStore.java +++ b/ingestion/src/test/java/feast/store/MockServingFactory.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,15 +15,17 @@ * */ -package feast.storage; +package feast.store; import com.google.auto.service.AutoService; +import feast.store.serving.FeatureServingFactory; -@AutoService(ServingStore.class) -public class MockServingStore extends MockFeatureStore implements ServingStore { +@AutoService(FeatureServingFactory.class) +public class MockServingFactory extends MockFeatureStore implements + FeatureServingFactory { public static final String MOCK_SERVING_STORE_TYPE = "serving.mock"; - public MockServingStore() { + public MockServingFactory() { super(MOCK_SERVING_STORE_TYPE); } } diff --git a/ingestion/src/test/java/feast/storage/MockTransforms.java b/ingestion/src/test/java/feast/store/MockTransforms.java similarity index 87% rename from ingestion/src/test/java/feast/storage/MockTransforms.java rename to ingestion/src/test/java/feast/store/MockTransforms.java index 1f5a573495..7d241e77ee 100644 --- a/ingestion/src/test/java/feast/storage/MockTransforms.java +++ b/ingestion/src/test/java/feast/store/MockTransforms.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,10 +15,9 @@ * */ -package feast.storage; +package feast.store; import com.google.common.collect.Lists; -import feast.ingestion.transform.FeatureIO; import feast.ingestion.transform.fn.Identity; import feast.specs.StorageSpecProto.StorageSpec; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; @@ -31,7 +30,7 @@ public class MockTransforms { @Getter - public static class Write extends FeatureIO.Write { + public static class Write extends FeatureStoreWrite { List> inputs = Lists.newArrayList(); private StorageSpec spec; diff --git a/ingestion/src/test/java/feast/storage/MockWarehouseStore.java b/ingestion/src/test/java/feast/store/MockWarehouseFactory.java similarity index 67% rename from ingestion/src/test/java/feast/storage/MockWarehouseStore.java rename to ingestion/src/test/java/feast/store/MockWarehouseFactory.java index dbe9ab8caf..ddfd06556a 100644 --- a/ingestion/src/test/java/feast/storage/MockWarehouseStore.java +++ b/ingestion/src/test/java/feast/store/MockWarehouseFactory.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,15 +15,17 @@ * */ -package feast.storage; +package feast.store; import com.google.auto.service.AutoService; +import feast.store.warehouse.FeatureWarehouseFactory; -@AutoService(WarehouseStore.class) -public class MockWarehouseStore extends MockFeatureStore implements WarehouseStore { +@AutoService(FeatureWarehouseFactory.class) +public class MockWarehouseFactory extends MockFeatureStore implements + FeatureWarehouseFactory { public static final String MOCK_WAREHOUSE_STORE_TYPE = "warehouse.mock"; - public MockWarehouseStore() { + public MockWarehouseFactory() { super(MOCK_WAREHOUSE_STORE_TYPE); } } diff --git a/ingestion/src/test/java/feast/storage/file/TextFileFeatureIOTest.java b/ingestion/src/test/java/feast/store/TextFileDynamicIOTest.java similarity index 56% rename from ingestion/src/test/java/feast/storage/file/TextFileFeatureIOTest.java rename to ingestion/src/test/java/feast/store/TextFileDynamicIOTest.java index 84b87ca671..0a52b4bc78 100644 --- a/ingestion/src/test/java/feast/storage/file/TextFileFeatureIOTest.java +++ b/ingestion/src/test/java/feast/store/TextFileDynamicIOTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,9 +15,9 @@ * */ -package feast.storage.file; +package feast.store; -import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import com.google.common.collect.Lists; @@ -27,43 +27,54 @@ import java.nio.file.Path; import java.util.List; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import feast.types.FeatureRowExtendedProto.FeatureRowExtended; -import feast.types.FeatureRowProto.FeatureRow; -public class TextFileFeatureIOTest { - @Rule public TemporaryFolder folder = new TemporaryFolder(); +@Slf4j +public class TextFileDynamicIOTest { - @Rule public TestPipeline testPipeline = TestPipeline.create(); + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Rule + public TestPipeline testPipeline = TestPipeline.create(); @Test public void testWrite() throws IOException { File path = folder.newFolder(); FileStoreOptions options = new FileStoreOptions(); - options.jobName = testPipeline.getOptions().getJobName(); + options.jobName = "test-job"; options.path = path.getAbsolutePath(); - TextFileFeatureIO.Write write = - new TextFileFeatureIO.Write( - options, (featureRow) -> "output: " + featureRow.getRow().getEntityName(), ".text"); + TextFileDynamicIO.Write write = + new TextFileDynamicIO.Write( + options, ".text"); - PCollection rowExtended = - testPipeline.apply( - Create.of( - FeatureRowExtended.newBuilder() - .setRow(FeatureRow.newBuilder().setEntityName("testEntity")) - .build())); + PCollection> rowExtended = + testPipeline.apply(Create.of( + KV.of("part1", "line1"), + KV.of("part1", "line2"), + KV.of("part2", "line3"), + KV.of("part2", "line4"))); rowExtended.apply(write); testPipeline.run(); - List files = Files.walk(path.toPath()).collect(Collectors.toList()); + List part1 = getAllLines(path.toPath().resolve("test-job/part1")); + List part2 = getAllLines(path.toPath().resolve("test-job/part2")); + assertThat(part1, containsInAnyOrder("line1", "line2")); + assertThat(part2, containsInAnyOrder("line3", "line4")); + } + + List getAllLines(Path path) throws IOException { + List files = Files.walk(path).collect(Collectors.toList()); List lines = Lists.newArrayList(); for (Path file : files) { System.out.println(file); @@ -71,6 +82,6 @@ public void testWrite() throws IOException { lines.addAll(Files.readAllLines(file)); } } - assertThat(lines, equalTo(Lists.newArrayList("output: testEntity"))); + return lines; } } diff --git a/ingestion/src/test/java/feast/storage/bigtable/BigTableFeatureOptionsTest.java b/ingestion/src/test/java/feast/store/serving/bigtable/BigTableFeatureOptionsTest.java similarity index 91% rename from ingestion/src/test/java/feast/storage/bigtable/BigTableFeatureOptionsTest.java rename to ingestion/src/test/java/feast/store/serving/bigtable/BigTableFeatureOptionsTest.java index 8525626583..dfd3b85339 100644 --- a/ingestion/src/test/java/feast/storage/bigtable/BigTableFeatureOptionsTest.java +++ b/ingestion/src/test/java/feast/store/serving/bigtable/BigTableFeatureOptionsTest.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,7 +15,7 @@ * */ -package feast.storage.bigtable; +package feast.store.serving.bigtable; import org.junit.Assert; import org.junit.Test; diff --git a/ingestion/src/test/java/feast/storage/bigtable/BigTableStoreOptionsTest.java b/ingestion/src/test/java/feast/store/serving/bigtable/BigTableStoreOptionsTest.java similarity index 95% rename from ingestion/src/test/java/feast/storage/bigtable/BigTableStoreOptionsTest.java rename to ingestion/src/test/java/feast/store/serving/bigtable/BigTableStoreOptionsTest.java index a777f50ffd..b9957a2b45 100644 --- a/ingestion/src/test/java/feast/storage/bigtable/BigTableStoreOptionsTest.java +++ b/ingestion/src/test/java/feast/store/serving/bigtable/BigTableStoreOptionsTest.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,7 +15,7 @@ * */ -package feast.storage.bigtable; +package feast.store.serving.bigtable; import org.junit.Assert; import org.junit.Test; diff --git a/ingestion/src/test/java/feast/storage/redis/FeatureRowRedisIOWriteTest.java b/ingestion/src/test/java/feast/store/serving/redis/FeatureRowRedisIOWriteTest.java similarity index 93% rename from ingestion/src/test/java/feast/storage/redis/FeatureRowRedisIOWriteTest.java rename to ingestion/src/test/java/feast/store/serving/redis/FeatureRowRedisIOWriteTest.java index 591736c1be..3dd1b10112 100644 --- a/ingestion/src/test/java/feast/storage/redis/FeatureRowRedisIOWriteTest.java +++ b/ingestion/src/test/java/feast/store/serving/redis/FeatureRowRedisIOWriteTest.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,10 +15,10 @@ * */ -package feast.storage.redis; +package feast.store.serving.redis; -import static feast.storage.redis.FeatureRowToRedisMutationDoFn.getFeatureIdSha1Prefix; -import static feast.storage.redis.FeatureRowToRedisMutationDoFn.getRedisBucketKey; +import static feast.store.serving.redis.FeatureRowToRedisMutationDoFn.getFeatureIdSha1Prefix; +import static feast.store.serving.redis.FeatureRowToRedisMutationDoFn.getRedisBucketKey; import static org.junit.Assert.assertEquals; import com.google.common.io.Resources; @@ -29,7 +29,7 @@ import feast.ingestion.model.Values; import feast.ingestion.service.FileSpecService; import feast.ingestion.service.SpecService; -import feast.ingestion.transform.FeatureIO; +import feast.store.FeatureStoreWrite; import feast.ingestion.util.DateUtil; import feast.specs.ImportSpecProto.Field; import feast.specs.ImportSpecProto.ImportSpec; @@ -110,7 +110,7 @@ public void testWriteNoneGranularity() throws IOException { Specs specs = getSpecs(); specs.validate(); - new RedisServingStore().create(specs.getStorageSpec("REDIS1"), specs); + new RedisServingFactory().create(specs.getStorageSpec("REDIS1"), specs); FeatureRowRedisIO.Write write = new FeatureRowRedisIO.Write( RedisStoreOptions.builder().host("localhost").port(REDIS_PORT).build(), specs); @@ -154,7 +154,7 @@ public void testWriteNoneGranularity() throws IOException { @Test public void testWriteNoneGranularityFromOptions() throws IOException { Specs specs = getSpecs(); - FeatureIO.Write write = new RedisServingStore().create(specs.getStorageSpec("REDIS1"), specs); + FeatureStoreWrite write = new RedisServingFactory().create(specs.getStorageSpec("REDIS1"), specs); FeatureRowExtended rowExtended = FeatureRowExtended.newBuilder() @@ -195,7 +195,7 @@ public void testWriteNoneGranularityFromOptions() throws IOException { @Test public void testWriteHourGranularity() throws IOException { Specs specs = getSpecs(); - FeatureIO.Write write = new RedisServingStore().create(specs.getStorageSpec("REDIS1"), specs); + FeatureStoreWrite write = new RedisServingFactory().create(specs.getStorageSpec("REDIS1"), specs); FeatureRowExtended rowExtended = FeatureRowExtended.newBuilder() diff --git a/ingestion/src/test/java/feast/storage/redis/FeatureRowToRedisMutationDoFnTest.java b/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java similarity index 93% rename from ingestion/src/test/java/feast/storage/redis/FeatureRowToRedisMutationDoFnTest.java rename to ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java index 242042fb35..64bc401426 100644 --- a/ingestion/src/test/java/feast/storage/redis/FeatureRowToRedisMutationDoFnTest.java +++ b/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,7 +15,7 @@ * */ -package feast.storage.redis; +package feast.store.serving.redis; import feast.ingestion.util.DateUtil; import feast.storage.RedisProto.RedisBucketKey; diff --git a/ingestion/src/test/java/feast/storage/redis/RedisFeatureOptionsTest.java b/ingestion/src/test/java/feast/store/serving/redis/RedisFeatureOptionsTest.java similarity index 95% rename from ingestion/src/test/java/feast/storage/redis/RedisFeatureOptionsTest.java rename to ingestion/src/test/java/feast/store/serving/redis/RedisFeatureOptionsTest.java index 2cbbffb1db..bf49aca612 100644 --- a/ingestion/src/test/java/feast/storage/redis/RedisFeatureOptionsTest.java +++ b/ingestion/src/test/java/feast/store/serving/redis/RedisFeatureOptionsTest.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,7 +15,7 @@ * */ -package feast.storage.redis; +package feast.store.serving.redis; import org.joda.time.Duration; import org.junit.Assert; diff --git a/ingestion/src/test/java/feast/storage/redis/RedisStoreOptionsTest.java b/ingestion/src/test/java/feast/store/serving/redis/RedisStoreOptionsTest.java similarity index 92% rename from ingestion/src/test/java/feast/storage/redis/RedisStoreOptionsTest.java rename to ingestion/src/test/java/feast/store/serving/redis/RedisStoreOptionsTest.java index 87b7fc98fa..335a3b1fed 100644 --- a/ingestion/src/test/java/feast/storage/redis/RedisStoreOptionsTest.java +++ b/ingestion/src/test/java/feast/store/serving/redis/RedisStoreOptionsTest.java @@ -1,11 +1,11 @@ /* - * Copyright 2018 The Feast Authors + * Copyright 2019 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,7 +15,7 @@ * */ -package feast.storage.redis; +package feast.store.serving.redis; import static org.junit.Assert.assertEquals; diff --git a/ingestion/src/test/java/feast/storage/bigquery/BigQueryStoreOptionsTest.java b/ingestion/src/test/java/feast/store/warehouse/bigquery/BigQueryStoreOptionsTest.java similarity index 97% rename from ingestion/src/test/java/feast/storage/bigquery/BigQueryStoreOptionsTest.java rename to ingestion/src/test/java/feast/store/warehouse/bigquery/BigQueryStoreOptionsTest.java index 92a30cf56b..4563d3e6fb 100644 --- a/ingestion/src/test/java/feast/storage/bigquery/BigQueryStoreOptionsTest.java +++ b/ingestion/src/test/java/feast/store/warehouse/bigquery/BigQueryStoreOptionsTest.java @@ -15,7 +15,7 @@ * */ -package feast.storage.bigquery; +package feast.store.warehouse.bigquery; import org.junit.Assert; import org.junit.Test; diff --git a/ingestion/src/test/java/feast/storage/bigquery/ValueBigQueryBuilderTest.java b/ingestion/src/test/java/feast/store/warehouse/bigquery/ValueBigQueryBuilderTest.java similarity index 98% rename from ingestion/src/test/java/feast/storage/bigquery/ValueBigQueryBuilderTest.java rename to ingestion/src/test/java/feast/store/warehouse/bigquery/ValueBigQueryBuilderTest.java index 4ddc2d8c45..648fa5f151 100644 --- a/ingestion/src/test/java/feast/storage/bigquery/ValueBigQueryBuilderTest.java +++ b/ingestion/src/test/java/feast/store/warehouse/bigquery/ValueBigQueryBuilderTest.java @@ -15,7 +15,7 @@ * */ -package feast.storage.bigquery; +package feast.store.warehouse.bigquery; import static junit.framework.TestCase.assertEquals; import static junit.framework.TestCase.assertTrue; diff --git a/ingestion/src/test/java/feast/storage/file/json/JsonFileFeatureIOTest.java b/ingestion/src/test/java/feast/store/warehouse/json/JsonFileFeatureIOTest.java similarity index 59% rename from ingestion/src/test/java/feast/storage/file/json/JsonFileFeatureIOTest.java rename to ingestion/src/test/java/feast/store/warehouse/json/JsonFileFeatureIOTest.java index cdc761b4ce..97e09f9e7f 100644 --- a/ingestion/src/test/java/feast/storage/file/json/JsonFileFeatureIOTest.java +++ b/ingestion/src/test/java/feast/store/warehouse/json/JsonFileFeatureIOTest.java @@ -15,56 +15,78 @@ * */ -package feast.storage.file.json; +package feast.store.warehouse.json; -import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import com.google.common.collect.Lists; +import feast.store.FileStoreOptions; +import feast.types.FeatureRowExtendedProto.FeatureRowExtended; +import feast.types.FeatureRowProto.FeatureRow; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import feast.storage.file.FileStoreOptions; -import feast.types.FeatureRowExtendedProto.FeatureRowExtended; -import feast.types.FeatureRowProto.FeatureRow; +@Slf4j public class JsonFileFeatureIOTest { - @Rule public TemporaryFolder folder = new TemporaryFolder(); - @Rule public TestPipeline testPipeline = TestPipeline.create(); + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Rule + public TestPipeline testPipeline = TestPipeline.create(); @Test public void testWrite() throws IOException { File path = folder.newFolder(); FileStoreOptions options = new FileStoreOptions(); - options.jobName = testPipeline.getOptions().getJobName(); + options.jobName = "test-job"; options.path = path.getAbsolutePath(); - JsonFileFeatureIO.Write write = - new JsonFileFeatureIO.Write(options, FeatureRowExtended::getRow); + JsonFileWarehouseWrite write = new JsonFileWarehouseWrite(options); PCollection rowExtended = testPipeline.apply( Create.of( FeatureRowExtended.newBuilder() .setRow( - FeatureRow.newBuilder().setEntityName("testEntity").setEntityKey("1234")) + FeatureRow.newBuilder().setEntityName("testEntity1").setEntityKey("1234")) + .build(), + FeatureRowExtended.newBuilder() + .setRow( + FeatureRow.newBuilder().setEntityName("testEntity2").setEntityKey("1234")) .build())); rowExtended.apply(write); testPipeline.run(); - List files = Files.walk(path.toPath()).collect(Collectors.toList()); + for (Path p : Files.walk(path.toPath()).collect(Collectors.toList())) { + if (p.toFile().isFile()) { + log.debug(p.toString()); + } + } + List part1 = getAllLines(path.toPath().resolve("test-job/testEntity1")); + List part2 = getAllLines(path.toPath().resolve("test-job/testEntity2")); + assertThat(part1, + containsInAnyOrder("{\"entityKey\":\"1234\",\"entityName\":\"testEntity1\"}")); + assertThat(part2, + containsInAnyOrder("{\"entityKey\":\"1234\",\"entityName\":\"testEntity2\"}")); + } + + List getAllLines(Path path) throws IOException { + List files = Files.walk(path).collect(Collectors.toList()); List lines = Lists.newArrayList(); for (Path file : files) { System.out.println(file); @@ -72,8 +94,6 @@ public void testWrite() throws IOException { lines.addAll(Files.readAllLines(file)); } } - assertThat( - lines, - equalTo(Lists.newArrayList("{\"entityKey\":\"1234\",\"entityName\":\"testEntity\"}"))); + return lines; } }