
Commit

Merge pull request #19 from ryan-williams/h
move JavaSerialization to test-utils
ryan-williams committed Aug 15, 2017
2 parents fa24209 + cbecca9 commit 6a701f5
Showing 8 changed files with 107 additions and 67 deletions.
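In practice, the move means downstream code now imports JavaSerialization from test-utils' package rather than spark-tests'; the SerializationSizeTest hunk at the bottom of this diff shows the one-line swap:

    // before: import org.hammerlab.spark.test.serde.JavaSerialization._
    // after:  import org.hammerlab.test.serde.JavaSerialization._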
2 changes: 1 addition & 1 deletion .travis.yml
@@ -6,7 +6,7 @@ jdk:
scala:
- 2.11.8

-script: sbt ++$TRAVIS_SCALA_VERSION clean test
+script: sbt ++$TRAVIS_SCALA_VERSION clean coverageTest

cache:
directories:
6 changes: 2 additions & 4 deletions build.sbt
@@ -1,16 +1,14 @@
name := "spark-tests"

version := "2.1.0"
version := "2.2.0"

deps ++= Seq(
paths % "1.2.0",
scalatest,
spark,
spark_util % "1.2.1",
spark_util % "1.3.0",
testUtils
)

-testUtilsVersion := "1.3.0"

// Don't include default parent-plugin test-deps
testDeps := Nil
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -1 +1 @@
-addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.0.0")
+addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.1.1")
5 changes: 5 additions & 0 deletions src/main/resources/log4j.properties
@@ -0,0 +1,5 @@
log4j.rootLogger = WARN, out

log4j.appender.out = org.apache.log4j.ConsoleAppender
log4j.appender.out.layout = org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern = %d (%t) [%p] %m%n
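For reference, a minimal sketch of what this config yields at runtime (assuming log4j 1.x on the test classpath; the logger name and message below are made up for illustration). WARN-and-above messages go to the console in the "%d (%t) [%p] %m%n" form, e.g. "2017-08-15 12:34:56,789 (main) [WARN] context stopped":

    import org.apache.log4j.Logger

    // With the properties above on the classpath, this prints one console line
    // formatted as: date (thread) [level] message
    object Log4jDemo extends App {
      Logger.getLogger("org.hammerlab.spark.test").warn("context stopped")
    }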
53 changes: 33 additions & 20 deletions src/main/scala/org/hammerlab/spark/test/rdd/RDDSerialization.scala
@@ -1,8 +1,9 @@
package org.hammerlab.spark.test.rdd

-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{ GetFileSplit, RDD }
import org.hammerlab.paths.Path
import org.hammerlab.spark.test.suite.SparkSuite
+import org.hammerlab.hadoop.splits.PartFileBasename

import scala.reflect.ClassTag

@@ -43,34 +44,46 @@ trait RDDSerialization
else
origFileSizes

-    val numPartitions =
-      if (fileSizes.isEmpty)
-        4
-      else
-        fileSizes.size
+    val numPartitions = fileSizes.size

    val path = tmpPath()

    val rdd = sc.parallelize(elems, numPartitions)

    serializeRDD[T](rdd, path)

-    if (fileSizes.nonEmpty) {
-      val fileSizeMap =
-        fileSizes
-          .zipWithIndex
-          .map(p => "part-%05d".format(p._2) → p._1)
-          .toMap
-
-      path
-        .list("part-*")
-        .map(
-          p ⇒
-            p.basename → p.size
-        )
-        .toMap should be(fileSizeMap)
-    }
-
-    deserializeRDD[T](path).collect() should be(elems.toArray)
+    val fileSizeMap =
+      fileSizes
+        .zipWithIndex
+        .map {
+          case (size, idx) ⇒
+            PartFileBasename(idx) → size
+        }
+        .toMap
+
+    path
+      .list("part-*")
+      .map(
+        p ⇒
+          p.basename → p.size
+      )
+      .toMap should be(fileSizeMap)
+
+    val after = deserializeRDD[T](path)
+
+    after.getNumPartitions should be(numPartitions)
+    after
+      .partitions
+      .map(
+        GetFileSplit(_).path
+      ) should be(
+        (0 until numPartitions)
+          .map(
+            i ⇒
+              path / PartFileBasename(i)
+          )
+      )
+
+    after.collect() should be(elems.toArray)
}
}

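For context, Hadoop-style output writes one file per partition named part-00000, part-00001, …; the new assertions key the expected sizes and paths on those basenames (PartFileBasename is assumed to render the same zero-padded form the removed "part-%05d" formatting produced). A tiny self-contained illustration of the naming scheme:

    // Basenames a 4-partition RDD's output is checked against above:
    val basenames = (0 until 4).map(i => f"part-$i%05d")
    // Vector("part-00000", "part-00001", "part-00002", "part-00003")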
This file was deleted.

@@ -0,0 +1,64 @@
package org.hammerlab.spark.test.rdd

import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream }

import org.apache.hadoop.io.{ BytesWritable, NullWritable }
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.rdd.RDD
import org.hammerlab.hadoop.splits.UnsplittableSequenceFileInputFormat
import org.hammerlab.paths.Path

import scala.reflect.ClassTag

class RDDSerializationTest
  extends RDDSerialization {

  override protected def serializeRDD[T: ClassTag](rdd: RDD[T],
                                                   path: Path): RDD[T] = {
    rdd
      .map {
        t ⇒
          NullWritable.get → {
            val bos = new ByteArrayOutputStream()
            val oos = new ObjectOutputStream(bos)
            oos.writeObject(t)
            oos.close()
            new BytesWritable(bos.toByteArray)
          }
      }
      .saveAsHadoopFile[
        SequenceFileOutputFormat[
          NullWritable,
          BytesWritable
        ]
      ](
        path.toString
      )

    rdd
  }

  override protected def deserializeRDD[T: ClassTag](path: Path): RDD[T] =
    sc
      .hadoopFile[
        NullWritable,
        BytesWritable,
        UnsplittableSequenceFileInputFormat[
          NullWritable,
          BytesWritable
        ]
      ](
        path.toString
      )
      .values
      .map {
        bytes ⇒
          val bis = new ByteArrayInputStream(bytes.getBytes)
          val ois = new ObjectInputStream(bis)
          val t = ois.readObject().asInstanceOf[T]
          ois.close()
          t
      }

  test("ints round trip") {
    verifyFileSizesAndSerde(1 to 1000, 23565)
  }
}
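The per-element serde above is plain java.io object serialization; here is a self-contained sketch of the same round trip without Spark (the javaRoundTrip helper name is ours, for illustration only):

    import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream }

    object JavaSerdeDemo extends App {
      // Write a value to bytes and read it back, mirroring the two map functions above.
      def javaRoundTrip[T](t: T): T = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(t)
        oos.close()
        val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
        try ois.readObject().asInstanceOf[T]
        finally ois.close()
      }

      assert(javaRoundTrip(123) == 123)      // Int boxes to java.lang.Integer, which is Serializable
      assert(javaRoundTrip("str") == "str")
    }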
@@ -1,7 +1,7 @@
package org.hammerlab.spark.test.serde

import org.hammerlab.test.version.Util
-import org.hammerlab.spark.test.serde.JavaSerialization._
+import org.hammerlab.test.serde.JavaSerialization._
import org.hammerlab.spark.test.serde.util.{Foo, FooRegistrarTest, HasKryoSuite}

class SerializationSizeTest
