-
Notifications
You must be signed in to change notification settings - Fork 28k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-30356][SQL] Codegen support for the function str_to_map #27013
Conversation
cc: @cloud-fan @maropu @HyukjinKwon, thanks for reviewing and happy holidays! 🎇 |
Looks ok to me if the tests passed. |
Test build #115802 has finished for PR 27013 at commit
|
Test build #115808 has finished for PR 27013 at commit
|
|int $idx = 0; | ||
|UTF8String[] $keyValues = $text.split($pd, -1); | ||
|while ($idx < $keyValues.length) { | ||
| UTF8String[] $kv = $keyValues[$idx].split($kvd, 2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we don't need create fresh name for kv.
@@ -403,10 +403,11 @@ case class CreateNamedStruct(children: Seq[Expression]) extends Expression { | |||
{"a":"1","b":"2","c":"3"} | |||
> SELECT _FUNC_('a'); | |||
{"a":null} | |||
""") | |||
""", | |||
since = "2.0.1") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I go to confirm this since
tag, I looked at the original PR #13990.
The author implemented codegen version actually, but did remove it based on the comment: #13990 (comment)
I think the comment makes sense to me. To add or not codegen looks not a big deal. This implementation looks simple, anyway.
I have no special option to add it or not. cc @cloud-fan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see. Thanks for the pointer. I have no strong opinion, too, so I'll leave it to the @cloud-fan descision.
Could we elaborate more on what the patch is doing in the section of |
thanks for suggestion @HeartSaVioR |
s""" | ||
|int $idx = 0; | ||
|UTF8String[] $keyValues = $text.split($pd, -1); | ||
|while ($idx < $keyValues.length) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we use for loop? then we don't need to declare int idx
outside of the loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make idx
as freshName
because
Line 132 in c411579
InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i; |
i
in scope, seems only happen in test cases?
not related to this PR, |
spark-sql> explain codegen select v, str_to_map(v) from values ('abc:a:a,:'), (null), (''), ('1:2') t(v)
> ;
Found 0 WholeStageCodegen subtrees.
Time taken: 0.104 seconds, Fetched 1 row(s) does this mean that whole stage codegen to be off? |
Yea, I know we disable whole stage codegen when seeing |
Test build #115832 has finished for PR 27013 at commit
|
Test build #115843 has finished for PR 27013 at commit
|
retest this please |
Test build #115849 has finished for PR 27013 at commit
|
thanks, merging to master! |
### What changes were proposed in this pull request? `str_to_map ` has not implemented with codegen support, which prevents a query that contains this expression from being whole stage codegen-ed. This PR removes `CodegenFallBack` from `StringToMap`, add the codegen support for it. ### Why are the changes needed? improve codegen coverage and gain better perfomance ### Does this PR introduce any user-facing change? no ### How was this patch tested? 1. pass ComplexTypeSuite 2. manually review generated code ```java -- !query 12 explain codegen select v, str_to_map(v) from values ('abc:a:a,:'), (null), (''), ('1:2') t(v) -- !query 12 schema struct<plan:string> -- !query 12 output Found 1 WholeStageCodegen subtrees. == Subtree 1 / 1 (maxMethodCodeSize:511; maxConstantPoolSize:188(0.29% used); numInnerClasses:0) == *Project [v#x, str_to_map(v#x, ,, :) AS str_to_map(v, ,, :)#x] +- *LocalTableScan [v#x] Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage1(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=1 /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private scala.collection.Iterator localtablescan_input_0; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] project_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1]; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[] project_mutableStateArray_1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[2]; /* 012 */ /* 013 */ public GeneratedIteratorForCodegenStage1(Object[] references) { /* 014 */ this.references = references; /* 015 */ } /* 016 */ /* 017 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 018 */ partitionIndex = index; /* 019 */ this.inputs = inputs; /* 020 */ localtablescan_input_0 = inputs[0]; /* 021 */ project_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 64); /* 022 */ project_mutableStateArray_1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(project_mutableStateArray_0[0], 8); /* 023 */ project_mutableStateArray_1[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(project_mutableStateArray_0[0], 8); /* 024 */ /* 025 */ } /* 026 */ /* 027 */ private void project_doConsume_0(InternalRow localtablescan_row_0, UTF8String project_expr_0_0, boolean project_exprIsNull_0_0) throws java.io.IOException { /* 028 */ boolean project_isNull_1 = true; /* 029 */ MapData project_value_1 = null; /* 030 */ /* 031 */ if (!project_exprIsNull_0_0) { /* 032 */ project_isNull_1 = false; // resultCode could change nullability. /* 033 */ /* 034 */ int project_i_0 = 0; /* 035 */ UTF8String[] project_kvs_0 = project_expr_0_0.split(((UTF8String) references[2] /* literal */), -1); /* 036 */ while (project_i_0 < project_kvs_0.length) { /* 037 */ UTF8String[] kv = project_kvs_0[project_i_0].split(((UTF8String) references[3] /* literal */), 2); /* 038 */ UTF8String key = kv[0]; /* 039 */ UTF8String value = null; /* 040 */ if (kv.length == 2) { /* 041 */ value = kv[1]; /* 042 */ } /* 043 */ ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[1] /* mapBuilder */).put(key, value); /* 044 */ project_i_0++; /* 045 */ } /* 046 */ project_value_1 = ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[1] /* mapBuilder */).build(); /* 047 */ /* 048 */ } /* 049 */ project_mutableStateArray_0[0].reset(); /* 050 */ /* 051 */ project_mutableStateArray_0[0].zeroOutNullBytes(); /* 052 */ /* 053 */ if (project_exprIsNull_0_0) { /* 054 */ project_mutableStateArray_0[0].setNullAt(0); /* 055 */ } else { /* 056 */ project_mutableStateArray_0[0].write(0, project_expr_0_0); /* 057 */ } /* 058 */ /* 059 */ if (project_isNull_1) { /* 060 */ project_mutableStateArray_0[0].setNullAt(1); /* 061 */ } else { /* 062 */ final MapData project_tmpInput_0 = project_value_1; /* 063 */ if (project_tmpInput_0 instanceof UnsafeMapData) { /* 064 */ project_mutableStateArray_0[0].write(1, (UnsafeMapData) project_tmpInput_0); /* 065 */ } else { /* 066 */ // Remember the current cursor so that we can calculate how many bytes are /* 067 */ // written later. /* 068 */ final int project_previousCursor_0 = project_mutableStateArray_0[0].cursor(); /* 069 */ /* 070 */ // preserve 8 bytes to write the key array numBytes later. /* 071 */ project_mutableStateArray_0[0].grow(8); /* 072 */ project_mutableStateArray_0[0].increaseCursor(8); /* 073 */ /* 074 */ // Remember the current cursor so that we can write numBytes of key array later. /* 075 */ final int project_tmpCursor_0 = project_mutableStateArray_0[0].cursor(); /* 076 */ /* 077 */ final ArrayData project_tmpInput_1 = project_tmpInput_0.keyArray(); /* 078 */ if (project_tmpInput_1 instanceof UnsafeArrayData) { /* 079 */ project_mutableStateArray_0[0].write((UnsafeArrayData) project_tmpInput_1); /* 080 */ } else { /* 081 */ final int project_numElements_0 = project_tmpInput_1.numElements(); /* 082 */ project_mutableStateArray_1[0].initialize(project_numElements_0); /* 083 */ /* 084 */ for (int project_index_0 = 0; project_index_0 < project_numElements_0; project_index_0++) { /* 085 */ project_mutableStateArray_1[0].write(project_index_0, project_tmpInput_1.getUTF8String(project_index_0)); /* 086 */ } /* 087 */ } /* 088 */ /* 089 */ // Write the numBytes of key array into the first 8 bytes. /* 090 */ Platform.putLong( /* 091 */ project_mutableStateArray_0[0].getBuffer(), /* 092 */ project_tmpCursor_0 - 8, /* 093 */ project_mutableStateArray_0[0].cursor() - project_tmpCursor_0); /* 094 */ /* 095 */ final ArrayData project_tmpInput_2 = project_tmpInput_0.valueArray(); /* 096 */ if (project_tmpInput_2 instanceof UnsafeArrayData) { /* 097 */ project_mutableStateArray_0[0].write((UnsafeArrayData) project_tmpInput_2); /* 098 */ } else { /* 099 */ final int project_numElements_1 = project_tmpInput_2.numElements(); /* 100 */ project_mutableStateArray_1[1].initialize(project_numElements_1); /* 101 */ /* 102 */ for (int project_index_1 = 0; project_index_1 < project_numElements_1; project_index_1++) { /* 103 */ if (project_tmpInput_2.isNullAt(project_index_1)) { /* 104 */ project_mutableStateArray_1[1].setNull8Bytes(project_index_1); /* 105 */ } else { /* 106 */ project_mutableStateArray_1[1].write(project_index_1, project_tmpInput_2.getUTF8String(project_index_1)); /* 107 */ } /* 108 */ /* 109 */ } /* 110 */ } /* 111 */ /* 112 */ project_mutableStateArray_0[0].setOffsetAndSizeFromPreviousCursor(1, project_previousCursor_0); /* 113 */ } /* 114 */ } /* 115 */ append((project_mutableStateArray_0[0].getRow())); /* 116 */ /* 117 */ } /* 118 */ /* 119 */ protected void processNext() throws java.io.IOException { /* 120 */ while ( localtablescan_input_0.hasNext()) { /* 121 */ InternalRow localtablescan_row_0 = (InternalRow) localtablescan_input_0.next(); /* 122 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1); /* 123 */ boolean localtablescan_isNull_0 = localtablescan_row_0.isNullAt(0); /* 124 */ UTF8String localtablescan_value_0 = localtablescan_isNull_0 ? /* 125 */ null : (localtablescan_row_0.getUTF8String(0)); /* 126 */ /* 127 */ project_doConsume_0(localtablescan_row_0, localtablescan_value_0, localtablescan_isNull_0); /* 128 */ if (shouldStop()) return; /* 129 */ } /* 130 */ } /* 131 */ /* 132 */ } -- !query 13 select v, str_to_map(v) from values ('abc:a:a,:'), (null), (''), ('1:2') t(v) -- !query 13 schema struct<v:string,str_to_map(v, ,, :):map<string,string>> -- !query 13 output {"":null} 1:2 {"1":"2"} NULL NULL abc:a:a,: {"":"","abc":"a:a"} ``` Closes apache#27013 from yaooqinn/SPARK-30356. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan @yaooqinn @maropu @viirya the optimization of |
Seems like we don't support I'm a bit hesitant to revert this, as it doesn't fix the fundamental issue that whole-stage-codegen can be slower because of the lack of @viirya how hard it is to support |
We don't support subexpression elimination in whole-stage codegen. It is not a problem for this PR specially. I added subexpression elimination to Hash Aggregation codegen. So currently Hash Aggregation is the only one operator supporting subexpression elimination in whole-stage codegen. Using the same code, it is not hard to support subexpression elimination in other operators such as ProjectExec. But we should notice that for ProjectExec, we don't evaluate all expressions in project list at once in ProjectExec. Instead we defer the evaluation because not all expressions will be evaluated. For subexpression elimination, we need to evaluate subexpressions in ProjectExec. That's said, we might evaluate these expressions more early than current approach. |
Sounds fine. Can you create a PR for it? We can also run TPCDS benchmark to check it. |
OK, will create a PR for it later. |
@cloud-fan Created #29975 for it. |
What changes were proposed in this pull request?
str_to_map
has not implemented with codegen support, which prevents a query that contains this expression from being whole stage codegen-ed.This PR removes
CodegenFallBack
fromStringToMap
, add the codegen support for it.Why are the changes needed?
improve codegen coverage and gain better perfomance
Does this PR introduce any user-facing change?
no
How was this patch tested?
pass ComplexTypeSuite
manually review generated code