ENH: Add support for array operations in PySpark backend #1983
Conversation
```python
@compiles(ops.ArraySlice)
def compile_array_slice(t, expr, scope, **kwargs):
```
PySpark's slice() requires us to specify the length. In Ibis, end might be None, and the length would then need to be calculated for every array.
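For reference, a minimal sketch of the UDF approach being discussed, assuming this PR's translator interface (`compiles`, `ops`, `t.translate`, `ibis_array_dtype_to_spark_dtype`); names and details here are illustrative, not the merged code:

```python
import pyspark.sql.functions as F

@compiles(ops.ArraySlice)
def compile_array_slice(t, expr, scope, **kwargs):
    op = expr.op()
    start = op.start.op().value if op.start is not None else None
    stop = op.stop.op().value if op.stop is not None else None
    spark_type = ibis_array_dtype_to_spark_dtype(op.arg.type())

    @F.udf(spark_type)
    def array_slice(array):
        # Python slicing handles stop=None for free, whereas F.slice()
        # would require computing an explicit length for every array.
        return array[start:stop]

    return array_slice(t.translate(op.arg, scope))
```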
Annoying, but ok, this is fine (for now).
```python
def test_array_collect(client):
    table = client.table('array_table')
    expr = table.group_by(table.key).aggregate(
        collected=table.array_int.collect()
    )
```
What does this do?
This collects the arrays in the array_int column after grouping by key. So, for example, if k1 corresponds to [1, 2, 3] and [], the result will be one array: [1, 2, 3].
Does it just concat all the arrays? I.e., if k1 corresponds to [1, 2, 3] and [4, 5], does it return [1, 2, 3, 4, 5]?
Sorry, I misspoke. The result is a collection of arrays (e.g. [[1, 2, 3], [4, 5]]). Updated the test data to also collect a non-empty array for the same key.
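To make the corrected semantics concrete, a small illustrative example (the rows shown are assumed test data):

```python
# Assumed rows in array_table:
#   key='k1', array_int=[1, 2, 3]
#   key='k1', array_int=[4, 5]
expr = table.group_by(table.key).aggregate(
    collected=table.array_int.collect()
)
# collect() gathers one value per row into a list per group, so the
# grouped arrays end up nested rather than concatenated:
#   k1 -> [[1, 2, 3], [4, 5]]
```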
Mostly looks good. I think you can rewrite some of the rules using built-in functions instead.
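As one example of the built-in-function rewrite (the commit log below mentions moving the array repeat op onto pyspark.sql functions), a hedged sketch; the merged code may differ:

```python
import pyspark.sql.functions as F

@compiles(ops.ArrayRepeat)
def compile_array_repeat(t, expr, scope, **kwargs):
    op = expr.op()
    src_column = t.translate(op.arg, scope)
    times = op.times.op().value
    # array_repeat([1, 2], 2) -> [[1, 2], [1, 2]]; flatten -> [1, 2, 1, 2]
    return F.flatten(F.array_repeat(src_column, times))
```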
Force-pushed from a4a9aed to 49be246.
Force-pushed from 6bb4945 to 09cae75.
Rebased on top of the latest PySpark backend changes.
930391e: update pyspark array repeat op to use pyspark sql functions, add additional row to pyspark test array data
LGTM
small changes requested
ibis/pyspark/compiler.py
Outdated
```python
spark_type = ibis_array_dtype_to_spark_dtype(op.arg.type())

@F.udf(spark_type)
def slice(array):
```
Can we please name this something else so it doesn't clash with the `slice` builtin?
Done
ibis/pyspark/compiler.py
Outdated
```diff
@@ -14,6 +14,8 @@
 from ibis import interval
 from ibis.pyspark.operations import PySparkTable
 from ibis.spark.compiler import SparkContext, SparkDialect
+from ibis.spark.datatypes import ibis_array_dtype_to_spark_dtype, \
```
Since I asked for another change, I'll also ask that the \ be replaced with ( and ).
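That is, something like the following; the second imported name here is an assumption based on the commit log's "use ibis_dtype_to_spark_dtype in pyspark cast":

```python
from ibis.spark.datatypes import (
    ibis_array_dtype_to_spark_dtype,
    ibis_dtype_to_spark_dtype,  # assumed second import
)
```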
LGTM. Thanks @hjoo
Implemented array operations for the PySpark backend to pass all tests in `ibis/tests/all/test_array.py` as well as additional tests added in `ibis/pyspark/tests/test_array.py`.

Features that are supported:
- Array casting
- Array length
- Array slice
- Array concat
- Array repeat
- Array collect

Author: Hyonjee <hyonjee.joo@twosigma.com>

Closes ibis-project#1983 from hjoo/pyspark-arrays and squashes the following commits:

d3d69f7 [Hyonjee] rename slice, fix multi-line import
930391e [Hyonjee] update pyspark array repeat op to use pyspark sql functions, add additional row to pyspark test array data
09cae75 [Hyonjee] move pyspark test client fixture into conftest
b615d89 [Hyonjee] use ibis_dtype_to_spark_dtype in pyspark cast
136c6b7 [Hyonjee] fix typo
0a304f3 [Hyonjee] rename pyspark test arry table
2deca61 [Hyonjee] xfail PySpark backend for test_scalar_param_array test
a7a252c [Hyonjee] pyspark backend array operations and tests
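For a quick, illustrative view of the supported operations through the Ibis expression API (table and column names assumed from the tests above, and `compile()` behavior assumed for this backend):

```python
t = client.table('array_table')
expr = t.mutate(
    n=t.array_int.length(),              # array length
    first_two=t.array_int[:2],           # array slice
    combined=t.array_int + t.array_int,  # array concat
    tripled=t.array_int * 3,             # array repeat
)
df = expr.compile()  # on this backend, assumed to produce a PySpark DataFrame
```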