Skip to content
This repository

Surrender control of input and output streams. #14

Merged
merged 2 commits into from almost 2 years ago

2 participants

Sam Ritchie P. Oscar Boykin
Sam Ritchie
Owner

Kryo relinquishes control! Hadoop for the win!

src/jvm/cascading/kryo/KryoSerializer.java
@@ -20,13 +21,28 @@ public KryoSerializer(KryoSerialization kryoSerialization) {
20 21
 
21 22
     public void open(OutputStream out) throws IOException {
22 23
         kryo = kryoSerialization.populatedKryo();
23  
-        output = new Output(KryoSerialization.OUTPUT_BUFFER_SIZE, KryoSerialization.MAX_OUTPUT_BUFFER_SIZE);
24  
-        output.setOutputStream(out);
  24
+
  25
+        if( out instanceof DataOutputStream)
  26
+            outputStream = (DataOutputStream) out;
  27
+        else
  28
+            outputStream = new DataOutputStream(out);
  29
+    }
  30
+
  31
+    // This is to prevent output from maintaining a giant internal buffer.
  32
+    public void tidyBuffer() {
  33
+        if (output.position() > KryoSerialization.SWITCH_LIMIT)
1
P. Oscar Boykin Collaborator
johnynek added a note July 19, 2012

This should be <.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
P. Oscar Boykin johnynek merged commit 735ab56 into from July 19, 2012
P. Oscar Boykin johnynek closed this July 19, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
46  src/jvm/cascading/kryo/KryoDeserializer.java
@@ -4,16 +4,17 @@
4 4
 import com.esotericsoftware.kryo.io.Input;
5 5
 import org.apache.hadoop.io.serializer.Deserializer;
6 6
 
  7
+import java.io.DataInputStream;
7 8
 import java.io.IOException;
8 9
 import java.io.InputStream;
9 10
 
10  
-/** User: sritchie Date: 12/1/11 Time: 3:15 PM */
11 11
 public class KryoDeserializer implements Deserializer<Object> {
12 12
 
13 13
     private Kryo kryo;
14 14
     private final KryoSerialization kryoSerialization;
15 15
     private final Class<Object> klass;
16  
-    private Input input;
  16
+
  17
+    private DataInputStream inputStream;
17 18
 
18 19
     public KryoDeserializer(KryoSerialization kryoSerialization, Class<Object> klass) {
19 20
         this.kryoSerialization =  kryoSerialization;
@@ -21,47 +22,28 @@ public KryoDeserializer(KryoSerialization kryoSerialization, Class<Object> klass
21 22
     }
22 23
 
23 24
     public void open(InputStream in) throws IOException {
24  
-        if (!in.markSupported())
25  
-            throw new IOException("InputStream must support `mark`.");
26  
-
27 25
         kryo = kryoSerialization.populatedKryo();
28  
-        input = new Input(in);
29  
-    }
30 26
 
31  
-    // As of version 2.16, Kryo overfills its internal buffer by slurping with wild
32  
-    // abandon from its wrapped input stream. The following method compensates for this
33  
-    // by resetting Input's position and limit after each slurp and moving the wrapped inputstream's
34  
-    // pointer forward manually.
  27
+        if( in instanceof DataInputStream)
  28
+            inputStream = (DataInputStream) in;
  29
+        else
  30
+            inputStream = new DataInputStream( in );
  31
+    }
35 32
 
36 33
     public Object deserialize(Object o) throws IOException {
37  
-        // Marking phase.
38  
-        InputStream inStream = input.getInputStream();
39  
-        inStream.mark(KryoSerialization.MAX_OUTPUT_BUFFER_SIZE);
40  
-
41  
-        // Resetting input's limit and position forces input to continue reading from its wrapped
42  
-        // stream at the marked position.
43  
-        input.setPosition(0);
44  
-        input.setLimit(0);
45  
-
46  
-        // Reading phase.
47  
-        Object obj = kryo.readObject(input, klass);
48  
-
49  
-        // Skipping phase.
50  
-        inStream.reset();
51  
-        int needToSkip = input.position();
52  
-        while(needToSkip > 0)
53  
-            needToSkip -= inStream.skip(needToSkip);
  34
+        byte[] bytes = new byte[inputStream.readInt()];
  35
+        inputStream.readFully( bytes );
54 36
 
55  
-        return obj;
  37
+        return kryo.readObject(new Input(bytes), klass);
56 38
     }
57 39
 
58 40
     // TODO: Bump the kryo version, add a kryo.reset();
59 41
     public void close() throws IOException {
60 42
         try {
61  
-            if( input != null )
62  
-                input.close();
  43
+            if( inputStream != null )
  44
+                inputStream.close();
63 45
         } finally {
64  
-            input = null;
  46
+            inputStream = null;
65 47
         }
66 48
     }
67 49
 }
1  src/jvm/cascading/kryo/KryoFactory.java
@@ -8,7 +8,6 @@
8 8
 import java.util.ArrayList;
9 9
 import java.util.List;
10 10
 
11  
-/** User: sritchie Date: 12/1/11 Time: 3:18 PM */
12 11
 public class KryoFactory {
13 12
     final Logger LOG = Logger.getLogger(KryoFactory.class);
14 13
     final Configuration conf;
6  src/jvm/cascading/kryo/KryoSerialization.java
@@ -8,11 +8,15 @@
8 8
 import org.apache.hadoop.io.serializer.Serializer;
9 9
 import org.objenesis.strategy.StdInstantiatorStrategy;
10 10
 
11  
-/** User: sritchie Date: 12/1/11 Time: 11:43 AM */
12 11
 public class KryoSerialization extends Configured implements Serialization<Object> {
13 12
     public static final int OUTPUT_BUFFER_SIZE = 1<<12;
14 13
     public static final int MAX_OUTPUT_BUFFER_SIZE = 1<<24;
15 14
 
  15
+    public static final int TIDY_FACTOR = 1<<4;
  16
+
  17
+    public static final int SWITCH_LIMIT = Math.max(
  18
+        KryoSerialization.MAX_OUTPUT_BUFFER_SIZE, KryoSerialization.MAX_OUTPUT_BUFFER_SIZE / TIDY_FACTOR);
  19
+
16 20
     Kryo kryo;
17 21
     KryoFactory factory;
18 22
 
42  src/jvm/cascading/kryo/KryoSerializer.java
@@ -4,15 +4,17 @@
4 4
 import com.esotericsoftware.kryo.io.Output;
5 5
 import org.apache.hadoop.io.serializer.Serializer;
6 6
 
  7
+import java.io.DataOutputStream;
7 8
 import java.io.IOException;
8 9
 import java.io.OutputStream;
9 10
 
10  
-/** User: sritchie Date: 12/1/11 Time: 11:57 AM */
11  
-@SuppressWarnings("FieldCanBeLocal")
12 11
 public class KryoSerializer implements Serializer<Object> {
13 12
     private Kryo kryo;
14 13
     private final KryoSerialization kryoSerialization;
15  
-    private Output output;
  14
+    private final Output output = new Output(
  15
+        KryoSerialization.OUTPUT_BUFFER_SIZE, KryoSerialization.MAX_OUTPUT_BUFFER_SIZE);
  16
+    private DataOutputStream outputStream;
  17
+    private int prevPosition;
16 18
 
17 19
     public KryoSerializer(KryoSerialization kryoSerialization) {
18 20
         this.kryoSerialization =  kryoSerialization;
@@ -20,13 +22,35 @@ public KryoSerializer(KryoSerialization kryoSerialization) {
20 22
 
21 23
     public void open(OutputStream out) throws IOException {
22 24
         kryo = kryoSerialization.populatedKryo();
23  
-        output = new Output(KryoSerialization.OUTPUT_BUFFER_SIZE, KryoSerialization.MAX_OUTPUT_BUFFER_SIZE);
24  
-        output.setOutputStream(out);
  25
+
  26
+        if( out instanceof DataOutputStream)
  27
+            outputStream = (DataOutputStream) out;
  28
+        else
  29
+            outputStream = new DataOutputStream(out);
  30
+    }
  31
+
  32
+    // We tidy prevent output from maintaining a giant internal buffer.
  33
+    public void tidyBuffer() {
  34
+        int currentPosition = output.position();
  35
+
  36
+        // If the previous serialized object was large (greater than the switch size) and the current
  37
+        // object falls below the switch size, reset the buffer to be small again. If both objects
  38
+        // are small (or large), no reallocation occurs.
  39
+        if (prevPosition > KryoSerialization.SWITCH_LIMIT && currentPosition <= KryoSerialization.SWITCH_LIMIT)
  40
+            output.setBuffer(new byte[KryoSerialization.OUTPUT_BUFFER_SIZE], KryoSerialization.MAX_OUTPUT_BUFFER_SIZE);
  41
+
  42
+        prevPosition = currentPosition;
  43
+        output.clear();
25 44
     }
26 45
 
27 46
     public void serialize(Object o) throws IOException {
28 47
         kryo.writeObject(output, o);
29  
-        output.flush();
  48
+        byte[] bytes = output.toBytes();
  49
+
  50
+        outputStream.writeInt(bytes.length);
  51
+        outputStream.write(bytes);
  52
+
  53
+        tidyBuffer();
30 54
     }
31 55
 
32 56
     // TODO: Bump the kryo version, add a kryo.reset();
@@ -34,11 +58,11 @@ public void close() throws IOException {
34 58
         kryo = null;
35 59
 
36 60
         try {
37  
-            if( output != null ) {
38  
-                output.close();
  61
+            if( outputStream != null ) {
  62
+                outputStream.close();
39 63
             }
40 64
         } finally {
41  
-            output = null;
  65
+            outputStream = null;
42 66
         }
43 67
     }
44 68
 }
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.