
[SPARK-12854][SQL] Implement complex types support in ColumnarBatch #10820

Closed
wants to merge 5 commits

Conversation

nongli
Contributor

@nongli nongli commented Jan 18, 2016

This patch adds support for complex types in ColumnarBatch. ColumnarBatch supports structs
and arrays; there is a simple mapping from the richer Catalyst types to these two. Strings
are treated as arrays of bytes.

ColumnarBatch will contain a column for each node of the schema. Non-complex schemas consist
of just leaf nodes. Structs are internal nodes with one child for each field. Arrays are
internal nodes with a single child. Structs contain only nullability; arrays contain offsets
and lengths into the child array. This structure can handle arbitrary nesting. It has the
key property that the layout stays columnar throughout and that primitive types are stored
only in the leaf nodes, contiguous across rows. For example, if the schema is

array<array<int>>

There are three columns in the schema. The internal nodes each have one child, and the leaf node contains all the int data stored consecutively.
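To make the layout concrete, here is a minimal sketch, using plain Java arrays rather than the actual ColumnVector API, of how the two rows [[1, 2], [3]] and [[4]] would land in the three columns; the variable names are illustrative only.

// Logical data: row 0 = [[1, 2], [3]], row 1 = [[4]].
// Column 0 (outer array node): offsets/lengths into column 1.
int[] outerOffsets = {0, 2};     // row 0 starts at inner array 0, row 1 at inner array 2
int[] outerLengths = {2, 1};     // row 0 spans 2 inner arrays, row 1 spans 1
// Column 1 (inner array node): offsets/lengths into column 2.
int[] innerOffsets = {0, 2, 3};  // the inner arrays begin at ints 0, 2 and 3
int[] innerLengths = {2, 1, 1};
// Column 2 (leaf node): all the int data, contiguous across rows.
int[] values = {1, 2, 3, 4};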

As part of this, this patch adds append APIs in addition to the put APIs (e.g. putLong(rowid, v)
vs. appendLong(v)). These APIs are necessary when the batch contains variable-length elements:
the vectors are not fixed length and will grow as necessary. This should make usage a lot
simpler for the writer.
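To illustrate why append is simpler for the writer, here is a self-contained sketch of the put-vs-append distinction; GrowableIntVector and its methods are invented for this example and are not the ColumnarBatch API.

// Put API: the caller supplies the rowId, which only works when every row's
// position is known up front (fixed-length layouts).
// Append API: the vector tracks its own write position and grows on demand,
// so a writer of variable-length data never computes offsets itself.
final class GrowableIntVector {
  private int[] data = new int[4];
  private int elementsAppended = 0;

  void putInt(int rowId, int v) { data[rowId] = v; }

  void appendInt(int v) {
    if (elementsAppended == data.length) {
      int[] bigger = new int[data.length * 2];
      System.arraycopy(data, 0, bigger, 0, elementsAppended);
      data = bigger;
    }
    data[elementsAppended++] = v;
  }
}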

…atch.

WIP: this patch adds some random row generation. The test code needs to be cleaned up, as it
duplicates functionality from elsewhere. The non-test code should be good to review.

@SparkQA

SparkQA commented Jan 19, 2016

Test build #49636 has finished for PR 10820 at commit 518d9bc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public static final class Array
    • public static final class Struct
    • public class ColumnVectorUtils
    • public static final class Row extends InternalRow

@rxin
Contributor

rxin commented Jan 19, 2016

if the schema is array<array<int>>? array of int?

* Most of the APIs take the rowId as a parameter. This is the local 0-based row id for values
* To handle nested schemas, ColumnVector has two types: Arrays and Structs. In both cases these
* columns have child columns. All of the data is stored in the child columns and the parent column
* contains nullability, and in the case of Arrays, the lengths and offsets into the child column.
Contributor

can you explain how lengths and offsets are stored? also is there a single "parent" column that encodes nullability, length, and offset?

Contributor Author

I'll update the comment when I rev the PR but the answer is:

I'm not sure what single "parent" means. The array is a column that stores nullability, lengths and offsets. The child column stores the values, including their nullability. Either one can be independently nullable or not.

Lengths and offsets are encoded as ints.
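To pin down that encoding, here is a small worked example with illustrative field names (the real code keeps similar int arrays, e.g. arrayLengths in the diff below): an array<int> column holding the rows [10, 20], null, [30].

// Array column: nullability plus offsets and lengths, both encoded as ints.
boolean[] arrayIsNull = {false, true, false};
int[] arrayOffsets = {0, 2, 2};  // start of each row's data in the child column
int[] arrayLengths = {2, 0, 1};  // element count of each row
// Child column: the values, with their own independent nullability.
int[] childValues = {10, 20, 30};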

@kiszk
Member

kiszk commented Jan 20, 2016

Will ColumnarBatch become a column version of UnsafeRow, or will it become a more general columnar store?

@nongli
Contributor Author

nongli commented Jan 25, 2016

@kiszk We're looking to make the execution engine represent data in a columnar way. I'm not sure what you mean by a more general columnar store; I usually think of that as more of a storage concern.

@nongli nongli changed the title [WIP] [SPARK-12854][SQL] Implement complex types support in ColumnarBatch [SPARK-12854][SQL] Implement complex types support in ColumnarBatch Jan 25, 2016
@SparkQA

SparkQA commented Jan 26, 2016

Test build #50029 has finished for PR 10820 at commit f579889.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -105,6 +105,17 @@ public static void freeMemory(long address) {
_UNSAFE.freeMemory(address);
}

public static long reallocateMemory(long address, long oldSize, long newSize) {
Contributor

Should this also work for on-heap?

Contributor Author

I have this in the same group of functions as freeMemory and allocateMemory. What would the on-heap version look like?
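For comparison, there is no single on-heap realloc primitive: on-heap growth is a fresh allocation plus a copy, which the JDK packages as Arrays.copyOf. A minimal sketch of that shape:

// On-heap "reallocation": allocate a larger array and copy the old contents.
static long[] reallocateOnHeap(long[] old, int newCapacity) {
  return java.util.Arrays.copyOf(old, newCapacity);  // new array + System.arraycopy
}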

int[] newLengths = new int[newCapacity];
int[] newOffsets = new int[newCapacity];
if (this.arrayLengths != null) {
System.arraycopy(this.arrayLengths, 0, newLengths, 0, elementsAppended);
Contributor

How does it compare to Unsafe.copyMemory()?
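For reference, the same copy expressed through Spark's Unsafe wrapper would look roughly like this (a sketch, assuming the copy stays on-heap); for small on-heap primitive copies, System.arraycopy is typically at least as fast because the JIT compiles it to an intrinsic.

// Roughly equivalent copy via org.apache.spark.unsafe.Platform.
Platform.copyMemory(
    this.arrayLengths, Platform.INT_ARRAY_OFFSET,  // src array + base offset
    newLengths, Platform.INT_ARRAY_OFFSET,         // dst array + base offset
    elementsAppended * 4L);                        // byte count: 4 bytes per int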

@SparkQA

SparkQA commented Jan 26, 2016

Test build #50044 has finished for PR 10820 at commit c58eedf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 26, 2016

Test build #50046 has finished for PR 10820 at commit f579889.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 26, 2016

Test build #50051 has finished for PR 10820 at commit 2918077.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 26, 2016

Test build #50080 has finished for PR 10820 at commit f378335.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public static final class Array extends ArrayData
    • public static final class Struct extends InternalRow

@SparkQA

SparkQA commented Jan 26, 2016

Test build #50119 has finished for PR 10820 at commit 3d11c37.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

String Read/Write:    Avg Time(ms)    Avg Rate(M/s)    Relative Rate
---------------------------------------------------------------------
On Heap                      457.0           35.85            1.00 X
Off Heap                    1206.0           13.59            0.38 X
Contributor

UTF8String supports reading off-heap too, so you can probably improve this substantially.

@SparkQA

SparkQA commented Jan 26, 2016

Test build #2460 has finished for PR 10820 at commit 3d11c37.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 27, 2016

Test build #2464 has finished for PR 10820 at commit 3d11c37.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Jan 27, 2016

Going to merge this in master. Thanks.

@asfgit asfgit closed this in 5551273 Jan 27, 2016
@nongli nongli deleted the spark-12854 branch January 27, 2016 04:11
if (type instanceof ArrayType) {
  childType = ((ArrayType) type).elementType();
} else {
  // Strings are stored as arrays of bytes, so the child column holds bytes
  // and needs roughly DEFAULT_ARRAY_LENGTH bytes of capacity per row.
  childType = DataTypes.ByteType;
  childCapacity *= DEFAULT_ARRAY_LENGTH;
}
Contributor

Why only grow the capacity for non-array type?
