diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/HollowIncrementalProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/HollowIncrementalProducer.java index 7269e67ead..4b0e2236f1 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/HollowIncrementalProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/HollowIncrementalProducer.java @@ -28,8 +28,6 @@ */ public class HollowIncrementalProducer { - private static final Object DELETE_RECORD = new Object(); - private final HollowProducer producer; private final ConcurrentHashMap mutations; private final HollowProducer.Populator populator; @@ -64,7 +62,7 @@ public void discard(Object obj) { } public void delete(RecordPrimaryKey key) { - mutations.put(key, DELETE_RECORD); + mutations.put(key, HollowIncrementalCyclePopulator.DELETE_RECORD); } public void discard(RecordPrimaryKey key) { diff --git a/hollow/src/test/java/com/netflix/hollow/api/producer/HollowIncrementalProducerTest.java b/hollow/src/test/java/com/netflix/hollow/api/producer/HollowIncrementalProducerTest.java index 0d7d2c6476..e4aa2fc739 100644 --- a/hollow/src/test/java/com/netflix/hollow/api/producer/HollowIncrementalProducerTest.java +++ b/hollow/src/test/java/com/netflix/hollow/api/producer/HollowIncrementalProducerTest.java @@ -181,6 +181,100 @@ public void publishAndLoadASnapshotDirectly() { assertTypeA(idx, 5, "five", null); } + @Test + public void publishDirectlyAndRestore() { + // Producer is created but not initialized. IncrementalProducer will directly initialize the first snapshot + /// add/modify state of a producer with an empty previous state. delete requests for non-existent records will be ignored + HollowIncrementalProducer incrementalProducer = new HollowIncrementalProducer(createInMemoryProducer()); + + incrementalProducer.addOrModify(new TypeA(1, "one", 100)); + incrementalProducer.addOrModify(new TypeA(2, "two", 2)); + incrementalProducer.addOrModify(new TypeA(3, "three", 300)); + incrementalProducer.addOrModify(new TypeA(3, "three", 3)); + incrementalProducer.addOrModify(new TypeA(4, "five", 6)); + incrementalProducer.delete(new TypeA(5, "five", 5)); + + incrementalProducer.delete(new TypeB(2, "2")); + incrementalProducer.addOrModify(new TypeB(4, "four")); + incrementalProducer.addOrModify(new TypeB(5, "6")); + incrementalProducer.addOrModify(new TypeB(5, "5")); + incrementalProducer.delete(new RecordPrimaryKey("TypeB", new Object[] { 4 })); + incrementalProducer.addOrModify(new TypeB(6, "6")); + + /// .runCycle() flushes the changes to a new data state. + long nextVersion = incrementalProducer.runCycle(); + + /// now we read the changes and assert + HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore).build(); + consumer.triggerRefreshTo(nextVersion); + + HollowPrimaryKeyIndex idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeA", "id1", "id2"); + Assert.assertFalse(idx.containsDuplicates()); + + assertTypeA(idx, 1, "one", 100L); + assertTypeA(idx, 2, "two", 2L); + assertTypeA(idx, 3, "three", 3L); + assertTypeA(idx, 4, "five", 6L); + assertTypeA(idx, 5, "five", null); + + idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeB", "id"); + Assert.assertFalse(idx.containsDuplicates()); + + // backing producer was never initialized, so only records added to the incremental producer are here + assertTypeB(idx, 1, null); + assertTypeB(idx, 2, null); + assertTypeB(idx, 3, null); + assertTypeB(idx, 4, null); + assertTypeB(idx, 5, "5"); + assertTypeB(idx, 6, "6"); + + // Create NEW incremental producer which will restore from the state left by the previous incremental producer + /// adding a new type this time (TypeB). + HollowProducer backingProducer = createInMemoryProducer(); + backingProducer.initializeDataModel(TypeA.class, TypeB.class); + backingProducer.restore(nextVersion, blobStore); + + HollowIncrementalProducer incrementalProducer2 = new HollowIncrementalProducer(backingProducer); + + incrementalProducer2.delete(new TypeA(1, "one", 100)); + incrementalProducer2.delete(new TypeA(2, "one", 100)); + incrementalProducer2.addOrModify(new TypeA(5, "five", 5)); + + incrementalProducer2.addOrModify(new TypeB(1, "1")); + incrementalProducer2.addOrModify(new TypeB(2, "2")); + incrementalProducer2.addOrModify(new TypeB(3, "3")); + incrementalProducer2.addOrModify(new TypeB(4, "4")); + incrementalProducer2.delete(new TypeB(5, "ignored")); + + /// .runCycle() flushes the changes to a new data state. + long finalVersion = incrementalProducer2.runCycle(); + + /// now we read the changes and assert + consumer = HollowConsumer.withBlobRetriever(blobStore).build(); + consumer.triggerRefreshTo(finalVersion); + + idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeA", "id1", "id2"); + Assert.assertFalse(idx.containsDuplicates()); + + assertTypeA(idx, 1, "one", null); + assertTypeA(idx, 2, "two", 2L); + assertTypeA(idx, 3, "three", 3L); + assertTypeA(idx, 4, "five", 6L); + assertTypeA(idx, 5, "five", 5L); + + idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeB", "id"); + Assert.assertFalse(idx.containsDuplicates()); + + // backing producer was never initialized, so only records added to the incremental producer are here + assertTypeB(idx, 1, "1"); + assertTypeB(idx, 2, "2"); + assertTypeB(idx, 3, "3"); + assertTypeB(idx, 4, "4"); + assertTypeB(idx, 5, null); + assertTypeB(idx, 6, "6"); + + } + @Test public void continuesARestoredState() { HollowProducer genesisProducer = createInMemoryProducer();