From 1f770d400ca9e5506199783551c0780a1c46f77e Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Thu, 8 Sep 2016 11:21:41 +0300 Subject: [PATCH] Changes in AvroCoder serialization so it can serialize in Kryo --- sdks/java/core/pom.xml | 7 + .../org/apache/beam/sdk/coders/AvroCoder.java | 126 ++++++++++-------- .../apache/beam/sdk/coders/AvroCoderTest.java | 33 +++++ 3 files changed, 112 insertions(+), 54 deletions(-) diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index aa0ad09ce59b..e189758371ce 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -578,5 +578,12 @@ google-cloud-dataflow-java-proto-library-all test + + + com.esotericsoftware.kryo + kryo + 2.21 + test + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 7894d14ccd1d..4f0239e6a512 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.Serializable; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collection; @@ -164,7 +163,9 @@ public Coder getCoder(TypeDescriptor typeDescriptor) { }; private final Class type; - private final transient Schema schema; + private transient Schema schema; + + private final String schemaStr; private final List nonDeterministicReasons; @@ -174,36 +175,16 @@ public Coder getCoder(TypeDescriptor typeDescriptor) { // Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe, // these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use // an inner coder. - private final transient ThreadLocal decoder; - private final transient ThreadLocal encoder; - private final transient ThreadLocal> writer; - private final transient ThreadLocal> reader; + private transient ThreadLocal memoizedDecoder; + private transient ThreadLocal memoizedEncoder; + private transient ThreadLocal> memoizedWriter; + private transient ThreadLocal> memoizedReader; protected AvroCoder(Class type, Schema schema) { this.type = type; this.schema = schema; - + this.schemaStr = schema.toString(); nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema); - - // Decoder and Encoder start off null for each thread. They are allocated and potentially - // reused inside encode/decode. - this.decoder = new ThreadLocal<>(); - this.encoder = new ThreadLocal<>(); - - // Reader and writer are allocated once per thread and are "final" for thread-local Coder - // instance. - this.reader = new ThreadLocal>() { - @Override - public DatumReader initialValue() { - return createDatumReader(); - } - }; - this.writer = new ThreadLocal>() { - @Override - public DatumWriter initialValue() { - return createDatumWriter(); - } - }; } /** @@ -246,33 +227,29 @@ public Class getType() { return type; } - private Object writeReplace() { - // When serialized by Java, instances of AvroCoder should be replaced by - // a SerializedAvroCoderProxy. - return new SerializedAvroCoderProxy<>(type, schema.toString()); - } - @Override public void encode(T value, OutputStream outStream, Context context) throws IOException { // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it. + ThreadLocal encoder = getEncoder(); BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get()); // Save the potentially-new instance for reuse later. encoder.set(encoderInstance); - writer.get().write(value, encoderInstance); + getWriter().get().write(value, encoderInstance); // Direct binary encoder does not buffer any data and need not be flushed. } @Override public T decode(InputStream inStream, Context context) throws IOException { // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it. + ThreadLocal decoder = getDecoder(); BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get()); // Save the potentially-new instance for later. decoder.set(decoderInstance); - return reader.get().read(null, decoderInstance); + return getReader().get().read(null, decoderInstance); } @Override - public List> getCoderArguments() { + public List> getCoderArguments() { return null; } @@ -280,7 +257,7 @@ public List> getCoderArguments() { public CloudObject asCloudObject() { CloudObject result = super.asCloudObject(); addString(result, "type", type.getName()); - addString(result, "schema", schema.toString()); + addString(result, "schema", getSchema().toString()); return result; } @@ -306,9 +283,9 @@ public void verifyDeterministic() throws NonDeterministicException { @Deprecated public DatumReader createDatumReader() { if (type.equals(GenericRecord.class)) { - return new GenericDatumReader<>(schema); + return new GenericDatumReader<>(getSchema()); } else { - return new ReflectDatumReader<>(schema); + return new ReflectDatumReader<>(getSchema()); } } @@ -321,9 +298,9 @@ public DatumReader createDatumReader() { @Deprecated public DatumWriter createDatumWriter() { if (type.equals(GenericRecord.class)) { - return new GenericDatumWriter<>(schema); + return new GenericDatumWriter<>(getSchema()); } else { - return new ReflectDatumWriter<>(schema); + return new ReflectDatumWriter<>(getSchema()); } } @@ -331,28 +308,69 @@ public DatumWriter createDatumWriter() { * Returns the schema used by this coder. */ public Schema getSchema() { - return schema; + return getMemoizedSchema(); + } + + /** + * Get the memoized {@link BinaryDecoder}, possibly initializing it lazily. + */ + private ThreadLocal getDecoder() { + if (memoizedDecoder == null) { + memoizedDecoder = new ThreadLocal<>(); + } + return memoizedDecoder; + } + + /** + * Get the memoized {@link BinaryEncoder}, possibly initializing it lazily. + */ + private ThreadLocal getEncoder() { + if (memoizedEncoder == null) { + memoizedEncoder = new ThreadLocal<>(); + } + return memoizedEncoder; } /** - * Proxy to use in place of serializing the {@link AvroCoder}. This allows the fields - * to remain final. + * Get the memoized {@link DatumReader}, possibly initializing it lazily. */ - private static class SerializedAvroCoderProxy implements Serializable { - private final Class type; - private final String schemaStr; + private ThreadLocal> getReader() { + if (memoizedReader == null) { + memoizedReader = new ThreadLocal>() { + @Override + public DatumReader initialValue() { + return createDatumReader(); + } + }; + } + return memoizedReader; + } - public SerializedAvroCoderProxy(Class type, String schemaStr) { - this.type = type; - this.schemaStr = schemaStr; + /** + * Get the memoized {@link DatumWriter}, possibly initializing it lazily. + */ + private ThreadLocal> getWriter() { + if (memoizedWriter == null) { + memoizedWriter = new ThreadLocal>() { + @Override + public DatumWriter initialValue() { + return createDatumWriter(); + } + }; } + return memoizedWriter; + } - private Object readResolve() { - // When deserialized, instances of this object should be replaced by - // constructing an AvroCoder. + /** + * Get the {@link Schema}, possibly initializing it lazily by parsing {@link + * AvroCoder#schemaStr}. + */ + private Schema getMemoizedSchema() { + if (schema == null) { Schema.Parser parser = new Schema.Parser(); - return new AvroCoder(type, parser.parse(schemaStr)); + schema = parser.parse(schemaStr); } + return schema; } /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index a97acfb5bcd4..04096fb0539b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -39,6 +39,10 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -172,6 +176,35 @@ public void testTransientFieldInitialization() throws Exception { CoderProperties.coderDecodeEncodeEqual(copied, value); } + /** + * Confirm that we can serialize and deserialize an AvroCoder object using Kryo. + * (BEAM-626). + * + * @throws Exception + */ + @Test + public void testKryoSerialization() throws Exception { + Pojo value = new Pojo("Hello", 42); + AvroCoder coder = AvroCoder.of(Pojo.class); + + //Kryo instantiation + Kryo kryo = new Kryo(); + kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy()); + + //Serialization of object + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Output output = new Output(bos); + kryo.writeObject(output, coder); + output.close(); + + //De-serialization of object + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + Input input = new Input(bis); + AvroCoder copied = (AvroCoder) kryo.readObject(input, AvroCoder.class); + + CoderProperties.coderDecodeEncodeEqual(copied, value); + } + @Test public void testPojoEncoding() throws Exception { Pojo value = new Pojo("Hello", 42);