Commit

Merge f4a9119 into 6a701f5
ryan-williams committed Oct 10, 2017
2 parents 6a701f5 + f4a9119 commit f17f98a
Showing 17 changed files with 236 additions and 167 deletions.
6 changes: 3 additions & 3 deletions build.sbt
@@ -1,12 +1,12 @@
 name := "spark-tests"

-version := "2.2.0"
+version := "2.3.0"

 deps ++= Seq(
-  paths % "1.2.0",
+  paths % "1.3.1",
   scalatest,
   spark,
-  spark_util % "1.3.0",
+  spark_util % "2.0.0",
   testUtils
 )

2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -1 +1 @@
-addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.1.1")
+addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.3.0")
31 changes: 0 additions & 31 deletions src/main/scala/org/hammerlab/kryo/Registrar.scala

This file was deleted.

57 changes: 0 additions & 57 deletions src/main/scala/org/hammerlab/kryo/Registration.scala

This file was deleted.

23 changes: 0 additions & 23 deletions src/main/scala/org/hammerlab/kryo/spark/Registrator.scala

This file was deleted.

21 changes: 21 additions & 0 deletions src/main/scala/org/hammerlab/spark/AtomicLongAccumulator.scala
@@ -0,0 +1,21 @@
+package org.hammerlab.spark.test
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.util.AccumulatorV2
+
+/**
+ * Thread-safe version of [[org.apache.spark.util.LongAccumulator]], suitable for use as a static value in tests.
+ *
+ * See https://issues.apache.org/jira/browse/SPARK-21425.
+ */
+case class AtomicLongAccumulator(initialValue: Long = 0)
+  extends AccumulatorV2[Long, Long] {
+  private var _value = new AtomicLong(initialValue)
+  override def value: Long = _value.get
+  override def isZero: Boolean = value == 0
+  override def copy(): AccumulatorV2[Long, Long] = AtomicLongAccumulator(value)
+  override def reset(): Unit = _value = new AtomicLong(0)
+  override def add(v: Long): Unit = _value.addAndGet(v)
+  override def merge(other: AccumulatorV2[Long, Long]): Unit = add(other.value)
+}
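
Aside (not part of the diff): a minimal usage sketch for the accumulator above; `sc` is assumed to be a live SparkContext.

  // Sketch only (assumes `sc` is a live SparkContext); not from the commit.
  import org.apache.spark.SparkContext
  import org.hammerlab.spark.test.AtomicLongAccumulator

  def sumTo100(sc: SparkContext): Long = {
    val acc = AtomicLongAccumulator()
    sc.register(acc, "sum")                       // register before first use
    sc.parallelize(1 to 100).foreach(acc.add(_))  // adds run on executors
    acc.value                                     // driver-side read: 5050
  }
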
src/main/scala/org/hammerlab/spark/test/serde/KryoSerialization.scala
@@ -1,27 +1,36 @@
 package org.hammerlab.spark.test.serde

-import java.io.{ByteArrayOutputStream, FileInputStream, FileOutputStream, InputStream, OutputStream}
+import java.io.{ ByteArrayOutputStream, InputStream, OutputStream }

 import com.esotericsoftware.kryo.Kryo
-import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.io.{ Input, Output }
+import org.hammerlab.paths.Path

 import scala.reflect.ClassTag

 /**
  * Helpers for kryo serde in tests.
  */
 object KryoSerialization {
-  def kryoRead[T](bytes: Array[Byte])(implicit ct: ClassTag[T], kryo: Kryo): T = {
+  def kryoRead[T: ClassTag](bytes: Array[Byte])(
+    implicit kryo: Kryo
+  ): T =
     kryoRead[T](bytes, includeClass = false)
-  }
-  def kryoRead[T](bytes: Array[Byte], includeClass: Boolean)(implicit ct: ClassTag[T], kryo: Kryo): T = {
+
+  def kryoRead[T: ClassTag](bytes: Array[Byte],
+                            includeClass: Boolean)(
+    implicit kryo: Kryo
+  ): T =
     kryoRead[T](new Input(bytes), includeClass)
-  }

-  def kryoRead[T](is: InputStream)(implicit ct: ClassTag[T], kryo: Kryo): T = {
+
+  def kryoRead[T: ClassTag](is: InputStream)(implicit kryo: Kryo): T =
     kryoRead[T](is, includeClass = false)
-  }
-  def kryoRead[T](is: InputStream, includeClass: Boolean)(implicit ct: ClassTag[T], kryo: Kryo): T = {
+
+  def kryoRead[T: ClassTag](is: InputStream,
+                            includeClass: Boolean)(
+    implicit kryo: Kryo
+  ): T = {
     val ip = new Input(is)
     try {
       kryoRead[T](ip, includeClass)
@@ -30,23 +39,39 @@ object KryoSerialization {
     }
   }

-  def kryoRead[T](fn: String)(implicit ct: ClassTag[T], kryo: Kryo): T = {
-    kryoRead[T](fn, includeClass = false)
-  }
-  def kryoRead[T](fn: String, includeClass: Boolean)(implicit ct: ClassTag[T], kryo: Kryo): T = {
-    kryoRead[T](new FileInputStream(fn), includeClass)
-  }
+  def kryoRead[T: ClassTag](path: Path)(implicit kryo: Kryo): T =
+    kryoRead[T](path, includeClass = false)
+
+  def kryoRead[T: ClassTag](path: Path,
+                            includeClass: Boolean)(
+    implicit kryo: Kryo
+  ): T =
+    kryoRead[T](
+      path.inputStream,
+      includeClass
+    )

-  def kryoRead[T](ip: Input, includeClass: Boolean)(implicit ct: ClassTag[T], kryo: Kryo): T = {
+  def kryoRead[T](ip: Input,
+                  includeClass: Boolean)(
+    implicit
+    ct: ClassTag[T],
+    kryo: Kryo
+  ): T =
     if (includeClass) {
-      kryo.readClassAndObject(ip).asInstanceOf[T]
+      kryo
+        .readClassAndObject(ip)
+        .asInstanceOf[T]
     } else {
-      kryo.readObject(ip, ct.runtimeClass).asInstanceOf[T]
+      kryo
+        .readObject(ip, ct.runtimeClass)
+        .asInstanceOf[T]
     }
-  }

-  def kryoWrite(o: Object, os: OutputStream, includeClass: Boolean)(implicit kryo: Kryo): Unit = {
+
+  def kryoWrite(o: Object,
+                os: OutputStream,
+                includeClass: Boolean)(
+    implicit kryo: Kryo
+  ): Unit = {
     val op = new Output(os)
     if (includeClass) {
       kryo.writeClassAndObject(op, o)
@@ -56,12 +81,15 @@ object KryoSerialization {
     op.close()
   }

-  def kryoWrite(o: Object, fn: String)(implicit kryo: Kryo): Unit = kryoWrite(o, fn, includeClass = false)
-  def kryoWrite(o: Object, fn: String, includeClass: Boolean)(implicit kryo: Kryo): Unit = {
-    kryoWrite(o, new FileOutputStream(fn), includeClass)
-  }
+  def kryoWrite(o: Object, out: Path)(implicit kryo: Kryo): Unit =
+    kryoWrite(o, out, includeClass = false)
+
+  def kryoWrite(o: Object, out: Path, includeClass: Boolean)(implicit kryo: Kryo): Unit =
+    kryoWrite(o, out.outputStream, includeClass)
+
+  def kryoBytes(o: Object)(implicit kryo: Kryo): Array[Byte] =
+    kryoBytes(o, includeClass = false)

-  def kryoBytes(o: Object)(implicit kryo: Kryo): Array[Byte] = kryoBytes(o, includeClass = false)
   def kryoBytes(o: Object, includeClass: Boolean)(implicit kryo: Kryo): Array[Byte] = {
     val baos = new ByteArrayOutputStream()
     kryoWrite(o, baos, includeClass)
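
Aside (not part of the diff): a REPL-style round-trip through the helpers above; the implicit Kryo instance is an assumption of the sketch.

  import com.esotericsoftware.kryo.Kryo
  import org.hammerlab.spark.test.serde.KryoSerialization.{ kryoBytes, kryoRead }

  implicit val kryo: Kryo = new Kryo()  // sketch assumes a default Kryo instance

  val bytes = kryoBytes("hello")      // includeClass = false: class not written
  val back = kryoRead[String](bytes)  // class supplied by the ClassTag
  assert(back == "hello")
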
src/main/scala/org/hammerlab/spark/test/suite/JavaSerializerSuite.scala
@@ -8,6 +8,6 @@ import org.apache.spark.serializer.JavaSerializer
 trait JavaSerializerSuite
   extends SparkSuite {
   sparkConf(
-    "spark.serializer" → classOf[JavaSerializer].getCanonicalName
+    "spark.serializer" → classOf[JavaSerializer].getName
   )
 }
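
Aside (not part of the diff): getCanonicalName and getName agree for a top-level class like JavaSerializer, but only getName is a binary name that Class.forName can load, which matters for nested classes and is a plausible motivation for the switch.

  object Outer { class Inner }
  classOf[Outer.Inner].getName           // "Outer$Inner": loadable via Class.forName
  classOf[Outer.Inner].getCanonicalName  // "Outer.Inner": not a binary name
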
src/main/scala/org/hammerlab/spark/test/suite/KryoSparkSuite.scala
@@ -1,8 +1,6 @@
 package org.hammerlab.spark.test.suite

-import org.apache.spark.serializer.KryoRegistrator
-import org.hammerlab.kryo.spark.Registrator
-import org.hammerlab.spark.confs
+import org.hammerlab.spark.SelfRegistrar

/**
* Base for test-suites that rely on Kryo serialization, including registering classes for serialization in a
@@ -11,7 +9,4 @@ import org.hammerlab.spark.confs
 class KryoSparkSuite(override val registrationRequired: Boolean = true,
                      override val referenceTracking: Boolean = false)
   extends SparkSuite
-    with Registrator
-    with confs.Kryo {
-  override def registrar: Class[_ <: KryoRegistrator] = getClass
-}
+    with SelfRegistrar
src/main/scala/org/hammerlab/spark/test/suite/MainSuite.scala
@@ -2,7 +2,6 @@ package org.hammerlab.spark.test.suite

 import java.lang.System.{ clearProperty, getProperty, setProperty }

-import org.apache.spark.serializer.KryoRegistrator
 import org.hammerlab.spark.{ SparkConfBase, confs }
 import org.hammerlab.test.Suite

@@ -14,7 +13,7 @@ import scala.collection.mutable
*
* Sets Spark configuration settings, including Kryo-serde with required registration, through system properties.
*/
-class MainSuite(override val registrar: Class[_ <: KryoRegistrator] = null)
+class MainSuite
   extends Suite
     with SparkConfBase
     with TestConfs
src/main/scala/org/hammerlab/spark/test/suite/SparkSuite.scala
@@ -5,7 +5,7 @@ import org.hammerlab.test.Suite
 /**
  * Base for test suites that share one [[org.apache.spark.SparkContext]] across all test cases.
  */
-trait SparkSuite
+abstract class SparkSuite
   extends Suite
   with SparkSuiteBase {

src/test/scala/org/hammerlab/spark/test/AtomicLongAccumulatorTest.scala
@@ -0,0 +1,47 @@
+package org.hammerlab.spark.test
+
+import org.hammerlab.spark.test.suite.SparkSuite
+
+class AtomicLongAccumulatorTest
+  extends SparkSuite {
+
+  test("instance") {
+    val acc = AtomicLongAccumulator()
+    sc.register(acc, "acc")
+    sc
+      .parallelize(
+        1 to 10,
+        numSlices = 4
+      )
+      .map {
+        n ⇒
+          acc.add(n)
+          n.toString
+      }
+      .collect should be(1 to 10 map(_.toString))
+
+    acc.value should be(55)
+  }
+
+  test("static") {
+    import AtomicLongAccumulatorTest.acc
+    sc.register(acc, "acc")
+    sc
+      .parallelize(
+        1 to 10,
+        numSlices = 4
+      )
+      .map {
+        n ⇒
+          acc.add(n)
+          n.toString
+      }
+      .collect should be(1 to 10 map(_.toString))
+
+    acc.value should be(55)
+  }
+}
+
+object AtomicLongAccumulatorTest {
+  val acc = AtomicLongAccumulator()
+}
@@ -6,11 +6,13 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat
 import org.apache.spark.rdd.RDD
 import org.hammerlab.hadoop.splits.UnsplittableSequenceFileInputFormat
 import org.hammerlab.paths.Path
+import org.hammerlab.spark.test.suite.JavaSerializerSuite

 import scala.reflect.ClassTag

 class RDDSerializationTest
-  extends RDDSerialization {
+  extends RDDSerialization
+    with JavaSerializerSuite {
   override protected def serializeRDD[T: ClassTag](rdd: RDD[T],
                                                    path: Path): RDD[T] = {
     rdd
