diff --git a/docs/content/docs/dev/table/types.md b/docs/content/docs/dev/table/types.md
index c6b54ea70d909..9ba676e7c442e 100644
--- a/docs/content/docs/dev/table/types.md
+++ b/docs/content/docs/dev/table/types.md
@@ -1517,7 +1517,7 @@ The matrix below describes the supported cast pairs, where "Y" means supported,
| Input\Target | `CHAR`¹/
`VARCHAR`¹/
`STRING` | `BINARY`¹/
`VARBINARY`¹/
`BYTES` | `BOOLEAN` | `DECIMAL` | `TINYINT` | `SMALLINT` | `INTEGER` | `BIGINT` | `FLOAT` | `DOUBLE` | `DATE` | `TIME` | `TIMESTAMP` | `TIMESTAMP_LTZ` | `INTERVAL` | `ARRAY` | `MULTISET` | `MAP` | `ROW` | `STRUCTURED` | `RAW` |
|:---------------------------------------|:-------------------------------------:|:----------------------------------------:|:---------:|:---------:|:---------:|:----------:|:---------:|:--------:|:-------:|:--------:|:------:|:------:|:-----------:|:---------------:|:----------:|:-------:|:----------:|:-----:|:-----:|:------------:|:-----:|
| `CHAR`/
`VARCHAR`/
`STRING` | Y | ! | ! | ! | ! | ! | ! | ! | ! | ! | ! | ! | ! | ! | N | N | N | N | N | N | N |
-| `BINARY`/
`VARBINARY`/
`BYTES` | Y | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N |
+| `BINARY`/
`VARBINARY`/
`BYTES` | Y | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | Y |
| `BOOLEAN` | Y | N | Y | Y | Y | Y | Y | Y | Y | Y | N | N | N | N | N | N | N | N | N | N | N |
| `DECIMAL` | Y | N | N | Y | Y | Y | Y | Y | Y | Y | N | N | N | N | N | N | N | N | N | N | N |
| `TINYINT` | Y | N | Y | Y | Y | Y | Y | Y | Y | Y | N | N | N² | N² | N | N | N | N | N | N | N |
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
index 0503b16da58ca..d9043597c87a3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
@@ -224,6 +224,8 @@ public final class LogicalTypeCasts {
.implicitFrom(INTERVAL_DAY_TIME)
.explicitFromFamily(EXACT_NUMERIC, CHARACTER_STRING)
.build();
+
+ castTo(RAW).explicitFromFamily(BINARY_STRING).build();
}
/**
@@ -334,10 +336,10 @@ private static boolean supportsCasting(
return supportsStructuredCasting(
sourceType, targetType, (s, t) -> supportsCasting(s, t, allowExplicit));
- } else if (sourceRoot == RAW
+ } else if ((sourceRoot == RAW
&& !targetType.is(BINARY_STRING)
- && !targetType.is(CHARACTER_STRING)
- || targetRoot == RAW) {
+ && !targetType.is(CHARACTER_STRING))
+ || targetRoot == RAW && !sourceType.is(BINARY_STRING)) {
// the two raw types are not equal (from initial invariant), casting is not possible
return false;
} else if (sourceRoot == SYMBOL || targetRoot == SYMBOL) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
index aa18be73c2f0b..c6a7bb6275fce 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
@@ -256,6 +256,12 @@ public static Stream testData() {
new RawType<>(Integer.class, IntSerializer.INSTANCE),
VarCharType.STRING_TYPE,
false,
+ true),
+ // binary to raw
+ Arguments.of(
+ new BinaryType(),
+ new RawType<>(Integer.class, IntSerializer.INSTANCE),
+ false,
true));
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexLiteral.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexLiteral.java
index dfb68088c9f61..c397c9ae0f700 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexLiteral.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexLiteral.java
@@ -358,6 +358,8 @@ public static boolean valueMatchesType(Comparable value, SqlTypeName typeName, b
// Literal of type ANY is not legal. "CAST(2 AS ANY)" remains
// an integer literal surrounded by a cast function.
return false;
+ case OTHER:
+ return value instanceof ByteString;
default:
throw Util.unexpected(typeName);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexSimplify.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexSimplify.java
index ea59497adfdeb..1c59827576aad 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexSimplify.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexSimplify.java
@@ -16,6 +16,8 @@
*/
package org.apache.calcite.rex;
+import org.apache.flink.table.planner.plan.schema.RawRelDataType;
+
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableList;
@@ -232,6 +234,9 @@ public RexNode simplifyPreservingType(
rexBuilder.typeFactory, e2.getType(), e.getType())) {
return e2;
}
+ if (e.getType().getSqlTypeName() == SqlTypeName.OTHER) {
+ return e;
+ }
final RexNode e3 = rexBuilder.makeCast(e.getType(), e2, matchNullability);
if (e3.equals(e)) {
return e;
@@ -2024,7 +2029,9 @@ private RexNode simplifySearch(RexCall call, RexUnknownAs unknownAs) {
private RexNode simplifyCast(RexCall e) {
RexNode operand = e.getOperands().get(0);
operand = simplify(operand, UNKNOWN);
- if (sameTypeOrNarrowsNullability(e.getType(), operand.getType())) {
+ if (sameTypeOrNarrowsNullability(e.getType(), operand.getType())
+ || e.getType() instanceof RawRelDataType) {
+ // || e.getType().getSqlTypeName() == SqlTypeName.OTHER) {
return operand;
}
if (RexUtil.isLosslessCast(operand)) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlCastFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlCastFunction.java
index 170bf64691560..48445b928f675 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlCastFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlCastFunction.java
@@ -166,6 +166,8 @@ private boolean canCastFrom(RelDataType toType, RelDataType fromType) {
case MULTISET:
case STRUCTURED:
case ROW:
+ case BINARY:
+ case VARBINARY:
case OTHER:
// We use our casting checker logic only for these types,
// as the differences with calcite casting checker logic generates issues
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToRawCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToRawCastRule.java
new file mode 100644
index 0000000000000..cc7ed3493b972
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToRawCastRule.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.binary.BinaryRawValueData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#BINARY_STRING} to {@link LogicalTypeRoot#RAW} to cast rule. */
+class BinaryToRawCastRule extends AbstractNullAwareCodeGeneratorCastRule {
+
+ static final BinaryToRawCastRule INSTANCE = new BinaryToRawCastRule();
+
+ private BinaryToRawCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .input(LogicalTypeFamily.BINARY_STRING)
+ .target(LogicalTypeRoot.RAW)
+ .build());
+ }
+
+ @Override
+ public boolean canFail(LogicalType inputLogicalType, LogicalType targetLogicalType) {
+ return true;
+ }
+
+ /* Example generated code for BINARY(3):
+
+ isNull$0 = _myInputIsNull;
+ if (!isNull$0) {
+ result$1 = org.apache.flink.table.data.binary.BinaryRawValueData.fromBytes(_myInput);
+ isNull$0 = result$1 == null;
+ } else {
+ result$1 = null;
+ }
+
+ */
+
+ @Override
+ protected String generateCodeBlockInternal(
+ CodeGeneratorCastRule.Context context,
+ String inputTerm,
+ String returnVariable,
+ LogicalType inputLogicalType,
+ LogicalType targetLogicalType) {
+ // Get serializer for RAW type
+ return new CastRuleUtils.CodeWriter()
+ .assignStmt(
+ returnVariable,
+ staticCall(BinaryRawValueData.class, "fromBytes", inputTerm))
+ .toString();
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
index c2681f8dab6cb..7bc8f4ab2880b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
@@ -88,6 +88,7 @@ public class CastRuleProvider {
// To binary rules
.addRule(BinaryToBinaryCastRule.INSTANCE)
.addRule(RawToBinaryCastRule.INSTANCE)
+ .addRule(BinaryToRawCastRule.INSTANCE)
// Collection rules
.addRule(ArrayToArrayCastRule.INSTANCE)
.addRule(MapToMapAndMultisetToMultisetCastRule.INSTANCE)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
index 094171da899ee..e618000130129 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.functions;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.annotation.DataTypeHint;
@@ -39,6 +40,7 @@
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.RAW;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIME;
@@ -54,6 +56,10 @@
/** Tests for {@link BuiltInFunctionDefinitions#CAST} regarding {@link DataTypes#ROW}. */
class CastFunctionMiscITCase extends BuiltInFunctionTestBase {
+ private static final String INT_TYPE_SERIALIZER_BASE_64 =
+ "AE5vcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2Uu"
+ + "SW50U2VyaWFsaXplciRJbnRTZXJpYWxpemVyU25hcHNob3QAAAAD";
+
@Override
Configuration getConfiguration() {
return new Configuration()
@@ -240,6 +246,16 @@ Stream getTestSetSpecs() {
call("LocalDateTimeToRaw", $("f0")).cast(STRING()),
"2020-11-11T18:08:01.123",
STRING()),
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.CAST, "cast BINARY to RAW")
+ .onFieldsWithData(new Object[] {new byte[] {0, 1, -30, 64}})
+ .andDataTypes(BYTES())
+ .testResult(
+ $("f0").cast(RAW(Integer.class, new IntSerializer())),
+ "CAST(f0 AS RAW('java.lang.Integer', '"
+ + INT_TYPE_SERIALIZER_BASE_64
+ + "'))",
+ 123456,
+ RAW(Integer.class, new IntSerializer())),
TestSetSpec.forFunction(
BuiltInFunctionDefinitions.TRY_CAST, "try cast from STRING to TIME")
.onFieldsWithData("Flink", "12:34:56")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index 564cc581d05d8..f906a4d122de8 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.functions.casting;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LocalDateSerializer;
import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
import org.apache.flink.table.api.DataTypes;
@@ -31,6 +32,7 @@
import org.apache.flink.table.data.RawValueData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryRawValueData;
import org.apache.flink.table.data.binary.BinaryStringDataUtil;
import org.apache.flink.table.data.utils.CastExecutor;
import org.apache.flink.table.planner.functions.CastFunctionITCase;
@@ -159,6 +161,12 @@ class CastRulesTest {
new StructuredType.StructuredAttribute(
"d", ARRAY(STRING()).getLogicalType())))
.build());
+ private static final BinaryRawValueData DEFAULT_RAW =
+ BinaryRawValueData.fromObject(123456);
+
+ static {
+ DEFAULT_RAW.ensureMaterialized(new IntSerializer());
+ }
Stream testCases() {
return Stream.of(
@@ -1461,7 +1469,11 @@ Stream testCases() {
TIMESTAMP_STRING,
TIMESTAMP_STRING,
TIMESTAMP_STRING
- }))));
+ }))),
+ CastTestSpecBuilder.testCastTo(RAW(Integer.class, new IntSerializer()))
+ .fromCase(BINARY(4), new byte[] {0, 1, -30, 64}, DEFAULT_RAW)
+ .fromCase(VARBINARY(10), new byte[] {0, 1, -30, 64}, DEFAULT_RAW)
+ .fromCase(BYTES(), new byte[] {0, 1, -30, 64}, DEFAULT_RAW));
}
@TestFactory