diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 15c30eda42d02c..7769df54b5ee71 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -628,12 +628,18 @@ collection: - sql: map ‘[’ value ‘]’ table: MAP.at(ANY) description: Returns the value specified by key value in map. + - sql: ARRAY_APPEND(haystack, element) + table: haystack.arrayAppend(needle) + description: Appends an element to the end of the array and returns the result. If the array itself is null, the function will return null. If an element to add is null, the null element will be added to the end of the array. The given element is cast implicitly to the array's element type if necessary. - sql: ARRAY_CONTAINS(haystack, needle) table: haystack.arrayContains(needle) description: Returns whether the given element exists in an array. Checking for null elements in the array is supported. If the array itself is null, the function will return null. The given element is cast implicitly to the array's element type if necessary. - sql: ARRAY_DISTINCT(haystack) table: haystack.arrayDistinct() description: Returns an array with unique elements. If the array itself is null, the function will return null. Keeps ordering of elements. + - sql: ARRAY_PREPEND(element, haystack) + table: haystack.arrayPrepend(needle) + description: Appends an element to the beginning of the array and returns the result. If the array itself is null, the function will return null. If an element to add is null, the null element will be added to the beginning of the array. The given element is cast implicitly to the array's element type if necessary. - sql: ARRAY_REMOVE(haystack, needle) table: haystack.arrayRemove(needle) description: Removes all elements that equal to element from array. If the array itself is null, the function will return null. Keeps ordering of elements. diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index ab608c5cd02709..4c97f742d188fe 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1470,6 +1470,16 @@ def element(self) -> 'Expression': """ return _unary_op("element")(self) + def array_append(self, addition) -> 'Expression': + """ + Appends an element to the end of the array and returns the result. + + If the array itself is null, the function will return null. If an element to add is null, + the null element will be added to the end of the array. The given element is cast + implicitly to the array's element type if necessary. + """ + return _binary_op("arrayAppend")(self, addition) + def array_contains(self, needle) -> 'Expression': """ Returns whether the given element exists in an array. @@ -1487,6 +1497,16 @@ def array_distinct(self) -> 'Expression': """ return _binary_op("arrayDistinct")(self) + def array_prepend(self, addition) -> 'Expression': + """ + Appends an element to the beginning of the array and returns the result. + + If the array itself is null, the function will return null. If an element to add is null, + the null element will be added to the beginning of the array. The given element is cast + implicitly to the array's element type if necessary. + """ + return _binary_op("arrayPrepend")(self, addition) + def array_remove(self, needle) -> 'Expression': """ Removes all elements that equal to element from array. diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index c43307dcb3d445..89f74e0ed8bc93 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -53,9 +53,11 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ABS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ACOS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_APPEND; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONTAINS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_DISTINCT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_ELEMENT; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_PREPEND; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_REMOVE; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ASCII; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ASIN; @@ -1341,6 +1343,18 @@ public OutType element() { return toApiSpecificExpression(unresolvedCall(ARRAY_ELEMENT, toExpr())); } + /** + * Appends an element to the end of the array and returns the result. + * + *
If the array itself is null, the function will return null. If an element to add is null, + * the null element will be added to the end of the array. The given element is cast implicitly + * to the array's element type if necessary. + */ + public OutType arrayAppend(InType element) { + return toApiSpecificExpression( + unresolvedCall(ARRAY_APPEND, toExpr(), objectToExpression(element))); + } + /** * Returns whether the given element exists in an array. * @@ -1362,6 +1376,18 @@ public OutType arrayDistinct() { return toApiSpecificExpression(unresolvedCall(ARRAY_DISTINCT, toExpr())); } + /** + * Appends an element to the beginning of the array and returns the result. + * + *
If the array itself is null, the function will return null. If an element to add is null,
+ * the null element will be added to the beginning of the array. The given element is cast
+ * implicitly to the array's element type if necessary.
+ */
+ public OutType arrayPrepend(InType element) {
+ return toApiSpecificExpression(
+ unresolvedCall(ARRAY_PREPEND, toExpr(), objectToExpression(element)));
+ }
+
/**
* Removes all elements that equal to element from array.
*
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 8860fe02a17bd4..88b17b12f1c840 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -91,6 +91,7 @@
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_EQUALS_COMPARABLE;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_FULLY_COMPARABLE;
+import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND;
/** Dictionary of function definitions for all built-in functions. */
@PublicEvolving
@@ -191,6 +192,20 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.scalar.CoalesceFunction")
.build();
+ public static final BuiltInFunctionDefinition ARRAY_APPEND =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("ARRAY_APPEND")
+ .kind(SCALAR)
+ .inputTypeStrategy(
+ sequence(
+ Arrays.asList("haystack", "element"),
+ Arrays.asList(
+ logical(LogicalTypeRoot.ARRAY), ARRAY_ELEMENT_ARG)))
+ .outputTypeStrategy(nullableIfArgs(nullableIfArgs(ARRAY_APPEND_PREPEND)))
+ .runtimeClass(
+ "org.apache.flink.table.runtime.functions.scalar.ArrayAppendFunction")
+ .build();
+
public static final BuiltInFunctionDefinition ARRAY_CONTAINS =
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_CONTAINS")
@@ -220,6 +235,20 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.scalar.ArrayDistinctFunction")
.build();
+ public static final BuiltInFunctionDefinition ARRAY_PREPEND =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("ARRAY_PREPEND")
+ .kind(SCALAR)
+ .inputTypeStrategy(
+ sequence(
+ Arrays.asList("element", "haystack"),
+ Arrays.asList(
+ logical(LogicalTypeRoot.ARRAY), ARRAY_ELEMENT_ARG)))
+ .outputTypeStrategy(nullableIfArgs(ARRAY_APPEND_PREPEND))
+ .runtimeClass(
+ "org.apache.flink.table.runtime.functions.scalar.ArrayPrependFunction")
+ .build();
+
public static final BuiltInFunctionDefinition ARRAY_REMOVE =
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_REMOVE")
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayAppendPrependTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayAppendPrependTypeStrategy.java
new file mode 100644
index 00000000000000..a6d84f07639439
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayAppendPrependTypeStrategy.java
@@ -0,0 +1,32 @@
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/**
+ * Type strategy that returns a {@link DataTypes#ARRAY(DataType)} with element type equal to the
+ * type of the first argument.
+ */
+@Internal
+public class ArrayAppendPrependTypeStrategy implements TypeStrategy {
+ @Override
+ public Optional