Commit
Merge branch 'master' of github.com:apache/spark into unsafe-starve-memory
Andrew Or committed Aug 6, 2015
2 parents 254032e + 3504bf3 commit 5d5afdf
Showing 58 changed files with 2,467 additions and 1,238 deletions.
@@ -227,22 +227,35 @@ public static final class BytesToBytesMapIterator implements Iterator<Location>
private final Iterator<MemoryBlock> dataPagesIterator;
private final Location loc;

private MemoryBlock currentPage;
private MemoryBlock currentPage = null;
private int currentRecordNumber = 0;
private Object pageBaseObject;
private long offsetInPage;

// Whether this iterator is destructive. When it is true, it frees each page as it moves onto
// the next one.
private boolean destructive = false;
private BytesToBytesMap bmap;

private BytesToBytesMapIterator(
int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) {
int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc,
boolean destructive, BytesToBytesMap bmap) {
this.numRecords = numRecords;
this.dataPagesIterator = dataPagesIterator;
this.loc = loc;
this.destructive = destructive;
this.bmap = bmap;
if (dataPagesIterator.hasNext()) {
advanceToNextPage();
}
}

private void advanceToNextPage() {
if (destructive && currentPage != null) {
dataPagesIterator.remove();
this.bmap.taskMemoryManager.freePage(currentPage);
this.bmap.shuffleMemoryManager.release(currentPage.size());
}
currentPage = dataPagesIterator.next();
pageBaseObject = currentPage.getBaseObject();
offsetInPage = currentPage.getBaseOffset();
@@ -281,7 +294,21 @@ public void remove() {
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public BytesToBytesMapIterator iterator() {
return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc);
return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc, false, this);
}

/**
* Returns a destructive iterator for iterating over the entries of this map. It frees each page
* as it moves onto the next one. Note that it is illegal to call any method on the map after
* `destructiveIterator()` has been called.
*
* For efficiency, all calls to `next()` will return the same {@link Location} object.
*
* If any other lookups or operations are performed on this map while iterating over it, including
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public BytesToBytesMapIterator destructiveIterator() {
return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc, true, this);
}
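A minimal consumption sketch (not part of this diff; written in Scala and assuming a fully populated BytesToBytesMap named map whose contents are only needed once). The Location accessor names are assumed to mirror those exercised in the test suite further down.

// Hypothetical drain of `map`; earlier pages may already be freed, so copy what you need per record.
val iter = map.destructiveIterator()
while (iter.hasNext) {
  val loc = iter.next()                    // the same Location instance is reused on every call
  val keyLength = loc.getKeyLength         // assumed accessors, as used in the tests below
  val valueAddress = loc.getValueAddress
  // ... copy the key/value bytes out here before advancing ...
}
// From here on, calling lookup(), iterator(), or anything else on `map` is illegal.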

/**
@@ -46,4 +46,4 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M
val parentIterator = firstParent[T].iterator(partition, context)
executePartition(context, partition.index, preparedArgument, parentIterator)
}
}
}
@@ -58,7 +58,7 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
*/
def waitTillTime(targetTime: Long): Long = synchronized {
while (time < targetTime) {
wait(100)
wait(10)
}
getTimeMillis()
}
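For context only (not part of this change), a rough sketch of how waitTillTime is typically used in Spark's own tests: one thread blocks until the manual clock reaches a target time while another advances it. advance() and the util package location are assumed from the rest of ManualClock, which is private[spark].

import org.apache.spark.util.ManualClock

// Hypothetical test-style usage inside Spark.
val clock = new ManualClock(0L)
val waiter = new Thread(new Runnable {
  override def run(): Unit = {
    val t = clock.waitTillTime(1000L)   // polls every 10 ms until time >= 1000
    println(s"woke up at manual time $t ms")
  }
})
waiter.start()
clock.advance(1000L)                    // assumed: moves the clock forward and notifies waiters
waiter.join()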
@@ -183,8 +183,7 @@ public void setAndRetrieveAKey() {
}
}

@Test
public void iteratorTest() throws Exception {
private void iteratorTestBase(boolean destructive) throws Exception {
final int size = 4096;
BytesToBytesMap map = new BytesToBytesMap(
taskMemoryManager, shuffleMemoryManager, size / 2, PAGE_SIZE_BYTES);
@@ -216,7 +215,14 @@ public void iteratorTest() throws Exception {
}
}
final java.util.BitSet valuesSeen = new java.util.BitSet(size);
final Iterator<BytesToBytesMap.Location> iter = map.iterator();
final Iterator<BytesToBytesMap.Location> iter;
if (destructive) {
iter = map.destructiveIterator();
} else {
iter = map.iterator();
}
int numPages = map.getNumDataPages();
int countFreedPages = 0;
while (iter.hasNext()) {
final BytesToBytesMap.Location loc = iter.next();
Assert.assertTrue(loc.isDefined());
@@ -228,18 +234,39 @@
if (keyLength == 0) {
Assert.assertTrue("value " + value + " was not divisible by 5", value % 5 == 0);
} else {
final long key = PlatformDependent.UNSAFE.getLong(
keyAddress.getBaseObject(), keyAddress.getBaseOffset());
final long key = PlatformDependent.UNSAFE.getLong(
keyAddress.getBaseObject(), keyAddress.getBaseOffset());
Assert.assertEquals(value, key);
}
valuesSeen.set((int) value);
if (destructive) {
// The iterator moves onto the next page and frees the previous page
if (map.getNumDataPages() < numPages) {
numPages = map.getNumDataPages();
countFreedPages++;
}
}
}
if (destructive) {
// The last page is not freed by the iterator but by the map itself
Assert.assertEquals(countFreedPages, numPages - 1);
}
Assert.assertEquals(size, valuesSeen.cardinality());
} finally {
map.free();
}
}

@Test
public void iteratorTest() throws Exception {
iteratorTestBase(false);
}

@Test
public void destructiveIteratorTest() throws Exception {
iteratorTestBase(true);
}

@Test
public void iteratingOverDataPagesWithWastedSpace() throws Exception {
final int NUM_ENTRIES = 1000 * 1000;
19 changes: 19 additions & 0 deletions docs/configuration.md
@@ -473,6 +473,25 @@ Apart from these, the following properties are also available, and may be useful
<code>spark.storage.memoryFraction</code>.
</td>
</tr>
<tr>
<td><code>spark.shuffle.service.enabled</code></td>
<td>false</td>
<td>
Enables the external shuffle service. This service preserves the shuffle files written by
executors so the executors can be safely removed. This must be enabled if
<code>spark.dynamicAllocation.enabled</code> is "true". The external shuffle service
must be set up in order to enable it. See
<a href="job-scheduling.html#configuration-and-setup">dynamic allocation
configuration and setup documentation</a> for more information.
</td>
</tr>
<tr>
<td><code>spark.shuffle.service.port</code></td>
<td>7337</td>
<td>
Port on which the external shuffle service will run.
</td>
</tr>
<tr>
<td><code>spark.shuffle.sort.bypassMergeThreshold</code></td>
<td>200</td>
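As an illustration only (not part of this diff), the two properties documented above are usually enabled alongside dynamic allocation. A hedged SparkConf sketch, with the port left at its documented default:

import org.apache.spark.SparkConf

// Hypothetical configuration enabling dynamic allocation backed by the external shuffle service.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")  // required when dynamic allocation is on
  .set("spark.shuffle.service.port", "7337")     // documented default; change only if it clashes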
@@ -29,7 +29,8 @@ import org.apache.spark.{Logging, SparkException}
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.scheduler.StreamInputInfo
import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.rate.RateEstimator

/**
* A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
@@ -61,7 +62,7 @@ class DirectKafkaInputDStream[
val kafkaParams: Map[String, String],
val fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
) extends InputDStream[R](ssc_) with Logging {
) extends InputDStream[R](ssc_) with Logging {
val maxRetries = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRetries", 1)

@@ -71,14 +72,35 @@ class DirectKafkaInputDStream[
protected[streaming] override val checkpointData =
new DirectKafkaInputDStreamCheckpointData


/**
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
*/
override protected[streaming] val rateController: Option[RateController] = {
if (RateController.isBackPressureEnabled(ssc.conf)) {
Some(new DirectKafkaRateController(id,
RateEstimator.create(ssc.conf, ssc_.graph.batchDuration)))
} else {
None
}
}

protected val kc = new KafkaCluster(kafkaParams)

protected val maxMessagesPerPartition: Option[Long] = {
val ratePerSec = context.sparkContext.getConf.getInt(
private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRatePerPartition", 0)
if (ratePerSec > 0) {
protected def maxMessagesPerPartition: Option[Long] = {
val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
val numPartitions = currentOffsets.keys.size

val effectiveRateLimitPerPartition = estimatedRateLimit
.filter(_ > 0)
.map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions)))
.getOrElse(maxRateLimitPerPartition)

if (effectiveRateLimitPerPartition > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Some((secsPerBatch * ratePerSec).toLong)
Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
} else {
None
}
@@ -170,11 +192,18 @@ class DirectKafkaInputDStream[
val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))

batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
}
}
}

/**
* A RateController to retrieve the rate from RateEstimator.
*/
private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
extends RateController(id, estimator) {
override def publish(rate: Long): Unit = ()
}
}
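A small worked example (not in the diff) of the effective-rate arithmetic in maxMessagesPerPartition above, using hypothetical numbers: 4 partitions, spark.streaming.kafka.maxRatePerPartition = 100, a back-pressure estimate of 200 records/sec, and a 2-second batch interval.

// Hypothetical inputs mirroring the computation in maxMessagesPerPartition.
val maxRateLimitPerPartition = 100            // spark.streaming.kafka.maxRatePerPartition
val estimatedRateLimit = Some(200)            // latest rate reported by the RateController
val numPartitions = 4
val secsPerBatch = 2.0                        // batch interval in seconds

val effectiveRateLimitPerPartition = estimatedRateLimit
  .filter(_ > 0)
  .map(limit => math.min(maxRateLimitPerPartition, limit / numPartitions))
  .getOrElse(maxRateLimitPerPartition)        // min(100, 200 / 4) = 50 records/sec per partition

val maxMessages = (secsPerBatch * effectiveRateLimitPerPartition).toLong  // 100 messages per partition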
@@ -42,16 +42,16 @@ trait HasOffsetRanges {
* :: Experimental ::
* Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
* can be created with `OffsetRange.create()`.
* @param topic Kafka topic name
* @param partition Kafka partition id
* @param fromOffset Inclusive starting offset
* @param untilOffset Exclusive ending offset
*/
@Experimental
final class OffsetRange private(
/** Kafka topic name */
val topic: String,
/** Kafka partition id */
val partition: Int,
/** inclusive starting offset */
val fromOffset: Long,
/** exclusive ending offset */
val untilOffset: Long) extends Serializable {
import OffsetRange.OffsetRangeTuple

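A quick usage sketch (not from this commit) of the factory mentioned in the scaladoc above, assuming the existing four-argument OffsetRange.create(topic, partition, fromOffset, untilOffset):

import org.apache.spark.streaming.kafka.OffsetRange

// Hypothetical range: offsets 0 (inclusive) up to 100 (exclusive) of partition 0 of "topicA".
val range = OffsetRange.create("topicA", 0, 0L, 100L)
assert(range.fromOffset == 0L && range.untilOffset == 100L)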
@@ -96,7 +96,7 @@ class ReliableKafkaReceiver[
blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

// Initialize the block generator for storing Kafka message.
blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)
blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)

if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
@@ -20,6 +20,9 @@ package org.apache.spark.streaming.kafka
import java.io.File
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.scheduler.rate.RateEstimator

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -350,6 +353,77 @@ class DirectKafkaStreamSuite
ssc.stop()
}

test("using rate controller") {
val topic = "backpressure"
val topicPartition = TopicAndPartition(topic, 0)
kafkaTestUtils.createTopic(topic)
val kafkaParams = Map(
"metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"auto.offset.reset" -> "smallest"
)

val batchIntervalMilliseconds = 100
val estimator = new ConstantEstimator(100)
val messageKeys = (1 to 200).map(_.toString)
val messages = messageKeys.map((_, 1)).toMap

val sparkConf = new SparkConf()
// Safe, even with streaming, because we're using the direct API.
// Using 1 core is useful to make the test more predictable.
.setMaster("local[1]")
.setAppName(this.getClass.getSimpleName)
.set("spark.streaming.kafka.maxRatePerPartition", "100")

// Setup the streaming context
ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))

val kafkaStream = withClue("Error creating direct stream") {
val kc = new KafkaCluster(kafkaParams)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
val m = kc.getEarliestLeaderOffsets(Set(topicPartition))
.fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))

new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
ssc, kafkaParams, m, messageHandler) {
override protected[streaming] val rateController =
Some(new DirectKafkaRateController(id, estimator))
}
}

val collectedData =
new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]]

// Used for assertion failure messages.
def dataToString: String =
collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")

// This is to collect the raw data received from Kafka
kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
val data = rdd.map { _._2 }.collect()
collectedData += data
}

ssc.start()

// Try different rate limits.
// Send data to Kafka and wait for arrays of data to appear matching the rate.
Seq(100, 50, 20).foreach { rate =>
collectedData.clear() // Empty this buffer on each pass.
estimator.updateRate(rate) // Set a new rate.
// Expect blocks of data equal to "rate", scaled by the interval length in secs.
val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
kafkaTestUtils.sendMessages(topic, messages)
eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
// Assert that rate estimator values are used to determine maxMessagesPerPartition.
// Funky "-" in message makes the complete assertion message read better.
assert(collectedData.exists(_.size == expectedSize),
s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
}
}

ssc.stop()
}

/** Get the generated offset ranges from the DirectKafkaStream */
private def getOffsetRanges[K, V](
kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
@@ -381,3 +455,18 @@ object DirectKafkaStreamSuite {
}
}
}

private[streaming] class ConstantEstimator(@volatile private var rate: Long)
extends RateEstimator {

def updateRate(newRate: Long): Unit = {
rate = newRate
}

def compute(
time: Long,
elements: Long,
processingDelay: Long,
schedulingDelay: Long): Option[Double] = Some(rate)
}

@@ -136,7 +136,7 @@ private[kinesis] class KinesisReceiver(
* The KCL creates and manages the receiving/processing thread pool through Worker.run().
*/
override def onStart() {
blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, SparkEnv.get.conf)
blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)

workerId = Utils.localHostName() + ":" + UUID.randomUUID()

