From d951f3f95bd5a7598e07d7f0485704404a57a459 Mon Sep 17 00:00:00 2001 From: David Yan Date: Thu, 8 Sep 2016 13:59:50 -0700 Subject: [PATCH] APEXMALHAR-2130 #resolve Added a spillable map that takes two keys with support of iterating through all entries with a given first key --- .../malhar/lib/state/spillable/Spillable.java | 73 +++- .../spillable/SpillableArrayListImpl.java | 5 +- .../state/spillable/SpillableByteMapImpl.java | 4 +- .../spillable/SpillableTwoKeyByteMapImpl.java | 192 ++++++++ ...stSlice.java => SerdeCollectionSlice.java} | 49 ++- .../lib/utils/serde/SerdePairSlice.java | 91 ++++ .../state/spillable/SpillableTestUtils.java | 5 +- .../SpillableTwoKeyByteMapImplTest.java | 409 ++++++++++++++++++ .../utils/serde/SerdeCollectionSliceTest.java | 65 +++ ...SliceTest.java => SerdePairSliceTest.java} | 17 +- 10 files changed, 871 insertions(+), 39 deletions(-) create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableTwoKeyByteMapImpl.java rename library/src/main/java/org/apache/apex/malhar/lib/utils/serde/{SerdeListSlice.java => SerdeCollectionSlice.java} (66%) create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java create mode 100644 library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTwoKeyByteMapImplTest.java create mode 100644 library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java rename library/src/test/java/org/apache/apex/malhar/lib/utils/serde/{SerdeListSliceTest.java => SerdePairSliceTest.java} (67%) diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java index 4c9b997887..b77317a35c 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java @@ -18,10 +18,13 @@ */ package org.apache.apex.malhar.lib.state.spillable; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Queue; +import org.apache.commons.lang3.tuple.Pair; + import com.google.common.collect.ListMultimap; import com.google.common.collect.Multiset; @@ -37,7 +40,7 @@ public interface Spillable { /** * This represents a spillable {@link java.util.List}. The underlying implementation - * of this list is similar to that of an {@link java.util.ArrayList}. User's that receive an + * of this list is similar to that of an {@link java.util.ArrayList}. Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. * @param The type of the data stored in the {@link SpillableArrayList}. @@ -49,7 +52,7 @@ interface SpillableArrayList extends List /** * This represents a spillable {@link java.util.Map}. Implementations make * some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is - * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). User's that receive an + * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. * @param The type of the keys. @@ -62,7 +65,7 @@ interface SpillableByteMap extends Map /** * This represents a spillable {@link com.google.common.collect.ListMultimap} implementation. Implementations make * some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is - * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). User's that receive an + * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. * @param The type of the keys. @@ -75,7 +78,7 @@ interface SpillableByteArrayListMultimap extends ListMultimap /** * This represents a spillable {@link com.google.common.collect.Multiset} implementation. Implementations make * some assumptions about serialization and equality. Consider two elements T1 and T2. The assumption is - * that T1.equals(T2) should be consistent with T1.toByteArray().equals(T2.toByteArray()). User's that receive an + * that T1.equals(T2) should be consistent with T1.toByteArray().equals(T2.toByteArray()). Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs to the data structure. */ interface SpillableByteMultiset extends Multiset @@ -83,7 +86,7 @@ interface SpillableByteMultiset extends Multiset } /** - * This represents a spillable {@link java.util.Queue} implementation. User's that receive an + * This represents a spillable {@link java.util.Queue} implementation. Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. * @param The type of the data stored in the queue. @@ -92,6 +95,66 @@ interface SpillableQueue extends Queue { } + /** + * This represents a spillable map implementation that has two keys. This is similar to a + * {@link java.util.Map}> with limited functionality + * + * @param the type of the first key + * @param the type of the second key + * @param the type of the value + */ + interface SpillableTwoKeyByteMap + { + /** + * Returns the value of the entry with the given first and second keys + * + * @param key1 the first key + * @param key2 the second key + * @return the value of the entry + */ + V get(K1 key1, K2 key2); + + /** + * Puts an element with the given first and second key and the given value. + * It overwrites any existing entry with the given first and second keys. + * + * @param key1 the first key + * @param key2 the second key + * @param value the value + */ + void put(K1 key1, K2 key2, V value); + + /** + * Removes all elements that have the given first key + * + * @param key1 the first key + */ + void remove(K1 key1); + + /** + * Removes the element that has the given first key and the given second key + * + * @param key1 the first key + * @param key2 the second key + */ + void remove(K1 key1, K2 key2); + + /** + * Returns the number of elements in the map + * + * @return the number of elements + */ + long size(); + + /** + * Returns the iterator over all key value pairs given the first key + * + * @param key1 the first key + * @return the iterator over all key value pairs that have the given first key + */ + Iterator> iterator(K1 key1); + } + /** * This represents a spillable data structure that needs to be aware of the operator * callbacks. All concrete or abstract implementations of spillable data structures diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java index da5b14046d..b5656c2932 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java @@ -18,6 +18,7 @@ */ package org.apache.apex.malhar.lib.state.spillable; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -27,7 +28,7 @@ import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; -import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice; import org.apache.hadoop.classification.InterfaceStability; import com.esotericsoftware.kryo.DefaultSerializer; @@ -92,7 +93,7 @@ public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix, this.store = Preconditions.checkNotNull(store); this.serde = Preconditions.checkNotNull(serde); - map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(), new SerdeListSlice(serde)); + map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(), new SerdeCollectionSlice(serde, ArrayList.class)); } /** diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java index f36f2dc930..e5b5fc6572 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java @@ -72,13 +72,13 @@ private SpillableByteMapImpl() } /** - * Creats a {@link SpillableByteMapImpl}. + * Creates a {@link SpillableByteMapImpl}. * @param store The {@link SpillableStateStore} in which to spill to. * @param identifier The Id of this {@link SpillableByteMapImpl}. * @param bucket The Id of the bucket used to store this * {@link SpillableByteMapImpl} in the provided {@link SpillableStateStore}. * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. - * @param serdeKey The {@link Serde} to use when serializing and deserializing values. + * @param serdeValue The {@link Serde} to use when serializing and deserializing values. */ public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde serdeKey, Serde serdeValue) diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableTwoKeyByteMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableTwoKeyByteMapImpl.java new file mode 100644 index 0000000000..a0c8625b30 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableTwoKeyByteMapImpl.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.io.Serializable; +import java.util.AbstractMap; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdePairSlice; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * An implementation of {@link SpillableTwoKeyByteMap} + * @param The type of the first key + * @param The type of the second key + * @param The type of values. + */ +public class SpillableTwoKeyByteMapImpl implements Spillable.SpillableTwoKeyByteMap, Spillable.SpillableComponent, + Serializable +{ + private SpillableStateStore store; + private SpillableByteMapImpl, V> map; + private SpillableByteMapImpl> keyMap; + + private SpillableTwoKeyByteMapImpl() + { + // for kryo + } + + /** + * Creates a {@link SpillableByteMapImpl}. + * @param store The {@link SpillableStateStore} in which to spill to. + * @param identifier The Id of this {@link SpillableByteMapImpl}. + * @param bucket The Id of the bucket used to store this + * {@link SpillableByteMapImpl} in the provided {@link SpillableStateStore}. + * @param serdeKey1 The {@link Serde} to use when serializing and deserializing the first key. + * @param serdeKey2 The {@link Serde} to use when serializing and deserializing the second key. + * @param serdeValue The {@link Serde} to use when serializing and deserializing values. + */ + public SpillableTwoKeyByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde serdeKey1, Serde serdeKey2, Serde serdeValue) + { + this.store = store; + map = new SpillableByteMapImpl<>(store, identifier, bucket, new SerdePairSlice<>(serdeKey1, serdeKey2), serdeValue); + keyMap = new SpillableByteMapImpl<>(store, identifier, bucket, serdeKey1, new SerdeCollectionSlice>(serdeKey2, (Class>)(Class)TreeSet.class)); + } + + public SpillableStateStore getStore() + { + return this.store; + } + + @Override + public V get(K1 key1, K2 key2) + { + return map.get(new ImmutablePair<>(key1, key2)); + } + + @Override + public void put(K1 key1, K2 key2, V value) + { + map.put(new ImmutablePair<>(key1, key2), value); + Set keys = keyMap.get(key1); + if (keys == null) { + keys = new TreeSet<>(); + } + keys.add(key2); + keyMap.put(key1, keys); + } + + @Override + public void remove(K1 key1) + { + Set keys = keyMap.get(key1); + if (keys != null) { + for (K2 key2 : keys) { + map.remove(new ImmutablePair<>(key1, key2)); + } + } + keyMap.remove(key1); + } + + @Override + public void remove(K1 key1, K2 key2) + { + Set keys = keyMap.get(key1); + if (keys != null) { + map.remove(new ImmutablePair<>(key1, key2)); + keys.remove(key2); + if (keys.isEmpty()) { + keyMap.remove(key1); + } else { + keyMap.put(key1, keys); + } + } + } + + @Override + public long size() + { + return map.size(); + } + + @Override + public Iterator> iterator(final K1 key1) + { + final Set keys2 = keyMap.get(key1); + + return new Iterator>() + { + private Iterator listIterator = keys2 == null ? null : keys2.iterator(); + private K2 lastKey2; + + @Override + public boolean hasNext() + { + return listIterator != null && listIterator.hasNext(); + } + + @Override + public Map.Entry next() + { + if (listIterator == null) { + throw new NoSuchElementException(); + } else { + lastKey2 = listIterator.next(); + return new AbstractMap.SimpleEntry<>(lastKey2, map.get(new ImmutablePair<>(key1, lastKey2))); + } + } + + @Override + public void remove() + { + SpillableTwoKeyByteMapImpl.this.remove(key1, lastKey2); + } + }; + } + + @Override + public void setup(Context.OperatorContext context) + { + map.setup(context); + keyMap.setup(context); + } + + @Override + public void teardown() + { + keyMap.teardown(); + map.teardown(); + } + + @Override + public void beginWindow(long windowId) + { + map.beginWindow(windowId); + keyMap.beginWindow(windowId); + } + + @Override + public void endWindow() + { + keyMap.endWindow(); + map.endWindow(); + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java similarity index 66% rename from library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java rename to library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java index 68d11c8191..5e3d2b771c 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java @@ -18,6 +18,7 @@ */ package org.apache.apex.malhar.lib.utils.serde; +import java.util.Collection; import java.util.List; import javax.validation.constraints.NotNull; @@ -26,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.datatorrent.lib.appdata.gpo.GPOUtils; @@ -37,35 +39,40 @@ * @since 3.5.0 */ @InterfaceStability.Evolving -public class SerdeListSlice implements Serde, Slice> +public class SerdeCollectionSlice> implements Serde { @NotNull private Serde serde; - private SerdeListSlice() + @NotNull + private Class collectionClass; + + private SerdeCollectionSlice() { // for Kryo } /** - * Creates a {@link SerdeListSlice}. + * Creates a {@link SerdeCollectionSlice}. * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list. */ - public SerdeListSlice(@NotNull Serde serde) + public SerdeCollectionSlice(@NotNull Serde serde, @NotNull Class collectionClass) { this.serde = Preconditions.checkNotNull(serde); + this.collectionClass = Preconditions.checkNotNull(collectionClass); } @Override - public Slice serialize(List objects) + public Slice serialize(CollectionT objects) { Slice[] slices = new Slice[objects.size()]; int size = 4; - for (int index = 0; index < objects.size(); index++) { - Slice slice = serde.serialize(objects.get(index)); - slices[index] = slice; + int index = 0; + for (T object : objects) { + Slice slice = serde.serialize(object); + slices[index++] = slice; size += slice.length; } @@ -76,7 +83,7 @@ public Slice serialize(List objects) System.arraycopy(sizeBytes, 0, bytes, offset, 4); offset += 4; - for (int index = 0; index < slices.length; index++) { + for (index = 0; index < slices.length; index++) { Slice slice = slices[index]; System.arraycopy(slice.buffer, slice.offset, bytes, offset, slice.length); offset += slice.length; @@ -86,25 +93,29 @@ public Slice serialize(List objects) } @Override - public List deserialize(Slice slice, MutableInt offset) + public CollectionT deserialize(Slice slice, MutableInt offset) { MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue()); int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset); - List list = Lists.newArrayListWithCapacity(numElements); sliceOffset.subtract(slice.offset); - - for (int index = 0; index < numElements; index++) { - T object = serde.deserialize(slice, sliceOffset); - list.add(object); + try { + CollectionT collection = collectionClass.newInstance(); + + for (int index = 0; index < numElements; index++) { + T object = serde.deserialize(slice, sliceOffset); + collection.add(object); + } + + offset.setValue(sliceOffset.intValue()); + return collection; + } catch (Exception ex) { + throw Throwables.propagate(ex); } - - offset.setValue(sliceOffset.intValue()); - return list; } @Override - public List deserialize(Slice slice) + public CollectionT deserialize(Slice slice) { return deserialize(slice, new MutableInt(0)); } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java new file mode 100644 index 0000000000..60f230f9c0 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + + +import javax.validation.constraints.NotNull; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; + +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.netlet.util.Slice; + +/** + * This is an implementation of {@link Serde} which serializes and deserializes pairs. + */ +@InterfaceStability.Evolving +public class SerdePairSlice implements Serde, Slice> +{ + @NotNull + private Serde serde1; + @NotNull + private Serde serde2; + + private SerdePairSlice() + { + // for Kryo + } + + /** + * Creates a {@link SerdePairSlice}. + * @param serde1 The {@link Serde} that is used to serialize and deserialize first element of a pair + * @param serde2 The {@link Serde} that is used to serialize and deserialize second element of a pair + */ + public SerdePairSlice(@NotNull Serde serde1, @NotNull Serde serde2) + { + this.serde1 = Preconditions.checkNotNull(serde1); + this.serde2 = Preconditions.checkNotNull(serde2); + } + + @Override + public Slice serialize(Pair pair) + { + int size = 0; + + Slice slice1 = serde1.serialize(pair.getLeft()); + size += slice1.length; + Slice slice2 = serde2.serialize(pair.getRight()); + size += slice2.length; + + byte[] bytes = new byte[size]; + System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length); + System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length); + + return new Slice(bytes); + } + + @Override + public Pair deserialize(Slice slice, MutableInt offset) + { + T1 first = serde1.deserialize(slice, offset); + T2 second = serde2.deserialize(slice, offset); + return new ImmutablePair<>(first, second); + } + + @Override + public Pair deserialize(Slice slice) + { + return deserialize(slice, new MutableInt(0)); + } +} diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java index 00ea58d2e5..4f87bbd5a4 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java @@ -18,6 +18,7 @@ */ package org.apache.apex.malhar.lib.state.spillable; +import java.util.ArrayList; import java.util.List; import org.junit.Assert; @@ -27,7 +28,7 @@ import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils; import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; import org.apache.apex.malhar.lib.utils.serde.Serde; -import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice; import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; import org.apache.apex.malhar.lib.utils.serde.SliceUtils; import org.apache.commons.lang3.mutable.MutableInt; @@ -44,7 +45,7 @@ public class SpillableTestUtils { public static SerdeStringSlice SERDE_STRING_SLICE = new SerdeStringSlice(); - public static SerdeListSlice SERDE_STRING_LIST_SLICE = new SerdeListSlice(new SerdeStringSlice()); + public static SerdeCollectionSlice> SERDE_STRING_LIST_SLICE = new SerdeCollectionSlice(new SerdeStringSlice(), ArrayList.class); private SpillableTestUtils() { diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTwoKeyByteMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTwoKeyByteMapImplTest.java new file mode 100644 index 0000000000..66297db782 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTwoKeyByteMapImplTest.java @@ -0,0 +1,409 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.util.KryoCloneUtils; + +public class SpillableTwoKeyByteMapImplTest +{ + public static final byte[] ID1 = new byte[]{(byte)0}; + public static final byte[] ID2 = new byte[]{(byte)1}; + + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + @Test + public void simpleGetAndPutTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleGetAndPutTestHelper(store); + } + + @Test + public void simpleGetAndPutManagedStateTest() + { + simpleGetAndPutTestHelper(testMeta.store); + } + + private void simpleGetAndPutTestHelper(SpillableStateStore store) + { + SerdeStringSlice sss = new SerdeStringSlice(); + + SpillableTwoKeyByteMapImpl map = new SpillableTwoKeyByteMapImpl<>(store, ID1, 0L, + new SerdeStringSlice(), + new SerdeStringSlice(), + new SerdeIntSlice()); + + store.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + map.beginWindow(windowId); + + Assert.assertEquals(0, map.size()); + + map.put("I", "a", 1); + map.put("II", "b", 2); + map.put("II", "c", 3); + + Assert.assertEquals(3, map.size()); + + Assert.assertEquals(1, map.get("I", "a").intValue()); + Assert.assertEquals(2, map.get("II", "b").intValue()); + Assert.assertEquals(3, map.get("II", "c").intValue()); + Assert.assertEquals(null, map.get("II", "d")); + Assert.assertEquals(null, map.get("III", "d")); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + Assert.assertEquals(3, map.size()); + + Assert.assertEquals(1, map.get("I", "a").intValue()); + Assert.assertEquals(2, map.get("II", "b").intValue()); + Assert.assertEquals(3, map.get("II", "c").intValue()); + Assert.assertEquals(null, map.get("III", "d")); + + map.put("III", "d", 4); + map.put("III", "e", 5); + map.put("III", "f", 6); + + Assert.assertEquals(6, map.size()); + + Assert.assertEquals(4, map.get("III", "d").intValue()); + Assert.assertEquals(5, map.get("III", "e").intValue()); + Assert.assertEquals(6, map.get("III", "f").intValue()); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + map.teardown(); + store.teardown(); + } + + @Test + public void simpleRemoveTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleRemoveTestHelper(store); + } + + @Test + public void simpleRemoveManagedStateTest() + { + simpleRemoveTestHelper(testMeta.store); + } + + private void simpleRemoveTestHelper(SpillableStateStore store) + { + SpillableTwoKeyByteMapImpl map = new SpillableTwoKeyByteMapImpl<>(store, ID1, 0L, + new SerdeStringSlice(), + new SerdeStringSlice(), + new SerdeIntSlice()); + + store.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + map.beginWindow(windowId); + + Assert.assertEquals(0, map.size()); + + map.put("I", "a", 1); + map.put("II", "b", 2); + map.put("II", "c", 3); + + Assert.assertEquals(3, map.size()); + + map.remove("II"); + + Assert.assertEquals(1, map.get("I", "a").intValue()); + Assert.assertEquals(null, map.get("II", "b")); + Assert.assertEquals(null, map.get("II", "c")); + Assert.assertEquals(null, map.get("III", "d")); + + Assert.assertEquals(1, map.size()); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + Assert.assertEquals(1, map.size()); + + Assert.assertEquals(1, map.get("I", "a").intValue()); + Assert.assertEquals(null, map.get("II", "b")); + Assert.assertEquals(null, map.get("II", "c")); + Assert.assertEquals(null, map.get("III", "d")); + + map.put("III", "d", 4); + map.put("III", "e", 5); + map.put("III", "f", 6); + + Assert.assertEquals(4, map.size()); + + Assert.assertEquals(4, map.get("III", "d").intValue()); + Assert.assertEquals(5, map.get("III", "e").intValue()); + Assert.assertEquals(6, map.get("III", "f").intValue()); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + map.remove("I", "a"); + map.remove("III", "d"); + Assert.assertEquals(null, map.get("I", "a")); + Assert.assertEquals(null, map.get("II", "b")); + Assert.assertEquals(null, map.get("II", "c")); + Assert.assertEquals(null, map.get("III", "d")); + Assert.assertEquals(5, map.get("III", "e").intValue()); + Assert.assertEquals(6, map.get("III", "f").intValue()); + Assert.assertEquals(null, map.get("III", "g")); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + map.teardown(); + store.teardown(); + } + + @Test + public void multiMapPerBucketTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + multiMapPerBucketTestHelper(store); + } + + @Test + public void multiMapPerBucketManagedStateTest() + { + multiMapPerBucketTestHelper(testMeta.store); + } + + public void multiMapPerBucketTestHelper(SpillableStateStore store) + { + SpillableTwoKeyByteMapImpl map1 = new SpillableTwoKeyByteMapImpl<>(store, ID1, 0L, + new SerdeStringSlice(), + new SerdeStringSlice(), + new SerdeIntSlice()); + SpillableTwoKeyByteMapImpl map2 = new SpillableTwoKeyByteMapImpl<>(store, ID2, 0L, + new SerdeStringSlice(), + new SerdeStringSlice(), + new SerdeIntSlice()); + + store.setup(testMeta.operatorContext); + map1.setup(testMeta.operatorContext); + map2.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + map1.beginWindow(windowId); + map2.beginWindow(windowId); + + map1.put("I", "a", 1); + + Assert.assertEquals(1, map1.get("I", "a").intValue()); + Assert.assertEquals(null, map2.get("I", "a")); + + map2.put("I", "a", 5); + + Assert.assertEquals(1, map1.get("I", "a").intValue()); + Assert.assertEquals(5, map2.get("I", "a").intValue()); + + map1.put("II", "b", 2); + map2.put("II", "c", 3); + + Assert.assertEquals(1, map1.get("I", "a").intValue()); + Assert.assertEquals(2, map1.get("II", "b").intValue()); + + Assert.assertEquals(5, map2.get("I", "a").intValue()); + Assert.assertEquals(null, map2.get("II", "b")); + Assert.assertEquals(3, map2.get("II", "c").intValue()); + + map1.endWindow(); + map2.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + + windowId++; + store.beginWindow(windowId); + map1.beginWindow(windowId); + map2.beginWindow(windowId); + + map1.remove("I", "a"); + + Assert.assertEquals(null, map1.get("I", "a")); + Assert.assertEquals(5, map2.get("I", "a").intValue()); + + map1.endWindow(); + map2.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + + windowId++; + store.beginWindow(windowId); + map1.beginWindow(windowId); + map2.beginWindow(windowId); + + map1.endWindow(); + map2.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + + map1.teardown(); + map2.teardown(); + store.teardown(); + } + + @Test + public void recoveryWithManagedStateTest() throws Exception + { + SpillableTwoKeyByteMapImpl map1 = new SpillableTwoKeyByteMapImpl<>(testMeta.store, ID1, 0L, + new SerdeStringSlice(), + new SerdeStringSlice(), + new SerdeIntSlice()); + + testMeta.store.setup(testMeta.operatorContext); + map1.setup(testMeta.operatorContext); + + testMeta.store.beginWindow(0); + map1.beginWindow(0); + map1.put("I", "x", 1); + map1.put("I", "y", 2); + map1.put("II", "z", 3); + map1.put("III", "zz", 33); + Assert.assertEquals(4, map1.size()); + map1.endWindow(); + testMeta.store.endWindow(); + + testMeta.store.beginWindow(1); + map1.beginWindow(1); + Assert.assertEquals(4, map1.size()); + map1.put("I", "x", 4); + map1.put("I", "y", 5); + map1.remove("III", "zz"); + Assert.assertEquals(3, map1.size()); + map1.endWindow(); + testMeta.store.endWindow(); + testMeta.store.beforeCheckpoint(1); + testMeta.store.checkpointed(1); + + SpillableTwoKeyByteMapImpl clonedMap1 = KryoCloneUtils.cloneObject(map1); + + testMeta.store.beginWindow(2); + map1.beginWindow(2); + Assert.assertEquals(3, map1.size()); + map1.put("I", "x", 6); + map1.put("I", "y", 7); + map1.put("III", "w", 8); + Assert.assertEquals(4, map1.size()); + map1.endWindow(); + testMeta.store.endWindow(); + + // simulating crash here + map1.teardown(); + testMeta.store.teardown(); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); + attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L); + Context.OperatorContext context = + new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes); + + map1 = clonedMap1; + map1.getStore().setup(context); + map1.setup(testMeta.operatorContext); + + map1.getStore().beginWindow(2); + map1.beginWindow(2); + Assert.assertEquals(3, map1.size()); + Assert.assertEquals(4, map1.get("I", "x").intValue()); + Assert.assertEquals(5, map1.get("I", "y").intValue()); + Assert.assertEquals(3, map1.get("II", "z").intValue()); + map1.endWindow(); + map1.getStore().endWindow(); + + map1.teardown(); + } + +} diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java new file mode 100644 index 0000000000..f6085f67f1 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import com.datatorrent.netlet.util.Slice; + +public class SerdeCollectionSliceTest +{ + @Test + public void testSerdeList() + { + SerdeCollectionSlice> serdeList = + new SerdeCollectionSlice<>(new SerdeStringSlice(), (Class>)(Class)ArrayList.class); + + List stringList = Lists.newArrayList("a", "b", "c"); + + Slice slice = serdeList.serialize(stringList); + + List deserializedList = serdeList.deserialize(slice); + + Assert.assertEquals(stringList, deserializedList); + } + + @Test + public void testSerdeSet() + { + SerdeCollectionSlice> serdeSet = + new SerdeCollectionSlice<>(new SerdeStringSlice(), (Class>)(Class)HashSet.class); + + Set stringList = Sets.newHashSet("a", "b", "c"); + + Slice slice = serdeSet.serialize(stringList); + + Set deserializedSet = serdeSet.deserialize(slice); + + Assert.assertEquals(stringList, deserializedSet); + } +} diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java similarity index 67% rename from library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceTest.java rename to library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java index f7753d2b92..6684a9f5bf 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java @@ -18,28 +18,27 @@ */ package org.apache.apex.malhar.lib.utils.serde; -import java.util.List; - import org.junit.Assert; import org.junit.Test; -import com.google.common.collect.Lists; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import com.datatorrent.netlet.util.Slice; -public class SerdeListSliceTest +public class SerdePairSliceTest { @Test public void simpleSerdeTest() { - SerdeListSlice serdeList = new SerdeListSlice(new SerdeStringSlice()); + SerdePairSlice serdePair = new SerdePairSlice<>(new SerdeStringSlice(), new SerdeIntSlice()); - List stringList = Lists.newArrayList("a", "b", "c"); + Pair pair = new ImmutablePair<>("abc", 123); - Slice slice = serdeList.serialize(stringList); + Slice slice = serdePair.serialize(pair); - List deserializedList = serdeList.deserialize(slice); + Pair deserializedPair = serdePair.deserialize(slice); - Assert.assertEquals(stringList, deserializedList); + Assert.assertEquals(pair, deserializedPair); } }