Reduce Class.forName calls to improve registrator performance.
heuermh committed Dec 14, 2021
1 parent 563d6aa commit 707933f
Showing 1 changed file with 62 additions and 96 deletions.
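
The change replaces a registerByName helper, which invoked Class.forName on every call to registerClasses, with a lazily initialized Seq[Class[_]] held in a companion object. Spark may construct a fresh Kryo instance, and therefore call registerClasses, many times per JVM, so caching the resolved Class objects pays the reflective lookup cost only once. A condensed, self-contained sketch of the pattern follows; the InnerEnums and InnerEnumsRegistrator names and the two example class names are illustrative, not part of the commit:

    import com.esotericsoftware.kryo.Kryo

    object InnerEnums {
      // Resolved once per JVM on first access; Scala guarantees
      // thread-safe, single initialization of a lazy val.
      lazy val classes: Seq[Class[_]] =
        Seq(
          "htsjdk.samtools.SAMFileHeader$GroupOrder",
          "scala.collection.immutable.Stream$Cons"
        ).flatMap { name =>
          try {
            Some[Class[_]](Class.forName(name))
          } catch {
            case _: ClassNotFoundException => None // class absent at runtime
          }
        }
    }

    class InnerEnumsRegistrator {
      // Called once per Kryo instance, which Spark may create many times;
      // the reflective lookups above no longer repeat on each call.
      def registerClasses(kryo: Kryo): Unit =
        InnerEnums.classes.foreach { clazz =>
          try {
            kryo.register(clazz)
          } catch {
            case _: Throwable => // ignore classes Kryo cannot register
          }
        }
    }

Because a lazy val is initialized at most once under the JVM's locking rules, the cached list is safe to share across concurrently created Kryo instances.
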
@@ -94,18 +94,9 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {

override def registerClasses(kryo: Kryo) {

def registerByName(kryo: Kryo, name: String) {
try {
kryo.register(Class.forName(name))
} catch {
case cnfe: java.lang.ClassNotFoundException => {
debug("Could not register class %s by name".format(name))
}
}
}

// Register Avro classes using fully qualified class names
// Sort alphabetically and add blank lines between packages
// Classes that require Class.forName list below in forNameClasses

// htsjdk.samtools
kryo.register(classOf[htsjdk.samtools.CigarElement])
@@ -116,8 +107,6 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
kryo.register(classOf[htsjdk.samtools.SAMSequenceDictionary])
kryo.register(classOf[htsjdk.samtools.SAMFileHeader])
kryo.register(classOf[htsjdk.samtools.SAMSequenceRecord])
registerByName(kryo, "htsjdk.samtools.SAMFileHeader$GroupOrder")
registerByName(kryo, "htsjdk.samtools.SAMFileHeader$SortOrder")

// htsjdk.variant.vcf
kryo.register(classOf[htsjdk.variant.vcf.VCFContigHeaderLine])
@@ -128,7 +117,6 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
kryo.register(classOf[htsjdk.variant.vcf.VCFHeaderLine])
kryo.register(classOf[htsjdk.variant.vcf.VCFHeaderLineCount])
kryo.register(classOf[htsjdk.variant.vcf.VCFHeaderLineType])
registerByName(kryo, "htsjdk.variant.vcf.VCFCompoundHeaderLine$SupportedHeaderLineType")

// java.lang
kryo.register(classOf[java.lang.Class[_]])
@@ -140,23 +128,6 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
kryo.register(classOf[java.util.HashMap[_, _]])
kryo.register(classOf[java.util.HashSet[_]])

// org.apache.avro
registerByName(kryo, "org.apache.avro.Schema$RecordSchema")
registerByName(kryo, "org.apache.avro.Schema$Field")
registerByName(kryo, "org.apache.avro.Schema$Field$Order")
registerByName(kryo, "org.apache.avro.Schema$UnionSchema")
registerByName(kryo, "org.apache.avro.Schema$Type")
registerByName(kryo, "org.apache.avro.Schema$LockableArrayList")
registerByName(kryo, "org.apache.avro.Schema$BooleanSchema")
registerByName(kryo, "org.apache.avro.Schema$NullSchema")
registerByName(kryo, "org.apache.avro.Schema$StringSchema")
registerByName(kryo, "org.apache.avro.Schema$IntSchema")
registerByName(kryo, "org.apache.avro.Schema$FloatSchema")
registerByName(kryo, "org.apache.avro.Schema$EnumSchema")
registerByName(kryo, "org.apache.avro.Schema$Name")
registerByName(kryo, "org.apache.avro.Schema$LongSchema")
registerByName(kryo, "org.apache.avro.generic.GenericData$Array")

// org.apache.hadoop.conf
kryo.register(classOf[org.apache.hadoop.conf.Configuration],
new WritableSerializer[org.apache.hadoop.conf.Configuration])
@@ -291,23 +262,10 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
kryo.register(classOf[org.bdgenomics.formats.avro.VariantCallingAnnotations],
new AvroSerializer[org.bdgenomics.formats.avro.VariantCallingAnnotations])

// org.apache.spark.internal
registerByName(kryo, "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage")

// org.apache.spark.catalyst
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow])

// org.apache.spark.sql
registerByName(kryo, "org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult")
registerByName(kryo, "org.apache.spark.sql.execution.datasources.BasicWriteTaskStats")
registerByName(kryo, "org.apache.spark.sql.execution.datasources.ExecutedWriteSummary")
registerByName(kryo, "org.apache.spark.sql.execution.datasources.WriteTaskResult")
registerByName(kryo, "org.apache.spark.sql.types.BooleanType$")
registerByName(kryo, "org.apache.spark.sql.types.DoubleType$")
registerByName(kryo, "org.apache.spark.sql.types.FloatType$")
registerByName(kryo, "org.apache.spark.sql.types.IntegerType$")
registerByName(kryo, "org.apache.spark.sql.types.LongType$")
registerByName(kryo, "org.apache.spark.sql.types.StringType$")
kryo.register(classOf[org.apache.spark.sql.types.ArrayType])
kryo.register(classOf[org.apache.spark.sql.types.MapType])
kryo.register(classOf[org.apache.spark.sql.types.Metadata])
@@ -354,26 +312,14 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
kryo.register(classOf[scala.Array[Long]])
kryo.register(classOf[scala.Array[String]])
kryo.register(classOf[scala.Array[Option[_]]])
registerByName(kryo, "scala.Tuple2$mcCC$sp")

// scala.collection
registerByName(kryo, "scala.collection.Iterator$$anon$11")
registerByName(kryo, "scala.collection.Iterator$$anonfun$toStream$1")

// scala.collection.convert
registerByName(kryo, "scala.collection.convert.Wrappers$")

// scala.collection.immutable
kryo.register(classOf[scala.collection.immutable.::[_]])
kryo.register(classOf[scala.collection.immutable.Range])
registerByName(kryo, "scala.collection.immutable.Stream$Cons")
registerByName(kryo, "scala.collection.immutable.Stream$Empty$")
registerByName(kryo, "scala.collection.immutable.Set$EmptySet$")

// scala.collection.mutable
kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
kryo.register(classOf[scala.collection.mutable.ListBuffer[_]])
registerByName(kryo, "scala.collection.mutable.ListBuffer$$anon$1")
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofInt])
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofLong])
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofByte])
@@ -383,47 +329,67 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
// scala.math
kryo.register(scala.math.Numeric.LongIsIntegral.getClass)

// scala.reflect
registerByName(kryo, "scala.reflect.ClassTag$GenericClassTag")

// This seems to be necessary when serializing a RangePartitioner, which writes out a ClassTag:
//
// https://github.com/apache/spark/blob/v1.5.2/core/src/main/scala/org/apache/spark/Partitioner.scala#L220
//
// See also:
//
// https://mail-archives.apache.org/mod_mbox/spark-user/201504.mbox/%3CCAC95X6JgXQ3neXF6otj6a+F_MwJ9jbj9P-Ssw3Oqkf518_eT1w@mail.gmail.com%3E
registerByName(kryo, "scala.reflect.ClassTag$$anon$1")

// needed for manifests
registerByName(kryo, "scala.reflect.ManifestFactory$ClassTypeManifest")

// Added to Spark in 1.6.0; needed here for Spark < 1.6.0.
kryo.register(classOf[Array[Tuple1[Any]]])
kryo.register(classOf[Array[(Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])
kryo.register(classOf[Array[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any)]])

kryo.register(Map.empty.getClass)
kryo.register(Nil.getClass)
kryo.register(None.getClass)

ADAMKryoRegistrator.forNameClasses.foreach { clazz =>
try {
kryo.register(clazz)
} catch {
case _: Throwable => // do nothing
}
}
}
}

private[serialization] object ADAMKryoRegistrator {
private lazy val forNameClasses: Seq[Class[_]] = {
Seq(
"htsjdk.samtools.SAMFileHeader$GroupOrder",
"htsjdk.samtools.SAMFileHeader$SortOrder",
"htsjdk.variant.vcf.VCFCompoundHeaderLine$SupportedHeaderLineType",
"org.apache.avro.Schema$RecordSchema",
"org.apache.avro.Schema$Field",
"org.apache.avro.Schema$Field$Order",
"org.apache.avro.Schema$UnionSchema",
"org.apache.avro.Schema$Type",
"org.apache.avro.Schema$LockableArrayList",
"org.apache.avro.Schema$BooleanSchema",
"org.apache.avro.Schema$NullSchema",
"org.apache.avro.Schema$StringSchema",
"org.apache.avro.Schema$IntSchema",
"org.apache.avro.Schema$FloatSchema",
"org.apache.avro.Schema$EnumSchema",
"org.apache.avro.Schema$Name",
"org.apache.avro.Schema$LongSchema",
"org.apache.avro.generic.GenericData$Array",
"org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage",
"org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult",
"org.apache.spark.sql.execution.datasources.BasicWriteTaskStats",
"org.apache.spark.sql.execution.datasources.ExecutedWriteSummary",
"org.apache.spark.sql.execution.datasources.WriteTaskResult",
"org.apache.spark.sql.types.BooleanType$",
"org.apache.spark.sql.types.DoubleType$",
"org.apache.spark.sql.types.FloatType$",
"org.apache.spark.sql.types.IntegerType$",
"org.apache.spark.sql.types.LongType$",
"org.apache.spark.sql.types.StringType$",
"scala.Tuple2$mcCC$sp",
"scala.collection.Iterator$$anon$11",
"scala.collection.Iterator$$anonfun$toStream$1",
"scala.collection.convert.Wrappers$",
"scala.collection.immutable.Stream$Cons",
"scala.collection.immutable.Stream$Empty$",
"scala.collection.immutable.Set$EmptySet$",
"scala.collection.mutable.ListBuffer$$anon$1",
"scala.reflect.ClassTag$GenericClassTag",
"scala.reflect.ClassTag$$anon$1",
"scala.reflect.ManifestFactory$ClassTypeManifest"
).flatMap { name =>
try {
Some[Class[_]](Class.forName(name))
} catch {
case _: Throwable => None
}
}
}
}
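
For context, the registrator takes effect only when wired into Spark's Kryo configuration. A minimal usage sketch follows; the fully qualified registrator name is inferred from the private[serialization] qualifier above and ADAM's org.bdgenomics.adam package, so treat it as an assumption:

    import org.apache.spark.sql.SparkSession

    object AdamKryoExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("adam-kryo-example")
          .master("local[*]")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          // FQCN assumed from the private[serialization] package qualifier above.
          .config("spark.kryo.registrator", "org.bdgenomics.adam.serialization.ADAMKryoRegistrator")
          .getOrCreate()

        // ... run ADAM workloads here; Kryo now uses the cached registrations ...

        spark.stop()
      }
    }
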
