From e082133a14c48d90274b130fc7a42e655c515979 Mon Sep 17 00:00:00 2001 From: David Yan Date: Mon, 19 Sep 2016 17:17:57 -0700 Subject: [PATCH] APEXMALHAR-2246 #resolve use Slice instead of byte[] in the underlying map of SpillableByteArrayListMultimapImpl --- .../SpillableByteArrayListMultimapImpl.java | 31 ++++++++++++++----- ...pillableByteArrayListMultimapImplTest.java | 8 ++--- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java index 5c91350fa5..c0466bdf4e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java @@ -26,7 +26,7 @@ import javax.annotation.Nullable; import javax.validation.constraints.NotNull; -import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde; +import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; import org.apache.apex.malhar.lib.utils.serde.SliceUtils; @@ -60,7 +60,7 @@ public class SpillableByteArrayListMultimapImpl implements Spillable.Spill private int batchSize = DEFAULT_BATCH_SIZE; @NotNull - private SpillableByteMapImpl map; + private SpillableByteMapImpl map; private SpillableStateStore store; private byte[] identifier; private long bucket; @@ -91,7 +91,7 @@ public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] iden this.serdeKey = Preconditions.checkNotNull(serdeKey); this.serdeValue = Preconditions.checkNotNull(serdeValue); - map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice()); + map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice()); } public SpillableStateStore getStore() @@ -111,7 +111,7 @@ private SpillableArrayListImpl getHelper(@Nullable K key) if (spillableArrayList == null) { Slice keySlice = serdeKey.serialize(key); - Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray()); + Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX)); if (size == null) { return null; @@ -166,6 +166,7 @@ public void clear() @Override public int size() { + // TODO: This is actually wrong since in a Multimap, size() should return the number of entries, not the number of distinct keys return map.size(); } @@ -179,7 +180,7 @@ public boolean isEmpty() public boolean containsKey(@Nullable Object key) { return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), - SIZE_KEY_SUFFIX).toByteArray()); + SIZE_KEY_SUFFIX)); } @Override @@ -191,7 +192,23 @@ public boolean containsValue(@Nullable Object value) @Override public boolean containsEntry(@Nullable Object key, @Nullable Object value) { - throw new UnsupportedOperationException(); + SpillableArrayListImpl spillableArrayList = getHelper((K)key); + if (spillableArrayList == null) { + return false; + } + for (int i = 0; i < spillableArrayList.size(); i++) { + V v = spillableArrayList.get(i); + if (v == null) { + if (value == null) { + return true; + } + } else { + if (v.equals(value)) { + return true; + } + } + } + return false; } @Override @@ -275,7 +292,7 @@ public void endWindow() SpillableArrayListImpl spillableArrayList = cache.get(key); spillableArrayList.endWindow(); - Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX).toByteArray(), + Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX), spillableArrayList.size()); } diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java index 81063b8db2..2c9d7eb29f 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java @@ -239,7 +239,6 @@ public long simpleMultiKeyTestHelper(SpillableStateStore store, map.endWindow(); store.endWindow(); - store.beforeCheckpoint(nextWindowId); return nextWindowId; } @@ -258,12 +257,13 @@ public void recoveryTestWithManagedState() long nextWindowId = 0L; nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId); long activationWindow = nextWindowId; - nextWindowId++; - + store.beforeCheckpoint(nextWindowId); SpillableByteArrayListMultimapImpl clonedMap = KryoCloneUtils.cloneObject(map); store.checkpointed(nextWindowId); store.committed(nextWindowId); + nextWindowId++; + store.beginWindow(nextWindowId); map.beginWindow(nextWindowId); @@ -319,7 +319,7 @@ public void recoveryTestWithManagedState() store.setup(context); map.setup(context); - + nextWindowId = activationWindow + 1; store.beginWindow(nextWindowId); map.beginWindow(nextWindowId);