ReliableKafkaReceiver.scala
@@ -289,7 +289,8 @@ class ReliableKafkaReceiver[
       rememberBlockOffsets(blockId)
     }
 
-    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_],
+        numRecordsLimit: Long): Unit = {
       // Store block and commit the blocks offset
       storeBlockAndCommitOffset(blockId, arrayBuffer)
     }
KinesisReceiver.scala
@@ -348,7 +348,8 @@ private[kinesis] class KinesisReceiver[T](
     }
 
     /** Callback method called when a block is ready to be pushed / stored. */
-    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_],
+        numRecordsLimit: Long): Unit = {
       storeBlockWithRanges(blockId,
         arrayBuffer.asInstanceOf[mutable.ArrayBuffer[T]])
     }
BlockGenerator.scala
@@ -54,7 +54,7 @@ private[streaming] trait BlockGeneratorListener {
    * thread, that is not synchronized with any other callbacks. Hence it is okay to do long
    * blocking operation in this callback.
    */
-  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
+  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_], numRecordsLimit: Long)
 
   /**
   * Called when an error has occurred in the BlockGenerator. Can be called form many places
@@ -78,9 +78,9 @@ private[streaming] class BlockGenerator(
     receiverId: Int,
     conf: SparkConf,
     clock: Clock = new SystemClock()
-  ) extends RateLimiter(conf) with Logging {
+  ) extends RateLimiter(conf, clock) with Logging {
 
-  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
+  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any], numRecordsLimit: Long)
 
   /**
   * The BlockGenerator can be in 5 possible states, in the order as follows.
@@ -238,7 +238,8 @@ private[streaming] class BlockGenerator(
           currentBuffer = new ArrayBuffer[Any]
           val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
           listener.onGenerateBlock(blockId)
-          newBlock = new Block(blockId, newBlockBuffer)
+          val numRecordsLimit = sumHistoryThenTrim(clock.getTimeMillis())
+          newBlock = new Block(blockId, newBlockBuffer, numRecordsLimit)
         }
       }
 
@@ -293,7 +294,7 @@ private[streaming] class BlockGenerator(
   }
 
   private def pushBlock(block: Block) {
-    listener.onPushBlock(block.id, block.buffer)
+    listener.onPushBlock(block.id, block.buffer, block.numRecordsLimit)
     logInfo("Pushed block " + block.id)
   }
 }
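
The pieces above fit together as follows: when a block interval closes, the generator asks the rate limiter for that interval's record cap via sumHistoryThenTrim, stores it on the Block, and passes it to whichever BlockGeneratorListener receives the block. The sketch below is illustrative only: CountingPushHandler, its buffer, and the println are stand-ins rather than Spark API, and it deliberately does not implement the full listener trait. It shows one way an implementor of the widened callback could consume numRecordsLimit, whereas the Kafka and Kinesis handlers above simply ignore it.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StreamBlockId

// Illustrative stand-in for a BlockGeneratorListener implementation: it only mirrors
// the widened onPushBlock signature, it is not the Spark trait itself.
class CountingPushHandler {
  // (blockId, records actually in the block, cap derived from the rate-limit history)
  val pushedBlocks = ArrayBuffer.empty[(StreamBlockId, Int, Long)]

  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_], numRecordsLimit: Long): Unit = {
    // numRecordsLimit is the upper bound computed by sumHistoryThenTrim just before
    // the block was closed, so the actual count should normally stay at or below it.
    if (arrayBuffer.size > numRecordsLimit) {
      println(s"Block $blockId holds ${arrayBuffer.size} records, above the expected cap of $numRecordsLimit")
    }
    pushedBlocks += ((blockId, arrayBuffer.size, numRecordsLimit))
  }
}
```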
RateLimiter.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.streaming.receiver
 
+import scala.collection.mutable.ArrayBuffer
+
 import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}
 
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.Clock
+
 
 /** Provides waitToPush() method to limit the rate at which receivers consume data.
  *
@@ -32,7 +36,7 @@ import org.apache.spark.{Logging, SparkConf}
  *
  * @param conf spark configuration
  */
-private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
+private[receiver] abstract class RateLimiter(conf: SparkConf, clock: Clock) extends Logging {
 
   // treated as an upper limit
   private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
@@ -57,8 +61,10 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
     if (newRate > 0) {
       if (maxRateLimit > 0) {
         rateLimiter.setRate(newRate.min(maxRateLimit))
+        appendLimitToHistory(newRate.min(maxRateLimit))
       } else {
         rateLimiter.setRate(newRate)
+        appendLimitToHistory(newRate)
       }
     }
 
@@ -68,4 +74,57 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
   private def getInitialRateLimit(): Long = {
     math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit)
   }
+
+  private[receiver] case class RateLimitSnapshot(limit: Double, ts: Long)
+
+  private[receiver] val rateLimitHistory: ArrayBuffer[RateLimitSnapshot] =
+    ArrayBuffer(RateLimitSnapshot(getInitialRateLimit().toDouble, -1L))
+
+  /**
+   * Logs the rateLimit change history, so that we can do a sum later.
+   *
+   * @param rate the new rate
+   * @param ts at which time the rate changed
+   */
+  private[receiver] def appendLimitToHistory(rate: Double, ts: Long = clock.getTimeMillis()) {
+    rateLimitHistory.synchronized {
+      rateLimitHistory += RateLimitSnapshot(rate, ts)
+    }
+  }
+
+  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
+  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")
+
+  /**
+   * Calculate the upper bound of how many events can be received in a block interval.
+   * Note this should be called for each block interval once and only once.
+   *
+   * @param ts the ending timestamp of a block interval
+   * @return the upper bound of how many events can be received in a block interval
+   */
+  private[receiver] def sumHistoryThenTrim(ts: Long = clock.getTimeMillis()): Long = {
+    var sum: Double = 0
+    rateLimitHistory.synchronized {
+      // first add a RateLimitSnapshot
+      // this RateLimitSnapshot will be used as the ending of this block interval and the beginning
+      // of the next block interval
+      rateLimitHistory += RateLimitSnapshot(rateLimitHistory.last.limit, ts)
+
+      // then do a sum
+      for (idx <- 0 until rateLimitHistory.length - 1) {
+        val duration = rateLimitHistory(idx + 1).ts - (if (rateLimitHistory(idx).ts < 0) {
+          rateLimitHistory.last.ts - blockIntervalMs
+        }
+        else {
+          rateLimitHistory(idx).ts
+        })
+        sum += rateLimitHistory(idx).limit * duration
+      }
+
+      // trim the history to the last one
+      rateLimitHistory.trimStart(rateLimitHistory.length - 1)
+    }
+
+    (sum / 1000).ceil.toLong
+  }
 }
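
To make the accounting concrete: the history is a list of (limit, timestamp) snapshots, the sentinel timestamp -1 means "in force since the interval began", and the bound is the time-weighted sum of the limit over the interval, divided by 1000 to convert records/second times milliseconds into records. The standalone sketch below (the object name, the hard-coded 200 ms interval, and the sample history are assumptions for illustration, not part of the patch) replays the same computation:

```scala
object RateLimitSumExample {
  case class RateLimitSnapshot(limit: Double, ts: Long)

  val blockIntervalMs = 200L

  // Same arithmetic as sumHistoryThenTrim above, without the trimming or synchronization.
  def upperBound(history: Seq[RateLimitSnapshot], intervalEndTs: Long): Long = {
    // Close the interval with a snapshot of the current limit at its end timestamp.
    val closed = history :+ RateLimitSnapshot(history.last.limit, intervalEndTs)
    var sum = 0.0
    for (idx <- 0 until closed.length - 1) {
      // ts == -1 marks the initial limit, in force since the interval began.
      val startTs =
        if (closed(idx).ts < 0) closed.last.ts - blockIntervalMs else closed(idx).ts
      sum += closed(idx).limit * (closed(idx + 1).ts - startTs)  // records/s * ms
    }
    (sum / 1000).ceil.toLong
  }

  def main(args: Array[String]): Unit = {
    // The limit starts at 100 records/s, drops to 50 records/s at ts = 1100,
    // and the block interval ends at ts = 1200.
    val history = Seq(RateLimitSnapshot(100.0, -1L), RateLimitSnapshot(50.0, 1100L))
    // 100 rec/s for 100 ms + 50 rec/s for 100 ms = 10 + 5 = 15 records
    println(upperBound(history, 1200L))  // prints 15
  }
}
```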
Receiver.scala
@@ -121,7 +121,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
 
   /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
   def store(dataBuffer: ArrayBuffer[T]) {
-    supervisor.pushArrayBuffer(dataBuffer, None, None)
+    supervisor.pushArrayBuffer(dataBuffer, None, None, None)
   }
 
   /**
@@ -130,12 +130,12 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
   * for being used in the corresponding InputDStream.
   */
   def store(dataBuffer: ArrayBuffer[T], metadata: Any) {
-    supervisor.pushArrayBuffer(dataBuffer, Some(metadata), None)
+    supervisor.pushArrayBuffer(dataBuffer, Some(metadata), None, None)
   }
 
   /** Store an iterator of received data as a data block into Spark's memory. */
   def store(dataIterator: Iterator[T]) {
-    supervisor.pushIterator(dataIterator, None, None)
+    supervisor.pushIterator(dataIterator, None, None, None)
   }
 
   /**
@@ -144,12 +144,12 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
   * for being used in the corresponding InputDStream.
   */
   def store(dataIterator: java.util.Iterator[T], metadata: Any) {
-    supervisor.pushIterator(dataIterator.asScala, Some(metadata), None)
+    supervisor.pushIterator(dataIterator.asScala, Some(metadata), None, None)
   }
 
   /** Store an iterator of received data as a data block into Spark's memory. */
   def store(dataIterator: java.util.Iterator[T]) {
-    supervisor.pushIterator(dataIterator.asScala, None, None)
+    supervisor.pushIterator(dataIterator.asScala, None, None, None)
   }
 
   /**
@@ -158,7 +158,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
   * for being used in the corresponding InputDStream.
   */
   def store(dataIterator: Iterator[T], metadata: Any) {
-    supervisor.pushIterator(dataIterator, Some(metadata), None)
+    supervisor.pushIterator(dataIterator, Some(metadata), None, None)
   }
 
   /**
@@ -167,7 +167,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
   * that Spark is configured to use.
   */
   def store(bytes: ByteBuffer) {
-    supervisor.pushBytes(bytes, None, None)
+    supervisor.pushBytes(bytes, None, None, None)
   }
 
   /**
@@ -176,7 +176,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
   * for being used in the corresponding InputDStream.
   */
   def store(bytes: ByteBuffer, metadata: Any) {
-    supervisor.pushBytes(bytes, Some(metadata), None)
+    supervisor.pushBytes(bytes, Some(metadata), None, None)
   }
 
   /** Report exceptions in receiving data. */
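
From a user's perspective the Receiver API is unchanged: these store(...) overloads push pre-assembled blocks directly through the supervisor rather than through the BlockGenerator, so no per-interval limit is available and None is passed. A minimal custom receiver (the ConstantReceiver class and its thread are hypothetical, shown only to illustrate that user code keeps calling the same overloads) still looks like this:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver used only to show that the public store(...) API is untouched;
// internally store(Iterator) now forwards to supervisor.pushIterator(..., None, None, None).
class ConstantReceiver(numRecords: Int)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  def onStart(): Unit = {
    new Thread("constant-receiver") {
      override def run(): Unit = {
        store((1 to numRecords).iterator.map(i => s"record-$i"))
      }
    }.start()
  }

  def onStop(): Unit = { }
}
```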
ReceiverSupervisor.scala
@@ -74,22 +74,25 @@ private[streaming] abstract class ReceiverSupervisor(
   /** Store the bytes of received data as a data block into Spark's memory. */
   def pushBytes(
       bytes: ByteBuffer,
-      optionalMetadata: Option[Any],
-      optionalBlockId: Option[StreamBlockId]
+      metadataOption: Option[Any],
+      blockIdOption: Option[StreamBlockId],
+      numRecordsLimitOption: Option[Long]
     )
 
   /** Store a iterator of received data as a data block into Spark's memory. */
   def pushIterator(
       iterator: Iterator[_],
-      optionalMetadata: Option[Any],
-      optionalBlockId: Option[StreamBlockId]
+      metadataOption: Option[Any],
+      blockIdOption: Option[StreamBlockId],
+      numRecordsLimitOption: Option[Long]
     )
 
   /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
   def pushArrayBuffer(
       arrayBuffer: ArrayBuffer[_],
-      optionalMetadata: Option[Any],
-      optionalBlockId: Option[StreamBlockId]
+      metadataOption: Option[Any],
+      blockIdOption: Option[StreamBlockId],
+      numRecordsLimitOption: Option[Long]
     )
 
   /**
ReceiverSupervisorImpl.scala
@@ -105,8 +105,8 @@ private[streaming] class ReceiverSupervisorImpl(
       reportError(message, throwable)
     }
 
-    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
-      pushArrayBuffer(arrayBuffer, None, Some(blockId))
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_], numRecordsLimit: Long) {
+      pushArrayBuffer(arrayBuffer, None, Some(blockId), Some(numRecordsLimit))
     }
   }
   private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)
@@ -123,41 +123,49 @@ private[streaming] class ReceiverSupervisorImpl(
   def pushArrayBuffer(
       arrayBuffer: ArrayBuffer[_],
       metadataOption: Option[Any],
-      blockIdOption: Option[StreamBlockId]
+      blockIdOption: Option[StreamBlockId],
+      numRecordsLimitOption: Option[Long]
     ) {
-    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
+    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption,
+      numRecordsLimitOption)
   }
 
   /** Store a iterator of received data as a data block into Spark's memory. */
   def pushIterator(
       iterator: Iterator[_],
       metadataOption: Option[Any],
-      blockIdOption: Option[StreamBlockId]
+      blockIdOption: Option[StreamBlockId],
+      numRecordsLimitOption: Option[Long]
     ) {
-    pushAndReportBlock(IteratorBlock(iterator), metadataOption, blockIdOption)
+    pushAndReportBlock(IteratorBlock(iterator), metadataOption, blockIdOption,
+      numRecordsLimitOption)
   }
 
   /** Store the bytes of received data as a data block into Spark's memory. */
   def pushBytes(
       bytes: ByteBuffer,
       metadataOption: Option[Any],
-      blockIdOption: Option[StreamBlockId]
+      blockIdOption: Option[StreamBlockId],
+      numRecordsLimitOption: Option[Long]
    ) {
-    pushAndReportBlock(ByteBufferBlock(bytes), metadataOption, blockIdOption)
+    pushAndReportBlock(ByteBufferBlock(bytes), metadataOption, blockIdOption,
+      numRecordsLimitOption)
   }
 
   /** Store block and report it to driver */
   def pushAndReportBlock(
       receivedBlock: ReceivedBlock,
       metadataOption: Option[Any],
-      blockIdOption: Option[StreamBlockId]
+      blockIdOption: Option[StreamBlockId],
+      numRecordsLimitOption: Option[Long]
     ) {
     val blockId = blockIdOption.getOrElse(nextBlockId)
     val time = System.currentTimeMillis
     val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
     val numRecords = blockStoreResult.numRecords
-    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
+    val blockInfo = ReceivedBlockInfo(streamId, numRecords, numRecordsLimitOption,
+      metadataOption, blockStoreResult)
     trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
     logDebug(s"Reported block $blockId")
   }
ReceivedBlockInfo.scala
@@ -25,11 +25,15 @@ import org.apache.spark.streaming.util.WriteAheadLogRecordHandle
 private[streaming] case class ReceivedBlockInfo(
     streamId: Int,
     numRecords: Option[Long],
+    numRecordsLimitOption: Option[Long],
     metadataOption: Option[Any],
     blockStoreResult: ReceivedBlockStoreResult
   ) {
 
-  require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative")
+  require(numRecords.isEmpty || numRecords.get >= 0,
+    "numRecords must not be negative")
+  require(numRecordsLimitOption.isEmpty || numRecordsLimitOption.get >= 0,
+    "numRecordsLimitOption must not be negative")
 
   @volatile private var _isBlockIdValid = true
 
ReceivedBlockTrackerSuite.scala
@@ -293,7 +293,7 @@ class ReceivedBlockTrackerSuite
 
   /** Generate blocks infos using random ids */
   def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
-    List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
+    List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None, None,
       BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
   }
 
ReceiverInputDStreamSuite.scala
@@ -155,6 +155,6 @@ class ReceiverInputDStreamSuite extends TestSuiteBase with BeforeAndAfterAll {
     } else {
       new BlockManagerBasedStoreResult(blockId, None)
     }
-    new ReceivedBlockInfo(0, None, None, storeResult)
+    new ReceivedBlockInfo(0, None, None, None, storeResult)
   }
 }
ReceiverSuite.scala
@@ -298,22 +298,25 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
 
     def pushBytes(
         bytes: ByteBuffer,
-        optionalMetadata: Option[Any],
-        optionalBlockId: Option[StreamBlockId]) {
+        metadataOption: Option[Any],
+        blockIdOption: Option[StreamBlockId],
+        numRecordsLimitOption: Option[Long]) {
       byteBuffers += bytes
     }
 
     def pushIterator(
         iterator: Iterator[_],
-        optionalMetadata: Option[Any],
-        optionalBlockId: Option[StreamBlockId]) {
+        metadataOption: Option[Any],
+        blockIdOption: Option[StreamBlockId],
+        numRecordsLimitOption: Option[Long]) {
       iterators += iterator
     }
 
     def pushArrayBuffer(
         arrayBuffer: ArrayBuffer[_],
-        optionalMetadata: Option[Any],
-        optionalBlockId: Option[StreamBlockId]) {
+        metadataOption: Option[Any],
+        blockIdOption: Option[StreamBlockId],
+        numRecordsLimitOption: Option[Long]) {
       arrayBuffers += arrayBuffer
     }
 
@@ -341,7 +344,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
 
     def onGenerateBlock(blockId: StreamBlockId) { }
 
-    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_], numRecordsLimit: Long) {
      val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
      arrayBuffers += bufferOfInts
      Thread.sleep(0)