Commit fe0dafe: Merge f2cd5b0 into 9362f15

heuermh committed Mar 1, 2019
2 parents 9362f15 + f2cd5b0

Showing 5 changed files with 81 additions and 70 deletions.
@@ -91,8 +91,19 @@ class WritableSerializer[T <: Writable] extends Serializer[T] {
}

class ADAMKryoRegistrator extends KryoRegistrator with Logging {

override def registerClasses(kryo: Kryo) {

def registerByName(kryo: Kryo, name: String) {
  try {
    kryo.register(Class.forName(name))
  } catch {
    case cnfe: java.lang.ClassNotFoundException => {
      if (log.isDebugEnabled) log.debug("Could not register class {} by name", name)
    }
  }
}
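
A note on the pattern: registering by name rather than with a classOf literal lets one registrator span library versions, because when the named class is absent from the classpath the helper logs at debug level and moves on instead of failing at startup. The sketch below shows how such a registrator is typically wired into a Spark application; the configuration keys are standard Spark, while the registrator's package (org.bdgenomics.adam.serialization) is an assumption for illustration.

import org.apache.spark.{ SparkConf, SparkContext }

// Minimal sketch, assuming ADAMKryoRegistrator lives in
// org.bdgenomics.adam.serialization.
val conf = new SparkConf()
  .setAppName("adam-kryo-wiring")
  .setMaster("local[2]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.bdgenomics.adam.serialization.ADAMKryoRegistrator")
  // Fail fast on any class the registrator missed, rather than silently
  // writing fully qualified class names into every serialized stream.
  .set("spark.kryo.registrationRequired", "true")
val sc = new SparkContext(conf)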

// Register Avro classes using fully qualified class names
// Sort alphabetically and add blank lines between packages

@@ -113,7 +124,7 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
kryo.register(classOf[htsjdk.variant.vcf.VCFHeaderLine])
kryo.register(classOf[htsjdk.variant.vcf.VCFHeaderLineCount])
kryo.register(classOf[htsjdk.variant.vcf.VCFHeaderLineType])
kryo.register(Class.forName("htsjdk.variant.vcf.VCFCompoundHeaderLine$SupportedHeaderLineType"))
registerByName(kryo, "htsjdk.variant.vcf.VCFCompoundHeaderLine$SupportedHeaderLineType")

// java.lang
kryo.register(classOf[java.lang.Class[_]])
@@ -126,21 +137,21 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
kryo.register(classOf[java.util.HashSet[_]])

// org.apache.avro
kryo.register(Class.forName("org.apache.avro.Schema$RecordSchema"))
kryo.register(Class.forName("org.apache.avro.Schema$Field"))
kryo.register(Class.forName("org.apache.avro.Schema$Field$Order"))
kryo.register(Class.forName("org.apache.avro.Schema$UnionSchema"))
kryo.register(Class.forName("org.apache.avro.Schema$Type"))
kryo.register(Class.forName("org.apache.avro.Schema$LockableArrayList"))
kryo.register(Class.forName("org.apache.avro.Schema$BooleanSchema"))
kryo.register(Class.forName("org.apache.avro.Schema$NullSchema"))
kryo.register(Class.forName("org.apache.avro.Schema$StringSchema"))
kryo.register(Class.forName("org.apache.avro.Schema$IntSchema"))
kryo.register(Class.forName("org.apache.avro.Schema$FloatSchema"))
kryo.register(Class.forName("org.apache.avro.Schema$EnumSchema"))
kryo.register(Class.forName("org.apache.avro.Schema$Name"))
kryo.register(Class.forName("org.apache.avro.Schema$LongSchema"))
kryo.register(Class.forName("org.apache.avro.generic.GenericData$Array"))
registerByName(kryo, "org.apache.avro.Schema$RecordSchema")
registerByName(kryo, "org.apache.avro.Schema$Field")
registerByName(kryo, "org.apache.avro.Schema$Field$Order")
registerByName(kryo, "org.apache.avro.Schema$UnionSchema")
registerByName(kryo, "org.apache.avro.Schema$Type")
registerByName(kryo, "org.apache.avro.Schema$LockableArrayList")
registerByName(kryo, "org.apache.avro.Schema$BooleanSchema")
registerByName(kryo, "org.apache.avro.Schema$NullSchema")
registerByName(kryo, "org.apache.avro.Schema$StringSchema")
registerByName(kryo, "org.apache.avro.Schema$IntSchema")
registerByName(kryo, "org.apache.avro.Schema$FloatSchema")
registerByName(kryo, "org.apache.avro.Schema$EnumSchema")
registerByName(kryo, "org.apache.avro.Schema$Name")
registerByName(kryo, "org.apache.avro.Schema$LongSchema")
registerByName(kryo, "org.apache.avro.generic.GenericData$Array")

// org.apache.hadoop.conf
kryo.register(classOf[org.apache.hadoop.conf.Configuration],
@@ -215,7 +226,7 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
kryo.register(classOf[org.bdgenomics.adam.rdd.read.realignment.TargetSet],
new org.bdgenomics.adam.rdd.read.realignment.TargetSetSerializer)

// org.bdgenomics.adam.rdd.read.recalibration.
// org.bdgenomics.adam.rdd.read.recalibration
kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.CovariateKey])
kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.CycleCovariate])
kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.DinucCovariate])
@@ -279,24 +290,23 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
kryo.register(classOf[org.codehaus.jackson.node.BooleanNode])
kryo.register(classOf[org.codehaus.jackson.node.TextNode])

// org.apache.spark
try {
  kryo.register(Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"))
  kryo.register(Class.forName("org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult"))
  kryo.register(Class.forName("org.apache.spark.sql.execution.datasources.BasicWriteTaskStats"))
  kryo.register(Class.forName("org.apache.spark.sql.execution.datasources.ExecutedWriteSummary"))
} catch {
  case cnfe: java.lang.ClassNotFoundException => {
    if (log.isDebugEnabled) log.debug("Did not find Spark internal class. This is expected for earlier Spark versions.")
  }
}
// org.apache.spark.internal
registerByName(kryo, "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage")

// org.apache.spark.catalyst
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow])
kryo.register(Class.forName("org.apache.spark.sql.types.BooleanType$"))
kryo.register(Class.forName("org.apache.spark.sql.types.DoubleType$"))
kryo.register(Class.forName("org.apache.spark.sql.types.FloatType$"))
kryo.register(Class.forName("org.apache.spark.sql.types.IntegerType$"))
kryo.register(Class.forName("org.apache.spark.sql.types.LongType$"))
kryo.register(Class.forName("org.apache.spark.sql.types.StringType$"))

// org.apache.spark.sql
registerByName(kryo, "org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult")
registerByName(kryo, "org.apache.spark.sql.execution.datasources.BasicWriteTaskStats")
registerByName(kryo, "org.apache.spark.sql.execution.datasources.ExecutedWriteSummary")
registerByName(kryo, "org.apache.spark.sql.execution.datasources.WriteTaskResult")
registerByName(kryo, "org.apache.spark.sql.types.BooleanType$")
registerByName(kryo, "org.apache.spark.sql.types.DoubleType$")
registerByName(kryo, "org.apache.spark.sql.types.FloatType$")
registerByName(kryo, "org.apache.spark.sql.types.IntegerType$")
registerByName(kryo, "org.apache.spark.sql.types.LongType$")
registerByName(kryo, "org.apache.spark.sql.types.StringType$")
kryo.register(classOf[org.apache.spark.sql.types.ArrayType])
kryo.register(classOf[org.apache.spark.sql.types.MapType])
kryo.register(classOf[org.apache.spark.sql.types.Metadata])
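
These Spark SQL registrations matter because each file-write task returns its result to the driver through the configured serializer: a FileFormatWriter$WriteTaskResult carrying an ExecutedWriteSummary and its BasicWriteTaskStats. Both the nested and the top-level WriteTaskResult names are registered above, presumably because the class has moved between Spark releases, which is exactly the case registerByName tolerates. Below is a minimal sketch of a write that exercises this path, again assuming the registrator package used earlier.

import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming the registrator package shown earlier.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("write-task-result-smoke")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrator", "org.bdgenomics.adam.serialization.ADAMKryoRegistrator")
  .config("spark.kryo.registrationRequired", "true")
  .getOrCreate()

import spark.implicits._
// The per-task commit messages produced by this write travel back to
// the driver through Kryo, touching the classes registered above.
Seq(1, 2, 3).toDS().write.parquet("/tmp/write-task-result-smoke.parquet")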
@@ -344,26 +354,26 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
kryo.register(classOf[scala.Array[Long]])
kryo.register(classOf[scala.Array[String]])
kryo.register(classOf[scala.Array[Option[_]]])
kryo.register(Class.forName("scala.Tuple2$mcCC$sp"))
registerByName(kryo, "scala.Tuple2$mcCC$sp")

// scala.collection
kryo.register(Class.forName("scala.collection.Iterator$$anon$11"))
kryo.register(Class.forName("scala.collection.Iterator$$anonfun$toStream$1"))
registerByName(kryo, "scala.collection.Iterator$$anon$11")
registerByName(kryo, "scala.collection.Iterator$$anonfun$toStream$1")

// scala.collection.convert
kryo.register(Class.forName("scala.collection.convert.Wrappers$"))
registerByName(kryo, "scala.collection.convert.Wrappers$")

// scala.collection.immutable
kryo.register(classOf[scala.collection.immutable.::[_]])
kryo.register(classOf[scala.collection.immutable.Range])
kryo.register(Class.forName("scala.collection.immutable.Stream$Cons"))
kryo.register(Class.forName("scala.collection.immutable.Stream$Empty$"))
kryo.register(Class.forName("scala.collection.immutable.Set$EmptySet$"))
registerByName(kryo, "scala.collection.immutable.Stream$Cons")
registerByName(kryo, "scala.collection.immutable.Stream$Empty$")
registerByName(kryo, "scala.collection.immutable.Set$EmptySet$")

// scala.collection.mutable
kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
kryo.register(classOf[scala.collection.mutable.ListBuffer[_]])
kryo.register(Class.forName("scala.collection.mutable.ListBuffer$$anon$1"))
registerByName(kryo, "scala.collection.mutable.ListBuffer$$anon$1")
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofInt])
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofLong])
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofByte])
@@ -380,10 +390,10 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
// See also:
//
// https://mail-archives.apache.org/mod_mbox/spark-user/201504.mbox/%3CCAC95X6JgXQ3neXF6otj6a+F_MwJ9jbj9P-Ssw3Oqkf518_eT1w@mail.gmail.com%3E
kryo.register(Class.forName("scala.reflect.ClassTag$$anon$1"))
registerByName(kryo, "scala.reflect.ClassTag$$anon$1")

// needed for manifests
kryo.register(Class.forName("scala.reflect.ManifestFactory$ClassTypeManifest"))
registerByName(kryo, "scala.reflect.ManifestFactory$ClassTypeManifest")

// Added to Spark in 1.6.0; needed here for Spark < 1.6.0.
kryo.register(classOf[Array[Tuple1[Any]]])
@@ -1598,6 +1598,7 @@ class AlignmentRecordDatasetSuite extends ADAMFunSuite {
val tempPath = tmpLocation(".adam")
variants.saveAsParquet(tempPath)

System.out.println("loading " + tempPath + " as parquet into RDD...");
assert(sc.loadVariants(tempPath).rdd.count === 20)
}

25 changes: 5 additions & 20 deletions pom.xml
@@ -21,14 +21,14 @@
<avro.version>1.8.2</avro.version>
<scala.version>2.11.12</scala.version>
<scala.version.prefix>2.11</scala.version.prefix>
<spark.version>2.3.3</spark.version>
<parquet.version>1.8.3</parquet.version>
<spark.version>2.4.0</spark.version>
<parquet.version>1.10.0</parquet.version>
<!-- Edit the following line to configure the Hadoop (HDFS) version. -->
<hadoop.version>2.7.5</hadoop.version>
<hadoop-bam.version>7.9.2</hadoop-bam.version>
<slf4j.version>1.7.25</slf4j.version>
<bdg-formats.version>0.12.0</bdg-formats.version>
<bdg-utils.version>0.2.13</bdg-utils.version>
<bdg-utils.version>0.2.14</bdg-utils.version>
<htsjdk.version>2.16.1</htsjdk.version>
<scoverage.plugin.version>1.1.1</scoverage.plugin.version>
</properties>
@@ -507,22 +507,7 @@
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<!--
IMPORTANT NOTE
==============
Spark 2.2.0 bumps to Parquet 1.8.2, but keeps Avro at 1.7.3. This
causes an issue when using parquet-avro 1.8.2, which relies on a
method that is new in Avro 1.8.x. The recommended workaround is to
pin parquet-avro to 1.8.1, instead of 1.8.2, as described in the
Spark 2.2.0 release notes.
-->
<version>1.8.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -560,7 +545,7 @@
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version.prefix}</artifactId>
<version>2.2.6</version>
<version>3.0.5</version>
<scope>test</scope>
</dependency>
<dependency>
8 changes: 2 additions & 6 deletions scripts/move_to_scala_2.11.sh
@@ -11,11 +11,7 @@ then
exit 1
fi

find . -name "pom.xml" -exec sed -e "s/2.10.6/2.11.12/g" \
-e "s/2.10/2.11/g" \
find . -name "pom.xml" -exec sed -e "s/2.12.6/2.11.12/g" \
-e "s/2.12/2.11/g" \
-i.2.11.bak '{}' \;
# keep parquet-scala at parquet-scala_2.10
find . -name "pom.xml" -exec sed -e "s/parquet-scala_2.11/parquet-scala_2.10/g" -i.2.11.2.bak '{}' \;
# keep maven-javadoc-plugin at version 2.10.4
find . -name "pom.xml" -exec sed -e "s/2.11.12/2.10.4/g" -i.2.11.3.bak '{}' \;
find . -name "*.2.11.*bak" -exec rm -f {} \;
19 changes: 19 additions & 0 deletions scripts/move_to_scala_2.12.sh
@@ -0,0 +1,19 @@
#!/bin/bash

set +x

grep "<scala\.version>" pom.xml | grep -q 2.12
if [[ $? == 0 ]];
then
echo "Scala version is already set to 2.12 (Scala artifacts have _2.12 version suffix in artifact name)."
echo "Cowardly refusing to move to Scala 2.12 a second time..."

exit 1
fi

find . -name "pom.xml" -exec sed -e "s/2.11.12/2.12.6/g" \
-e "s/2.11/2.12/g" \
-i.2.12.bak '{}' \;
# keep parquet-scala at parquet-scala_2.10
find . -name "pom.xml" -exec sed -e "s/parquet-scala_2.12/parquet-scala_2.10/g" -i.2.12.2.bak '{}' \;
find . -name "*.2.12.*bak" -exec rm -f {} \;
