-
Notifications
You must be signed in to change notification settings - Fork 1
/
SerializerSuite.scala
126 lines (99 loc) · 3.53 KB
/
SerializerSuite.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package org.hammerlab.genomics.loci.set
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.hammerlab.genomics.loci.set.test.LociSetUtil
import org.hammerlab.genomics.reference.ContigName.Factory
import org.hammerlab.genomics.reference.test.LociConversions._
import org.hammerlab.genomics.reference.test.{ ClearContigNames, LenientContigNameConversions }
import org.hammerlab.genomics.reference.{ Locus, PermissiveRegistrar }
import org.hammerlab.kryo._
import org.hammerlab.spark.test.suite.{ KryoSparkSuite, SparkSerialization }
import scala.collection.mutable
/**
 * Serialization round-trip tests for [[LociSet]].
 *
 * Covers LociSets shipped through Spark as RDD elements and as a broadcast captured by a closure (Kryo, with
 * reference-tracking enabled), and LociSets written/read with plain Java serialization.
 */
class SerializerSuite
  extends KryoSparkSuite(referenceTracking = true)
    with SparkSerialization
    with LenientContigNameConversions
    with ClearContigNames
    with LociSetUtil
    with Serializable {

  import Helpers._

  register(
    arr[LociSet],
    // "a closure that includes a LociSet" parallelizes some Range[Long]s.
    cls[Range],
    cls[Array[Locus]],
    // "make an RDD[LociSet]" and "make an RDD[LociSet], and an RDD[Contig]" both collect Strings.
    cls[Array[String]],
    cls[mutable.WrappedArray.ofRef[_]],
    PermissiveRegistrar
  )

  test("make an RDD[LociSet]") {
    val sets =
      List[LociSet](
        "",
        "empty:20-20,empty2:30-30",
        "20:100-200",
        "21:300-400",
        "with_dots._and_..underscores11:900-1000",
        "X:5-17,X:19-22,Y:50-60",
        "chr21:100-200,chr20:0-10,chr20:8-15,chr20:100-120"
      )

    val rdd = sc.parallelize(sets)
    val result = rdd.map(_.toString).collect.toSeq
    result should ===(sets.map(_.toString))
  }

  test("make an RDD[LociSet], and an RDD[Contig]") {
    val sets =
      List[LociSet](
        "",
        "empty:20-20,empty2:30-30",
        "20:100-200",
        "21:300-400",
        "X:5-17,X:19-22,Y:50-60",
        "chr21:100-200,chr20:0-10,chr20:8-15,chr20:100-120"
      )

    val rdd = sc.parallelize(sets)

    // mapTask lives in Helpers so that the closure doesn't capture this (non-serializable) suite.
    val result =
      rdd
        .map(mapTask)
        .collect
        .toSeq

    result should ===(sets.map(_("20").toString))
  }

  test("a closure that includes a LociSet") {
    val set: LociSet = "chr21:100-200,chr20:0-10,chr20:8-15,chr20:100-120,empty:10-10"
    val setBC = sc.broadcast(set)
    try {
      val rdd = sc.parallelize[Locus]((0 until 1000).toSeq)
      val result = rdd.filter(filterTask(setBC)).collect
      result should ===(100 until 200)
    } finally {
      // Release the broadcast's blocks once the job has completed, even if the assertion failed.
      setBC.destroy()
    }
  }

  test("java serialization") {
    val loci: LociSet = "chr21:100-200,chr20:0-10,chr20:8-15,chr20:100-120,empty:10-10"

    val baos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(baos)
    // close() flushes the object stream into `baos`; make sure it runs even if writeObject throws.
    try oos.writeObject(loci)
    finally oos.close()

    val ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
    val loci2 =
      try ois.readObject().asInstanceOf[LociSet]
      finally ois.close()

    loci2 should ===(loci)
  }
}
/**
 * Spark task functions used by [[SerializerSuite]].
 *
 * They are isolated in their own object because otherwise ClosureCleaner will attempt to Java-serialize the enclosing
 * [[org.hammerlab.test.Suite]], which errors due to a non-serializable [[org.scalatest.Assertions]].AssertionsHelper
 * member inherited from [[org.scalatest.FunSuite]]. See https://github.com/scalatest/scalatest/issues/1013.
 */
object Helpers {

  /**
   * Builds a function that renders contig "20" of a [[LociSet]] as a String.
   *
   * @param factory implicit contig-name factory; presumably what lets the string contig names passed to `set(...)`
   *                below be converted to contig names
   * @return a function from a [[LociSet]] to `set("20").toString`
   */
  def mapTask(implicit factory: Factory): LociSet ⇒ String =
    (set: LociSet) ⇒ {
      // No-ops: results are discarded; these calls only access contig "21" of the (deserialized) set.
      val _ = set("21").contains(5)
      val _ = set("21").ranges
      set("20").toString
    }

  /**
   * Builds a predicate that checks whether a locus is contained in contig "chr21" of a broadcast [[LociSet]].
   *
   * @param setBC broadcast loci set; its value is read each time the predicate is applied
   * @param factory implicit contig-name factory; presumably used to resolve the "chr21" string to a contig name
   * @return a predicate suitable for `RDD[Locus].filter`
   */
  def filterTask(setBC: Broadcast[LociSet])(implicit factory: Factory): Locus ⇒ Boolean =
    (locus: Locus) ⇒
      setBC.value("chr21").contains(locus)
}