[WIP][SPARK-14098][SQL] Generate Java code to build CachedColumnarBatch and get values from CachedColumnarBatch when DataFrame.cache() is called #15219

Closed · wants to merge 40 commits
Commits
8ed3ee7
Add benchmark code
Jun 17, 2016
6319cd4
backup
Jun 20, 2016
bf5a1f9
Narrow benchmarked code + add back old scan code
Jun 20, 2016
ba2f329
Fix benchmark to time only the read path
Jun 21, 2016
b9d9346
First working impl. of ColumnarBatch based caching
Jun 21, 2016
23a50d9
Always enable codegen and vectorized hashmap
Jun 22, 2016
b25841d
Don't benchmark aggregate
Jun 22, 2016
3d48f2d
Codegen memory scan using ColumnarBatches
Jun 22, 2016
9fa5b94
Clean up the code a little
Jun 22, 2016
631c3b4
Clean up a little more
Jun 22, 2016
fc556a6
Generate code for write path to support other types
Jun 23, 2016
9091f93
Move cache benchmark to new file
Jun 23, 2016
6bc3def
Abstract codegen code into ColumnarBatchScan
Jun 23, 2016
0e79dbe
Introduce CACHE_CODEGEN config to reduce dup code
Jun 24, 2016
08bb8a5
Add some tests for InMemoryRelation
Jun 24, 2016
6f2c274
Add some tests for InMemoryRelation
Jun 24, 2016
ac878d4
Fix InMemoryColumnarQuerySuite
Jun 24, 2016
7519125
Clean up code: abstract CachedBatch and ColumnarBatch
Jun 24, 2016
df6c1f2
Add end-to-end benchmark, including write path
Jun 24, 2016
aa22b16
merge with master
kiszk Sep 1, 2016
e954ac4
support all of data types
kiszk Sep 23, 2016
fc1f6aa
rebase
kiszk Sep 23, 2016
6f66119
support UDT and column pruning
kiszk Sep 24, 2016
b5b9e75
fix build error
kiszk Sep 24, 2016
9548c8a
fix test failures
kiszk Sep 24, 2016
3efea44
fix test failures (union33, load_dyn_part13, multiMapJoin1)
kiszk Sep 25, 2016
3c4959c
support compression of CachedColumnarBatch using CompressionCodec
kiszk Sep 27, 2016
ca7abf5
make generated code smaller
kiszk Sep 29, 2016
9aa5c9b
Generate Java code of an iterator for CachedColumnarBatch
kiszk Sep 29, 2016
1154aa5
replace test() with ignore()
kiszk Sep 29, 2016
c00e42e
support uncompressed ColumnVector
kiszk Sep 29, 2016
fdd2b1c
update benchmark programs
kiszk Sep 29, 2016
572abd9
rebase
kiszk Oct 4, 2016
750dc72
rebase
kiszk Oct 4, 2016
b573ec6
fix compilation errors
kiszk Oct 8, 2016
9066c3f
rebase
kiszk Dec 1, 2016
65011cb
fix typo
kiszk Dec 1, 2016
e4407c7
merge
kiszk Dec 1, 2016
3eb4ebe
remove duplicated test
kiszk Dec 1, 2016
b15d9d5
merge with master
kiszk Jan 20, 2017
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/spark/memory/MemoryMode.java
@@ -22,5 +22,6 @@
@Private
public enum MemoryMode {
ON_HEAP,
OFF_HEAP
OFF_HEAP,
ON_HEAP_UNSAFE
}
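For orientation, a minimal Scala sketch of how a caller could request the new memory mode through the ColumnVector.allocate factory changed later in this diff; whether OnHeapUnsafeColumnVector supports plain primitive puts and gets like the existing on-heap vector is an assumption here, not something this patch states.

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.execution.vectorized.ColumnVector
import org.apache.spark.sql.types.DataTypes

// Hypothetical usage: allocate() dispatches on the MemoryMode enum and returns an
// OnHeapUnsafeColumnVector when ON_HEAP_UNSAFE is requested (see the allocate() hunk below).
val vector = ColumnVector.allocate(4 * 1024, DataTypes.IntegerType, MemoryMode.ON_HEAP_UNSAFE)
try {
  vector.putInt(0, 42)            // assumed to behave like the existing on-heap vector
  assert(vector.getInt(0) == 42)
} finally {
  vector.close()                  // ColumnVector is AutoCloseable
}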
42 changes: 32 additions & 10 deletions core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -69,12 +69,17 @@ private[spark] class Benchmark(
* @param name of the benchmark case
* @param numIters if non-zero, forces exactly this many iterations to be run
*/
def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
addTimerCase(name, numIters) { timer =>
def addCase(
name: String,
numIters: Int = 0,
prepare: () => Unit = () => { },
cleanup: () => Unit = () => { })(f: Int => Unit): Unit = {
val timedF = (timer: Benchmark.Timer) => {
timer.startTiming()
f(timer.iteration)
timer.stopTiming()
}
benchmarks += Benchmark.Case(name, timedF, numIters, prepare, cleanup)
}

/**
@@ -101,7 +106,7 @@ private[spark] class Benchmark(

val results = benchmarks.map { c =>
println(" Running case: " + c.name)
measure(valuesPerIteration, c.numIters)(c.fn)
measure(valuesPerIteration, c.numIters, c.prepare, c.cleanup)(c.fn)
}
println

@@ -128,21 +133,33 @@
* Runs a single function `f` for iters, returning the average time the function took and
* the rate of the function.
*/
def measure(num: Long, overrideNumIters: Int)(f: Timer => Unit): Result = {
def measure(num: Long, overrideNumIters: Int, prepare: () => Unit, cleanup: () => Unit)
(f: Timer => Unit): Result = {
System.gc() // ensures garbage from previous cases don't impact this one
val warmupDeadline = warmupTime.fromNow
while (!warmupDeadline.isOverdue) {
f(new Benchmark.Timer(-1))
try {
prepare()
f(new Benchmark.Timer(-1))
} finally {
cleanup()
}
}
val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
val runTimes = ArrayBuffer[Long]()
var i = 0
while (i < minIters || runTimes.sum < minDuration) {
val timer = new Benchmark.Timer(i)
f(timer)
val runTime = timer.totalTime()
runTimes += runTime
val runTime = try {
prepare()
val timer = new Benchmark.Timer(i)
f(timer)
val time = timer.totalTime()
runTimes += time
time
} finally {
cleanup()
}

if (outputPerIteration) {
// scalastyle:off
@@ -188,7 +205,12 @@ private[spark] object Benchmark {
}
}

case class Case(name: String, fn: Timer => Unit, numIters: Int)
case class Case(
name: String,
fn: Timer => Unit,
numIters: Int,
prepare: () => Unit = () => { },
cleanup: () => Unit = () => { })
case class Result(avgMs: Double, bestRate: Double, bestMs: Double)

/**
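A hedged usage sketch (Scala) of the new prepare/cleanup hooks on addCase; the case name, the workload, and the Benchmark(name, valuesPerIteration) constructor arguments are illustrative assumptions rather than something taken from this patch.

import org.apache.spark.util.Benchmark

val N = 1 << 20
val benchmark = new Benchmark("cache read path", N)

var data: Array[Int] = null
benchmark.addCase(
  "sum cached ints",
  prepare = () => { data = Array.tabulate(N)(identity) }, // runs before each invocation, warmup included
  cleanup = () => { data = null }                         // runs afterwards, even if the case throws
) { _ =>
  var i = 0
  var sum = 0L
  while (i < N) { sum += data(i); i += 1 }
}
benchmark.run()

Because cleanup() also runs inside a finally block in measure(), a failing iteration can no longer leak whatever prepare() set up.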
ColumnVector.java (org.apache.spark.sql.execution.vectorized)
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.vectorized;

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;

@@ -25,10 +26,14 @@

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
import org.apache.spark.sql.catalyst.expressions.UnsafeMapData;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

@@ -57,7 +62,9 @@
*
* ColumnVectors are intended to be reused.
*/
public abstract class ColumnVector implements AutoCloseable {
public abstract class ColumnVector implements AutoCloseable, Serializable {
ColumnVector() { }

/**
* Allocates a column to store elements of `type` on or off heap.
* Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
@@ -66,6 +73,8 @@ public abstract class ColumnVector {
public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) {
if (mode == MemoryMode.OFF_HEAP) {
return new OffHeapColumnVector(capacity, type);
} else if (mode == MemoryMode.ON_HEAP_UNSAFE) {
return new OnHeapUnsafeColumnVector(capacity, type);
} else {
return new OnHeapColumnVector(capacity, type);
}
@@ -548,18 +557,69 @@ public ColumnarBatch.Row getStruct(int rowId) {
* Returns a utility object to get structs.
* provided to keep API compatibility with InternalRow for code generation
*/
public ColumnarBatch.Row getStruct(int rowId, int size) {
resultStruct.rowId = rowId;
return resultStruct;
public InternalRow getStruct(int rowId, int size) {
if (!unsafeDirectCopy) {
resultStruct.rowId = rowId;
return resultStruct;
}
resultArray.data.loadBytes(resultArray);
int offset = getArrayOffset(rowId);
int length = getArrayLength(rowId);
UnsafeRow row = new UnsafeRow(size);
row.pointTo(resultArray.byteArray, Platform.BYTE_ARRAY_OFFSET + offset, length);
return row;
}

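/**
 * Copies the serialized bytes of an UnsafeRow into the byte child column at rowId and
 * returns the number of bytes written. Only valid when unsafeDirectCopy is enabled.
 */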
public int putStruct(int rowId, InternalRow row) {
if (!unsafeDirectCopy) {
throw new UnsupportedOperationException();
}
assert(row instanceof UnsafeRow);
UnsafeRow unsafeRow = (UnsafeRow)row;
byte[] value = (byte[])unsafeRow.getBaseObject();
long offset = unsafeRow.getBaseOffset() - Platform.BYTE_ARRAY_OFFSET;
int length = unsafeRow.getSizeInBytes();
if (offset > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Cannot put this struct to ColumnVector as " +
"it's too big.");
}
putByteArray(rowId, value, (int)offset, length);
return length;
}

/**
* Returns the array at rowid.
*/
public final Array getArray(int rowId) {
resultArray.length = getArrayLength(rowId);
resultArray.offset = getArrayOffset(rowId);
return resultArray;
public final ArrayData getArray(int rowId) {
if (unsafeDirectCopy) {
resultArray.data.loadBytes(resultArray); // update resultArray.byteArray
int offset = getArrayOffset(rowId);
int length = getArrayLength(rowId);
UnsafeArrayData array = new UnsafeArrayData();
array.pointTo(resultArray.byteArray, Platform.BYTE_ARRAY_OFFSET + offset, length);
return array;
} else {
resultArray.length = getArrayLength(rowId);
resultArray.offset = getArrayOffset(rowId);
return resultArray;
}
}

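/**
 * Copies the serialized bytes of an UnsafeArrayData into the byte child column at rowId and
 * returns the number of bytes written. Only valid when unsafeDirectCopy is enabled.
 */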
public final int putArray(int rowId, ArrayData array) {
if (!unsafeDirectCopy) {
throw new UnsupportedOperationException();
}
assert(array instanceof UnsafeArrayData);
UnsafeArrayData unsafeArray = (UnsafeArrayData)array;
byte[] value = (byte[])unsafeArray.getBaseObject();
long offset = unsafeArray.getBaseOffset() - Platform.BYTE_ARRAY_OFFSET;
int length = unsafeArray.getSizeInBytes();
if (offset > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Cannot put this array to ColumnVector as " +
"it's too big.");
}
putByteArray(rowId, value, (int)offset, length);
return length;
}

/**
@@ -579,16 +639,43 @@ public final int putByteArray(int rowId, byte[] value) {
* Returns the value for rowId.
*/
private Array getByteArray(int rowId) {
Array array = getArray(rowId);
resultArray.length = getArrayLength(rowId);
resultArray.offset = getArrayOffset(rowId);
Array array = resultArray;
array.data.loadBytes(array);
return array;
}

/**
* Returns the value for rowId.
*/
public MapData getMap(int ordinal) {
throw new UnsupportedOperationException();
public MapData getMap(int rowId) {
if (!unsafeDirectCopy) {
throw new UnsupportedOperationException();
}
resultArray.data.loadBytes(resultArray);
int offset = getArrayOffset(rowId);
int length = getArrayLength(rowId);
UnsafeMapData map = new UnsafeMapData();
map.pointTo(resultArray.byteArray, Platform.BYTE_ARRAY_OFFSET + offset, length);
return map;
}

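/**
 * Copies the serialized bytes of an UnsafeMapData into the byte child column at rowId and
 * returns the number of bytes written. Only valid when unsafeDirectCopy is enabled.
 */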
public int putMap(int rowId, MapData map) {
if (!unsafeDirectCopy) {
throw new UnsupportedOperationException();
}
assert(map instanceof UnsafeMapData);
UnsafeMapData unsafeMap = (UnsafeMapData)map;
byte[] value = (byte[])unsafeMap.getBaseObject();
long offset = unsafeMap.getBaseOffset() - Platform.BYTE_ARRAY_OFFSET;
int length = unsafeMap.getSizeInBytes();
if (offset > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Cannot put this map to ColumnVector as " +
"it's too big.");
}
putByteArray(rowId, value, (int)offset, length);
return length;
}

/**
@@ -609,14 +696,18 @@ public final Decimal getDecimal(int rowId, int precision, int scale) {
}


public final void putDecimal(int rowId, Decimal value, int precision) {
public final int putDecimal(int rowId, Decimal value, int precision) {
if (precision <= Decimal.MAX_INT_DIGITS()) {
putInt(rowId, (int) value.toUnscaledLong());
return 4;
} else if (precision <= Decimal.MAX_LONG_DIGITS()) {
putLong(rowId, value.toUnscaledLong());
return 8;
} else {
BigInteger bigInteger = value.toJavaBigDecimal().unscaledValue();
putByteArray(rowId, bigInteger.toByteArray());
byte[] array = bigInteger.toByteArray();
putByteArray(rowId, array);
return array.length;
}
}

Expand All @@ -633,6 +724,13 @@ public final UTF8String getUTF8String(int rowId) {
}
}

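/**
 * Copies the bytes of the given UTF8String into the byte child column at rowId and
 * returns the number of bytes written.
 */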
public final int putUTF8String(int rowId, UTF8String string) {
assert(dictionary == null);
byte[] array = string.getBytes();
putByteArray(rowId, array);
return array.length;
}

/**
* Returns the byte array for rowId.
*/
@@ -648,6 +746,11 @@ public final byte[] getBinary(int rowId) {
}
}

public final int putBinary(int rowId, byte[] bytes) {
putByteArray(rowId, bytes);
return bytes.length;
}
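The new value writers above return the number of bytes they appended; a hypothetical Scala helper (names invented, not part of this patch) showing how a cache builder could accumulate those return values to track a column's serialized size:

import org.apache.spark.sql.execution.vectorized.ColumnVector
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical: write a string column while summing the byte counts reported by putUTF8String.
def writeStringColumn(vector: ColumnVector, values: Array[String]): Long = {
  var bytesWritten = 0L
  var i = 0
  while (i < values.length) {
    bytesWritten += vector.putUTF8String(i, UTF8String.fromString(values(i)))
    i += 1
  }
  bytesWritten
}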

/**
* Append APIs. These APIs all behave similarly and will append data to the current vector. It
* is not valid to mix the put and append APIs. The append APIs are slower and should only be
@@ -894,10 +997,12 @@ public final int appendStruct(boolean isNull) {
@VisibleForTesting
protected int MAX_CAPACITY = Integer.MAX_VALUE;

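/**
 * If true, complex values (structs, arrays, maps) are stored as the raw bytes of their
 * Unsafe* representations in a byte child column instead of in per-field child columns.
 */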
protected boolean unsafeDirectCopy;

/**
* Data type for this column.
*/
protected final DataType type;
protected DataType type;

/**
* Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
@@ -929,17 +1034,17 @@ public final int appendStruct(boolean isNull) {
/**
* If this is a nested type (array or struct), the column for the child data.
*/
protected final ColumnVector[] childColumns;
protected ColumnVector[] childColumns;

/**
* Reusable Array holder for getArray().
*/
protected final Array resultArray;
protected Array resultArray;

/**
* Reusable Struct holder for getStruct().
*/
protected final ColumnarBatch.Row resultStruct;
protected ColumnarBatch.Row resultStruct;

/**
* The Dictionary for this column.
@@ -991,14 +1096,20 @@ public ColumnVector getDictionaryIds() {
* type.
*/
protected ColumnVector(int capacity, DataType type, MemoryMode memMode) {
this(capacity, type, memMode, false);
}

protected ColumnVector(int capacity, DataType type, MemoryMode memMode, boolean unsafeDirectCopy) {
this.capacity = capacity;
this.type = type;
this.unsafeDirectCopy = unsafeDirectCopy;

if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType
|| DecimalType.isByteArrayDecimalType(type)) {
|| DecimalType.isByteArrayDecimalType(type)
|| unsafeDirectCopy && (type instanceof MapType || type instanceof StructType)) {
DataType childType;
int childCapacity = capacity;
if (type instanceof ArrayType) {
if (!unsafeDirectCopy && type instanceof ArrayType) {
childType = ((ArrayType)type).elementType();
} else {
childType = DataTypes.ByteType;
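To tie the new read and write paths together, a hedged Scala roundtrip sketch for arrays under the unsafe-direct-copy mode; it assumes that allocating with ON_HEAP_UNSAFE enables unsafeDirectCopy, which this diff does not show explicitly.

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
import org.apache.spark.sql.execution.vectorized.ColumnVector
import org.apache.spark.sql.types.{ArrayType, DataTypes}

// Hypothetical roundtrip: putArray copies the raw bytes of an UnsafeArrayData into the byte
// child column, and getArray points a fresh UnsafeArrayData back at those bytes.
val vector = ColumnVector.allocate(16, ArrayType(DataTypes.IntegerType), MemoryMode.ON_HEAP_UNSAFE)
try {
  val unsafeArray = UnsafeArrayData.fromPrimitiveArray(Array(1, 2, 3))
  val written = vector.putArray(0, unsafeArray)
  val readBack = vector.getArray(0)
  assert(readBack.numElements() == 3)
  assert(written == unsafeArray.getSizeInBytes)
} finally {
  vector.close()
}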
ColumnarBatch.java (org.apache.spark.sql.execution.vectorized)
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.vectorized;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.*;

@@ -43,7 +44,7 @@
* - There are many TODOs for the existing APIs. They should throw a not implemented exception.
* - Compaction: The batch and columns should be able to compact based on a selection vector.
*/
public final class ColumnarBatch {
public final class ColumnarBatch implements Serializable {
private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
private static MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
