Merge branch 'master' of github.com:apache/spark into unsafe-starve-memory-agg

Conflicts:
	core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
	sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
Andrew Or committed Aug 7, 2015
2 parents ca1b44c + 881548a commit 355a9bd
Showing 18 changed files with 133 additions and 64 deletions.
@@ -109,7 +109,7 @@ public final class BytesToBytesMap {
* Position {@code 2 * i} in the array is used to track a pointer to the key at index {@code i},
* while position {@code 2 * i + 1} in the array holds key's full 32-bit hashcode.
*/
private LongArray longArray;
@Nullable private LongArray longArray;
// TODO: we're wasting 32 bits of space here; we can probably store fewer bits of the hashcode
// and exploit word-alignment to use fewer bits to hold the address. This might let us store
// only one long per map entry, increasing the chance that this array will fit in cache at the
@@ -124,7 +124,7 @@ public final class BytesToBytesMap {
* A {@link BitSet} used to track location of the map where the key is set.
* Size of the bitset should be half of the size of the long array.
*/
private BitSet bitset;
@Nullable private BitSet bitset;

private final double loadFactor;

@@ -166,6 +166,8 @@ public final class BytesToBytesMap {

private long numHashCollisions = 0;

private long peakMemoryUsedBytes = 0L;

public BytesToBytesMap(
TaskMemoryManager taskMemoryManager,
ShuffleMemoryManager shuffleMemoryManager,
@@ -326,6 +328,9 @@ public Location lookup(
Object keyBaseObject,
long keyBaseOffset,
int keyRowLengthBytes) {
assert(bitset != null);
assert(longArray != null);

if (enablePerfMetrics) {
numKeyLookups++;
}
@@ -415,6 +420,7 @@ private void updateAddressesAndSizes(final Object page, final long offsetInPage)
}

private Location with(int pos, int keyHashcode, boolean isDefined) {
assert(longArray != null);
this.pos = pos;
this.isDefined = isDefined;
this.keyHashcode = keyHashcode;
@@ -530,6 +536,9 @@ public boolean putNewKey(
assert (!isDefined) : "Can only set value once for a key";
assert (keyLengthBytes % 8 == 0);
assert (valueLengthBytes % 8 == 0);
assert(bitset != null);
assert(longArray != null);

if (numElements == MAX_CAPACITY) {
throw new IllegalStateException("BytesToBytesMap has reached maximum capacity");
}
@@ -674,6 +683,7 @@ private void allocate(int capacity) {
* This method is idempotent and can be called multiple times.
*/
public void free() {
updatePeakMemoryUsed();
longArray = null;
bitset = null;
Iterator<MemoryBlock> dataPagesIterator = dataPages.iterator();
@@ -700,14 +710,30 @@ public long getPageSizeBytes() {

/**
* Returns the total amount of memory, in bytes, consumed by this map's managed structures.
* Note that this is also the peak memory used by this map, since the map is append-only.
*/
public long getTotalMemoryConsumption() {
long totalDataPagesSize = 0L;
for (MemoryBlock dataPage : dataPages) {
totalDataPagesSize += dataPage.size();
}
return totalDataPagesSize + bitset.memoryBlock().size() + longArray.memoryBlock().size();
return totalDataPagesSize +
((bitset != null) ? bitset.memoryBlock().size() : 0L) +
((longArray != null) ? longArray.memoryBlock().size() : 0L);
}

private void updatePeakMemoryUsed() {
long mem = getTotalMemoryConsumption();
if (mem > peakMemoryUsedBytes) {
peakMemoryUsedBytes = mem;
}
}

/**
* Return the peak memory used so far, in bytes.
*/
public long getPeakMemoryUsedBytes() {
updatePeakMemoryUsed();
return peakMemoryUsedBytes;
}

/**
@@ -747,6 +773,9 @@ public int getNumDataPages() {
*/
@VisibleForTesting
void growAndRehash() {
assert(bitset != null);
assert(longArray != null);

long resizeStartTime = -1;
if (enablePerfMetrics) {
resizeStartTime = System.nanoTime();
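For orientation, here is a minimal sketch (not part of the diff, using illustrative names rather than the real BytesToBytesMap internals) of the peak-tracking pattern the changes above introduce: current consumption is still computed on demand, but a separate high-water mark is refreshed both when the peak is read and just before free() drops the backing structures, so the value survives cleanup.

// Sketch only: stand-in for the map's data pages and array-backed structures.
class PeakTrackingMap {
  private var pages: List[Array[Byte]] = Nil
  private var peakMemoryUsedBytes: Long = 0L

  // Memory currently held by live structures; falls back to 0 after free().
  def totalMemoryConsumption: Long = pages.map(_.length.toLong).sum

  private def updatePeakMemoryUsed(): Unit = {
    val mem = totalMemoryConsumption
    if (mem > peakMemoryUsedBytes) peakMemoryUsedBytes = mem
  }

  // Peak memory used so far, in bytes; refreshed on read.
  def getPeakMemoryUsedBytes: Long = {
    updatePeakMemoryUsed()
    peakMemoryUsedBytes
  }

  def allocatePage(sizeBytes: Int): Unit = pages ::= new Array[Byte](sizeBytes)

  // Record the peak before releasing memory, so freeing does not erase it.
  def free(): Unit = {
    updatePeakMemoryUsed()
    pages = Nil
  }
}

Under this sketch, allocating pages, freeing the map, and then reading getPeakMemoryUsedBytes still reports the largest footprint seen, which is the invariant the updated test below verifies against the real map.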
@@ -525,7 +525,7 @@ public void resizingLargeMap() {
}

@Test
public void testTotalMemoryConsumption() {
public void testPeakMemoryUsed() {
final long recordLengthBytes = 24;
final long pageSizeBytes = 256 + 8; // 8 bytes for end-of-page marker
final long numRecordsPerPage = (pageSizeBytes - 8) / recordLengthBytes;
@@ -536,8 +536,8 @@ public void testTotalMemoryConsumption() {
// monotonically increasing. More specifically, every time we allocate a new page it
// should increase by exactly the size of the page. In this regard, the memory usage
// at any given time is also the peak memory used.
long previousMemory = map.getTotalMemoryConsumption();
long newMemory;
long previousPeakMemory = map.getPeakMemoryUsedBytes();
long newPeakMemory;
try {
for (long i = 0; i < numRecordsPerPage * 10; i++) {
final long[] value = new long[]{i};
@@ -548,15 +548,21 @@ public void testTotalMemoryConsumption() {
value,
PlatformDependent.LONG_ARRAY_OFFSET,
8);
newMemory = map.getTotalMemoryConsumption();
newPeakMemory = map.getPeakMemoryUsedBytes();
if (i % numRecordsPerPage == 0 && i > 0) {
// We allocated a new page for this record, so peak memory should change
assertEquals(previousMemory + pageSizeBytes, newMemory);
assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
} else {
assertEquals(previousMemory, newMemory);
assertEquals(previousPeakMemory, newPeakMemory);
}
previousMemory = newMemory;
previousPeakMemory = newPeakMemory;
}

// Freeing the map should not change the peak memory
map.free();
newPeakMemory = map.getPeakMemoryUsedBytes();
assertEquals(previousPeakMemory, newPeakMemory);

} finally {
map.free();
}
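The step pattern this test expects follows directly from the constants it sets up; as a quick sanity check (values copied from the test above, computed here only for illustration):

// recordLengthBytes = 24, pageSizeBytes = 256 + 8 (8 bytes reserved for the end-of-page marker)
val recordLengthBytes = 24L
val pageSizeBytes = 256L + 8L
val numRecordsPerPage = (pageSizeBytes - 8) / recordLengthBytes  // 256 / 24 = 10 records per page
// So roughly every 10th insert forces a new page and the peak should jump by exactly
// pageSizeBytes; on all other inserts it stays flat, and free() must not lower it.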
4 changes: 1 addition & 3 deletions python/pyspark/sql/dataframe.py
@@ -212,8 +212,7 @@ def explain(self, extended=False):
:param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
>>> df.explain()
PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at applySchemaToPythonRDD at\
NativeMethodAccessorImpl.java:...
Scan PhysicalRDD[age#0,name#1]
>>> df.explain(True)
== Parsed Logical Plan ==
@@ -224,7 +223,6 @@ def explain(self, extended=False):
...
== Physical Plan ==
...
== RDD ==
"""
if extended:
print(self._jdf.queryExecution().toString())
@@ -107,6 +107,8 @@ object Cast {
case class Cast(child: Expression, dataType: DataType)
extends UnaryExpression with CodegenFallback {

override def toString: String = s"cast($child as ${dataType.simpleString})"

override def checkInputDataTypes(): TypeCheckResult = {
if (Cast.canCast(child.dataType, dataType)) {
TypeCheckResult.TypeCheckSuccess
Expand All @@ -118,8 +120,6 @@ case class Cast(child: Expression, dataType: DataType)

override def nullable: Boolean = Cast.forceNullable(child.dataType, dataType) || child.nullable

override def toString: String = s"CAST($child, $dataType)"

// [[func]] assumes the input is no longer null because eval already does the null check.
@inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])

@@ -93,7 +93,7 @@ private[sql] case class AggregateExpression2(
AttributeSet(childReferences)
}

override def toString: String = s"(${aggregateFunction}2,mode=$mode,isDistinct=$isDistinct)"
override def toString: String = s"(${aggregateFunction},mode=$mode,isDistinct=$isDistinct)"
}

abstract class AggregateFunction2
@@ -212,11 +212,10 @@ public void close() {
}

/**
* The memory used by this map's managed structures, in bytes.
* Note that this is also the peak memory used by this map, since the map is append-only.
* Return the peak memory used so far, in bytes.
*/
public long getMemoryUsage() {
return map.getTotalMemoryConsumption();
public long getPeakMemoryUsedBytes() {
return map.getPeakMemoryUsedBytes();
}

@VisibleForTesting
@@ -159,6 +159,13 @@ public KVSorterIterator sortedIterator() throws IOException {
}
}

/**
* Return the peak memory used so far, in bytes.
*/
public long getPeakMemoryUsedBytes() {
return sorter.getPeakMemoryUsedBytes();
}

/**
* Marks the current page as no-more-space-available, and as a result, either allocate a
* new page or spill when we see the next record.
4 changes: 0 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1011,9 +1011,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
def output =
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")

// TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})
// however, the `toRdd` will cause the real execution, which is not what we want.
// We need to think about how to avoid the side effect.
s"""== Parsed Logical Plan ==
|${stringOrError(logical)}
|== Analyzed Logical Plan ==
@@ -1024,7 +1021,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
|== Physical Plan ==
|${stringOrError(executedPlan)}
|Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
|== RDD ==
""".stripMargin.trim
}
}
@@ -27,9 +27,9 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.UserDefinedType
import org.apache.spark.util.MutablePair
import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}

@@ -43,18 +43,11 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
override def nodeName: String = if (tungstenMode) "TungstenExchange" else "Exchange"

/**
* Returns true iff the children outputs aggregate UDTs that are not part of the SQL type.
* This only happens with the old aggregate implementation and should be removed in 1.6.
* Returns true iff we can support the data type, and we are not doing range partitioning.
*/
private lazy val tungstenMode: Boolean = {
val unserializableUDT = child.schema.exists(_.dataType match {
case _: UserDefinedType[_] => true
case _ => false
})
// Do not use the Unsafe path if we are using a RangePartitioning, since this may lead to
// an interpreted RowOrdering being applied to an UnsafeRow, which will lead to
// ClassCastExceptions at runtime. This check can be removed after SPARK-9054 is fixed.
!unserializableUDT && !newPartitioning.isInstanceOf[RangePartitioning]
GenerateUnsafeProjection.canSupport(child.schema) &&
!newPartitioning.isInstanceOf[RangePartitioning]
}

override def outputPartitioning: Partitioning = newPartitioning
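As a rough sketch of the gating logic above (stand-in types only; the real check delegates to GenerateUnsafeProjection.canSupport on the child's schema), the unsafe exchange path is taken only when every output column is supported by code-generated unsafe projection and the partitioning is not range-based:

// Sketch only: hypothetical stand-ins, not the Spark catalyst types.
sealed trait PartitioningSketch
case object HashPart extends PartitioningSketch
case object RangePart extends PartitioningSketch

// The flag stands in for "generated unsafe projection can handle this column's type".
final case class FieldSketch(name: String, supportedByUnsafeProjection: Boolean)

def tungstenMode(schema: Seq[FieldSketch], partitioning: PartitioningSketch): Boolean = {
  // Every output column must be projectable into an UnsafeRow by generated code...
  val schemaSupported = schema.forall(_.supportedByUnsafeProjection)
  // ...and range partitioning is excluded: its interpreted ordering would otherwise be
  // applied to UnsafeRows (see the comment removed above and SPARK-9054).
  schemaSupported && partitioning != RangePart
}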
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Row, SQLContext}

@@ -95,11 +96,23 @@ private[sql] case class LogicalRDD(
/** Physical plan node for scanning data from an RDD. */
private[sql] case class PhysicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow]) extends LeafNode {
rdd: RDD[InternalRow],
extraInformation: String) extends LeafNode {

override protected[sql] val trackNumOfRowsEnabled = true

protected override def doExecute(): RDD[InternalRow] = rdd

override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
}

private[sql] object PhysicalRDD {
def createFromDataSource(
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation): PhysicalRDD = {
PhysicalRDD(output, rdd, relation.toString)
}
}

/** Logical plan node for scanning data from a local collection. */
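A small sketch (stand-in types, not the Spark classes) of how the new extraInformation field feeds simpleString, producing the `Scan PhysicalRDD[age#0,name#1]` style output seen in the updated df.explain() doctest earlier:

// Sketch only: attribute rendered as name#exprId, mirroring the plan output format.
final case class AttrSketch(name: String, exprId: Int) {
  override def toString: String = s"$name#$exprId"
}

final case class PhysicalRDDSketch(output: Seq[AttrSketch], extraInformation: String) {
  // Mirrors: "Scan " + extraInformation + output.mkString("[", ",", "]")
  def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
}

// PhysicalRDDSketch(Seq(AttrSketch("age", 0), AttrSketch("name", 1)), "PhysicalRDD").simpleString
// yields "Scan PhysicalRDD[age#0,name#1]"; createFromDataSource passes relation.toString instead.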
@@ -363,12 +363,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Generate(
generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
case logical.OneRowRelation =>
execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil
case logical.RepartitionByExpression(expressions, child) =>
execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
case e @ EvaluatePython(udf, child, _) =>
BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil
case BroadcastHint(child) => apply(child)
case _ => Nil
}
@@ -116,10 +116,13 @@ case class TungstenAggregate(
val allAggregateExpressions = nonCompleteAggregateExpressions ++ completeAggregateExpressions

testFallbackStartsAt match {
case None => s"TungstenAggregate ${groupingExpressions} ${allAggregateExpressions}"
case None =>
val keyString = groupingExpressions.mkString("[", ",", "]")
val valueString = allAggregateExpressions.mkString("[", ",", "]")
s"TungstenAggregate(key=$keyString, value=$valueString"
case Some(fallbackStartsAt) =>
s"TungstenAggregateWithControlledFallback ${groupingExpressions} " +
s"${allAggregateExpressions} fallbackStartsAt=$fallbackStartsAt"
s"TungstenAggregateWithControlledFallback $groupingExpressions " +
s"$allAggregateExpressions fallbackStartsAt=$fallbackStartsAt"
}
}
}