Skip to content

Commit

Permalink
Removed SynchBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Feb 10, 2016
1 parent 9caec83 commit 458199b
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.spark.sql.util

import scala.collection.mutable
import java.util.concurrent.ConcurrentLinkedQueue

import scala.util.control.NonFatal

import org.scalatest.BeforeAndAfter
Expand Down Expand Up @@ -71,7 +72,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with

// There should be only on progress event as batch has been processed
assert(listener.progressStatuses.size === 1)
val status = listener.progressStatuses.head
val status = listener.progressStatuses.peek()
assert(status != null)
assert(status.active == true)
assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
Expand Down Expand Up @@ -167,8 +168,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with

@volatile var startStatus: QueryStatus = null
@volatile var terminationStatus: QueryStatus = null
val progressStatuses = new mutable.ArrayBuffer[QueryStatus]
with mutable.SynchronizedBuffer[QueryStatus]
val progressStatuses = new ConcurrentLinkedQueue[QueryStatus]

def reset(): Unit = {
startStatus = null
Expand All @@ -195,7 +195,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
override def onQueryProgress(queryProgress: QueryProgress): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryProgress called before onQueryStarted")
progressStatuses += QueryStatus(queryProgress.query)
progressStatuses.add(QueryStatus(queryProgress.query))
}
}

Expand Down

0 comments on commit 458199b

Please sign in to comment.