Skip to content

Commit

Permalink
[FLINK-35406] Use inner serializer when casting RAW type to BINARY or…
Browse files Browse the repository at this point in the history
… STRING in cast rules
  • Loading branch information
docete committed May 21, 2024
1 parent 5f5722f commit 049fbd2
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,18 @@ protected String generateCodeBlockInternal(
if (context.legacyBehaviour()
|| !(couldTrim(targetLength) || (couldPad(targetLogicalType, targetLength)))) {
return new CastRuleUtils.CodeWriter()
.assignStmt(returnVariable, methodCall(inputTerm, "toBytes", typeSerializer))
.assignStmt(
returnVariable,
methodCall(
inputTerm, "toBytes", typeSerializer + ".getInnerSerializer()"))
.toString();
} else {
return new CastRuleUtils.CodeWriter()
.declStmt(
byte[].class,
deserializedByteArrayTerm,
methodCall(inputTerm, "toBytes", typeSerializer))
methodCall(
inputTerm, "toBytes", typeSerializer + ".getInnerSerializer()"))
.ifStmt(
arrayLength(deserializedByteArrayTerm) + " <= " + targetLength,
thenWriter -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ protected String generateCodeBlockInternal(
.declStmt(
Object.class,
deserializedObjTerm,
methodCall(inputTerm, "toObject", typeSerializer))
methodCall(inputTerm, "toObject", typeSerializer + ".getInnerSerializer()"))
.ifStmt(
deserializedObjTerm + " != null",
thenWriter ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

package org.apache.flink.table.planner.functions;

import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.LocalTime;
Expand Down Expand Up @@ -191,6 +194,17 @@ Stream<TestSetSpec> getTestSetSpecs() {
.cast(VARBINARY(50)),
new byte[] {0, 1, -30, 64, 0, 70, 0, 108, 0, 105, 0, 110, 0, 107},
VARBINARY(50)),
TestSetSpec.forFunction(
BuiltInFunctionDefinitions.CAST,
"cast from RAW(LocalDateTime) to BINARY(13)")
.onFieldsWithData("2020-11-11T18:08:01.123")
.andDataTypes(STRING())
.withFunction(LocalDateTimeToRaw.class)
.testTableApiResult(
call("LocalDateTimeToRaw", $("f0")).cast(BINARY(13)),
serializeLocalDateTime(
LocalDateTime.parse("2020-11-11T18:08:01.123")),
BINARY(13)),
TestSetSpec.forFunction(
BuiltInFunctionDefinitions.CAST, "test the x'....' binary syntax")
.onFieldsWithData("foo")
Expand Down Expand Up @@ -223,6 +237,13 @@ Stream<TestSetSpec> getTestSetSpecs() {
call("CreateMultiset", $("f0")).cast(STRING()),
"{a=1, b=2}",
STRING()),
TestSetSpec.forFunction(
BuiltInFunctionDefinitions.CAST, "cast RAW(Integer) to STRING")
.onFieldsWithData(123456)
.andDataTypes(INT())
.withFunction(IntegerToRaw.class)
.testTableApiResult(
call("IntegerToRaw", $("f0")).cast(STRING()), "123456", STRING()),
TestSetSpec.forFunction(BuiltInFunctionDefinitions.CAST, "cast RAW to STRING")
.onFieldsWithData("2020-11-11T18:08:01.123")
.andDataTypes(STRING())
Expand Down Expand Up @@ -354,7 +375,12 @@ public UserPojo(Integer i, String s) {
/** Test Raw with built-in Java class. */
public static class IntegerToRaw extends ScalarFunction {

public @DataTypeHint(value = "RAW", bridgedTo = byte[].class) byte[] eval(Integer i) {
public @DataTypeHint(
value = "RAW",
bridgedTo = byte[].class,
rawSerializer = IntSerializer.class) byte[] eval(Integer i) {
// the behaviour of this function is similar to IntSerializer.serialize()
// so we should use IntSerializer instant of the default Kyro serializer.
ByteBuffer b = ByteBuffer.allocate(4);
b.putInt(i);
return b.array();
Expand Down Expand Up @@ -384,4 +410,15 @@ public static class LocalDateTimeToRaw extends ScalarFunction {
return LocalDateTime.parse(str);
}
}

public static byte[] serializeLocalDateTime(LocalDateTime localDateTime) {
DataOutputSerializer dos = new DataOutputSerializer(16);
LocalDateTimeSerializer serializer = new LocalDateTimeSerializer();
try {
serializer.serialize(localDateTime, dos);
} catch (IOException e) {
throw new RuntimeException(e);
}
return dos.getCopyOfBuffer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.planner.functions.CastFunctionMiscITCase.LocalDateTimeToRaw;

import java.time.LocalDateTime;
import java.util.stream.Stream;

import static org.apache.flink.table.api.DataTypes.BINARY;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER;
import static org.apache.flink.table.planner.functions.CastFunctionMiscITCase.serializeLocalDateTime;

/**
* Tests for {@link BuiltInFunctionDefinitions#CAST} when legacy cast mode enabled regarding {@link
Expand Down Expand Up @@ -56,6 +62,17 @@ Stream<TestSetSpec> getTestSetSpecs() {
"Column 'EXPR$0' is NOT NULL, however, a null value is "
+ "being written into it. You can set job configuration "
+ "'table.exec.sink.not-null-enforcer'='DROP' to suppress "
+ "this exception and drop such records silently."));
+ "this exception and drop such records silently."),
TestSetSpec.forFunction(
BuiltInFunctionDefinitions.CAST,
"cast from RAW(LocalDateTime) to BINARY(13)")
.onFieldsWithData("2020-11-11T18:08:01.123")
.andDataTypes(STRING())
.withFunction(LocalDateTimeToRaw.class)
.testTableApiResult(
call("LocalDateTimeToRaw", $("f0")).cast(BINARY(13)),
serializeLocalDateTime(
LocalDateTime.parse("2020-11-11T18:08:01.123")),
BINARY(13)));
}
}

0 comments on commit 049fbd2

Please sign in to comment.