FLUME-1729. New Flume-Spark integration.
Avro does not support inheritance, so the error message needs to be part of the message itself.
harishreedharan committed Jun 18, 2014
1 parent bda01fc commit 4b0c7fc
Showing 4 changed files with 42 additions and 67 deletions.
1 change: 1 addition & 0 deletions external/flume-sink/src/main/avro/sparkflume.avdl
@@ -27,6 +27,7 @@ protocol SparkFlumeProtocol {
}

record EventBatch {
+ string errorMsg = ""; // If this is empty it is a valid message, else it represents an error
string sequenceNumber;
array<SparkSinkEvent> events;
}
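The new errorMsg field replaces Avro inheritance: since IDL records cannot extend one another, an error batch is no longer a subtype (the ErrorEventBatch this commit deletes) but the same EventBatch record carrying a non-empty errorMsg. A minimal Scala sketch of the convention, assuming the Avro-generated EventBatch class takes its constructor arguments in IDL field order, as the call sites below do:

  import java.util.Collections
  import org.apache.spark.flume.{EventBatch, SparkSinkEvent}

  object EventBatchConvention {
    def main(args: Array[String]): Unit = {
      val noEvents = Collections.emptyList[SparkSinkEvent]()

      // Failure: the error message travels inside the record itself.
      val errorBatch = new EventBatch("channel unavailable", "", noEvents)

      // Success: an empty errorMsg marks the batch as valid.
      val okBatch = new EventBatch("", "seq-1", noEvents)

      // Consumers branch on the field instead of a runtime type check.
      for (batch <- Seq(errorBatch, okBatch)) {
        if (batch.getErrorMsg.toString.isEmpty) {
          println("valid batch " + batch.getSequenceNumber)
        } else {
          println("error batch: " + batch.getErrorMsg)
        }
      }
    }
  }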

This file was deleted.

@@ -23,7 +23,7 @@ import java.util.concurrent.{TimeUnit, CountDownLatch, Callable}
import scala.util.control.Breaks

import org.apache.flume.{Transaction, Channel}
- import org.apache.spark.flume.{SparkSinkEvent, ErrorEventBatch, EventBatch}
+ import org.apache.spark.flume.{SparkSinkEvent, EventBatch}
import org.slf4j.LoggerFactory


@@ -50,7 +50,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
private val LOG = LoggerFactory.getLogger(classOf[TransactionProcessor])

// If a real batch is not returned, we always have to return an error batch.
- @volatile private var eventBatch: EventBatch = new ErrorEventBatch("Unknown Error")
+ @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
+   util.Collections.emptyList())

// Synchronization primitives
val batchGeneratedLatch = new CountDownLatch(1)
@@ -70,7 +71,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
/**
* Get an event batch from the channel. This method will block until a batch of events is
* available from the channel. If no events are available after a large number of attempts of
- * polling the channel, this method will return [[ErrorEventBatch]].
+ * polling the channel, this method will return an [[EventBatch]] with a non-empty error message
*
* @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a
* maximum of maxBatchSize events
@@ -96,16 +97,14 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,

/**
* Populates events into the event batch. If the batch cannot be populated,
- * this method will not set the event batch which will stay [[ErrorEventBatch]]
+ * this method will not set the events into the event batch, but it sets an error message.
*/
private def populateEvents() {
try {
txOpt = Option(channel.getTransaction)
if(txOpt.isEmpty) {
- assert(eventBatch.isInstanceOf[ErrorEventBatch])
- eventBatch.asInstanceOf[ErrorEventBatch].message = "Something went wrong. Channel was " +
-   "unable to create a transaction!"
- eventBatch
+ eventBatch.setErrorMsg("Something went wrong. Channel was " +
+   "unable to create a transaction!")
}
txOpt.map(tx => {
tx.begin()
@@ -135,16 +134,16 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
val msg = "Tried several times, " +
"but did not get any events from the channel!"
LOG.warn(msg)
- eventBatch.asInstanceOf[ErrorEventBatch].message = msg
+ eventBatch.setErrorMsg(msg)
} else {
// At this point, the events are available, so fill them into the event batch
- eventBatch = new EventBatch(seqNum, events)
+ eventBatch = new EventBatch("",seqNum, events)
}
})
} catch {
case e: Exception =>
LOG.error("Error while processing transaction.", e)
- eventBatch.asInstanceOf[ErrorEventBatch].message = e.getMessage
+ eventBatch.setErrorMsg(e.getMessage)
try {
txOpt.map(tx => {
rollbackAndClose(tx, close = true)
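The TransactionProcessor changes above all follow one shape: start from a placeholder error batch, mutate its message on each failure path via the generated setErrorMsg setter, and swap in a real batch (empty errorMsg) only once events are actually pulled from the channel. A condensed, hypothetical sketch of that shape, with simplified names that are not the commit's own:

  import java.util.Collections
  import org.apache.spark.flume.{EventBatch, SparkSinkEvent}

  class BatchBuilder(seqNum: String) {
    // Pessimistic default: an error batch until proven otherwise.
    @volatile private var batch: EventBatch =
      new EventBatch("Unknown Error", "", Collections.emptyList[SparkSinkEvent]())

    def onEvents(events: java.util.List[SparkSinkEvent]): Unit = {
      if (events.isEmpty) {
        // Nothing pulled from the channel: keep the error batch, refine the message.
        batch.setErrorMsg("Tried several times, but did not get any events from the channel!")
      } else {
        batch = new EventBatch("", seqNum, events) // empty errorMsg = success
      }
    }

    def onFailure(e: Exception): Unit = batch.setErrorMsg(e.getMessage)

    def result: EventBatch = batch
  }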
@@ -31,7 +31,7 @@ import org.apache.avro.ipc.specific.SpecificRequestor
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory

import org.apache.spark.Logging
- import org.apache.spark.flume.{EventBatch, ErrorEventBatch, SparkSinkEvent, SparkFlumeProtocol}
+ import org.apache.spark.flume.{EventBatch, SparkSinkEvent, SparkFlumeProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
@@ -99,37 +99,40 @@ private[streaming] class FlumePollingReceiver(
counter = counter % connections.size
val client = connections(counter).client
counter += 1
- client.getEventBatch(maxBatchSize) match {
-   case errorBatch: ErrorEventBatch =>
-     logWarning("Error Event Batch received from Spark Sink. " + errorBatch.message)
-   case batch: EventBatch =>
-     val seq = batch.getSequenceNumber
-     val events: java.util.List[SparkSinkEvent] = batch.getEvents
-     logDebug(
-       "Received batch of " + events.size() + " events with sequence number: " + seq)
-     try {
-       // Convert each Flume event to a serializable SparkPollingEvent
-       events.foreach(event => {
-         store(SparkFlumePollingEvent.fromSparkSinkEvent(event))
-       })
-       // Send an ack to Flume so that Flume discards the events from its channels.
-       client.ack(seq)
-     } catch {
-       case e: Throwable =>
-         try {
-           // Let Flume know that the events need to be pushed back into the channel.
-           client.nack(seq) // If the agent is down, even this could fail and throw
-         } catch {
-           case e: Throwable => logError(
-             "Sending Nack also failed. A Flume agent is down.")
-         }
-         TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
-         logWarning("Error while attempting to store events", e)
-     }
+ val eventBatch = client.getEventBatch(maxBatchSize)
+ val errorMsg = eventBatch.getErrorMsg
+ if (errorMsg.toString.equals("")) { // No error, proceed with processing data
+   val seq = eventBatch.getSequenceNumber
+   val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
+   logDebug(
+     "Received batch of " + events.size() + " events with sequence number: " + seq)
+   try {
+     // Convert each Flume event to a serializable SparkPollingEvent
+     events.foreach(event => {
+       store(SparkFlumePollingEvent.fromSparkSinkEvent(event))
+     })
+     // Send an ack to Flume so that Flume discards the events from its channels.
+     client.ack(seq)
+   } catch {
+     case e: Exception =>
+       try {
+         // Let Flume know that the events need to be pushed back into the channel.
+         client.nack(seq) // If the agent is down, even this could fail and throw
+       } catch {
+         case e: Exception => logError(
+           "Sending Nack also failed. A Flume agent is down.")
+       }
+       TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
+       logWarning("Error while attempting to store events", e)
+   }
+ } else {
+   logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
+     "" + errorMsg.toString)
+ }
}
}
}

// Create multiple threads and start all of them.
for (i <- 0 until parallelism) {
logInfo("Starting Flume Polling Receiver worker threads starting..")
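With the subtype gone, the receiver treats every poll result as an EventBatch and keys off the error field: an empty errorMsg means the batch is safe to store and ack, anything else is logged and skipped. A hedged sketch of that loop body factored into a helper; the helper name and signature are illustrative, and client is assumed to be the Avro-generated SparkFlumeProtocol stub whose getEventBatch/ack/nack calls appear above:

  import scala.collection.JavaConversions._
  import org.apache.spark.flume.{SparkFlumeProtocol, SparkSinkEvent}

  def pollOnce(client: SparkFlumeProtocol, maxBatchSize: Int)
              (store: SparkSinkEvent => Unit): Unit = {
    val batch = client.getEventBatch(maxBatchSize)
    if (batch.getErrorMsg.toString.isEmpty) { // empty errorMsg => valid batch
      val seq = batch.getSequenceNumber
      try {
        batch.getEvents.foreach(store) // hand each event to the caller's store()
        client.ack(seq)                // Flume may now discard the events
      } catch {
        case e: Exception =>
          // Storing failed: ask Flume to requeue the batch; the agent itself
          // may be down, in which case even the nack can throw.
          try client.nack(seq) catch { case _: Exception => () }
      }
    } else {
      // Non-empty errorMsg: the sink reported a problem, nothing to ack.
      println("sink error: " + batch.getErrorMsg)
    }
  }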
