[SPARK-746][CORE] Added Avro Serialization to Kryo #7004

Closed
wants to merge 13 commits
5 changes: 5 additions & 0 deletions core/pom.xml
@@ -34,6 +34,11 @@
<name>Spark Project Core</name>
<url>http://spark.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<classifier>${avro.mapred.classifier}</classifier>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
18 changes: 17 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,11 +18,13 @@
package org.apache.spark

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet

import org.apache.avro.Schema

import org.apache.spark.serializer.GenericAvroSerializer.{avroSchemaNamespace, avroSchemaKey}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils

@@ -160,6 +162,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
set("spark.serializer", classOf[KryoSerializer].getName)
this
}
Contributor: add a newline

/**
* Use Kryo serialization and register the given set of Avro schemas so that the generic
* record serializer can decrease network IO
*/
def registerAvroSchemas(schemas: Schema*): SparkConf = schemas.foldLeft(this) { (conf, schema) =>
conf.set(avroSchemaKey(schema), schema.toString)
}
Contributor: can you start the body of the method on the next line, and wrap the entire method w/ braces?
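For reference, one way the method might look with the body on the next line and wrapped in braces (a sketch of the suggestion, not necessarily the final code):

def registerAvroSchemas(schemas: Schema*): SparkConf = {
  schemas.foldLeft(this) { (conf, schema) =>
    conf.set(avroSchemaKey(schema), schema.toString)
  }
}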


/** Gets all the avro schemas in the configuration used in the generic Avro record serializer */
def getAvroSchema: Map[Long, String] =
getAll.filter { case (k, v) => k.startsWith(avroSchemaNamespace) }
.map { case (k, v) => (k.substring(avroSchemaNamespace.length).toLong, v) }
.toMap

Contributor: nit: I think this method should be wrapped in braces since it's multiline (even though it's one statement). Also, delete the extra newline.
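A sketch of the braced form the reviewer is asking for (illustrative only, not the merged code):

def getAvroSchema: Map[Long, String] = {
  getAll.filter { case (k, v) => k.startsWith(avroSchemaNamespace) }
    .map { case (k, v) => (k.substring(avroSchemaNamespace.length).toLong, v) }
    .toMap
}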


/** Remove a parameter from the configuration */
def remove(key: String): SparkConf = {
New file: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.serializer

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.ByteBuffer

import org.apache.commons.io.IOUtils
import org.apache.spark.io.CompressionCodec
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

import scala.collection.mutable

import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io._

/**
*
*/
Contributor: delete the empty comment (you could add a doc comment here, but doesn't seem necessary)

private[spark] object GenericAvroSerializer {
Contributor: can this be private[serializer]?

Contributor (Author): Since those constants were only being used in SparkConf, I just moved the entire object there.
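Roughly what the relocated constants could look like in a SparkConf companion object (a sketch based on the reply above; the exact names and placement are assumptions, not part of this diff):

import org.apache.avro.{Schema, SchemaNormalization}

private[spark] object SparkConf {
  // Namespace prefix under which registered Avro schemas are stored in the conf (sketch).
  final val avroSchemaNamespace = "avro.schema."

  // A schema's key is the namespace plus its 64-bit Parsing Fingerprint (sketch).
  def avroSchemaKey(schema: Schema): String =
    avroSchemaNamespace + SchemaNormalization.parsingFingerprint64(schema)
}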

final val avroSchemaNamespace = "avro.schema."
def avroSchemaKey(schema: Schema): String =
avroSchemaNamespace + SchemaNormalization.parsingFingerprint64(schema)
}

/**
* Custom serializer used for generic Avro records. If the user registers the schemas
* ahead of time, then the schema's fingerprint will be sent with each message instead of the
* actual schema, so as to reduce network IO.
* Actions like parsing or compressing schemas are computationally expensive, so the serializer
* caches all previously seen values to reduce the amount of work needed.
*/
private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
Contributor: Similar comment here: what are the types in schemas?

Contributor (Author): See above, but I have also updated the comments.

extends KSerializer[GenericRecord] {

/** Used to reduce the amount of effort to compress the schema */
private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()

/** Reuses the same datum reader/writer since the same schema will be used many times */
private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()

/** Fingerprinting is very expensive so this alleviates most of the work */
private val fingerprintCache = new mutable.HashMap[Schema, Long]()
private val schemaCache = new mutable.HashMap[Long, Schema]()

/**
* Used to compress Schemas when they are being sent over the wire.
* The compression results are memoized to reduce the compression time since the
* same schema is compressed many times over
*/
def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
val bos = new ByteArrayOutputStream()
val out = new SnappyOutputStream(bos)
out.write(schema.toString.getBytes("UTF-8"))
out.close()
bos.toByteArray
})


Contributor: nit: delete extra newline

/**
* Decompresses the schema into the actual in-memory object. Keeps an internal cache of already
* seen values to limit the number of times that decompression has to be done.
*/
def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
val bis = new ByteArrayInputStream(schemaBytes.array())
val bytes = IOUtils.toByteArray(new SnappyInputStream(bis))
new Schema.Parser().parse(new String(bytes, "UTF-8"))
})

/**
* Serializes a record to the given output stream. It caches a lot of the internal data so as
* not to redo work.
*/
def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
val encoder = EncoderFactory.get.binaryEncoder(output, null)
val schema = datum.getSchema
val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
SchemaNormalization.parsingFingerprint64(schema)
})
schemas.get(fingerprint) match {
case Some(_) => {
output.writeBoolean(true)
output.writeLong(fingerprint)
}
case None => {
output.writeBoolean(false)
val compressedSchema = compress(schema)
output.writeInt(compressedSchema.length)
output.writeBytes(compressedSchema)

}
}

writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
.asInstanceOf[DatumWriter[R]]
.write(datum, encoder)
encoder.flush()
}

/**
* Deserializes generic records into their in-memory form. There is internal
* state to keep a cache of already seen schemas and datum readers.
*/
def deserializeDatum(input: KryoInput): GenericRecord = {
val schema = {
if (input.readBoolean()) {
val fingerprint = input.readLong()
schemaCache.getOrElseUpdate(fingerprint, {
schemas.get(fingerprint) match {
case Some(s) => new Schema.Parser().parse(s)
case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint")
Contributor: I know it'll be apparent in the stack trace, but what do you think about making the message just a bit more clear, e.g. "Error attempting to read avro data -- encountered an unknown fingerprint: $fingerprint, not sure what schema to use. This could happen if you registered additional schemas after starting your spark context." (I think that explanation is plausible anyway ...)

Contributor: Why RuntimeException as opposed to a more specific exception (or just Exception)?

Contributor (Author): I will change it to throw a SparkException.
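A sketch of how that branch inside deserializeDatum might read after the change (the SparkException and the exact wording are assumptions based on the comments above):

case None => throw new SparkException(
  s"Error attempting to read avro data -- encountered an unknown fingerprint: $fingerprint, " +
    "not sure what schema to use. This could happen if you registered additional schemas " +
    "after starting your spark context.")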

}
})
} else {
val length = input.readInt()
decompress(ByteBuffer.wrap(input.readBytes(length)))
}
}
val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
.asInstanceOf[DatumReader[GenericRecord]]
.read(null, decoder)
}

override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit =
serializeDatum(datum, output)

override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord =
deserializeDatum(input)
}

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -21,6 +21,8 @@ import java.io.{EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable

import org.apache.avro.generic.{GenericData, GenericRecord}
Contributor: nit: order


import scala.reflect.ClassTag
Contributor: nit: import order, should be below scala imports


import com.esotericsoftware.kryo.{Kryo, KryoException}
@@ -32,6 +34,7 @@ import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, Roaring
import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
@@ -73,6 +76,8 @@ class KryoSerializer(conf: SparkConf)
.split(',')
.filter(!_.isEmpty)

private val avroSchemas = conf.getAvroSchema

def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))

def newKryo(): Kryo = {
@@ -99,6 +104,9 @@ class KryoSerializer(conf: SparkConf)
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))

try {
// Use the default classloader when calling the user registrator.
Thread.currentThread.setContextClassLoader(classLoader)
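For context, a minimal sketch of how an application could use the new API end to end (the schema, app name, and master here are made up for illustration):

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example schema; any Avro Schema would work.
val userSchema = SchemaBuilder
  .record("user").fields()
  .requiredString("name")
  .endRecord()

val conf = new SparkConf()
  .setMaster("local[2]")           // illustrative
  .setAppName("avro-kryo-example") // illustrative
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerAvroSchemas(userSchema) // registered schemas travel as fingerprints, not full JSON

val sc = new SparkContext(conf)

// GenericData.Record values are now handled by GenericAvroSerializer when Kryo serializes them.
val user = new GenericData.Record(userSchema)
user.put("name", "someone")
val rdd = sc.parallelize(Seq(user))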
New file: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.serializer

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}
import java.nio.ByteBuffer

import com.esotericsoftware.kryo.io.{Output, Input}
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.{SchemaBuilder, Schema}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.{SparkFunSuite, SharedSparkContext}
Contributor: nit: empty line between other imports and spark imports, and the ordering should have class imports ahead of package imports, not just alphabetic, so:

import com.esotericsoftware.kryo.io.{Output, Input}
import org.apache.avro.{SchemaBuilder, Schema}
import org.apache.avro.generic.GenericData.Record

import org.apache.spark.{SparkFunSuite, SharedSparkContext}
import org.apache.spark.io.CompressionCodec


class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val schema : Schema = SchemaBuilder
.record("testRecord").fields()
.requiredString("data")
.endRecord()
val record = new Record(schema)
record.put("data", "test data")

test("schema compression and decompression") {
val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
}

test("record serialization and deserialization") {
val genericSer = new GenericAvroSerializer(conf.getAvroSchema)

val outputStream = new ByteArrayOutputStream()
val output = new Output(outputStream)
genericSer.serializeDatum(record, output)
output.flush()
output.close()

val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
assert(genericSer.deserializeDatum(input) === record)
}

test("uses schema fingerprint to decrease message size") {
val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)

val output = new Output(new ByteArrayOutputStream())

val beginningNormalPosition = output.total()
genericSerFull.serializeDatum(record, output)
output.flush()
val normalLength = output.total - beginningNormalPosition

conf.registerAvroSchemas(schema)
val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
val beginningFingerprintPosition = output.total()
genericSerFinger.serializeDatum(record, output)
val fingerprintLength = output.total - beginningFingerprintPosition

assert(fingerprintLength < normalLength)
}

test("caches previously seen schemas") {
val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
val compressedSchema = genericSer.compress(schema)
val decompressedSchema = genericSer.decompress(ByteBuffer.wrap(compressedSchema))

assert(compressedSchema.eq(genericSer.compress(schema)))
assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
}
}