diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index e79613749f0ce..d65b5cbfc094e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -21,12 +21,14 @@ import java.io._ import java.net.URI import java.util.concurrent.atomic.AtomicInteger +import scala.collection.mutable.ArrayBuffer + import org.apache.hadoop.fs.Path import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ import org.scalatest.BeforeAndAfter -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec} import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils} @@ -62,7 +64,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) val logData = fileSystem.open(logFilePath) - val eventMonster = new EventMonster(conf) + val eventMonster = new EventBufferingListener try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) @@ -108,7 +110,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) val replayer = new ReplayListenerBus() - val eventMonster = new EventMonster(conf) + val eventMonster = new EventBufferingListener replayer.addListener(eventMonster) // Verify the replay returns the events given the input maybe truncated. @@ -145,7 +147,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) val logData = fileSystem.open(logFilePath) - val eventMonster = new EventMonster(conf) + val eventMonster = new EventBufferingListener try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) @@ -207,7 +209,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // Replay events val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem) - val eventMonster = new EventMonster(conf) + val eventMonster = new EventBufferingListener try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) @@ -219,11 +221,11 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // Verify the same events are replayed in the same order assert(sc.eventLogger.isDefined) val originalEvents = sc.eventLogger.get.loggedEvents + .map(JsonProtocol.sparkEventFromJson(_)) val replayedEvents = eventMonster.loggedEvents + .map(JsonProtocol.sparkEventFromJson(_)) originalEvents.zip(replayedEvents).foreach { case (e1, e2) => - // Don't compare the JSON here because accumulators in StageInfo may be out of order - JsonProtocolSuite.assertEquals( - JsonProtocol.sparkEventFromJson(e1), JsonProtocol.sparkEventFromJson(e2)) + JsonProtocolSuite.assertEquals(e1, e1) } } @@ -235,21 +237,15 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp /** * A simple listener that buffers all the events it receives. - * - * The event buffering functionality must be implemented within EventLoggingListener itself. - * This is because of the following race condition: the event may be mutated between being - * processed by one listener and being processed by another. Thus, in order to establish - * a fair comparison between the original events and the replayed events, both functionalities - * must be implemented within one listener (i.e. the EventLoggingListener). - * - * This child listener inherits only the event buffering functionality, but does not actually - * log the events. */ - private class EventMonster(conf: SparkConf) - extends EventLoggingListener("test", None, new URI("testdir"), conf) { + private class EventBufferingListener extends SparkFirehoseListener { - override def start() { } + private[scheduler] val loggedEvents = new ArrayBuffer[JValue] + override def onEvent(event: SparkListenerEvent) { + val eventJson = JsonProtocol.sparkEventToJson(event) + loggedEvents += eventJson + } } /*