Commit
SPARK-1729. Use LinkedBlockingQueue instead of ArrayBuffer to keep track of connections.
harishreedharan committed Jul 15, 2014
1 parent 120e2a1 commit 393bd94
Showing 3 changed files with 52 additions and 49 deletions.
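The change replaces the receiver's Array[FlumeConnection], indexed by a per-thread counter, with a java.util.concurrent.LinkedBlockingQueue: each polling thread checks a connection out, uses it, and puts it back when done. Below is a minimal standalone sketch of that pattern, not the Spark code itself; Connection, fetch and the sizes are made up, and where the commit's loop uses poll() the sketch uses the blocking take().

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object ConnectionPoolSketch {
  // Stand-in for FlumeConnection, which in the real code wraps a NettyTransceiver
  // and an Avro SparkFlumeProtocol client.
  final case class Connection(id: Int) {
    def fetch(): String = s"batch fetched over connection $id"
  }

  def main(args: Array[String]): Unit = {
    val parallelism = 3
    val connections = new LinkedBlockingQueue[Connection]()
    (1 to parallelism).foreach(i => connections.add(Connection(i)))

    val workers = Executors.newFixedThreadPool(parallelism)
    (1 to parallelism).foreach { _ =>
      workers.submit(new Runnable {
        override def run(): Unit = {
          // take() blocks until a connection is free, so two workers never share one.
          val connection = connections.take()
          try {
            println(connection.fetch())
          } finally {
            // Hand the connection back even if fetch() throws, so it is never lost.
            connections.add(connection)
          }
        }
      })
    }
    workers.shutdown()
    workers.awaitTermination(10, TimeUnit.SECONDS)
  }
}

In the receiver itself the check-out happens at the top of each loop iteration and the check-in happens in a finally block, which is what keeps a connection from being lost when getEventBatch or store fails.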
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.flume
import java.io.{ObjectOutput, ObjectInput, Externalizable}
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.concurrent.{TimeUnit, Executors}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}

import org.apache.spark.flume.sink.SparkSinkUtils

@@ -33,7 +33,7 @@ import org.apache.avro.ipc.specific.SpecificRequestor
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory

import org.apache.spark.Logging
import org.apache.spark.flume.{EventBatch, SparkSinkEvent, SparkFlumeProtocol}
import org.apache.spark.flume.{SparkSinkEvent, SparkFlumeProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
@@ -83,60 +83,64 @@ private[streaming] class FlumePollingReceiver(
lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())

private var connections = Array.empty[FlumeConnection] // temporarily empty, filled in later
private val connections = new LinkedBlockingQueue[FlumeConnection]()

override def onStart(): Unit = {
// Create the connections to each Flume agent.
connections = addresses.map(host => {
addresses.foreach(host => {
val transceiver = new NettyTransceiver(host, channelFactory)
val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
new FlumeConnection(transceiver, client)
}).toArray

connections.add(new FlumeConnection(transceiver, client))
})
for (i <- 0 until parallelism) {
logInfo("Starting Flume Polling Receiver worker threads starting..")
// Threads that pull data from Flume.
receiverExecutor.submit(new Runnable {
override def run(): Unit = {
var counter = i
while (true) {
counter = counter % (connections.length)
val client = connections(counter).client
counter += 1
val eventBatch = client.getEventBatch(maxBatchSize)
if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
// No error, proceed with processing data
val seq = eventBatch.getSequenceNumber
val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
logDebug(
"Received batch of " + events.size() + " events with sequence number: " + seq)
try {
// Convert each Flume event to a serializable SparkPollingEvent
var j = 0
while (j < events.size()) {
store(SparkFlumePollingEvent.fromSparkSinkEvent(events(j)))
logDebug("Stored events with seq:" + seq)
j += 1
}
logInfo("Sending ack for: " +seq)
// Send an ack to Flume so that Flume discards the events from its channels.
client.ack(seq)
logDebug("Ack sent for sequence number: " + seq)
} catch {
case e: Exception =>
try {
// Let Flume know that the events need to be pushed back into the channel.
client.nack(seq) // If the agent is down, even this could fail and throw
} catch {
case e: Exception => logError(
"Sending Nack also failed. A Flume agent is down.")
val connection = connections.poll()
val client = connection.client
try {
val eventBatch = client.getEventBatch(maxBatchSize)
if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
// No error, proceed with processing data
val seq = eventBatch.getSequenceNumber
val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
logDebug(
"Received batch of " + events.size() + " events with sequence number: " + seq)
try {
// Convert each Flume event to a serializable SparkPollingEvent
var j = 0
while (j < events.size()) {
store(SparkFlumePollingEvent.fromSparkSinkEvent(events(j)))
logDebug("Stored events with seq:" + seq)
j += 1
}
TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
logWarning("Error while attempting to store events", e)
logDebug("Sending ack for: " +seq)
// Send an ack to Flume so that Flume discards the events from its channels.
client.ack(seq)
logDebug("Ack sent for sequence number: " + seq)
} catch {
case e: Exception =>
try {
// Let Flume know that the events need to be pushed back into the channel.
client.nack(seq) // If the agent is down, even this could fail and throw
} catch {
case e: Exception => logError(
"Sending Nack also failed. A Flume agent is down.")
}
TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
logWarning("Error while attempting to store events", e)
}
} else {
logWarning("Did not receive events from Flume agent due to error on the Flume " +
"agent: " + eventBatch.getErrorMsg)
}
} else {
logWarning("Did not receive events from Flume agent due to error on the Flume " +
"agent: " + eventBatch.getErrorMsg)
} catch {
case e: Exception =>
logWarning("Error while reading data from Flume", e)
} finally {
connections.add(connection)
}
}
}
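For orientation, this is roughly how an application would consume the polling receiver, following the createPollingStream signature used in the test suite below (ssc, sink addresses, maxBatchSize, parallelism, storage level). A hedged sketch only: the app name, host, port and batch interval are placeholders, and it assumes FlumeUtils lives in org.apache.spark.streaming.flume as the sources above suggest.

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePollingStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePollingStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(2))
    // Addresses of Spark sinks running inside Flume agents; 100 = maxBatchSize,
    // 5 = parallelism (number of connections, and of receiver worker threads).
    val stream = FlumeUtils.createPollingStream(ssc,
      Seq(new InetSocketAddress("flume-agent-host", 9999)),
      100, 5, StorageLevel.MEMORY_AND_DISK)
    stream.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}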
@@ -66,7 +66,7 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon
sink.start()
ssc.start()
writeAndVerify(Seq(channel), ssc, outputBuffer)
assertQueuesAreEmpty(channel)
assertChannelIsEmpty(channel)
sink.stop()
channel.stop()
}
@@ -76,7 +76,7 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort),
new InetSocketAddress("localhost", testPort + 1)), 100, 2,
new InetSocketAddress("localhost", testPort + 1)), 100, 5,
StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
@@ -109,8 +109,8 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon
sink2.start()
ssc.start()
writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
assertQueuesAreEmpty(channel)
assertQueuesAreEmpty(channel2)
assertChannelIsEmpty(channel)
assertChannelIsEmpty(channel2)
sink.stop()
channel.stop()
}
@@ -160,7 +160,7 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon
assert(counter === 25 * channels.size)
}

def assertQueuesAreEmpty(channel: MemoryChannel) = {
def assertChannelIsEmpty(channel: MemoryChannel) = {
val queueRemaining = channel.getClass.getDeclaredField("queueRemaining");
queueRemaining.setAccessible(true)
val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
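The diff is collapsed just before the helper's assertion. queueRemaining on Flume's MemoryChannel is a private Semaphore tracking free capacity, hence the reflective call to availablePermits(). A hedged sketch of how such a helper can finish, with the expected capacity passed in explicitly; the real test presumably compares against whatever capacity it configured on the channel and uses ScalaTest's === rather than a plain assert.

import org.apache.flume.channel.MemoryChannel

def assertChannelIsEmpty(channel: MemoryChannel, configuredCapacity: Int): Unit = {
  val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
  queueRemaining.setAccessible(true)
  // queueRemaining is a Semaphore; an empty channel has all of its permits available.
  val availablePermits =
    queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
  assert(availablePermits.invoke(queueRemaining.get(channel)).asInstanceOf[Int] == configuredCapacity)
}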
1 change: 0 additions & 1 deletion project/SparkBuild.scala
@@ -122,7 +122,6 @@ object SparkBuild extends PomBuild {
retrieveManaged := true,
retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq",
"spark-streaming-flume-sink",
"spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter",
"spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx",
"spark-core").map(versionArtifact(_).get intransitive())
