[SPARK-14138][SQL] Fix generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames #11984
Conversation
group a lot of calls into a method
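The core idea of this commit — chunking a long run of generated statements into numbered private helper methods so that no single method exceeds JVM size limits — can be sketched outside Spark's codegen as follows. The class name, helper naming scheme, and threshold below are illustrative, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitExpressions {
    // Illustrative cap on statements per generated helper method; the PR
    // tunes this value against the 64KB method limit and the JIT threshold.
    static final int THRESHOLD = 200;

    // Emits numbered helper methods, each holding at most THRESHOLD
    // statements, followed by the sequence of calls that invokes them.
    public static String group(String name, List<String> statements) {
        StringBuilder funcs = new StringBuilder();
        StringBuilder calls = new StringBuilder();
        for (int i = 0; i * THRESHOLD < statements.size(); i++) {
            List<String> chunk = statements.subList(
                i * THRESHOLD, Math.min((i + 1) * THRESHOLD, statements.size()));
            funcs.append("private void ").append(name).append(i).append("() {\n");
            for (String stmt : chunk) {
                funcs.append("  ").append(stmt).append("\n");
            }
            funcs.append("}\n");
            calls.append(name).append(i).append("();\n");
        }
        return funcs.append(calls).toString();
    }

    public static void main(String[] args) {
        List<String> stmts = new ArrayList<>();
        for (int i = 0; i < 450; i++) {
            stmts.add("accessor" + i + ".extractTo(mutableRow, " + i + ");");
        }
        // 450 statements with a threshold of 200 yield three helper methods.
        System.out.println(SplitExpressions.group("extractors", stmts).contains("extractors2();")); // prints true
    }
}
```

Spark's actual generated class ends up with numbered methods such as `extractors0$`, `extractors1$`, …, which is what the JIT logs later in this conversation show.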
Test build #54274 has finished for PR 11984 at commit
Test build #54276 has finished for PR 11984 at commit
Jenkins, retest this please
Test build #54279 has finished for PR 11984 at commit
Test build #54277 has finished for PR 11984 at commit
Test build #54287 has finished for PR 11984 at commit
Test build #54288 has finished for PR 11984 at commit
@davies, would it be possible for you to take a look at this?
```scala
}

/* 4000 = 64000 bytes / 16 (up to 16 bytes per one call) */
val numberOfStatementsThreshold = 4000
```
A Java method will not be JITted if it's over 8K, so we may need a smaller threshold here. Could you also manually check that (for performance)?
@davies, thank you for your comment. I did not know about that limitation. I have now confirmed that these methods are compiled, as follows:
hotspot_pid19296.log:<nmethod compile_id='10059' compiler='C1' level='3' entry='0x00007f03a9574500' size='3024' address='0x00007f03a95742d0' relocation_offset='296' insts_offset='560' stub_offset='2096' scopes_data_offset='2472' scopes_pcs_offset='2680' dependencies_offset='2984' nul_chk_table_offset='2992' oops_offset='2424' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator hasNext ()Z' bytes='92' count='384' iicount='384' stamp='25.140'/>
hotspot_pid19296.log:<nmethod compile_id='11143' compiler='C1' level='3' entry='0x00007f03a8ec0680' size='3656' address='0x00007f03a8ec0450' relocation_offset='296' insts_offset='560' stub_offset='2352' scopes_data_offset='2752' scopes_pcs_offset='3192' dependencies_offset='3592' nul_chk_table_offset='3600' oops_offset='2664' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator next ()Lorg/apache/spark/sql/catalyst/InternalRow;' bytes='88' count='384' iicount='384' stamp='34.011'/>
hotspot_pid19296.log:<nmethod compile_id='11144' compiler='C1' level='3' entry='0x00007f03a9a5dbc0' size='226544' address='0x00007f03a9a5a890' relocation_offset='296' insts_offset='13104' stub_offset='155280' scopes_data_offset='163488' scopes_pcs_offset='198456' dependencies_offset='222520' nul_chk_table_offset='222528' oops_offset='163432' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator extractors0$ (Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator;)V' bytes='6867' count='391' iicount='391' stamp='34.105'/>
hotspot_pid19296.log:<nmethod compile_id='11255' compiler='C1' level='3' entry='0x00007f03aa327e00' size='226632' address='0x00007f03aa324ad0' relocation_offset='296' insts_offset='13104' stub_offset='155280' scopes_data_offset='163472' scopes_pcs_offset='198544' dependencies_offset='222608' nul_chk_table_offset='222616' oops_offset='163432' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator extractors2$ (Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator;)V' bytes='7001' count='521' iicount='521' stamp='37.163'/>
hotspot_pid19296.log:<nmethod compile_id='11256' compiler='C1' level='3' entry='0x00007f03aa35f380' size='226664' address='0x00007f03aa35c050' relocation_offset='296' insts_offset='13104' stub_offset='155280' scopes_data_offset='163472' scopes_pcs_offset='198552' dependencies_offset='222632' nul_chk_table_offset='222640' oops_offset='163432' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator extractors4$ (Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator;)V' bytes='7001' count='530' iicount='530' stamp='37.286'/>
hotspot_pid19296.log:<nmethod compile_id='11257' compiler='C1' level='3' entry='0x00007f03aa396900' size='226664' address='0x00007f03aa3935d0' relocation_offset='296' insts_offset='13104' stub_offset='155280' scopes_data_offset='163472' scopes_pcs_offset='198552' dependencies_offset='222632' nul_chk_table_offset='222640' oops_offset='163432' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator extractors5$ (Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator;)V' bytes='7001' count='541' iicount='541' stamp='37.427'/>
hotspot_pid19296.log:<nmethod compile_id='11263' compiler='C1' level='3' entry='0x00007f03aa3cde80' size='226632' address='0x00007f03aa3cab50' relocation_offset='296' insts_offset='13104' stub_offset='155280' scopes_data_offset='163472' scopes_pcs_offset='198544' dependencies_offset='222608' nul_chk_table_offset='222616' oops_offset='163432' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator extractors1$ (Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator;)V' bytes='7001' count='547' iicount='547' stamp='37.516'/>
hotspot_pid19296.log:<nmethod compile_id='11264' compiler='C1' level='3' entry='0x00007f03aa405400' size='226664' address='0x00007f03aa4020d0' relocation_offset='296' insts_offset='13104' stub_offset='155280' scopes_data_offset='163472' scopes_pcs_offset='198552' dependencies_offset='222632' nul_chk_table_offset='222640' oops_offset='163432' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator extractors3$ (Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator;)V' bytes='7001' count='555' iicount='555' stamp='37.607'/>
hotspot_pid19296.log:<nmethod compile_id='10750' compiler='C1' level='3' entry='0x00007f03a9fe9dc0' size='340208' address='0x00007f03a9fe5790' relocation_offset='296' insts_offset='17968' stub_offset='182384' scopes_data_offset='193120' scopes_pcs_offset='304680' dependencies_offset='335480' handler_table_offset='335488' nul_chk_table_offset='337840' oops_offset='192920' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator accessors0$ (Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator;)V' bytes='5367' count='231' iicount='231' stamp='29.373'/>
hotspot_pid19296.log:<nmethod compile_id='10751' compiler='C1' level='3' entry='0x00007f03aa03cec0' size='340528' address='0x00007f03aa038890' relocation_offset='296' insts_offset='17968' stub_offset='182608' scopes_data_offset='193296' scopes_pcs_offset='305000' dependencies_offset='335800' handler_table_offset='335808' nul_chk_table_offset='338160' oops_offset='193144' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator accessors1$ (Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator;)V' bytes='5501' count='239' iicount='239' stamp='29.464'/>
hotspot_pid19296.log:<nmethod compile_id='11053' compiler='C1' level='3' entry='0x00007f03aa1c74c0' size='340528' address='0x00007f03aa1c2e90' relocation_offset='296' insts_offset='17968' stub_offset='182608' scopes_data_offset='193296' scopes_pcs_offset='305000' dependencies_offset='335800' handler_table_offset='335808' nul_chk_table_offset='338160' oops_offset='193144' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator accessors2$ (Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator;)V' bytes='5501' count='367' iicount='367' stamp='32.552'/>
hotspot_pid19296.log:<nmethod compile_id='11056' compiler='C1' level='3' entry='0x00007f03aa227c40' size='340528' address='0x00007f03aa223610' relocation_offset='296' insts_offset='17968' stub_offset='182608' scopes_data_offset='193296' scopes_pcs_offset='305000' dependencies_offset='335800' handler_table_offset='335808' nul_chk_table_offset='338160' oops_offset='193144' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator accessors5$ (Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator;)V' bytes='5501' count='370' iicount='370' stamp='32.699'/>
hotspot_pid19296.log:<nmethod compile_id='11054' compiler='C1' level='3' entry='0x00007f03aa27ec40' size='340528' address='0x00007f03aa27a610' relocation_offset='296' insts_offset='17968' stub_offset='182608' scopes_data_offset='193296' scopes_pcs_offset='305000' dependencies_offset='335800' handler_table_offset='335808' nul_chk_table_offset='338160' oops_offset='193144' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator accessors3$ (Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator;)V' bytes='5501' count='370' iicount='370' stamp='32.948'/>
hotspot_pid19296.log:<nmethod compile_id='11055' compiler='C1' level='3' entry='0x00007f03aa2d1e80' size='340528' address='0x00007f03aa2cd850' relocation_offset='296' insts_offset='17968' stub_offset='182608' scopes_data_offset='193296' scopes_pcs_offset='305000' dependencies_offset='335800' handler_table_offset='335808' nul_chk_table_offset='338160' oops_offset='193144' method='org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator accessors4$ (Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificColumnarIterator;)V' bytes='5501' count='369' iicount='369' stamp='33.083'/>
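For reference, compilation logs like the `hotspot_pid*.log` entries above can be produced with HotSpot's diagnostic logging flags. A sketch (`MyApp` is a placeholder, and flag availability depends on the JVM build):

```shell
# Enable HotSpot's XML compilation log; by default it is written to
# hotspot_pid<pid>.log in the working directory.
java -XX:+UnlockDiagnosticVMOptions -XX:+LogCompilation MyApp

# Then check whether the generated methods were compiled:
grep 'SpecificColumnarIterator' hotspot_pid*.log
```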
I confirmed this by running the following program:
```scala
val df = sc.parallelize(1 to 100).toDF()
val aggr = (1 to 3000).map(colnum => avg(df.col("_1")).as(s"col_$colnum"))
val res = df.groupBy("_1").agg(count("_1"), aggr: _*).cache()
for (i <- 0 to 110) {
  res.collect()
}
```
They could be JITted after decreasing it to 500, right?
```scala
val shortCls = accessorCls.substring(accessorCls.lastIndexOf(".") + 1)
dt match {
  case t if ctx.isPrimitiveType(dt) =>
    s"$accessorName = get${accessorClasses.getOrElseUpdate(accessorCls, shortCls)}($index);"
```
I think we should just call `ColumnAccessor.apply()` (making it accessible in generated Java code).
Could you please explain this in more detail? I don't understand your suggestion. Should we put the `ColumnAccessor.apply()` call in this method or in the generated Java code?
```java
$accessorName = ($accessorCls) ColumnAccessor.apply(columnTypes[$index], ByteBuffer.wrap(batch.buffers()[columnIndexes[$index]]));
```
This may change the number of bytes per column, so you need to redo the calculation and test it.
For example, a generated method `getIntColumnAccessor()` still calls `ColumnAccessor.apply()`. Do you want to call `ColumnAccessor.apply()` directly from `hasNext()` instead of calling it through `getIntColumnAccessor()`?
It's better to call `ColumnAccessor.apply()` directly to avoid this complexity.
I understand your motivation. I will revert my changes for reducing bytecode size to avoid this complexity.
When you call `ColumnAccessor.apply` directly, I think we don't need `getXXXColumnAccessor` anymore?
Test build #54538 has finished for PR 11984 at commit
Test build #54599 has finished for PR 11984 at commit
Again, I confirmed that these methods are compiled.
```scala
 * We should keep less than 8000
 */
val numberOfStatementsThreshold = 200

val (initializerAccessorFuncs, initializerAccessorCalls, extractorFuncs, extractorCalls) =
```
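The two thresholds can be reconciled with a back-of-the-envelope calculation. The original 4000 came from budgeting 64000 bytes against the 64KB method-size limit at "up to 16 bytes per one call"; backing out the implied per-statement cost for the revised value of 200 against the ~8000-byte JIT cutoff gives about 40 bytes. The 40-byte figure is an inference, not stated in the patch:

```java
public class Thresholds {
    public static void main(String[] args) {
        int methodBudget = 64000; // the patch budgets 64000 bytes against the 64KB (65535-byte) JVM cap
        int jitCutoff = 8000;     // HotSpot skips JIT compilation of methods above ~8000 bytes

        // Original threshold: 64000-byte budget at an assumed 16 bytes per call.
        System.out.println(methodBudget / 16); // prints 4000

        // Revised threshold: 8000-byte JIT budget at ~40 bytes per statement (inferred).
        System.out.println(jitCutoff / 40);    // prints 200
    }
}
```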
We could use `ctx.addFunction` to simplify these.
Test build #54606 has finished for PR 11984 at commit
Test build #54607 has finished for PR 11984 at commit
Test build #54654 has finished for PR 11984 at commit
Test build #54659 has finished for PR 11984 at commit
LGTM, merging this into master, and 1.6 (if no conflict).
[SPARK-14138][SQL] Fix generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames

## What changes were proposed in this pull request?

This PR reduces the Java bytecode size of methods in `SpecificColumnarIterator` by using two approaches:

1. Generate and call `getTYPEColumnAccessor()` for each type that is actually used, for instantiating accessors
2. Group a large number of method calls (more than 4000) into a method

## How was this patch tested?

Added a new unit test to `InMemoryColumnarQuerySuite`

Here is the generated code:

```java
/* 033 */ private org.apache.spark.sql.execution.columnar.CachedBatch batch = null;
/* 034 */
/* 035 */ private org.apache.spark.sql.execution.columnar.IntColumnAccessor accessor;
/* 036 */ private org.apache.spark.sql.execution.columnar.IntColumnAccessor accessor1;
/* 037 */
/* 038 */ public SpecificColumnarIterator() {
/* 039 */   this.nativeOrder = ByteOrder.nativeOrder();
/* 040 */   this.mutableRow = new MutableUnsafeRow(rowWriter);
/* 041 */ }
/* 042 */
/* 043 */ public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes,
/* 044 */     boolean columnNullables[]) {
/* 045 */   this.input = input;
/* 046 */   this.columnTypes = columnTypes;
/* 047 */   this.columnIndexes = columnIndexes;
/* 048 */ }
/* 049 */
/* 050 */
/* 051 */ private org.apache.spark.sql.execution.columnar.IntColumnAccessor getIntColumnAccessor(int idx) {
/* 052 */   byte[] buffer = batch.buffers()[columnIndexes[idx]];
/* 053 */   return new org.apache.spark.sql.execution.columnar.IntColumnAccessor(ByteBuffer.wrap(buffer).order(nativeOrder));
/* 054 */ }
/* 055 */
/* 056 */
/* 057 */
/* 058 */
/* 059 */
/* 060 */
/* 061 */ public boolean hasNext() {
/* 062 */   if (currentRow < numRowsInBatch) {
/* 063 */     return true;
/* 064 */   }
/* 065 */   if (!input.hasNext()) {
/* 066 */     return false;
/* 067 */   }
/* 068 */
/* 069 */   batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
/* 070 */   currentRow = 0;
/* 071 */   numRowsInBatch = batch.numRows();
/* 072 */   accessor = getIntColumnAccessor(0);
/* 073 */   accessor1 = getIntColumnAccessor(1);
/* 074 */
/* 075 */   return hasNext();
/* 076 */ }
/* 077 */
/* 078 */ public InternalRow next() {
/* 079 */   currentRow += 1;
/* 080 */   bufferHolder.reset();
/* 081 */   rowWriter.zeroOutNullBytes();
/* 082 */   accessor.extractTo(mutableRow, 0);
/* 083 */   accessor1.extractTo(mutableRow, 1);
/* 084 */   unsafeRow.setTotalSize(bufferHolder.totalSize());
/* 085 */   return unsafeRow;
/* 086 */ }
```

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #11984 from kiszk/SPARK-14138.
@kiszk Could you create another patch for 1.6?
@davies, thank you for merging it. Do you want me to create another patch for master, since you have merged this one into 1.6?
@kiszk I just realized that this PR is against the 1.6 branch; could you always create the PR for master first?
@kiszk Please send a PR for master, and close this PR, thanks!
@davies, sorry for the mistake. Next time I will create a PR for master first.
(cherry picked from commit f12f11e)