Skip to content

Commit

Permalink
[FLINK-27891][Table] Add ARRAY_APPEND and ARRAY_PREPEND functions
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Mar 30, 2023
1 parent c2541cd commit 4c8c27f
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/data/sql_functions.yml
Expand Up @@ -628,12 +628,18 @@ collection:
- sql: map ‘[’ value ‘]’
table: MAP.at(ANY)
description: Returns the value specified by key value in map.
- sql: ARRAY_APPEND(haystack, element)
table: haystack.arrayAppend(needle)
description: Appends an element to the end of the array and returns the result. If the array itself is null, the function will return null. If an element to add is null, the null element will be added to the end of the array. The given element is cast implicitly to the array's element type if necessary.
- sql: ARRAY_CONTAINS(haystack, needle)
table: haystack.arrayContains(needle)
description: Returns whether the given element exists in an array. Checking for null elements in the array is supported. If the array itself is null, the function will return null. The given element is cast implicitly to the array's element type if necessary.
- sql: ARRAY_DISTINCT(haystack)
table: haystack.arrayDistinct()
description: Returns an array with unique elements. If the array itself is null, the function will return null. Keeps ordering of elements.
- sql: ARRAY_PREPEND(element, haystack)
table: haystack.arrayPrepend(needle)
description: Appends an element to the beginning of the array and returns the result. If the array itself is null, the function will return null. If an element to add is null, the null element will be added to the beginning of the array. The given element is cast implicitly to the array's element type if necessary.
- sql: ARRAY_REMOVE(haystack, needle)
table: haystack.arrayRemove(needle)
description: Removes all elements that equal to element from array. If the array itself is null, the function will return null. Keeps ordering of elements.
Expand Down
20 changes: 20 additions & 0 deletions flink-python/pyflink/table/expression.py
Expand Up @@ -1470,6 +1470,16 @@ def element(self) -> 'Expression':
"""
return _unary_op("element")(self)

def array_append(self, addition) -> 'Expression':
"""
Appends an element to the end of the array and returns the result.
If the array itself is null, the function will return null. If an element to add is null,
the null element will be added to the end of the array. The given element is cast
implicitly to the array's element type if necessary.
"""
return _binary_op("arrayAppend")(self, addition)

def array_contains(self, needle) -> 'Expression':
"""
Returns whether the given element exists in an array.
Expand All @@ -1487,6 +1497,16 @@ def array_distinct(self) -> 'Expression':
"""
return _binary_op("arrayDistinct")(self)

def array_prepend(self, addition) -> 'Expression':
"""
Appends an element to the beginning of the array and returns the result.
If the array itself is null, the function will return null. If an element to add is null,
the null element will be added to the beginning of the array. The given element is cast
implicitly to the array's element type if necessary.
"""
return _binary_op("arrayPrepend")(self, addition)

def array_remove(self, needle) -> 'Expression':
"""
Removes all elements that equal to element from array.
Expand Down
Expand Up @@ -53,9 +53,11 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ABS;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ACOS;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_APPEND;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONTAINS;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_DISTINCT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_ELEMENT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_PREPEND;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_REMOVE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ASCII;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ASIN;
Expand Down Expand Up @@ -1341,6 +1343,18 @@ public OutType element() {
return toApiSpecificExpression(unresolvedCall(ARRAY_ELEMENT, toExpr()));
}

/**
* Appends an element to the end of the array and returns the result.
*
* <p>If the array itself is null, the function will return null. If an element to add is null,
* the null element will be added to the end of the array. The given element is cast implicitly
* to the array's element type if necessary.
*/
public OutType arrayAppend(InType element) {
return toApiSpecificExpression(
unresolvedCall(ARRAY_APPEND, toExpr(), objectToExpression(element)));
}

/**
* Returns whether the given element exists in an array.
*
Expand All @@ -1362,6 +1376,18 @@ public OutType arrayDistinct() {
return toApiSpecificExpression(unresolvedCall(ARRAY_DISTINCT, toExpr()));
}

/**
* Appends an element to the beginning of the array and returns the result.
*
* <p>If the array itself is null, the function will return null. If an element to add is null,
* the null element will be added to the beginning of the array. The given element is cast
* implicitly to the array's element type if necessary.
*/
public OutType arrayPrepend(InType element) {
return toApiSpecificExpression(
unresolvedCall(ARRAY_PREPEND, toExpr(), objectToExpression(element)));
}

/**
* Removes all elements that equal to element from array.
*
Expand Down
Expand Up @@ -91,6 +91,7 @@
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_EQUALS_COMPARABLE;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_FULLY_COMPARABLE;
import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND;

/** Dictionary of function definitions for all built-in functions. */
@PublicEvolving
Expand Down Expand Up @@ -191,6 +192,20 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.scalar.CoalesceFunction")
.build();

public static final BuiltInFunctionDefinition ARRAY_APPEND =
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_APPEND")
.kind(SCALAR)
.inputTypeStrategy(
sequence(
Arrays.asList("haystack", "element"),
Arrays.asList(
logical(LogicalTypeRoot.ARRAY), ARRAY_ELEMENT_ARG)))
.outputTypeStrategy(nullableIfArgs(nullableIfArgs(ARRAY_APPEND_PREPEND)))
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ArrayAppendFunction")
.build();

public static final BuiltInFunctionDefinition ARRAY_CONTAINS =
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_CONTAINS")
Expand Down Expand Up @@ -220,6 +235,20 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.scalar.ArrayDistinctFunction")
.build();

public static final BuiltInFunctionDefinition ARRAY_PREPEND =
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_PREPEND")
.kind(SCALAR)
.inputTypeStrategy(
sequence(
Arrays.asList("element", "haystack"),
Arrays.asList(
logical(LogicalTypeRoot.ARRAY), ARRAY_ELEMENT_ARG)))
.outputTypeStrategy(nullableIfArgs(ARRAY_APPEND_PREPEND))
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ArrayPrependFunction")
.build();

public static final BuiltInFunctionDefinition ARRAY_REMOVE =
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_REMOVE")
Expand Down
@@ -0,0 +1,32 @@
package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.flink.table.types.logical.LogicalType;

import java.util.List;
import java.util.Optional;

import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;

/**
* Type strategy that returns a {@link DataTypes#ARRAY(DataType)} with element type equal to the
* type of the first argument.
*/
@Internal
public class ArrayAppendPrependTypeStrategy implements TypeStrategy {
@Override
public Optional<DataType> inferType(CallContext callContext) {
final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
final DataType arrayDataType = argumentDataTypes.get(0);
final DataType elementToAddDataType = argumentDataTypes.get(1);
LogicalType logicalType = arrayDataType.getLogicalType().getChildren().get(0);
if (elementToAddDataType.getLogicalType().isNullable() && !logicalType.isNullable()) {
return Optional.of(DataTypes.ARRAY(fromLogicalToDataType(logicalType).nullable()));
}
return Optional.of(arrayDataType);
}
}
Expand Up @@ -52,6 +52,9 @@ public final class SpecificTypeStrategies {
/** See {@link ArrayTypeStrategy}. */
public static final TypeStrategy ARRAY = new ArrayTypeStrategy();

/** See {@link ArrayTypeStrategy}. */
public static final TypeStrategy ARRAY_APPEND_PREPEND = new ArrayAppendPrependTypeStrategy();

/** See {@link GetTypeStrategy}. */
public static final TypeStrategy GET = new GetTypeStrategy();

Expand Down
Expand Up @@ -38,6 +38,76 @@ class CollectionFunctionsITCase extends BuiltInFunctionTestBase {
@Override
Stream<TestSetSpec> getTestSetSpecs() {
return Stream.of(
TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_APPEND)
.onFieldsWithData(
new Integer[] {1, 2}, null, new String[] {"Hello", "World"})
.andDataTypes(
DataTypes.ARRAY(DataTypes.INT()),
DataTypes.ARRAY(DataTypes.INT()),
DataTypes.ARRAY(DataTypes.STRING().notNull()))
.testResult(
$("f0").arrayAppend(null),
"ARRAY_APPEND(f0, null)",
new Integer[] {1, 2, null},
DataTypes.ARRAY(DataTypes.INT()))
.testResult(
$("f1").arrayAppend(1),
"ARRAY_APPEND(f1, 1)",
null,
DataTypes.ARRAY(DataTypes.INT()).nullable())
.testResult(
$("f2").arrayAppend("!"),
"ARRAY_APPEND(f2, '!')",
new String[] {"Hello", "World", "!"},
DataTypes.ARRAY(DataTypes.STRING().notNull()))
.testResult(
$("f2").arrayAppend(null),
"ARRAY_APPEND(f2, NULL)",
new String[] {"Hello", "World", null},
DataTypes.ARRAY(DataTypes.STRING()))
.testSqlValidationError(
"ARRAY_APPEND(f2, 1)",
"Invalid input arguments. Expected signatures are:\n"
+ "ARRAY_APPEND(haystack <ARRAY>, element <ARRAY ELEMENT>)")
.testTableApiValidationError(
$("f2").arrayAppend(1),
"Invalid input arguments. Expected signatures are:\n"
+ "ARRAY_APPEND(haystack <ARRAY>, element <ARRAY ELEMENT>)"),
TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_PREPEND)
.onFieldsWithData(
new Integer[] {1, 2}, null, new String[] {"Hello", "World"})
.andDataTypes(
DataTypes.ARRAY(DataTypes.INT()),
DataTypes.ARRAY(DataTypes.INT()),
DataTypes.ARRAY(DataTypes.STRING().notNull()))
.testResult(
$("f0").arrayPrepend(1),
"ARRAY_PREPEND(f0, 1)",
new Integer[] {1, 1, 2},
DataTypes.ARRAY(DataTypes.INT()))
.testResult(
$("f1").arrayPrepend(1),
"ARRAY_PREPEND(f1, 1)",
null,
DataTypes.ARRAY(DataTypes.INT()).nullable())
.testResult(
$("f2").arrayPrepend("!"),
"ARRAY_PREPEND(f2, '!')",
new String[] {"!", "Hello", "World"},
DataTypes.ARRAY(DataTypes.STRING().notNull()))
.testResult(
$("f2").arrayPrepend(null),
"ARRAY_PREPEND(f2, NULL)",
new String[] {null, "Hello", "World"},
DataTypes.ARRAY(DataTypes.STRING()))
.testSqlValidationError(
"ARRAY_PREPEND(1, f2)",
"Invalid input arguments. Expected signatures are:\n"
+ "ARRAY_PREPEND(element <ARRAY>, haystack <ARRAY ELEMENT>)")
.testTableApiValidationError(
$("f2").arrayPrepend(1),
"Invalid input arguments. Expected signatures are:\n"
+ "ARRAY_PREPEND(element <ARRAY>, haystack <ARRAY ELEMENT>)"),
TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_CONTAINS)
.onFieldsWithData(
new Integer[] {1, 2, 3},
Expand Down
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.functions.scalar;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.SpecializedFunction;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;

import javax.annotation.Nullable;

/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_APPEND}. */
@Internal
public class ArrayAppendFunction extends BuiltInScalarFunction {
private final ArrayData.ElementGetter elementGetter;

public ArrayAppendFunction(SpecializedFunction.SpecializedContext context) {
super(BuiltInFunctionDefinitions.ARRAY_APPEND, context);
final DataType dataType =
((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
.getElementDataType();
elementGetter = ArrayData.createElementGetter(dataType.getLogicalType());
}

public @Nullable ArrayData eval(ArrayData haystack, Object element) {
if (haystack == null) {
return null;
}
final int size = haystack.size();
final Object[] data = new Object[size + 1];
for (int pos = 0; pos < size; pos++) {
data[pos] = elementGetter.getElementOrNull(haystack, pos);
}
data[size] = element;
return new GenericArrayData(data);
}
}
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.functions.scalar;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.SpecializedFunction;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;

import javax.annotation.Nullable;

/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_PREPEND}. */
@Internal
public class ArrayPrependFunction extends BuiltInScalarFunction {
private final ArrayData.ElementGetter elementGetter;

public ArrayPrependFunction(SpecializedFunction.SpecializedContext context) {
super(BuiltInFunctionDefinitions.ARRAY_APPEND, context);
final DataType dataType =
((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
.getElementDataType();
elementGetter = ArrayData.createElementGetter(dataType.getLogicalType());
}

public @Nullable ArrayData eval(ArrayData haystack, Object element) {
if (haystack == null) {
return null;
}
final int size = haystack.size();
final Object[] data = new Object[size + 1];
data[0] = element;
for (int pos = 0; pos < size; pos++) {
data[pos + 1] = elementGetter.getElementOrNull(haystack, pos);
}
return new GenericArrayData(data);
}
}

0 comments on commit 4c8c27f

Please sign in to comment.