
Commit

SPARK-1729. New Spark-Flume integration.
Minor formatting changes.
harishreedharan committed Jun 10, 2014
1 parent 70bcc2a commit 3c23c18
Showing 3 changed files with 14 additions and 7 deletions.
@@ -35,6 +35,7 @@ import org.apache.flume.{Channel, Transaction, FlumeException, Context}
import org.slf4j.LoggerFactory

import org.apache.spark.flume.{SparkSinkEvent, EventBatch, SparkFlumeProtocol}
+
/**
* A sink that uses Avro RPC to run a server that can be polled by Spark's
* FlumePollingInputDStream. This sink has the following configuration parameters:
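For context on how this sink is meant to be consumed: the receiver changed below pulls event batches from it over Avro RPC. The sketch that follows is purely illustrative and not part of this commit; it assumes a convenience helper named FlumeUtils.createPollingStream(ssc, host, port) (the helper name, package, and signature are assumptions) wrapping the FlumePollingInputDStream mentioned in the comment above.

// Illustrative sketch only (not part of this commit): a driver program that
// polls the Spark sink from a StreamingContext.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils // assumed package for the helper

object FlumePollingExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("FlumePollingExample")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Connect to the Spark sink running inside a Flume agent and pull event
    // batches over Avro RPC (host and port are placeholders).
    val stream = FlumeUtils.createPollingStream(ssc, "localhost", 9999)

    // Each record wraps a SparkSinkEvent; count events per batch as a smoke test.
    stream.count().map(count => "Pulled " + count + " events from Flume").print()

    ssc.start()
    ssc.awaitTermination()
  }
}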
@@ -105,7 +105,9 @@ private[streaming] class FlumePollingReceiver(
logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
try {
// Convert each Flume event to a serializable SparkPollingEvent
-events.foreach(event => store(SparkFlumePollingEvent.fromSparkSinkEvent(event)))
+events.foreach(event => {
+  store(SparkFlumePollingEvent.fromSparkSinkEvent(event))
+})
// Send an ack to Flume so that Flume discards the events from its channels.
client.ack(seq)
} catch {
@@ -153,7 +155,7 @@ private[streaming] class FlumePollingReceiver(
* @param client The client that the callbacks are received on.
*/
private class FlumeConnection(val transceiver: NettyTransceiver,
-val client: SparkFlumeProtocol.Callback)
+val client: SparkFlumeProtocol.Callback)

private[streaming] object SparkFlumePollingEvent {
def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = {
@@ -162,13 +164,14 @@ private[streaming] object SparkFlumePollingEvent {
event
}
}
+
/*
* Unfortunately Avro does not allow including pre-compiled classes - so even though
* SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
* around that to make it externalizable.
*/
class SparkFlumePollingEvent() extends Externalizable with Logging {
-var event : SparkSinkEvent = new SparkSinkEvent()
+var event: SparkSinkEvent = new SparkSinkEvent()

/* De-serialize from bytes. */
def readExternal(in: ObjectInput) {
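The read/write logic of SparkFlumePollingEvent is truncated in this diff. To make the comment above concrete, here is a hedged sketch of the wrapper pattern it describes: hold the Avro-generated SparkSinkEvent in a plain field and ship its Avro binary encoding through writeExternal/readExternal. The class name, the length-prefixed framing, and the use of SpecificDatumWriter/SpecificDatumReader are illustrative assumptions, not the commit's exact implementation.

// Illustrative sketch (not the commit's code): making an Avro-generated record
// Java-serializable by wrapping it in an Externalizable class.
import java.io.{ByteArrayOutputStream, Externalizable, ObjectInput, ObjectOutput}

import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}

import org.apache.spark.flume.SparkSinkEvent

class ExternalizableSinkEvent() extends Externalizable {
  var event: SparkSinkEvent = new SparkSinkEvent()

  // Serialize to bytes: Avro-encode the record and prefix it with its length.
  def writeExternal(out: ObjectOutput): Unit = {
    val baos = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(baos, null)
    new SpecificDatumWriter[SparkSinkEvent](classOf[SparkSinkEvent]).write(event, encoder)
    encoder.flush()
    val bytes = baos.toByteArray
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  // De-serialize from bytes: read the length-prefixed Avro payload back.
  def readExternal(in: ObjectInput): Unit = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    event = new SpecificDatumReader[SparkSinkEvent](classOf[SparkSinkEvent]).read(null, decoder)
  }
}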
@@ -114,18 +114,19 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
}

def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
-outputBuffer: ArrayBuffer[Seq[SparkFlumePollingEvent]]) {
+outputBuffer: ArrayBuffer[Seq[SparkFlumePollingEvent]]) {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val executor = Executors.newCachedThreadPool()
val executorCompletion = new ExecutorCompletionService[Void](executor)
channels.map(channel => {
executorCompletion.submit(new TxnSubmitter(channel, clock))
})
-for(i <- 0 until channels.size) {
+for (i <- 0 until channels.size) {
executorCompletion.take()
}
val startTime = System.currentTimeMillis()
-while (outputBuffer.size < 5 * channels.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+while (outputBuffer.size < 5 * channels.size &&
+  System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + outputBuffer.size)
Thread.sleep(100)
}
@@ -164,7 +165,8 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
val tx = channel.getTransaction
tx.begin()
for (j <- 0 until 5) {
-channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes("utf-8"),
+channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes(
+  "utf-8"),
Map[String, String]("test-" + t.toString -> "header")))
t += 1
}
@@ -176,4 +178,5 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
null
}
}
+
}
