fix(flink): fix compilation of memtable with nested data #8751

Merged
merged 6 commits into from
Mar 27, 2024

Conversation

chloeh13q
Contributor

@chloeh13q chloeh13q commented Mar 23, 2024

Description of changes

This PR aims to fix the compilation of memtables with nested data.

What was broken

In particular, Flink does not support the STRUCT(1 AS `a`) aliasing syntax for defining named STRUCTs. To define a named STRUCT, we must instead use a CAST workaround, e.g.,

SELECT CAST(('a', 1) as ROW<a STRING, b INT>);

However, Flink also does not allow you to directly construct ARRAYs of named STRUCTs using the ARRAY[] constructor. This is a bug that I identified and I have filed it with the Flink community (JIRA ticket ref: https://issues.apache.org/jira/browse/FLINK-34898).

For the time being, we will need to use another CAST workaround that casts the entire nested array, e.g.,

SELECT cast(ARRAY[ROW(1)] as ARRAY<ROW<a INT>>);  -- instead of ARRAY[CAST(ROW(1) AS ROW<a INT>)]

How to fix

To summarize,

  • if it’s an array of named structs
    CAST(ARRAY[ROW(), ROW()] AS ARRAY<ROW<datatype of each field>>)
  • if it’s named structs
    CAST(ROW() AS ROW<datatype of each field>)
  • if it’s unnamed structs (but I'm not sure how to write this in Ibis)
    ROW()
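
The three cases above can be sketched as a small string builder. This is an illustration in plain Python only, not the code in this PR: `row_sql`, `row_type_sql`, `struct_sql`, and `struct_array_sql` are hypothetical helper names, values are assumed to be already-rendered SQL literals, and field types are assumed to be passed in as ready-made Flink type strings.

```python
from typing import Mapping, Optional, Sequence


def row_sql(values: Sequence[str]) -> str:
    """Render an unnamed ROW constructor from already-rendered SQL values."""
    return "ROW(" + ", ".join(values) + ")"


def row_type_sql(fields: Mapping[str, str]) -> str:
    """Render a ROW<...> type from a {field name: Flink type} mapping."""
    columns = ", ".join(f"{name} {typ}" for name, typ in fields.items())
    return f"ROW<{columns}>"


def struct_sql(values: Sequence[str], fields: Optional[Mapping[str, str]] = None) -> str:
    """Unnamed struct -> ROW(...); named struct -> CAST(ROW(...) AS ROW<...>)."""
    row = row_sql(values)
    if fields is None:
        return row
    return f"CAST({row} AS {row_type_sql(fields)})"


def struct_array_sql(
    rows: Sequence[Sequence[str]], fields: Optional[Mapping[str, str]] = None
) -> str:
    """Array of structs; named structs are handled by casting the whole array."""
    array = "ARRAY[" + ", ".join(row_sql(row) for row in rows) + "]"
    if fields is None:
        return array
    # Cast the entire array once instead of casting each ROW inside ARRAY[...],
    # which is the form FLINK-34898 reports as failing.
    return f"CAST({array} AS ARRAY<{row_type_sql(fields)}>)"
```

For example, `struct_array_sql([["1"]], {"a": "INT"})` yields the same statement body as the array workaround shown earlier, `CAST(ARRAY[ROW(1)] AS ARRAY<ROW<a INT>>)`.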

I thought of two approaches to this:

  1. Rewrite the operator mapping in the Flink backend (i.e., change the visit_NonNullLiteral() method)
  2. Rewrite the translation rule in Flink's Generator

I found both implementations in different scenarios and decided to go with option (2).

Issues closed

#8516

@chloeh13q chloeh13q force-pushed the fix/flink-memtable branch 2 times, most recently from 1ab6833 to fd86d28 Compare March 23, 2024 03:02
def test_create_memtable(con, data, schema, expected):
    t = ibis.memtable(data, schema=ibis.schema(schema))
    # cannot use con.execute(t) directly because of some behavioral discrepancy between
    # `TableEnvironment.execute_sql()` and `TableEnvironment.sql_query()`
Contributor Author

I raised an issue on Flink JIRA

@chloeh13q chloeh13q marked this pull request as ready for review March 23, 2024 03:22
Member

@gforsyth gforsyth left a comment

Gnarly -- nice job tracking this down!
Can you also add a test that will start to XPASS when upstream supports using the ARRAY constructor with named structs?

ibis/backends/tests/test_map.py (outdated, resolved)
ibis/backends/sql/dialects.py (outdated, resolved)
ibis/backends/flink/tests/test_memtable.py (outdated, resolved)
ibis/backends/sql/dialects.py (outdated, resolved)
ibis/backends/tests/test_map.py (outdated, resolved)
@chloeh13q
Contributor Author

@gforsyth Thanks for the review! I addressed all of the comments.

Member

@gforsyth gforsyth left a comment

Hey, I was staring at this a bit more and I think I have a cleaner way to handle the unaliasing pass -- lmk what you think.

Comment on lines 171 to 185
first_arg = seq_get(expression.expressions, 0)
if isinstance(first_arg, sge.Struct):
    # it's an array of structs
    named_structs = False
    for arg in expression.expressions:
        for e in arg.expressions:
            if isinstance(e, sge.Alias):
                named_structs = True
    # get rid of aliasing because we want to compile this as CAST instead
    args = deepcopy(expression.expressions)
    if named_structs:
        for arg in args:
            arg.set("expressions", [e.this for e in arg.expressions])

    format_values = ", ".join([self.sql(arg) for arg in args])
Member

Suggested change
- first_arg = seq_get(expression.expressions, 0)
- if isinstance(first_arg, sge.Struct):
-     # it's an array of structs
-     named_structs = False
-     for arg in expression.expressions:
-         for e in arg.expressions:
-             if isinstance(e, sge.Alias):
-                 named_structs = True
-     # get rid of aliasing because we want to compile this as CAST instead
-     args = deepcopy(expression.expressions)
-     if named_structs:
-         for arg in args:
-             arg.set("expressions", [e.this for e in arg.expressions])
-     format_values = ", ".join([self.sql(arg) for arg in args])
+ first_arg = seq_get(expression.expressions, 0).copy()
+ if isinstance(first_arg, sge.Struct):
+     for arg in expression.expressions:
+         arg.set("expressions", [e.unalias() for e in arg.expressions])
+     format_values = ", ".join(
+         [self.sql(arg) for arg in expression.expressions]
+     )

there's an unalias method on sqlglot objects that I think lets us simplify this a fair bit (but I might not be covering some corner cases you've run across)
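
A rough illustration of why `unalias()` can replace the `isinstance(e, sge.Alias)` scan: as far as I know, it returns the wrapped expression for an alias and the node itself otherwise, so it is safe to call on every field. The classes below are toy stand-ins written for this sketch, not sqlglot's own `Expression` and `Alias`.

```python
from dataclasses import dataclass


@dataclass
class Node:
    """Toy stand-in for a sqlglot expression; `this` holds the wrapped payload."""

    this: object

    def unalias(self):
        # A node that is not an alias is returned unchanged.
        return self


@dataclass
class Alias(Node):
    """Toy stand-in for an aliased expression such as `1 AS a`."""

    alias: str = ""

    def unalias(self):
        # An alias unwraps to the expression it names.
        return self.this


# One aliased field and one plain field, as in a partially named struct.
fields = [Alias(Node(1), "a"), Node(2)]

# Safe on both kinds of node: always yields an expression node.
via_unalias = [e.unalias() for e in fields]

# Only correct when every field is an alias: the plain node is unwrapped
# one level too far and its raw payload leaks out.
via_this = [e.this for e in fields]
```

In this toy model `via_unalias` is `[Node(1), Node(2)]` while `via_this` is `[Node(1), 2]`, which is why the original code had to check for aliases before using `.this`.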

Contributor Author

Thanks for the pointer - I like this solution and it definitely makes the code much cleaner! I'm tempted to modify a deepcopy of expression.expressions to avoid unexpected consequences from passing this around...
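
A minimal sketch of the concern, using plain lists of `(alias, value)` pairs as a stand-in for the struct nodes (the `strip_aliases` helper is hypothetical, not part of the PR): stripping aliases on a deep copy leaves the caller's tree untouched.

```python
from copy import deepcopy


def strip_aliases(structs):
    """Return unaliased copies; each struct is a list of (alias, value) pairs."""
    # Work on a deep copy so the structs owned by the caller are not mutated.
    copied = deepcopy(structs)
    for struct in copied:
        struct[:] = [value for _alias, value in struct]
    return copied


original = [[("a", 1), ("b", 2)]]
stripped = strip_aliases(original)
```

Here `stripped` holds only the values, and `original` still carries its aliases for any later pass that needs them.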

@chloeh13q chloeh13q requested a review from gforsyth March 26, 2024 22:34
Member

@gforsyth gforsyth left a comment

LGTM, thanks for bearing with my multi-stage review, @chloeh13q !

@gforsyth gforsyth merged commit 364a6ee into ibis-project:main Mar 27, 2024
82 checks passed