Skip to content

Commit

Permalink
Fix complexity in threading model in test
Browse files Browse the repository at this point in the history
  • Loading branch information
harishreedharan committed Aug 19, 2014
1 parent 4df5be6 commit 120b81e
Showing 1 changed file with 37 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, Executors}

import scala.collection.JavaConversions._
import scala.concurrent.{Promise, Future}
import scala.util.{Failure, Success, Try}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.avro.ipc.NettyTransceiver
Expand All @@ -38,7 +38,7 @@ class SparkSinkSuite extends TestSuiteBase {
val channelCapacity = 5000

test("Success") {
val (channel, sink) = initializeChannelAndSink(None)
val (channel, sink) = initializeChannelAndSink()
channel.start()
sink.start()

Expand All @@ -58,7 +58,7 @@ class SparkSinkSuite extends TestSuiteBase {
}

test("Nack") {
val (channel, sink) = initializeChannelAndSink(None)
val (channel, sink) = initializeChannelAndSink()
channel.start()
sink.start()
putEvents(channel, eventsPerBatch)
Expand All @@ -77,8 +77,8 @@ class SparkSinkSuite extends TestSuiteBase {
}

test("Timeout") {
val (channel, sink) = initializeChannelAndSink(Option(Map(SparkSinkConfig
.CONF_TRANSACTION_TIMEOUT -> 1.toString)))
val (channel, sink) = initializeChannelAndSink(Map(SparkSinkConfig
.CONF_TRANSACTION_TIMEOUT -> 1.toString))
channel.start()
sink.start()
putEvents(channel, eventsPerBatch)
Expand All @@ -96,69 +96,67 @@ class SparkSinkSuite extends TestSuiteBase {
}

test("Multiple consumers") {
multipleClients(failSome = false)
testMultipleConsumers(failSome = false)
}

test("Multiple consumers With Some Failures") {
multipleClients(failSome = true)
test("Multiple consumers with some failures") {
testMultipleConsumers(failSome = true)
}

def multipleClients(failSome: Boolean): Unit = {
import scala.concurrent.ExecutionContext.Implicits.global
val (channel, sink) = initializeChannelAndSink(None)
def testMultipleConsumers(failSome: Boolean): Unit = {
implicit val executorContext = ExecutionContext
.fromExecutorService(Executors.newFixedThreadPool(5))
val (channel, sink) = initializeChannelAndSink()
channel.start()
sink.start()
(1 to 5).map(_ => putEvents(channel, eventsPerBatch))
(1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
val port = sink.getPort
val address = new InetSocketAddress("0.0.0.0", port)

val transAndClient = getTransceiverAndClient(address, 5)
val transceiversAndClients = getTransceiverAndClient(address, 5)
val batchCounter = new CountDownLatch(5)
val counter = new AtomicInteger(0)
transAndClient.foreach(x => {
val promise = Promise[EventBatch]()
val future = promise.future
transceiversAndClients.foreach(x => {
Future {
val client = x._2
var events: EventBatch = null
Try {
events = client.getEventBatch(1000)
if(!failSome || counter.getAndIncrement() % 2 == 0) {
client.ack(events.getSequenceNumber)
} else {
client.nack(events.getSequenceNumber)
}
}.map(_ => promise.success(events)).recover({
case e => promise.failure(e)
})
}
future.onComplete {
case Success(events) => assert(events.getEvents.size() === 1000)
events = client.getEventBatch(1000)
if (!failSome || counter.getAndIncrement() % 2 == 0) {
client.ack(events.getSequenceNumber)
} else {
client.nack(events.getSequenceNumber)
throw new RuntimeException("Sending NACK for failure!")
}
events
}.onComplete {
case Success(events) =>
assert(events.getEvents.size() === 1000)
batchCounter.countDown()
case Failure(t) =>
// Don't re-throw the exception, causes a nasty unnecessary stack trace on stdout
batchCounter.countDown()
case Failure(t) => batchCounter.countDown()
throw t
}
})
batchCounter.await()
executorContext.shutdown()
if(failSome) {
assert(availableChannelSlots(channel) === 3000)
} else {
assertChannelIsEmpty(channel)
}
sink.stop()
channel.stop()
transAndClient.foreach(x => x._1.close())
transceiversAndClients.foreach(x => x._1.close())
}

def initializeChannelAndSink(overrides: Option[Map[String, String]]): (MemoryChannel,
private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty): (MemoryChannel,
SparkSink) = {
val channel = new MemoryChannel()
val channelContext = new Context()

channelContext.put("capacity", channelCapacity.toString)
channelContext.put("transactionCapacity", 1000.toString)
channelContext.put("keep-alive", 0.toString)
overrides.foreach(channelContext.putAll(_))
channelContext.putAll(overrides)
channel.configure(channelContext)

val sink = new SparkSink()
Expand All @@ -173,7 +171,7 @@ class SparkSinkSuite extends TestSuiteBase {
private def putEvents(ch: MemoryChannel, count: Int): Unit = {
val tx = ch.getTransaction
tx.begin()
(1 to count).map(x => ch.put(EventBuilder.withBody(x.toString.getBytes)))
(1 to count).foreach(x => ch.put(EventBuilder.withBody(x.toString.getBytes)))
tx.commit()
tx.close()
}
Expand All @@ -193,8 +191,8 @@ class SparkSinkSuite extends TestSuiteBase {
})
}

private def assertChannelIsEmpty(channel: MemoryChannel) = {
assert(availableChannelSlots(channel) === 5000)
private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
assert(availableChannelSlots(channel) === channelCapacity)
}

private def availableChannelSlots(channel: MemoryChannel): Int = {
Expand Down

0 comments on commit 120b81e

Please sign in to comment.