Skip to content

Commit

Permalink
[FLINK-10145][table] Add replace supported in TableAPI and SQL
Browse files Browse the repository at this point in the history
Change-Id: I2b6d95d63b438e3d3e9d176ab5c18c4718fdeb1e
  • Loading branch information
Guibo-Pan committed Aug 19, 2018
1 parent 60c8757 commit 4b262a4
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 13 deletions.
6 changes: 3 additions & 3 deletions docs/dev/table/functions.md
Expand Up @@ -2408,7 +2408,7 @@ REPLACE(string, search, replacement)
</td>
<td>
<p>Returns a new string from <i>string</i> replaced <i>search</i>(non-overlapping) with <i>replacement</i>.</p>
<p>E.g., <code>REPLACE('hello world', 'world', 'flink')</code> returns "hello flink"; <code>REPLACE('ababab', 'abab', 'z')</code> returns "zab"</p>
<p>E.g., <code>REPLACE("hello world", "world", "flink")</code> returns "hello flink"; <code>REPLACE("ababab", "abab", "z")</code> returns "zab"</p>
</td>
</tr>

Expand Down Expand Up @@ -2612,7 +2612,7 @@ STRING.replace(STRING1, STRING2)
</td>
<td>
<p>Returns a new string from <i>STRING</i> replaced <i>STRING1</i>(non-overlapping) with <i>STRING2</i>.</p>
<p>E.g., <code>'hello world'.replace('world', 'flink')</code> returns 'hello flink'; <code>'ababab'.replace('abab', 'z')</code> returns "zab"</p>
<p>E.g., <code>"hello world".replace("world", "flink")</code> returns "hello flink"; <code>"ababab".replace("abab", "z")</code> returns "zab"</p>
</td>
</tr>

Expand Down Expand Up @@ -2816,7 +2816,7 @@ STRING.replace(STRING1, STRING2)
</td>
<td>
<p>Returns a new string from <i>STRING</i> replaced <i>STRING1</i>(non-overlapping) with <i>STRING2</i>.</p>
<p>E.g., <code>'hello world'.replace('world', 'flink')</code> returns 'hello flink'; <code>'ababab'.replace('abab', 'z')</code> returns "zab"</p>
<p>E.g., <code>"hello world".replace("world", "flink")</code> returns "hello flink"; <code>"ababab".replace("abab", "z")</code> returns "zab"</p>
</td>
</tr>

Expand Down
Expand Up @@ -462,10 +462,6 @@ trait ImplicitExpressionOperations {
/**
* Create a new string of the given string with non-overlapping occurrences
* of given search replaced with replacement.
*
* @param search
* @param replacement
* @return replaced string
*/
def replace(search: Expression,
replacement: Expression) = {
Expand Down
Expand Up @@ -118,6 +118,13 @@ object BuiltInMethods {
Types.lookupMethod(
classOf[ScalarFunctions], "concat_ws", classOf[String], classOf[Array[String]])

val REPLACE = Types.lookupMethod(
classOf[ScalarFunctions],
"replace",
classOf[String],
classOf[String],
classOf[String])

val LPAD = Types.lookupMethod(
classOf[ScalarFunctions],
"lpad",
Expand Down
Expand Up @@ -147,10 +147,10 @@ object FunctionGenerator {
BuiltInMethod.OVERLAY.method)

addSqlFunctionMethod(
REPLACE,
ScalarSqlFunctions.REPLACE,
Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
STRING_TYPE_INFO,
BuiltInMethod.REPLACE.method)
BuiltInMethods.REPLACE)

addSqlFunctionMethod(
FROM_BASE64,
Expand Down
Expand Up @@ -412,8 +412,8 @@ case class ToBase64(child: Expression) extends UnaryExpression with InputTypeSpe
}

/**
* Returns the string `str` with all non-overlapping occurrences
* of `search` replaced with `replacement`.
* Returns the string str with all non-overlapping occurrences
* of search replaced with replacement.
*/
case class Replace(str: Expression,
search: Expression,
Expand All @@ -431,6 +431,6 @@ case class Replace(str: Expression,
override def toString: String = s"($str).replace($search, $replacement)"

override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
relBuilder.call(SqlStdOperatorTable.REPLACE, children.map(_.toRexNode))
relBuilder.call(ScalarSqlFunctions.REPLACE, children.map(_.toRexNode))
}
}
Expand Up @@ -67,6 +67,14 @@ object ScalarSqlFunctions {
OperandTypes.ONE_OR_MORE,
SqlFunctionCategory.STRING)

val REPLACE = new SqlFunction(
"REPLACE",
SqlKind.OTHER_FUNCTION,
ReturnTypes.ARG0_NULLABLE_VARYING,
null,
OperandTypes.STRING_STRING_STRING,
SqlFunctionCategory.STRING)

val LOG = new SqlFunction(
"LOG",
SqlKind.OTHER_FUNCTION,
Expand Down
Expand Up @@ -92,6 +92,17 @@ object ScalarFunctions {
sb.toString
}

/**
* Returns the string str with all non-overlapping occurrences
* of search replaced with replacement.
*/
def replace(str: String, search: String, replacement: String): String = {
if (str == null || search == null || replacement == null) {
return null
}
str.replace(search, replacement)
}

/**
* Returns the natural logarithm of "x".
*/
Expand Down
Expand Up @@ -396,7 +396,6 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.POSITION,
SqlStdOperatorTable.CHAR_LENGTH,
SqlStdOperatorTable.CHARACTER_LENGTH,
SqlStdOperatorTable.REPLACE,
SqlStdOperatorTable.UPPER,
SqlStdOperatorTable.LOWER,
SqlStdOperatorTable.INITCAP,
Expand Down Expand Up @@ -441,6 +440,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.RAND_INTEGER,
ScalarSqlFunctions.CONCAT,
ScalarSqlFunctions.CONCAT_WS,
ScalarSqlFunctions.REPLACE,
ScalarSqlFunctions.BIN,
ScalarSqlFunctions.HEX,
SqlStdOperatorTable.TIMESTAMP_ADD,
Expand Down
Expand Up @@ -107,6 +107,30 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"f0.replace(' ', '_')",
"REPLACE(f0, ' ', '_')",
"This_is_a_test_String.")

testAllApis(
'f0.replace("i", ""),
"f0.replace('i', '')",
"REPLACE(f0, 'i', '')",
"Ths s a test Strng.")

testAllApis(
'f33.replace("i", ""),
"f33.replace('i', '')",
"REPLACE(f33, 'i', '')",
"null")

testAllApis(
'f0.replace('f33, ""),
"f0.replace(f33, '')",
"REPLACE(f0, f33, '')",
"null")

testAllApis(
'f0.replace("", 'f33),
"f0.replace('', f33)",
"REPLACE(f0, '', f33)",
"null")
}

@Test
Expand Down

0 comments on commit 4b262a4

Please sign in to comment.