Skip to content

Commit

Permalink
[FLINK-1371] [runtime] Fix KryoSerializer to not swallow EOFExceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Jan 8, 2015
1 parent d9cb5b7 commit 19066b5
Show file tree
Hide file tree
Showing 5 changed files with 373 additions and 261 deletions.
Expand Up @@ -30,6 +30,7 @@


import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;


public class KryoSerializer<T> extends TypeSerializer<T> { public class KryoSerializer<T> extends TypeSerializer<T> {
Expand Down Expand Up @@ -114,8 +115,19 @@ public void serialize(T record, DataOutputView target) throws IOException {
previousOut = target; previousOut = target;
} }


kryo.writeClassAndObject(output, record); try {
output.flush(); kryo.writeClassAndObject(output, record);
output.flush();
}
catch (KryoException ke) {
Throwable cause = ke.getCause();
if (cause instanceof EOFException) {
throw (EOFException) cause;
}
else {
throw ke;
}
}
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Expand Down Expand Up @@ -173,4 +185,13 @@ private void checkKryoInitialized() {
this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
} }
} }

// --------------------------------------------------------------------------------------------
// for testing
// --------------------------------------------------------------------------------------------

Kryo getKryo() {
checkKryoInitialized();
return this.kryo;
}
} }
Expand Up @@ -43,6 +43,7 @@ public WritableSerializer(Class<T> typeClass) {
this.typeClass = typeClass; this.typeClass = typeClass;
} }


@SuppressWarnings("unchecked")
@Override @Override
public T createInstance() { public T createInstance() {
if(typeClass == NullWritable.class) { if(typeClass == NullWritable.class) {
Expand Down
Expand Up @@ -18,17 +18,20 @@


package org.apache.flink.api.java.typeutils.runtime; package org.apache.flink.api.java.typeutils.runtime;


import static org.junit.Assert.*;


import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.junit.Test; import org.junit.Test;
import org.apache.flink.api.common.typeutils.TypeSerializer;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Random;


@SuppressWarnings("unchecked")
public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {

@Test @Test
public void testJavaList(){ public void testJavaList(){
Collection<Integer> a = new ArrayList<Integer>(); Collection<Integer> a = new ArrayList<Integer>();
Expand Down Expand Up @@ -67,4 +70,44 @@ private void fillCollection(Collection<Integer> coll){
protected <T> TypeSerializer<T> createSerializer(Class<T> type) { protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
return new KryoSerializer<T>(type); return new KryoSerializer<T>(type);
} }

/**
* Make sure that the kryo serializer forwards EOF exceptions properly
*/
@Test
public void testForwardEOFException() {
try {
// construct a long string
String str;
{
char[] charData = new char[40000];
Random rnd = new Random();

for (int i = 0; i < charData.length; i++) {
charData[i] = (char) rnd.nextInt(10000);
}

str = new String(charData);
}

// construct a memory target that is too small for the string
TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000);
KryoSerializer<String> serializer = new KryoSerializer<String>(String.class);

try {
serializer.serialize(str, target);
fail("should throw a java.io.EOFException");
}
catch (java.io.EOFException e) {
// that is how we like it
}
catch (Exception e) {
fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName());
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
} }

0 comments on commit 19066b5

Please sign in to comment.