@@ -39,8 +39,6 @@
import org.apache.flink.table.types.logical.TimeType;

import org.apache.calcite.avatica.util.ByteString;
- import org.apache.calcite.avatica.util.TimeUnit;
- import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
@@ -64,6 +62,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

+ import static org.apache.flink.table.planner.typeutils.SymbolUtil.commonToCalcite;
import static org.apache.flink.table.planner.utils.TimestampStringUtils.fromLocalDateTime;
import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;

@@ -175,9 +174,9 @@ public RexNode visit(ValueLiteralExpression valueLiteral) {
default:
value = extractValue(valueLiteral, Object.class);
if (value instanceof TimePointUnit) {
- value = timePointUnitToTimeUnit((TimePointUnit) value);
+ value = commonToCalcite((TimePointUnit) value);
} else if (value instanceof TimeIntervalUnit) {
- value = intervalUnitToUnitRange((TimeIntervalUnit) value);
+ value = commonToCalcite((TimeIntervalUnit) value);
}
break;
}
@@ -264,70 +263,6 @@ public DataTypeFactory getDataTypeFactory() {
};
}

- private static TimeUnit timePointUnitToTimeUnit(TimePointUnit unit) {
- switch (unit) {
- case YEAR:
- return TimeUnit.YEAR;
- case MONTH:
- return TimeUnit.MONTH;
- case DAY:
- return TimeUnit.DAY;
- case HOUR:
- return TimeUnit.HOUR;
- case MINUTE:
- return TimeUnit.MINUTE;
- case SECOND:
- return TimeUnit.SECOND;
- case QUARTER:
- return TimeUnit.QUARTER;
- case WEEK:
- return TimeUnit.WEEK;
- case MILLISECOND:
- return TimeUnit.MILLISECOND;
- case MICROSECOND:
- return TimeUnit.MICROSECOND;
- default:
- throw new UnsupportedOperationException("TimePointUnit is: " + unit);
- }
- }
-
- private static TimeUnitRange intervalUnitToUnitRange(TimeIntervalUnit intervalUnit) {
- switch (intervalUnit) {
- case YEAR:
- return TimeUnitRange.YEAR;
- case YEAR_TO_MONTH:
- return TimeUnitRange.YEAR_TO_MONTH;
- case QUARTER:
- return TimeUnitRange.QUARTER;
- case MONTH:
- return TimeUnitRange.MONTH;
- case WEEK:
- return TimeUnitRange.WEEK;
- case DAY:
- return TimeUnitRange.DAY;
- case DAY_TO_HOUR:
- return TimeUnitRange.DAY_TO_HOUR;
- case DAY_TO_MINUTE:
- return TimeUnitRange.DAY_TO_MINUTE;
- case DAY_TO_SECOND:
- return TimeUnitRange.DAY_TO_SECOND;
- case HOUR:
- return TimeUnitRange.HOUR;
- case SECOND:
- return TimeUnitRange.SECOND;
- case HOUR_TO_MINUTE:
- return TimeUnitRange.HOUR_TO_MINUTE;
- case HOUR_TO_SECOND:
- return TimeUnitRange.HOUR_TO_SECOND;
- case MINUTE:
- return TimeUnitRange.MINUTE;
- case MINUTE_TO_SECOND:
- return TimeUnitRange.MINUTE_TO_SECOND;
- default:
- throw new UnsupportedOperationException("TimeIntervalUnit is: " + intervalUnit);
- }
- }

/**
* Extracts a value from a literal. Including planner-specific instances such as {@link
* DecimalData}.
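The hunks above route TimePointUnit and TimeIntervalUnit literals through the shared SymbolUtil.commonToCalcite and delete the hand-written switch blocks that used to do the mapping at this call site. Below is a minimal sketch of the registry idea behind such a utility; SymbolRegistry and register(...) are hypothetical names, this is not the actual SymbolUtil implementation, and the package locations of the imported enums are assumed:

// Sketch only: keep both directions of every common-to-Calcite symbol pair in one place
// instead of a switch statement per converter.
import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.flink.table.expressions.TimeIntervalUnit;
import org.apache.flink.table.expressions.TimePointUnit;

import java.util.HashMap;
import java.util.Map;

final class SymbolRegistry {
    private static final Map<Enum<?>, Enum<?>> COMMON_TO_CALCITE = new HashMap<>();
    private static final Map<Enum<?>, Enum<?>> CALCITE_TO_COMMON = new HashMap<>();

    static {
        // Each pair is registered exactly once; both directions stay in sync by construction.
        register(TimeIntervalUnit.YEAR, TimeUnitRange.YEAR);
        register(TimePointUnit.HOUR, TimeUnit.HOUR);
        // ... every other symbol pair is registered the same way
    }

    private static void register(Enum<?> common, Enum<?> calcite) {
        COMMON_TO_CALCITE.put(common, calcite);
        CALCITE_TO_COMMON.put(calcite, common);
    }

    static Enum<?> commonToCalcite(Enum<?> common) {
        final Enum<?> calcite = COMMON_TO_CALCITE.get(common);
        if (calcite == null) {
            throw new UnsupportedOperationException("Unsupported symbol: " + common);
        }
        return calcite;
    }
}

Registering each pair once means adding a new symbol touches a single class instead of every converter and serializer that handles symbols.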
@@ -23,6 +23,7 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
+ import org.apache.flink.table.planner.typeutils.SymbolUtil;

import org.apache.calcite.sql.SqlJsonConstructorNullClause;

Expand All @@ -33,20 +34,10 @@ public static SqlJsonConstructorNullClause getOnNullArgument(
CallExpression call, int argumentIdx) {
return ((ValueLiteralExpression) call.getChildren().get(argumentIdx))
.getValueAs(JsonOnNull.class)
- .map(JsonConverterUtil::convertOnNull)
+ .map(SymbolUtil::commonToCalcite)
+ .map(SqlJsonConstructorNullClause.class::cast)
.orElseThrow(() -> new TableException("Missing argument for ON NULL."));
}

- private static SqlJsonConstructorNullClause convertOnNull(JsonOnNull onNull) {
- switch (onNull) {
- case NULL:
- return SqlJsonConstructorNullClause.NULL_ON_NULL;
- case ABSENT:
- return SqlJsonConstructorNullClause.ABSENT_ON_NULL;
- default:
- throw new TableException("Unknown ON NULL behavior: " + onNull);
- }
- }

private JsonConverterUtil() {}
}
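JsonConverterUtil now delegates the ON NULL symbol to the shared utility as well. Because commonToCalcite is used for every symbol kind, the call site narrows the generic result with SqlJsonConstructorNullClause.class::cast. A small sketch of that narrowing pattern, assuming (as the usages in this diff imply) that commonToCalcite returns the mapped Calcite constant as a plain Enum<?>:

import org.apache.calcite.sql.SqlJsonConstructorNullClause;
import org.apache.flink.table.api.JsonOnNull;
import org.apache.flink.table.planner.typeutils.SymbolUtil;

class OnNullNarrowingSketch {
    static SqlJsonConstructorNullClause toCalcite(JsonOnNull onNull) {
        // The shared utility hands back the mapped Calcite constant without a specific static type ...
        final Enum<?> generic = SymbolUtil.commonToCalcite(onNull);
        // ... so the caller narrows it, exactly like the class::cast step in the method above.
        return (SqlJsonConstructorNullClause) generic;
    }
}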
@@ -20,15 +20,14 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.JsonExistsOnError;
- import org.apache.flink.table.api.TableException;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+ import org.apache.flink.table.planner.typeutils.SymbolUtil;

import org.apache.calcite.rex.RexNode;
- import org.apache.calcite.sql.SqlJsonExistsErrorBehavior;

import java.util.LinkedList;
import java.util.List;
@@ -47,7 +46,7 @@ public RexNode convert(CallExpression call, CallExpressionConvertRule.ConvertCon
if (call.getChildren().size() >= 3) {
((ValueLiteralExpression) call.getChildren().get(2))
.getValueAs(JsonExistsOnError.class)
- .map(this::convertErrorBehavior)
+ .map(SymbolUtil::commonToCalcite)
.ifPresent(
onErrorBehavior ->
operands.add(
@@ -58,19 +57,4 @@ public RexNode convert(CallExpression call, CallExpressionConvertRule.ConvertCon

return context.getRelBuilder().call(FlinkSqlOperatorTable.JSON_EXISTS, operands);
}

- private SqlJsonExistsErrorBehavior convertErrorBehavior(JsonExistsOnError onError) {
- switch (onError) {
- case TRUE:
- return SqlJsonExistsErrorBehavior.TRUE;
- case FALSE:
- return SqlJsonExistsErrorBehavior.FALSE;
- case UNKNOWN:
- return SqlJsonExistsErrorBehavior.UNKNOWN;
- case ERROR:
- return SqlJsonExistsErrorBehavior.ERROR;
- default:
- throw new TableException("Unknown ON ERROR behavior: " + onError);
- }
- }
}
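The JSON_EXISTS rule drops its private convertErrorBehavior switch in the same way. A side benefit of funnelling every symbol through one utility is that the mapping can be sanity-checked in one place; the sketch below assumes commonToCalcite maps every JsonExistsOnError constant onto Calcite's SqlJsonExistsErrorBehavior, which is what the converter above relies on:

import org.apache.calcite.sql.SqlJsonExistsErrorBehavior;
import org.apache.flink.table.api.JsonExistsOnError;
import org.apache.flink.table.planner.typeutils.SymbolUtil;

class JsonExistsMappingCheck {
    public static void main(String[] args) {
        // Exhaustiveness check over all ON ERROR constants, done once against the shared utility.
        for (JsonExistsOnError onError : JsonExistsOnError.values()) {
            final Enum<?> calcite = SymbolUtil.commonToCalcite(onError);
            if (!(calcite instanceof SqlJsonExistsErrorBehavior)) {
                throw new AssertionError("Unexpected mapping for " + onError + ": " + calcite);
            }
        }
    }
}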
@@ -27,10 +27,9 @@
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+ import org.apache.flink.table.planner.typeutils.SymbolUtil;

import org.apache.calcite.rex.RexNode;
- import org.apache.calcite.sql.SqlJsonQueryEmptyOrErrorBehavior;
- import org.apache.calcite.sql.SqlJsonQueryWrapperBehavior;

import java.util.LinkedList;
import java.util.List;
@@ -46,26 +45,26 @@ public RexNode convert(CallExpression call, CallExpressionConvertRule.ConvertCon
operands.add(context.toRexNode(call.getChildren().get(0)));
operands.add(context.toRexNode(call.getChildren().get(1)));

- final SqlJsonQueryWrapperBehavior wrappingBehavior =
+ final Enum<?> wrappingBehavior =
((ValueLiteralExpression) call.getChildren().get(2))
.getValueAs(JsonQueryWrapper.class)
- .map(this::convertWrappingBehavior)
+ .map(SymbolUtil::commonToCalcite)
.orElseThrow(
() ->
new TableException(
"Missing argument for wrapping behavior."));
- final SqlJsonQueryEmptyOrErrorBehavior onEmpty =
+ final Enum<?> onEmpty =
((ValueLiteralExpression) call.getChildren().get(3))
.getValueAs(JsonQueryOnEmptyOrError.class)
- .map(this::convertEmptyOrErrorBehavior)
+ .map(SymbolUtil::commonToCalcite)
.orElseThrow(
() ->
new TableException(
"Missing argument for ON EMPTY behavior."));
- final SqlJsonQueryEmptyOrErrorBehavior onError =
+ final Enum<?> onError =
((ValueLiteralExpression) call.getChildren().get(4))
.getValueAs(JsonQueryOnEmptyOrError.class)
- .map(this::convertEmptyOrErrorBehavior)
+ .map(SymbolUtil::commonToCalcite)
.orElseThrow(
() ->
new TableException(
@@ -79,33 +78,4 @@ public RexNode convert(CallExpression call, CallExpressionConvertRule.ConvertCon
.getRexBuilder()
.makeCall(FlinkSqlOperatorTable.JSON_QUERY, operands);
}

- private SqlJsonQueryWrapperBehavior convertWrappingBehavior(JsonQueryWrapper wrappingBehavior) {
- switch (wrappingBehavior) {
- case WITHOUT_ARRAY:
- return SqlJsonQueryWrapperBehavior.WITHOUT_ARRAY;
- case CONDITIONAL_ARRAY:
- return SqlJsonQueryWrapperBehavior.WITH_CONDITIONAL_ARRAY;
- case UNCONDITIONAL_ARRAY:
- return SqlJsonQueryWrapperBehavior.WITH_UNCONDITIONAL_ARRAY;
- default:
- throw new TableException("Unknown wrapping behavior: " + wrappingBehavior);
- }
- }
-
- private SqlJsonQueryEmptyOrErrorBehavior convertEmptyOrErrorBehavior(
- JsonQueryOnEmptyOrError onEmptyOrError) {
- switch (onEmptyOrError) {
- case NULL:
- return SqlJsonQueryEmptyOrErrorBehavior.NULL;
- case EMPTY_ARRAY:
- return SqlJsonQueryEmptyOrErrorBehavior.EMPTY_ARRAY;
- case EMPTY_OBJECT:
- return SqlJsonQueryEmptyOrErrorBehavior.EMPTY_OBJECT;
- case ERROR:
- return SqlJsonQueryEmptyOrErrorBehavior.ERROR;
- default:
- throw new TableException("Unknown ON EMPTY/ERROR behavior: " + onEmptyOrError);
- }
- }
}
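In the JSON_QUERY rule the three local variables are now typed Enum<?>, since the shared conversion no longer returns the specific Calcite enum, and the SqlJsonQueryWrapperBehavior / SqlJsonQueryEmptyOrErrorBehavior imports go away with the deleted switches. The deleted code also shows why the mapping has to be an explicit table rather than a name-based shortcut: JsonQueryWrapper.CONDITIONAL_ARRAY corresponds to WITH_CONDITIONAL_ARRAY on the Calcite side. The snippet below only illustrates that naming gap and is not part of the change; the package location of the Table API enum is assumed:

import org.apache.flink.table.api.JsonQueryWrapper;
import org.apache.flink.table.planner.typeutils.SymbolUtil;

class WrapperNamingSketch {
    public static void main(String[] args) {
        final Enum<?> calcite = SymbolUtil.commonToCalcite(JsonQueryWrapper.CONDITIONAL_ARRAY);
        // Expected: CONDITIONAL_ARRAY -> WITH_CONDITIONAL_ARRAY, matching the deleted switch above.
        System.out.println(JsonQueryWrapper.CONDITIONAL_ARRAY.name() + " -> " + calcite.name());
    }
}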
@@ -26,6 +26,7 @@
import org.apache.flink.table.module.CoreModule;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction;
+ import org.apache.flink.table.planner.typeutils.SymbolUtil.SerializableSymbol;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.util.Preconditions;
@@ -90,6 +91,7 @@
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.SQL_KIND_LITERAL;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.SQL_KIND_PATTERN_INPUT_REF;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.SQL_KIND_REX_CALL;
+ import static org.apache.flink.table.planner.typeutils.SymbolUtil.serializableToCalcite;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** JSON deserializer for {@link RexNode}. refer to {@link RexNodeJsonSerializer} for serializer. */
@@ -223,10 +225,8 @@ private Object toLiteralValue(
.getValue();
case SYMBOL:
JsonNode classNode = literalNode.get(FIELD_NAME_CLASS);
- return getEnum(
- classNode.asText(),
- valueNode.asText(),
- SerdeContext.get(ctx).getClassLoader());
+ return serializableToCalcite(
+ SerializableSymbol.of(classNode.asText(), valueNode.asText()));
case ROW:
case MULTISET:
ArrayNode valuesNode = (ArrayNode) valueNode;
@@ -240,17 +240,6 @@
}
}

@SuppressWarnings("unchecked")
private static <T extends Enum<T>> T getEnum(
String clazz, String name, ClassLoader classLoader) {
try {
Class<T> c = (Class<T>) Class.forName(clazz, true, classLoader);
return Enum.valueOf(c, name);
} catch (ClassNotFoundException e) {
throw new TableException("Unknown class: " + clazz);
}
}

@SuppressWarnings({"rawtypes", "unchecked", "UnstableApiUsage"})
private Sarg<?> toSarg(
JsonNode jsonNode,
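The deserializer's reflective getEnum helper (Class.forName against the user class loader plus Enum.valueOf) is replaced by serializableToCalcite over a SerializableSymbol, so a persisted plan only has to carry a stable kind/value string pair rather than a resolvable Java class name. Below is a sketch of the kind of lookup this implies; KIND_TO_CLASS and the "TIME_UNIT_RANGE" kind string are hypothetical, the real identifiers live in SymbolUtil:

import org.apache.calcite.avatica.util.TimeUnitRange;

import java.util.HashMap;
import java.util.Map;

final class SymbolResolutionSketch {
    // Hypothetical table: one stable kind string per supported Calcite symbol class.
    private static final Map<String, Class<? extends Enum<?>>> KIND_TO_CLASS = new HashMap<>();

    static {
        KIND_TO_CLASS.put("TIME_UNIT_RANGE", TimeUnitRange.class);
        // ... one entry per symbol kind
    }

    @SuppressWarnings("unchecked")
    static <T extends Enum<T>> T resolve(String kind, String value) {
        final Class<T> clazz = (Class<T>) KIND_TO_CLASS.get(kind);
        if (clazz == null) {
            throw new IllegalArgumentException("Unknown symbol kind: " + kind);
        }
        // Same Enum.valueOf step as the deleted getEnum, but without Class.forName.
        return Enum.valueOf(clazz, value);
    }
}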
@@ -24,6 +24,7 @@
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction;
+ import org.apache.flink.table.planner.typeutils.SymbolUtil.SerializableSymbol;
import org.apache.flink.table.utils.EncodingUtils;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -47,6 +48,8 @@
import java.io.IOException;
import java.math.BigDecimal;

+ import static org.apache.flink.table.planner.typeutils.SymbolUtil.calciteToSerializable;

/**
* JSON serializer for {@link RexNode}. refer to {@link RexNodeJsonDeserializer} for deserializer.
*/
@@ -237,8 +240,9 @@ private void serialize(
gen.writeNumberField(FIELD_NAME_VALUE, ((BigDecimal) value).longValue());
break;
case SYMBOL:
- gen.writeStringField(FIELD_NAME_VALUE, ((Enum<?>) value).name());
- gen.writeStringField(FIELD_NAME_CLASS, value.getClass().getName());
+ final SerializableSymbol symbol = calciteToSerializable((Enum<?>) value);
+ gen.writeStringField(FIELD_NAME_VALUE, symbol.getValue());
+ gen.writeStringField(FIELD_NAME_CLASS, symbol.getKind());
break;
case SARG:
serialize((Sarg<?>) value, elementTypeName, gen, serializerProvider);
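The serializer side is the matching half: the SYMBOL branch now writes the SerializableSymbol's value and kind instead of the enum constant name and its Java class name. A short sketch of the round trip the serializer and deserializer now rely on; TimeUnitRange.YEAR is only an illustrative symbol, and the return types are assumed from the usages in this diff:

import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.flink.table.planner.typeutils.SymbolUtil;
import org.apache.flink.table.planner.typeutils.SymbolUtil.SerializableSymbol;

class SymbolRoundTripSketch {
    public static void main(String[] args) {
        // Serializer side: a Calcite symbol becomes a (kind, value) pair of plain strings.
        final SerializableSymbol symbol = SymbolUtil.calciteToSerializable(TimeUnitRange.YEAR);
        // Deserializer side: the same two strings restore the original constant.
        final Object restored =
                SymbolUtil.serializableToCalcite(
                        SerializableSymbol.of(symbol.getKind(), symbol.getValue()));
        if (restored != TimeUnitRange.YEAR) {
            throw new AssertionError("Round trip failed: " + restored);
        }
    }
}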