diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index 5aad5b1e5036e4..d2a2e8d812e633 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -460,13 +460,11 @@ trait ImplicitExpressionOperations { } /** - * Create a new string of the given string with non-overlapping occurrences + * Creates a new string of the given string with non-overlapping occurrences * of given search replaced with replacement. */ - def replace(search: Expression, - replacement: Expression) = { + def replace(search: Expression, replacement: Expression) = Replace(expr, search, replacement) - } /** * Returns the length of a string. diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala index 057f8a7bfa2401..7eb91d3806d269 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala @@ -118,13 +118,6 @@ object BuiltInMethods { Types.lookupMethod( classOf[ScalarFunctions], "concat_ws", classOf[String], classOf[Array[String]]) - val REPLACE = Types.lookupMethod( - classOf[ScalarFunctions], - "replace", - classOf[String], - classOf[String], - classOf[String]) - val LPAD = Types.lookupMethod( classOf[ScalarFunctions], "lpad", diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index 586f58383d21e0..f79ed0bd25ab74 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -147,10 +147,10 @@ object FunctionGenerator { BuiltInMethod.OVERLAY.method) addSqlFunctionMethod( - ScalarSqlFunctions.REPLACE, + REPLACE, Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO), STRING_TYPE_INFO, - BuiltInMethods.REPLACE) + BuiltInMethod.REPLACE.method) addSqlFunctionMethod( FROM_BASE64, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala index af17b9ba73f98a..e47f1052492f67 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala @@ -480,6 +480,6 @@ case class Replace(str: Expression, override def toString: String = s"($str).replace($search, $replacement)" override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - relBuilder.call(ScalarSqlFunctions.REPLACE, children.map(_.toRexNode)) + relBuilder.call(SqlStdOperatorTable.REPLACE, children.map(_.toRexNode)) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala index 277a6d1953d2f3..92fd175c1e689e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala @@ -67,13 +67,6 @@ object ScalarSqlFunctions { OperandTypes.ONE_OR_MORE, SqlFunctionCategory.STRING) - val REPLACE = new SqlFunction( - "REPLACE", - SqlKind.OTHER_FUNCTION, - ReturnTypes.ARG0_NULLABLE_VARYING, - null, - OperandTypes.STRING_STRING_STRING, - SqlFunctionCategory.STRING) val LOG = new SqlFunction( "LOG", diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index a222bc7596ce48..b067cee84a5238 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -442,7 +442,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.RAND_INTEGER, ScalarSqlFunctions.CONCAT, ScalarSqlFunctions.CONCAT_WS, - ScalarSqlFunctions.REPLACE, + SqlStdOperatorTable.REPLACE, ScalarSqlFunctions.BIN, ScalarSqlFunctions.HEX, SqlStdOperatorTable.TIMESTAMP_ADD, diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index 8021410d7ef4f5..e7316351aafe97 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -121,15 +121,15 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "null") testAllApis( - 'f0.replace('f33, ""), - "f0.replace(f33, '')", - "REPLACE(f0, f33, '')", + 'f0.replace(Null(Types.STRING), ""), + "f0.replace(Null(STRING), '')", + "REPLACE(f0, NULLIF('', ''), '')", "null") testAllApis( - 'f0.replace("", 'f33), - "f0.replace('', f33)", - "REPLACE(f0, '', f33)", + 'f0.replace(" ", Null(Types.STRING)), + "f0.replace(' ', Null(STRING))", + "REPLACE(f0, ' ', NULLIF('', ''))", "null") }