[SPARK-16060][SQL] Vectorized Orc reader #13775

Status: Closed. This PR wanted to merge 24 commits.

Commits (24):
2861ac2  Add vectorized Orc reader support. (viirya, Jun 16, 2016)
eee8eca  import. (viirya, Jun 17, 2016)
b753d09  If column is repeating, always using row id 0. (viirya, Jun 18, 2016)
7d26f5e  Fix bugs of getBinary and numFields. (viirya, Jun 19, 2016)
74fe936  Remove unnecessary change. (viirya, Jun 20, 2016)
7e7bb6c  Remove unnecessary change. (viirya, Jun 20, 2016)
20b832e  Add Apache license headers. (viirya, Jun 20, 2016)
855bcfd  Adjust exception. (viirya, Jun 22, 2016)
66ab632  Avoid creating String in getUTF8String. (viirya, Jun 23, 2016)
4c14278  Merge remote-tracking branch 'upstream/master' into vectorized-orc-re… (viirya, Jun 28, 2016)
b067658  Address comment. (viirya, Aug 10, 2016)
aabad7d  Merge remote-tracking branch 'upstream/master' into vectorized-orc-re… (viirya, Aug 11, 2016)
06066eb  Don't rely on progress to indicate last batch. (viirya, Aug 11, 2016)
ed780f6  Merge remote-tracking branch 'upstream/master' into vectorized-orc-re… (viirya, Oct 6, 2016)
7a47360  Address comments. (viirya, Nov 3, 2016)
3895a98  Address comments. (viirya, Nov 22, 2016)
255c02e  Merge remote-tracking branch 'upstream/master' into vectorized-orc-re… (viirya, Nov 22, 2016)
c24169d  Implement few newly added methods of InternalRow. (viirya, Nov 22, 2016)
c297678  Support return Spark ColumnarBatch. (viirya, Nov 23, 2016)
8638a0e  Add test for OrcColumnVector. (viirya, Nov 24, 2016)
55bb19f  Expand OrcQuerySuite to test vectorized Orc reader. (viirya, Nov 24, 2016)
160e924  Add test for VectorizedSparkOrcNewRecordReader. (viirya, Nov 25, 2016)
bd15842  Add partition column test. (viirya, Nov 25, 2016)
0ac61b7  Expand tests. (viirya, Nov 25, 2016)
Changes from all commits:

ColumnVector.java

```diff
@@ -588,7 +588,7 @@ public MapData getMap(int ordinal) {
   /**
    * Returns the decimal for rowId.
    */
-  public final Decimal getDecimal(int rowId, int precision, int scale) {
+  public Decimal getDecimal(int rowId, int precision, int scale) {
     if (precision <= Decimal.MAX_INT_DIGITS()) {
       return Decimal.createUnsafe(getInt(rowId), precision, scale);
     } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
@@ -617,7 +617,7 @@ public final void putDecimal(int rowId, Decimal value, int precision) {
   /**
    * Returns the UTF8String for rowId.
    */
-  public final UTF8String getUTF8String(int rowId) {
+  public UTF8String getUTF8String(int rowId) {
     if (dictionary == null) {
       ColumnVector.Array a = getByteArray(rowId);
       return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
@@ -630,7 +630,7 @@ public final UTF8String getUTF8String(int rowId) {
   /**
    * Returns the byte array for rowId.
    */
-  public final byte[] getBinary(int rowId) {
+  public byte[] getBinary(int rowId) {
     if (dictionary == null) {
       ColumnVector.Array array = getByteArray(rowId);
       byte[] bytes = new byte[array.length];
@@ -980,6 +980,14 @@ public ColumnVector getDictionaryIds() {
     return dictionaryIds;
   }
 
+  public ColumnVector(DataType type) {
+    this.capacity = 0;
+    this.type = type;
+    this.childColumns = null;
+    this.resultArray = null;
+    this.resultStruct = null;
+  }
+
   /**
    * Sets up the common state and also handles creating the child columns if this is a nested
    * type.
```
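The point of dropping `final` on `getDecimal`, `getUTF8String`, and `getBinary`, together with the new bare `ColumnVector(DataType)` constructor, is to let a file format serve these reads from its own buffers instead of copying into Spark-managed memory. The sketch below is hypothetical (it is not the PR's actual `OrcColumnVector`, whose code is not shown here): a read-only string column wrapping Hive's `BytesColumnVector`, overriding the newly virtual getters and applying the rule from commit b753d09 ("If column is repeating, always using row id 0"). The many remaining abstract members of `ColumnVector` are elided, so this is illustrative rather than compilable as-is.

```java
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.unsafe.types.UTF8String;

// Hypothetical sketch: a read-only string column that delegates to an ORC
// BytesColumnVector. The other abstract ColumnVector members are elided.
public class OrcStringColumnVector extends ColumnVector {
  private final BytesColumnVector data;

  public OrcStringColumnVector(BytesColumnVector data, DataType type) {
    super(type);  // the new lightweight constructor: no Spark-side allocation
    this.data = data;
  }

  @Override
  public UTF8String getUTF8String(int rowId) {
    // A repeating ORC vector stores a single value for all rows at index 0.
    int i = data.isRepeating ? 0 : rowId;
    return UTF8String.fromBytes(data.vector[i], data.start[i], data.length[i]);
  }

  @Override
  public byte[] getBinary(int rowId) {
    int i = data.isRepeating ? 0 : rowId;
    byte[] bytes = new byte[data.length[i]];
    System.arraycopy(data.vector[i], data.start[i], bytes, 0, bytes.length);
    return bytes;
  }
}
```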
ColumnarBatch.java

```diff
@@ -466,6 +466,18 @@ public void filterNullsInColumn(int ordinal) {
     nullFilteredColumns.add(ordinal);
   }
 
+  /**
+   * A public Ctor which accepts allocated ColumnVectors.
+   */
+  public ColumnarBatch(ColumnVector[] columns, int maxRows) {
+    this.columns = columns;
+    this.capacity = maxRows;
+    this.schema = null;
+    this.nullFilteredColumns = new HashSet<>();
+    this.filteredRows = new boolean[maxRows];
+    this.row = new Row(this);
+  }
+
   private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) {
     this.schema = schema;
     this.capacity = maxRows;
```
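This new public constructor lets a reader hand pre-built vectors straight to a `ColumnarBatch` instead of going through the private schema-driven constructor. Note that it sets `schema` to null, so the caller keeps track of column types itself. A minimal usage sketch, assuming the on-heap `ColumnVector.allocate` factory from the same package; the two column types are arbitrary examples:

```java
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.DataTypes;

// Sketch: the caller owns allocation and row bookkeeping; no schema attached.
int maxRows = 4096;
ColumnVector[] columns = new ColumnVector[] {
  ColumnVector.allocate(maxRows, DataTypes.IntegerType, MemoryMode.ON_HEAP),
  ColumnVector.allocate(maxRows, DataTypes.StringType, MemoryMode.ON_HEAP)
};

ColumnarBatch batch = new ColumnarBatch(columns, maxRows);
batch.setNumRows(0);  // update after filling the vectors on each read
```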
SQLConf.scala

```diff
@@ -248,6 +248,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ORC_VECTORIZED_READER_ENABLED =
+    SQLConfigBuilder("spark.sql.orc.enableVectorizedReader")
+      .doc("Enables vectorized orc reader.")
+      .booleanConf
+      .createWithDefault(false)
+
   val ORC_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
@@ -692,6 +698,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
 
+  def orcVectorizedReaderEnabled: Boolean = getConf(ORC_VECTORIZED_READER_ENABLED)
+
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
```
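The flag defaults to `false`, so the vectorized ORC path is strictly opt-in. A sketch of enabling it for a session, assuming a Spark build with Hive support (which the ORC data source required at the time); the input path is a placeholder:

```java
import org.apache.spark.sql.SparkSession;

// Sketch: opt in to the vectorized ORC reader for this session.
SparkSession spark = SparkSession.builder()
    .appName("orc-vectorized-demo")
    .enableHiveSupport()
    .getOrCreate();
spark.conf().set("spark.sql.orc.enableVectorizedReader", "true");

// "/path/to/table" is a placeholder; any ORC dataset works.
spark.read().orc("/path/to/table").show();
```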