Commit 9741683

SPARK-1729. Fixes based on review.

harishreedharan committed Jun 6, 2014
1 parent c604a3c
Showing 3 changed files with 47 additions and 85 deletions.
external/flume-sink/src/main/avro/sparkflume.avdl (2 changes: 1 addition & 1 deletion)
@@ -28,7 +28,7 @@ protocol SparkFlumeProtocol {

record EventBatch {
string sequenceNumber;
-   array<SparkSinkEvent> eventBatch;
+   array<SparkSinkEvent> events;
}

EventBatch getEventBatch (int n);
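Renaming the field matters because Avro derives accessor names from it: the old `eventBatch` field generated `getEventBatch`/`setEventBatch` on the `EventBatch` record, which read confusingly next to the `getEventBatch` RPC method declared right after it. A minimal sketch of the effect, using hand-written stand-ins rather than the real generated classes:

```scala
import java.util.{ArrayList => JArrayList, List => JList}

// Stand-ins approximating what Avro generates from the IDL above;
// SparkSinkEvent and EventBatch here are illustrative, not the real classes.
class SparkSinkEvent
class EventBatch {
  private val events: JList[SparkSinkEvent] = new JArrayList[SparkSinkEvent]()
  def getEvents: JList[SparkSinkEvent] = events // was getEventBatch before the rename
  def getSequenceNumber: CharSequence = "seq-1"
}

val batch = new EventBatch
println(batch.getEvents.size() + " events in batch " + batch.getSequenceNumber)
```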
external/flume-sink/.../SparkSink.scala
@@ -34,10 +34,8 @@ import org.apache.avro.ipc.NettyServer
import org.apache.avro.ipc.specific.SpecificResponder
import java.net.InetSocketAddress

- class SparkSink() extends AbstractSink with Configurable {
+ class SparkSink extends AbstractSink with Configurable {
  private val LOG = LoggerFactory.getLogger(this.getClass)
-   private val lock = new ReentrantLock()
-   private val blockingCondition = lock.newCondition()

// This sink will not persist sequence numbers and reuses them if it gets restarted.
// So it is possible to commit a transaction which may have been meant for the sink before the
@@ -58,19 +56,20 @@ class SparkSink() extends AbstractSink with Configurable {

private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()

-   private var processorFactory: Option[SparkHandlerFactory] = None
+   private var processorManager: Option[TransactionProcessorManager] = None
private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
private var port: Int = 0
private var maxThreads: Int = SparkSinkConfig.DEFAULT_MAX_THREADS
private var serverOpt: Option[NettyServer] = None
-   private var running = false

+   private val blockingLatch = new CountDownLatch(1)

override def start() {
transactionExecutorOpt = Option(Executors.newFixedThreadPool(numProcessors,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Spark Sink, " + getName + " Processor Thread - %d").build()))

-     processorFactory = Option(new SparkHandlerFactory(numProcessors))
+     processorManager = Option(new TransactionProcessorManager(numProcessors))

val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler())

@@ -80,12 +79,6 @@ class SparkSink() extends AbstractSink with Configurable {
serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))

serverOpt.map(server => server.start())
-     lock.lock()
-     try {
-       running = true
-     } finally {
-       lock.unlock()
-     }
super.start()
}

@@ -95,65 +88,48 @@ class SparkSink() extends AbstractSink with Configurable {
server.close()
server.join()
})
-     lock.lock()
-     try {
-       running = false
-       blockingCondition.signalAll()
-     } finally {
-       lock.unlock()
-     }
+     blockingLatch.countDown()
super.stop()
}

override def configure(ctx: Context) {
import SparkSinkConfig._
hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
-     val portOpt = Option(ctx.getInteger(CONF_PORT))
-     if(portOpt.isDefined) {
-       port = portOpt.get
-     } else {
-       throw new ConfigurationException("The Port to bind must be specified")
-     }
+     port = Option(ctx.getInteger(CONF_PORT)).
+       getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
numProcessors = ctx.getInteger(PROCESSOR_COUNT, DEFAULT_PROCESSOR_COUNT)
transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
maxThreads = ctx.getInteger(CONF_MAX_THREADS, DEFAULT_MAX_THREADS)
}
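The rewritten port lookup relies on two Scala details worth noting: `Option(...)` turns the `null` that a missing key produces into `None`, and `getOrElse(throw ...)` typechecks because a `throw` expression has type `Nothing`, which conforms to any result type. A minimal sketch of the idiom, with `IllegalArgumentException` standing in for Flume's `ConfigurationException`:

```scala
import java.lang.{Integer => JInt}

// Required-setting idiom from configure() above, in isolation.
def requiredPort(raw: JInt): Int =
  Option(raw)          // None when the lookup returned null
    .map(_.intValue()) // unbox explicitly
    .getOrElse(throw new IllegalArgumentException("The port to bind to must be specified"))

println(requiredPort(9999)) // 9999
// requiredPort(null)       // would throw IllegalArgumentException
```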

override def process(): Status = {
  // This method is called in a loop by the Flume framework - block it until the sink is
-   // stopped to save CPU resources
-   lock.lock()
-   try {
-     while(running) {
-       blockingCondition.await()
-     }
-   } finally {
-     lock.unlock()
-   }
+   // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
+   // being shut down.
+   blockingLatch.await()
Status.BACKOFF
}
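The lock, condition variable, and `running` flag collapse into a single one-shot `CountDownLatch`: `process()` parks on `await()` with no spurious-wakeup loop to write, and `stop()` releases it once and forever with `countDown()`. A self-contained sketch of the pattern (the `Blocker` class is illustrative, not sink code):

```scala
import java.util.concurrent.CountDownLatch

class Blocker {
  private val blockingLatch = new CountDownLatch(1)

  // process(): parks the calling thread until the latch reaches zero;
  // throws InterruptedException if the runner interrupts it first.
  def block(): Unit = blockingLatch.await()

  // stop(): releases every current and future waiter, permanently.
  def release(): Unit = blockingLatch.countDown()
}

val b = new Blocker
new Thread(new Runnable {
  override def run(): Unit = { Thread.sleep(100); b.release() }
}).start()
b.block() // returns once release() has run
```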


// Object representing an empty batch returned by the txn processor due to some error.
case object ErrorEventBatch extends EventBatch

-   private class AvroCallbackHandler() extends SparkFlumeProtocol {
+   private class AvroCallbackHandler extends SparkFlumeProtocol {

override def getEventBatch(n: Int): EventBatch = {
-     val processor = processorFactory.get.checkOut(n)
+     val processor = processorManager.get.checkOut(n)
transactionExecutorOpt.map(executor => executor.submit(processor))
// Wait until a batch is available - can be null if some error was thrown
-     val eventBatch = processor.eventQueue.take()
-     eventBatch match {
+     processor.eventQueue.take() match {
case ErrorEventBatch => throw new FlumeException("Something went wrong. No events" +
" retrieved from channel.")
-       case events => {
-         processorMap.put(events.getSequenceNumber, processor)
+       case eventBatch: EventBatch =>
+         processorMap.put(eventBatch.getSequenceNumber, processor)
if (LOG.isDebugEnabled) {
LOG.debug("Sent " + events.getEventBatch.size() +
" events with sequence number: " + events.getSequenceNumber)
LOG.debug("Sent " + eventBatch.getEvents.size() +
" events with sequence number: " + eventBatch.getSequenceNumber)
}
-         events
-       }
+         eventBatch
}
}
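The rewritten match builds on the `ErrorEventBatch` sentinel declared above: a failure is signalled by enqueueing a case object that extends the normal batch type, so the consumer never has to test for `null`. A sketch with stand-in types (`Batch` and `ErrorBatch` are hypothetical, not the Avro classes):

```scala
import java.util.concurrent.LinkedBlockingQueue

class Batch // stand-in for the Avro-generated EventBatch
case object ErrorBatch extends Batch

val queue = new LinkedBlockingQueue[Batch]()
queue.put(new Batch)

// Match the sentinel first; the typed pattern then covers real batches.
queue.take() match {
  case ErrorBatch   => throw new RuntimeException("No events retrieved from channel")
  case batch: Batch => println("got a real batch: " + batch)
}
```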

@@ -214,41 +190,23 @@ class SparkSink() extends AbstractSink with Configurable {
tx.begin()
try {
eventBatch.setSequenceNumber(seqBase + seqNum.incrementAndGet())
-         val events = eventBatch.getEventBatch
+         val events = eventBatch.getEvents
events.clear()
val loop = new Breaks
var gotEventsInThisTxn = false
loop.breakable {
-           var i = 0
-           // Using for here causes the maxBatchSize change to be ineffective as the Range gets
-           // pregenerated
-           while (i < maxBatchSize) {
-             i += 1
-             val eventOpt = Option(getChannel.take())
-             eventOpt.map(event => {
-               events.add(new SparkSinkEvent(toCharSequenceMap(event
-                 .getHeaders),
-                 ByteBuffer.wrap(event.getBody)))
-               gotEventsInThisTxn = true
-             })
-             if (eventOpt.isEmpty) {
-               if (!gotEventsInThisTxn) {
-                 // To avoid sending empty batches, we wait till events are available backing off
-                 // between attempts to get events. Each attempt to get an event though causes one
-                 // iteration to be lost. To ensure that we still send back maxBatchSize number of
-                 // events, we cheat and increase the maxBatchSize by 1 to account for the lost
-                 // iteration. Even throwing an exception is expensive as Avro will serialize it
-                 // and send it over the wire, which is useless. Before incrementing though,
-                 // ensure that we are not anywhere near INT_MAX.
-                 if (maxBatchSize >= Int.MaxValue / 2) {
-                   // Random sanity check
-                   throw new RuntimeException("Safety exception - polled too many times, no events!")
+           while (events.size() < maxBatchSize) {
+             Option(getChannel.take()) match {
+               case Some(event) =>
+                 events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
+                   ByteBuffer.wrap(event.getBody)))
+                 gotEventsInThisTxn = true
+               case None =>
+                 if (!gotEventsInThisTxn) {
+                   Thread.sleep(500)
+                 } else {
+                   loop.break()
}
-                 maxBatchSize += 1
-                 Thread.sleep(500)
-               } else {
-                 loop.break()
-               }
-             }
-           }
}
}
}
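The removed comment explains why the original used a manual counter rather than a for-comprehension: `for (i <- 0 until maxBatchSize)` builds its Range once, so bumping `maxBatchSize` mid-loop never extends the iteration, which forced the old "cheat and increment" compensation. Checking `events.size()` directly makes that whole scheme unnecessary. A self-contained sketch of the resulting drain-with-backoff loop, with a function standing in for `getChannel.take()`:

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks

// poll() stands in for getChannel.take(): Some(event), or None when empty.
def drain(poll: () => Option[String], maxBatchSize: Int): Vector[String] = {
  val events = ArrayBuffer.empty[String]
  var gotEventsInThisTxn = false
  val loop = new Breaks
  loop.breakable {
    while (events.size < maxBatchSize) {
      poll() match {
        case Some(event) =>
          events += event
          gotEventsInThisTxn = true
        case None if !gotEventsInThisTxn =>
          Thread.sleep(500) // back off until the first event shows up
        case None =>
          loop.break()      // source drained after yielding some events
      }
    }
  }
  events.toVector
}

// Example: a source that yields two events, then runs dry.
val source = Iterator(Some("e1"), Some("e2"))
println(drain(() => if (source.hasNext) source.next() else None, 5)) // Vector(e1, e2)
```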
@@ -284,7 +242,7 @@ class SparkSink() extends AbstractSink with Configurable {
} finally {
resultQueueUpdateLock.unlock()
}
-       eventBatch.getEventBatch.clear()
+       eventBatch.getEvents.clear()
// If the batch failed on spark side, throw a FlumeException
maybeResult.map(success =>
if (!success) {
@@ -315,7 +273,7 @@ class SparkSink() extends AbstractSink with Configurable {
// remove the event from the map and then clear the value
resultQueue.clear()
processorMap.remove(eventBatch.getSequenceNumber)
-       processorFactory.get.checkIn(this)
+       processorManager.get.checkIn(this)
tx.close()
}
}
@@ -328,7 +286,7 @@ class SparkSink() extends AbstractSink with Configurable {
}
}

-   private class SparkHandlerFactory(val maxInstances: Int) {
+   private class TransactionProcessorManager(val maxInstances: Int) {
val queue = new scala.collection.mutable.Queue[TransactionProcessor]
val queueModificationLock = new ReentrantLock()
var currentSize = 0
external/.../FlumePollingInputDStream.scala
@@ -77,13 +77,13 @@ private[streaming] class FlumePollingReceiver(
private var connections = Array.empty[FlumeConnection] // temporarily empty, filled in later

override def onStart(): Unit = {
-     val connectionBuilder = new mutable.ArrayBuilder.ofRef[FlumeConnection]()
-     addresses.map(host => {
+     // Create the connections to each Flume agent.
+     connections = addresses.map(host => {
      val transceiver = new NettyTransceiver(host, channelFactory)
      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
-       connectionBuilder += new FlumeConnection(transceiver, client)
-     })
-     connections = connectionBuilder.result()
+       new FlumeConnection(transceiver, client)
+     }).toArray

val dataReceiver = new Runnable {
override def run(): Unit = {
var counter = 0
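The old `onStart` used `map` purely for its side effect on a mutable `ArrayBuilder` and then read the builder back; the rewrite returns the connection from the mapped function and materializes the array in one expression. A toy sketch of the same simplification (`Conn` is a hypothetical stand-in for `FlumeConnection`):

```scala
case class Conn(host: String)
val addresses = Seq("agent1", "agent2")

// Before: side-effecting map into a builder, then a separate read-out.
val builder = Array.newBuilder[Conn]
addresses.foreach(host => builder += Conn(host))
val viaBuilder: Array[Conn] = builder.result()

// After: a pure map, materialized once.
val connections: Array[Conn] = addresses.map(host => Conn(host)).toArray
```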
@@ -93,14 +93,18 @@
counter += 1
val batch = client.getEventBatch(maxBatchSize)
val seq = batch.getSequenceNumber
-           val events: java.util.List[SparkSinkEvent] = batch.getEventBatch
+           val events: java.util.List[SparkSinkEvent] = batch.getEvents
logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
try {
events.foreach(event => store(SparkPollingEvent.fromSparkSinkEvent(event)))
client.ack(seq)
} catch {
case e: Throwable =>
-               client.nack(seq)
+               try {
+                 client.nack(seq) // If the agent is down, even this could fail and throw
+               } catch {
+                 case e: Throwable => logError("Sending Nack also failed. A Flume agent is down.")
+               }
TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
logWarning("Error while attempting to store events", e)
}
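The fix acknowledges that a nack is itself an RPC, so when the agent is down it can fail exactly like the store/ack path did; wrapping it in its own try/catch keeps that second failure from escaping the outer handler and killing the receiver loop. A small sketch of the best-effort pattern (`bestEffort` is a hypothetical helper; `println` stands in for `logError`):

```scala
// Attempt a cleanup action that may itself fail; log and always return.
def bestEffort(action: () => Unit, what: String): Unit =
  try action()
  catch { case t: Throwable => println(s"$what also failed: ${t.getMessage}") }

// Usage mirroring the receiver: nack after a failed store, then back off.
bestEffort(() => throw new RuntimeException("agent down"), "Sending Nack")
Thread.sleep(2000)
```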
