SPARK-1729. Make Flume pull data from source, rather than the current push model

Added support for polling several Flume agents from a single receiver.
harishreedharan committed May 24, 2014
1 parent 87775aa commit 0f10788
Showing 3 changed files with 120 additions and 44 deletions.
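The change replaces the single (host, port) pair with a Seq[InetSocketAddress], so one receiver can poll several Spark Sinks. As orientation before the diff, here is a minimal driver sketch of how the new createPollingStream API is meant to be called. Everything outside the diff is assumed for illustration: the object name, local master, two-second batch interval, and sink ports 9999 and 9988 are hypothetical, and the import for FlumeUtils itself is omitted because its package path is not shown in this excerpt.

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePollingExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePollingExample").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // One receiver polls both agents; the ports are hypothetical.
    val addresses = Seq(
      new InetSocketAddress("localhost", 9999),
      new InetSocketAddress("localhost", 9988))
    val stream = FlumeUtils.createPollingStream(ssc, addresses,
      maxBatchSize = 100, parallelism = 5,
      storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)

    stream.count().print() // number of events pulled in each batch
    ssc.start()
    ssc.awaitTermination()
  }
}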
@@ -32,11 +32,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import java.io.{ObjectOutput, ObjectInput, Externalizable}
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
+import scala.collection.mutable

class FlumePollingInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
-val host: String,
-val port: Int,
+val addresses: Seq[InetSocketAddress],
val maxBatchSize: Int,
val parallelism: Int,
storageLevel: StorageLevel
Expand All @@ -47,30 +47,44 @@ class FlumePollingInputDStream[T: ClassTag](
* of a NetworkInputDStream.
*/
override def getReceiver(): Receiver[SparkPollingEvent] = {
-new FlumePollingReceiver(host, port, maxBatchSize, parallelism, storageLevel)
+new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
}
}

private[streaming] class FlumePollingReceiver(
-host: String,
-port: Int,
+addresses: Seq[InetSocketAddress],
maxBatchSize: Int,
parallelism: Int,
storageLevel: StorageLevel
) extends Receiver[SparkPollingEvent](storageLevel) with Logging {

+lazy val channelFactoryExecutor =
+Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
+setNameFormat("Flume Receiver Channel Thread - %d").build())

lazy val channelFactory =
-new NioClientSocketChannelFactory(Executors.newSingleThreadExecutor(),
-Executors.newSingleThreadExecutor())
-lazy val transceiver = new NettyTransceiver(new InetSocketAddress(host, port), channelFactory)
-lazy val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)

lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())

+private var connections = Array.empty[FlumeConnection] // temporarily empty, filled in later

override def onStart(): Unit = {
+val connectionBuilder = new mutable.ArrayBuilder.ofRef[FlumeConnection]()
+addresses.foreach(address => {
+val transceiver = new NettyTransceiver(address, channelFactory)
+val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+connectionBuilder += new FlumeConnection(transceiver, client)
+})
+connections = connectionBuilder.result()
val dataReceiver = new Runnable {
override def run(): Unit = {
+var counter = 0
while (true) {
+counter = counter % connections.size
+val client = connections(counter).client
+counter += 1
val batch = client.getEventBatch(maxBatchSize)
val seq = batch.getSequenceNumber
val events: java.util.List[SparkSinkEvent] = batch.getEventBatch
@@ -104,11 +118,16 @@ private[streaming] class FlumePollingReceiver(
override def onStop(): Unit = {
logInfo("Shutting down Flume Polling Receiver")
receiverExecutor.shutdownNow()
-transceiver.close()
+connections.foreach(connection => {
+connection.transceiver.close()
+})
channelFactory.releaseExternalResources()
}
}

+private class FlumeConnection(val transceiver: NettyTransceiver,
+val client: SparkFlumeProtocol.Callback)

private[streaming] object SparkPollingEvent {
def fromSparkSinkEvent(in: SparkSinkEvent): SparkPollingEvent = {
val event = new SparkPollingEvent()
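Note how each polling thread keeps its own counter and walks the shared connection list round-robin, so getEventBatch calls spread evenly over all configured agents. A standalone sketch of just that selection pattern, with strings standing in for FlumeConnection objects:

// Round-robin selection as in the receiver's polling loop:
// wrap the counter before indexing, then advance it.
val pool = Seq("agent-1", "agent-2", "agent-3")
var counter = 0
for (_ <- 0 until 6) {
  counter = counter % pool.size
  println(pool(counter)) // agent-1, agent-2, agent-3, agent-1, ...
  counter += 1
}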
@@ -21,6 +21,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import java.net.InetSocketAddress

object FlumeUtils {
/**
@@ -72,8 +73,7 @@
/**
* Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
* This stream will poll the sink for data and will pull events as they are available.
-* @param host The host on which the Flume agent is running
-* @param port The port the Spark Sink is accepting connections on
+* @param addresses List of InetSocketAddresses representing the hosts to connect to.
* @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
* single RPC call
* @param parallelism Number of concurrent requests this stream should send to the sink. Note
@@ -83,21 +83,19 @@
*/
def createPollingStream (
ssc: StreamingContext,
-host: String,
-port: Int,
+addresses: Seq[InetSocketAddress],
maxBatchSize: Int = 100,
parallelism: Int = 5,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkPollingEvent] = {
-new FlumePollingInputDStream[SparkPollingEvent](ssc, host, port, maxBatchSize,
+new FlumePollingInputDStream[SparkPollingEvent](ssc, addresses, maxBatchSize,
parallelism, storageLevel)
}

/**
* Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
* This stream will poll the sink for data and will pull events as they are available.
-* @param host The host on which the Flume agent is running
-* @param port The port the Spark Sink is accepting connections on
+* @param addresses List of InetSocketAddresses representing the hosts to connect to.
* @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
* single RPC call
* @param parallelism Number of concurrent requests this stream should send to the sink. Note
@@ -107,13 +105,12 @@
*/
def createJavaPollingStream (
ssc: StreamingContext,
-host: String,
-port: Int,
+addresses: Seq[InetSocketAddress],
maxBatchSize: Int = 100,
parallelism: Int = 5,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): JavaReceiverInputDStream[SparkPollingEvent] = {
-new FlumePollingInputDStream[SparkPollingEvent](ssc, host, port, maxBatchSize,
+new FlumePollingInputDStream[SparkPollingEvent](ssc, addresses, maxBatchSize,
parallelism, storageLevel)
}
}
@@ -29,6 +29,8 @@ import org.apache.spark.flume.sink.{SparkSinkConfig, SparkSink}
import scala.collection.JavaConversions._
import org.apache.flume.event.EventBuilder
import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import java.net.InetSocketAddress
+import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}

class FlumePollingReceiverSuite extends TestSuiteBase {

@@ -38,7 +40,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
FlumeUtils.createPollingStream(ssc, "localhost", testPort, 100, 1,
FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), 100, 5,
StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
with SynchronizedBuffer[Seq[SparkPollingEvent]]
@@ -60,42 +62,81 @@
sink.setChannel(channel)
sink.start()
ssc.start()
+writeAndVerify(Seq(channel), ssc, outputBuffer)
+sink.stop()
+channel.stop()
+}

-val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-var t = 0
-for (i <- 0 until 5) {
-val tx = channel.getTransaction
-tx.begin()
-for (j <- 0 until 5) {
-channel.put(EventBuilder.withBody(
-String.valueOf(t).getBytes("utf-8"),
-Map[String, String]("test-" + t.toString -> "header")))
-t += 1
-}
test("flume polling test multiple hosts") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort),
new InetSocketAddress("localhost", testPort + 1)), 100, 5,
StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
with SynchronizedBuffer[Seq[SparkPollingEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()

// Start the channel and sink.
val context = new Context()
context.put("capacity", "5000")
context.put("transactionCapacity", "1000")
context.put("keep-alive", "0")
val channel = new MemoryChannel()
Configurables.configure(channel, context)

val channel2 = new MemoryChannel()
Configurables.configure(channel2, context)

-tx.commit()
-tx.close()
-Thread.sleep(500) // Allow some time for the events to reach
-clock.addToTime(batchDuration.milliseconds)
+val sink = new SparkSink()
+context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+Configurables.configure(sink, context)
+sink.setChannel(channel)
+sink.start()
+
+val sink2 = new SparkSink()
+context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort + 1))
+Configurables.configure(sink2, context)
+sink2.setChannel(channel2)
+sink2.start()
+ssc.start()
+writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+sink.stop()
+sink2.stop()
+channel.stop()
+channel2.stop()
+}

+def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
+outputBuffer: ArrayBuffer[Seq[SparkPollingEvent]]) {
+val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+val executor = Executors.newCachedThreadPool()
+val executorCompletion = new ExecutorCompletionService[Void](executor)
+channels.foreach(channel => {
+executorCompletion.submit(new TxnSubmitter(channel, clock))
+})
+for (i <- 0 until channels.size) {
+executorCompletion.take()
+}
val startTime = System.currentTimeMillis()
-while (outputBuffer.size < 5 && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+while (outputBuffer.size < 5 * channels.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + outputBuffer.size)
Thread.sleep(100)
}
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
logInfo("Stopping context")
ssc.stop()
-sink.stop()
-channel.stop()

val flattenedBuffer = outputBuffer.flatten
-assert(flattenedBuffer.size === 25)
+assert(flattenedBuffer.size === 25 * channels.size)
var counter = 0
-for (i <- 0 until 25) {
-val eventToVerify = EventBuilder.withBody(
-String.valueOf(i).getBytes("utf-8"),
+for (k <- 0 until channels.size; i <- 0 until 25) {
+val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
+String.valueOf(i)).getBytes("utf-8"),
Map[String, String]("test-" + i.toString -> "header"))
var found = false
var j = 0
@@ -110,7 +151,26 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
j += 1
}
}
-assert (counter === 25)
+assert(counter === 25 * channels.size)
}

+private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
+override def call(): Void = {
+var t = 0
+for (i <- 0 until 5) {
+val tx = channel.getTransaction
+tx.begin()
+for (j <- 0 until 5) {
+channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes("utf-8"),
+Map[String, String]("test-" + t.toString -> "header")))
+t += 1
+}
+tx.commit()
+tx.close()
+Thread.sleep(500) // Allow some time for the events to reach
+clock.addToTime(batchDuration.milliseconds)
+}
+null
+}
+}
}
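A side note on the test plumbing: writeAndVerify above submits one TxnSubmitter per channel to an ExecutorCompletionService, then calls take() once per submission, blocking until every writer has finished. A standalone sketch of that fan-out-and-join pattern, with strings standing in for the channels:

import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}

val executor = Executors.newCachedThreadPool()
val completion = new ExecutorCompletionService[String](executor)
val items = Seq("channel-1", "channel-2") // stand-ins for MemoryChannels
items.foreach { item =>
  completion.submit(new Callable[String] {
    override def call(): String = "wrote 25 events to " + item
  })
}
// One take() per submission; each take() blocks until some task completes.
for (_ <- items.indices) println(completion.take().get())
executor.shutdown()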
