Skip to content

Commit

Permalink
[fixup] PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: slinkydeveloper <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper committed Nov 5, 2021
1 parent 5b8ef0d commit 7dfe3c6
Show file tree
Hide file tree
Showing 16 changed files with 48 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.flink.table.planner.functions.casting.rules.IdentityCastRule;
import org.apache.flink.table.planner.functions.casting.rules.IntervalToStringCastRule;
import org.apache.flink.table.planner.functions.casting.rules.MapToStringCastRule;
import org.apache.flink.table.planner.functions.casting.rules.NumberToStringCastRule;
import org.apache.flink.table.planner.functions.casting.rules.NumericToStringCastRule;
import org.apache.flink.table.planner.functions.casting.rules.RawToStringCastRule;
import org.apache.flink.table.planner.functions.casting.rules.RowToStringCastRule;
import org.apache.flink.table.planner.functions.casting.rules.TimeToStringCastRule;
Expand Down Expand Up @@ -58,7 +58,7 @@ public class CastRuleProvider {
// Numeric rules
.addRule(UpcastToBigIntCastRule.INSTANCE)
// To string rules
.addRule(NumberToStringCastRule.INSTANCE)
.addRule(NumericToStringCastRule.INSTANCE)
.addRule(BooleanToStringCastRule.INSTANCE)
.addRule(BinaryToStringCastRule.INSTANCE)
.addRule(TimestampToStringCastRule.INSTANCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.table.types.logical.LogicalTypeFamily;

import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;

/**
* Base class for cast rules converting to {@link LogicalTypeFamily#CHARACTER_STRING} with code
Expand Down Expand Up @@ -55,6 +54,6 @@ String generateExpression(
final String stringExpr =
generateStringExpression(context, inputTerm, inputLogicalType, targetLogicalType);

return functionCall(BINARY_STRING_DATA_FROM_STRING(), stringExpr);
return CastRuleUtils.staticCall(BINARY_STRING_DATA_FROM_STRING(), stringExpr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ public CastCodeBlock generateCodeBlock(
if (isResultNullable) {
writer.ifStmt(
"!" + nullTerm,
thenWriter -> thenWriter.append(castCodeBlock),
thenWriter -> thenWriter.appendBlock(castCodeBlock),
elseWriter ->
elseWriter.assignStmt(returnTerm, primitiveDefaultValue(targetType)));
} else {
writer.append(castCodeBlock).append("\n");
writer.appendBlock(castCodeBlock);
}

return new CastCodeBlock(writer.toString(), returnTerm, nullTerm);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
import static org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.newArray;

Expand Down Expand Up @@ -97,7 +96,7 @@ protected String generateCodeBlockInternal(

if (innerTargetType.isNullable()) {
loopWriter.ifStmt(
"!" + functionCall(inputTerm + ".isNullAt", index),
"!" + methodCall(inputTerm, "isNullAt", index),
thenWriter ->
thenWriter
.append(codeBlock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.NULL_STR_LITERAL;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;

Expand Down Expand Up @@ -162,7 +161,7 @@ protected String generateCodeBlockInternal(
// Assign the result value
.assignStmt(
returnVariable,
functionCall(
CastRuleUtils.staticCall(
BINARY_STRING_DATA_FROM_STRING(),
methodCall(builderTerm, "toString")))
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.EMPTY_STR_LITERAL;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.stringConcat;

/** {@link LogicalTypeRoot#BOOLEAN} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
Expand All @@ -46,6 +46,6 @@ public String generateStringExpression(
String inputTerm,
LogicalType inputLogicalType,
LogicalType targetLogicalType) {
return stringConcat(strLiteral(), inputTerm);
return stringConcat(EMPTY_STR_LITERAL, inputTerm);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
import org.apache.flink.table.planner.functions.casting.CastRule;
import org.apache.flink.table.types.logical.LogicalType;

import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.flink.table.utils.EncodingUtils;

import java.lang.reflect.Method;
import java.util.Arrays;
Expand All @@ -40,15 +39,9 @@
final class CastRuleUtils {

static final String NULL_STR_LITERAL = strLiteral("NULL");
static final String EMPTY_STR_LITERAL = "\"\"";

static String functionCall(String functionName, Object... args) {
return functionName
+ "("
+ Arrays.stream(args).map(Object::toString).collect(Collectors.joining(", "))
+ ")";
}

static String functionCall(Method staticMethod, Object... args) {
static String staticCall(Method staticMethod, Object... args) {
return functionCall(CodeGenUtils.qualifyMethod(staticMethod), args);
}

Expand All @@ -60,6 +53,13 @@ static String methodCall(String instanceTerm, String methodName, Object... args)
return functionCall(instanceTerm + "." + methodName, args);
}

private static String functionCall(String functionName, Object... args) {
return functionName
+ "("
+ Arrays.stream(args).map(Object::toString).collect(Collectors.joining(", "))
+ ")";
}

static String newArray(String innerType, String arraySize) {
return "new " + innerType + "[" + arraySize + "]";
}
Expand All @@ -76,12 +76,8 @@ static String ternaryOperator(String condition, String ifTrue, String ifFalse) {
return "((" + condition + ") ? (" + ifTrue + ") : (" + ifFalse + "))";
}

static String strLiteral() {
return "\"\"";
}

static String strLiteral(String str) {
return "\"" + StringEscapeUtils.escapeJava(str) + "\"";
return "\"" + EncodingUtils.escapeJava(str) + "\"";
}

static final class CodeWriter {
Expand Down Expand Up @@ -178,7 +174,7 @@ public CodeWriter append(CastCodeBlock codeBlock) {
return this;
}

public CodeWriter append(String codeBlock) {
public CodeWriter appendBlock(String codeBlock) {
builder.append(codeBlock);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.UNIX_DATE_TO_STRING;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.staticCall;

/** {@link LogicalTypeRoot#DATE} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
public class DateToStringCastRule extends AbstractCharacterFamilyTargetRule<Long> {
Expand All @@ -46,6 +46,6 @@ public String generateStringExpression(
String inputTerm,
LogicalType inputLogicalType,
LogicalType targetLogicalType) {
return functionCall(UNIX_DATE_TO_STRING(), inputTerm);
return staticCall(UNIX_DATE_TO_STRING(), inputTerm);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;

/**
* Identity cast rule. For more details on when the rule is applied, check {@link
Expand All @@ -48,7 +49,7 @@ private static boolean isIdentityCast(
}

// Identity cast applies if the two types are equals, except nullability
return inputLogicalType.copy(true).equals(targetLogicalType.copy(true));
return LogicalTypeCasts.supportsAvoidingCast(inputLogicalType, targetLogicalType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.INTERVAL_DAY_TIME_TO_STRING;
import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.INTERVAL_YEAR_MONTH_TO_STRING;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.staticCall;

/** {@link LogicalTypeFamily#INTERVAL} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
public class IntervalToStringCastRule extends AbstractCharacterFamilyTargetRule<Object> {
Expand All @@ -53,6 +53,6 @@ public String generateStringExpression(
inputLogicalType.is(LogicalTypeRoot.INTERVAL_YEAR_MONTH)
? INTERVAL_YEAR_MONTH_TO_STRING()
: INTERVAL_DAY_TIME_TO_STRING();
return functionCall(method, inputTerm);
return staticCall(method, inputTerm);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.NULL_STR_LITERAL;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;

Expand Down Expand Up @@ -220,7 +219,7 @@ protected String generateCodeBlockInternal(
// Assign the result value
.assignStmt(
returnVariable,
functionCall(
CastRuleUtils.staticCall(
BINARY_STRING_DATA_FROM_STRING(),
methodCall(builderTerm, "toString")))
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;

import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.EMPTY_STR_LITERAL;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.stringConcat;

/** {@link LogicalTypeFamily#NUMERIC} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
public class NumberToStringCastRule extends AbstractCharacterFamilyTargetRule<Object> {
public class NumericToStringCastRule extends AbstractCharacterFamilyTargetRule<Object> {

public static final NumberToStringCastRule INSTANCE = new NumberToStringCastRule();
public static final NumericToStringCastRule INSTANCE = new NumericToStringCastRule();

private NumberToStringCastRule() {
private NumericToStringCastRule() {
super(
CastRulePredicate.builder()
.input(LogicalTypeFamily.NUMERIC)
Expand All @@ -45,6 +45,6 @@ public String generateStringExpression(
String inputTerm,
LogicalType inputLogicalType,
LogicalType targetLogicalType) {
return stringConcat(strLiteral(), inputTerm);
return stringConcat(EMPTY_STR_LITERAL, inputTerm);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import static org.apache.flink.table.codesplit.CodeSplitUtil.newName;
import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;

/** {@link LogicalTypeRoot#RAW} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
Expand Down Expand Up @@ -62,7 +61,7 @@ protected String generateCodeBlockInternal(
thenWriter ->
thenWriter.assignStmt(
returnVariable,
functionCall(
CastRuleUtils.staticCall(
BINARY_STRING_DATA_FROM_STRING(),
methodCall(deserializedObjTerm, "toString"))),
elseWriter -> elseWriter.assignStmt(returnVariable, "null"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.NULL_STR_LITERAL;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;

Expand All @@ -47,22 +46,16 @@ public class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<
public static final RowToStringCastRule INSTANCE = new RowToStringCastRule();

private RowToStringCastRule() {
super(
CastRulePredicate.builder()
.predicate(
(input, target) ->
input.is(LogicalTypeRoot.ROW)
&& target.is(LogicalTypeFamily.CHARACTER_STRING)
&& ((RowType) input)
.getFields().stream()
.allMatch(
field ->
CastRuleProvider
.exists(
field
.getType(),
target)))
.build());
super(CastRulePredicate.builder().predicate(RowToStringCastRule::matches).build());
}

private static boolean matches(LogicalType input, LogicalType target) {
return input.is(LogicalTypeRoot.ROW)
&& target.is(LogicalTypeFamily.CHARACTER_STRING)
&& ((RowType) input)
.getFields().stream()
.allMatch(
field -> CastRuleProvider.exists(field.getType(), target));
}

/* Example generated code for ROW<`f0` INT, `f1` STRING>:
Expand Down Expand Up @@ -172,7 +165,7 @@ protected String generateCodeBlockInternal(
// Assign the result value
.assignStmt(
returnVariable,
functionCall(
CastRuleUtils.staticCall(
BINARY_STRING_DATA_FROM_STRING(),
methodCall(builderTerm, "toString")));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;

import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.UNIX_TIME_TO_STRING;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;

/**
* {@link LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE} to {@link LogicalTypeFamily#CHARACTER_STRING} cast
Expand All @@ -50,7 +49,7 @@ public String generateStringExpression(
String inputTerm,
LogicalType inputLogicalType,
LogicalType targetLogicalType) {
return functionCall(
return CastRuleUtils.staticCall(
UNIX_TIME_TO_STRING(), inputTerm, LogicalTypeChecks.getPrecision(inputLogicalType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.TIMESTAMP_TO_STRING_TIME_ZONE;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.accessStaticField;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
import static org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.staticCall;

/** {@link LogicalTypeFamily#TIMESTAMP} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. */
@Internal
Expand Down Expand Up @@ -59,6 +59,6 @@ public String generateStringExpression(
: accessStaticField(DateTimeUtils.class, "UTC_ZONE");
final int precision = LogicalTypeChecks.getPrecision(inputLogicalType);

return functionCall(TIMESTAMP_TO_STRING_TIME_ZONE(), inputTerm, zoneId, precision);
return staticCall(TIMESTAMP_TO_STRING_TIME_ZONE(), inputTerm, zoneId, precision);
}
}

0 comments on commit 7dfe3c6

Please sign in to comment.