Skip to content

Commit

Permalink
[FLINK-32706][table] Add built-in SPLIT_STRING function (#24365)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanyuzheng7 committed May 22, 2024
1 parent e502d23 commit 4c6571d
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 2 deletions.
5 changes: 4 additions & 1 deletion docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,10 @@ collection:
- sql: ARRAY_EXCEPT(array1, array2)
table: arrayOne.arrayExcept(arrayTwo)
description: Returns an ARRAY that contains the elements from array1 that are not in array2. If no elements remain after excluding the elements in array2 from array1, the function returns an empty ARRAY. If one or both arguments are NULL, the function returns NULL. The order of the elements from array1 is kept.

- sql: SPLIT(string, delimiter)
table: string.split(delimiter)
description: Returns an array of substrings by splitting the input string based on the given delimiter. If the delimiter is not found in the string, the original string is returned as the only element in the array. If the delimiter is empty, the string is split into its individual characters. If the string or delimiter is null, a null value is returned. If the delimiter is found at the beginning or end of the string, or there are contiguous delimiters, then an empty string is added to the array for each such position.

json:
- sql: IS JSON [ { VALUE | SCALAR | ARRAY | OBJECT } ]
table: STRING.isJson([JsonType type])
Expand Down
1 change: 1 addition & 0 deletions flink-python/docs/reference/pyflink.table/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ advanced type helper functions
Expression.map_union
Expression.map_values
Expression.array_except
Expression.split


time definition functions
Expand Down
11 changes: 11 additions & 0 deletions flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,17 @@ def array_except(self, array) -> 'Expression':
"""
return _binary_op("arrayExcept")(self, array)

def split(self, delimiter) -> 'Expression':
"""
Returns an array of substrings by splitting the input string based on the given delimiter.
If the delimiter is not found in the string, the original string is returned as the only
element in the array. If the delimiter is empty, every character in the string is split.
If the string or delimiter is null, a null value is returned. If the delimiter is found
at the beginning or end of the string, or there are contiguous delimiters, then an empty
string is added to the array.
"""
return _binary_op("split")(self, delimiter)

@property
def map_keys(self) -> 'Expression':
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SIMILAR;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SIN;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SINH;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SPLIT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SPLIT_INDEX;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SQRT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.STDDEV_POP;
Expand Down Expand Up @@ -1534,6 +1535,20 @@ public OutType arrayMin() {
return toApiSpecificExpression(unresolvedCall(ARRAY_MIN, toExpr()));
}

/**
* Returns an array of substrings by splitting the input string based on a given delimiter.
*
* <p>If the delimiter is not found in the string, the original string is returned as the only
* element in the array. If the delimiter is empty, the string is split into its individual
* characters. If the string or delimiter is null, a null value is returned. If the delimiter is
* found at the beginning or end of the string, or there are contiguous delimiters, then an empty
* string is added to the array for each such position.
*
* @param delimiter the separator to split on; it is converted with {@code objectToExpression},
*     so presumably either an expression or a plain literal value may be passed (TODO confirm)
* @return the API-specific expression wrapping the unresolved {@code SPLIT} call
*/
public OutType split(InType delimiter) {
return toApiSpecificExpression(
unresolvedCall(SPLIT, toExpr(), objectToExpression(delimiter)));
}

/** Returns the keys of the map as an array. */
public OutType mapKeys() {
return toApiSpecificExpression(unresolvedCall(MAP_KEYS, toExpr()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,18 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.scalar.ArrayMinFunction")
.build();

/**
* Definition of the built-in scalar function {@code SPLIT}.
*
* <p>The function takes exactly two arguments, both from the {@code CHARACTER_STRING} type
* family, and is declared to return an {@code ARRAY<STRING>}. The runtime implementation is
* referenced by its fully qualified class name.
*/
public static final BuiltInFunctionDefinition SPLIT =
BuiltInFunctionDefinition.newBuilder()
.name("SPLIT")
.kind(SCALAR)
.inputTypeStrategy(
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING)))
// NOTE(review): nullableIfArgs presumably makes the result type nullable only when
// at least one argument type is nullable - confirm against the strategy's contract.
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.ARRAY(STRING()))))
.runtimeClass("org.apache.flink.table.runtime.functions.scalar.SplitFunction")
.build();

public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS =
BuiltInFunctionDefinition.newBuilder()
.name("$REPLICATE_ROWS$1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ Stream<TestSetSpec> getTestSetSpecs() {
arraySliceTestCases(),
arrayMinTestCases(),
arraySortTestCases(),
arrayExceptTestCases())
arrayExceptTestCases(),
splitTestCases())
.flatMap(s -> s);
}

Expand Down Expand Up @@ -1721,4 +1722,83 @@ private Stream<TestSetSpec> arrayExceptTestCases() {
"Invalid input arguments. Expected signatures are:\n"
+ "ARRAY_EXCEPT(<COMMON>, <COMMON>)"));
}

/**
* Test specifications for {@code SPLIT}, exercised through both the Table API and SQL.
*
* <p>Fields f0..f6 hold, in order: a plain comma-separated string, a null string, a string with
* a leading delimiter, a string with leading and trailing delimiters, an INT (used for the
* validation-error cases), a string without any delimiter, and a string with leading, trailing
* and contiguous delimiters.
*/
private Stream<TestSetSpec> splitTestCases() {
return Stream.of(
TestSetSpec.forFunction(BuiltInFunctionDefinitions.SPLIT)
.onFieldsWithData(
"123,123,23",
null,
",123,123",
",123,123,",
123,
"12345",
",123,,,123,")
.andDataTypes(
DataTypes.STRING().notNull(),
DataTypes.STRING(),
DataTypes.STRING().notNull(),
DataTypes.STRING().notNull(),
DataTypes.INT().notNull(),
DataTypes.STRING().notNull(),
DataTypes.STRING().notNull())
// Regular split on a delimiter that occurs inside the string.
.testResult(
$("f0").split(","),
"SPLIT(f0, ',')",
new String[] {"123", "123", "23"},
DataTypes.ARRAY(DataTypes.STRING()).notNull())
// A null delimiter yields a null result with a nullable result type.
.testResult(
$("f0").split(null),
"SPLIT(f0, NULL)",
null,
DataTypes.ARRAY(DataTypes.STRING()))
// An empty delimiter splits the string into single characters,
// the commas included.
.testResult(
$("f0").split(""),
"SPLIT(f0, '')",
new String[] {"1", "2", "3", ",", "1", "2", "3", ",", "2", "3"},
DataTypes.ARRAY(DataTypes.STRING()).notNull())
// A null input string yields a null result.
.testResult(
$("f1").split(","),
"SPLIT(f1, ',')",
null,
DataTypes.ARRAY(DataTypes.STRING()))
// Both the input string and the delimiter are null.
.testResult(
$("f1").split(null),
"SPLIT(f1, null)",
null,
DataTypes.ARRAY(DataTypes.STRING()))
// A leading delimiter produces a leading empty string.
.testResult(
$("f2").split(","),
"SPLIT(f2, ',')",
new String[] {"", "123", "123"},
DataTypes.ARRAY(DataTypes.STRING()).notNull())
// Leading and trailing delimiters produce empty strings at both ends.
.testResult(
$("f3").split(","),
"SPLIT(f3, ',')",
new String[] {"", "123", "123", ""},
DataTypes.ARRAY(DataTypes.STRING()).notNull())
// Delimiter not present: the whole string is the only element.
.testResult(
$("f5").split(","),
"SPLIT(f5, ',')",
new String[] {"12345"},
DataTypes.ARRAY(DataTypes.STRING()).notNull())
// Contiguous delimiters produce one empty string per gap.
.testResult(
$("f6").split(","),
"SPLIT(f6, ',')",
new String[] {"", "123", "", "", "123", ""},
DataTypes.ARRAY(DataTypes.STRING()).notNull())
// f4 is an INT, so the call must be rejected during validation
// in both the Table API and SQL.
.testTableApiValidationError(
$("f4").split(","),
"Invalid input arguments. Expected signatures are:\n"
+ "SPLIT(<CHARACTER_STRING>, <CHARACTER_STRING>)")
.testSqlValidationError(
"SPLIT(f4, ',')",
"Invalid input arguments. Expected signatures are:\n"
+ "SPLIT(<CHARACTER_STRING>, <CHARACTER_STRING>)")
// Wrong argument counts (zero and three) are rejected in SQL.
.testSqlValidationError(
"SPLIT()", "No match found for function signature SPLIT()")
.testSqlValidationError(
"SPLIT(f1, '1', '2')",
"No match found for function signature SPLIT(<CHARACTER>, <CHARACTER>, <CHARACTER>)"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.table.runtime.functions.scalar;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.binary.BinaryStringDataUtil;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.SpecializedFunction;
import org.apache.flink.util.FlinkRuntimeException;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;

/**
 * Implementation of {@link BuiltInFunctionDefinitions#SPLIT}.
 *
 * <p>Splits a string around a delimiter and returns the pieces as an array of strings. A null
 * input string or a null delimiter yields a null result. An empty delimiter splits the string
 * into its individual characters.
 */
@Internal
public class SplitFunction extends BuiltInScalarFunction {
    public SplitFunction(SpecializedFunction.SpecializedContext context) {
        super(BuiltInFunctionDefinitions.SPLIT, context);
    }

    /**
     * Splits {@code string} around every occurrence of {@code delimiter}.
     *
     * @param string the string to split, may be null
     * @param delimiter the separator; an empty delimiter splits the string into single
     *     characters, may be null
     * @return the resulting tokens, or null if {@code string} or {@code delimiter} is null
     * @throws FlinkRuntimeException wrapping any failure raised while splitting
     */
    public @Nullable ArrayData eval(@Nullable StringData string, @Nullable StringData delimiter) {
        try {
            if (string == null || delimiter == null) {
                return null;
            }
            if (delimiter.toString().isEmpty()) {
                return new GenericArrayData(splitIntoCharacters(string.toString()));
            }
            // Non-empty delimiter: delegate to the binary string utility, which splits on the
            // whole separator and keeps all tokens.
            BinaryStringData[] tokens =
                    BinaryStringDataUtil.splitByWholeSeparatorPreserveAllTokens(
                            (BinaryStringData) string, (BinaryStringData) delimiter);
            return new GenericArrayData(tokens);
        } catch (Throwable t) {
            throw new FlinkRuntimeException(t);
        }
    }

    /**
     * Splits {@code str} into one string per Unicode code point.
     *
     * <p>Iterating by code point instead of by UTF-16 {@code char} keeps surrogate pairs
     * (characters outside the Basic Multilingual Plane, such as emoji) together; cutting such a
     * pair in half would produce two strings that each hold an unpaired surrogate.
     */
    private static Object[] splitIntoCharacters(String str) {
        // str.length() is an upper bound on the number of code points.
        final List<StringData> characters = new ArrayList<>(str.length());
        int index = 0;
        while (index < str.length()) {
            // charCount is 2 for a valid surrogate pair and 1 otherwise (including an
            // unpaired surrogate, which is passed through unchanged).
            final int width = Character.charCount(str.codePointAt(index));
            characters.add(StringData.fromString(str.substring(index, index + width)));
            index += width;
        }
        return characters.toArray();
    }
}

0 comments on commit 4c6571d

Please sign in to comment.