Commit 8158d51

updated per feedback
JDrit committed Jul 22, 2015
1 parent c0cf329 commit 8158d51
Showing 2 changed files with 13 additions and 4 deletions.
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -169,9 +169,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
    * record serializer can decrease network IO
    */
   def registerAvroSchemas(schemas: Schema*): SparkConf = {
-    schemas.foldLeft(this) { (conf, schema) =>
-      conf.set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString)
+    for (schema <- schemas) {
+      set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString)
     }
+    this
   }

   /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */
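
For context, a usage sketch of the method changed above. registerAvroSchemas is the real SparkConf API shown in this diff; the User schema and the SchemaBuilder calls are illustrative assumptions, not part of the commit. Because the method now ends with `this`, it still chains like the other SparkConf setters:

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.spark.SparkConf

// Hypothetical Avro schema, built here only for illustration.
val userSchema: Schema = SchemaBuilder
  .record("User").namespace("example")
  .fields()
  .requiredString("name")
  .requiredInt("age")
  .endRecord()

// Chains like any other SparkConf setter because the method returns `this`.
val conf = new SparkConf()
  .setAppName("avro-example")
  .registerAvroSchemas(userSchema)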
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -29,7 +29,7 @@ import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.apache.avro.io._
 import org.apache.commons.io.IOUtils

-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkException, SparkEnv}
 import org.apache.spark.io.CompressionCodec

 /**
@@ -38,6 +38,9 @@ import org.apache.spark.io.CompressionCodec
  * schema, as to reduce network IO.
  * Actions like parsing or compressing schemas are computationally expensive so the serializer
  * caches all previously seen values as to reduce the amount of work needed to do.
+ * @param schemas a map where the keys are unique IDs for Avro schemas and the values are the
+ *                string representation of the Avro schema, used to decrease the amount of data
+ *                that needs to be serialized.
  */
 private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
   extends KSerializer[GenericRecord] {
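
A hedged sketch of how a Map[Long, String] matching the constructor parameter documented above can be produced. The helper name fingerprintMap is hypothetical, but the key and value shapes mirror exactly what registerAvroSchemas stores in the SparkConf:

import org.apache.avro.{Schema, SchemaNormalization}

// Hypothetical helper: key each schema by Avro's 64-bit parsing fingerprint
// and keep its JSON text, matching the shape registerAvroSchemas persists.
def fingerprintMap(schemas: Seq[Schema]): Map[Long, String] =
  schemas.map { s =>
    SchemaNormalization.parsingFingerprint64(s) -> s.toString
  }.toMap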
@@ -118,7 +121,12 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
       schemaCache.getOrElseUpdate(fingerprint, {
         schemas.get(fingerprint) match {
           case Some(s) => new Schema.Parser().parse(s)
-          case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint")
+          case None =>
+            throw new SparkException(
+              s"""Error attempting to read avro data --
+                 |encountered an unknown fingerprint: $fingerprint, not sure what schema to use.
+                 |This could happen if you registered additional schemas after starting your
+                 |spark context.""".stripMargin)
         }
       })
     } else {
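
The new error message points at an ordering constraint worth spelling out: the schema map is captured from the conf when the SparkContext starts, so later registrations never reach the serializer. A minimal sketch of the constraint, assuming a hypothetical pointSchema:

import org.apache.avro.SchemaBuilder
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical record schema, for illustration only.
val pointSchema = SchemaBuilder.record("Point").fields()
  .requiredDouble("x").requiredDouble("y").endRecord()

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerAvroSchemas(pointSchema) // registered before the context starts: OK
val sc = new SparkContext(conf)
// Registering more schemas here would be too late: executors resolving their
// fingerprints would hit the "unknown fingerprint" SparkException above.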
