Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class CastRuleProvider {
.addRule(DateToStringCastRule.INSTANCE)
.addRule(IntervalToStringCastRule.INSTANCE)
.addRule(ArrayToStringCastRule.INSTANCE)
.addRule(MapToStringCastRule.INSTANCE)
.addRule(MapAndMultisetToStringCastRule.INSTANCE)
.addRule(RowToStringCastRule.INSTANCE)
.addRule(RawToStringCastRule.INSTANCE)
// Collection rules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;

import java.util.function.Consumer;

import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
import static org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
Expand All @@ -33,25 +37,33 @@
import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.strLiteral;

/** {@link LogicalTypeRoot#MAP} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
class MapToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
/**
* {@link LogicalTypeRoot#MAP} and {@link LogicalTypeRoot#MULTISET} to {@link
* LogicalTypeFamily#CHARACTER_STRING} cast rule.
*/
class MapAndMultisetToStringCastRule
extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {

static final MapToStringCastRule INSTANCE = new MapToStringCastRule();
static final MapAndMultisetToStringCastRule INSTANCE = new MapAndMultisetToStringCastRule();

private MapToStringCastRule() {
private MapAndMultisetToStringCastRule() {
super(
CastRulePredicate.builder()
.predicate(
(input, target) ->
input.is(LogicalTypeRoot.MAP)
&& target.is(LogicalTypeFamily.CHARACTER_STRING)
&& CastRuleProvider.exists(
((MapType) input).getKeyType(), target)
&& CastRuleProvider.exists(
((MapType) input).getValueType(), target))
.predicate(MapAndMultisetToStringCastRule::isMapOrMultiset)
.build());
}

private static boolean isMapOrMultiset(LogicalType input, LogicalType target) {
return target.is(LogicalTypeFamily.CHARACTER_STRING)
&& ((input.is(LogicalTypeRoot.MAP)
&& CastRuleProvider.exists(((MapType) input).getKeyType(), target)
&& CastRuleProvider.exists(
((MapType) input).getValueType(), target))
|| (input.is(LogicalTypeRoot.MULTISET)
&& CastRuleProvider.exists(
((MultisetType) input).getElementType(), target)));
}

/* Example generated code for MAP<STRING, INTERVAL MONTH>:

isNull$0 = _myInputIsNull;
Expand Down Expand Up @@ -97,8 +109,14 @@ protected String generateCodeBlockInternal(
String returnVariable,
LogicalType inputLogicalType,
LogicalType targetLogicalType) {
final LogicalType keyType = ((MapType) inputLogicalType).getKeyType();
final LogicalType valueType = ((MapType) inputLogicalType).getValueType();
final LogicalType keyType =
inputLogicalType.is(LogicalTypeRoot.MULTISET)
? ((MultisetType) inputLogicalType).getElementType()
: ((MapType) inputLogicalType).getKeyType();
final LogicalType valueType =
inputLogicalType.is(LogicalTypeRoot.MULTISET)
? INT().getLogicalType()
: ((MapType) inputLogicalType).getValueType();

final String builderTerm = newName("builder");
context.declareClassField(
Expand Down Expand Up @@ -137,6 +155,23 @@ protected String generateCodeBlockInternal(
valueType.copy(false),
targetLogicalType);

Consumer<CastRuleUtils.CodeWriter> appendNonNullValue =
bodyWriter ->
bodyWriter
// If value not null, extract it and
// execute the cast
.assignStmt(
valueTerm,
rowFieldReadAccess(
indexTerm,
valueArrayTerm,
valueType))
.append(valueCast)
.stmt(
methodCall(
builderTerm,
"append",
valueCast.getReturnTerm()));
loopBodyWriter
// Write the comma
.ifStmt(
Expand Down Expand Up @@ -184,32 +219,20 @@ protected String generateCodeBlockInternal(
builderTerm,
"append",
NULL_STR_LITERAL)))
.stmt(methodCall(builderTerm, "append", strLiteral("=")))
.ifStmt(
"!" + valueIsNullTerm,
thenBodyWriter ->
thenBodyWriter
// If value not null, extract it and
// execute the cast
.assignStmt(
valueTerm,
rowFieldReadAccess(
indexTerm,
valueArrayTerm,
valueType))
.append(valueCast)
.stmt(
methodCall(
builderTerm,
"append",
valueCast
.getReturnTerm())),
elseBodyWriter ->
elseBodyWriter.stmt(
methodCall(
builderTerm,
"append",
NULL_STR_LITERAL)));
.stmt(methodCall(builderTerm, "append", strLiteral("=")));
if (inputLogicalType.is(LogicalTypeRoot.MULTISET)) {
appendNonNullValue.accept(loopBodyWriter);
} else {
loopBodyWriter.ifStmt(
"!" + valueIsNullTerm,
appendNonNullValue,
elseBodyWriter ->
elseBodyWriter.stmt(
methodCall(
builderTerm,
"append",
NULL_STR_LITERAL)));
}
})
.stmt(methodCall(builderTerm, "append", strLiteral("}")))
// Assign the result value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static org.apache.flink.table.api.DataTypes.INTERVAL;
import static org.apache.flink.table.api.DataTypes.MAP;
import static org.apache.flink.table.api.DataTypes.MONTH;
import static org.apache.flink.table.api.DataTypes.MULTISET;
import static org.apache.flink.table.api.DataTypes.RAW;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.SECOND;
Expand Down Expand Up @@ -445,6 +446,12 @@ Stream<CastTestSpecBuilder> testCases() {
entry(StringData.fromString("a"), -123),
entry(StringData.fromString("b"), 123)),
StringData.fromString("{a=-10-03, b=+10-03}"))
.fromCase(
MULTISET(STRING()),
mapData(
entry(StringData.fromString("a"), 1),
entry(StringData.fromString("b"), 1)),
StringData.fromString("{a=1, b=1}"))
.fromCase(
MAP(STRING().nullable(), INTERVAL(MONTH()).nullable()),
mapData(entry(null, -123), entry(StringData.fromString("b"), null)),
Expand Down