From 4edbec89e4b6855e2f4a7517ca5538df0ad2a35e Mon Sep 17 00:00:00 2001 From: brightchen Date: Mon, 28 Nov 2016 16:17:15 -0800 Subject: [PATCH] APEXMALHAR-2350 #resolve #comment The key and value stream should match with the bucket --- .../managed/ManagedTimeUnifiedStateImpl.java | 2 +- .../lib/state/spillable/SpillableMapImpl.java | 30 ++++- .../lib/utils/serde/KeyValueSerdeManager.java | 24 +++- .../state/spillable/SpillableMapImplTest.java | 103 ++++++++++++++++++ 4 files changed, 151 insertions(+), 8 deletions(-) diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java index 62ebbc5284..82d381ccf7 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java @@ -216,7 +216,7 @@ protected void deleteBucket(long bucketId) throws IOException @Override protected void addBucketName(long bucketId) { - long operatorId = (long)managedStateContext.getOperatorContext().getId(); + long operatorId = managedStateContext.getOperatorContext().getId(); if (!bucketNamesOnFS.contains(operatorId)) { bucketNamesOnFS.add(operatorId); } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java index e7071a2a4a..56a3b0e948 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java @@ -26,6 +26,7 @@ import javax.validation.constraints.NotNull; import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; import org.apache.apex.malhar.lib.state.managed.TimeExtractor; import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager; import org.apache.apex.malhar.lib.utils.serde.BufferSlice; @@ -150,7 +151,7 @@ public V get(Object o) return val; } - Slice valSlice = store.getSync(getBucket(key), keyValueSerdeManager.serializeDataKey(key, false)); + Slice valSlice = store.getSync(getBucketTimeOrId(key), keyValueSerdeManager.serializeDataKey(key, false)); if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) { return null; @@ -236,13 +237,29 @@ public void beginWindow(long windowId) @Override public void endWindow() { + boolean isTimeUnifiedStore = (store instanceof ManagedTimeUnifiedStateImpl); for (K key: cache.getChangedKeys()) { - store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true), + //the getBucket() returned in fact is time, the bucket assign then assigned the bucketId + long timeOrBucketId = bucket; + long bucketId = timeOrBucketId; + if (isTimeUnifiedStore) { + timeOrBucketId = getBucketTimeOrId(key); + bucketId = ((ManagedTimeUnifiedStateImpl)store).getTimeBucketAssigner().getTimeBucket(timeOrBucketId); + } + keyValueSerdeManager.updateBuffersForBucketChange(bucketId); + store.put(timeOrBucketId, keyValueSerdeManager.serializeDataKey(key, true), keyValueSerdeManager.serializeValue(cache.get(key))); } for (K key: cache.getRemovedKeys()) { - store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE); + long timeOrBucketId = bucket; + long bucketId = timeOrBucketId; + if (isTimeUnifiedStore) { + timeOrBucketId = getBucketTimeOrId(key); + bucketId = ((ManagedTimeUnifiedStateImpl)store).getTimeBucketAssigner().getTimeBucket(timeOrBucketId); + } + keyValueSerdeManager.updateBuffersForBucketChange(bucketId); + store.put(timeOrBucketId, keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE); } cache.endWindow(); keyValueSerdeManager.resetReadBuffer(); @@ -253,7 +270,12 @@ public void teardown() { } - private long getBucket(K key) + /** + * + * @param key + * @return The bucket time for time unified store or bucket id for store with fixed bucket + */ + private long getBucketTimeOrId(K key) { return timeExtractor != null ? timeExtractor.getTime(key) : bucket; } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java index e74c7a347c..405683b980 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java @@ -28,6 +28,7 @@ */ public class KeyValueSerdeManager { + public static final long INVALID_BUCKET_ID = -1; protected Serde keySerde; protected Serde valueSerde; @@ -36,6 +37,8 @@ public class KeyValueSerdeManager protected SerializationBuffer valueBuffer; + private long lastBucketId = INVALID_BUCKET_ID; + private transient BucketProvider bucketProvider; protected KeyValueSerdeManager() { @@ -50,13 +53,28 @@ public KeyValueSerdeManager(Serde keySerde, Serde valueSerde) public void setup(BucketProvider bp, long bucketId) { - //the bucket will not change for this class. so get streams from setup, else, need to set stream before serialize - Bucket bucketInst = bp.ensureBucket(bucketId); - this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream()); + bucketProvider = bp; + updateBuffersForBucketChange(bucketId); + } + /** + * The bucket can be changed. The write buffer should also be changed if bucket changed. + * @param bucketId + */ + public void updateBuffersForBucketChange(long bucketId) + { + if (lastBucketId == bucketId) { + return; + } + + Bucket bucketInst = bucketProvider.ensureBucket(bucketId); + this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream()); keyBufferForWrite = new SerializationBuffer(bucketInst.getKeyStream()); + + lastBucketId = bucketId; } + public Slice serializeKey(K key, boolean write) { SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead; diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java index 760bc5cf78..ce51a03b49 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java @@ -25,8 +25,13 @@ import org.apache.apex.malhar.lib.state.managed.TimeExtractor; import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer; import org.apache.apex.malhar.lib.utils.serde.StringSerde; +import com.google.common.base.Preconditions; + import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; @@ -452,4 +457,102 @@ public void recoveryWithManagedStateTest(String opt) throws Exception map1.teardown(); } + + + protected static class SerdeManagerForTest extends AffixKeyValueSerdeManager + { + public SerdeManagerForTest(byte[] metaKeySuffix, byte[] dataKeyIdentifier, Serde keySerde, Serde valueSerde) + { + super(metaKeySuffix, dataKeyIdentifier, keySerde, valueSerde); + } + + public SerializationBuffer getValueBuffer() + { + return valueBuffer; + } + + public SerializationBuffer getKeyBufferForWrite() + { + return keyBufferForWrite; + } + } + + protected static class SpillableMapImplForTest extends SpillableMapImpl + { + protected SerdeManagerForTest serdeManager; + + public SpillableMapImplForTest(SpillableStateStore store, byte[] identifier, long bucket, Serde serdeKey, + Serde serdeValue) + { + super(store, identifier, bucket, serdeKey, serdeValue); + serdeManager = new SerdeManagerForTest<>(null, identifier, Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue)); + keyValueSerdeManager = serdeManager; + } + + public SpillableMapImplForTest(SpillableStateStore store, byte[] identifier, Serde serdeKey, + Serde serdeValue, TimeExtractor timeExtractor) + { + super(store, identifier, serdeKey, serdeValue, timeExtractor); + serdeManager = new SerdeManagerForTest<>(null, identifier, Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue)); + keyValueSerdeManager = serdeManager; + } + } + + @Test + @Parameters({"TimeUnifiedManagedState"}) + public void serializationBufferTest(String opt) + { + SerializationBuffer keyBuffer = null; + SerializationBuffer valueBuffer = null; + SerializationBuffer currentBuffer; + + setup(opt); + SpillableMapImplForTest map; + if (te == null) { + map = new SpillableMapImplForTest<>(store,ID1,0L,new StringSerde(), new StringSerde()); + } else { + map = new SpillableMapImplForTest<>(store,ID1,new StringSerde(), new StringSerde(), te); + } + + store.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + map.beginWindow(windowId); + + map.put("a", "1"); + + map.endWindow(); + store.endWindow(); + + currentBuffer = map.serdeManager.getKeyBufferForWrite(); + Assert.assertTrue(currentBuffer != keyBuffer); + keyBuffer = currentBuffer; + + currentBuffer = map.serdeManager.getValueBuffer(); + Assert.assertTrue(currentBuffer != valueBuffer); + valueBuffer = currentBuffer; + + ++windowId; + store.beginWindow(windowId); + map.beginWindow(windowId); + + //each put use different key to make sure use the different bucket + map.put("b", "2"); + + map.endWindow(); + store.endWindow(); + + currentBuffer = map.serdeManager.getKeyBufferForWrite(); + Assert.assertTrue(currentBuffer != keyBuffer); + keyBuffer = currentBuffer; + + currentBuffer = map.serdeManager.getValueBuffer(); + Assert.assertTrue(currentBuffer != valueBuffer); + valueBuffer = currentBuffer; + + map.teardown(); + store.teardown(); + } }