Skip to content

Commit

Permalink
More progress on the graph method unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerk committed Apr 27, 2016
1 parent 9217787 commit 6110c66
Showing 1 changed file with 208 additions and 4 deletions.
212 changes: 208 additions & 4 deletions src/test/scala/internal/GraphSuite.scala
Expand Up @@ -373,15 +373,219 @@ class GraphSuite extends FunSuite with LocalSparkContext with Logging {
}

test("addGraph") {
sc = new SparkContext("local", "test")
val graph2 = Graph(
sources = Set(SourceId(0), SourceId(1), SourceId(2)),
operators = Map(
NodeId(0) -> DatumOperator(10),
NodeId(1) -> DatumOperator(11),
NodeId(2) -> DatumOperator(12),
NodeId(3) -> DatumOperator(13),
NodeId(4) -> DatumOperator(14),
NodeId(5) -> DatumOperator(15),
NodeId(6) -> DatumOperator(16),
NodeId(7) -> DatumOperator(17),
NodeId(8) -> DatumOperator(18),
NodeId(9) -> DatumOperator(19)
),
dependencies = Map(
NodeId(0) -> Seq(),
NodeId(1) -> Seq(SourceId(1), SourceId(2)),
NodeId(2) -> Seq(),
NodeId(3) -> Seq(SourceId(0)),
NodeId(4) -> Seq(NodeId(1), NodeId(2)),
NodeId(5) -> Seq(NodeId(2), NodeId(3), NodeId(4)),
NodeId(6) -> Seq(SourceId(0), NodeId(1)),
NodeId(7) -> Seq(SourceId(1), NodeId(1), NodeId(6)),
NodeId(8) -> Seq(NodeId(4), NodeId(5)),
NodeId(9) -> Seq(NodeId(0), NodeId(3), NodeId(7), NodeId(8))
),
sinkDependencies = Map(
SinkId(0) -> SourceId(2),
SinkId(1) -> NodeId(4),
SinkId(2) -> NodeId(9)
)
)

require(false)
val (addedGraph, sourceIdMap, sinkIdMap) = graph.addGraph(graph2)

// Make sure the new sink & source ids don't clash with the old ones
require(sinkIdMap.values.toSet.forall(i => !graph.sinks.contains(i)))
require(sourceIdMap.values.toSet.forall(i => !graph.sources.contains(i)))

// Make sure the new node ids don't clash with the old ones
val nodeIdByDatum = addedGraph.operators.toSeq.map(x => (x._2.asInstanceOf[DatumOperator].datum, x._1)).toMap
require((10 to 19).map(i => nodeIdByDatum(i)).forall(i => !graph.nodes.contains(i)))

val expectedGraph = Graph(
sources = Set(
SourceId(1),
SourceId(2),
SourceId(3),
sourceIdMap(SourceId(1)),
sourceIdMap(SourceId(2)),
sourceIdMap(SourceId(0))),
operators = Map(
NodeId(0) -> DatumOperator(0),
NodeId(1) -> DatumOperator(1),
NodeId(2) -> DatumOperator(2),
NodeId(3) -> DatumOperator(3),
NodeId(4) -> DatumOperator(4),
NodeId(5) -> DatumOperator(5),
NodeId(6) -> DatumOperator(6),
NodeId(7) -> DatumOperator(7),
NodeId(8) -> DatumOperator(8),
NodeId(9) -> DatumOperator(9),
nodeIdByDatum(10) -> DatumOperator(10),
nodeIdByDatum(11) -> DatumOperator(11),
nodeIdByDatum(12) -> DatumOperator(12),
nodeIdByDatum(13) -> DatumOperator(13),
nodeIdByDatum(14) -> DatumOperator(14),
nodeIdByDatum(15) -> DatumOperator(15),
nodeIdByDatum(16) -> DatumOperator(16),
nodeIdByDatum(17) -> DatumOperator(17),
nodeIdByDatum(18) -> DatumOperator(18),
nodeIdByDatum(19) -> DatumOperator(19)
),
dependencies = Map(
NodeId(0) -> Seq(),
NodeId(1) -> Seq(SourceId(1), SourceId(2)),
NodeId(2) -> Seq(),
NodeId(3) -> Seq(SourceId(3)),
NodeId(4) -> Seq(NodeId(1), NodeId(2)),
NodeId(5) -> Seq(NodeId(2), NodeId(3), NodeId(4)),
NodeId(6) -> Seq(SourceId(3), NodeId(1)),
NodeId(7) -> Seq(SourceId(1), NodeId(1), NodeId(6)),
NodeId(8) -> Seq(NodeId(4), NodeId(5)),
NodeId(9) -> Seq(NodeId(0), NodeId(3), NodeId(7), NodeId(8)),
nodeIdByDatum(10) -> Seq(),
nodeIdByDatum(11) -> Seq(sourceIdMap(SourceId(1)), sourceIdMap(SourceId(2))),
nodeIdByDatum(12) -> Seq(),
nodeIdByDatum(13) -> Seq(sourceIdMap(SourceId(0))),
nodeIdByDatum(14) -> Seq(nodeIdByDatum(11), nodeIdByDatum(12)),
nodeIdByDatum(15) -> Seq(nodeIdByDatum(12), nodeIdByDatum(13), nodeIdByDatum(14)),
nodeIdByDatum(16) -> Seq(sourceIdMap(SourceId(0)), nodeIdByDatum(11)),
nodeIdByDatum(17) -> Seq(sourceIdMap(SourceId(1)), nodeIdByDatum(11), nodeIdByDatum(16)),
nodeIdByDatum(18) -> Seq(nodeIdByDatum(14), nodeIdByDatum(15)),
nodeIdByDatum(19) -> Seq(nodeIdByDatum(10), nodeIdByDatum(13), nodeIdByDatum(17), nodeIdByDatum(18))
),
sinkDependencies = Map(
SinkId(0) -> SourceId(2),
SinkId(1) -> NodeId(4),
SinkId(2) -> NodeId(9),
sinkIdMap(SinkId(0)) -> sourceIdMap(SourceId(2)),
sinkIdMap(SinkId(1)) -> nodeIdByDatum(14),
sinkIdMap(SinkId(2)) -> nodeIdByDatum(19)
)
)

assert(expectedGraph === addedGraph)
}

test("connectGraph") {
sc = new SparkContext("local", "test")
val graph2 = Graph(
sources = Set(SourceId(0), SourceId(1), SourceId(2)),
operators = Map(
NodeId(0) -> DatumOperator(10),
NodeId(1) -> DatumOperator(11),
NodeId(2) -> DatumOperator(12),
NodeId(3) -> DatumOperator(13),
NodeId(4) -> DatumOperator(14),
NodeId(5) -> DatumOperator(15),
NodeId(6) -> DatumOperator(16),
NodeId(7) -> DatumOperator(17),
NodeId(8) -> DatumOperator(18),
NodeId(9) -> DatumOperator(19)
),
dependencies = Map(
NodeId(0) -> Seq(),
NodeId(1) -> Seq(SourceId(1), SourceId(2)),
NodeId(2) -> Seq(),
NodeId(3) -> Seq(SourceId(0)),
NodeId(4) -> Seq(NodeId(1), NodeId(2)),
NodeId(5) -> Seq(NodeId(2), NodeId(3), NodeId(4)),
NodeId(6) -> Seq(SourceId(0), NodeId(1)),
NodeId(7) -> Seq(SourceId(1), NodeId(1), NodeId(6)),
NodeId(8) -> Seq(NodeId(4), NodeId(5)),
NodeId(9) -> Seq(NodeId(0), NodeId(3), NodeId(7), NodeId(8))
),
sinkDependencies = Map(
SinkId(0) -> SourceId(2),
SinkId(1) -> NodeId(4),
SinkId(2) -> NodeId(9)
)
)

require(false)
val spliceMap = Map[SourceId, SinkId](SourceId(0) -> SinkId(2), SourceId(1) -> SinkId(1))

val (connectedGraph, sourceIdMap, sinkIdMap) = graph.connectGraph(graph2, spliceMap)

// Make sure the new sink & source ids don't clash with the old ones
require(sinkIdMap.values.toSet.forall(i => !graph.sinks.contains(i)))
require(sourceIdMap.values.toSet.forall(i => !graph.sources.contains(i)))

// Make sure the new node ids don't clash with the old ones
val nodeIdByDatum = connectedGraph.operators.toSeq.map(x => (x._2.asInstanceOf[DatumOperator].datum, x._1)).toMap
require((10 to 19).map(i => nodeIdByDatum(i)).forall(i => !graph.nodes.contains(i)))

val expectedGraph = Graph(
sources = Set(
SourceId(1),
SourceId(2),
SourceId(3),
sourceIdMap(SourceId(2))),
operators = Map(
NodeId(0) -> DatumOperator(0),
NodeId(1) -> DatumOperator(1),
NodeId(2) -> DatumOperator(2),
NodeId(3) -> DatumOperator(3),
NodeId(4) -> DatumOperator(4),
NodeId(5) -> DatumOperator(5),
NodeId(6) -> DatumOperator(6),
NodeId(7) -> DatumOperator(7),
NodeId(8) -> DatumOperator(8),
NodeId(9) -> DatumOperator(9),
nodeIdByDatum(10) -> DatumOperator(10),
nodeIdByDatum(11) -> DatumOperator(11),
nodeIdByDatum(12) -> DatumOperator(12),
nodeIdByDatum(13) -> DatumOperator(13),
nodeIdByDatum(14) -> DatumOperator(14),
nodeIdByDatum(15) -> DatumOperator(15),
nodeIdByDatum(16) -> DatumOperator(16),
nodeIdByDatum(17) -> DatumOperator(17),
nodeIdByDatum(18) -> DatumOperator(18),
nodeIdByDatum(19) -> DatumOperator(19)
),
dependencies = Map(
NodeId(0) -> Seq(),
NodeId(1) -> Seq(SourceId(1), SourceId(2)),
NodeId(2) -> Seq(),
NodeId(3) -> Seq(SourceId(3)),
NodeId(4) -> Seq(NodeId(1), NodeId(2)),
NodeId(5) -> Seq(NodeId(2), NodeId(3), NodeId(4)),
NodeId(6) -> Seq(SourceId(3), NodeId(1)),
NodeId(7) -> Seq(SourceId(1), NodeId(1), NodeId(6)),
NodeId(8) -> Seq(NodeId(4), NodeId(5)),
NodeId(9) -> Seq(NodeId(0), NodeId(3), NodeId(7), NodeId(8)),
nodeIdByDatum(10) -> Seq(),
nodeIdByDatum(11) -> Seq(NodeId(4), sourceIdMap(SourceId(2))),
nodeIdByDatum(12) -> Seq(),
nodeIdByDatum(13) -> Seq(NodeId(9)),
nodeIdByDatum(14) -> Seq(nodeIdByDatum(11), nodeIdByDatum(12)),
nodeIdByDatum(15) -> Seq(nodeIdByDatum(12), nodeIdByDatum(13), nodeIdByDatum(14)),
nodeIdByDatum(16) -> Seq(NodeId(9), nodeIdByDatum(11)),
nodeIdByDatum(17) -> Seq(NodeId(4), nodeIdByDatum(11), nodeIdByDatum(16)),
nodeIdByDatum(18) -> Seq(nodeIdByDatum(14), nodeIdByDatum(15)),
nodeIdByDatum(19) -> Seq(nodeIdByDatum(10), nodeIdByDatum(13), nodeIdByDatum(17), nodeIdByDatum(18))
),
sinkDependencies = Map(
SinkId(0) -> SourceId(2),
sinkIdMap(SinkId(0)) -> sourceIdMap(SourceId(2)),
sinkIdMap(SinkId(1)) -> nodeIdByDatum(14),
sinkIdMap(SinkId(2)) -> nodeIdByDatum(19)
)
)

assert(expectedGraph === connectedGraph)
}

test("replaceNodes") {
Expand Down

0 comments on commit 6110c66

Please sign in to comment.