-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-22284][SQL] Fix 64KB JVM bytecode limit problem in calculating hash for nested structs #19563
Conversation
Test build #83010 has finished for PR 19563 at commit
|
Test build #83017 has finished for PR 19563 at commit
|
@cloud-fan would it be possible to review this? |
}.mkString("\n") | ||
} | ||
val args = if (ctx.INPUT_ROW != null) { | ||
Seq(("InternalRow", input), ("InternalRow", ctx.INPUT_ROW)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, I cannot understand why you need to pass ctx.INPUT_ROW
as an argument, might you please explain me? Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. I conservatively pass ctx.INPUT_ROW
. When I revisit this question, I believe that elements in struct
would not use ctx.INPUT_ROW
.
nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx) | ||
}.mkString("\n") | ||
} | ||
ctx.splitExpressions(hashes, "apply", ("InternalRow", input) :: Nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then I think that here the best option would be ctx.splitExpressions(input, hashes)
which contains additional safety checks and I think is easier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, done
Test build #83082 has finished for PR 19563 at commit
|
new GenericInternalRow(Array[Any]( | ||
UTF8String.fromString((j * L + i).toString)))) | ||
.toArray[Any])).toArray[Any]) | ||
var inner1 = new StructType() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about avoiding the usage of var
here and in the other places by passing a Seq
of fields in the constructor?
The fields may be created using range generation and map
instead of for
loops.
I think in this way we would be more compliant to general functional Scala style, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, done
Test build #83185 has finished for PR 19563 at commit
|
thanks for addressing the comments @kiszk , now it LGTM |
val murmursHashEval1 = Murmur3Hash(exprs1, seed).eval(wideRow1) | ||
assert(murmur3HashPlan1(wideRow1).getInt(0) == murmursHashEval1) | ||
|
||
val wideRow2 = new GenericInternalRow(Seq.tabulate(O)(k => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this case totally covers the previous case, can we just keep this and remove wideRow1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, done
LGTM |
Test build #83690 has finished for PR 19563 at commit
|
thanks, merging to master/2.2! |
… hash for nested structs ## What changes were proposed in this pull request? This PR avoids to generate a huge method for calculating a murmur3 hash for nested structs. This PR splits a huge method (e.g. `apply_4`) into multiple smaller methods. Sample program ``` val structOfString = new StructType().add("str", StringType) var inner = new StructType() for (_ <- 0 until 800) { inner = inner1.add("structOfString", structOfString) } var schema = new StructType() for (_ <- 0 until 50) { schema = schema.add("structOfStructOfStrings", inner) } GenerateMutableProjection.generate(Seq(Murmur3Hash(exprs, 42))) ``` Without this PR ``` /* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection { /* 006 */ /* 007 */ private Object[] references; /* 008 */ private InternalRow mutableRow; /* 009 */ private int value; /* 010 */ private int value_0; ... /* 034 */ public java.lang.Object apply(java.lang.Object _i) { /* 035 */ InternalRow i = (InternalRow) _i; /* 036 */ /* 037 */ /* 038 */ /* 039 */ value = 42; /* 040 */ apply_0(i); /* 041 */ apply_1(i); /* 042 */ apply_2(i); /* 043 */ apply_3(i); /* 044 */ apply_4(i); /* 045 */ nestedClassInstance.apply_5(i); ... /* 089 */ nestedClassInstance8.apply_49(i); /* 090 */ value_0 = value; /* 091 */ /* 092 */ // copy all the results into MutableRow /* 093 */ mutableRow.setInt(0, value_0); /* 094 */ return mutableRow; /* 095 */ } /* 096 */ /* 097 */ /* 098 */ private void apply_4(InternalRow i) { /* 099 */ /* 100 */ boolean isNull5 = i.isNullAt(4); /* 101 */ InternalRow value5 = isNull5 ? null : (i.getStruct(4, 800)); /* 102 */ if (!isNull5) { /* 103 */ /* 104 */ if (!value5.isNullAt(0)) { /* 105 */ /* 106 */ final InternalRow element6400 = value5.getStruct(0, 1); /* 107 */ /* 108 */ if (!element6400.isNullAt(0)) { /* 109 */ /* 110 */ final UTF8String element6401 = element6400.getUTF8String(0); /* 111 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element6401.getBaseObject(), element6401.getBaseOffset(), element6401.numBytes(), value); /* 112 */ /* 113 */ } /* 114 */ /* 115 */ /* 116 */ } /* 117 */ /* 118 */ /* 119 */ if (!value5.isNullAt(1)) { /* 120 */ /* 121 */ final InternalRow element6402 = value5.getStruct(1, 1); /* 122 */ /* 123 */ if (!element6402.isNullAt(0)) { /* 124 */ /* 125 */ final UTF8String element6403 = element6402.getUTF8String(0); /* 126 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element6403.getBaseObject(), element6403.getBaseOffset(), element6403.numBytes(), value); /* 127 */ /* 128 */ } /* 128 */ } /* 129 */ /* 130 */ /* 131 */ } /* 132 */ /* 133 */ /* 134 */ if (!value5.isNullAt(2)) { /* 135 */ /* 136 */ final InternalRow element6404 = value5.getStruct(2, 1); /* 137 */ /* 138 */ if (!element6404.isNullAt(0)) { /* 139 */ /* 140 */ final UTF8String element6405 = element6404.getUTF8String(0); /* 141 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element6405.getBaseObject(), element6405.getBaseOffset(), element6405.numBytes(), value); /* 142 */ /* 143 */ } /* 144 */ /* 145 */ /* 146 */ } /* 147 */ ... /* 12074 */ if (!value5.isNullAt(798)) { /* 12075 */ /* 12076 */ final InternalRow element7996 = value5.getStruct(798, 1); /* 12077 */ /* 12078 */ if (!element7996.isNullAt(0)) { /* 12079 */ /* 12080 */ final UTF8String element7997 = element7996.getUTF8String(0); /* 12083 */ } /* 12084 */ /* 12085 */ /* 12086 */ } /* 12087 */ /* 12088 */ /* 12089 */ if (!value5.isNullAt(799)) { /* 12090 */ /* 12091 */ final InternalRow element7998 = value5.getStruct(799, 1); /* 12092 */ /* 12093 */ if (!element7998.isNullAt(0)) { /* 12094 */ /* 12095 */ final UTF8String element7999 = element7998.getUTF8String(0); /* 12096 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element7999.getBaseObject(), element7999.getBaseOffset(), element7999.numBytes(), value); /* 12097 */ /* 12098 */ } /* 12099 */ /* 12100 */ /* 12101 */ } /* 12102 */ /* 12103 */ } /* 12104 */ /* 12105 */ } /* 12106 */ /* 12106 */ /* 12107 */ /* 12108 */ private void apply_1(InternalRow i) { ... ``` With this PR ``` /* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection { /* 006 */ /* 007 */ private Object[] references; /* 008 */ private InternalRow mutableRow; /* 009 */ private int value; /* 010 */ private int value_0; /* 011 */ ... /* 034 */ public java.lang.Object apply(java.lang.Object _i) { /* 035 */ InternalRow i = (InternalRow) _i; /* 036 */ /* 037 */ /* 038 */ /* 039 */ value = 42; /* 040 */ nestedClassInstance11.apply50_0(i); /* 041 */ nestedClassInstance11.apply50_1(i); ... /* 088 */ nestedClassInstance11.apply50_48(i); /* 089 */ nestedClassInstance11.apply50_49(i); /* 090 */ value_0 = value; /* 091 */ /* 092 */ // copy all the results into MutableRow /* 093 */ mutableRow.setInt(0, value_0); /* 094 */ return mutableRow; /* 095 */ } /* 096 */ ... /* 37717 */ private void apply4_0(InternalRow value5, InternalRow i) { /* 37718 */ /* 37719 */ if (!value5.isNullAt(0)) { /* 37720 */ /* 37721 */ final InternalRow element6400 = value5.getStruct(0, 1); /* 37722 */ /* 37723 */ if (!element6400.isNullAt(0)) { /* 37724 */ /* 37725 */ final UTF8String element6401 = element6400.getUTF8String(0); /* 37726 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element6401.getBaseObject(), element6401.getBaseOffset(), element6401.numBytes(), value); /* 37727 */ /* 37728 */ } /* 37729 */ /* 37730 */ /* 37731 */ } /* 37732 */ /* 37733 */ if (!value5.isNullAt(1)) { /* 37734 */ /* 37735 */ final InternalRow element6402 = value5.getStruct(1, 1); /* 37736 */ /* 37737 */ if (!element6402.isNullAt(0)) { /* 37738 */ /* 37739 */ final UTF8String element6403 = element6402.getUTF8String(0); /* 37740 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element6403.getBaseObject(), element6403.getBaseOffset(), element6403.numBytes(), value); /* 37741 */ /* 37742 */ } /* 37743 */ /* 37744 */ /* 37745 */ } /* 37746 */ /* 37747 */ if (!value5.isNullAt(2)) { /* 37748 */ /* 37749 */ final InternalRow element6404 = value5.getStruct(2, 1); /* 37750 */ /* 37751 */ if (!element6404.isNullAt(0)) { /* 37752 */ /* 37753 */ final UTF8String element6405 = element6404.getUTF8String(0); /* 37754 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element6405.getBaseObject(), element6405.getBaseOffset(), element6405.numBytes(), value); /* 37755 */ /* 37756 */ } /* 37757 */ /* 37758 */ /* 37759 */ } /* 37760 */ /* 37761 */ } ... /* 218470 */ /* 218471 */ private void apply50_4(InternalRow i) { /* 218472 */ /* 218473 */ boolean isNull5 = i.isNullAt(4); /* 218474 */ InternalRow value5 = isNull5 ? null : (i.getStruct(4, 800)); /* 218475 */ if (!isNull5) { /* 218476 */ apply4_0(value5, i); /* 218477 */ apply4_1(value5, i); /* 218478 */ apply4_2(value5, i); ... /* 218742 */ nestedClassInstance.apply4_266(value5, i); /* 218743 */ } /* 218744 */ /* 218745 */ } ``` ## How was this patch tested? Added new test to `HashExpressionsSuite` Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #19563 from kiszk/SPARK-22284. (cherry picked from commit f2da738) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… hash for nested structs ## What changes were proposed in this pull request? This PR avoids to generate a huge method for calculating a murmur3 hash for nested structs. This PR splits a huge method (e.g. `apply_4`) into multiple smaller methods. Sample program ``` val structOfString = new StructType().add("str", StringType) var inner = new StructType() for (_ <- 0 until 800) { inner = inner1.add("structOfString", structOfString) } var schema = new StructType() for (_ <- 0 until 50) { schema = schema.add("structOfStructOfStrings", inner) } GenerateMutableProjection.generate(Seq(Murmur3Hash(exprs, 42))) ``` Without this PR ``` /* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection { /* 006 */ /* 007 */ private Object[] references; /* 008 */ private InternalRow mutableRow; /* 009 */ private int value; /* 010 */ private int value_0; ... /* 034 */ public java.lang.Object apply(java.lang.Object _i) { /* 035 */ InternalRow i = (InternalRow) _i; /* 036 */ /* 037 */ /* 038 */ /* 039 */ value = 42; /* 040 */ apply_0(i); /* 041 */ apply_1(i); /* 042 */ apply_2(i); /* 043 */ apply_3(i); /* 044 */ apply_4(i); /* 045 */ nestedClassInstance.apply_5(i); ... /* 089 */ nestedClassInstance8.apply_49(i); /* 090 */ value_0 = value; /* 091 */ /* 092 */ // copy all the results into MutableRow /* 093 */ mutableRow.setInt(0, value_0); /* 094 */ return mutableRow; /* 095 */ } /* 096 */ /* 097 */ /* 098 */ private void apply_4(InternalRow i) { /* 099 */ /* 100 */ boolean isNull5 = i.isNullAt(4); /* 101 */ InternalRow value5 = isNull5 ? null : (i.getStruct(4, 800)); /* 102 */ if (!isNull5) { /* 103 */ /* 104 */ if (!value5.isNullAt(0)) { /* 105 */ /* 106 */ final InternalRow element6400 = value5.getStruct(0, 1); /* 107 */ /* 108 */ if (!element6400.isNullAt(0)) { /* 109 */ /* 110 */ final UTF8String element6401 = element6400.getUTF8String(0); /* 111 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element6401.getBaseObject(), element6401.getBaseOffset(), element6401.numBytes(), value); /* 112 */ /* 113 */ } /* 114 */ /* 115 */ /* 116 */ } /* 117 */ /* 118 */ /* 119 */ if (!value5.isNullAt(1)) { /* 120 */ /* 121 */ final InternalRow element6402 = value5.getStruct(1, 1); /* 122 */ /* 123 */ if (!element6402.isNullAt(0)) { /* 124 */ /* 125 */ final UTF8String element6403 = element6402.getUTF8String(0); /* 126 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element6403.getBaseObject(), element6403.getBaseOffset(), element6403.numBytes(), value); /* 127 */ /* 128 */ } /* 128 */ } /* 129 */ /* 130 */ /* 131 */ } /* 132 */ /* 133 */ /* 134 */ if (!value5.isNullAt(2)) { /* 135 */ /* 136 */ final InternalRow element6404 = value5.getStruct(2, 1); /* 137 */ /* 138 */ if (!element6404.isNullAt(0)) { /* 139 */ /* 140 */ final UTF8String element6405 = element6404.getUTF8String(0); /* 141 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element6405.getBaseObject(), element6405.getBaseOffset(), element6405.numBytes(), value); /* 142 */ /* 143 */ } /* 144 */ /* 145 */ /* 146 */ } /* 147 */ ... /* 12074 */ if (!value5.isNullAt(798)) { /* 12075 */ /* 12076 */ final InternalRow element7996 = value5.getStruct(798, 1); /* 12077 */ /* 12078 */ if (!element7996.isNullAt(0)) { /* 12079 */ /* 12080 */ final UTF8String element7997 = element7996.getUTF8String(0); /* 12083 */ } /* 12084 */ /* 12085 */ /* 12086 */ } /* 12087 */ /* 12088 */ /* 12089 */ if (!value5.isNullAt(799)) { /* 12090 */ /* 12091 */ final InternalRow element7998 = value5.getStruct(799, 1); /* 12092 */ /* 12093 */ if (!element7998.isNullAt(0)) { /* 12094 */ /* 12095 */ final UTF8String element7999 = element7998.getUTF8String(0); /* 12096 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element7999.getBaseObject(), element7999.getBaseOffset(), element7999.numBytes(), value); /* 12097 */ /* 12098 */ } /* 12099 */ /* 12100 */ /* 12101 */ } /* 12102 */ /* 12103 */ } /* 12104 */ /* 12105 */ } /* 12106 */ /* 12106 */ /* 12107 */ /* 12108 */ private void apply_1(InternalRow i) { ... ``` With this PR ``` /* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection { /* 006 */ /* 007 */ private Object[] references; /* 008 */ private InternalRow mutableRow; /* 009 */ private int value; /* 010 */ private int value_0; /* 011 */ ... /* 034 */ public java.lang.Object apply(java.lang.Object _i) { /* 035 */ InternalRow i = (InternalRow) _i; /* 036 */ /* 037 */ /* 038 */ /* 039 */ value = 42; /* 040 */ nestedClassInstance11.apply50_0(i); /* 041 */ nestedClassInstance11.apply50_1(i); ... /* 088 */ nestedClassInstance11.apply50_48(i); /* 089 */ nestedClassInstance11.apply50_49(i); /* 090 */ value_0 = value; /* 091 */ /* 092 */ // copy all the results into MutableRow /* 093 */ mutableRow.setInt(0, value_0); /* 094 */ return mutableRow; /* 095 */ } /* 096 */ ... /* 37717 */ private void apply4_0(InternalRow value5, InternalRow i) { /* 37718 */ /* 37719 */ if (!value5.isNullAt(0)) { /* 37720 */ /* 37721 */ final InternalRow element6400 = value5.getStruct(0, 1); /* 37722 */ /* 37723 */ if (!element6400.isNullAt(0)) { /* 37724 */ /* 37725 */ final UTF8String element6401 = element6400.getUTF8String(0); /* 37726 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element6401.getBaseObject(), element6401.getBaseOffset(), element6401.numBytes(), value); /* 37727 */ /* 37728 */ } /* 37729 */ /* 37730 */ /* 37731 */ } /* 37732 */ /* 37733 */ if (!value5.isNullAt(1)) { /* 37734 */ /* 37735 */ final InternalRow element6402 = value5.getStruct(1, 1); /* 37736 */ /* 37737 */ if (!element6402.isNullAt(0)) { /* 37738 */ /* 37739 */ final UTF8String element6403 = element6402.getUTF8String(0); /* 37740 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element6403.getBaseObject(), element6403.getBaseOffset(), element6403.numBytes(), value); /* 37741 */ /* 37742 */ } /* 37743 */ /* 37744 */ /* 37745 */ } /* 37746 */ /* 37747 */ if (!value5.isNullAt(2)) { /* 37748 */ /* 37749 */ final InternalRow element6404 = value5.getStruct(2, 1); /* 37750 */ /* 37751 */ if (!element6404.isNullAt(0)) { /* 37752 */ /* 37753 */ final UTF8String element6405 = element6404.getUTF8String(0); /* 37754 */ value = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(element6405.getBaseObject(), element6405.getBaseOffset(), element6405.numBytes(), value); /* 37755 */ /* 37756 */ } /* 37757 */ /* 37758 */ /* 37759 */ } /* 37760 */ /* 37761 */ } ... /* 218470 */ /* 218471 */ private void apply50_4(InternalRow i) { /* 218472 */ /* 218473 */ boolean isNull5 = i.isNullAt(4); /* 218474 */ InternalRow value5 = isNull5 ? null : (i.getStruct(4, 800)); /* 218475 */ if (!isNull5) { /* 218476 */ apply4_0(value5, i); /* 218477 */ apply4_1(value5, i); /* 218478 */ apply4_2(value5, i); ... /* 218742 */ nestedClassInstance.apply4_266(value5, i); /* 218743 */ } /* 218744 */ /* 218745 */ } ``` ## How was this patch tested? Added new test to `HashExpressionsSuite` Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes apache#19563 from kiszk/SPARK-22284. (cherry picked from commit f2da738) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
This PR avoids to generate a huge method for calculating a murmur3 hash for nested structs. This PR splits a huge method (e.g.
apply_4
) into multiple smaller methods.Sample program
Without this PR
With this PR
How was this patch tested?
Added new test to
HashExpressionsSuite