
[SPARK-1371][WIP] Compression support for Spark SQL in-memory columnar storage #285

Closed. liancheng wants to merge 7 commits from the memColumnarCompression branch.

Conversation

@liancheng (Contributor)

JIRA issue: [SPARK-1373](https://issues.apache.org/jira/browse/SPARK-1373)

(Although tagged as WIP, this PR is structurally complete. The only things left unimplemented are 3 more compression algorithms: `BooleanBitSet`, `IntDelta` and `LongDelta`, which are trivial to add later in this or another separate PR.)

This PR contains compression support for Spark SQL in-memory columnar storage. Main interfaces include:

*   `CompressionScheme`

    Each `CompressionScheme` represents a concrete compression algorithm, which basically consists of an `Encoder` for compression and a `Decoder` for decompression. Algorithms implemented include:

    * `RunLengthEncoding`
    * `DictionaryEncoding`

    Algorithms to be implemented include:

    * `BooleanBitSet`
    * `IntDelta`
    * `LongDelta`

*   `CompressibleColumnBuilder`

    A stackable `ColumnBuilder` trait used to build byte buffers for compressible columns. The `CompressionScheme` with the lowest estimated compression ratio is chosen for each column, based on statistical information gathered while elements are appended into the `ColumnBuilder` (a sketch of this selection appears after this list). However, if no `CompressionScheme` can achieve a compression ratio better than 80%, the column is left uncompressed to save CPU time.

    Memory layout of the final byte buffer is shown below:

    ```
     .--------------------------- Column type ID (4 bytes)
     |   .----------------------- Null count N (4 bytes)
     |   |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
     |   |   |     .------------- Compression scheme ID (4 bytes)
     |   |   |     |   .--------- Compressed non-null elements
     V   V   V     V   V
    +---+---+-----+---+---------+
    |   |   | ... |   | ... ... |
    +---+---+-----+---+---------+
     \-----------/ \-----------/
        header         body
    ```
*   `CompressibleColumnAccessor`

    A stackable `ColumnAccessor` trait used to iterate over a (possibly) compressed data column.

*   `ColumnStats`

    Used to collect statistical information while loading data into the in-memory columnar table. Optimizations like partition pruning rely on this information.

    Strictly speaking, `ColumnStats`-related code is not part of the compression support. It's included in this PR to validate the row-based API design (which is used to avoid boxing/unboxing cost whenever possible).
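To make the builder behavior concrete (as referenced above), here is a minimal, self-contained Scala sketch of choosing a scheme and writing out the layout shown in the diagram. All names here (`ColumnBuildSketch`, `Encoder`, `Noop`, `Rle`) are hypothetical illustrations, not the actual Spark SQL classes:

```scala
import java.nio.ByteBuffer

object ColumnBuildSketch {
  trait Encoder {
    def schemeId: Int
    def gatherStats(v: Int): Unit           // fed while elements are appended
    def compressedSize: Int                 // estimated output size in bytes
    def encode(values: Seq[Int], out: ByteBuffer): Unit
  }

  // Scheme 0: pass-through, used when nothing beats the 80% ratio threshold.
  class Noop extends Encoder {
    val schemeId = 0
    private var size = 0
    def gatherStats(v: Int): Unit = size += 4
    def compressedSize: Int = size
    def encode(values: Seq[Int], out: ByteBuffer): Unit = values.foreach(out.putInt)
  }

  // Scheme 1: run-length encoding; each run stored as (value, runLength).
  class Rle extends Encoder {
    val schemeId = 1
    private var runs = 0
    private var last = Option.empty[Int]
    def gatherStats(v: Int): Unit =
      if (last != Some(v)) { runs += 1; last = Some(v) }
    def compressedSize: Int = runs * 8
    def encode(values: Seq[Int], out: ByteBuffer): Unit =
      for ((v, run) <- values.foldLeft(List.empty[(Int, Int)]) {
        case ((pv, n) :: t, x) if pv == x => (pv, n + 1) :: t
        case (acc, x)                     => (x, 1) :: acc
      }.reverse) { out.putInt(v); out.putInt(run) }
  }

  def build(typeId: Int, values: Seq[Int], nullPositions: Seq[Int]): ByteBuffer = {
    val candidates = Seq(new Rle)
    for (e <- candidates; v <- values) e.gatherStats(v)
    val uncompressed = values.size * 4
    val best = candidates.minBy(_.compressedSize)
    // Fall back to no compression unless the best ratio beats 80%.
    val chosen: Encoder =
      if (best.compressedSize.toDouble / uncompressed < 0.8) best else new Noop
    val buf = ByteBuffer.allocate(4 + 4 + 4 * nullPositions.size + 4 + uncompressed)
    buf.putInt(typeId)                 // column type ID (4 bytes)
    buf.putInt(nullPositions.size)     // null count N (4 bytes)
    nullPositions.foreach(buf.putInt)  // null positions (4 x N bytes)
    buf.putInt(chosen.schemeId)        // compression scheme ID (4 bytes)
    chosen.encode(values, buf)         // compressed non-null elements
    buf
  }
}
```

A pass-through scheme keeps the fallback path uniform: decompression can always dispatch on the scheme ID stored in the header, whether or not the column was actually compressed.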

A major refactoring change since PR #205:

* Refactored all getter/setter methods for primitive types in various places into `ColumnType` classes to remove duplicated code. Primitive setters/getters for (Mutable)`Row`s are moved to `ColumnType`s.

Other changes in this PR's commits:

* Added two more compression schemes (RLE & dictionary encoding)
* Moved compression support code to `columnar.compression`
* Various refactoring
* Added test suites for `RunLengthEncoding` and `DictionaryEncoding`
* Completed `ColumnStatsSuite`
* Bug fix: `RunLengthEncoding` didn't encode the last run (see the sketch after this list)
* Refactored some test-related code for clarity
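The "last run" bug noted above is the classic RLE pitfall: runs are flushed only when the value changes, so the final run needs an explicit flush after the input ends. A hedged sketch with illustrative names, not the actual Spark code:

```scala
// Encode values as (value, runLength) pairs.
def runLengthEncode(values: Seq[Int]): Seq[(Int, Int)] = {
  val runs = scala.collection.mutable.ArrayBuffer.empty[(Int, Int)]
  var current: Option[(Int, Int)] = None
  for (v <- values) current match {
    case Some((value, count)) if value == v =>
      current = Some((value, count + 1)) // extend the current run
    case _ =>
      current.foreach(runs += _)         // value changed: flush the run
      current = Some((v, 1))
  }
  current.foreach(runs += _) // without this final flush, the last run is dropped
  runs.toSeq
}
```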
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13633/

@@ -55,10 +56,8 @@ class ColumnTypeSuite extends FunSuite {

Contributor:

Minor existing comment: I find this style of testing produces very cryptic failures. When something breaks, all you get is "4 does not equal 8". Furthermore, because the failure is in a loop, the stack trace won't help in figuring out which data type is wrong. Finally, the correct answer for GENERIC is 10 lines away from the check, making it unnecessarily hard to read the test and see what the expected answers are.

I think something like this would be clearer, and the same number of lines of code:

```scala
def checkActualSize[A](t: ColumnType[A], v: A, expectedSize: Int) =
  if (t.actualSize(v) != expectedSize) {
    fail(s"Wrong actualSize for $t, actual: ${t.actualSize(v)}, expected: $expectedSize")
  }

checkActualSize(INT, Int.MaxValue, 4)
...
```

Contributor (Author):

I found `expectResult` is equivalent to this and more concise. Updated all occurrences where I thought it proper.
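For reference, `expectResult` is the ScalaTest assertion (renamed `assertResult` in later ScalaTest releases) that reports the expected and actual values on failure; a hypothetical use matching the test above:

```scala
// Fails with a message like "Expected 4, but got 8" and points at this line.
expectResult(4) {
  INT.actualSize(Int.MaxValue)
}
```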

Contributor:

Oh cool, I did not know about that. Much clearer!

@marmbrus (Contributor) commented Apr 1, 2014

Hi @liancheng,

This looks pretty good :) Thanks for working on it! I made a bunch of comments, but mostly just because I tried to look at this pretty closely. I think the only really important one before we merge is to fix the visibility of `IntColumnStats`.

Just to make sure I understood the code correctly: compression is now turned on by default since `CompressibleColumnBuilder` is mixed into all the `NativeColumnBuilder`s, and when `build()` is called inside of the `InMemoryColumnarTableScan`, the "staging buffer" will get compressed using the codec with the best compression ratio?

If that's correct, then I think we can merge once the above issue is addressed.

There are a few followups that we can do in a separate PR:

  • Add the missing compression codecs
  • Push predicates into the table scan and use statistics where applicable to prune entire partitions (sketched after this comment).
  • We could also create a planning strategy that calculates the answer for applicable aggregates using these statistics.

I think the latter two are going to require some refactoring so that in-memory cached data becomes a logical concept and the actual table scan is created by the planning strategies once we know about predicates and available statistics. Regardless, this should be transparent to the user and easy to add later.
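As a rough illustration of the second follow-up, under assumed semantics rather than any actual Spark API: per-partition statistics let a scan skip partitions whose recorded value range cannot satisfy a predicate.

```scala
// Hypothetical per-partition statistics for an Int column.
case class IntColumnStats(lower: Int, upper: Int, nullCount: Int)

// For a predicate like `col > value`, the partition can be pruned when its
// recorded upper bound proves no row can match.
def canPruneGreaterThan(stats: IntColumnStats, value: Int): Boolean =
  stats.upper <= value
```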

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@liancheng (Contributor, Author)

Thanks for the detailed review :) All issues addressed.

> Just to make sure I understood the code correctly, compression is now turned on by default since the CompressibleColumnBuilder is mixed into all the NativeColumnBuilders, and when build() is called inside of the InMemoryColumnarTableScan the "staging buffer" will get compressed using the codec with the best compression ratio?

Exactly.
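For readers unfamiliar with the pattern being confirmed here, a minimal sketch of stackable-trait wiring in Scala (hypothetical names; heavily simplified from the real builders):

```scala
object StackableBuilderSketch {
  trait SimpleBuilder {
    def build(): Array[Byte] // produces the "staging buffer"
  }

  trait CompressibleBuilder extends SimpleBuilder {
    // `abstract override` wraps the underlying build(): the staging buffer
    // returned by super.build() is (potentially) compressed before returning.
    abstract override def build(): Array[Byte] = compress(super.build())
    private def compress(staging: Array[Byte]): Array[Byte] = staging // stub
  }

  class IntBuilder extends SimpleBuilder {
    def build(): Array[Byte] = new Array[Byte](8)
  }

  // Mixing the trait into every concrete builder turns compression on by default.
  val builder: SimpleBuilder = new IntBuilder with CompressibleBuilder
}
```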

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13684/

@marmbrus (Contributor) commented Apr 2, 2014

LGTM

@pwendell, this is ready to merge.

@pwendell (Contributor) commented Apr 2, 2014

Merged, thanks!

@asfgit asfgit closed this in 1faa579 Apr 2, 2014
@liancheng liancheng deleted the memColumnarCompression branch April 3, 2014 09:28
andrewor14 pushed a commit to andrewor14/spark that referenced this pull request Apr 7, 2014
@marmbrus (Contributor)
Note that much of this code was adapted from Shark, including: https://github.com/amplab/shark/blob/master/src/test/scala/shark/memstore2/column/CompressionAlgorithmSuite.scala

pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
(The commit message repeats the PR description above.)
Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes apache#285 from liancheng/memColumnarCompression and squashes the following commits:

ed71bbd [Cheng Lian] Addressed all PR comments by @marmbrus
d3a4fa9 [Cheng Lian] Removed Ordering[T] in ColumnStats for better performance
5034453 [Cheng Lian] Bug fix, more tests, and more refactoring
c298b76 [Cheng Lian] Test suites refactored
2780d6a [Cheng Lian] [WIP] in-memory columnar compression support
211331c [Cheng Lian] WIP: in-memory columnar compression support
85cc59b [Cheng Lian] Refactored ColumnAccessors & ColumnBuilders to remove duplicate code