[SPARK-11149] [SQL] Improve cache performance for primitive types #9145

Closed
wants to merge 7 commits into from

@davies davies commented Oct 16, 2015

This PR improves performance by:

  1. Generating an Iterator that takes an Iterator[CachedBatch] as input and calls the accessors directly (with the loop over columns unrolled), avoiding the expensive Iterator.flatMap.

  2. Using Unsafe.getInt/getLong/getFloat/getDouble instead of ByteBuffer.getInt/getLong/getFloat/getDouble; the latter actually reads byte by byte.

  3. Removing the unnecessary copy() in Coalesce(), which is unrelated to the memory cache and was found during benchmarking.
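
To make point 1 concrete, here is a minimal hand-written sketch of the kind of iterator the code generator might emit for a two-column (Int, Long) schema. It is not the PR's generated code: `Batch`, `Row2`, `TwoColumnIterator` and `ColumnarIteratorSketch` are hypothetical stand-ins for CachedBatch, MutableRow and SpecificColumnarIterator, and it uses plain ByteBuffer reads to stay self-contained.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical stand-in for CachedBatch: a row count plus one buffer per column.
final case class Batch(numRows: Int, ids: ByteBuffer, amounts: ByteBuffer)

// Hypothetical mutable row that is reused across calls to next().
final class Row2 { var id: Int = 0; var amount: Long = 0L }

// One iterator walks all batches, so no nested iterator is built per batch
// (which is what Iterator.flatMap would do).
final class TwoColumnIterator(batches: Iterator[Batch]) extends Iterator[Row2] {
  private val row = new Row2
  private var current: Batch = null
  private var remaining = 0

  override def hasNext: Boolean = {
    // Skip empty batches until a row is available or the input is exhausted.
    while (remaining == 0 && batches.hasNext) {
      current = batches.next()
      remaining = current.numRows
    }
    remaining > 0
  }

  override def next(): Row2 = {
    if (!hasNext) throw new NoSuchElementException("no more rows")
    // The loop over columns is unrolled: one direct read per column.
    row.id = current.ids.getInt()
    row.amount = current.amounts.getLong()
    remaining -= 1
    row
  }
}

object ColumnarIteratorSketch {
  // Test helper (an assumption of this sketch): packs rows into column buffers.
  def batch(values: Seq[(Int, Long)]): Batch = {
    val ids = ByteBuffer.allocate(4 * values.size).order(ByteOrder.nativeOrder())
    val amounts = ByteBuffer.allocate(8 * values.size).order(ByteOrder.nativeOrder())
    values.foreach { case (i, a) => ids.putInt(i); amounts.putLong(a) }
    ids.flip()
    amounts.flip()
    Batch(values.size, ids, amounts)
  }
}
```

Because the same `Row2` instance is returned on every call, a caller that keeps rows around has to copy the fields out before calling next() again.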

The following benchmark shows that this speeds up the columnar cache for int columns by 2x.

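import time

path = '/opt/tpcds/store_sales/'
int_cols = ['ss_sold_date_sk', 'ss_sold_time_sk', 'ss_item_sk', 'ss_customer_sk']
# Cache the four int columns, then materialize the cache with a first count().
df = sqlContext.read.parquet(path).select(int_cols).cache()
df.count()

# Time a full scan of the cached data through the internal row RDD.
t = time.time()
print(df.select("*")._jdf.queryExecution().toRdd().count())
print(time.time() - t)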

SparkQA commented Oct 16, 2015

Test build #43824 has finished for PR 9145 at commit 7ee54a9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class ColumnarIterator extends Iterator[InternalRow]
    • class SpecificColumnarIterator extends $

SparkQA commented Oct 16, 2015

Test build #43834 has finished for PR 9145 at commit 8a49887.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class ColumnarIterator extends Iterator[InternalRow]
    • class SpecificColumnarIterator extends $

SparkQA commented Oct 16, 2015

Test build #43836 has finished for PR 9145 at commit 1ef3e18.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class ColumnarIterator extends Iterator[InternalRow]
    • class SpecificColumnarIterator extends $

davies changed the title from "[WIP] Improve cache performance for primitive types" to "[SPARK-11149] [SQL] Improve cache performance for primitive types" on Oct 16, 2015
@@ -44,11 +45,13 @@ private class CodeFormatter {
} else {
indentString
}
code.append(f"${currentLine}%03d ")
As discussed offline, we can wrap this in /* ... */ so that the output can still be pasted into an IDE.
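
One way to read this suggestion is sketched below. This is not the actual CodeFormatter change; `LineNumberSketch` and `withLineNumbers` are hypothetical names. The point is only that a line number wrapped in a block comment leaves the generated source compilable after copy and paste.

```scala
object LineNumberSketch {
  // Prefix every line with its 1-based number inside a block comment,
  // e.g. "/* 001 */ int a;", so the numbered text is still valid source.
  def withLineNumbers(code: String): String =
    code.split("\n", -1).zipWithIndex.map { case (line, i) =>
      f"/* ${i + 1}%03d */ $line"
    }.mkString("\n")
}
```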


protected def create(columnTypes: Seq[DataType]): ColumnarIterator = {
val ctx = newCodeGenContext()
val (creaters, accesses) = columnTypes.zipWithIndex.map { case (dt, index) =>
creators

Actually, it is probably clearer to call them initializeAccessors and extractors.

SparkQA commented Oct 16, 2015

Test build #43844 has finished for PR 9145 at commit 4511781.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class ColumnarIterator extends Iterator[InternalRow]
    • class SpecificColumnarIterator extends $

rxin commented Oct 19, 2015

LGTM - although I didn't look super closely so might be good for an extra pair of eyes too.

SparkQA commented Oct 19, 2015

Test build #43936 has finished for PR 9145 at commit f9151cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class ColumnarIterator extends Iterator[InternalRow]
    • class SpecificColumnarIterator extends $

davies commented Oct 20, 2015

I'm going to merge this first to unblock my work on outputting UnsafeRow from the columnar cache, because having the Unsafe format inside a MutableRow could result in unexpected behavior. Any new comments will be addressed in a follow-up PR.

@asfgit asfgit closed this in 06e6b76 Oct 20, 2015
}

override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
- row.setDouble(ordinal, buffer.getDouble())
+ row.setDouble(ordinal, ByteBufferHelper.getDouble(buffer))
}

override def setField(row: MutableRow, ordinal: Int, value: Double): Unit = {
Around line 332, there is a call to buffer.getShort().

Is it worth adding a corresponding method to ByteBufferHelper?

If so, I can send a PR.

Thanks

Contributor Author

I don't think it's worth it.
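
For readers unfamiliar with the helper under discussion, here is a rough sketch of what an Unsafe-based getDouble could look like. It is not Spark's ByteBufferHelper: `ByteBufferHelperSketch` is a hypothetical name, and the sketch assumes a heap-backed buffer in native byte order and obtains sun.misc.Unsafe through reflection to stay self-contained.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object ByteBufferHelperSketch {
  // sun.misc.Unsafe has no public constructor; reflection is the usual workaround.
  private val unsafe: sun.misc.Unsafe = {
    val field = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe")
    field.setAccessible(true)
    field.get(null).asInstanceOf[sun.misc.Unsafe]
  }

  // Offset of element 0 inside a byte[] object.
  private val byteArrayOffset: Long =
    unsafe.arrayBaseOffset(classOf[Array[Byte]]).toLong

  // Reads 8 bytes at the current position as a single double and advances
  // the position, mirroring the contract of the relative ByteBuffer.getDouble().
  def getDouble(buffer: ByteBuffer): Double = {
    // Assumption of this sketch: Unsafe reads in native order from the backing array.
    require(buffer.hasArray && buffer.order() == ByteOrder.nativeOrder())
    val pos = buffer.position()
    buffer.position(pos + 8) // throws if fewer than 8 bytes remain
    unsafe.getDouble(buffer.array(), byteArrayOffset + buffer.arrayOffset() + pos)
  }
}
```

A getShort variant would follow the same shape with a 2-byte advance, which is the addition being asked about in this thread.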

@cloud-fan
LGTM

5 participants