From 8e3a0c0b09ff2b95d7792e69b573d85a29515cb3 Mon Sep 17 00:00:00 2001 From: brightchen Date: Mon, 31 Oct 2016 16:50:07 -0700 Subject: [PATCH] APEXMALHAR-2320 #resolve #comment use SerializationBuffer implement FSWindowDataManager.toSlice() --- .../malhar/lib/wal/FSWindowDataManager.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java index 6e8774e61e..4db5e17ddd 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java @@ -18,7 +18,6 @@ */ package org.apache.apex.malhar.lib.wal; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -38,6 +37,8 @@ import org.apache.apex.malhar.lib.state.managed.IncrementalCheckpointManager; import org.apache.apex.malhar.lib.utils.FileContextUtils; +import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer; +import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -45,7 +46,6 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; @@ -149,6 +149,8 @@ public class FSWindowDataManager implements WindowDataManager private transient FileContext fileContext; + private transient SerializationBuffer serializationBuffer; + public FSWindowDataManager() { kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); @@ -157,6 +159,7 @@ public FSWindowDataManager() @Override public void setup(Context.OperatorContext context) { + serializationBuffer = new SerializationBuffer(new WindowedBlockStream()); operatorId = context.getId(); if (isStatePathRelativeToAppPath) { @@ -416,7 +419,17 @@ public void save(Object object, long windowId) throws IOException byte[] windowIdBytes = Longs.toByteArray(windowId); writer.append(new Slice(windowIdBytes)); + + /** + * writer.append() will copy the data to the file output stream. + * So the data in the buffer is not needed any more, and it is safe to reset the serializationBuffer. + * + * And as the data in stream memory can be cleaned all at once. So don't need to separate data by different windows, + * so beginWindow() and endWindow() don't need to be called + */ writer.append(toSlice(object)); + serializationBuffer.reset(); + wal.beforeCheckpoint(windowId); wal.windowWalParts.put(windowId, writer.getCurrentPointer().getPartNum()); writer.rotateIfNecessary(); @@ -594,13 +607,8 @@ public void committed(long committedWindowId) throws IOException private Slice toSlice(Object object) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Output output = new Output(baos); - kryo.writeClassAndObject(output, object); - output.close(); - byte[] bytes = baos.toByteArray(); - - return new Slice(bytes); + kryo.writeClassAndObject(serializationBuffer, object); + return serializationBuffer.toSlice(); } protected Object fromSlice(Slice slice)