[FLINK-17321][table] Add support casting of map to map and multiset to multiset #18287

Closed
wants to merge 5 commits into from

Conversation

snuyanzin
Contributor

What is the purpose of the change

This PR adds support for casting maps to maps and multisets to multisets.

Brief change log

  • Casting rule org.apache.flink.table.planner.functions.casting.MapToMapAndMultisetToMultisetCastRule
  • Tests in org.apache.flink.table.planner.functions.casting.CastRulesTest

Verifying this change

This change added tests and can be verified as follows:

  • Added tests in CastRulesTest that validate casting of maps to maps, multisets to multisets

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented Jan 6, 2022

CI report:

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure re-run the last Azure build

@flinkbot
Collaborator

flinkbot commented Jan 6, 2022

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit a65fb5e (Thu Jan 06 13:06:01 UTC 2022)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

Contributor

@slinkydeveloper slinkydeveloper left a comment

I did a quick scan, and it looks good. Will do a proper review later.

Just one question, can you add one single test case for map to map and one for multiset to multiset in CastFunctionITCase, as it tests that it works through the stack? Because I'm pretty sure it doesn't, as you need to change LogicalTypeCasts as well.

@snuyanzin
Contributor Author

I added tests for map and multiset to CastFunctionITCase. The map test passes, while the multiset test fails with:

org.apache.flink.table.api.ValidationException: Could not cast the value of the 1 column: [ map(1, 1) ] of a row: [ null, map(1, 1) ] to the requested type: MULTISET<INT>

	at org.apache.flink.table.operations.utils.ValuesOperationFactory.lambda$null$4(ValuesOperationFactory.java:130)
	at java.util.Optional.orElseThrow(Optional.java:290)
	at org.apache.flink.table.operations.utils.ValuesOperationFactory.lambda$convertTopLevelExpressionToExpectedRowType$5(ValuesOperationFactory.java:127)
	at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)

It seems the cause is not LogicalTypeCasts but a missing converter for multisets.

I will have a look at it later today

@slinkydeveloper
Contributor

Hey @snuyanzin, have you checked this issue? There might be something helpful for you in the patch I posted there: https://issues.apache.org/jira/browse/FLINK-25428.

If the multiset test is a blocker for you and there's no easy fix for it, you can ignore it for now, as multisets are still not well supported by the whole stack. Perhaps you can just leave the test there, commented out.

@snuyanzin
Contributor Author

snuyanzin commented Jan 6, 2022

@slinkydeveloper thanks for pointing out the issue. I had a look, but it does not seem to help with the current multiset problem.
After some debugging I noticed that multisets are not supported when multiset expressions are resolved.
I added some changes to make the casting work.
I'm not sure whether it makes sense to keep them under this issue or move them to a separate one.

@slinkydeveloper
Contributor

slinkydeveloper commented Jan 7, 2022

@snuyanzin I would prefer that the multiset e2e casting gets worked out in a separate issue, as it requires significantly more testing to check that it works throughout the stack, and, as you have shown in the commits, it also requires new function definitions to expose it to the user. That's something we should rather address in a separate context.

As a goal for this task, I would say we want:

  • CastRulesTest tests for array to array, row to row, map to map and multiset to multiset
  • CastFunctionITCase tests for array to array, row to row and map to map.

@snuyanzin
Contributor Author

OK, I removed the multiset-related fix from this PR and created a separate issue for it: https://issues.apache.org/jira/browse/FLINK-25567

Now it should match the goal, please let me know if not

Contributor

@slinkydeveloper slinkydeveloper left a comment

I think we're good, but there's a mystery here worth investigating before merging 😄

Comment on lines +1149 to +1154
CastTestSpecBuilder.testCastTo(MAP(STRING(), STRING()))
        .fromCase(MAP(FLOAT(), DOUBLE()), null, null)
        .fromCase(
                MAP(INT(), INT()),
                Collections.singletonMap(1, 2),
                Collections.singletonMap("1", "2"))
Contributor

🤔 I'm surprised that this works without changing anything in LogicalTypeCasts... Can you add a test case for failure? For example:

CastTestSpecBuilder
  .testCastTo(MAP(STRING(), STRING()))
  .fail(MAP(STRING(), ROW(INT())), whateverValue)

Same for ROW to ROW and ARRAY to ARRAY. Just take whatever inner invalid tuple and see what exception you get. If it fails within the single CastRule implementations, then something is wrong with LogicalTypeCasts.

Contributor Author

I didn't get it: should it fail?

SELECT CAST(MAP['a', row(1)] AS MAP<STRING, STRING>);

I thought it should return something like this:

{a=(1)}

Contributor

It should not fail, the goal of this issue is to allow such casting. The thing is that I'm actually surprised it doesn't fail without touching LogicalTypeCasts, which is the class that checks valid/invalid casts.

EDIT: I've done some debugging, LogicalTypeCasts#supportsConstructedCasting does the job and just checks for the equality of the type root and for the castability of the children. So it works 😄
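
To illustrate the kind of check described above (same type root, then castability of each child), here is a minimal sketch. It is not Flink's code: `Root`, `Type`, and `supportsCast` are hypothetical names over a toy type model, and the rule that every type can be cast to STRING is a simplifying assumption.

```java
import java.util.Arrays;
import java.util.List;

/** Toy type model for illustration only; these are not Flink's LogicalType classes. */
final class ConstructedCastSketch {

    enum Root { INT, STRING, ARRAY, MAP, MULTISET, ROW }

    static final class Type {
        final Root root;
        final List<Type> children;

        Type(Root root, Type... children) {
            this.root = root;
            this.children = Arrays.asList(children);
        }
    }

    /** Hypothetical check: same root, same arity, and every child pair castable. */
    static boolean supportsCast(Type source, Type target) {
        // Simplifying assumption of this sketch: every type can be cast to STRING.
        if (target.root == Root.STRING) {
            return true;
        }
        // Constructed types must agree on the root and on the number of children.
        if (source.root != target.root
                || source.children.size() != target.children.size()) {
            return false;
        }
        // Recurse into the children (key/value types, element type, row fields).
        for (int i = 0; i < source.children.size(); i++) {
            if (!supportsCast(source.children.get(i), target.children.get(i))) {
                return false;
            }
        }
        return true;
    }
}
```

Under these assumptions, MAP<STRING, ROW<INT>> to MAP<STRING, STRING> is accepted because both value and key types can be cast to STRING, while MAP<INT, INT> to MULTISET<INT> is rejected because the roots differ.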

Contributor

@twalthr twalthr left a comment

Sorry for jumping in so late.

index, valueArrayTerm, innerInputValueType),
"false",
// Null check is done at the array access level
innerInputValueType.copy(false),
Contributor

@slinkydeveloper didn't we change this behavior recently and use a new method that deals with NullType?

Contributor

Yep, perhaps you can fix it when merging?

final String value = newName("value");

return new CastRuleUtils.CodeWriter()
.declStmt(className(Map.class), map, constructorCall(HashMap.class))
Contributor

I'm not sure a HashMap is safe to use here, because not all internal data structures have a proper hashCode/equals. Looking at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens#generateMap, I also don't see any HashMap. A safer solution might be to reuse the array-to-array casting logic and apply it to keyArray and valueArray. In general, we don't guarantee unique keys for maps.
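
As a rough sketch of that alternative, assuming keys and values are available as two parallel sequences: each sequence is cast element by element, so no hashCode/equals is needed and duplicate keys are simply kept. `ParallelArrayCastSketch` and `castAll` are hypothetical names using plain Java lists, not Flink's internal array data structures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class ParallelArrayCastSketch {

    /** Casts every element, keeping position and nulls; no hashing or equality is involved. */
    static <A, B> List<B> castAll(List<A> input, Function<A, B> elementCast) {
        List<B> output = new ArrayList<>(input.size());
        for (A element : input) {
            // Null elements stay null, mirroring a null check at the array access level.
            output.add(element == null ? null : elementCast.apply(element));
        }
        return output;
    }
}
```

Calling `castAll` once for the keys and once for the values keeps the i-th cast key paired with the i-th cast value, at the cost of not deduplicating keys.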

Contributor

I think this is safe at this level, as the input map is assumed to be valid and should not have multiple values for the same key anyway.

Contributor

@twalthr twalthr Jan 18, 2022

A cast to MAP<CHAR(1), X> could produce multiple identical keys. But after an offline discussion, I think it is safe to use a HashMap here and the problem is rather in ScalarOperatorGens#generateMap.
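
A small sketch of the collision scenario, assuming the cast to CHAR(1) behaves like truncation to one character (an assumption of this example, not a statement about Flink's exact semantics). `NarrowingKeyCastSketch` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

final class NarrowingKeyCastSketch {

    /** Models a cast to CHAR(1) as truncation to the first character (assumption). */
    static String toChar1(String s) {
        return s.isEmpty() ? s : s.substring(0, 1);
    }

    /** Casts the keys and collects into a HashMap, so colliding keys overwrite each other. */
    static <V> Map<String, V> castKeysToChar1(Map<String, V> input) {
        Map<String, V> result = new HashMap<>();
        for (Map.Entry<String, V> entry : input.entrySet()) {
            // put() replaces any value stored earlier under the same cast key.
            result.put(toChar1(entry.getKey()), entry.getValue());
        }
        return result;
    }
}
```

With input keys "ab" and "ac", both become "a" after the cast, and the HashMap keeps only the entry that was written last in the input's iteration order.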

Contributor

@twalthr twalthr left a comment

I will address the remaining issue while merging. Thanks @snuyanzin.

case MAP_VALUE_CONSTRUCTOR | MULTISET_VALUE =>
generateMapOrMultiset(ctx, resultType, operands)
// maps
case MAP_VALUE_CONSTRUCTOR =>
Contributor

@snuyanzin Looking forward to a PR for this commit. Maybe you can also address the comment about the value constructor in that PR (using a HashMap for deduplication of keys).

Contributor Author

Hi @twalthr, thanks for highlighting this.
Yes, I will try to handle this.

@twalthr twalthr closed this in f5c99c6 Jan 18, 2022
niklassemmler pushed a commit to niklassemmler/flink that referenced this pull request Feb 3, 2022
@snuyanzin snuyanzin deleted the flink17321 branch May 5, 2022 11:13