[SPARK-746][CORE] Added Avro Serialization to Kryo #7004

Closed
wants to merge 13 commits
35 changes: 35 additions & 0 deletions core/pom.xml
@@ -398,6 +398,41 @@
<artifactId>py4j</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
<scope>${hadoop.deps.scope}</scope>
Contributor:
isn't this included already?

Contributor:
avro and avro-mapred are already in the dependencyManagement of the parent POM, so nothing more than groupId, artifactId and perhaps scope should be included here. In particular, version should not generally be specified when it is already in the dependencyManagement. This is a frequent error in the Spark POMs.

Contributor:
+1 to Mark's comments.

</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>${avro.version}</version>
Contributor:
Similarly to the previous one, this one should only contain groupId / artifactId (everything else is inherited).

Contributor:
The one thing you have to add here, though, is:

  <classifier>${avro.mapred.classifier}</classifier>

Contributor Author:
Thanks for pointing that out, just fixed that.

<classifier>${avro.mapred.classifier}</classifier>
<scope>${hive.deps.scope}</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -20,9 +20,12 @@ package org.apache.spark
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.avro.{Schema, SchemaNormalization}
Contributor:
nit: import ordering (below scala.*, above org.apache.spark.*)

Contributor Author:
fixed.
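
For reference, a minimal sketch of the grouping the style guide asks for here (java, then scala, then third-party such as Avro, then org.apache.spark), using only the imports already present in this file:

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet

import org.apache.avro.{Schema, SchemaNormalization}

import org.apache.spark.serializer.GenericAvroSerializer.avroSchemaKey
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils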


import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet

import org.apache.spark.serializer.GenericAvroSerializer.avroSchemaKey
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils

@@ -161,6 +164,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}

/**
Contributor:
add a newline

* Use Kryo serialization and register the given set of Avro schemas so that the generic
* record serializer can decrease network IO
*/
def registerAvroSchema(schemas: Array[Schema]): SparkConf =
Contributor:
how about using varargs here? def registerAvroSchema(schemas: Schema*): SparkConf

then you could call using any of:

registerAvroSchema(schema)
registerAvroSchema(schemaOne, schemaTwo, schemaThree)
registerAvroSchema(schemaSeq: _*) // works w/ an array too

and maybe rename the method to plural, registerAvroSchemas

schemas.foldLeft(this) { (conf, schema) =>
Contributor:
Why do you need a fold here? This seems confusing. Why not just loop over the schemas and call conf.set() for each?

Contributor Author:
Got it, changed.

conf.set(avroSchemaKey(SchemaNormalization.parsingFingerprint64(schema)), schema.toString)
}
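
For illustration only, a minimal sketch of what this method could look like after the varargs and plain-loop suggestions above are adopted (the plural name registerAvroSchemas is the reviewer's proposal, not what this diff currently contains):

def registerAvroSchemas(schemas: Schema*): SparkConf = {
  // Store each schema in the conf, keyed by its 64-bit parsing fingerprint.
  schemas.foreach { schema =>
    set(avroSchemaKey(SchemaNormalization.parsingFingerprint64(schema)), schema.toString)
  }
  this
}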

/** Remove a parameter from the configuration */
def remove(key: String): SparkConf = {
settings.remove(key)
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala (new file)
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.serializer

import java.io.ByteArrayOutputStream
import java.util.zip.{Inflater, Deflater}

import scala.collection.mutable

import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io._
import org.apache.avro.{Schema, SchemaNormalization}
Contributor:
super nit: class imports go before more deeply nested packages (it's not just alphabetic), so it should be:

import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io._

I recommend using the plugin (not IntelliJ's builtin ordering) as described on the wiki: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports


import org.apache.spark.SparkConf

import GenericAvroSerializer._

object GenericAvroSerializer {
Contributor:
private[serializer]

def avroSchemaKey(implicit fingerprint: Long): String = s"avro.schema.$fingerprint"
Contributor:
no reason to make this implicit

Contributor Author:
fixed.

}

/**
* Custom serializer used for generic Avro records. If the user registers the schemas
* ahead of time, then the schema's fingerprint will be sent with each message instead of the
* actual schema, so as to reduce network IO.
* Actions like parsing or compressing schemas are computationally expensive, so the serializer
* caches all previously seen values to reduce the amount of work that needs to be done.
*/
class GenericAvroSerializer(conf: SparkConf) extends KSerializer[GenericRecord] {

private val serializer = serialize()
private val deserializer = deserialize()

private def confSchema(implicit fingerprint: Long) = conf.getOption(avroSchemaKey)
Contributor:
also no reason to make this implicit

Contributor Author:
fixed.
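
To make the two implicit-related comments concrete, a sketch of the non-implicit forms being asked for (the private[serializer] modifier comes from the earlier suggestion; the first definition would live in the companion object, the second in the class):

private[serializer] def avroSchemaKey(fingerprint: Long): String = s"avro.schema.$fingerprint"

private def confSchema(fingerprint: Long): Option[String] = conf.getOption(avroSchemaKey(fingerprint))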


/**
* Used to compress Schemas when they are being sent over the wire.
* The compression results are memoized to reduce the compression time since the
* same schema is compressed many times over
*/
def compressor(): Schema => Array[Byte] = {
val cache = new mutable.HashMap[Schema, Array[Byte]]()

Contributor:
I don't see the value in having methods that return functions -- serialize() is only called once, which in turn will just call compressor() once. (Or am I overlooking something?)

How about just move this cache to the top level as compressorCache, and then move all the inner functions to the top level also? It would also make it easier to test each function. If you like, you could have separate helpers for the serialization & deserialization to keep things isolated (though imo just separate methods is fine).

Contributor Author:
These function closures like serialize() and compressor() act like factories to initialize the internal cache and to populate their dependencies. The goal of this was to hide away all of the construction and to decrease the top level namespace usage. I can change this if you think it would be better, that was just my original goal.

Contributor:
yeah I find this a little confusing. My initial thoughts on reading this was that compressor() must be called multiple times, so that each one had its own cache -- and then I was trying to track down where you'd reuse the cache and where you'd use a new one. But actually one GenericAvroSerializer only has one compressCache, one writerCache, one decompressCache, etc.

I understand wanting to limit the scoping of things, so I think you could separate out the decompress & compress side into little helper classes if you like. Though honestly there aren't that many variables and functions, so IMO they could all be members of GenericAvroSerializer (with most of them private).

Contributor Author:
Got it, I will make them all private members of GenericAvroSerializer.
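
A rough outline of the shape being discussed — caches and helpers as private members of the class instead of closures returned by factory methods. Member names and the elided bodies are assumptions, not the final code:

import scala.collection.mutable

import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.{DatumReader, DatumWriter}

import org.apache.spark.SparkConf

class GenericAvroSerializer(conf: SparkConf) extends KSerializer[GenericRecord] {
  // Each serializer instance owns exactly one of each cache.
  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
  private val decompressCache = new mutable.HashMap[Array[Byte], Schema]()
  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()

  // The closure bodies above become ordinary private methods (bodies elided here).
  private def compress(schema: Schema): Array[Byte] =
    compressCache.getOrElseUpdate(schema, ???)
  private def decompress(schemaBytes: Array[Byte]): Schema =
    decompressCache.getOrElseUpdate(schemaBytes, ???)

  override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit = ???
  override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord = ???
}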

def compress(schema: Schema): Array[Byte] = cache.getOrElseUpdate(schema, {
val deflater = new Deflater(Deflater.BEST_COMPRESSION)
val schemaBytes = schema.toString.getBytes("UTF-8")
deflater.setInput(schemaBytes)
deflater.finish()
val buffer = Array.ofDim[Byte](schemaBytes.length)
val outputStream = new ByteArrayOutputStream(schemaBytes.length)
while (!deflater.finished()) {
val count = deflater.deflate(buffer)
outputStream.write(buffer, 0, count)
}
outputStream.close()
outputStream.toByteArray
})

compress
}

Contributor:
nit: delete extra newline

/**
* Decompresses the schema into the actual in-memory object. Keeps an internal cache of already
* seen values to limit the number of times that decompression has to be done.
*/
def decompressor(): Array[Byte] => Schema = {
val cache = new mutable.HashMap[Array[Byte], Schema]()

def decompress(schemaBytes: Array[Byte]): Schema = cache.getOrElseUpdate(schemaBytes, {
val inflater = new Inflater()
inflater.setInput(schemaBytes)
val outputStream = new ByteArrayOutputStream(schemaBytes.length)
val tmpBuffer = Array.ofDim[Byte](1024)
while (!inflater.finished()) {
val count = inflater.inflate(tmpBuffer)
outputStream.write(tmpBuffer, 0, count)
}
inflater.end()
outputStream.close()
new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8"))
})

decompress
}

/**
* Serializes generic records into byte buffers. It keeps an internal cache of already seen
* schemas to reduce the amount of required work.
*/
def serialize(): (GenericRecord, KryoOutput) => Unit = {
val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
val schemaCache = new mutable.HashMap[Schema, Long]()
val compress = compressor()

def serialize[R <: GenericRecord](datum: R, schema: Schema, output: KryoOutput): Unit = {
val encoder = EncoderFactory.get.binaryEncoder(output, null)
writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
.asInstanceOf[DatumWriter[R]]
.write(datum, encoder)
encoder.flush()
}

def wrapDatum(datum: GenericRecord, output: KryoOutput): Unit = {
val schema = datum.getSchema
val fingerprint = schemaCache.getOrElseUpdate(schema, {
SchemaNormalization.parsingFingerprint64(schema)
})
confSchema(fingerprint) match {
case Some(_) => {
output.writeBoolean(true)
output.writeLong(fingerprint)
}
case None => {
output.writeBoolean(false)
val compressedSchema = compress(schema)
output.writeInt(compressedSchema.array.length)
output.writeBytes(compressedSchema.array)
}
}
serialize(datum, schema, output)
}
wrapDatum
}

/**
* Deserializes generic records into their in-memory form. There is internal
* state to keep a cache of already seen schemas and datum readers.
*/
def deserialize(): KryoInput => GenericRecord = {
val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
val schemaCache = new mutable.HashMap[Long, Schema]()
val decompress = decompressor()

def deserialize(input: KryoInput, schema: Schema): GenericRecord = {
val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
.asInstanceOf[DatumReader[GenericRecord]]
.read(null.asInstanceOf[GenericRecord], decoder)
}

def unwrapDatum(input: KryoInput): GenericRecord = {
val schema = {
if (input.readBoolean()) {
val fingerprint = input.readLong()
schemaCache.getOrElseUpdate(fingerprint, {
confSchema(fingerprint) match {
case Some(s) => new Schema.Parser().parse(s)
case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint")
}
})
} else {
val length = input.readInt()
decompress(input.readBytes(length))
}
}
deserialize(input, schema)
}
unwrapDatum
}

override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit =
serializer(datum, output)

override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord =
deserializer(input)
}

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -21,6 +21,8 @@ import java.io.{EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable

import org.apache.avro.generic.{GenericData, GenericRecord}
Contributor:
nit: order


import scala.reflect.ClassTag
Contributor:
nit: import order, should be below scala imports


import com.esotericsoftware.kryo.{Kryo, KryoException}
@@ -99,6 +101,9 @@ class KryoSerializer(conf: SparkConf)
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

kryo.register(classOf[GenericRecord], new GenericAvroSerializer(conf))
kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(conf))
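
With these two registrations in place, an application only needs to enable Kryo and (optionally) pre-register its Avro schemas. A minimal usage sketch using the Array-based registerAvroSchema from this diff (the app name and schema are illustrative):

import org.apache.avro.SchemaBuilder
import org.apache.spark.{SparkConf, SparkContext}

val schema = SchemaBuilder
  .record("testRecord").fields()
  .requiredString("data")
  .endRecord()

val conf = new SparkConf()
  .setAppName("avro-kryo-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Pre-registering means only the 64-bit fingerprint travels with each record.
  .registerAvroSchema(Array(schema))

val sc = new SparkContext(conf)
// RDDs of GenericRecord / GenericData.Record values now go through GenericAvroSerializer.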

try {
// Use the default classloader when calling the user registrator.
Thread.currentThread.setContextClassLoader(classLoader)
core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala (new file)
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.serializer

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}

import com.esotericsoftware.kryo.io.{Output, Input}
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.{SchemaBuilder, Schema}
import org.apache.spark.{SparkFunSuite, SharedSparkContext}
Contributor:
nit: empty line between other imports and spark imports
and the ordering should have class imports ahead of package imports, not just alphabetic, so:

import com.esotericsoftware.kryo.io.{Output, Input}
import org.apache.avro.{SchemaBuilder, Schema}
import org.apache.avro.generic.GenericData.Record

import org.apache.spark.{SparkFunSuite, SharedSparkContext}
import org.apache.spark.io.CompressionCodec


class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val schema : Schema = SchemaBuilder
.record("testRecord").fields()
.requiredString("data")
.endRecord()
val record = new Record(schema)
record.put("data", "test data")

test("schema compression and decompression") {
val genericSer = new GenericAvroSerializer(conf)
val compressor = genericSer.compressor()
val decompressor = genericSer.decompressor()

assert(schema === decompressor.compose(compressor)(schema))
}

test("record serialization and deserialization") {
val genericSer = new GenericAvroSerializer(conf)
val serializer = genericSer.serialize()
val deserializer = genericSer.deserialize()

val outputStream = new ByteArrayOutputStream()
val output = new Output(outputStream)
serializer(record, output)
output.flush()
output.close()

val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
assert(deserializer(input) === record)
}

test("uses schema fingerprint to decrease message size") {
val genericSer = new GenericAvroSerializer(conf)
val serializer = genericSer.serialize()
val output = new Output(new ByteArrayOutputStream())

val beginningNormalPosition = output.total()
serializer(record, output)
output.flush()
val normalLength = output.total - beginningNormalPosition

conf.registerAvroSchema(Array(schema))
Contributor:
if you switch to varargs this could just be conf.registerAvroSchema(schema)

val beginningFingerprintPosition = output.total()
serializer(record, output)
val fingerprintLength = output.total - beginningFingerprintPosition

assert(fingerprintLength < normalLength)
}

test("caches previously seen schemas") {
val genericSer = new GenericAvroSerializer(conf)
val compressor = genericSer.compressor()
val decompressor = genericSer.decompressor()
val compressedSchema = compressor(schema)

assert(compressedSchema.eq(compressor(schema)))
assert(decompressor(compressedSchema).eq(decompressor(compressedSchema)))
}
}