From 825a09460442992bd98eb026e1ef11ce148de5ea Mon Sep 17 00:00:00 2001 From: brightchen Date: Mon, 15 Aug 2016 17:46:27 -0700 Subject: [PATCH 1/5] APEXMALHAR-2190 #resolve #comment Use reusable buffer to serial spillable data structure --- .../spillable/SpillableArrayListImpl.java | 41 +++- .../SpillableByteArrayListMultimapImpl.java | 18 +- .../apex/malhar/lib/utils/serde/Block.java | 170 +++++++++++++++ .../malhar/lib/utils/serde/BlocksStream.java | 160 ++++++++++++++ .../malhar/lib/utils/serde/ByteStream.java | 39 ++++ .../malhar/lib/utils/serde/FixedBlock.java | 97 +++++++++ .../lib/utils/serde/LengthValueBuffer.java | 197 ++++++++++++++++++ .../utils/serde/ResetableWindowListener.java | 13 ++ .../malhar/lib/utils/serde/SerToLVBuffer.java | 10 + .../utils/serde/SerdeArrayWithLVBuffer.java | 137 ++++++++++++ .../serde/SerdeCollectionWithLVBuffer.java | 164 +++++++++++++++ .../lib/utils/serde/SerdeListSlice.java | 4 +- .../serde/SerdeListSliceWithLVBuffer.java | 66 ++++++ .../lib/utils/serde/SerdeStringSlice.java | 11 +- .../utils/serde/SerdeStringWithLVBuffer.java | 74 +++++++ .../utils/serde/WindowableBlocksStream.java | 133 ++++++++++++ .../lib/utils/serde/WindowableByteStream.java | 5 + .../lib/utils/serde/BufferStreamTester.java | 150 +++++++++++++ .../lib/utils/serde/SerialToLVBufferTest.java | 140 +++++++++++++ 19 files changed, 1611 insertions(+), 18 deletions(-) create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlocksStream.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ByteStream.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/FixedBlock.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LengthValueBuffer.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ResetableWindowListener.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerToLVBuffer.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeArrayWithLVBuffer.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionWithLVBuffer.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithLVBuffer.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringWithLVBuffer.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowableBlocksStream.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowableByteStream.java create mode 100644 library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BufferStreamTester.java create mode 100644 library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerialToLVBufferTest.java diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java index 5d469066bf..a648e4349c 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java @@ -25,9 +25,11 @@ import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.utils.serde.LengthValueBuffer; +import org.apache.apex.malhar.lib.utils.serde.SerToLVBuffer; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; -import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdeListSliceWithLVBuffer; import org.apache.hadoop.classification.InterfaceStability; import com.esotericsoftware.kryo.DefaultSerializer; @@ -63,6 +65,9 @@ public class SpillableArrayListImpl implements Spillable.SpillableArrayList valueSerde; + protected transient LengthValueBuffer buffer; + private SpillableArrayListImpl() { //for kryo @@ -85,12 +90,7 @@ public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix, @NotNull SpillableStateStore store, @NotNull Serde serde) { - this.bucketId = bucketId; - this.prefix = Preconditions.checkNotNull(prefix); - this.store = Preconditions.checkNotNull(store); - this.serde = Preconditions.checkNotNull(serde); - - map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(), new SerdeListSlice(serde)); + this(bucketId, prefix, store, serde, DEFAULT_BATCH_SIZE); } /** @@ -110,10 +110,35 @@ public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix, @NotNull Serde serde, int batchSize) { - this(bucketId, prefix, store, serde); + this(bucketId, prefix, store, serde, DEFAULT_BATCH_SIZE, new LengthValueBuffer()); + } + + public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix, + @NotNull SpillableStateStore store, + @NotNull Serde serde, + @NotNull LengthValueBuffer buffer) + { + this(bucketId, prefix, store, serde, DEFAULT_BATCH_SIZE, buffer); + } + + public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix, @NotNull SpillableStateStore store, + @NotNull Serde serde, int batchSize, @NotNull LengthValueBuffer buffer) + { + this.bucketId = bucketId; + this.prefix = Preconditions.checkNotNull(prefix); + this.store = Preconditions.checkNotNull(store); + this.serde = Preconditions.checkNotNull(serde); + + if (!(serde instanceof SerToLVBuffer)) { + throw new IllegalArgumentException("Invalid serde, expect instanceof SerToLVBuffer"); + } + valueSerde = new SerdeListSliceWithLVBuffer((SerToLVBuffer)serde, buffer); + Preconditions.checkArgument(this.batchSize > 0); this.batchSize = batchSize; + + map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(), valueSerde); } public void setSize(int size) diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java index ba0bb773c6..12cba6bb9f 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.utils.serde.LengthValueBuffer; import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; @@ -65,6 +66,8 @@ public class SpillableByteArrayListMultimapImpl implements Spillable.Spill private Serde serdeKey; private Serde serdeValue; + protected transient LengthValueBuffer buffer; + private SpillableByteArrayListMultimapImpl() { // for kryo @@ -82,13 +85,22 @@ private SpillableByteArrayListMultimapImpl() public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde serdeKey, Serde serdeValue) + { + this(store, identifier, bucket, serdeKey, serdeValue, new LengthValueBuffer()); + } + + public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde serdeKey, + Serde serdeValue, + LengthValueBuffer buffer) { this.store = Preconditions.checkNotNull(store); this.identifier = Preconditions.checkNotNull(identifier); this.bucket = bucket; this.serdeKey = Preconditions.checkNotNull(serdeKey); this.serdeValue = Preconditions.checkNotNull(serdeValue); - + this.buffer = Preconditions.checkNotNull(buffer); + map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice()); } @@ -116,7 +128,7 @@ private SpillableArrayListImpl getHelper(@Nullable K key) } Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice); - spillableArrayList = new SpillableArrayListImpl(bucket, keyPrefix.toByteArray(), store, serdeValue); + spillableArrayList = new SpillableArrayListImpl(bucket, keyPrefix.toByteArray(), store, serdeValue, buffer); spillableArrayList.setSize(size); } @@ -199,7 +211,7 @@ public boolean put(@Nullable K key, @Nullable V value) if (spillableArrayList == null) { Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key)); - spillableArrayList = new SpillableArrayListImpl(bucket, keyPrefix.toByteArray(), store, serdeValue); + spillableArrayList = new SpillableArrayListImpl(bucket, keyPrefix.toByteArray(), store, serdeValue, buffer); cache.put(key, spillableArrayList); } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java new file mode 100644 index 0000000000..d9a46e9569 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +import com.datatorrent.netlet.util.Slice; + +/** + * + * keep the information of one block + * + */ +public class Block implements ByteStream +{ + public static final transient Logger logger = LoggerFactory.getLogger(Block.class); + + public static final int DEFAULT_BLOCK_SIZE = 1000000; + + //the capacity of the block + protected int capacity; + + //the size of the data. + protected int size; + + protected int objectBeginOffset = 0; + protected byte[] buffer; + + protected List slices = Lists.newArrayList(); + + public Block() + { + this(DEFAULT_BLOCK_SIZE); + } + + public Block(int capacity) + { + if (capacity <= 0) { + throw new IllegalArgumentException("Invalid capacity: " + capacity); + } + buffer = new byte[capacity]; + this.capacity = capacity; + } + + /** + * check the buffer size and reallocate if buffer is not enough + * + * @param length + */ + protected void checkOrReallocateBuffer(int length) + { + if (size + length <= capacity) { + return; + } + + //calculate the new capacity + capacity = (size + length) * 2; + + logger.info("Going to assign buffer size: {}", capacity); + + byte[] oldBuffer = buffer; + buffer = new byte[capacity]; + + //NOTES: it's not a good idea to move the data after expose the slices. + //but if move the data, also need to change the exposed slices( we suppose client code will not cache the buffer reference + if (size > 0) { + System.arraycopy(oldBuffer, 0, buffer, 0, size); + + for (Slice slice : slices) { + slice.buffer = buffer; + } + } + } + + public void write(byte[] data) + { + write(data, 0, data.length); + } + + public void write(byte[] data, final int offset, final int length) + { + checkOrReallocateBuffer(length); + + System.arraycopy(data, offset, buffer, size, length); + size += length; + } + + /** + * the process of write an object should be: write(), write() ... write(), + * when write object done( before write another object), call toSlice(); + * + * @return + */ + public Slice toSlice() + { + if (size == objectBeginOffset) { + throw new RuntimeException("data size is zero."); + } + Slice slice = new Slice(buffer, objectBeginOffset, size - objectBeginOffset); + slices.add(slice); + //prepare for next object + objectBeginOffset = size; + return slice; + } + + public void reset() + { + size = 0; + slices.clear(); + objectBeginOffset = 0; + } + + /** + * check if has enough space for the length + * + * @param length + * @return + */ + public boolean hasEnoughSpace(int length) + { + return size + length < capacity; + } + + public int size() + { + return size; + } + + public boolean isFresh() + { + return (size == 0 && objectBeginOffset == 0 && slices.isEmpty() ); + } + + /** + * is the block clear. The written object should have output + * @return + */ + public boolean isClear() + { + return objectBeginOffset == size; + } + + @Override + public void release() + { + reset(); + buffer = null; + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlocksStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlocksStream.java new file mode 100644 index 0000000000..f25ca6b4e8 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlocksStream.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +import com.datatorrent.netlet.util.Slice; + +/** + * A stream implemented by an array of block ( or a map from index to block ) + * BlockBuffer can reallocate the memory and copy the data if buffer is not + * enough But it is not the good solution if there already have slices output. + * BlockStream try to avoid copy the data which already output slices + * + */ +public class BlocksStream implements ByteStream +{ + private static final Logger logger = LoggerFactory.getLogger(BlocksStream.class); + + public static final int DEFAULT_BLOCK_CAPACITY = 1000000; + + //the initial capacity of each block + protected final int blockCapacity; + + protected Map blocks = Maps.newHashMap(); + //the index of current block, valid block index should >= 0 + protected int currentBlockIndex = 0; + protected int size = 0; + + protected FixedBlock currentBlock; + + public BlocksStream() + { + this(DEFAULT_BLOCK_CAPACITY); + } + + public BlocksStream(int blockCapacity) + { + this.blockCapacity = blockCapacity; + } + + @Override + public void write(byte[] data) + { + write(data, 0, data.length); + } + + /** + * This write could be the first or the continuous write for an object. For t + * + * @param data + * @param offset + * @param length + */ + @Override + public void write(byte[] data, final int offset, final int length) + { + //start with a block which at least can hold current data + currentBlock = getOrCreateCurrentBlock(); + try { + currentBlock.write(data, offset, length); + } catch (FixedBlock.OutOfBlockBufferMemoryException e) { + //use next block + FixedBlock previousBlock = null; + + previousBlock = moveToNextBlock(); + if(!currentBlock.isFresh()) { + throw new RuntimeException("New block is not flash."); + } + if(!previousBlock.isClear()) { + previousBlock.moveLastObjectDataTo(currentBlock); + } + currentBlock.write(data, offset, length); + } + size += length; + } + + /** + * + * @return The previous block + */ + protected FixedBlock moveToNextBlock() + { + FixedBlock previousBlock = currentBlock; + + ++currentBlockIndex; + currentBlock = getOrCreateCurrentBlock(); + if (!currentBlock.isFresh()) { + throw new RuntimeException("Assigned non flash block."); + } + return previousBlock; + } + + protected FixedBlock getOrCreateCurrentBlock() + { + FixedBlock block = blocks.get(currentBlockIndex); + if (block == null) { + block = new FixedBlock(blockCapacity); + blocks.put(currentBlockIndex, block); + } + return block; + } + + @Override + public int size() + { + return size; + } + + /** + * + * this is the last call which represent the end of an object + */ + @Override + public Slice toSlice() + { + return blocks.get(currentBlockIndex).toSlice(); + } + + /** + * Don't need to maintain original buffer now. + */ + @Override + public void reset() + { + currentBlockIndex = 0; + size = 0; + for (FixedBlock block : blocks.values()) { + block.reset(); + } + } + + @Override + public void release() + { + reset(); + blocks.clear(); + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ByteStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ByteStream.java new file mode 100644 index 0000000000..2984167985 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ByteStream.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import com.datatorrent.netlet.util.Slice; + +public interface ByteStream +{ + void write(byte[] data); + + void write(byte[] data, final int offset, final int length); + + int size(); + + Slice toSlice(); + + void reset(); + + /** + * release allocated resource + */ + void release(); +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/FixedBlock.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/FixedBlock.java new file mode 100644 index 0000000000..55b0e2d439 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/FixedBlock.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.netlet.util.Slice; + +/** + * This implementation will not move the memory if already output slices. + * The memory could be moved if there hadn't any output slices. + * + */ +public class FixedBlock extends Block +{ + private static final Logger logger = LoggerFactory.getLogger(FixedBlock.class); + + public static class OutOfBlockBufferMemoryException extends RuntimeException + { + private static final long serialVersionUID = 3813792889200989131L; + } + + public FixedBlock() + { + super(); + } + + public FixedBlock(int capacity) + { + super(capacity); + } + + @Override + public void write(byte[] data, final int offset, final int length) throws OutOfBlockBufferMemoryException + { + super.write(data, offset, length); + } + + /** + * check the buffer size and reallocate if buffer is not enough + * + * @param length + */ + @Override + protected void checkOrReallocateBuffer(int length) throws OutOfBlockBufferMemoryException + { + if (size + length > capacity && slices.size() > 0) { + throw new OutOfBlockBufferMemoryException(); + } + + super.checkOrReallocateBuffer(length); + } + + /** + * Similar as toSlice, this method is used to get the information of the + * object regards the data already write to buffer. But unlike toSlice() which + * indicate all data of this object already done, this method can be called at + * any time + */ + public Slice getLastObjectSlice() + { + return new Slice(buffer, objectBeginOffset, size - objectBeginOffset); + } + + public void discardLastObjectData() + { + if (objectBeginOffset == 0) { + return; + } + size = objectBeginOffset; + } + + public void moveLastObjectDataTo(FixedBlock newBuffer) + { + if(size > objectBeginOffset) { + newBuffer.write(buffer, objectBeginOffset, size - objectBeginOffset); + discardLastObjectData(); + } + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LengthValueBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LengthValueBuffer.java new file mode 100644 index 0000000000..62878f63fb --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LengthValueBuffer.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.Map; + +import org.apache.apex.malhar.lib.state.spillable.WindowListener; +import org.apache.commons.lang3.mutable.MutableInt; + +import com.google.common.collect.Maps; + +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.netlet.util.Slice; + +/** + * This class keep the object with length value format.try to get rid of memory slice and memory data copy Basically used by memory + * serialize + * + */ +public class LengthValueBuffer implements ResetableWindowListener +{ + protected WindowableByteStream windowableByteStream; + protected Map placeHolderIdentifierToValue = Maps.newHashMap(); + + public LengthValueBuffer() + { + windowableByteStream = createWindowableByteStream(); + } + + public LengthValueBuffer(int capacity) + { + windowableByteStream = createWindowableByteStream(capacity); + } + + protected final transient byte[] tmpLengthAsBytes = new byte[4]; + protected final transient MutableInt tmpOffset = new MutableInt(0); + + public void setObjectLength(int length) + { + try { + GPOUtils.serializeInt(length, tmpLengthAsBytes, new MutableInt(0)); + windowableByteStream.write(tmpLengthAsBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * only set value. + * + * @param value + * @param offset + * @param length + */ + public void setObjectValue(byte[] value, int offset, int length) + { + windowableByteStream.write(value, offset, length); + } + + /** + * set value and length. the input value is value only, it doesn't include + * length information. + * + * @param value + * @param offset + * @param length + */ + public void setObjectWithValue(byte[] value, int offset, int length) + { + setObjectLength(length); + setObjectValue(value, offset, length); + } + + public void setObjectWithValue(byte[] value) + { + setObjectWithValue(value, 0, value.length); + } + + /** + * mark place hold for length. In some case, we don't know the length until + * really processed data. mark place holder for set length later. + * + * @return the identity for this placeholder + */ + protected static final byte[] lengthPlaceHolder = new byte[]{0, 0, 0, 0}; + + public int markPlaceHolderForLength() + { + try { + int offset = windowableByteStream.size(); + windowableByteStream.write(lengthPlaceHolder); + return offset; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public int getSize() + { + return windowableByteStream.size(); + } + + /** + * + * @param placeHolderId + * @param length + */ + public void setValueForLengthPlaceHolder(int placeHolderId, int length) + { + //don't convert to byte array now. just keep the information + placeHolderIdentifierToValue.put(placeHolderId, length); + } + + /** + * This method should be called only the whole object has been written + * @return The slice which represents the object + */ + public Slice toSlice() + { + Slice slice = windowableByteStream.toSlice(); + + if (placeHolderIdentifierToValue != null && !placeHolderIdentifierToValue.isEmpty()) { + MutableInt offset = new MutableInt(); + for (Map.Entry entry : placeHolderIdentifierToValue.entrySet()) { + offset.setValue(slice.offset + entry.getKey()); + GPOUtils.serializeInt(entry.getValue(), slice.buffer, offset); + } + } + + return slice; + } + + + /** + * reset the environment to reuse the resource. + */ + public void reset() + { + windowableByteStream.reset(); + placeHolderIdentifierToValue.clear(); + } + + + @Override + public void beginWindow(long windowId) + { + windowableByteStream.beginWindow(windowId); + } + + @Override + public void endWindow() + { + windowableByteStream.endWindow(); + } + + /** + * reset for all windows which window id less or equal input windowId + * this interface doesn't enforce to call reset window for each windows. Several windows can be reset at the same time. + * @param windowId + */ + public void resetUpToWindow(long windowId) + { + windowableByteStream.resetUpToWindow(windowId); + } + + public void release() + { + reset(); + windowableByteStream.release(); + } + + public WindowableByteStream createWindowableByteStream() + { + return new WindowableBlocksStream(); + } + + public WindowableByteStream createWindowableByteStream(int capacity) + { + return new WindowableBlocksStream(capacity); + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ResetableWindowListener.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ResetableWindowListener.java new file mode 100644 index 0000000000..e51bdcabe0 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ResetableWindowListener.java @@ -0,0 +1,13 @@ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.apex.malhar.lib.state.spillable.WindowListener; + +public interface ResetableWindowListener extends WindowListener +{ + /** + * reset for all windows which window id less or equal input windowId + * this interface doesn't enforce to call reset window for each windows. Several windows can be reset at the same time. + * @param windowId + */ + void resetUpToWindow(long windowId); +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerToLVBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerToLVBuffer.java new file mode 100644 index 0000000000..6823bc9d37 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerToLVBuffer.java @@ -0,0 +1,10 @@ +package org.apache.apex.malhar.lib.utils.serde; + +import com.datatorrent.netlet.util.Slice; + +public interface SerToLVBuffer extends Serde +{ + void serTo(T object, LengthValueBuffer buffer); + + void reset(); +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeArrayWithLVBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeArrayWithLVBuffer.java new file mode 100644 index 0000000000..f7cb8f9df7 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeArrayWithLVBuffer.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.commons.lang3.mutable.MutableInt; + +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.netlet.util.Slice; + +public class SerdeArrayWithLVBuffer implements SerToLVBuffer +{ + protected Class clazz; + protected LengthValueBuffer buffer; + protected SerToLVBuffer itemSerde; + + protected SerdeArrayWithLVBuffer() + { + } + + public SerdeArrayWithLVBuffer(Class clazz) + { + this.clazz = clazz; + } + + public SerdeArrayWithLVBuffer(Class clazz, LengthValueBuffer buffer) + { + this.clazz = clazz; + this.buffer = buffer; + } + + public SerdeArrayWithLVBuffer(SerToLVBuffer itemSerde, LengthValueBuffer buffer) + { + this.itemSerde = itemSerde; + this.buffer = buffer; + } + + @Override + public Slice serialize(T[] objects) + { + if (buffer == null) { + buffer = new LengthValueBuffer(); + } + serTo(objects, buffer); + return buffer.toSlice(); + } + + @Override + public void serTo(T[] objects, LengthValueBuffer buffer) + { + if (objects.length == 0) { + return; + } + buffer.setObjectLength(objects.length); + SerToLVBuffer serializer = getItemSerToLVBuffer(); + for (T object : objects) { + serializer.serTo(object, buffer); + } + } + + @SuppressWarnings("unchecked") + protected SerToLVBuffer getItemSerToLVBuffer() + { + if (itemSerde != null) { + return itemSerde; + } + + if (String.class.equals(clazz)) { + itemSerde = (SerToLVBuffer)new SerdeStringWithLVBuffer(); + return itemSerde; + } + + throw new UnsupportedOperationException(); + } + + @Override + public T[] deserialize(Slice slice, MutableInt sliceOffset) + { + int numOfElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset); + if (numOfElements <= 0) { + throw new IllegalArgumentException( + "The length of the array is less than or equal to zero. length: " + numOfElements); + } + + T[] array = createObjectArray(numOfElements); + + for (int index = 0; index < numOfElements; ++index) { + array[index] = getItemSerToLVBuffer().deserialize(slice, sliceOffset); + } + return array; + } + + @SuppressWarnings("unchecked") + protected T[] createObjectArray(int length) + { + if (String.class == clazz) { + return (T[])new String[length]; + } + + throw new IllegalArgumentException("Unknow class information: " + clazz); + } + + @Override + public T[] deserialize(Slice slice) + { + return deserialize(slice, new MutableInt(slice.offset)); + } + + @Override + public void reset() + { + if (buffer != null) { + buffer.reset(); + } + } + + public void setItemClass(Class clazz) + { + this.clazz = clazz; + } + +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionWithLVBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionWithLVBuffer.java new file mode 100644 index 0000000000..b85d4283e0 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionWithLVBuffer.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.Collection; + +import org.apache.commons.lang3.mutable.MutableInt; + +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.netlet.util.Slice; + +public class SerdeCollectionWithLVBuffer> implements SerToLVBuffer +{ + protected Class itemClass; + protected LengthValueBuffer buffer; + protected SerToLVBuffer itemSerde; + protected Class collectionClass; + + protected SerdeCollectionWithLVBuffer() + { + } + + public SerdeCollectionWithLVBuffer(Class clazz) + { + this.itemClass = clazz; + } + + public SerdeCollectionWithLVBuffer(Class itemClass, LengthValueBuffer buffer) + { + this.itemClass = itemClass; + this.buffer = buffer; + } + + public SerdeCollectionWithLVBuffer(SerToLVBuffer itemSerde, LengthValueBuffer buffer) + { + this.itemSerde = itemSerde; + this.buffer = buffer; + } + + @Override + public Slice serialize(C objects) + { + if (buffer == null) { + buffer = new LengthValueBuffer(); + } + serTo(objects, buffer); + return buffer.toSlice(); + } + + @Override + public void serTo(C objects, LengthValueBuffer buffer) + { + if (objects.size() == 0) { + return; + } + buffer.setObjectLength(objects.size()); + SerToLVBuffer serializer = getItemSerToLVBuffer(); + for (T object : objects) { + serializer.serTo(object, buffer); + } + } + + @SuppressWarnings("unchecked") + protected SerToLVBuffer getItemSerToLVBuffer() + { + if (itemSerde != null) { + return itemSerde; + } + + if (String.class.equals(itemClass)) { + itemSerde = (SerToLVBuffer)new SerdeStringWithLVBuffer(); + return itemSerde; + } + + throw new UnsupportedOperationException(); + } + + @Override + public C deserialize(Slice slice, MutableInt sliceOffset) + { + int numOfElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset); + if (numOfElements <= 0) { + throw new IllegalArgumentException( + "The length of the array is less than or equal to zero. length: " + numOfElements); + } + + C collection = createObjectCollection(numOfElements); + + for (int index = 0; index < numOfElements; ++index) { + collection.add(getItemSerToLVBuffer().deserialize(slice, sliceOffset)); + } + return collection; + } + + protected C createObjectCollection(int length) + { + if (collectionClass == null) { + throw new IllegalArgumentException("NO collection class information."); + } + + try { + return collectionClass.newInstance(); + } catch (Exception e) { + throw new IllegalArgumentException("Can't instancial collection class: " + collectionClass.getName()); + } + } + + @Override + public C deserialize(Slice slice) + { + return deserialize(slice, new MutableInt(slice.offset)); + } + + @Override + public void reset() + { + if (buffer != null) { + buffer.reset(); + } + } + + public void setItemClass(Class clazz) + { + this.itemClass = clazz; + } + + public Class getCollectionClass() + { + return collectionClass; + } + + /** + * The class of the output collection. It should be C or it's sub-class. + * Due to the type erasure of runtime, the caller probably can't get the class with type info at runtime + * So, do runtime check instead of compile time check + * + * @param collectionClass + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void setCollectionClass(Class collectionClass) + { + if (collectionClass.isInterface()) { + throw new IllegalArgumentException("collectionClass should be a class instead of interface."); + } + + this.collectionClass = (Class)collectionClass; + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java index 2a7947d2c6..130835f6a9 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java @@ -38,9 +38,9 @@ public class SerdeListSlice implements Serde, Slice> { @NotNull - private Serde serde; + protected Serde serde; - private SerdeListSlice() + protected SerdeListSlice() { // for Kryo } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithLVBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithLVBuffer.java new file mode 100644 index 0000000000..9fc7dc7146 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithLVBuffer.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.List; + +import javax.validation.constraints.NotNull; + +import com.google.common.base.Preconditions; + +import com.datatorrent.netlet.util.Slice; + +public class SerdeListSliceWithLVBuffer extends SerdeListSlice implements SerToLVBuffer> +{ + protected SerToLVBuffer itemSerTo; + protected LengthValueBuffer buffer; + + protected SerdeListSliceWithLVBuffer() + { + // for Kryo + } + + public SerdeListSliceWithLVBuffer(@NotNull SerToLVBuffer serde, LengthValueBuffer buffer) + { + this.itemSerTo = Preconditions.checkNotNull(serde); + this.buffer = Preconditions.checkNotNull(buffer); + } + + @Override + public Slice serialize(List objects) + { + serTo(objects, buffer); + return buffer.toSlice(); + } + + @Override + public void serTo(List objects, LengthValueBuffer buffer) + { + buffer.setObjectLength(objects.size()); + for (T object : objects) { + itemSerTo.serTo(object, buffer);; + } + } + + public void reset() + { + buffer.reset(); + } + +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java index 80ee597a0c..6478088eca 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java @@ -36,18 +36,19 @@ public Slice serialize(String object) return new Slice(GPOUtils.serializeString(object)); } + /** + * The slice could be a buffer of multiple objects, + * We should not assume deserialize whole slice, the offset indicates where to start. + */ @Override public String deserialize(Slice object, MutableInt offset) { - offset.add(object.offset); - String string = GPOUtils.deserializeString(object.buffer, offset); - offset.subtract(object.offset); - return string; + return GPOUtils.deserializeString(object.buffer, offset); } @Override public String deserialize(Slice object) { - return deserialize(object, new MutableInt(0)); + return deserialize(object, new MutableInt(object.offset)); } } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringWithLVBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringWithLVBuffer.java new file mode 100644 index 0000000000..43dc0f9fa5 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringWithLVBuffer.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import com.google.common.base.Preconditions; + +import com.datatorrent.netlet.util.Slice; + +public class SerdeStringWithLVBuffer extends SerdeStringSlice implements SerToLVBuffer +{ + //implement with shared buff + protected LengthValueBuffer buffer; + + /** + * if don't use SerdeStringWithLVBuffer.serialize(String), can ignore LVBuffer + */ + public SerdeStringWithLVBuffer() + { + } + + public SerdeStringWithLVBuffer(LengthValueBuffer buffer) + { + this.buffer = Preconditions.checkNotNull(buffer); + } + + @Override + public Slice serialize(String object) + { + if (buffer == null) { + buffer = new LengthValueBuffer(); + } + serTo(object, buffer); + return buffer.toSlice(); + } + +// implement with tmp buffer +// @Override +// public Slice serialize(String object) +// { +// LVBuffer buffer = new LVBuffer(); +// serTo(object, buffer); +// return buffer.toSlice(); +// } + + @Override + public void serTo(String str, LengthValueBuffer buffer) + { + buffer.setObjectWithValue(str.getBytes()); + } + + @Override + public void reset() + { + if (buffer != null) { + buffer.reset(); + } + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowableBlocksStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowableBlocksStream.java new file mode 100644 index 0000000000..ffed723ddb --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowableBlocksStream.java @@ -0,0 +1,133 @@ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; + +public class WindowableBlocksStream extends BlocksStream implements WindowableByteStream +{ + private static final Logger logger = LoggerFactory.getLogger(WindowableBlocksStream.class); + /** + * Map from windowId to blockIds + */ + protected SetMultimap windowToBlockIds = HashMultimap.create(); + + /** + * set of all free blockIds. + */ + protected Set freeBlockIds = Sets.newHashSet(); + + // max block index; valid maxBlockIndex should >= 0 + protected int maxBlockIndex = 0; + + protected long currentWindowId; + + public WindowableBlocksStream() + { + super(); + } + + public WindowableBlocksStream(int blockCapacity) + { + super(blockCapacity); + } + + /** + * + * windowToBlockIds.put(currentWindowId, currentBlockIndex); + */ + @Override + public void write(byte[] data, final int offset, final int length) + { + super.write(data, offset, length); + } + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + moveToNextWindow(); + } + + /** + * make sure different windows will not use same block move to next block if + * current block already used. + */ + protected void moveToNextWindow() + { + //use current block if it hasn't be used, else, move to next block + FixedBlock block = getOrCreateCurrentBlock(); + if (!block.isClear()) { + throw new RuntimeException("Current block not clear, should NOT move to next window. Please call toSlice() to output data first"); + } + if (block.size() > 0) { + moveToNextBlock(); + } + } + + /** + * This method try to use the free block first. Allocate new block if there + * are no any free block + * + * @return The previous block + */ + @Override + protected FixedBlock moveToNextBlock() + { + FixedBlock previousBlock = currentBlock; + if (!freeBlockIds.isEmpty()) { + currentBlockIndex = freeBlockIds.iterator().next(); + freeBlockIds.remove(currentBlockIndex); + currentBlock = this.blocks.get(currentBlockIndex); + } else { + currentBlockIndex = ++maxBlockIndex; + currentBlock = getOrCreateCurrentBlock(); + } + + windowToBlockIds.put(currentWindowId, currentBlockIndex); + + return previousBlock; + } + + /** + * probably need to call this method. call beginWindow(long) should be enough + */ + @Override + public void endWindow() + { + } + + @Override + public void resetUpToWindow(long windowId) + { + Set winIds = Sets.newHashSet(windowToBlockIds.keySet()); + int removedSize = 0; + for (long winId : winIds) { + if (winId <= windowId) { + Set removedBlockIds = windowToBlockIds.removeAll(winId); + + for(int blockId : removedBlockIds) { + removedSize += blocks.get(blockId).size(); + Block theBlock = blocks.get(blockId); + theBlock.reset(); + if(theBlock == currentBlock) { + //the client code could ask reset up to current window + //but the reset block should not be current block. current block should be reassigned. + moveToNextBlock(); + } + logger.debug("reset block: {}, currentBlock: {}", blockId, theBlock); + } + + freeBlockIds.addAll(removedBlockIds); + } + } + + size -= removedSize; + } + +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowableByteStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowableByteStream.java new file mode 100644 index 0000000000..3d0f94fae1 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowableByteStream.java @@ -0,0 +1,5 @@ +package org.apache.apex.malhar.lib.utils.serde; + +public interface WindowableByteStream extends ByteStream, ResetableWindowListener +{ +} diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BufferStreamTester.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BufferStreamTester.java new file mode 100644 index 0000000000..46ef876a55 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BufferStreamTester.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +import com.datatorrent.netlet.util.Slice; + +public class BufferStreamTester +{ + private static final Logger logger = LoggerFactory.getLogger(BufferStreamTester.class); + + protected Random random = new Random(); + + @Test + public void testWindowableBlockStream() + { + WindowableBlocksStream bs = new WindowableBlocksStream(); + List totalList = Lists.newArrayList(); + List slices = Lists.newArrayList(); + + for (int windowId = 0; windowId < 10; ++windowId) { + logger.info("window Id: {}", windowId); + + List list = generateList(); + totalList.addAll(list); + + bs.beginWindow(windowId); + writeToByteStream(bs, list, slices); + bs.endWindow(); + + if(windowId % 2 != 0) { + verify(totalList, slices); + + bs.resetUpToWindow(windowId); + totalList.clear(); + slices.clear(); + } + } + } + + @Test + public void testBlockStream() + { + BlocksStream bs = new BlocksStream(); + List totalList = Lists.newArrayList(); + List slices = Lists.newArrayList(); + + for (int tryTime = 0; tryTime < 10; ++tryTime) { + List list = generateList(); + totalList.addAll(list); + + writeToByteStream(bs, list, slices); + + if (tryTime % 2 != 0) { + verify(totalList, slices); + + bs.reset(); + totalList.clear(); + slices.clear(); + } + + } + } + + protected void writeToByteStream(ByteStream bs, List list, List slices) + { + for (byte[] bytes : list) { + int times = random.nextInt(100) + 1; + int remainLen = bytes.length; + int offset = 0; + while (times > 0 && remainLen > 0) { + int avgSubLen = remainLen / times; + times--; + if (avgSubLen == 0) { + bs.write(bytes, offset, remainLen); + break; + } + + int writeLen = remainLen; + if (times != 0) { + int subLen = random.nextInt(avgSubLen * 2); + writeLen = Math.min(subLen, remainLen); + } + bs.write(bytes, offset, writeLen); + + offset += writeLen; + remainLen -= writeLen; + } + slices.add(bs.toSlice()); + } + } + + protected void verify(List list, List slices) + { + //verify + Assert.assertTrue("size not equal.", list.size() == slices.size()); + + for (int i = 0; i < list.size(); ++i) { + byte[] bytes = list.get(i); + byte[] newBytes = slices.get(i).toByteArray(); + if (!Arrays.equals(bytes, newBytes)) { + Assert.assertArrayEquals(bytes, newBytes); + } + } + } + + protected List generateList() + { + List list = Lists.newArrayList(); + int size = random.nextInt(10000) + 1; + for (int i = 0; i < size; i++) { + list.add(generateByteArray()); + } + return list; + } + + protected byte[] generateByteArray() + { + int len = random.nextInt(10000) + 1; + byte[] bytes = new byte[len]; + random.nextBytes(bytes); + return bytes; + } +} diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerialToLVBufferTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerialToLVBufferTest.java new file mode 100644 index 0000000000..f2465276c1 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerialToLVBufferTest.java @@ -0,0 +1,140 @@ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.datatorrent.netlet.util.Slice; + +public class SerialToLVBufferTest +{ + protected final int charNum = 62; + protected String[] testData = null; + protected final Random random = new Random(); + + @Before + public void generateTestData() + { + int size = random.nextInt(10000) + 1; + testData = new String[size]; + for(int i=0; i(String.class), new StringArraySerdeVerifier()); + } + + + @Test + public void testSerdeCollection() + { + SerdeCollectionWithLVBuffer> listSerde = new SerdeCollectionWithLVBuffer>(String.class); + listSerde.setCollectionClass(ArrayList.class); + testSerde(testData, listSerde, new StringListSerdeVerifier()); + } + + + public void testSerde(String[] strs, SerToLVBuffer serde, SerdeVerifier verifier) + { + LengthValueBuffer lvBuffer = new LengthValueBuffer(); + + for (int i = 0; i < 10; ++i) { + lvBuffer.beginWindow(i); + verifier.verifySerde(strs, serde, lvBuffer); + lvBuffer.endWindow(); + if (i % 3 == 0) { + lvBuffer.resetUpToWindow(i); + } + if (i % 4 == 0) { + lvBuffer.reset(); + } + } + lvBuffer.release(); + } + + public static interface SerdeVerifier + { + public void verifySerde(String[] datas, SerToLVBuffer serde, LengthValueBuffer lvBuffer); + } + + public static class StringSerdeVerifier implements SerdeVerifier + { + @Override + public void verifySerde(String[] datas, SerToLVBuffer serde, LengthValueBuffer lvBuffer) + { + for (String str : datas) { + Slice slice = serde.serialize(str); + Assert.assertTrue("serialize with LVBuffer failed, String: " + str, str.equals(serde.deserialize(slice))); + + serde.serTo(str, lvBuffer); + Assert.assertTrue("serTo with LVBuffer failed, String: " + str, + str.equals(serde.deserialize(lvBuffer.toSlice()))); + } + } + } + + public static class StringArraySerdeVerifier implements SerdeVerifier + { + @Override + public void verifySerde(String[] datas, SerToLVBuffer serde, LengthValueBuffer lvBuffer) + { + Slice slice = serde.serialize(datas); + String[] newStrs = serde.deserialize(slice); + Assert.assertArrayEquals("serialize array failed.", datas, newStrs); + + serde.serTo(datas, lvBuffer); + Assert.assertArrayEquals("serTo array failed.", datas, serde.deserialize(lvBuffer.toSlice())); + } + } + + public static class StringListSerdeVerifier implements SerdeVerifier> + { + @Override + public void verifySerde(String[] datas, SerToLVBuffer> serdeList, LengthValueBuffer lvBuffer) + { + List list = Arrays.asList(datas); + + Slice slice = serdeList.serialize(list); + List newStrs = serdeList.deserialize(slice); + Assert.assertArrayEquals("serialize list failed.", datas, newStrs.toArray(new String[0])); + + serdeList.serTo(list, lvBuffer); + newStrs = serdeList.deserialize(lvBuffer.toSlice()); + Assert.assertArrayEquals("serTo array failed.", datas, newStrs.toArray(new String[0])); + lvBuffer.reset(); + } + } + +} From f718af38d890e3571a690287f171b3bc7c1a301b Mon Sep 17 00:00:00 2001 From: brightchen Date: Wed, 31 Aug 2016 11:09:49 -0700 Subject: [PATCH 2/5] APEXMALHAR-2190 #resolve #comment Use reusable buffer to serial spillable data structure --- .../spillable/SpillableBenchmarkApp.java | 71 +++++++ .../spillable/SpillableTestInputOperator.java | 46 ++++ .../spillable/SpillableTestOperator.java | 179 ++++++++++++++++ .../SpillableBenchmarkAppTester.java | 64 ++++++ .../spillable/SpillableDSBenchmarkTest.java | 198 ++++++++++++++++++ .../spillable/TimeBasedPriorityQueue.java | 13 +- .../apex/malhar/lib/utils/serde/Block.java | 10 +- .../malhar/lib/utils/serde/BlocksStream.java | 17 +- .../malhar/lib/utils/serde/ByteStream.java | 4 +- .../lib/utils/serde/LengthValueBuffer.java | 11 +- .../lib/utils/serde/SerdeListSlice.java | 7 +- .../serde/SerdeListSliceWithLVBuffer.java | 8 +- .../lib/utils/serde/SerdeLongSlice.java | 46 ++++ .../utils/serde/WindowableBlocksStream.java | 10 + .../state/spillable/SpillableTestUtils.java | 4 +- 15 files changed, 672 insertions(+), 16 deletions(-) create mode 100644 benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java create mode 100644 benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java create mode 100644 benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java create mode 100644 benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java create mode 100644 benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java new file mode 100644 index 0000000000..9cdc636a1f --- /dev/null +++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.spillable; + +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.fileaccess.TFileImpl; + +@ApplicationAnnotation(name = "SpillableBenchmarkApp") +public class SpillableBenchmarkApp implements StreamingApplication +{ + protected final String PROP_STORE_PATH = "dt.application.SpillableBenchmarkApp.storeBasePath"; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // Create ActiveMQStringSinglePortOutputOperator + SpillableTestInputOperator input = new SpillableTestInputOperator(); + input.batchSize = 100; + input.sleepBetweenBatch = 0; + input = dag.addOperator("input", input); + + SpillableTestOperator testOperator = new SpillableTestOperator(); + testOperator.store = createStore(conf); + testOperator.shutdownCount = -1; + testOperator = dag.addOperator("test", testOperator ); + + + // Connect ports + dag.addStream("stream", input.output, testOperator.input );//.setLocality(DAG.Locality.CONTAINER_LOCAL); + //dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1); //use normal + //dag.setAttribute(testOperator, Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 2); + } + + + public ManagedStateSpillableStateStore createStore(Configuration conf) + { + String basePath = getStoreBasePath(conf); + ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore(); + ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath); + return store; + } + + public String getStoreBasePath(Configuration conf) + { + return Preconditions.checkNotNull(conf.get(PROP_STORE_PATH), + "base path should be specified in the properties.xml"); + } +} diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java new file mode 100644 index 0000000000..afcf3bc82d --- /dev/null +++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.spillable; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +public class SpillableTestInputOperator extends BaseOperator implements InputOperator +{ + public final transient DefaultOutputPort output = new DefaultOutputPort(); + public long count = 0; + public int batchSize = 100; + public int sleepBetweenBatch = 1; + + @Override + public void emitTuples() + { + for (int i = 0; i < batchSize; ++i) { + output.emit("" + ++count); + } + if (sleepBetweenBatch > 0) { + try { + Thread.sleep(sleepBetweenBatch); + } catch (Exception e) { + //ignore + } + } + } +} \ No newline at end of file diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java new file mode 100644 index 0000000000..14070d6da6 --- /dev/null +++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.spillable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeLongSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.common.util.BaseOperator; + +public class SpillableTestOperator extends BaseOperator implements Operator.CheckpointNotificationListener +{ + public static transient final Logger logger = LoggerFactory.getLogger(SpillableTestOperator.class); + + public static final byte[] ID1 = new byte[] { (byte)1 }; + public static final byte[] ID2 = new byte[] { (byte)2 }; + public static final byte[] ID3 = new byte[] { (byte)3 }; + + public SpillableByteArrayListMultimapImpl multiMap; + + public ManagedStateSpillableStateStore store; + + public long totalCount = 0; + public transient long countInWindow; + public long minWinId = -1; + public long committedWinId = -1; + public long windowId; + + public SpillableByteMapImpl windowToCount; + + public long shutdownCount = -1; + + public final transient DefaultInputPort input = new DefaultInputPort() + { + @Override + public void process(String tuple) + { + processTuple(tuple); + } + }; + + public void processTuple(String tuple) + { + if(++totalCount == shutdownCount) + throw new RuntimeException("Test recovery. count = " + totalCount); + countInWindow++; + multiMap.put(""+windowId, tuple); + } + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + if(windowToCount == null) { + windowToCount = createWindowToCountMap(store); + } + if(multiMap == null) { + multiMap = createMultimap(store); + } + + store.setup(context); + multiMap.setup(context); + + checkData(); + } + + public void checkData() + { + logger.info("checkData(): totalCount: {}; minWinId: {}; committedWinId: {}; curWinId: {}", totalCount, this.minWinId, committedWinId, this.windowId); + for(long winId = Math.max(committedWinId+1, minWinId); winId < this.windowId; ++winId) { + Long count = this.windowToCount.get(winId); + SpillableArrayListImpl datas = (SpillableArrayListImpl)multiMap.get("" + winId); + if((datas == null && count != null) || (datas != null && count == null)) { + logger.error("datas: {}; count: {}", datas, count); + } else if(datas == null && count == null) { + logger.error("Both datas and count are null. probably something wrong."); + } else { + int dataSize = datas.size(); + if((long)count != (long)dataSize) { + logger.error("data size not equal: window Id: {}; datas size: {}; count: {}", winId, dataSize, count); + } + } + } + } + + + /** + * {@inheritDoc} + */ + @Override + public void beginWindow(long windowId) + { + store.beginWindow(windowId); + multiMap.beginWindow(windowId); + if(minWinId < 0) { + minWinId = windowId; + } + + this.windowId = windowId; + countInWindow = 0; + } + + @Override + public void endWindow() + { + multiMap.endWindow(); + windowToCount.put(windowId, countInWindow); + windowToCount.endWindow(); + store.endWindow(); + + if(windowId % 10 == 0) { + long startTime = System.currentTimeMillis(); + checkData(); + logger.info("checkData() took {} millis.", System.currentTimeMillis() - startTime); + } + } + + @Override + public void beforeCheckpoint(long windowId) + { + store.beforeCheckpoint(windowId); + } + + @Override + public void checkpointed(long windowId) + { + } + + @Override + public void committed(long windowId) + { + this.committedWinId = windowId; + store.committed(windowId); + } + + public static SpillableByteArrayListMultimapImpl createMultimap(SpillableStateStore store) + { + return new SpillableByteArrayListMultimapImpl(store, ID1, 0L, new SerdeStringSlice(), + new SerdeStringSlice()); + } + + public static SpillableByteMapImpl createMap(SpillableStateStore store) + { + return new SpillableByteMapImpl(store, ID2, 0L, new SerdeStringSlice(), + new SerdeStringSlice()); + } + + public static SpillableByteMapImpl createWindowToCountMap(SpillableStateStore store) + { + return new SpillableByteMapImpl(store, ID3, 0L, new SerdeLongSlice(), + new SerdeLongSlice()); + } +} diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java new file mode 100644 index 0000000000..8c1f6cf3e9 --- /dev/null +++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.spillable; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; + +public class SpillableBenchmarkAppTester extends SpillableBenchmarkApp +{ + public static final String basePath = "target/temp"; + @Test + public void test() throws Exception + { + Configuration conf = new Configuration(false); + + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + super.populateDAG(dag, conf); + + StreamingApplication app = new StreamingApplication() + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + } + }; + + lma.prepareDAG(app, conf); + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.run(60000); + + lc.shutdown(); + } + + @Override + public String getStoreBasePath(Configuration conf) + { + return basePath; + } +} diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java new file mode 100644 index 0000000000..a2f3d8320c --- /dev/null +++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.spillable; + +import java.io.IOException; +import java.util.Map; +import java.util.Random; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.managed.Bucket; +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils; +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +import com.google.common.collect.Maps; + +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + + +public class SpillableDSBenchmarkTest +{ + public static final transient Logger logger = LoggerFactory.getLogger(SpillableDSBenchmarkTest.class); + protected static final transient int loopCount = 100000000; + protected static final transient long oneMB = 1024*1024; + protected static final transient int keySize = 1000000; + protected static final transient int valueSize = 100000000; + + protected final transient Random random = new Random(); + + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + public static class OptimisedStateStore extends ManagedStateSpillableStateStore + { + protected long windowId; + + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + this.windowId = windowId; + } + + @Override + public void endWindow() + { + super.endWindow(); + beforeCheckpoint(this.windowId); + } + + /** + * - beforeCheckpoint() and other process method should be in same thread, + * and no need lock + */ + @Override + public void beforeCheckpoint(long windowId) + { + Map> flashData = Maps.newHashMap(); + + for (Bucket bucket : buckets) { + if (bucket != null) { + Map flashDataForBucket = bucket.checkpoint(windowId); + if (!flashDataForBucket.isEmpty()) { + flashData.put(bucket.getBucketId(), flashDataForBucket); + } + } + } + if (!flashData.isEmpty()) { + try { + getCheckpointManager().save(flashData, operatorContext.getId(), windowId, false); + } catch (IOException e) { + throw new RuntimeException(e); + } + + flashData.clear(); + } + } + + } + + @Test + public void testSpillableMutimap() + { + testSpillableMutimap(true); + } + + public void testSpillableMutimap(boolean useLvBuffer) + { + byte[] ID1 = new byte[]{(byte)1}; + OptimisedStateStore store = new OptimisedStateStore(); + ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath("target/temp"); + + SerdeStringSlice keySerde = createKeySerde(); + ; + SerdeStringSlice valueSerde = createValueSerde(); + ; + + SpillableByteArrayListMultimapImpl multiMap = new SpillableByteArrayListMultimapImpl( + store, ID1, 0L, keySerde, valueSerde); + + store.setup(testMeta.operatorContext); + multiMap.setup(testMeta.operatorContext); + + final long startTime = System.currentTimeMillis(); + + long windowId = 0; + store.beginWindow(++windowId); + multiMap.beginWindow(windowId); + + int outputTimes = 0; + for (int i = 0; i < loopCount; ++i) { + putEntry(multiMap); + + if (i % 100000 == 0) { + multiMap.endWindow(); + store.endWindow(); + + //NOTES: it will great impact the performance if the size of buffer is too large + resetBuffer(); + + //next window + store.beginWindow(++windowId); + multiMap.beginWindow(windowId); + } + + long spentTime = System.currentTimeMillis() - startTime; + if (spentTime > outputTimes * 60000) { + ++outputTimes; + logger.info("Spent {} mills for {} operation. average: {}", spentTime, i, i / spentTime); + checkEnvironment(); + } + + } + long spentTime = System.currentTimeMillis() - startTime; + + logger.info("Spent {} mills for {} operation. average: {}", spentTime, loopCount, + loopCount / spentTime); + } + + /** + * put the entry into the map + * @param multiMap + */ + public void putEntry(SpillableByteArrayListMultimapImpl multiMap) + { + multiMap.put(String.valueOf(random.nextInt(keySize)), String.valueOf(random.nextInt(valueSize))); + } + + public void checkEnvironment() + { + Runtime runtime = Runtime.getRuntime(); + + long maxMemory = runtime.maxMemory(); + long allocatedMemory = runtime.totalMemory(); + long freeMemory = runtime.freeMemory(); + + logger.info("freeMemory: {}M; allocatedMemory: {}M; maxMemory: {}M", freeMemory / oneMB, + allocatedMemory / oneMB, maxMemory / oneMB); + + Assert.assertTrue("Used up all memory.", maxMemory - allocatedMemory > oneMB); + } + + protected SerdeStringSlice createKeySerde() + { + return new SerdeStringSlice(); + } + + protected SerdeStringSlice createValueSerde() + { + return new SerdeStringSlice(); + } + + protected void resetBuffer() + { + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java index 025c501cfd..f28e0b20f6 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java @@ -115,8 +115,17 @@ public int compareTo(TimeWrapper timeWrapper) } else if (this.timestamp > timeWrapper.getTimestamp()) { return 1; } - - return 0; + + /** + * NOTE: the following use the equals() to implement the compareTo() for key. + * it should be OK as the compareTo() only used by TimeBasedPriorityQueue.sortedTimestamp, + * which only care about the order of time ( the order for key doesn't matter ). + * But would cause problem if add other function which depended on the order of the key. + * + * Add compare by hashCode when not equals in order to compatible with the interface for most cases. + * Anyway, the order of key is not guaranteed. And we should not return 0 if not equals + */ + return key.equals(timeWrapper.key) ? 0 : (hashCode() - timeWrapper.hashCode() <= 0 ? -1 : 1); } @Override diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java index d9a46e9569..03193ee563 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java @@ -77,8 +77,6 @@ protected void checkOrReallocateBuffer(int length) //calculate the new capacity capacity = (size + length) * 2; - logger.info("Going to assign buffer size: {}", capacity); - byte[] oldBuffer = buffer; buffer = new byte[capacity]; @@ -142,11 +140,17 @@ public boolean hasEnoughSpace(int length) return size + length < capacity; } - public int size() + public long size() { return size; } + @Override + public long capacity() + { + return capacity; + } + public boolean isFresh() { return (size == 0 && objectBeginOffset == 0 && slices.isEmpty() ); diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlocksStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlocksStream.java index f25ca6b4e8..199f088d02 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlocksStream.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlocksStream.java @@ -46,7 +46,7 @@ public class BlocksStream implements ByteStream protected Map blocks = Maps.newHashMap(); //the index of current block, valid block index should >= 0 protected int currentBlockIndex = 0; - protected int size = 0; + protected long size = 0; protected FixedBlock currentBlock; @@ -118,16 +118,29 @@ protected FixedBlock getOrCreateCurrentBlock() if (block == null) { block = new FixedBlock(blockCapacity); blocks.put(currentBlockIndex, block); + if (blocks.size() % 50 == 0) { + logger.info("blocks: {}, size of each block: {}", blocks.size(), blockCapacity); + } } return block; } @Override - public int size() + public long size() { return size; } + @Override + public long capacity() + { + long capacity = 0; + for (FixedBlock block : blocks.values()) { + capacity += block.capacity(); + } + return capacity; + } + /** * * this is the last call which represent the end of an object diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ByteStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ByteStream.java index 2984167985..a07d8859bb 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ByteStream.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ByteStream.java @@ -26,8 +26,10 @@ public interface ByteStream void write(byte[] data, final int offset, final int length); - int size(); + long size(); + long capacity(); + Slice toSlice(); void reset(); diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LengthValueBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LengthValueBuffer.java index 62878f63fb..7a7bdd629e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LengthValueBuffer.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LengthValueBuffer.java @@ -20,7 +20,6 @@ import java.util.Map; -import org.apache.apex.malhar.lib.state.spillable.WindowListener; import org.apache.commons.lang3.mutable.MutableInt; import com.google.common.collect.Maps; @@ -103,7 +102,8 @@ public void setObjectWithValue(byte[] value) public int markPlaceHolderForLength() { try { - int offset = windowableByteStream.size(); + //the size/capacity of each block is int + int offset = (int)windowableByteStream.size(); windowableByteStream.write(lengthPlaceHolder); return offset; } catch (Exception e) { @@ -111,10 +111,15 @@ public int markPlaceHolderForLength() } } - public int getSize() + public long size() { return windowableByteStream.size(); } + + public long capacity() + { + return windowableByteStream.capacity(); + } /** * diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java index 130835f6a9..6216fb483d 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java @@ -43,6 +43,7 @@ public class SerdeListSlice implements Serde, Slice> protected SerdeListSlice() { // for Kryo + throw new RuntimeException("should only called by Kryo"); } /** @@ -91,12 +92,14 @@ public List deserialize(Slice slice, MutableInt offset) int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset); List list = Lists.newArrayListWithCapacity(numElements); sliceOffset.subtract(slice.offset); - +try{ for (int index = 0; index < numElements; index++) { T object = serde.deserialize(slice, sliceOffset); list.add(object); } - +}catch(NullPointerException e) { + e.printStackTrace(); +} offset.setValue(sliceOffset.intValue()); return list; } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithLVBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithLVBuffer.java index 9fc7dc7146..6649ecc825 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithLVBuffer.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithLVBuffer.java @@ -26,18 +26,24 @@ import com.datatorrent.netlet.util.Slice; +/** + * The serialize was implemented by this class, the deserialize was inherited from super class + * + * @param The type of serializer for item + */ public class SerdeListSliceWithLVBuffer extends SerdeListSlice implements SerToLVBuffer> { protected SerToLVBuffer itemSerTo; protected LengthValueBuffer buffer; - protected SerdeListSliceWithLVBuffer() + private SerdeListSliceWithLVBuffer() { // for Kryo } public SerdeListSliceWithLVBuffer(@NotNull SerToLVBuffer serde, LengthValueBuffer buffer) { + super(serde); this.itemSerTo = Preconditions.checkNotNull(serde); this.buffer = Preconditions.checkNotNull(buffer); } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java new file mode 100644 index 0000000000..2327d761dd --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.commons.lang3.mutable.MutableInt; + +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.netlet.util.Slice; + +public class SerdeLongSlice implements Serde +{ + @Override + public Slice serialize(Long object) + { + return new Slice(GPOUtils.serializeLong(object)); + } + + @Override + public Long deserialize(Slice slice, MutableInt offset) + { + return GPOUtils.deserializeLong(slice.buffer, offset); + } + + @Override + public Long deserialize(Slice object) + { + return deserialize(object, new MutableInt(0)); + } +} + diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowableBlocksStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowableBlocksStream.java index ffed723ddb..fd9274b13d 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowableBlocksStream.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowableBlocksStream.java @@ -130,4 +130,14 @@ public void resetUpToWindow(long windowId) size -= removedSize; } + @Override + public void reset() + { + super.reset(); + + //all blocks are free now except the current one + freeBlockIds.addAll(blocks.keySet()); + freeBlockIds.remove(currentBlockIndex); + + } } diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java index 00ea58d2e5..903d2fc394 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java @@ -51,10 +51,10 @@ private SpillableTestUtils() //Shouldn't instantiate this } - static class TestMeta extends TestWatcher + public static class TestMeta extends TestWatcher { ManagedStateSpillableStateStore store; - Context.OperatorContext operatorContext; + public Context.OperatorContext operatorContext; String applicationPath; @Override From 0e737c20cb665e77e4e3c0684fe36dd4d7949ceb Mon Sep 17 00:00:00 2001 From: brightchen Date: Wed, 31 Aug 2016 11:14:17 -0700 Subject: [PATCH 3/5] APEXMALHAR-2182 benchmark for spillable data structure --- benchmark/pom.xml | 5 + .../spillable/SpillableDSBenchmarkTest.java | 98 +++++++++++++------ benchmark/src/test/resources/log4j.properties | 2 + 3 files changed, 76 insertions(+), 29 deletions(-) diff --git a/benchmark/pom.xml b/benchmark/pom.xml index 239d9faddc..d6c405a4ec 100644 --- a/benchmark/pom.xml +++ b/benchmark/pom.xml @@ -595,6 +595,11 @@ + + joda-time + joda-time + 2.9.1 + diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java index a2f3d8320c..2aa9d0707d 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java @@ -23,6 +23,7 @@ import java.util.Random; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; @@ -30,9 +31,12 @@ import org.apache.apex.malhar.lib.state.managed.Bucket; import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl; import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils; import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.LengthValueBuffer; import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringWithLVBuffer; import com.google.common.collect.Maps; @@ -45,11 +49,21 @@ public class SpillableDSBenchmarkTest public static final transient Logger logger = LoggerFactory.getLogger(SpillableDSBenchmarkTest.class); protected static final transient int loopCount = 100000000; protected static final transient long oneMB = 1024*1024; - protected static final transient int keySize = 1000000; - protected static final transient int valueSize = 100000000; + protected static final transient int keySize = 100000; + protected static final transient int valueSize = 100000; + protected static final transient int valuesPerKey = 100; + protected static final int maxKeyLength = 100; + protected static final int maxValueLength = 1000; + + protected static final int tuplesPerWindow = 10000; + protected static final int checkPointWindows = 10; protected final transient Random random = new Random(); + protected String[] keys; + protected String[] values; + protected LengthValueBuffer buffer = new LengthValueBuffer(); + @Rule public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); @@ -67,7 +81,6 @@ public void beginWindow(long windowId) public void endWindow() { super.endWindow(); - beforeCheckpoint(this.windowId); } /** @@ -97,61 +110,72 @@ public void beforeCheckpoint(long windowId) flashData.clear(); } } - } - @Test - public void testSpillableMutimap() + @Before + public void setup() { - testSpillableMutimap(true); + keys = new String[keySize]; + for (int i = 0; i < keys.length; ++i) { + keys[i] = this.randomString(maxKeyLength); + } + + values = new String[valueSize]; + for (int i = 0; i < values.length; ++i) { + values[i] = this.randomString(maxValueLength); + } } + + - public void testSpillableMutimap(boolean useLvBuffer) + @Test + public void testSpillableMap() { byte[] ID1 = new byte[]{(byte)1}; OptimisedStateStore store = new OptimisedStateStore(); ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath("target/temp"); SerdeStringSlice keySerde = createKeySerde(); - ; SerdeStringSlice valueSerde = createValueSerde(); - ; - SpillableByteArrayListMultimapImpl multiMap = new SpillableByteArrayListMultimapImpl( - store, ID1, 0L, keySerde, valueSerde); +// SpillableByteArrayListMultimapImpl map = new SpillableByteArrayListMultimapImpl( +// store, ID1, 0L, keySerde, valueSerde, buffer); + + SpillableByteMapImpl map = new SpillableByteMapImpl(store, ID1, 0L, keySerde, valueSerde); store.setup(testMeta.operatorContext); - multiMap.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); final long startTime = System.currentTimeMillis(); long windowId = 0; store.beginWindow(++windowId); - multiMap.beginWindow(windowId); + map.beginWindow(windowId); int outputTimes = 0; for (int i = 0; i < loopCount; ++i) { - putEntry(multiMap); + putEntry(map); - if (i % 100000 == 0) { - multiMap.endWindow(); + if (i % tuplesPerWindow == 0) { + map.endWindow(); store.endWindow(); - //NOTES: it will great impact the performance if the size of buffer is too large - resetBuffer(); - + if(i % (tuplesPerWindow * checkPointWindows) == 0) { + store.beforeCheckpoint(windowId); + resetBuffer(); + } + //next window store.beginWindow(++windowId); - multiMap.beginWindow(windowId); + map.beginWindow(windowId); } long spentTime = System.currentTimeMillis() - startTime; - if (spentTime > outputTimes * 60000) { + if (spentTime > outputTimes * 5000) { ++outputTimes; - logger.info("Spent {} mills for {} operation. average: {}", spentTime, i, i / spentTime); + logger.info("Spent {} mills for {} operation. average: {}, buffer size: {}, buffer capacity: {}", spentTime, i, i / spentTime, buffer.size(), buffer.capacity()); checkEnvironment(); } - } long spentTime = System.currentTimeMillis() - startTime; @@ -165,9 +189,26 @@ public void testSpillableMutimap(boolean useLvBuffer) */ public void putEntry(SpillableByteArrayListMultimapImpl multiMap) { - multiMap.put(String.valueOf(random.nextInt(keySize)), String.valueOf(random.nextInt(valueSize))); + multiMap.put(keys[random.nextInt(keys.length)], values[random.nextInt(values.length)]); + } + + public void putEntry(SpillableByteMapImpl map) + { + map.put(keys[random.nextInt(keys.length)], values[random.nextInt(values.length)]); } + public static final String characters = "0123456789ABCDEFGHIJKLMNOPKRSTUVWXYZabcdefghijklmopqrstuvwxyz"; + + protected static final char[] text = new char[Math.max(maxKeyLength, maxValueLength)]; + + public String randomString(int length) + { + for (int i = 0; i < length; i++) { + text[i] = characters.charAt(random.nextInt(characters.length())); + } + return new String(text, 0, length); + } + public void checkEnvironment() { Runtime runtime = Runtime.getRuntime(); @@ -178,21 +219,20 @@ public void checkEnvironment() logger.info("freeMemory: {}M; allocatedMemory: {}M; maxMemory: {}M", freeMemory / oneMB, allocatedMemory / oneMB, maxMemory / oneMB); - - Assert.assertTrue("Used up all memory.", maxMemory - allocatedMemory > oneMB); } protected SerdeStringSlice createKeySerde() { - return new SerdeStringSlice(); + return new SerdeStringWithLVBuffer(buffer); } protected SerdeStringSlice createValueSerde() { - return new SerdeStringSlice(); + return new SerdeStringWithLVBuffer(buffer); } protected void resetBuffer() { + buffer.reset(); } } diff --git a/benchmark/src/test/resources/log4j.properties b/benchmark/src/test/resources/log4j.properties index cf0d19ec1b..3fc0120e0f 100644 --- a/benchmark/src/test/resources/log4j.properties +++ b/benchmark/src/test/resources/log4j.properties @@ -41,3 +41,5 @@ log4j.logger.org=info #log4j.logger.org.apache.commons.beanutils=warn log4j.logger.com.datatorrent=debug log4j.logger.org.apache.apex=debug +log4j.logger.org.apache.apex.malhar.lib.state.managed=info +log4j.logger.com.datatorrent.common.util.FSStorageAgent=info From 58cc3c274bfe0fb8b82c353a1b0b32ef504efb42 Mon Sep 17 00:00:00 2001 From: brightchen Date: Wed, 31 Aug 2016 14:01:45 -0700 Subject: [PATCH 4/5] changing --- .../spillable/SpillableDSBenchmarkTest.java | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java index 2aa9d0707d..620da2b464 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Random; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -34,9 +33,7 @@ import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl; import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils; import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; -import org.apache.apex.malhar.lib.utils.serde.LengthValueBuffer; import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; -import org.apache.apex.malhar.lib.utils.serde.SerdeStringWithLVBuffer; import com.google.common.collect.Maps; @@ -62,7 +59,6 @@ public class SpillableDSBenchmarkTest protected String[] keys; protected String[] values; - protected LengthValueBuffer buffer = new LengthValueBuffer(); @Rule public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); @@ -162,7 +158,6 @@ public void testSpillableMap() if(i % (tuplesPerWindow * checkPointWindows) == 0) { store.beforeCheckpoint(windowId); - resetBuffer(); } //next window @@ -173,7 +168,7 @@ public void testSpillableMap() long spentTime = System.currentTimeMillis() - startTime; if (spentTime > outputTimes * 5000) { ++outputTimes; - logger.info("Spent {} mills for {} operation. average: {}, buffer size: {}, buffer capacity: {}", spentTime, i, i / spentTime, buffer.size(), buffer.capacity()); + logger.info("Spent {} mills for {} operation. average: {}", spentTime, i, i / spentTime); checkEnvironment(); } } @@ -223,16 +218,12 @@ public void checkEnvironment() protected SerdeStringSlice createKeySerde() { - return new SerdeStringWithLVBuffer(buffer); + return new SerdeStringSlice(); } protected SerdeStringSlice createValueSerde() { - return new SerdeStringWithLVBuffer(buffer); + return new SerdeStringSlice(); } - protected void resetBuffer() - { - buffer.reset(); - } } From fe36a17413446924d9a24d289e92b2738718d46f Mon Sep 17 00:00:00 2001 From: brightchen Date: Thu, 1 Sep 2016 13:26:09 -0700 Subject: [PATCH 5/5] APEXMALHAR-2190 #resolve #comment Use reusable buffer to serial spillable data structure --- .../spillable/SpillableDSBenchmarkTest.java | 41 ++++--- .../spillable/SpillableArrayListImpl.java | 10 +- .../state/spillable/SpillableByteMapImpl.java | 63 +++++++++-- .../utils/serde/AbstractSerializeBuffer.java | 105 ++++++++++++++++++ .../malhar/lib/utils/serde/BlocksStream.java | 6 +- .../lib/utils/serde/BytesPrefixBuffer.java | 56 ++++++++++ .../lib/utils/serde/CompositeSerializer.java | 43 +++++++ .../lib/utils/serde/LengthValueBuffer.java | 66 +---------- ...VBuffer.java => SerToSerializeBuffer.java} | 4 +- ...ava => SerdeArrayWithSerializeBuffer.java} | 27 +++-- ...> SerdeCollectionWithSerializeBuffer.java} | 27 +++-- ...=> SerdeListSliceWithSerializeBuffer.java} | 16 ++- ...va => SerdeStringWithSerializeBuffer.java} | 21 +--- .../lib/utils/serde/SerializeBuffer.java | 51 +++++++++ .../lib/utils/serde/SerialToLVBufferTest.java | 16 +-- 15 files changed, 404 insertions(+), 148 deletions(-) create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AbstractSerializeBuffer.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BytesPrefixBuffer.java create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CompositeSerializer.java rename library/src/main/java/org/apache/apex/malhar/lib/utils/serde/{SerToLVBuffer.java => SerToSerializeBuffer.java} (50%) rename library/src/main/java/org/apache/apex/malhar/lib/utils/serde/{SerdeArrayWithLVBuffer.java => SerdeArrayWithSerializeBuffer.java} (77%) rename library/src/main/java/org/apache/apex/malhar/lib/utils/serde/{SerdeCollectionWithLVBuffer.java => SerdeCollectionWithSerializeBuffer.java} (81%) rename library/src/main/java/org/apache/apex/malhar/lib/utils/serde/{SerdeListSliceWithLVBuffer.java => SerdeListSliceWithSerializeBuffer.java} (74%) rename library/src/main/java/org/apache/apex/malhar/lib/utils/serde/{SerdeStringWithLVBuffer.java => SerdeStringWithSerializeBuffer.java} (73%) create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializeBuffer.java diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java index 620da2b464..ce9771b632 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Random; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -33,7 +34,10 @@ import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl; import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils; import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.BytesPrefixBuffer; +import org.apache.apex.malhar.lib.utils.serde.LengthValueBuffer; import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringWithSerializeBuffer; import com.google.common.collect.Maps; @@ -48,7 +52,6 @@ public class SpillableDSBenchmarkTest protected static final transient long oneMB = 1024*1024; protected static final transient int keySize = 100000; protected static final transient int valueSize = 100000; - protected static final transient int valuesPerKey = 100; protected static final int maxKeyLength = 100; protected static final int maxValueLength = 1000; @@ -57,6 +60,8 @@ public class SpillableDSBenchmarkTest protected final transient Random random = new Random(); + protected final LengthValueBuffer buffer = new LengthValueBuffer(); + protected final BytesPrefixBuffer keyBuffer = new BytesPrefixBuffer(); protected String[] keys; protected String[] values; @@ -131,14 +136,10 @@ public void testSpillableMap() OptimisedStateStore store = new OptimisedStateStore(); ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath("target/temp"); - SerdeStringSlice keySerde = createKeySerde(); + SerdeStringWithSerializeBuffer keySerde = createKeySerde(); SerdeStringSlice valueSerde = createValueSerde(); - -// SpillableByteArrayListMultimapImpl map = new SpillableByteArrayListMultimapImpl( -// store, ID1, 0L, keySerde, valueSerde, buffer); - - SpillableByteMapImpl map = new SpillableByteMapImpl(store, ID1, 0L, keySerde, valueSerde); + SpillableByteMapImpl map = new SpillableByteMapImpl(store, ID1, 0L, keySerde, valueSerde, keyBuffer); store.setup(testMeta.operatorContext); map.setup(testMeta.operatorContext); @@ -158,6 +159,12 @@ public void testSpillableMap() if(i % (tuplesPerWindow * checkPointWindows) == 0) { store.beforeCheckpoint(windowId); + + //clear the buffer + buffer.reset(); + + keyBuffer.reset(); + //map.resetBuffer(); } //next window @@ -168,7 +175,7 @@ public void testSpillableMap() long spentTime = System.currentTimeMillis() - startTime; if (spentTime > outputTimes * 5000) { ++outputTimes; - logger.info("Spent {} mills for {} operation. average: {}", spentTime, i, i / spentTime); + logger.info("Spent {} mills for {} operation. average: {}, key buffer capacity: {}, value buffer capacity: {}", spentTime, i, i / spentTime, keyBuffer.capacity(), buffer.capacity()); checkEnvironment(); } } @@ -208,22 +215,24 @@ public void checkEnvironment() { Runtime runtime = Runtime.getRuntime(); - long maxMemory = runtime.maxMemory(); - long allocatedMemory = runtime.totalMemory(); - long freeMemory = runtime.freeMemory(); + long maxMemory = runtime.maxMemory()/oneMB; + long allocatedMemory = runtime.totalMemory()/oneMB; + long freeMemory = runtime.freeMemory()/oneMB; + + logger.info("freeMemory: {}M; allocatedMemory: {}M; maxMemory: {}M", freeMemory, + allocatedMemory, maxMemory); - logger.info("freeMemory: {}M; allocatedMemory: {}M; maxMemory: {}M", freeMemory / oneMB, - allocatedMemory / oneMB, maxMemory / oneMB); + Assert.assertFalse("Run out of memory.", allocatedMemory == maxMemory && freeMemory < 200); } - protected SerdeStringSlice createKeySerde() + protected SerdeStringWithSerializeBuffer createKeySerde() { - return new SerdeStringSlice(); + return new SerdeStringWithSerializeBuffer(); } protected SerdeStringSlice createValueSerde() { - return new SerdeStringSlice(); + return new SerdeStringWithSerializeBuffer(buffer); } } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java index a648e4349c..11eff8d6d4 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java @@ -26,10 +26,10 @@ import javax.validation.constraints.NotNull; import org.apache.apex.malhar.lib.utils.serde.LengthValueBuffer; -import org.apache.apex.malhar.lib.utils.serde.SerToLVBuffer; +import org.apache.apex.malhar.lib.utils.serde.SerToSerializeBuffer; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; -import org.apache.apex.malhar.lib.utils.serde.SerdeListSliceWithLVBuffer; +import org.apache.apex.malhar.lib.utils.serde.SerdeListSliceWithSerializeBuffer; import org.apache.hadoop.classification.InterfaceStability; import com.esotericsoftware.kryo.DefaultSerializer; @@ -65,7 +65,7 @@ public class SpillableArrayListImpl implements Spillable.SpillableArrayList valueSerde; + protected transient SerdeListSliceWithSerializeBuffer valueSerde; protected transient LengthValueBuffer buffer; private SpillableArrayListImpl() @@ -129,11 +129,11 @@ public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix, @NotNull Sp this.store = Preconditions.checkNotNull(store); this.serde = Preconditions.checkNotNull(serde); - if (!(serde instanceof SerToLVBuffer)) { + if (!(serde instanceof SerToSerializeBuffer)) { throw new IllegalArgumentException("Invalid serde, expect instanceof SerToLVBuffer"); } - valueSerde = new SerdeListSliceWithLVBuffer((SerToLVBuffer)serde, buffer); + valueSerde = new SerdeListSliceWithSerializeBuffer((SerToSerializeBuffer)serde, buffer); Preconditions.checkArgument(this.batchSize > 0); this.batchSize = batchSize; diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java index da313ee09a..aebabc22dc 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java @@ -26,6 +26,9 @@ import javax.validation.constraints.NotNull; import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.utils.serde.BytesPrefixBuffer; +import org.apache.apex.malhar.lib.utils.serde.LengthValueBuffer; +import org.apache.apex.malhar.lib.utils.serde.SerToSerializeBuffer; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.apex.malhar.lib.utils.serde.SliceUtils; import org.apache.commons.lang3.ArrayUtils; @@ -57,13 +60,20 @@ public class SpillableByteMapImpl implements Spillable.SpillableByteMap serdeKey; + //serKeyToBuffer can reuse buffer + protected SerToSerializeBuffer serKeyToBuffer; + @NotNull private Serde serdeValue; private int size = 0; + @NotNull + protected BytesPrefixBuffer buffer; + private SpillableByteMapImpl() { //for kryo @@ -88,6 +98,19 @@ public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long b this.serdeValue = Preconditions.checkNotNull(serdeValue); } + public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, SerToSerializeBuffer serKeyToBuffer, + Serde serdeValue, @NotNull BytesPrefixBuffer buffer) + { + this.store = Preconditions.checkNotNull(store); + this.identifier = Preconditions.checkNotNull(identifier); + this.bucket = bucket; + this.serKeyToBuffer = Preconditions.checkNotNull(serKeyToBuffer); + this.serdeValue = Preconditions.checkNotNull(serdeValue); + this.buffer = buffer; + this.buffer.setPrefix(identifier); + } + + public SpillableStateStore getStore() { return this.store; @@ -132,7 +155,7 @@ public V get(Object o) return val; } - Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key))); + Slice valSlice = store.getSync(bucket, serializeKey(key)); if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) { return null; @@ -142,6 +165,16 @@ public V get(Object o) return serdeValue.deserialize(valSlice, tempOffset); } + protected Slice serializeKey(K key) + { + if( this.serKeyToBuffer != null) { + serKeyToBuffer.serTo(key, buffer); + return buffer.toSlice(); + } + + return SliceUtils.concatenate(identifier, serdeKey.serialize(key)); + } + @Override public V put(K k, V v) { @@ -215,19 +248,27 @@ public void beginWindow(long windowId) @Override public void endWindow() { - for (K key: cache.getChangedKeys()) { - store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)), - serdeValue.serialize(cache.get(key))); - } - - for (K key: cache.getRemovedKeys()) { - store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)), - new Slice(ArrayUtils.EMPTY_BYTE_ARRAY)); + if (serKeyToBuffer != null) { + for (K key : cache.getChangedKeys()) { + serKeyToBuffer.serTo(key, buffer); + store.put(this.bucket, buffer.toSlice(), serdeValue.serialize(cache.get(key))); + } + } else { + for (K key : cache.getChangedKeys()) { + store.put(this.bucket, serdeKey.serialize(key), serdeValue.serialize(cache.get(key))); + } } - + cache.endWindow(); } + public void resetBuffer() + { + if (buffer != null) { + buffer.reset(); + } + } + @Override public void teardown() { diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AbstractSerializeBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AbstractSerializeBuffer.java new file mode 100644 index 0000000000..85e90da01b --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AbstractSerializeBuffer.java @@ -0,0 +1,105 @@ +package org.apache.apex.malhar.lib.utils.serde; + +import com.datatorrent.netlet.util.Slice; + +public abstract class AbstractSerializeBuffer implements SerializeBuffer, ResetableWindowListener +{ + protected WindowableByteStream windowableByteStream; + + /** + * write value. it could be part of the value + * @param value + */ + @Override + public void write(byte[] value) + { + windowableByteStream.write(value); + } + + /** + * write value. it could be part of the value + * + * @param value + * @param offset + * @param length + */ + @Override + public void write(byte[] value, int offset, int length) + { + windowableByteStream.write(value, offset, length); + } + + + @Override + public void setObjectByValue(byte[] value) + { + setObjectByValue(value, 0, value.length); + } + + public long size() + { + return windowableByteStream.size(); + } + + public long capacity() + { + return windowableByteStream.capacity(); + } + + /** + * This method should be called only the whole object has been written + * @return The slice which represents the object + */ + public Slice toSlice() + { + return windowableByteStream.toSlice(); + } + + + /** + * reset the environment to reuse the resource. + */ + public void reset() + { + windowableByteStream.reset(); + } + + + @Override + public void beginWindow(long windowId) + { + windowableByteStream.beginWindow(windowId); + } + + @Override + public void endWindow() + { + windowableByteStream.endWindow(); + } + + /** + * reset for all windows which window id less or equal input windowId + * this interface doesn't enforce to call reset window for each windows. Several windows can be reset at the same time. + * @param windowId + */ + public void resetUpToWindow(long windowId) + { + windowableByteStream.resetUpToWindow(windowId); + } + + public void release() + { + reset(); + windowableByteStream.release(); + } + + public WindowableByteStream createWindowableByteStream() + { + return new WindowableBlocksStream(); + } + + public WindowableByteStream createWindowableByteStream(int capacity) + { + return new WindowableBlocksStream(capacity); + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlocksStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlocksStream.java index 199f088d02..ccba040c34 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlocksStream.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlocksStream.java @@ -86,7 +86,7 @@ public void write(byte[] data, final int offset, final int length) previousBlock = moveToNextBlock(); if(!currentBlock.isFresh()) { - throw new RuntimeException("New block is not flash."); + throw new RuntimeException("New block is not fresh."); } if(!previousBlock.isClear()) { previousBlock.moveLastObjectDataTo(currentBlock); @@ -107,7 +107,7 @@ protected FixedBlock moveToNextBlock() ++currentBlockIndex; currentBlock = getOrCreateCurrentBlock(); if (!currentBlock.isFresh()) { - throw new RuntimeException("Assigned non flash block."); + throw new RuntimeException("Assigned non fresh block."); } return previousBlock; } @@ -119,7 +119,7 @@ protected FixedBlock getOrCreateCurrentBlock() block = new FixedBlock(blockCapacity); blocks.put(currentBlockIndex, block); if (blocks.size() % 50 == 0) { - logger.info("blocks: {}, size of each block: {}", blocks.size(), blockCapacity); + logger.info("Assigned blocks: {}, size of each block: {}", blocks.size(), blockCapacity); } } return block; diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BytesPrefixBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BytesPrefixBuffer.java new file mode 100644 index 0000000000..e77769a05c --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BytesPrefixBuffer.java @@ -0,0 +1,56 @@ +package org.apache.apex.malhar.lib.utils.serde; + +/** + * Unlike LengthValueBuffer which start with the length, + * This serialize buffer prefix with a fixed byte array. + * This SerializeBuffer can be used for key which prefixed by identifier + * + */ +public class BytesPrefixBuffer extends AbstractSerializeBuffer +{ + protected byte[] prefix; + + public BytesPrefixBuffer() + { + windowableByteStream = createWindowableByteStream(); + } + + public BytesPrefixBuffer(byte[] prefix) + { + setPrefix(prefix); + windowableByteStream = createWindowableByteStream(); + } + + public BytesPrefixBuffer(byte[] prefix, int capacity) + { + windowableByteStream = createWindowableByteStream(capacity); + } + + /** + * set value and length. the input value is value only, it doesn't include + * length information. + * + * @param value + * @param offset + * @param length + */ + @Override + public void setObjectByValue(byte[] value, int offset, int length) + { + if (prefix != null && prefix.length > 0) { + write(prefix); + } + write(value, offset, length); + } + + public byte[] getPrefix() + { + return prefix; + } + + public void setPrefix(byte[] prefix) + { + this.prefix = prefix; + } + +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CompositeSerializer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CompositeSerializer.java new file mode 100644 index 0000000000..c20a726f82 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CompositeSerializer.java @@ -0,0 +1,43 @@ +package org.apache.apex.malhar.lib.utils.serde; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.netlet.util.Slice; + +public class CompositeSerializer +{ + protected SerToSerializeBuffer serializer1; + protected SerToSerializeBuffer serializer2; + + @NotNull + protected LengthValueBuffer buffer; + + //for Kyro + protected CompositeSerializer() + { + } + + public CompositeSerializer(SerToSerializeBuffer serializer1, SerToSerializeBuffer serializer2, @NotNull LengthValueBuffer buffer) + { + this.serializer1 = serializer1; + this.serializer2 = serializer2; + this.buffer = buffer; + } + + public Slice serialize(T1 value1, T2 value2) + { + serializeFirst(value1); + serializeSecond(value2); + return buffer.toSlice(); + } + + protected void serializeFirst(T1 value1) + { + serializer1.serTo(value1, buffer); + } + + protected void serializeSecond(T2 value2) + { + serializer2.serTo(value2, buffer); + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LengthValueBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LengthValueBuffer.java index 7a7bdd629e..e67a1ffa52 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LengthValueBuffer.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LengthValueBuffer.java @@ -32,9 +32,8 @@ * serialize * */ -public class LengthValueBuffer implements ResetableWindowListener +public class LengthValueBuffer extends AbstractSerializeBuffer { - protected WindowableByteStream windowableByteStream; protected Map placeHolderIdentifierToValue = Maps.newHashMap(); public LengthValueBuffer() @@ -60,18 +59,6 @@ public void setObjectLength(int length) } } - /** - * only set value. - * - * @param value - * @param offset - * @param length - */ - public void setObjectValue(byte[] value, int offset, int length) - { - windowableByteStream.write(value, offset, length); - } - /** * set value and length. the input value is value only, it doesn't include * length information. @@ -80,16 +67,13 @@ public void setObjectValue(byte[] value, int offset, int length) * @param offset * @param length */ - public void setObjectWithValue(byte[] value, int offset, int length) + @Override + public void setObjectByValue(byte[] value, int offset, int length) { setObjectLength(length); - setObjectValue(value, offset, length); + write(value, offset, length); } - public void setObjectWithValue(byte[] value) - { - setObjectWithValue(value, 0, value.length); - } /** * mark place hold for length. In some case, we don't know the length until @@ -111,16 +95,6 @@ public int markPlaceHolderForLength() } } - public long size() - { - return windowableByteStream.size(); - } - - public long capacity() - { - return windowableByteStream.capacity(); - } - /** * * @param placeHolderId @@ -155,25 +129,13 @@ public Slice toSlice() /** * reset the environment to reuse the resource. */ + @Override public void reset() { - windowableByteStream.reset(); + super.reset(); placeHolderIdentifierToValue.clear(); } - - @Override - public void beginWindow(long windowId) - { - windowableByteStream.beginWindow(windowId); - } - - @Override - public void endWindow() - { - windowableByteStream.endWindow(); - } - /** * reset for all windows which window id less or equal input windowId * this interface doesn't enforce to call reset window for each windows. Several windows can be reset at the same time. @@ -183,20 +145,4 @@ public void resetUpToWindow(long windowId) { windowableByteStream.resetUpToWindow(windowId); } - - public void release() - { - reset(); - windowableByteStream.release(); - } - - public WindowableByteStream createWindowableByteStream() - { - return new WindowableBlocksStream(); - } - - public WindowableByteStream createWindowableByteStream(int capacity) - { - return new WindowableBlocksStream(capacity); - } } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerToLVBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerToSerializeBuffer.java similarity index 50% rename from library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerToLVBuffer.java rename to library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerToSerializeBuffer.java index 6823bc9d37..020474b618 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerToLVBuffer.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerToSerializeBuffer.java @@ -2,9 +2,9 @@ import com.datatorrent.netlet.util.Slice; -public interface SerToLVBuffer extends Serde +public interface SerToSerializeBuffer extends Serde { - void serTo(T object, LengthValueBuffer buffer); + void serTo(T object, SerializeBuffer buffer); void reset(); } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeArrayWithLVBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeArrayWithSerializeBuffer.java similarity index 77% rename from library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeArrayWithLVBuffer.java rename to library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeArrayWithSerializeBuffer.java index f7cb8f9df7..4cfb4da986 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeArrayWithLVBuffer.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeArrayWithSerializeBuffer.java @@ -23,28 +23,28 @@ import com.datatorrent.lib.appdata.gpo.GPOUtils; import com.datatorrent.netlet.util.Slice; -public class SerdeArrayWithLVBuffer implements SerToLVBuffer +public class SerdeArrayWithSerializeBuffer implements SerToSerializeBuffer { protected Class clazz; protected LengthValueBuffer buffer; - protected SerToLVBuffer itemSerde; + protected SerToSerializeBuffer itemSerde; - protected SerdeArrayWithLVBuffer() + protected SerdeArrayWithSerializeBuffer() { } - public SerdeArrayWithLVBuffer(Class clazz) + public SerdeArrayWithSerializeBuffer(Class clazz) { this.clazz = clazz; } - public SerdeArrayWithLVBuffer(Class clazz, LengthValueBuffer buffer) + public SerdeArrayWithSerializeBuffer(Class clazz, LengthValueBuffer buffer) { this.clazz = clazz; this.buffer = buffer; } - public SerdeArrayWithLVBuffer(SerToLVBuffer itemSerde, LengthValueBuffer buffer) + public SerdeArrayWithSerializeBuffer(SerToSerializeBuffer itemSerde, LengthValueBuffer buffer) { this.itemSerde = itemSerde; this.buffer = buffer; @@ -61,27 +61,32 @@ public Slice serialize(T[] objects) } @Override - public void serTo(T[] objects, LengthValueBuffer buffer) + public void serTo(T[] objects, SerializeBuffer buffer) { if (objects.length == 0) { return; } - buffer.setObjectLength(objects.length); - SerToLVBuffer serializer = getItemSerToLVBuffer(); + + //For LengthValueBuffer, need to set the size of the array + if (buffer instanceof LengthValueBuffer) { + ((LengthValueBuffer)buffer).setObjectLength(objects.length); + } + + SerToSerializeBuffer serializer = getItemSerToLVBuffer(); for (T object : objects) { serializer.serTo(object, buffer); } } @SuppressWarnings("unchecked") - protected SerToLVBuffer getItemSerToLVBuffer() + protected SerToSerializeBuffer getItemSerToLVBuffer() { if (itemSerde != null) { return itemSerde; } if (String.class.equals(clazz)) { - itemSerde = (SerToLVBuffer)new SerdeStringWithLVBuffer(); + itemSerde = (SerToSerializeBuffer)new SerdeStringWithSerializeBuffer(); return itemSerde; } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionWithLVBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionWithSerializeBuffer.java similarity index 81% rename from library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionWithLVBuffer.java rename to library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionWithSerializeBuffer.java index b85d4283e0..3cd81ec98a 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionWithLVBuffer.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionWithSerializeBuffer.java @@ -25,29 +25,29 @@ import com.datatorrent.lib.appdata.gpo.GPOUtils; import com.datatorrent.netlet.util.Slice; -public class SerdeCollectionWithLVBuffer> implements SerToLVBuffer +public class SerdeCollectionWithSerializeBuffer> implements SerToSerializeBuffer { protected Class itemClass; protected LengthValueBuffer buffer; - protected SerToLVBuffer itemSerde; + protected SerToSerializeBuffer itemSerde; protected Class collectionClass; - protected SerdeCollectionWithLVBuffer() + protected SerdeCollectionWithSerializeBuffer() { } - public SerdeCollectionWithLVBuffer(Class clazz) + public SerdeCollectionWithSerializeBuffer(Class clazz) { this.itemClass = clazz; } - public SerdeCollectionWithLVBuffer(Class itemClass, LengthValueBuffer buffer) + public SerdeCollectionWithSerializeBuffer(Class itemClass, LengthValueBuffer buffer) { this.itemClass = itemClass; this.buffer = buffer; } - public SerdeCollectionWithLVBuffer(SerToLVBuffer itemSerde, LengthValueBuffer buffer) + public SerdeCollectionWithSerializeBuffer(SerToSerializeBuffer itemSerde, LengthValueBuffer buffer) { this.itemSerde = itemSerde; this.buffer = buffer; @@ -64,27 +64,32 @@ public Slice serialize(C objects) } @Override - public void serTo(C objects, LengthValueBuffer buffer) + public void serTo(C objects, SerializeBuffer buffer) { if (objects.size() == 0) { return; } - buffer.setObjectLength(objects.size()); - SerToLVBuffer serializer = getItemSerToLVBuffer(); + + //For LengthValueBuffer, need to set the size + if (buffer instanceof LengthValueBuffer) { + ((LengthValueBuffer)buffer).setObjectLength(objects.size()); + } + + SerToSerializeBuffer serializer = getItemSerToLVBuffer(); for (T object : objects) { serializer.serTo(object, buffer); } } @SuppressWarnings("unchecked") - protected SerToLVBuffer getItemSerToLVBuffer() + protected SerToSerializeBuffer getItemSerToLVBuffer() { if (itemSerde != null) { return itemSerde; } if (String.class.equals(itemClass)) { - itemSerde = (SerToLVBuffer)new SerdeStringWithLVBuffer(); + itemSerde = (SerToSerializeBuffer)new SerdeStringWithSerializeBuffer(); return itemSerde; } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithLVBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithSerializeBuffer.java similarity index 74% rename from library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithLVBuffer.java rename to library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithSerializeBuffer.java index 6649ecc825..e1f4e7921b 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithLVBuffer.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceWithSerializeBuffer.java @@ -31,17 +31,17 @@ * * @param The type of serializer for item */ -public class SerdeListSliceWithLVBuffer extends SerdeListSlice implements SerToLVBuffer> +public class SerdeListSliceWithSerializeBuffer extends SerdeListSlice implements SerToSerializeBuffer> { - protected SerToLVBuffer itemSerTo; + protected SerToSerializeBuffer itemSerTo; protected LengthValueBuffer buffer; - private SerdeListSliceWithLVBuffer() + private SerdeListSliceWithSerializeBuffer() { // for Kryo } - public SerdeListSliceWithLVBuffer(@NotNull SerToLVBuffer serde, LengthValueBuffer buffer) + public SerdeListSliceWithSerializeBuffer(@NotNull SerToSerializeBuffer serde, LengthValueBuffer buffer) { super(serde); this.itemSerTo = Preconditions.checkNotNull(serde); @@ -56,9 +56,13 @@ public Slice serialize(List objects) } @Override - public void serTo(List objects, LengthValueBuffer buffer) + public void serTo(List objects, SerializeBuffer buffer) { - buffer.setObjectLength(objects.size()); + //For LengthValueBuffer, need to set the size + if (buffer instanceof LengthValueBuffer) { + ((LengthValueBuffer)buffer).setObjectLength(objects.size()); + } + for (T object : objects) { itemSerTo.serTo(object, buffer);; } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringWithLVBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringWithSerializeBuffer.java similarity index 73% rename from library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringWithLVBuffer.java rename to library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringWithSerializeBuffer.java index 43dc0f9fa5..aaa3122a3c 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringWithLVBuffer.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringWithSerializeBuffer.java @@ -22,19 +22,19 @@ import com.datatorrent.netlet.util.Slice; -public class SerdeStringWithLVBuffer extends SerdeStringSlice implements SerToLVBuffer +public class SerdeStringWithSerializeBuffer extends SerdeStringSlice implements SerToSerializeBuffer { //implement with shared buff - protected LengthValueBuffer buffer; + protected SerializeBuffer buffer; /** * if don't use SerdeStringWithLVBuffer.serialize(String), can ignore LVBuffer */ - public SerdeStringWithLVBuffer() + public SerdeStringWithSerializeBuffer() { } - public SerdeStringWithLVBuffer(LengthValueBuffer buffer) + public SerdeStringWithSerializeBuffer(SerializeBuffer buffer) { this.buffer = Preconditions.checkNotNull(buffer); } @@ -49,19 +49,10 @@ public Slice serialize(String object) return buffer.toSlice(); } -// implement with tmp buffer -// @Override -// public Slice serialize(String object) -// { -// LVBuffer buffer = new LVBuffer(); -// serTo(object, buffer); -// return buffer.toSlice(); -// } - @Override - public void serTo(String str, LengthValueBuffer buffer) + public void serTo(String str, SerializeBuffer buffer) { - buffer.setObjectWithValue(str.getBytes()); + buffer.setObjectByValue(str.getBytes()); } @Override diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializeBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializeBuffer.java new file mode 100644 index 0000000000..2e19e614cf --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializeBuffer.java @@ -0,0 +1,51 @@ +package org.apache.apex.malhar.lib.utils.serde; + +import com.datatorrent.netlet.util.Slice; + +public interface SerializeBuffer +{ + /** + * write value. it could be part of the value + * @param value + */ + public void write(byte[] value); + + + /** + * write value. it could be part of the value + * + * @param value + * @param offset + * @param length + */ + public void write(byte[] value, int offset, int length); + + + /** + * set value and length. the input value is value only, it doesn't include + * length information. + * + * @param value + * @param offset + * @param length + */ + public void setObjectByValue(byte[] value, int offset, int length); + + + public void setObjectByValue(byte[] value); + + /** + * reset the environment to reuse the resource. + */ + public void reset(); + + public void release(); + + /** + * This method should be called only the whole object has been written + * @return The slice which represents the object + */ + public Slice toSlice(); + + +} diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerialToLVBufferTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerialToLVBufferTest.java index f2465276c1..4735a2ad27 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerialToLVBufferTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerialToLVBufferTest.java @@ -47,26 +47,26 @@ else if(value < 36) { @Test public void testSerdeString() { - testSerde(testData, new SerdeStringWithLVBuffer(), new StringSerdeVerifier()); + testSerde(testData, new SerdeStringWithSerializeBuffer(), new StringSerdeVerifier()); } @Test public void testSerdeArray() { - testSerde(testData, new SerdeArrayWithLVBuffer(String.class), new StringArraySerdeVerifier()); + testSerde(testData, new SerdeArrayWithSerializeBuffer(String.class), new StringArraySerdeVerifier()); } @Test public void testSerdeCollection() { - SerdeCollectionWithLVBuffer> listSerde = new SerdeCollectionWithLVBuffer>(String.class); + SerdeCollectionWithSerializeBuffer> listSerde = new SerdeCollectionWithSerializeBuffer>(String.class); listSerde.setCollectionClass(ArrayList.class); testSerde(testData, listSerde, new StringListSerdeVerifier()); } - public void testSerde(String[] strs, SerToLVBuffer serde, SerdeVerifier verifier) + public void testSerde(String[] strs, SerToSerializeBuffer serde, SerdeVerifier verifier) { LengthValueBuffer lvBuffer = new LengthValueBuffer(); @@ -86,13 +86,13 @@ public void testSerde(String[] strs, SerToLVBuffer serde, SerdeVerifier { - public void verifySerde(String[] datas, SerToLVBuffer serde, LengthValueBuffer lvBuffer); + public void verifySerde(String[] datas, SerToSerializeBuffer serde, LengthValueBuffer lvBuffer); } public static class StringSerdeVerifier implements SerdeVerifier { @Override - public void verifySerde(String[] datas, SerToLVBuffer serde, LengthValueBuffer lvBuffer) + public void verifySerde(String[] datas, SerToSerializeBuffer serde, LengthValueBuffer lvBuffer) { for (String str : datas) { Slice slice = serde.serialize(str); @@ -108,7 +108,7 @@ public void verifySerde(String[] datas, SerToLVBuffer serde, LengthValue public static class StringArraySerdeVerifier implements SerdeVerifier { @Override - public void verifySerde(String[] datas, SerToLVBuffer serde, LengthValueBuffer lvBuffer) + public void verifySerde(String[] datas, SerToSerializeBuffer serde, LengthValueBuffer lvBuffer) { Slice slice = serde.serialize(datas); String[] newStrs = serde.deserialize(slice); @@ -122,7 +122,7 @@ public void verifySerde(String[] datas, SerToLVBuffer serde, LengthVal public static class StringListSerdeVerifier implements SerdeVerifier> { @Override - public void verifySerde(String[] datas, SerToLVBuffer> serdeList, LengthValueBuffer lvBuffer) + public void verifySerde(String[] datas, SerToSerializeBuffer> serdeList, LengthValueBuffer lvBuffer) { List list = Arrays.asList(datas);