SPARK-1729. Some test changes and changes to utils classes.
harishreedharan committed Jul 15, 2014
1 parent 9fd0da7 commit 120e2a1
Showing 6 changed files with 175 additions and 61 deletions.
5 changes: 5 additions & 0 deletions external/flume-sink/pom.xml
@@ -64,6 +64,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -64,10 +64,9 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
})
// Wait until a batch is available - will be an error if error message is non-empty
val batch = processor.getEventBatch
if (batch.getErrorMsg != null && !batch.getErrorMsg.equals("")) {
processorMap.put(sequenceNumber, processor)
if (!SparkSinkUtils.isErrorBatch(batch)) {
processorMap.put(sequenceNumber.toString, processor)
}

batch
}

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.flume.sink

import org.apache.spark.flume.EventBatch

object SparkSinkUtils {
/**
* This method determines if this batch represents an error or not.
* @param batch - The batch to check
* @return - true if the batch represents an error
*/
def isErrorBatch(batch: EventBatch): Boolean = {
!batch.getErrorMsg.toString.equals("") //If there is an error message, it is an error batch.
}
}
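
A minimal sketch of how a caller can use this helper before processing a batch. The EventBatch stub below is a hypothetical stand-in for the Avro-generated class, so the sketch compiles without the flume-sink protocol on the classpath:

  // Hypothetical stand-in for the Avro-generated EventBatch; only for this sketch.
  case class EventBatch(getErrorMsg: CharSequence)

  object IsErrorBatchSketch {
    // Mirrors SparkSinkUtils.isErrorBatch: a non-empty error message marks an error batch.
    def isErrorBatch(batch: EventBatch): Boolean =
      !batch.getErrorMsg.toString.equals("")

    def main(args: Array[String]): Unit = {
      assert(!isErrorBatch(EventBatch("")))                      // normal batch
      assert(isErrorBatch(EventBatch("channel is unavailable"))) // error batch, do not process
    }
  }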
@@ -22,6 +22,8 @@ import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.concurrent.{TimeUnit, Executors}

import org.apache.spark.flume.sink.SparkSinkUtils

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

@@ -91,52 +93,54 @@ private[streaming] class FlumePollingReceiver(
new FlumeConnection(transceiver, client)
}).toArray

// Threads that pull data from Flume.
val dataReceiver = new Runnable {
override def run(): Unit = {
var counter = 0
while (true) {
counter = counter % connections.size
val client = connections(counter).client
counter += 1
val eventBatch = client.getEventBatch(maxBatchSize)
val errorMsg = eventBatch.getErrorMsg
if (errorMsg.toString.equals("")) { // No error, proceed with processing data
val seq = eventBatch.getSequenceNumber
val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
logDebug(
"Received batch of " + events.size() + " events with sequence number: " + seq)
try {
// Convert each Flume event to a serializable SparkPollingEvent
events.foreach(event => {
store(SparkFlumePollingEvent.fromSparkSinkEvent(event))
})
// Send an ack to Flume so that Flume discards the events from its channels.
client.ack(seq)
} catch {
case e: Exception =>
try {
// Let Flume know that the events need to be pushed back into the channel.
client.nack(seq) // If the agent is down, even this could fail and throw
} catch {
case e: Exception => logError(
"Sending Nack also failed. A Flume agent is down.")
for (i <- 0 until parallelism) {
logInfo("Starting Flume Polling Receiver worker threads starting..")
// Threads that pull data from Flume.
receiverExecutor.submit(new Runnable {
override def run(): Unit = {
var counter = i
while (true) {
counter = counter % (connections.length)
val client = connections(counter).client
counter += 1
val eventBatch = client.getEventBatch(maxBatchSize)
if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
// No error, proceed with processing data
val seq = eventBatch.getSequenceNumber
val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
logDebug(
"Received batch of " + events.size() + " events with sequence number: " + seq)
try {
// Convert each Flume event to a serializable SparkPollingEvent
var j = 0
while (j < events.size()) {
store(SparkFlumePollingEvent.fromSparkSinkEvent(events(j)))
logDebug("Stored events with seq:" + seq)
j += 1
}
logInfo("Sending ack for: " +seq)
// Send an ack to Flume so that Flume discards the events from its channels.
client.ack(seq)
logDebug("Ack sent for sequence number: " + seq)
} catch {
case e: Exception =>
try {
// Let Flume know that the events need to be pushed back into the channel.
client.nack(seq) // If the agent is down, even this could fail and throw
} catch {
case e: Exception => logError(
"Sending Nack also failed. A Flume agent is down.")
}
TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
logWarning("Error while attempting to store events", e)
}
} else {
logWarning("Did not receive events from Flume agent due to error on the Flume " +
"agent: " + eventBatch.getErrorMsg)
}
} else {
logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
"" + errorMsg.toString)
}
}
}
}

// Create multiple threads and start all of them.
for (i <- 0 until parallelism) {
logInfo("Starting Flume Polling Receiver worker threads starting..")
receiverExecutor.submit(dataReceiver)
})
}
}

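The new run() loop above pulls a batch from the sink, stores the events, then acks on success or nacks on failure so the sink can roll the events back into the Flume channel. A condensed sketch of that poll/ack/nack cycle, using a hypothetical SinkClient trait rather than the real Avro-generated client:

  // Condensed sketch of the receiver's poll/ack/nack cycle; SinkClient and store are
  // hypothetical stand-ins, not the Avro-generated SparkFlumeProtocol client.
  object PollAckNackSketch {
    trait SinkClient {
      def getEventBatch(max: Int): (String, Seq[Array[Byte]], String) // (seq, events, errorMsg)
      def ack(seq: String): Unit
      def nack(seq: String): Unit
    }

    def pollOnce(client: SinkClient, store: Array[Byte] => Unit, maxBatchSize: Int): Unit = {
      val (seq, events, errorMsg) = client.getEventBatch(maxBatchSize)
      if (errorMsg.isEmpty) {
        try {
          events.foreach(store) // persist every event before acknowledging
          client.ack(seq)       // sink commits its transaction and discards the events
        } catch {
          case _: Exception =>
            // Nack so the sink rolls back and Flume redelivers the batch; the agent
            // itself may be down, in which case even the nack can fail.
            try client.nack(seq) catch { case _: Exception => () }
        }
      } // an error batch carries no events, so there is nothing to store or ack
    }
  }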
@@ -59,7 +59,7 @@ object FlumeUtils {
): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](
ssc, hostname, port, storageLevel, enableDecompression)

inputStream
}

@@ -109,6 +109,39 @@ object FlumeUtils {
createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
}

/**
* Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
* This stream will poll the sink for data and will pull events as they are available.
* This stream will use a batch size of 100 events and run 5 threads to pull data.
* @param host The address of the host on which the Spark Sink is running
* @param port The port that the host is listening on
* @param storageLevel Storage level to use for storing the received objects
*/
def createPollingStream(
ssc: StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumePollingEvent] = {
new FlumePollingInputDStream[SparkFlumePollingEvent](ssc,
Seq(new InetSocketAddress(host, port)), 100, 5, storageLevel)
}

/**
* Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
* This stream will poll the sink for data and will pull events as they are available.
* This stream will use a batch size of 100 events and run 5 threads to pull data.
* @param addresses List of InetSocketAddresses representing the hosts to connect to.
* @param storageLevel Storage level to use for storing the received objects
*/
def createPollingStream (
ssc: StreamingContext,
addresses: Seq[InetSocketAddress],
storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumePollingEvent] = {
new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, 100, 5, storageLevel)
}

/**
* Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
* This stream will poll the sink for data and will pull events as they are available.
@@ -123,14 +156,48 @@ object FlumeUtils {
def createPollingStream (
ssc: StreamingContext,
addresses: Seq[InetSocketAddress],
maxBatchSize: Int = 100,
parallelism: Int = 5,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
maxBatchSize: Int,
parallelism: Int,
storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumePollingEvent] = {
new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize,
parallelism, storageLevel)
}

/**
* Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
* This stream will poll the sink for data and will pull events as they are available.
* This stream will use a batch size of 100 events and run 5 threads to pull data.
* @param addresses List of InetSocketAddresses representing the hosts to connect to.
* @param storageLevel Storage level to use for storing the received objects
*/
def createPollingStream (
jssc: JavaStreamingContext,
addresses: Seq[InetSocketAddress],
storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumePollingEvent] = {
new FlumePollingInputDStream[SparkFlumePollingEvent](jssc.ssc, addresses, 100, 5,
StorageLevel.MEMORY_AND_DISK_SER_2)
}

/**
* Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
* This stream will poll the sink for data and will pull events as they are available.
* This stream will use a batch size of 100 events and run 5 threads to pull data.
* @param host The address of the host on which the Spark Sink is running
* @param port The port that the host is listening on
* @param storageLevel Storage level to use for storing the received objects
*/
def createPollingStream(
jssc: JavaStreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumePollingEvent] = {
new FlumePollingInputDStream[SparkFlumePollingEvent](jssc.ssc,
Seq(new InetSocketAddress(host, port)), 100, 5, storageLevel)
}

/**
* Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
* This stream will poll the sink for data and will pull events as they are available.
Expand All @@ -142,14 +209,14 @@ object FlumeUtils {
* result in this stream using more threads
* @param storageLevel Storage level to use for storing the received objects
*/
def createJavaPollingStream (
ssc: StreamingContext,
def createPollingStream (
jssc: JavaStreamingContext,
addresses: Seq[InetSocketAddress],
maxBatchSize: Int = 100,
parallelism: Int = 5,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
maxBatchSize: Int,
parallelism: Int,
storageLevel: StorageLevel
): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize,
new FlumePollingInputDStream[SparkFlumePollingEvent](jssc.ssc, addresses, maxBatchSize,
parallelism, storageLevel)
}
}
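
The createPollingStream overloads above are meant to be called from a streaming application. A usage sketch, assuming the package of the existing FlumeUtils object (org.apache.spark.streaming.flume) and illustrative host, port, and batch-interval values that are not part of this commit:

  import java.net.InetSocketAddress

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.flume.FlumeUtils

  object PollingStreamExample {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("FlumePollingExample")
      val ssc = new StreamingContext(conf, Seconds(2))
      // Poll the Spark Sink on the Flume agent at localhost:9999, pulling up to
      // 100 events per batch with 5 concurrent pulling threads.
      val stream = FlumeUtils.createPollingStream(
        ssc, Seq(new InetSocketAddress("localhost", 9999)), 100, 5,
        StorageLevel.MEMORY_AND_DISK_SER_2)
      stream.count().print() // report how many events arrived in each batch interval
      ssc.start()
      ssc.awaitTermination()
    }
  }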
@@ -35,15 +35,15 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}

class FlumePollingReceiverSuite extends TestSuiteBase {

val testPort = 9999

test("flume polling test") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), 100, 5,
FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), 100, 1,
StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
@@ -66,6 +66,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
sink.start()
ssc.start()
writeAndVerify(Seq(channel), ssc, outputBuffer)
assertQueuesAreEmpty(channel)
sink.stop()
channel.stop()
}
@@ -75,7 +76,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort),
new InetSocketAddress("localhost", testPort + 1)), 100, 5,
new InetSocketAddress("localhost", testPort + 1)), 100, 2,
StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
@@ -108,9 +109,10 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
sink2.start()
ssc.start()
writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
assertQueuesAreEmpty(channel)
assertQueuesAreEmpty(channel2)
sink.stop()
channel.stop()

}

def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
@@ -126,12 +128,12 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
}
val startTime = System.currentTimeMillis()
while (outputBuffer.size < 5 * channels.size &&
System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
System.currentTimeMillis() - startTime < 15000) {
logInfo("output.size = " + outputBuffer.size)
Thread.sleep(100)
}
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms")
logInfo("Stopping context")
ssc.stop()

@@ -158,6 +160,13 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
assert(counter === 25 * channels.size)
}

def assertQueuesAreEmpty(channel: MemoryChannel) = {
val queueRemaining = channel.getClass.getDeclaredField("queueRemaining");
queueRemaining.setAccessible(true)
val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
assert(m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] === 5000)
}

private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
override def call(): Void = {
var t = 0
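assertQueuesAreEmpty above uses reflection to read the MemoryChannel's private queueRemaining semaphore and check that all 5000 permits are available, i.e. that the channel drained completely. A standalone sketch of the same reflection pattern, with a hypothetical Holder class standing in for MemoryChannel so it runs without Flume on the classpath:

  import java.util.concurrent.Semaphore

  // Hypothetical stand-in for MemoryChannel: a field holding a Semaphore.
  class Holder { val queueRemaining = new Semaphore(5000) }

  object ReflectionSketch {
    def main(args: Array[String]): Unit = {
      val holder = new Holder
      // Read the backing field by name, as assertQueuesAreEmpty does for MemoryChannel.
      val field = holder.getClass.getDeclaredField("queueRemaining")
      field.setAccessible(true)
      val sem = field.get(holder)
      // Invoke availablePermits reflectively on the semaphore instance.
      val m = sem.getClass.getDeclaredMethod("availablePermits")
      assert(m.invoke(sem).asInstanceOf[Int] == 5000) // all permits free => queue is empty
    }
  }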
