diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
index dbb8a8efa404a..537b56b49f866 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -19,7 +19,8 @@ package org.apache.spark.rdd
 
 import java.util.concurrent.atomic.AtomicInteger
 
-import com.fasterxml.jackson.annotation.JsonIgnore
+import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude, JsonPropertyOrder}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
@@ -38,9 +39,11 @@ import org.apache.spark.SparkContext
  * There is no particular relationship between an operation scope and a stage or a job.
  * A scope may live inside one stage (e.g. map) or span across multiple jobs (e.g. take).
  */
+@JsonInclude(Include.NON_NULL)
+@JsonPropertyOrder(Array("id", "name", "parent"))
 private[spark] class RDDOperationScope(
     val name: String,
-    parent: Option[RDDOperationScope] = None) {
+    val parent: Option[RDDOperationScope] = None) {
 
   val id: Int = RDDOperationScope.nextScopeId()
 
@@ -56,6 +59,19 @@ private[spark] class RDDOperationScope(
   def getAllScopes: Seq[RDDOperationScope] = {
     parent.map(_.getAllScopes).getOrElse(Seq.empty) ++ Seq(this)
   }
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case s: RDDOperationScope =>
+        id == s.id && name == s.name && parent == s.parent
+      case _ => false
+    }
+  }
+
+  // Keep hashCode consistent with the equals override above
+  override def hashCode(): Int = (id, name, parent).hashCode()
+
+  override def toString: String = toJson
 }
 
 /**
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
new file mode 100644
index 0000000000000..d75ecbf1f0b4d
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+
+/**
+ * Tests the scope hierarchy, JSON serialization, and withScope nesting of RDDOperationScope.
+ */
+class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
+  private var sc: SparkContext = null
+  private val scope1 = new RDDOperationScope("scope1")
+  private val scope2 = new RDDOperationScope("scope2", parent = Some(scope1))
+  private val scope3 = new RDDOperationScope("scope3", parent = Some(scope2))
+
+  before {
+    sc = new SparkContext("local", "test")
+  }
+
+  after {
+    sc.stop()
+  }
+
+  test("getAllScopes") {
+    assert(scope1.getAllScopes === Seq(scope1))
+    assert(scope2.getAllScopes === Seq(scope1, scope2))
+    assert(scope3.getAllScopes === Seq(scope1, scope2, scope3))
+  }
+
+  test("json de/serialization") {
+    val scope1Json = scope1.toJson
+    val scope2Json = scope2.toJson
+    val scope3Json = scope3.toJson
+    assert(scope1Json === s"""{"id":${scope1.id},"name":"scope1"}""")
+    assert(scope2Json === s"""{"id":${scope2.id},"name":"scope2","parent":$scope1Json}""")
+    assert(scope3Json === s"""{"id":${scope3.id},"name":"scope3","parent":$scope2Json}""")
+    assert(RDDOperationScope.fromJson(scope1Json) === scope1)
+    assert(RDDOperationScope.fromJson(scope2Json) === scope2)
+    assert(RDDOperationScope.fromJson(scope3Json) === scope3)
+  }
+
+  test("withScope") {
+    val rdd0: MyCoolRDD = new MyCoolRDD(sc)
+    var rdd1: MyCoolRDD = null
+    var rdd2: MyCoolRDD = null
+    var rdd3: MyCoolRDD = null
+    RDDOperationScope.withScope(sc, "scope1", allowNesting = false) {
+      rdd1 = new MyCoolRDD(sc)
+      RDDOperationScope.withScope(sc, "scope2", allowNesting = false) {
+        rdd2 = new MyCoolRDD(sc)
+        RDDOperationScope.withScope(sc, "scope3", allowNesting = false) {
+          rdd3 = new MyCoolRDD(sc)
+        }
+      }
+    }
+    assert(rdd0.scope.isEmpty)
+    assert(rdd1.scope.isDefined)
+    assert(rdd2.scope.isDefined)
+    assert(rdd3.scope.isDefined)
+    assert(rdd1.scope.get.getAllScopes.map(_.name) === Seq("scope1"))
+    assert(rdd2.scope.get.getAllScopes.map(_.name) === Seq("scope1"))
+    assert(rdd3.scope.get.getAllScopes.map(_.name) === Seq("scope1"))
+  }
+
+  test("withScope with partial nesting") {
+    val rdd0: MyCoolRDD = new MyCoolRDD(sc)
+    var rdd1: MyCoolRDD = null
+    var rdd2: MyCoolRDD = null
+    var rdd3: MyCoolRDD = null
+    RDDOperationScope.withScope(sc, "scope1", allowNesting = true) { // allow nesting here
+      rdd1 = new MyCoolRDD(sc)
+      RDDOperationScope.withScope(sc, "scope2", allowNesting = false) { // stop nesting here
+        rdd2 = new MyCoolRDD(sc)
+        RDDOperationScope.withScope(sc, "scope3", allowNesting = false) {
+          rdd3 = new MyCoolRDD(sc)
+        }
+      }
+    }
+    assert(rdd0.scope.isEmpty)
+    assert(rdd1.scope.isDefined)
+    assert(rdd2.scope.isDefined)
+    assert(rdd3.scope.isDefined)
+    assert(rdd1.scope.get.getAllScopes.map(_.name) === Seq("scope1"))
+    assert(rdd2.scope.get.getAllScopes.map(_.name) === Seq("scope1", "scope2"))
+    assert(rdd3.scope.get.getAllScopes.map(_.name) === Seq("scope1", "scope2"))
+  }
+
+  test("withScope with multiple layers of nesting") {
+    val rdd0: MyCoolRDD = new MyCoolRDD(sc)
+    var rdd1: MyCoolRDD = null
+    var rdd2: MyCoolRDD = null
+    var rdd3: MyCoolRDD = null
+    RDDOperationScope.withScope(sc, "scope1", allowNesting = true) {
+      rdd1 = new MyCoolRDD(sc)
+      RDDOperationScope.withScope(sc, "scope2", allowNesting = true) {
+        rdd2 = new MyCoolRDD(sc)
+        RDDOperationScope.withScope(sc, "scope3", allowNesting = true) {
+          rdd3 = new MyCoolRDD(sc)
+        }
+      }
+    }
+    assert(rdd0.scope.isEmpty)
+    assert(rdd1.scope.isDefined)
+    assert(rdd2.scope.isDefined)
+    assert(rdd3.scope.isDefined)
+    assert(rdd1.scope.get.getAllScopes.map(_.name) === Seq("scope1"))
+    assert(rdd2.scope.get.getAllScopes.map(_.name) === Seq("scope1", "scope2"))
+    assert(rdd3.scope.get.getAllScopes.map(_.name) === Seq("scope1", "scope2", "scope3"))
+  }
+
+}
+
+private class MyCoolRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
+  override def getPartitions: Array[Partition] = Array.empty
+  override def compute(p: Partition, context: TaskContext): Iterator[Int] = Iterator.empty
+}
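
Reviewer note: as a quick illustration of what the Jackson annotations above buy us, here is a minimal sketch. It is not part of the patch; the names "parent" and "child" are made up, and since RDDOperationScope is private[spark], code like this would have to live inside an org.apache.spark package (as the test suite does).

```scala
import org.apache.spark.rdd.RDDOperationScope

val parent = new RDDOperationScope("parent")
val child = new RDDOperationScope("child", parent = Some(parent))

// @JsonInclude(Include.NON_NULL) drops the absent parent field, and
// @JsonPropertyOrder pins the key order, so a root scope serializes as
//   {"id":<parent.id>,"name":"parent"}
// while a nested scope embeds its parent recursively:
//   {"id":<child.id>,"name":"child","parent":{"id":<parent.id>,"name":"parent"}}
println(child.toJson)

// fromJson inverts toJson; the new equals override is what lets us check this.
assert(RDDOperationScope.fromJson(child.toJson) == child)
```

Omitting null parents keeps serialized top-level scopes compact, and the fixed property order is what makes the exact string comparisons in the "json de/serialization" test deterministic.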