[SPARK-23926][SQL] Extending reverse function to support ArrayType arguments #21034

Closed · wants to merge 7 commits into master from mn-mikke/feature/array-api-reverse-to-master

7 participants
@mn-mikke
Contributor

mn-mikke commented Apr 10, 2018

What changes were proposed in this pull request?

This PR extends the reverse function to operate over array columns as well as strings (a usage sketch follows the list below) and covers:

  • Introduction of a Reverse expression that implements the reversal logic for both arrays and strings
  • Removal of StringReverse expression
  • A wrapper for PySpark
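
A minimal usage sketch of the extended API (assuming a Spark 2.4+ session with spark.implicits._ in scope; the DataFrame and column names here are made up for illustration):

```
// reverse now works on both array and string columns.
import org.apache.spark.sql.functions.reverse

val demo = Seq((Seq(1, 2, 3), "abc")).toDF("arr", "str")
demo.select(reverse($"arr"), reverse($"str")).show()
// +------------+------------+
// |reverse(arr)|reverse(str)|
// +------------+------------+
// |   [3, 2, 1]|         cba|
// +------------+------------+
```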

How was this patch tested?

New tests added to:

  • CollectionExpressionsSuite
  • DataFrameFunctionsSuite

Codegen examples

Primitive type

val df = Seq(
  Seq(1, 3, 4, 2),
  null
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(reverse($"i")).debugCodegen

Result:

/* 032 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 033 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 034 */         null : (inputadapter_row.getArray(0));
/* 035 */
/* 036 */         boolean filter_value = true;
/* 037 */
/* 038 */         if (!(!inputadapter_isNull)) {
/* 039 */           filter_value = inputadapter_isNull;
/* 040 */         }
/* 041 */         if (!filter_value) continue;
/* 042 */
/* 043 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 044 */
/* 045 */         boolean project_isNull = inputadapter_isNull;
/* 046 */         ArrayData project_value = null;
/* 047 */
/* 048 */         if (!inputadapter_isNull) {
/* 049 */           final int project_length = inputadapter_value.numElements();
/* 050 */           project_value = inputadapter_value.copy();
/* 051 */           for(int k = 0; k < project_length / 2; k++) {
/* 052 */             int l = project_length - k - 1;
/* 053 */             boolean isNullAtK = project_value.isNullAt(k);
/* 054 */             boolean isNullAtL = project_value.isNullAt(l);
/* 055 */             if(!isNullAtK) {
/* 056 */               int el = project_value.getInt(k);
/* 057 */               if(!isNullAtL) {
/* 058 */                 project_value.setInt(k, project_value.getInt(l));
/* 059 */               } else {
/* 060 */                 project_value.setNullAt(k);
/* 061 */               }
/* 062 */               project_value.setInt(l, el);
/* 063 */             } else if (!isNullAtL) {
/* 064 */               project_value.setInt(k, project_value.getInt(l));
/* 065 */               project_value.setNullAt(l);
/* 066 */             }
/* 067 */           }
/* 068 */
/* 069 */         }
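
The generated loop above reverses a mutable copy in place, swapping mirrored elements and carrying the null flags along with each swap. In plain Scala the same idea looks roughly like this (an illustrative sketch, not the generated code; boxed java.lang.Integer stands in for ArrayData's value-plus-null-bit):

```
// Reverse a copy in place by swapping pairs from both ends.
def reverseCopy(a: Array[java.lang.Integer]): Array[java.lang.Integer] = {
  val out = a.clone()            // like project_value = inputadapter_value.copy()
  var k = 0
  while (k < out.length / 2) {   // only half the array needs visiting
    val l = out.length - k - 1   // mirror index
    val tmp = out(k)             // tmp may be null, mirroring the setNullAt branches
    out(k) = out(l)
    out(l) = tmp
    k += 1
  }
  out
}
```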

Non-primitive type

val df = Seq(
  Seq("a", "c", "d", "b"),
  null
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(reverse($"s")).debugCodegen

Result:

/* 032 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 033 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 034 */         null : (inputadapter_row.getArray(0));
/* 035 */
/* 036 */         boolean filter_value = true;
/* 037 */
/* 038 */         if (!(!inputadapter_isNull)) {
/* 039 */           filter_value = inputadapter_isNull;
/* 040 */         }
/* 041 */         if (!filter_value) continue;
/* 042 */
/* 043 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 044 */
/* 045 */         boolean project_isNull = inputadapter_isNull;
/* 046 */         ArrayData project_value = null;
/* 047 */
/* 048 */         if (!inputadapter_isNull) {
/* 049 */           final int project_length = inputadapter_value.numElements();
/* 050 */           project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(new Object[project_length]);
/* 051 */           for(int k = 0; k < project_length; k++) {
/* 052 */             int l = project_length - k - 1;
/* 053 */             project_value.update(k, inputadapter_value.getUTF8String(l));
/* 054 */           }
/* 055 */
/* 056 */         }
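
Unlike the primitive path, the non-primitive path cannot swap in place; the generated code allocates a fresh GenericArrayData and fills it back to front. A rough plain-Scala equivalent (again just a sketch):

```
// Build a new array and copy elements in reverse order.
def reverseByCopy(a: Array[AnyRef]): Array[AnyRef] = {
  val out = new Array[AnyRef](a.length)   // like new GenericArrayData(new Object[n])
  var k = 0
  while (k < a.length) {
    out(k) = a(a.length - k - 1)          // like update(k, input.getUTF8String(l))
    k += 1
  }
  out
}
```
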
lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType
override def checkInputDataTypes(): TypeCheckResult = {

@viirya

viirya Apr 11, 2018

Contributor

I think you can just make inputTypes Seq(TypeCollection(StringType, ArrayType)). ExpectsInputTypes can check the types.
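
A sketch of that suggestion (illustrative; the abstract types come from org.apache.spark.sql.types):

```
override def inputTypes: Seq[AbstractDataType] =
  Seq(TypeCollection(StringType, ArrayType))
```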

@mn-mikke

mn-mikke Apr 11, 2018

Contributor

Great idea!

@gatorsmile

Member

gatorsmile commented Apr 11, 2018

ok to test

@SparkQA

SparkQA commented Apr 11, 2018

Test build #89175 has finished for PR 21034 at commit 3a76d87.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Reverse(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
@kiszk

Contributor

kiszk commented Apr 11, 2018

retest this please

@SparkQA

SparkQA commented Apr 11, 2018

Test build #89183 has finished for PR 21034 at commit 3a76d87.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Reverse(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
> SELECT _FUNC_(array(2, 1, 4, 3));
[3, 4, 1, 2]
""",
since = "2.4.0")

@HyukjinKwon

HyukjinKwon Apr 11, 2018

Member

Hmmm. I think the function reverse in SQL has been there since 1.5 (SPARK-8258). Can we use 1.5.0 here and add a note via the note field saying the array type is supported from 2.4.0? There's an example for note:

note = """
Use RLIKE to match with standard regular expressions.
""")

}
s"""
|final int $length = $childName.numElements();

@kiszk

kiszk Apr 11, 2018

Contributor

nit: more spaces

@SparkQA

SparkQA commented Apr 11, 2018

Test build #89209 has finished for PR 21034 at commit 28ed664.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA

SparkQA commented Apr 11, 2018

Test build #89219 has finished for PR 21034 at commit 638a0ff.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA

SparkQA commented Apr 12, 2018

Test build #89256 has finished for PR 21034 at commit 30af8cd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • logInfo(s"Failed to load main class $childMainClass.")
  • error(s"Cannot load main class from JAR $primaryResource")
  • error("No main class set in JAR; please specify one with --class")
  • checkArgument(mainClass != null, "Missing example class name.");
  • sealed trait Node extends Serializable
  • sealed trait ClassificationNode extends Node
  • sealed trait RegressionNode extends Node
  • sealed trait LeafNode extends Node
  • sealed trait InternalNode extends Node
  • class VectorAssembler(JavaTransformer, HasInputCols, HasOutputCol, HasHandleInvalid, JavaMLReadable,
  • class KolmogorovSmirnovTest(object):
  • case class ExprCode(var code: String, var isNull: ExprValue, var value: ExprValue)
  • case class SubExprEliminationState(isNull: ExprValue, value: ExprValue)
  • |class SpecificUnsafeProjection extends $
  • trait JavaCode
  • trait ExprValue extends JavaCode
  • case class SimpleExprValue(expr: String, javaType: Class[_]) extends ExprValue
  • case class VariableValue(variableName: String, javaType: Class[_]) extends ExprValue
  • case class GlobalValue(value: String, javaType: Class[_]) extends ExprValue
  • class LiteralValue(val value: String, val javaType: Class[_]) extends ExprValue with Serializable
@SparkQA

SparkQA commented Apr 13, 2018

Test build #89301 has finished for PR 21034 at commit ad17d49.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA

SparkQA commented Apr 13, 2018

Test build #89345 has finished for PR 21034 at commit aca511c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@mn-mikke

Contributor

mn-mikke commented Apr 15, 2018

Any other comments?

@SparkQA

SparkQA commented Apr 16, 2018

Test build #89396 has finished for PR 21034 at commit 460fea6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@ueshin

Member

ueshin commented Apr 17, 2018

LGTM.

@ueshin

Member

ueshin commented Apr 18, 2018

Thanks! merging to master.

@mn-mikke

Contributor

mn-mikke commented Apr 18, 2018

Thanks everybody for the code reviews!

@asfgit asfgit closed this in f81fa47 Apr 18, 2018

pepinoflo added a commit to pepinoflo/spark that referenced this pull request May 15, 2018

[SPARK-23926][SQL] Extending reverse function to support ArrayType arguments

Author: mn-mikke <mrkAha12346github>

Closes apache#21034 from mn-mikke/feature/array-api-reverse-to-master.