Skip to content

Commit

Permalink
[SPARK-28770][CORE][TEST] Fix ReplayListenerSuite tests that sometime…
Browse files Browse the repository at this point in the history
…s fail

### What changes were proposed in this pull request?

`ReplayListenerSuite` depends on a listener class to listen for replayed events. This class was implemented by extending `EventLoggingListener`. `EventLoggingListener` does not log executor metrics update events, but uses them to update internal state; on a stage completion event, it then logs stage executor metrics events using this internal state. As executor metrics update events do not get written to the event log, they do not get replayed. The internal state of the replay listener can therefore be different from the original listener, leading to different stage completion events being logged.

We reimplement the replay listener to simply buffer each and every event it receives. This makes it a simpler yet better tool for verifying the events that get sent through the ReplayListenerBus.

### Why are the changes needed?

As explained above. Tests sometimes fail due to events being received by the `EventLoggingListener` that do not get logged (and thus do not get replayed) but influence other events that get logged.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #25673 from wypoon/SPARK-28770.

Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
  • Loading branch information
wypoon authored and squito committed Sep 5, 2019
1 parent 0647906 commit 151b954
Showing 1 changed file with 16 additions and 20 deletions.
Expand Up @@ -21,12 +21,14 @@ import java.io._
import java.net.URI
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._
import org.scalatest.BeforeAndAfter

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec}
import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils}
Expand Down Expand Up @@ -62,7 +64,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp

val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
val logData = fileSystem.open(logFilePath)
val eventMonster = new EventMonster(conf)
val eventMonster = new EventBufferingListener
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
Expand Down Expand Up @@ -108,7 +110,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
val replayer = new ReplayListenerBus()

val eventMonster = new EventMonster(conf)
val eventMonster = new EventBufferingListener
replayer.addListener(eventMonster)

// Verify the replay returns the events given the input maybe truncated.
Expand Down Expand Up @@ -145,7 +147,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp

val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
val logData = fileSystem.open(logFilePath)
val eventMonster = new EventMonster(conf)
val eventMonster = new EventBufferingListener
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
Expand Down Expand Up @@ -207,7 +209,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp

// Replay events
val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
val eventMonster = new EventMonster(conf)
val eventMonster = new EventBufferingListener
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
Expand All @@ -219,11 +221,11 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
// Verify the same events are replayed in the same order
assert(sc.eventLogger.isDefined)
val originalEvents = sc.eventLogger.get.loggedEvents
.map(JsonProtocol.sparkEventFromJson(_))
val replayedEvents = eventMonster.loggedEvents
.map(JsonProtocol.sparkEventFromJson(_))
originalEvents.zip(replayedEvents).foreach { case (e1, e2) =>
// Don't compare the JSON here because accumulators in StageInfo may be out of order
JsonProtocolSuite.assertEquals(
JsonProtocol.sparkEventFromJson(e1), JsonProtocol.sparkEventFromJson(e2))
JsonProtocolSuite.assertEquals(e1, e1)
}
}

Expand All @@ -235,21 +237,15 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp

/**
* A simple listener that buffers all the events it receives.
*
* The event buffering functionality must be implemented within EventLoggingListener itself.
* This is because of the following race condition: the event may be mutated between being
* processed by one listener and being processed by another. Thus, in order to establish
* a fair comparison between the original events and the replayed events, both functionalities
* must be implemented within one listener (i.e. the EventLoggingListener).
*
* This child listener inherits only the event buffering functionality, but does not actually
* log the events.
*/
private class EventMonster(conf: SparkConf)
extends EventLoggingListener("test", None, new URI("testdir"), conf) {
private class EventBufferingListener extends SparkFirehoseListener {

override def start() { }
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

override def onEvent(event: SparkListenerEvent) {
val eventJson = JsonProtocol.sparkEventToJson(event)
loggedEvents += eventJson
}
}

/*
Expand Down

0 comments on commit 151b954

Please sign in to comment.