Simplify code and fix conflicts after latest rebase
mateiz committed Jul 30, 2014
1 parent 0174149 commit 9464d5f
Showing 4 changed files with 7 additions and 71 deletions.

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -161,7 +161,7 @@ class ExternalAppendOnlyMap[K, V, C](
}
// Do not synchronize spills
if (shouldSpill) {
-spill(mapSize)
+spill(currentSize)
}
}
currentMap.changeValue(curEntry._1, update)
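
Note on the one-line fix above: the map's size estimate variable was presumably renamed to currentSize during the rebase, and spill() should receive the same estimate that the spill decision was based on. A minimal, hypothetical Scala sketch of that pattern follows; threshold, estimateSize and maybeSpill are illustrative names, not members of ExternalAppendOnlyMap.

object SpillCheckSketch {
  // Hypothetical stand-ins: only the idea is taken from the hunk above -- test a size
  // estimate against a threshold and, when spilling, report that same estimate.
  val threshold: Long = 64L * 1024 * 1024        // assumed per-collection limit (64 MiB)

  def estimateSize(): Long = 96L * 1024 * 1024   // stand-in for the sampled size estimate
  def spill(size: Long): Unit = println(s"spilling in-memory collection of $size bytes to disk")

  def maybeSpill(): Unit = {
    val currentSize = estimateSize()             // estimate once...
    val shouldSpill = currentSize > threshold
    if (shouldSpill) {
      spill(currentSize)                         // ...and pass that same value to spill()
    }
  }
}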

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -110,7 +110,7 @@ private[spark] class ExternalSorter[K, V, C](

// Collective memory threshold shared across all running tasks
private val maxMemoryThreshold = {
-val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}
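
For scale, a quick worked example of the threshold above, using assumed values: a 1 GiB JVM heap together with the 0.2 default for spark.shuffle.memoryFraction and the 0.8 default for spark.shuffle.safetyFraction shown in the hunk.

object MaxMemoryThresholdExample extends App {
  val maxMemory = 1024L * 1024 * 1024     // stand-in for Runtime.getRuntime.maxMemory (assumed 1 GiB)
  val memoryFraction = 0.2
  val safetyFraction = 0.8
  val maxMemoryThreshold = (maxMemory * memoryFraction * safetyFraction).toLong
  println(maxMemoryThreshold)             // 171798691 bytes, roughly 164 MiB shared across all tasks
}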

core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
@@ -20,7 +20,9 @@ package org.apache.spark.util.collection
/**
* An append-only map that keeps track of its estimated size in bytes.
*/
-private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] with SizeTracker {
+private[spark] class SizeTrackingAppendOnlyMap[K, V]
+  extends AppendOnlyMap[K, V] with SizeTracker with SizeTrackingPairCollection[K, V]
+{
override def update(key: K, value: V): Unit = {
super.update(key, value)
super.afterUpdate()
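
A brief, hypothetical usage sketch of the map after this change: update() and changeValue() appear in this commit's diffs, while estimateSize() is the O(1) estimate presumably supplied by the mixed-in SizeTracker; the keys and values are illustrative.

package org.apache.spark.util.collection

object SizeTrackingMapExample {
  def main(args: Array[String]): Unit = {
    val counts = new SizeTrackingAppendOnlyMap[String, Long]
    counts.update("apache", 1L)
    counts.changeValue("spark", (exists, v) => if (exists) v + 1L else 1L)
    println(counts.estimateSize())   // cheap sampled estimate in bytes, safe to call per record
  }
}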

core/src/main/scala/org/apache/spark/util/collection/SizeTrackingPairBuffer.scala
@@ -19,19 +19,11 @@ package org.apache.spark.util.collection

import java.util.Comparator

-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.util.SizeEstimator
-
/**
* Append-only buffer of key-value pairs that keeps track of its estimated size in bytes.
-* We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
-* as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
-*
-* The tracking code is copied from SizeTrackingAppendOnlyMap -- we'll factor that out soon.
*/
private[spark] class SizeTrackingPairBuffer[K, V](initialCapacity: Int = 64)
-  extends SizeTrackingPairCollection[K, V]
+  extends SizeTrackingPairCollection[K, V] with SizeTracker
{
require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
require(initialCapacity >= 1, "Invalid initial capacity")
@@ -42,32 +34,6 @@ private[spark] class SizeTrackingPairBuffer[K, V](initialCapacity: Int = 64)
private var curSize = 0
private var data = new Array[AnyRef](2 * initialCapacity)

-// Size-tracking variables: we maintain a sequence of samples since the size of the collection
-// depends on both the array and how many of its elements are filled. We reset this each time
-// we grow the array since the ratio of null vs non-null elements will change.
-
-private case class Sample(bytes: Long, numUpdates: Long)
-
-/**
-* Controls the base of the exponential which governs the rate of sampling.
-* E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
-*/
-private val SAMPLE_GROWTH_RATE = 1.1
-
-/** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
-private val samples = new ArrayBuffer[Sample]()
-
-/** Number of insertions into the buffer since the last resetSamples(). */
-private var numUpdates: Long = _
-
-/** The value of 'numUpdates' at which we will take our next sample. */
-private var nextSampleNum: Long = _
-
-/** The average number of bytes per update between our last two samples. */
-private var bytesPerUpdate: Double = _
-
-resetSamples()
-
/** Add an element into the buffer */
def insert(key: K, value: V): Unit = {
if (curSize == capacity) {
@@ -76,10 +42,7 @@ private[spark] class SizeTrackingPairBuffer[K, V](initialCapacity: Int = 64)
data(2 * curSize) = key.asInstanceOf[AnyRef]
data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
curSize += 1
-numUpdates += 1
-if (nextSampleNum == numUpdates) {
-takeSample()
-}
+afterUpdate()
}

/** Total number of elements in buffer */
@@ -101,13 +64,6 @@ private[spark] class SizeTrackingPairBuffer[K, V](initialCapacity: Int = 64)
}
}

-/** Estimate the current size of the buffer in bytes. O(1) time. */
-override def estimateSize(): Long = {
-assert(samples.nonEmpty)
-val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
-(samples.last.bytes + extrapolatedDelta).toLong
-}
-
/** Double the size of the array because we've reached capacity */
private def growArray(): Unit = {
if (capacity == (1 << 29)) {
@@ -122,28 +78,6 @@ private[spark] class SizeTrackingPairBuffer[K, V](initialCapacity: Int = 64)
resetSamples()
}

-/** Called after the buffer grows in size, as this creates many new null elements. */
-private def resetSamples() {
-numUpdates = 1
-nextSampleNum = 1
-samples.clear()
-takeSample()
-}
-
-/** Takes a new sample of the current buffer's size. */
-private def takeSample() {
-samples += Sample(SizeEstimator.estimate(this), numUpdates)
-// Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change.
-bytesPerUpdate = 0
-if (samples.length >= 2) {
-val last = samples(samples.length - 1)
-val prev = samples(samples.length - 2)
-bytesPerUpdate = (last.bytes - prev.bytes).toDouble / (last.numUpdates - prev.numUpdates)
-bytesPerUpdate = math.max(0, bytesPerUpdate)
-}
-nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
-}
-
/** Iterate through the data in a given order. For this class this is not really destructive. */
override def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, curSize, keyComparator)
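
The sampling machinery deleted above (Sample, SAMPLE_GROWTH_RATE, takeSample, resetSamples, estimateSize) is what the buffer now inherits from SizeTracker. That trait is not among the four changed files, so the following is only a reconstruction assembled from the removed lines, deliberately named SizeTrackerSketch to make clear it is not the actual SizeTracker source.

package org.apache.spark.util.collection

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.util.SizeEstimator

private[spark] trait SizeTrackerSketch {
  private case class Sample(bytes: Long, numUpdates: Long)

  /** Base of the exponential back-off: each sample waits ~10% more updates than the last. */
  private val SAMPLE_GROWTH_RATE = 1.1

  /** Samples since the last resetSamples(); only the last two drive the extrapolation. */
  private val samples = new ArrayBuffer[Sample]()

  private var numUpdates: Long = _
  private var nextSampleNum: Long = _
  private var bytesPerUpdate: Double = _

  resetSamples()

  /** Called by the collection after every insert; only occasionally pays for a full estimate. */
  protected def afterUpdate(): Unit = {
    numUpdates += 1
    if (nextSampleNum == numUpdates) {
      takeSample()
    }
  }

  /** Called when the backing storage grows, since the ratio of null to filled slots changes. */
  protected def resetSamples(): Unit = {
    numUpdates = 1
    nextSampleNum = 1
    samples.clear()
    takeSample()
  }

  /** Take a fresh SizeEstimator measurement and schedule the next one. */
  private def takeSample(): Unit = {
    samples += Sample(SizeEstimator.estimate(this), numUpdates)
    bytesPerUpdate = 0
    if (samples.length >= 2) {
      val last = samples(samples.length - 1)
      val prev = samples(samples.length - 2)
      bytesPerUpdate = math.max(0,
        (last.bytes - prev.bytes).toDouble / (last.numUpdates - prev.numUpdates))
    }
    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
  }

  /** O(1) size estimate: extrapolate from the last sample using the bytes-per-update rate. */
  def estimateSize(): Long = {
    assert(samples.nonEmpty)
    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
    (samples.last.bytes + extrapolatedDelta).toLong
  }
}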
