
Merge 30327b0 into 6e6ac8d

heuermh committed Apr 29, 2019
2 parents 6e6ac8d + 30327b0 commit 2948c4adb09e4faed2cb9c943d5e612a532de326


@@ -45,7 +45,6 @@ import org.bdgenomics.utils.interval.array.{
   IntervalArray,
   IntervalArraySerializer
 }
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.math.max
 import scala.reflect.ClassTag
@@ -321,7 +320,7 @@ sealed abstract class NucleotideContigFragmentDataset extends AvroGenomicDataset
     if (isFragment(record)) {
       sb.append(s" fragment ${record.getIndex + 1} of ${record.getFragments}")
     }
-    for (line <- Splitter.fixedLength(lineWidth).split(record.getSequence)) {
+    for (line <- Splitter.fixedLength(lineWidth).split(record.getSequence).asScala) {
       sb.append("\n")
       sb.append(line)
     }
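
This and the following hunks replace the deprecated implicit conversions of scala.collection.JavaConversions._ with the explicit decorators of scala.collection.JavaConverters._. Guava's Splitter.fixedLength(n).split(s) returns a java.lang.Iterable[String], which no longer becomes a Scala collection silently; the added .asScala spells the conversion out. A minimal standalone sketch of the pattern, assuming only Guava on the classpath:

    import com.google.common.base.Splitter
    import scala.collection.JavaConverters._

    // split returns a java.lang.Iterable[String]; .asScala wraps it explicitly
    val lines = Splitter.fixedLength(60).split("ACGT" * 40).asScala
    lines.foreach(println)
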
@@ -50,7 +50,6 @@ import org.bdgenomics.utils.interval.array.{
   IntervalArray,
   IntervalArraySerializer
 }
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe._
@@ -397,6 +396,7 @@ sealed abstract class FragmentDataset extends AvroReadGroupGenomicDataset[Fragme
    */
   protected def getReferenceRegions(elem: Fragment): Seq[ReferenceRegion] = {
     elem.getAlignments
+      .asScala
       .flatMap(r => ReferenceRegion.opt(r))
       .toSeq
   }
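
The same pattern applies to Avro-generated getters: Fragment.getAlignments returns a java.util.List, so the added .asScala marks where the Java collection enters Scala code. A standalone sketch, with strings standing in for alignment records (hypothetical values, not ADAM's types):

    import scala.collection.JavaConverters._

    val alignments: java.util.List[String] =
      java.util.Arrays.asList("read1", null, "read2")
    // .asScala yields a mutable.Buffer view; flatMap over Option drops the
    // nulls, much as ReferenceRegion.opt drops unplaced alignments above
    val regions: Seq[String] = alignments.asScala.flatMap(r => Option(r)).toSeq
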
@@ -62,7 +62,6 @@ import org.bdgenomics.utils.interval.array.{
   IntervalArraySerializer
 }
 import org.seqdoop.hadoop_bam._
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.math.{ abs, min }
@@ -689,7 +688,7 @@ sealed abstract class AlignmentRecordDataset extends AvroReadGroupGenomicDataset
     val pgRecords = processingSteps.map(r => {
       AlignmentRecordDataset.processingStepToSam(r)
     })
-    header.setProgramRecords(pgRecords)
+    header.setProgramRecords(pgRecords.asJava)

     // broadcast for efficiency
     val hdrBcast = rdd.context.broadcast(SAMFileHeaderWritable(header))
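
Here the conversion runs the other way: htsjdk's SAMFileHeader.setProgramRecords expects a java.util.List, so the Scala Seq built by map needs an explicit .asJava. A sketch of the Scala-to-Java direction (hypothetical values):

    import scala.collection.JavaConverters._

    val programIds: Seq[String] = Seq("bwa", "adam")
    // .asJava produces a java.util.List view over the Scala Seq
    val forJavaApi: java.util.List[String] = programIds.asJava
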
@@ -810,7 +809,7 @@ sealed abstract class AlignmentRecordDataset extends AvroReadGroupGenomicDataset
     val ssd = header.getSequenceDictionary
     binaryCodec.writeInt(ssd.size())
     ssd.getSequences
-      .toList
+      .asScala
       .foreach(r => {
         binaryCodec.writeString(r.getSequenceName(), true, true)
         binaryCodec.writeInt(r.getSequenceLength())
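
The old ssd.getSequences.toList compiled only because JavaConversions implicitly wrapped the java.util.List returned by htsjdk before .toList could apply; with JavaConverters the wrapping must be written as .asScala. A minimal illustration:

    import scala.collection.JavaConverters._

    val jlist: java.util.List[Int] = java.util.Arrays.asList(1, 2, 3)
    // jlist.toList          // no longer compiles without the implicit wildcard
    val slist = jlist.asScala.toList  // explicit wrapper, then a Scala List
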
@@ -91,8 +91,19 @@ class WritableSerializer[T <: Writable] extends Serializer[T] {
 }

 class ADAMKryoRegistrator extends KryoRegistrator with Logging {
+
   override def registerClasses(kryo: Kryo) {
+
+    def registerByName(kryo: Kryo, name: String) {
+      try {
+        kryo.register(Class.forName(name))
+      } catch {
+        case cnfe: java.lang.ClassNotFoundException => {
+          debug("Could not register class %s by name".format(name))
+        }
+      }
+    }

     // Register Avro classes using fully qualified class names
     // Sort alphabetically and add blank lines between packages

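The new registerByName helper centralizes a pattern that previously appeared ad hoc (see the removed try/catch around the Spark internals further down): registering classes that exist only on some supported Spark/Avro versions, and merely logging a debug message when one is absent, since a class that is not on the classpath will never need serializing anyway. A standalone sketch of the idea outside ADAM's registrator (tryRegister is a hypothetical name):

    import com.esotericsoftware.kryo.Kryo

    // register a class if present on this classpath; log and skip if not
    def tryRegister(kryo: Kryo, name: String): Unit =
      try {
        kryo.register(Class.forName(name))
      } catch {
        case _: ClassNotFoundException =>
          println(s"class $name not found; skipping registration")
      }

    val kryo = new Kryo()
    tryRegister(kryo, "org.apache.avro.Schema$RecordSchema")
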
@@ -113,7 +124,7 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
     kryo.register(classOf[htsjdk.variant.vcf.VCFHeaderLine])
     kryo.register(classOf[htsjdk.variant.vcf.VCFHeaderLineCount])
     kryo.register(classOf[htsjdk.variant.vcf.VCFHeaderLineType])
-    kryo.register(Class.forName("htsjdk.variant.vcf.VCFCompoundHeaderLine$SupportedHeaderLineType"))
+    registerByName(kryo, "htsjdk.variant.vcf.VCFCompoundHeaderLine$SupportedHeaderLineType")

     // java.lang
     kryo.register(classOf[java.lang.Class[_]])
@@ -126,21 +137,21 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
     kryo.register(classOf[java.util.HashSet[_]])

     // org.apache.avro
-    kryo.register(Class.forName("org.apache.avro.Schema$RecordSchema"))
-    kryo.register(Class.forName("org.apache.avro.Schema$Field"))
-    kryo.register(Class.forName("org.apache.avro.Schema$Field$Order"))
-    kryo.register(Class.forName("org.apache.avro.Schema$UnionSchema"))
-    kryo.register(Class.forName("org.apache.avro.Schema$Type"))
-    kryo.register(Class.forName("org.apache.avro.Schema$LockableArrayList"))
-    kryo.register(Class.forName("org.apache.avro.Schema$BooleanSchema"))
-    kryo.register(Class.forName("org.apache.avro.Schema$NullSchema"))
-    kryo.register(Class.forName("org.apache.avro.Schema$StringSchema"))
-    kryo.register(Class.forName("org.apache.avro.Schema$IntSchema"))
-    kryo.register(Class.forName("org.apache.avro.Schema$FloatSchema"))
-    kryo.register(Class.forName("org.apache.avro.Schema$EnumSchema"))
-    kryo.register(Class.forName("org.apache.avro.Schema$Name"))
-    kryo.register(Class.forName("org.apache.avro.Schema$LongSchema"))
-    kryo.register(Class.forName("org.apache.avro.generic.GenericData$Array"))
+    registerByName(kryo, "org.apache.avro.Schema$RecordSchema")
+    registerByName(kryo, "org.apache.avro.Schema$Field")
+    registerByName(kryo, "org.apache.avro.Schema$Field$Order")
+    registerByName(kryo, "org.apache.avro.Schema$UnionSchema")
+    registerByName(kryo, "org.apache.avro.Schema$Type")
+    registerByName(kryo, "org.apache.avro.Schema$LockableArrayList")
+    registerByName(kryo, "org.apache.avro.Schema$BooleanSchema")
+    registerByName(kryo, "org.apache.avro.Schema$NullSchema")
+    registerByName(kryo, "org.apache.avro.Schema$StringSchema")
+    registerByName(kryo, "org.apache.avro.Schema$IntSchema")
+    registerByName(kryo, "org.apache.avro.Schema$FloatSchema")
+    registerByName(kryo, "org.apache.avro.Schema$EnumSchema")
+    registerByName(kryo, "org.apache.avro.Schema$Name")
+    registerByName(kryo, "org.apache.avro.Schema$LongSchema")
+    registerByName(kryo, "org.apache.avro.generic.GenericData$Array")

     // org.apache.hadoop.conf
     kryo.register(classOf[org.apache.hadoop.conf.Configuration],
@@ -215,7 +226,7 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
     kryo.register(classOf[org.bdgenomics.adam.rdd.read.realignment.TargetSet],
       new org.bdgenomics.adam.rdd.read.realignment.TargetSetSerializer)

-    // org.bdgenomics.adam.rdd.read.recalibration.
+    // org.bdgenomics.adam.rdd.read.recalibration
     kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.CovariateKey])
     kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.CycleCovariate])
     kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.DinucCovariate])
@@ -279,24 +290,23 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
     kryo.register(classOf[org.codehaus.jackson.node.BooleanNode])
     kryo.register(classOf[org.codehaus.jackson.node.TextNode])

-    // org.apache.spark
-    try {
-      kryo.register(Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"))
-      kryo.register(Class.forName("org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult"))
-      kryo.register(Class.forName("org.apache.spark.sql.execution.datasources.BasicWriteTaskStats"))
-      kryo.register(Class.forName("org.apache.spark.sql.execution.datasources.ExecutedWriteSummary"))
-    } catch {
-      case cnfe: java.lang.ClassNotFoundException => {
-        debug("Did not find Spark internal class. This is expected for earlier Spark versions.")
-      }
-    }
+    // org.apache.spark.internal
+    registerByName(kryo, "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage")

     // org.apache.spark.catalyst
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow])
-    kryo.register(Class.forName("org.apache.spark.sql.types.BooleanType$"))
-    kryo.register(Class.forName("org.apache.spark.sql.types.DoubleType$"))
-    kryo.register(Class.forName("org.apache.spark.sql.types.FloatType$"))
-    kryo.register(Class.forName("org.apache.spark.sql.types.IntegerType$"))
-    kryo.register(Class.forName("org.apache.spark.sql.types.LongType$"))
-    kryo.register(Class.forName("org.apache.spark.sql.types.StringType$"))

+    // org.apache.spark.sql
+    registerByName(kryo, "org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult")
+    registerByName(kryo, "org.apache.spark.sql.execution.datasources.BasicWriteTaskStats")
+    registerByName(kryo, "org.apache.spark.sql.execution.datasources.ExecutedWriteSummary")
+    registerByName(kryo, "org.apache.spark.sql.execution.datasources.WriteTaskResult")
+    registerByName(kryo, "org.apache.spark.sql.types.BooleanType$")
+    registerByName(kryo, "org.apache.spark.sql.types.DoubleType$")
+    registerByName(kryo, "org.apache.spark.sql.types.FloatType$")
+    registerByName(kryo, "org.apache.spark.sql.types.IntegerType$")
+    registerByName(kryo, "org.apache.spark.sql.types.LongType$")
+    registerByName(kryo, "org.apache.spark.sql.types.StringType$")
     kryo.register(classOf[org.apache.spark.sql.types.ArrayType])
     kryo.register(classOf[org.apache.spark.sql.types.MapType])
     kryo.register(classOf[org.apache.spark.sql.types.Metadata])
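
A note on the $-suffixed names: Scala compiles a singleton object to a JVM class whose name ends in $, and there is no classOf[...] literal for such a module class, which is why these registrations have to go through a name string. For example, assuming spark-sql on the classpath:

    // an object's runtime class is the $-suffixed module class
    println(org.apache.spark.sql.types.BooleanType.getClass.getName)
    // prints "org.apache.spark.sql.types.BooleanType$"
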
@@ -344,26 +354,26 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
     kryo.register(classOf[scala.Array[Long]])
     kryo.register(classOf[scala.Array[String]])
     kryo.register(classOf[scala.Array[Option[_]]])
-    kryo.register(Class.forName("scala.Tuple2$mcCC$sp"))
+    registerByName(kryo, "scala.Tuple2$mcCC$sp")

     // scala.collection
-    kryo.register(Class.forName("scala.collection.Iterator$$anon$11"))
-    kryo.register(Class.forName("scala.collection.Iterator$$anonfun$toStream$1"))
+    registerByName(kryo, "scala.collection.Iterator$$anon$11")
+    registerByName(kryo, "scala.collection.Iterator$$anonfun$toStream$1")

     // scala.collection.convert
-    kryo.register(Class.forName("scala.collection.convert.Wrappers$"))
+    registerByName(kryo, "scala.collection.convert.Wrappers$")

     // scala.collection.immutable
     kryo.register(classOf[scala.collection.immutable.::[_]])
     kryo.register(classOf[scala.collection.immutable.Range])
-    kryo.register(Class.forName("scala.collection.immutable.Stream$Cons"))
-    kryo.register(Class.forName("scala.collection.immutable.Stream$Empty$"))
-    kryo.register(Class.forName("scala.collection.immutable.Set$EmptySet$"))
+    registerByName(kryo, "scala.collection.immutable.Stream$Cons")
+    registerByName(kryo, "scala.collection.immutable.Stream$Empty$")
+    registerByName(kryo, "scala.collection.immutable.Set$EmptySet$")

     // scala.collection.mutable
     kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
     kryo.register(classOf[scala.collection.mutable.ListBuffer[_]])
-    kryo.register(Class.forName("scala.collection.mutable.ListBuffer$$anon$1"))
+    registerByName(kryo, "scala.collection.mutable.ListBuffer$$anon$1")
     kryo.register(classOf[scala.collection.mutable.WrappedArray.ofInt])
     kryo.register(classOf[scala.collection.mutable.WrappedArray.ofLong])
     kryo.register(classOf[scala.collection.mutable.WrappedArray.ofByte])
@@ -380,10 +390,10 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
     // See also:
     //
     // https://mail-archives.apache.org/mod_mbox/spark-user/201504.mbox/%3CCAC95X6JgXQ3neXF6otj6a+F_MwJ9jbj9P-Ssw3Oqkf518_eT1w@mail.gmail.com%3E
-    kryo.register(Class.forName("scala.reflect.ClassTag$$anon$1"))
+    registerByName(kryo, "scala.reflect.ClassTag$$anon$1")

     // needed for manifests
-    kryo.register(Class.forName("scala.reflect.ManifestFactory$ClassTypeManifest"))
+    registerByName(kryo, "scala.reflect.ManifestFactory$ClassTypeManifest")

     // Added to Spark in 1.6.0; needed here for Spark < 1.6.0.
     kryo.register(classOf[Array[Tuple1[Any]]])
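
The class registered here is likewise compiler-generated: ClassTag.apply returns an instance of an anonymous class whose name is a Scala-version-dependent implementation detail, so it too is referenced as a string and now tolerated when absent. A sketch (the printed name is an assumption that holds on Scala 2.11):

    import scala.reflect.ClassTag

    println(ClassTag(classOf[String]).getClass.getName)
    // on Scala 2.11 this prints something like "scala.reflect.ClassTag$$anon$1"
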
@@ -1598,6 +1598,7 @@ class AlignmentRecordDatasetSuite extends ADAMFunSuite {
     val tempPath = tmpLocation(".adam")
     variants.saveAsParquet(tempPath)

+    System.out.println("loading " + tempPath + " as parquet into RDD...");
     assert(sc.loadVariants(tempPath).rdd.count === 20)
   }

pom.xml
@@ -21,8 +21,8 @@
     <avro.version>1.8.2</avro.version>
     <scala.version>2.11.12</scala.version>
     <scala.version.prefix>2.11</scala.version.prefix>
-    <spark.version>2.3.3</spark.version>
-    <parquet.version>1.8.3</parquet.version>
+    <spark.version>2.4.2</spark.version>
+    <parquet.version>1.10.1</parquet.version>
     <!-- Edit the following line to configure the Hadoop (HDFS) version. -->
     <hadoop.version>2.7.5</hadoop.version>
     <hadoop-bam.version>7.9.2</hadoop-bam.version>
@@ -508,22 +508,7 @@
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-avro</artifactId>
-      <!--
-        IMPORTANT NOTE
-        ==============
-        Spark 2.2.0 bumps to Parquet 1.8.2, but keeps Avro at 1.7.3. This
-        causes an issue when using parquet-avro 1.8.2, which relies on a
-        method that is new in Avro 1.8.x. The recommended workaround is to
-        pin parquet-avro to 1.8.1, instead of 1.8.2, as described in the
-        Spark 2.2.0 release notes.
-      -->
-      <version>1.8.1</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.parquet</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
+      <version>${parquet.version}</version>
     </dependency>
     <dependency>
       <groupId>org.seqdoop</groupId>
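
The pom changes bump Spark from 2.3.3 to 2.4.2 and Parquet from 1.8.3 to 1.10.1, and drop the long-standing parquet-avro pin to 1.8.1 (a Spark 2.2.0-era workaround) in favor of the shared ${parquet.version} property. One way to sanity-check which versions actually land on the classpath, as a sketch assuming a locally built application against this pom:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]").appName("versions").getOrCreate()
    println(s"Spark ${spark.version}")                              // expect 2.4.2
    println(s"Parquet ${org.apache.parquet.Version.FULL_VERSION}")  // expect 1.10.1
    spark.stop()
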
@@ -11,11 +11,7 @@ then
   exit 1
 fi

-find . -name "pom.xml" -exec sed -e "s/2.10.6/2.11.12/g" \
-    -e "s/2.10/2.11/g" \
+find . -name "pom.xml" -exec sed -e "s/2.12.6/2.11.12/g" \
+    -e "s/2.12/2.11/g" \
     -i.2.11.bak '{}' \;
-# keep parquet-scala at parquet-scala_2.10
-find . -name "pom.xml" -exec sed -e "s/parquet-scala_2.11/parquet-scala_2.10/g" -i.2.11.2.bak '{}' \;
-# keep maven-javadoc-plugin at version 2.10.4
-find . -name "pom.xml" -exec sed -e "s/2.11.12/2.10.4/g" -i.2.11.3.bak '{}' \;
 find . -name "*.2.11.*bak" -exec rm -f {} \;
@@ -0,0 +1,19 @@
+#!/bin/bash
+
+set +x
+
+grep "<scala\.version>" pom.xml | grep -q 2.12
+if [[ $? == 0 ]];
+then
+  echo "Scala version is already set to 2.12 (Scala artifacts have _2.12 version suffix in artifact name)."
+  echo "Cowardly refusing to move to Scala 2.12 a second time..."
+
+  exit 1
+fi
+
+find . -name "pom.xml" -exec sed -e "s/2.11.12/2.12.6/g" \
+    -e "s/2.11/2.12/g" \
+    -i.2.12.bak '{}' \;
+# keep parquet-scala at parquet-scala_2.10
+find . -name "pom.xml" -exec sed -e "s/parquet-scala_2.12/parquet-scala_2.10/g" -i.2.12.2.bak '{}' \;
+find . -name "*.2.12.*bak" -exec rm -f {} \;
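
For reference, the new move_to_scala_2.12.sh mirrors move_to_scala_2.11.sh above: a grep guard refuses to run if pom.xml already declares Scala 2.12, a global sed rewrites the Scala version and the _2.11 artifact-name suffixes in every pom.xml, a second sed undoes that rewrite for parquet-scala (kept at parquet-scala_2.10), and the .bak files that sed leaves behind are deleted.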
