Skip to content

Commit

Permalink
allow empty string to select a NoOp write transform (#30)
Browse files Browse the repository at this point in the history
* allow empty string to select a NoOp write transform

* reformat to google styles
  • Loading branch information
tims authored and feast-ci-bot committed Jan 8, 2019
1 parent 7d4eb26 commit 11114c5
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import feast.ingestion.exceptions.ErrorsHandler;
import feast.ingestion.model.Specs;
import feast.ingestion.transform.FeatureIO.Write;
import feast.ingestion.transform.SplitFeatures.MultiOutputSplit;
import feast.ingestion.values.PFeatureRows;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import feast.storage.FeatureStore;
import feast.storage.noop.NoOpIO;
import feast.types.FeatureRowExtendedProto.Attempt;
import feast.types.FeatureRowExtendedProto.Error;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand All @@ -35,29 +47,19 @@
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import feast.ingestion.exceptions.ErrorsHandler;
import feast.ingestion.model.Specs;
import feast.ingestion.transform.FeatureIO.Write;
import feast.ingestion.transform.SplitFeatures.MultiOutputSplit;
import feast.ingestion.values.PFeatureRows;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import feast.storage.FeatureStore;
import feast.storage.noop.NoOpIO;
import feast.types.FeatureRowExtendedProto.Attempt;
import feast.types.FeatureRowExtendedProto.Error;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;

@AllArgsConstructor
@Slf4j
public class SplitOutputByStore extends PTransform<PFeatureRows, PFeatureRows> {

private Collection<? extends FeatureStore> stores;
private SerializableFunction<FeatureSpec, String> selector;
private Specs specs;

@Override
public PFeatureRows expand(PFeatureRows input) {
Map<String, Write> transforms = getFeatureStoreTransforms();
transforms.put("", new NoOpIO.Write());
Set<String> keys = transforms.keySet();
Preconditions.checkArgument(transforms.size() > 0, "no write transforms found");

Expand Down Expand Up @@ -102,6 +104,7 @@ private Map<String, Write> getFeatureStoreTransforms() {

@AllArgsConstructor
public static class WriteTags extends PTransform<PCollectionTuple, PFeatureRows> {

private Map<TupleTag<FeatureRowExtended>, Write> transforms;
private TupleTag<FeatureRowExtended> mainTag;

Expand All @@ -120,9 +123,7 @@ public PFeatureRows expand(PCollectionTuple input) {
}

String message =
"FeatureRow with output tag.no matching storage, these feature's "
+ "specs may be specifying a store which was unknown when "
+ "ingestion started as they somehow passed validation. ";
"FeatureRows have no matching write transform, these rows should not have passed validation.";
PCollection<FeatureRowExtended> errors =
input.get(mainTag).apply(ParDo.of(new WithErrors(getName(), message)));

Expand All @@ -131,8 +132,11 @@ public PFeatureRows expand(PCollectionTuple input) {
}
}

/** Sets the last attempt error for all rows with a given exception */
/**
* Sets the last attempt error for all rows with a given exception
*/
public static class WithErrors extends DoFn<FeatureRowExtended, FeatureRowExtended> {

private Error error;

public WithErrors(Error error) {
Expand Down
11 changes: 6 additions & 5 deletions ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,18 @@
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

@Slf4j
public class ImportJobCSVTest {

@Rule public TemporaryFolder folder = new TemporaryFolder();
@Rule
public TemporaryFolder folder = new TemporaryFolder();

@Rule public TestPipeline testPipeline = TestPipeline.create();
@Rule
public TestPipeline testPipeline = TestPipeline.create();

public ImportSpec initImportSpec(ImportSpec importSpec, String dataFile) throws IOException {
return importSpec.toBuilder().putOptions("path", dataFile).build();
Expand Down Expand Up @@ -123,7 +124,7 @@ public void testImportCSV() throws IOException {

PCollection<FeatureRowExtended> writtenToWarehouse =
PCollectionList.of(
WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs())
WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs())
.apply("flatten warehouse input", Flatten.pCollections());

PCollection<FeatureRowExtended> writtenToErrors =
Expand Down Expand Up @@ -188,7 +189,7 @@ public void testImportCSVUnknownServingStoreError() throws IOException {
+ " fields:\n"
+ " - name: id\n"
+ " - featureId: testEntity.none.redisInt32\n" // Redis is not available by
// default from the json specs
// default from the json specs
+ " - featureId: testEntity.none.testString\n"
+ "\n",
ImportSpec.getDefaultInstance());
Expand Down
Loading

0 comments on commit 11114c5

Please sign in to comment.