Skip to content

Commit

Permalink
[FLINK-10145][table] Add replace supported in TableAPI and SQL
Browse files Browse the repository at this point in the history
Change-Id: I47e5ca5b75f374d824df4f12aff61382fef7a186
  • Loading branch information
Guibo-Pan committed Aug 21, 2018
1 parent 8486161 commit 095283a
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 28 deletions.
Expand Up @@ -460,13 +460,11 @@ trait ImplicitExpressionOperations {
}

/**
* Create a new string of the given string with non-overlapping occurrences
* Creates a new string of the given string with non-overlapping occurrences
* of given search replaced with replacement.
*/
def replace(search: Expression,
replacement: Expression) = {
def replace(search: Expression, replacement: Expression) =
Replace(expr, search, replacement)
}

/**
* Returns the length of a string.
Expand Down
Expand Up @@ -118,13 +118,6 @@ object BuiltInMethods {
Types.lookupMethod(
classOf[ScalarFunctions], "concat_ws", classOf[String], classOf[Array[String]])

val REPLACE = Types.lookupMethod(
classOf[ScalarFunctions],
"replace",
classOf[String],
classOf[String],
classOf[String])

val LPAD = Types.lookupMethod(
classOf[ScalarFunctions],
"lpad",
Expand Down
Expand Up @@ -147,10 +147,10 @@ object FunctionGenerator {
BuiltInMethod.OVERLAY.method)

addSqlFunctionMethod(
ScalarSqlFunctions.REPLACE,
REPLACE,
Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
STRING_TYPE_INFO,
BuiltInMethods.REPLACE)
BuiltInMethod.REPLACE.method)

addSqlFunctionMethod(
FROM_BASE64,
Expand Down
Expand Up @@ -480,6 +480,6 @@ case class Replace(str: Expression,
override def toString: String = s"($str).replace($search, $replacement)"

override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
relBuilder.call(ScalarSqlFunctions.REPLACE, children.map(_.toRexNode))
relBuilder.call(SqlStdOperatorTable.REPLACE, children.map(_.toRexNode))
}
}
Expand Up @@ -67,13 +67,6 @@ object ScalarSqlFunctions {
OperandTypes.ONE_OR_MORE,
SqlFunctionCategory.STRING)

val REPLACE = new SqlFunction(
"REPLACE",
SqlKind.OTHER_FUNCTION,
ReturnTypes.ARG0_NULLABLE_VARYING,
null,
OperandTypes.STRING_STRING_STRING,
SqlFunctionCategory.STRING)

val LOG = new SqlFunction(
"LOG",
Expand Down
Expand Up @@ -442,7 +442,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.RAND_INTEGER,
ScalarSqlFunctions.CONCAT,
ScalarSqlFunctions.CONCAT_WS,
ScalarSqlFunctions.REPLACE,
SqlStdOperatorTable.REPLACE,
ScalarSqlFunctions.BIN,
ScalarSqlFunctions.HEX,
SqlStdOperatorTable.TIMESTAMP_ADD,
Expand Down
Expand Up @@ -121,15 +121,15 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"null")

testAllApis(
'f0.replace('f33, ""),
"f0.replace(f33, '')",
"REPLACE(f0, f33, '')",
'f0.replace(Null(Types.STRING), ""),
"f0.replace(Null(STRING), '')",
"REPLACE(f0, NULLIF('', ''), '')",
"null")

testAllApis(
'f0.replace("", 'f33),
"f0.replace('', f33)",
"REPLACE(f0, '', f33)",
'f0.replace(" ", Null(Types.STRING)),
"f0.replace(' ', Null(STRING))",
"REPLACE(f0, ' ', NULLIF('', ''))",
"null")
}

Expand Down

0 comments on commit 095283a

Please sign in to comment.