Skip to content

Commit

Permalink
Add tests for RDDOperationScope
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed May 4, 2015
1 parent 1c310e4 commit 6f9574a
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 2 deletions.
17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package org.apache.spark.rdd

import java.util.concurrent.atomic.AtomicInteger

import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude, JsonPropertyOrder}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

Expand All @@ -38,9 +39,11 @@ import org.apache.spark.SparkContext
* There is no particular relationship between an operation scope and a stage or a job.
* A scope may live inside one stage (e.g. map) or span across multiple jobs (e.g. take).
*/
@JsonInclude(Include.NON_NULL)
@JsonPropertyOrder(Array("id", "name", "parent"))
private[spark] class RDDOperationScope(
val name: String,
parent: Option[RDDOperationScope] = None) {
val parent: Option[RDDOperationScope] = None) {

val id: Int = RDDOperationScope.nextScopeId()

Expand All @@ -56,6 +59,16 @@ private[spark] class RDDOperationScope(
def getAllScopes: Seq[RDDOperationScope] = {
  // Collect the ancestors first so the outermost scope ends up at the head;
  // a scope without a parent contributes no ancestors.
  val ancestors: Seq[RDDOperationScope] = parent match {
    case Some(enclosing) => enclosing.getAllScopes
    case None => Seq.empty
  }
  // This scope itself is always the last element of the result.
  ancestors ++ Seq(this)
}

/**
 * Two scopes are equal when their ids, names and parents all match.
 *
 * The parent comparison goes through `Option` equality, so the same check is
 * applied to each ancestor in turn.
 *
 * @param other the object to compare against
 * @return true if `other` is an [[RDDOperationScope]] with the same id, name and parent
 */
override def equals(other: Any): Boolean = {
  other match {
    case s: RDDOperationScope =>
      id == s.id && name == s.name && parent == s.parent
    case _ => false
  }
}

/**
 * Hash over exactly the fields compared by `equals`, so that equal scopes always
 * hash alike and behave correctly in hash-based collections.
 *
 * Tuple hashing is used because it tolerates a null `name`.
 */
override def hashCode(): Int = (id, name, parent).hashCode()

/** Renders this scope as the string returned by `toJson`. */
override def toString: String = {
  toJson
}
}

/**
Expand Down
133 changes: 133 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd

import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.{TaskContext, Partition, SparkContext}

/**
 * Tests for [[RDDOperationScope]]: listing a scope's ancestors, JSON round-tripping,
 * and the scopes that `withScope` attaches to RDDs created inside nested calls.
 */
class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
  private var sc: SparkContext = null

  // A three-level chain of scopes: scope1 is the root, scope3 the innermost.
  private val scope1 = new RDDOperationScope("scope1")
  private val scope2 = new RDDOperationScope("scope2", parent = Some(scope1))
  private val scope3 = new RDDOperationScope("scope3", parent = Some(scope2))

  // Every test gets a fresh local SparkContext, stopped again afterwards.
  before {
    sc = new SparkContext("local", "test")
  }

  after {
    sc.stop()
  }

  /**
   * Creates one RDD outside of any scope, then one RDD inside each of three nested
   * `withScope` calls named "scope1", "scope2" and "scope3".
   *
   * @param nestScope1 value passed as `allowNesting` to the outermost call
   * @param nestScope2 value passed as `allowNesting` to the middle call
   * @param nestScope3 value passed as `allowNesting` to the innermost call
   * @return the four RDDs, ordered from the unscoped one to the innermost one
   */
  private def createNestedRDDs(
      nestScope1: Boolean,
      nestScope2: Boolean,
      nestScope3: Boolean): Seq[MyCoolRDD] = {
    val unscoped: MyCoolRDD = new MyCoolRDD(sc)
    var outer: MyCoolRDD = null
    var middle: MyCoolRDD = null
    var inner: MyCoolRDD = null
    RDDOperationScope.withScope(sc, "scope1", allowNesting = nestScope1) {
      outer = new MyCoolRDD(sc)
      RDDOperationScope.withScope(sc, "scope2", allowNesting = nestScope2) {
        middle = new MyCoolRDD(sc)
        RDDOperationScope.withScope(sc, "scope3", allowNesting = nestScope3) {
          inner = new MyCoolRDD(sc)
        }
      }
    }
    Seq(unscoped, outer, middle, inner)
  }

  /**
   * Checks that the first RDD has no scope, that the other three have one, and that
   * the names along each of their scope chains match the expected sequences.
   *
   * @param rdds the four RDDs as returned by `createNestedRDDs`
   * @param expectedOuter expected scope names for the RDD created in "scope1"
   * @param expectedMiddle expected scope names for the RDD created in "scope2"
   * @param expectedInner expected scope names for the RDD created in "scope3"
   */
  private def assertScopeNames(
      rdds: Seq[MyCoolRDD],
      expectedOuter: Seq[String],
      expectedMiddle: Seq[String],
      expectedInner: Seq[String]): Unit = {
    val Seq(unscoped, outer, middle, inner) = rdds
    assert(unscoped.scope.isEmpty)
    assert(outer.scope.isDefined)
    assert(middle.scope.isDefined)
    assert(inner.scope.isDefined)
    assert(outer.scope.get.getAllScopes.map(_.name) === expectedOuter)
    assert(middle.scope.get.getAllScopes.map(_.name) === expectedMiddle)
    assert(inner.scope.get.getAllScopes.map(_.name) === expectedInner)
  }

  test("getAllScopes") {
    // Each scope lists its ancestors from the root down, ending with itself.
    assert(scope1.getAllScopes === Seq(scope1))
    assert(scope2.getAllScopes === Seq(scope1, scope2))
    assert(scope3.getAllScopes === Seq(scope1, scope2, scope3))
  }

  test("json de/serialization") {
    val json1 = scope1.toJson
    val json2 = scope2.toJson
    val json3 = scope3.toJson
    // A root scope has no "parent" field; a child embeds its parent's JSON.
    assert(json1 === s"""{"id":${scope1.id},"name":"scope1"}""")
    assert(json2 === s"""{"id":${scope2.id},"name":"scope2","parent":$json1}""")
    assert(json3 === s"""{"id":${scope3.id},"name":"scope3","parent":$json2}""")
    // Parsing the JSON back must yield a scope equal to the original.
    assert(RDDOperationScope.fromJson(json1) === scope1)
    assert(RDDOperationScope.fromJson(json2) === scope2)
    assert(RDDOperationScope.fromJson(json3) === scope3)
  }

  test("withScope") {
    // Nesting is disallowed at every level: all three RDDs report only "scope1".
    val rdds = createNestedRDDs(nestScope1 = false, nestScope2 = false, nestScope3 = false)
    assertScopeNames(rdds, Seq("scope1"), Seq("scope1"), Seq("scope1"))
  }

  test("withScope with partial nesting") {
    // Nesting is allowed at "scope1" and stopped at "scope2", so "scope3" never shows up.
    val rdds = createNestedRDDs(nestScope1 = true, nestScope2 = false, nestScope3 = false)
    assertScopeNames(rdds, Seq("scope1"), Seq("scope1", "scope2"), Seq("scope1", "scope2"))
  }

  test("withScope with multiple layers of nesting") {
    // Nesting is allowed at every level: each RDD reports its full chain of scopes.
    val rdds = createNestedRDDs(nestScope1 = true, nestScope2 = true, nestScope3 = true)
    assertScopeNames(
      rdds,
      Seq("scope1"),
      Seq("scope1", "scope2"),
      Seq("scope1", "scope2", "scope3"))
  }

}

/**
 * A minimal RDD with no parents, no partitions and no data, used only so the tests
 * can inspect the scope attached to an RDD at construction time.
 */
private class MyCoolRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {

  /** This RDD has no partitions. */
  override def getPartitions: Array[Partition] = Array.empty[Partition]

  /** Always yields an empty iterator, whatever the partition. */
  override def compute(p: Partition, context: TaskContext): Iterator[Int] = Iterator.empty
}

0 comments on commit 6f9574a

Please sign in to comment.