Skip to content

Commit

Permalink
spark-util upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
ryan-williams committed Jun 16, 2018
1 parent d16cd03 commit 2f05f3e
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 399 deletions.
16 changes: 7 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
group("org.hammerlab.adam")
name := "core"
v"0.23.2"
subgroup("adam", "core")
github.repo("adam")
v"0.23.4"

spark
publishTestJar

dep(
bdg.formats,
hammerlab("bdg-utils", "cli") % "0.3.0",
//bdg.utils.cli,
bdg.utils.intervalrdd,
bdg.utils.io,
bdg.utils.metrics,
bdg.utils.misc +testtest,
commons.io,
seqdoop_hadoop_bam ^ "7.9.0",
genomics.loci ^ "2.1.0",
genomics.reference % "1.4.3" - htsjdk +testtest,
seqdoop_hadoop_bam % "7.9.0",
genomics.loci % "2.2.0",
genomics.reference % "1.5.0" - htsjdk +testtest,
htsjdk,
log4j,
parquet_avro,
paths ^ "1.5.0",
spark_util ^ "1.2.1",
paths % "1.5.0",
spark_util % "3.0.0",
"it.unimi.dsi" ^ "fastutil" ^ "6.6.5",
"org.apache.avro" ^ "avro" ^ "1.8.1",
"org.apache.httpcomponents" ^ "httpclient" ^ "4.5.2",
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
addSbtPlugin("org.hammerlab.sbt" % "base" % "4.6.2-SNAPSHOT")
addSbtPlugin("org.hammerlab.sbt" % "base" % "4.6.2")

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.bdgenomics.adam.serialization

import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import it.unimi.dsi.fastutil.io.{ FastByteArrayInputStream, FastByteArrayOutputStream }
import org.apache.avro.io.{ BinaryDecoder, BinaryEncoder, DecoderFactory, EncoderFactory }
import org.apache.avro.specific.{ SpecificDatumReader, SpecificDatumWriter, SpecificRecord }

import scala.reflect.ClassTag
/**
 * Kryo serializer for Avro [[SpecificRecord]]s, using Avro's binary encoding.
 *
 * Each record is written as a variable-length non-negative int (the payload size in bytes)
 * followed by the Avro-encoded payload itself.
 *
 * NOTE: This class is not thread-safe; however, Spark guarantees that only a single thread will access it.
 *
 * @tparam T the Avro record class handled by this serializer
 */
class AvroSerializer[T <: SpecificRecord: ClassTag] extends Serializer[T] {
  val reader = new SpecificDatumReader[T](scala.reflect.classTag[T].runtimeClass.asInstanceOf[Class[T]])
  val writer = new SpecificDatumWriter[T](scala.reflect.classTag[T].runtimeClass.asInstanceOf[Class[T]])

  // Reusable input buffer; swapped for a larger one whenever an incoming payload does not fit.
  var in = InputStreamWithDecoder(1024)

  // Reusable output buffer that `encoder` writes into; it is reset before every record.
  val outstream = new FastByteArrayOutputStream()
  val encoder = EncoderFactory.get().directBinaryEncoder(outstream, null.asInstanceOf[BinaryEncoder])

  setAcceptsNull(false)

  /**
   * Avro-encodes `record` and writes it to `kryoOut` as a length-prefixed byte payload.
   *
   * @param kryo    the Kryo instance driving serialization (unused)
   * @param kryoOut destination for the length prefix and the payload
   * @param record  the record to serialize
   */
  def write(kryo: Kryo, kryoOut: Output, record: T): Unit = {
    outstream.reset()
    writer.write(record, encoder)
    // `outstream.array` is the stream's reusable backing buffer: its size is the buffer's
    // capacity, which only ever grows, and anything past the valid length is stale data from
    // earlier records. Emit just the bytes that were produced for this record.
    val numBytes = outstream.length().toInt
    kryoOut.writeInt(numBytes, true)
    kryoOut.write(outstream.array, 0, numBytes)
  }

  /**
   * Reads one length-prefixed payload from `kryoIn` and decodes it into a record.
   *
   * @param kryo   the Kryo instance driving deserialization (unused)
   * @param kryoIn source of the length prefix and the payload
   * @param klazz  the requested record class (unused; `reader` was built from the ClassTag)
   * @return the decoded record
   */
  def read(kryo: Kryo, kryoIn: Input, klazz: Class[T]): T = this.synchronized {
    val len = kryoIn.readInt(true)
    if (len > in.size) {
      // Grow with some headroom so slightly larger follow-up records do not force another reallocation.
      in = InputStreamWithDecoder(len + 1024)
    }
    in.stream.reset()
    // Read Kryo bytes into input buffer
    kryoIn.readBytes(in.buffer, 0, len)
    // Read the Avro object from the buffer
    reader.read(null.asInstanceOf[T], in.decoder)
  }
}

/**
 * Bundles a fixed-size byte buffer with an input stream over that buffer and an Avro binary
 * decoder that reads from the stream.
 *
 * @param size capacity, in bytes, of the backing buffer
 */
case class InputStreamWithDecoder(size: Int) {
  // Backing storage that callers fill before decoding.
  val buffer = Array.ofDim[Byte](size)

  // Stream view over `buffer`.
  val stream = new FastByteArrayInputStream(buffer)

  // Decoder bound to `stream`; no existing decoder instance is passed in for reuse.
  val decoder = {
    val noDecoderToReuse: BinaryDecoder = null
    DecoderFactory.get().directBinaryDecoder(stream, noDecoderToReuse)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.bdgenomics.adam.serialization

import com.esotericsoftware.kryo.io.{ Input, KryoDataInput, KryoDataOutput, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import org.apache.hadoop.io.Writable

/**
 * A Kryo serializer for Hadoop writables.
 *
 * Lifted from the Apache Spark user email list
 * (http://apache-spark-user-list.1001560.n3.nabble.com/Hadoop-Writable-and-Spark-serialization-td5721.html)
 * which indicates that it was originally copied from Shark itself, back when
 * Spark 0.9 was the state of the art.
 *
 * @tparam T The class to serialize, which implements the Writable interface.
 */
class WritableSerializer[T <: Writable] extends Serializer[T] {

  /**
   * Serializes `writable` by letting it write its own fields to the Kryo output.
   *
   * @param kryo     the Kryo instance driving serialization (unused)
   * @param output   destination stream, exposed to the writable as a `java.io.DataOutput`
   * @param writable the value to serialize
   */
  override def write(kryo: Kryo, output: Output, writable: T): Unit = {
    writable.write(new KryoDataOutput(output))
  }

  /**
   * Instantiates `cls` and populates the new instance from the Kryo input.
   *
   * @param kryo  the Kryo instance driving deserialization (unused)
   * @param input source stream, exposed to the writable as a `java.io.DataInput`
   * @param cls   concrete class to instantiate; `newInstance()` is called on it with no arguments
   * @return the newly created, populated writable
   */
  override def read(kryo: Kryo, input: Input, cls: java.lang.Class[T]): T = {
    val writable = cls.newInstance()
    writable.readFields(new KryoDataInput(input))
    writable
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,6 @@ class SequenceDictionarySuite
}
}

// implicit def cmpContigName(implicit cmp: Cmp[String]): Cmp.Aux[ContigName, cmp.Diff] = Cmp.by[String, ContigName](_.name)(cmp)
// implicit def cmpLocus(implicit cmp: Cmp[Long]): Cmp.Aux[Locus, cmp.Diff] = Cmp.by[Long, Locus](_.locus)(cmp)
//
// implicitly[Cmp[ContigName]]
// implicitly[Cmp[NumLoci]]
// implicitly[Cmp[Option[String]]]
// implicitly[Cmp[Option[Int]]]
// implicitly[Generic[SequenceRecord]]
// implicitly[Cmp[SequenceRecord]]

test("Can retrieve sequence by name") {
val rec = record("chr1")
val asd = SequenceDictionary(rec)
Expand Down
9 changes: 4 additions & 5 deletions src/test/scala/org/bdgenomics/adam/util/ADAMFunSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.serializer.KryoRegistrator
import org.bdgenomics.adam.serialization.ADAMKryoRegistrator
import org.hammerlab.cmp.CanEq
import org.hammerlab.genomics.reference.NumLoci
import org.hammerlab.genomics.reference.test.{ ClearContigNames, ContigNameCanEqualString, LocusCanEqualInt }
import org.hammerlab.genomics.reference.test.{ ClearContigNames, LocusCanEqualInt }
import org.hammerlab.hadoop.Configuration
import org.hammerlab.paths.Path
import org.hammerlab.spark.test.suite.KryoSparkSuite
Expand All @@ -36,10 +36,9 @@ abstract class ADAMFunSuite
extends KryoSparkSuite(
referenceTracking = true
)
with ContigNameCanEqualString
with LocusCanEqualInt
with ClearContigNames
with TypeCheckedTripleEquals {
with LocusCanEqualInt
with ClearContigNames
with TypeCheckedTripleEquals {

register(new ADAMKryoRegistrator: KryoRegistrator)

Expand Down

0 comments on commit 2f05f3e

Please sign in to comment.