Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/content/docs/dev/table/types.md
Original file line number Diff line number Diff line change
Expand Up @@ -1517,7 +1517,7 @@ The matrix below describes the supported cast pairs, where "Y" means supported,
| Input\Target | `CHAR`¹/<br/>`VARCHAR`¹/<br/>`STRING` | `BINARY`¹/<br/>`VARBINARY`¹/<br/>`BYTES` | `BOOLEAN` | `DECIMAL` | `TINYINT` | `SMALLINT` | `INTEGER` | `BIGINT` | `FLOAT` | `DOUBLE` | `DATE` | `TIME` | `TIMESTAMP` | `TIMESTAMP_LTZ` | `INTERVAL` | `ARRAY` | `MULTISET` | `MAP` | `ROW` | `STRUCTURED` | `RAW` |
|:---------------------------------------|:-------------------------------------:|:----------------------------------------:|:---------:|:---------:|:---------:|:----------:|:---------:|:--------:|:-------:|:--------:|:------:|:------:|:-----------:|:---------------:|:----------:|:-------:|:----------:|:-----:|:-----:|:------------:|:-----:|
| `CHAR`/<br/>`VARCHAR`/<br/>`STRING` | Y | ! | ! | ! | ! | ! | ! | ! | ! | ! | ! | ! | ! | ! | N | N | N | N | N | N | N |
| `BINARY`/<br/>`VARBINARY`/<br/>`BYTES` | Y | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N |
| `BINARY`/<br/>`VARBINARY`/<br/>`BYTES` | Y | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | Y |
| `BOOLEAN` | Y | N | Y | Y | Y | Y | Y | Y | Y | Y | N | N | N | N | N | N | N | N | N | N | N |
| `DECIMAL` | Y | N | N | Y | Y | Y | Y | Y | Y | Y | N | N | N | N | N | N | N | N | N | N | N |
| `TINYINT` | Y | N | Y | Y | Y | Y | Y | Y | Y | Y | N | N | N² | N² | N | N | N | N | N | N | N |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ public final class LogicalTypeCasts {
.implicitFrom(INTERVAL_DAY_TIME)
.explicitFromFamily(EXACT_NUMERIC, CHARACTER_STRING)
.build();

castTo(RAW).explicitFromFamily(BINARY_STRING).build();
}

/**
Expand Down Expand Up @@ -334,10 +336,10 @@ private static boolean supportsCasting(
return supportsStructuredCasting(
sourceType, targetType, (s, t) -> supportsCasting(s, t, allowExplicit));

} else if (sourceRoot == RAW
} else if ((sourceRoot == RAW
&& !targetType.is(BINARY_STRING)
&& !targetType.is(CHARACTER_STRING)
|| targetRoot == RAW) {
&& !targetType.is(CHARACTER_STRING))
|| targetRoot == RAW && !sourceType.is(BINARY_STRING)) {
// the two raw types are not equal (from initial invariant), casting is not possible
return false;
} else if (sourceRoot == SYMBOL || targetRoot == SYMBOL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ public static Stream<Arguments> testData() {
new RawType<>(Integer.class, IntSerializer.INSTANCE),
VarCharType.STRING_TYPE,
false,
true),
// binary to raw
Arguments.of(
new BinaryType(),
new RawType<>(Integer.class, IntSerializer.INSTANCE),
false,
true));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ public static boolean valueMatchesType(Comparable value, SqlTypeName typeName, b
// Literal of type ANY is not legal. "CAST(2 AS ANY)" remains
// an integer literal surrounded by a cast function.
return false;
case OTHER:
return value instanceof ByteString;
default:
throw Util.unexpected(typeName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.calcite.rex;

import org.apache.flink.table.planner.plan.schema.RawRelDataType;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -232,6 +234,9 @@ public RexNode simplifyPreservingType(
rexBuilder.typeFactory, e2.getType(), e.getType())) {
return e2;
}
if (e.getType().getSqlTypeName() == SqlTypeName.OTHER) {
return e;
}
final RexNode e3 = rexBuilder.makeCast(e.getType(), e2, matchNullability);
if (e3.equals(e)) {
return e;
Expand Down Expand Up @@ -2024,7 +2029,9 @@ private RexNode simplifySearch(RexCall call, RexUnknownAs unknownAs) {
private RexNode simplifyCast(RexCall e) {
RexNode operand = e.getOperands().get(0);
operand = simplify(operand, UNKNOWN);
if (sameTypeOrNarrowsNullability(e.getType(), operand.getType())) {
if (sameTypeOrNarrowsNullability(e.getType(), operand.getType())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, the change in this line and in RexLiteral is a blocker for this PR. We should fix these things in Calcite first, and port the changes to the Flink code base only once they have been merged in Calcite.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've spent quite some time here, and couldn't find a workaround.

I could modify the PR to allow this only for the Table API for the moment, and try to fix this in Calcite first.
If that fix is accepted, I would open a follow-up PR to enable this for SQL in Flink.

|| e.getType() instanceof RawRelDataType) {
// || e.getType().getSqlTypeName() == SqlTypeName.OTHER) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need this instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, OTHER catches other types in our type system, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For us, currently, only RAW is associated with OTHER

return operand;
}
if (RexUtil.isLosslessCast(operand)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ private boolean canCastFrom(RelDataType toType, RelDataType fromType) {
case MULTISET:
case STRUCTURED:
case ROW:
case BINARY:
case VARBINARY:
case OTHER:
// We use our casting checker logic only for these types,
// as the differences with calcite casting checker logic generates issues
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.functions.casting;

import org.apache.flink.table.data.binary.BinaryRawValueData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;

/**
 * Cast rule from {@link LogicalTypeFamily#BINARY_STRING} to {@link LogicalTypeRoot#RAW}.
 *
 * <p>The generated code hands the input bytes to {@link BinaryRawValueData#fromBytes} and assigns
 * the result to the return variable.
 */
class BinaryToRawCastRule extends AbstractNullAwareCodeGeneratorCastRule<byte[], Object> {

    static final BinaryToRawCastRule INSTANCE = new BinaryToRawCastRule();

    private BinaryToRawCastRule() {
        super(binaryToRawPredicate());
    }

    /** Builds the predicate matching any binary-string input with a RAW target. */
    private static CastRulePredicate binaryToRawPredicate() {
        return CastRulePredicate.builder()
                .input(LogicalTypeFamily.BINARY_STRING)
                .target(LogicalTypeRoot.RAW)
                .build();
    }

    /** Reports that this cast may fail, regardless of the concrete input and target types. */
    @Override
    public boolean canFail(LogicalType inputLogicalType, LogicalType targetLogicalType) {
        return true;
    }

    /* Example generated code for BINARY(3):

    isNull$0 = _myInputIsNull;
    if (!isNull$0) {
        result$1 = org.apache.flink.table.data.binary.BinaryRawValueData.fromBytes(_myInput);
        isNull$0 = result$1 == null;
    } else {
        result$1 = null;
    }

    */

    /**
     * Emits a single assignment statement: {@code returnVariable =
     * BinaryRawValueData.fromBytes(inputTerm)}.
     */
    @Override
    protected String generateCodeBlockInternal(
            CodeGeneratorCastRule.Context context,
            String inputTerm,
            String returnVariable,
            LogicalType inputLogicalType,
            LogicalType targetLogicalType) {
        // Expression that wraps the raw input bytes into a BinaryRawValueData instance.
        final String fromBytesCall = staticCall(BinaryRawValueData.class, "fromBytes", inputTerm);
        return new CastRuleUtils.CodeWriter()
                .assignStmt(returnVariable, fromBytesCall)
                .toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class CastRuleProvider {
// To binary rules
.addRule(BinaryToBinaryCastRule.INSTANCE)
.addRule(RawToBinaryCastRule.INSTANCE)
.addRule(BinaryToRawCastRule.INSTANCE)
// Collection rules
.addRule(ArrayToArrayCastRule.INSTANCE)
.addRule(MapToMapAndMultisetToMultisetCastRule.INSTANCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.planner.functions;

import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.annotation.DataTypeHint;
Expand All @@ -39,6 +40,7 @@
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.MAP;
import static org.apache.flink.table.api.DataTypes.RAW;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIME;
Expand All @@ -54,6 +56,10 @@
/** Tests for {@link BuiltInFunctionDefinitions#CAST} regarding {@link DataTypes#ROW}. */
class CastFunctionMiscITCase extends BuiltInFunctionTestBase {

private static final String INT_TYPE_SERIALIZER_BASE_64 =
"AE5vcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2Uu"
+ "SW50U2VyaWFsaXplciRJbnRTZXJpYWxpemVyU25hcHNob3QAAAAD";

@Override
Configuration getConfiguration() {
return new Configuration()
Expand Down Expand Up @@ -240,6 +246,16 @@ Stream<TestSetSpec> getTestSetSpecs() {
call("LocalDateTimeToRaw", $("f0")).cast(STRING()),
"2020-11-11T18:08:01.123",
STRING()),
TestSetSpec.forFunction(BuiltInFunctionDefinitions.CAST, "cast BINARY to RAW")
.onFieldsWithData(new Object[] {new byte[] {0, 1, -30, 64}})
.andDataTypes(BYTES())
.testResult(
$("f0").cast(RAW(Integer.class, new IntSerializer())),
"CAST(f0 AS RAW('java.lang.Integer', '"
+ INT_TYPE_SERIALIZER_BASE_64
+ "'))",
123456,
RAW(Integer.class, new IntSerializer())),
TestSetSpec.forFunction(
BuiltInFunctionDefinitions.TRY_CAST, "try cast from STRING to TIME")
.onFieldsWithData("Flink", "12:34:56")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.planner.functions.casting;

import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LocalDateSerializer;
import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
import org.apache.flink.table.api.DataTypes;
Expand All @@ -31,6 +32,7 @@
import org.apache.flink.table.data.RawValueData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryRawValueData;
import org.apache.flink.table.data.binary.BinaryStringDataUtil;
import org.apache.flink.table.data.utils.CastExecutor;
import org.apache.flink.table.planner.functions.CastFunctionITCase;
Expand Down Expand Up @@ -159,6 +161,12 @@ class CastRulesTest {
new StructuredType.StructuredAttribute(
"d", ARRAY(STRING()).getLogicalType())))
.build());
private static final BinaryRawValueData<Integer> DEFAULT_RAW =
BinaryRawValueData.fromObject(123456);

static {
DEFAULT_RAW.ensureMaterialized(new IntSerializer());
}

Stream<CastTestSpecBuilder> testCases() {
return Stream.of(
Expand Down Expand Up @@ -1461,7 +1469,11 @@ Stream<CastTestSpecBuilder> testCases() {
TIMESTAMP_STRING,
TIMESTAMP_STRING,
TIMESTAMP_STRING
}))));
}))),
CastTestSpecBuilder.testCastTo(RAW(Integer.class, new IntSerializer()))
.fromCase(BINARY(4), new byte[] {0, 1, -30, 64}, DEFAULT_RAW)
.fromCase(VARBINARY(10), new byte[] {0, 1, -30, 64}, DEFAULT_RAW)
.fromCase(BYTES(), new byte[] {0, 1, -30, 64}, DEFAULT_RAW));
}

@TestFactory
Expand Down