From c210dc212a8ac2ecd3d83ce21e72960f35c2e9db Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Tue, 16 Aug 2016 18:46:14 -0700 Subject: [PATCH] APEXCORE-502 Unnecessary byte array copy in DefaultKryoStreamCodec.toByteArray --- .../plan/logical/DefaultKryoStreamCodec.java | 37 ++++++------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java index 578a9d70ff..8d0532b9ff 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java @@ -18,17 +18,12 @@ */ package com.datatorrent.stram.plan.logical; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import com.google.common.base.Throwables; import com.datatorrent.api.StreamCodec; import com.datatorrent.common.util.SerializableObject; @@ -56,37 +51,29 @@ public DefaultKryoStreamCodec() @Override public Object fromByteArray(Slice fragment) { + final Input input = new Input(fragment.buffer, fragment.offset, fragment.length); try { - ByteArrayInputStream is = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length); - Input input = new Input(is); - Object returnObject = kryo.readClassAndObject(input); - is.close(); - return returnObject; - } catch (IOException e) { - throw Throwables.propagate(e); + return kryo.readClassAndObject(input); + } finally { + input.close(); } } @Override - public Slice toByteArray(T info) + public Slice toByteArray(T o) { - Slice slice = null; + final Output output = new Output(32, -1); try { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - Output output = new Output(os); - kryo.writeClassAndObject(output, info); - output.flush(); - slice = new Slice(os.toByteArray(), 0, os.toByteArray().length); - os.close(); - } catch (IOException e) { - throw Throwables.propagate(e); + kryo.writeClassAndObject(output, o); + } finally { + output.close(); } - return slice; + return new Slice(output.getBuffer(), 0, output.position()); } @Override - public int getPartition(T t) + public int getPartition(T o) { - return t.hashCode(); + return o.hashCode(); } }