diff --git a/ingestion/src/main/java/feast/ingestion/transform/FilterOutdatedFeatureRow.java b/ingestion/src/main/java/feast/ingestion/transform/FilterOutdatedFeatureRow.java new file mode 100644 index 00000000000..bf5d3663528 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/FilterOutdatedFeatureRow.java @@ -0,0 +1,88 @@ +package feast.ingestion.transform; + +import com.google.auto.value.AutoValue; +import feast.storage.RedisProto; +import feast.types.FeatureRowProto; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.state.*; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.joda.time.Duration; + +@AutoValue +abstract public class FilterOutdatedFeatureRow + extends PTransform, PCollection> { + + public abstract Duration getStateExpiryDuration(); + + public static FilterOutdatedFeatureRow.Builder newBuilder() { + return new AutoValue_FilterOutdatedFeatureRow.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setStateExpiryDuration(Duration stateExpiryDuration); + public abstract FilterOutdatedFeatureRow build(); + } + + private RedisProto.RedisKey getKey(FeatureRowProto.FeatureRow featureRow) { + RedisProto.RedisKey.Builder builder = RedisProto.RedisKey.newBuilder().setFeatureSet(featureRow.getFeatureSet()); + featureRow.getFieldsList().forEach(builder::addEntities); + return builder.build(); + } + + public static class FilterDoFn extends DoFn, FeatureRowProto.FeatureRow> { + private static final String LAST_UPDATED = "last_updated"; + private static final String EXPIRY_TIMER = "state_expiry_timer"; + + private final Duration stateExpiryDuration; + + FilterDoFn(Duration stateExpiryDuration) { + this.stateExpiryDuration = stateExpiryDuration; + } + + @StateId(LAST_UPDATED) + private final StateSpec> lastUpdatedStateSpec = StateSpecs.value(VarLongCoder.of()); + + @TimerId(EXPIRY_TIMER) + private final TimerSpec stateExpiryTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + + @OnTimer(EXPIRY_TIMER) + public void onExpiry(OnTimerContext context, @StateId("last_updated") ValueState lastUpdatedState) { + lastUpdatedState.clear(); + } + + @ProcessElement + public void processElement(ProcessContext context, + @StateId("last_updated") ValueState lastUpdatedState, + @TimerId("state_expiry_timer") Timer stateExpiryTimer) { + Long lastUpdatedTimestamp = lastUpdatedState.read(); + Long currentTimestamp = context.element().getValue().getEventTimestamp().getSeconds(); + if(lastUpdatedTimestamp == null || currentTimestamp > lastUpdatedTimestamp) { + lastUpdatedState.write(currentTimestamp); + context.output(context.element().getValue()); + stateExpiryTimer.offset(stateExpiryDuration).setRelative(); + } + } + } + + @Override + public PCollection expand(PCollection featureRowCollection) { + return featureRowCollection + .apply( + MapElements.into( + TypeDescriptors.kvs(TypeDescriptor.of(RedisProto.RedisKey.class), + TypeDescriptor.of(FeatureRowProto.FeatureRow.class)) + ).via((FeatureRowProto.FeatureRow featureRow) -> KV.of(getKey(featureRow), featureRow))) + .apply(ParDo.of(new FilterDoFn(getStateExpiryDuration()))); + } + + +} diff --git a/ingestion/src/test/java/feast/ingestion/transform/FilterOutdatedFeatureRowTest.java b/ingestion/src/test/java/feast/ingestion/transform/FilterOutdatedFeatureRowTest.java new file mode 100644 index 00000000000..6f3bfd4bc42 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/FilterOutdatedFeatureRowTest.java @@ -0,0 +1,56 @@ +package feast.ingestion.transform; + +import com.google.protobuf.Timestamp; +import feast.types.FeatureRowProto; +import feast.types.FieldProto; +import feast.types.ValueProto; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; + +public class FilterOutdatedFeatureRowTest { + + @Rule + public transient TestPipeline p = TestPipeline.create(); + + private FeatureRowProto.FeatureRow newFeatureRow(String featureSet, String fieldName, Integer value, Long secondsSinceEpoch) { + return FeatureRowProto.FeatureRow.newBuilder() + .setEventTimestamp(Timestamp.newBuilder().setSeconds(secondsSinceEpoch).build()) + .setFeatureSet(featureSet) + .addFields( + FieldProto.Field.newBuilder() + .setName(fieldName) + .setValue(ValueProto.Value.newBuilder().setInt32Val(value).build())) + .build(); + } + + @Test + public void shouldFilterOutdatedFeatureRow() { + Duration expiryTime = Duration.standardSeconds(120); + + FeatureRowProto.FeatureRow feature1Recent = newFeatureRow("fs1", "fn", 1, 90L); + FeatureRowProto.FeatureRow feature2Recent = newFeatureRow("fs2", "fn", 1, 80L); + FeatureRowProto.FeatureRow feature1Outdated = newFeatureRow("fs1", "fn", 1, 80L); + FeatureRowProto.FeatureRow feature1ResentAfterExpiry = newFeatureRow("fs1", "fn", 1, 85L); + + TestStream featureRowTestStream = TestStream.create(ProtoCoder.of(FeatureRowProto.FeatureRow.class)) + .advanceWatermarkTo(new Instant(0L)) + .addElements(feature1Recent, feature2Recent, feature1Outdated) + .advanceWatermarkTo(new Instant(0L).plus(expiryTime.plus(1))) + .addElements(feature1ResentAfterExpiry) + .advanceWatermarkToInfinity(); + + PCollection filtered = p.apply(featureRowTestStream) + .apply(FilterOutdatedFeatureRow.newBuilder().setStateExpiryDuration(expiryTime).build()); + PAssert.that(filtered).containsInAnyOrder(feature1Recent, feature2Recent, feature1ResentAfterExpiry); + p.run(); + + } + +}