Skip to content

Conversation

shenzhu
Copy link
Contributor

@shenzhu shenzhu commented Nov 26, 2021

What is the purpose of the change

FLINK-24413: Fix trimming when casting to CHAR and VARCHAR

Brief change log

  • Update casting logic in StringToBinaryCastRule.java
  • Update unit tests

Verifying this change

Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

(Please pick either of the following options)

This change is already covered by existing tests, such as CastRulesTest.java.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@flinkbot
Copy link
Collaborator

flinkbot commented Nov 26, 2021

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit aa962bd (Fri Nov 26 03:25:12 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@shenzhu shenzhu marked this pull request as ready for review November 26, 2021 08:38
} else if (targetLogicalType instanceof VarBinaryType) {
targetLength = ((VarBinaryType) targetLogicalType).getLength();
}
targetLength = Math.min(targetLength, inputLength);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove that, and instead below have a check like:

if (targetLength >= inputLength) {
    return methodCall(inputTerm, "toBytes");
} else {
    return staticCall(
            Arrays.class, "copyOfRange", methodCall(inputTerm, "toBytes"), 0, targetLength);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @matriv , thanks for your review! I will update the code here.

@matriv
Copy link
Contributor

matriv commented Nov 26, 2021

@shenzhu Could please adjust the title of your PR to:

[FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

?

@shenzhu shenzhu force-pushed the szhu/FLINK-24419/binary-not-trim branch from 87fd0c5 to 8e8aaa7 Compare November 26, 2021 20:10
@shenzhu shenzhu changed the title [FLINK-24419][Table SQL/API] Casting to a CHAR() and VARCHAR() doesn't trim the string to the specified precision [FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY Nov 26, 2021
@shenzhu
Copy link
Contributor Author

shenzhu commented Nov 26, 2021

@shenzhu Could please adjust the title of your PR to:

[FLINK-24419][table-planner] Trim to precision when casting to BINARY/VARBINARY

?

Sure, just updated PR and commit message, thank you!

@shenzhu
Copy link
Contributor Author

shenzhu commented Nov 27, 2021

Hey @matriv , I updated this PR based on your review, would you mind taking a look at it when you have a moment? Thanks!

Copy link
Contributor

@matriv matriv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @shenzhu I left a couple more comments, but overall the approach is not 100% correct, because it applies the trimming only when casting from string types. It should also apply it when casting between binary types themselves, i.e. a byte array with 10 bytes and type BINARY(10), casted to let's say a BINARY(5) (or VARBINARY(5)) should be trimmed to 5 bytes. So this logic should be applied in for all supported casts to BINARY/VARBINARY.

LogicalType inputLogicalType,
LogicalType targetLogicalType) {
return methodCall(inputTerm, "toBytes");
int inputLength = Integer.MAX_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use int inputLength = LogicalTypeChecks.getLength(inputLogicalType) directly without instanceof.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks!

inputLength = ((VarCharType) inputLogicalType).getLength();
}

int targetLength = Integer.MAX_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

@shenzhu
Copy link
Contributor Author

shenzhu commented Nov 30, 2021

Hi @shenzhu I left a couple more comments, but overall the approach is not 100% correct, because it applies the trimming only when casting from string types. It should also apply it when casting between binary types themselves, i.e. a byte array with 10 bytes and type BINARY(10), casted to let's say a BINARY(5) (or VARBINARY(5)) should be trimmed to 5 bytes. So this logic should be applied in for all supported casts to BINARY/VARBINARY.

Hey @matriv , thanks for your review!

I checked the codebase and found currently we support casting from STRING/BINARY/VARBINARY/RAW to BINARY/VARBINARY. For these supported casts, STRING to BINARY/VARBINARY is supported by StringToBinaryCastRule, and the rest is supported via old casting rules in ScalarOperatorGens(BINARY|VARBINARY and RAW).

If I understand it correctly, we have two options for this task:

  1. Create BinaryToBinaryCastRule class and RawToBinaryCastRule class to follow the new casting rules
  2. Update the casting logic in ScalaOperatorGens for BINARY/VARBINARY and RAW to add truncation logic

I'm a little prefer Option 1 because seems that's the direction the community is trying to move forward(?), what do you think about this?

Thanks for your help!

@matriv
Copy link
Contributor

matriv commented Nov 30, 2021

@shenzhu Thx again for your time and effort here.
It would be great to go for option 1. since as you already mentioned we're trying to move all the logic in ScalarOperatorGens into the new java structure under the CastRule interface.

@shenzhu shenzhu force-pushed the szhu/FLINK-24419/binary-not-trim branch from d182991 to e7d585c Compare December 1, 2021 04:58
@shenzhu
Copy link
Contributor Author

shenzhu commented Dec 1, 2021

@shenzhu Thx again for your time and effort here. It would be great to go for option 1. since as you already mentioned we're trying to move all the logic in ScalarOperatorGens into the new java structure under the CastRule interface.

Hey @matriv , thanks for your feedback!
I updated this PR with option 1, would you mind taking a look at it when you have a moment?

Copy link
Contributor

@matriv matriv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thank you @shenzhu !

import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;

/** {@link LogicalTypeFamily#BINARY_STRING} to {@link LogicalTypeFamily#BINARY_STRING} cast rule. */
public class BinaryToBinaryCastRule
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to make it public

Comment on lines 84 to 86
targetLength
+ " >= "
+ accessField(deserializedByteArrayTerm, "length"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you swap this condition as it's easier to read?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why swap? To me at least, it's easier to say, if (condition) then <normal> else <doSomethingmore>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With swap I mean accessField(deserializedByteArrayTerm, "length") + " <= " + targetLength, which IMO is more readable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More than adding the method CastRuleUtils#accessField, I think it makes sense to just add a method like CastRuleUtils#accessArrayLength. Can you also check if in the other rules the array length is used and replace it with CastRuleUtils#accessArrayLength

/* Example generated code for BINARY(3):
byte[] deserializedByteArray$0 = result$2.toBytes(typeSerializer$5);
if (deserializedByteArray$0 != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we need this null check, perhaps can you add a test case where the raw value is null and see if this null check is really needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review! I think you are right, I printed the generated code and found this null check is not required

externalResult$0 = (byte[]) function_org$apache$flink$table$planner$functions$CastFunctionMiscITCase$IntegerToRaw$fd3c18d5f34fde99503159b4b5be6594
    .eval(false ? null : ((java.lang.Integer)((int) 123456)));

isNull$2 = externalResult$0 == null;
result$2 = null;
if (!isNull$2) {
    result$2 = (org.apache.flink.table.data.binary.BinaryRawValueData) converter$1.toInternalOrNull((byte[]) externalResult$0);
}

// --- Cast section generated by org.apache.flink.table.planner.functions.casting.RawToBinaryCastRule
isNull$3 = isNull$2;
if (!isNull$3) {
    byte[] deserializedByteArray$0 = result$2.toBytes(typeSerializer$5);
    if (deserializedByteArray$0 != null) {
        if (deserializedByteArray$0.length <= 3) {
            result$4 = deserializedByteArray$0;
        } else {
            result$4 = java.util.Arrays.copyOfRange(deserializedByteArray$0, 0, 3);
        }
    } else {
        result$4 = null;
    }
    isNull$3 = result$4 == null;
} else {
    result$4 = null;
}

// --- End cast section

int inputLength = LogicalTypeChecks.getLength(inputLogicalType);
int targetLength = LogicalTypeChecks.getLength(targetLogicalType);

if (targetLength >= inputLength) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Swap this condition

int inputLength = LogicalTypeChecks.getLength(inputLogicalType);
int targetLength = LogicalTypeChecks.getLength(targetLogicalType);

if (targetLength >= inputLength) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Swap this condition

Comment on lines 49 to 74
int inputLength = LogicalTypeChecks.getLength(inputLogicalType);
int targetLength = LogicalTypeChecks.getLength(targetLogicalType);

if (targetLength >= inputLength) {
return inputTerm;
} else {
return staticCall(Arrays.class, "copyOfRange", inputTerm, 0, targetLength);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately you can't assume that the input length is the same as the one defined in the type. In our codegen domain we can have a byte[] of length 10 with type defined as VARBINARY(20) or even BINARY(20). If you look carefully, even the RawToBinaryCastRule you introduced behaves the same when casting to BINARY(20) a raw which array has just size 10.

Hence you need the check at runtime to avoid an unnecessary copy, for example in this case:

  • input VARBINARY(30) with value byte[].length == 10
  • target VARBINARY(20)

The first branch of the if is fine IMO, because it's reasonable to assume that if the input length is minor or equal to the target length, then it's fine to just return the same term. The second branch on the other hand should do a length check with the ternary operator and skip the copy in case the runtime length is already less or equal to the target length.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx @slinkydeveloper! I totally missed that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I will update here to check during runtime

import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;

/** {@link LogicalTypeRoot#RAW} to {@link LogicalTypeFamily#BINARY_STRING} cast rule. */
public class RawToBinaryCastRule extends AbstractNullAwareCodeGeneratorCastRule<Object, byte[]> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove public

Comment on lines 52 to 108
int inputLength = LogicalTypeChecks.getLength(inputLogicalType);
int targetLength = LogicalTypeChecks.getLength(targetLogicalType);

if (targetLength >= inputLength) {
return methodCall(inputTerm, "toBytes");
} else {
return staticCall(
Arrays.class, "copyOfRange", methodCall(inputTerm, "toBytes"), 0, targetLength);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above about the unnecessary copy, but because you invoke toBytes() which is potentially expensive, convert this rule to a AbstractNullAwareCodeGeneratorCastRule and save the toBytes result in a local variable

@slinkydeveloper
Copy link
Contributor

slinkydeveloper commented Dec 2, 2021

Unless I'm mistaken, this PR is also fixing this issue https://issues.apache.org/jira/browse/FLINK-25051 correct? Can you double check @shenzhu? My understanding is that the existing codebase doesn't have an inverse BINARY -> RAW casting, right?

@matriv
Copy link
Contributor

matriv commented Dec 2, 2021

Unless I'm mistaken, this PR is also fixing this issue https://issues.apache.org/jira/browse/FLINK-25051 correct?

Yep, true, I asked @shenzhu to do it (see previous comments). @shenzhu Could you please split your PR in 2 commits?
One commit that does the porting and references FLINK-25051 and one that applies the trimming on top.

Can you double check @shenzhu? My understanding is that the existing codebase doesn't have an inverse BINARY -> RAW casting, right?

This is not supported yet: https://issues.apache.org/jira/browse/FLINK-24577 is still open

@shenzhu shenzhu force-pushed the szhu/FLINK-24419/binary-not-trim branch 3 times, most recently from bd5b627 to 56515bd Compare December 3, 2021 05:23
Copy link
Contributor

@slinkydeveloper slinkydeveloper left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have just some minor comments and then the PR looks good! Thank you for your contribution, it looks great!

Comment on lines 109 to 110
org.apache.flink.connector.file.src.reader.TextLineFormat.createReader(org.apache.flink.configuration.Configuration, org.apache.flink.core.fs.FSDataInputStream): Returned leaf type org.apache.flink.connector.file.src.reader.StreamFormat$Reader does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.file.src.reader.TextLineFormat.createReader(org.apache.flink.configuration.Configuration, org.apache.flink.core.fs.FSDataInputStream): Returned leaf type org.apache.flink.connector.file.src.reader.TextLineFormat$Reader does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove this since it's being addressed already https://issues.apache.org/jira/browse/FLINK-25150

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got thanks! I will remove these lines.

Comment on lines +285 to +302
.fromCase(CHAR(3), "foo", new byte[] {102, 111})
.fromCase(VARCHAR(5), "Flink", new byte[] {70, 108})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test for the non trim case where input length < target length? Both for the "type case" and for the "runtime case"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your detailed review!
I added more test casts to cover "type case" and "runtime case" when converting to VARBINARY, however, when converting to BINARY(3), seems Calcite 1.26 will pad the result to make sure it has the same length as target type, so converting CHAR(1) with value f to BINARY(3) will still result in {102, 0, 0}.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shenzhu Yep, this is something we have to implement as well. I'll open a separate issue for it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, in case of this CastFunctionITCase, if we have padding it's totally fine, as calcite performs optimizations out of our control.

But i'm pretty sure that test case should work as described by me in CastRulesTest, without padding

@shenzhu shenzhu force-pushed the szhu/FLINK-24419/binary-not-trim branch 4 times, most recently from e356372 to 45b7547 Compare December 7, 2021 03:46
@shenzhu shenzhu force-pushed the szhu/FLINK-24419/binary-not-trim branch 2 times, most recently from 605c0ab to 0095d26 Compare December 9, 2021 07:20
@matriv
Copy link
Contributor

matriv commented Dec 9, 2021

@shenzhu Again thx for your effort!

Please rebase with master and then:

  @Override
   protected Configuration configuration() {
       return super.configuration()
               .set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId())
               .set(
                       ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR,
                       LegacyCastBehaviour.DISABLED);
   }
``` to make sure that we always use the new behaviour for the IT tests.

STRING(),
fromString("Apache"),
new byte[] {65, 112, 97, 99, 104, 101}),
.fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the fromCase with the legacy behaviour flag and keep testing both with false and with true.

@shenzhu shenzhu force-pushed the szhu/FLINK-24419/binary-not-trim branch from 0095d26 to 28b096c Compare December 10, 2021 03:49
@shenzhu
Copy link
Contributor Author

shenzhu commented Dec 10, 2021

@shenzhu Again thx for your effort!

Please rebase with master and then:

  @Override
   protected Configuration configuration() {
       return super.configuration()
               .set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId())
               .set(
                       ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR,
                       LegacyCastBehaviour.DISABLED);
   }
``` to make sure that we always use the new behaviour for the IT tests.

Sure, thanks for your feedback! I will update this PR.

@shenzhu
Copy link
Contributor Author

shenzhu commented Dec 12, 2021

Hey @AHeise , would you mind taking a look at this PR when you have a moment? Thanks!

Copy link
Contributor

@matriv matriv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx for addressing the comments!
I've left a couple of minor comments but otherwise LGTM

return className(clazz) + "." + fieldName;
}

static String accessArrayLength(String instanceTerm) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I'd prefer: arrayLength without the access.

.fromCase(CHAR(3), fromString("foo"), new byte[] {102, 111})
.fromCase(CHAR(1), fromString("f"), new byte[] {102})
.fromCase(CHAR(3), fromString("f"), new byte[] {102})
.fromCase(VARCHAR(5), fromString("Flink"), new byte[] {70, 108})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Here and below, I would use the last arg with false to be more visible how the test behaves with legacy behaviour enabled and disabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Here and below, I would use the last arg with false to be more visible how the test behaves with legacy behaviour enabled and disabled.

Good point! Thanks for your review, I will update here.

@shenzhu shenzhu force-pushed the szhu/FLINK-24419/binary-not-trim branch from 28b096c to 0a89d73 Compare December 14, 2021 02:15
Copy link
Contributor

@matriv matriv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, Thank you @shenzhu !

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM I will fix my minor comments while merging

.build());
}

/* Example generated code for BINARY(@):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ => 2

String returnVariable,
LogicalType inputLogicalType,
LogicalType targetLogicalType) {
// Get length of target
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary comment

LogicalType inputLogicalType,
LogicalType targetLogicalType) {
return methodCall(inputTerm, "toBytes");
// Get length of target
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary comment

twalthr pushed a commit to twalthr/flink that referenced this pull request Dec 14, 2021
@shenzhu shenzhu force-pushed the szhu/FLINK-24419/binary-not-trim branch from 0a89d73 to 7dca486 Compare December 15, 2021 05:34
@twalthr twalthr closed this in 3ab9802 Dec 15, 2021
niklassemmler pushed a commit to niklassemmler/flink that referenced this pull request Feb 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants