Skip to content

Commit

Permalink
[FLINK-31663] Implement ARRAY_EXCEPT function
Browse files Browse the repository at this point in the history
  • Loading branch information
hanyuzheng7 authored and dawidwys committed Mar 7, 2024
1 parent 6f7b248 commit 2429c29
Show file tree
Hide file tree
Showing 9 changed files with 424 additions and 2 deletions.
5 changes: 4 additions & 1 deletion docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,10 @@ collection:
- sql: MAP_FROM_ARRAYS(array_of_keys, array_of_values)
table: mapFromArrays(array_of_keys, array_of_values)
description: Returns a map created from an array of keys and an array of values. Note that the lengths of the two arrays should be the same.

- sql: ARRAY_EXCEPT(array1, array2)
table: arrayOne.arrayExcept(arrayTwo)
description: Returns an ARRAY that contains the elements from array1 that are not in array2. If no elements remain after excluding the elements in array2 from array1, the function returns an empty ARRAY. If one or both arguments are NULL, the function returns NULL. The order of the elements from array1 is kept.

json:
- sql: IS JSON [ { VALUE | SCALAR | ARRAY | OBJECT } ]
table: STRING.isJson([JsonType type])
Expand Down
1 change: 1 addition & 0 deletions flink-python/docs/reference/pyflink.table/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ advanced type helper functions
Expression.map_entries
Expression.map_keys
Expression.map_values
Expression.array_except


time definition functions
Expand Down
9 changes: 9 additions & 0 deletions flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,15 @@ def array_min(self) -> 'Expression':
"""
return _unary_op("arrayMin")(self)

def array_except(self, array) -> 'Expression':
    """
    Removes from this array the elements that also occur in the given array.

    The surviving elements keep the order they had in this array. When nothing
    survives, an empty ARRAY is returned. When this array, the given array, or
    both are NULL, the result is NULL.

    :param array: The array whose elements are excluded from this array.
    :return: An expression evaluating to the remaining elements.
    """
    except_op = _binary_op("arrayExcept")
    return except_op(self, array)

@property
def map_keys(self) -> 'Expression':
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONTAINS;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_DISTINCT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_ELEMENT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_EXCEPT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_MAX;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_MIN;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_POSITION;
Expand Down Expand Up @@ -230,6 +231,19 @@ public OutType as(String name, String... extraNames) {
.toArray(Expression[]::new)));
}

/**
 * Removes from this array the elements that also occur in the given {@code array}.
 *
 * <p>The surviving elements keep the order they had in this array. When nothing survives, an
 * empty ARRAY is returned. When this array, the given array, or both are NULL, the result is
 * NULL.
 *
 * @param array the array whose elements are excluded from this array
 * @return an expression evaluating to the remaining elements
 */
public OutType arrayExcept(InType array) {
    final Expression self = toExpr();
    final Expression other = objectToExpression(array);
    return toApiSpecificExpression(unresolvedCall(ARRAY_EXCEPT, self, other));
}

/**
* Boolean AND in three-valued logic. This is an infix notation. See also {@link
* Expressions#and(Object, Object, Object...)} for prefix notation with multiple arguments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,16 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.internal()
.build();

// Definition of the scalar built-in function ARRAY_EXCEPT(array1, array2).
// NOTE(review): commonArrayType(2) presumably requires exactly two array arguments that share
// a common type, and nullableIfArgs(COMMON) presumably makes the result nullable when an
// argument is nullable -- confirm against the strategy implementations.
public static final BuiltInFunctionDefinition ARRAY_EXCEPT =
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_EXCEPT")
.kind(SCALAR)
.inputTypeStrategy(commonArrayType(2))
.outputTypeStrategy(nullableIfArgs(COMMON))
// Fully qualified name of the class that evaluates the function at runtime.
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ArrayExceptFunction")
.build();

// --------------------------------------------------------------------------------------------
// Logic functions
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ Stream<TestSetSpec> getTestSetSpecs() {
arrayJoinTestCases(),
arraySliceTestCases(),
arrayMinTestCases(),
arraySortTestCases())
arraySortTestCases(),
arrayExceptTestCases())
.flatMap(s -> s);
}

Expand Down Expand Up @@ -1596,4 +1597,128 @@ private Stream<TestSetSpec> arraySortTestCases() {
},
DataTypes.ARRAY(DataTypes.DATE())));
}

/**
 * Test specifications for ARRAY_EXCEPT, covering the Table API call and the equivalent SQL
 * expression for each case.
 *
 * <p>The input fields are referenced positionally as f0..f6 in the order they are passed to
 * onFieldsWithData / andDataTypes below.
 *
 * <p>NOTE(review): no case uses a second array in which the same element occurs three or more
 * times; consider adding one to pin down the per-occurrence removal semantics.
 */
private Stream<TestSetSpec> arrayExceptTestCases() {
return Stream.of(
TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_EXCEPT)
.onFieldsWithData(
// f0: ARRAY<INT> with a duplicated element
new Integer[] {1, 2, 2},
// f1: NULL array
null,
// f2: ARRAY<ROW<BOOLEAN, DATE>> with a trailing NULL element
new Row[] {
Row.of(true, LocalDate.of(2022, 4, 20)),
Row.of(true, LocalDate.of(1990, 10, 14)),
null
},
// f3: ARRAY<INT> containing two NULL elements
new Integer[] {null, null, 1},
// f4: ARRAY<ARRAY<INT>>
new Integer[][] {
new Integer[] {1, null, 3}, new Integer[] {0}, new Integer[] {1}
},
// f5: ARRAY<MAP<INT, STRING>> with a trailing NULL element
new Map[] {
CollectionUtil.map(entry(1, "a"), entry(2, "b")),
CollectionUtil.map(entry(3, "c"), entry(4, "d")),
null
},
// f6: ARRAY<INT> in which 2 occurs three times
new Integer[] {1, 2, 3, 2, 4, 2})
.andDataTypes(
DataTypes.ARRAY(DataTypes.INT()),
DataTypes.ARRAY(DataTypes.INT()),
DataTypes.ARRAY(
DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE())),
DataTypes.ARRAY(DataTypes.INT()),
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())),
DataTypes.ARRAY(DataTypes.INT()))
// ARRAY<INT>
// [1, 2, 2] except [1, NULL, 4]: only 1 matches, NULL and 4 have no counterpart in f0
.testResult(
$("f0").arrayExcept(new Integer[] {1, null, 4}),
"ARRAY_EXCEPT(f0, ARRAY[1, NULL, 4])",
new Integer[] {2, 2},
DataTypes.ARRAY(DataTypes.INT()).nullable())
.testResult(
$("f0").arrayExcept(new Integer[] {1}),
"ARRAY_EXCEPT(f0, ARRAY[1])",
new Integer[] {2, 2},
DataTypes.ARRAY(DataTypes.INT()).nullable())
// no element of the second array occurs in f0, so f0 is returned unchanged
.testResult(
$("f0").arrayExcept(new Integer[] {42}),
"ARRAY_EXCEPT(f0, ARRAY[42])",
new Integer[] {1, 2, 2},
DataTypes.ARRAY(DataTypes.INT()).nullable())
// each occurrence in the second array removes one occurrence from the first:
// [1, 2, 3, 2, 4, 2] except [2, 2] keeps the last 2
.testResult(
$("f6").arrayExcept(new Integer[] {2, 2}),
"ARRAY_EXCEPT(f6, ARRAY[2, 2])",
new Integer[] {1, 3, 4, 2},
DataTypes.ARRAY(DataTypes.INT()).nullable())
// arrayTwo is NULL
// NOTE(review): the literal is already typed as ARRAY<INT>, so the extra cast looks
// redundant -- confirm whether it is needed
.testResult(
$("f0").arrayExcept(
lit(null, DataTypes.ARRAY(DataTypes.INT()))
.cast(DataTypes.ARRAY(DataTypes.INT()))),
"ARRAY_EXCEPT(f0, CAST(NULL AS ARRAY<INT>))",
null,
DataTypes.ARRAY(DataTypes.INT()).nullable())
// arrayTwo contains null elements
.testResult(
$("f0").arrayExcept(new Integer[] {null, 2}),
"ARRAY_EXCEPT(f0, ARRAY[null, 2])",
new Integer[] {1, 2},
DataTypes.ARRAY(DataTypes.INT()).nullable())
// arrayOne is NULL
.testResult(
$("f1").arrayExcept(new Integer[] {1, 2, 3}),
"ARRAY_EXCEPT(f1, ARRAY[1,2,3])",
null,
DataTypes.ARRAY(DataTypes.INT()).nullable())
// arrayOne contains null elements
// a NULL in the second array removes exactly one NULL from the first
.testResult(
$("f3").arrayExcept(new Integer[] {null, 42}),
"ARRAY_EXCEPT(f3, ARRAY[null, 42])",
new Integer[] {null, 1},
DataTypes.ARRAY(DataTypes.INT()).nullable())
// ARRAY<ROW<BOOLEAN, DATE>>
.testResult(
$("f2").arrayExcept(
new Row[] {
Row.of(true, LocalDate.of(1990, 10, 14))
}),
"ARRAY_EXCEPT(f2, ARRAY[(TRUE, DATE '1990-10-14')])",
new Row[] {Row.of(true, LocalDate.of(2022, 4, 20)), null},
DataTypes.ARRAY(
DataTypes.ROW(
DataTypes.BOOLEAN(), DataTypes.DATE()))
.nullable())
// ARRAY<ARRAY<INT>>
// NOTE(review): here nullable() is applied to the inner array type, unlike the other
// cases where it is applied to the outer array type -- confirm this is intended
.testResult(
$("f4").arrayExcept(new Integer[][] {new Integer[] {0}}),
"ARRAY_EXCEPT(f4, ARRAY[ARRAY[0]])",
new Integer[][] {new Integer[] {1, null, 3}, new Integer[] {1}},
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()).nullable()))
// ARRAY<MAP<INT, STRING>> with NULL elements
.testResult(
$("f5").arrayExcept(
new Map[] {
CollectionUtil.map(entry(3, "c"), entry(4, "d"))
}),
"ARRAY_EXCEPT(f5, ARRAY[MAP[3, 'c', 4, 'd']])",
new Map[] {CollectionUtil.map(entry(1, "a"), entry(2, "b")), null},
DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()))
.nullable())
// Invalid signatures
// second argument is not an array
.testSqlValidationError(
"ARRAY_EXCEPT(f0, TRUE)",
"Invalid input arguments. Expected signatures are:\n"
+ "ARRAY_EXCEPT(<COMMON>, <COMMON>)")
.testTableApiValidationError(
$("f0").arrayExcept(true),
"Invalid input arguments. Expected signatures are:\n"
+ "ARRAY_EXCEPT(<COMMON>, <COMMON>)")
// second argument is an array of strings while f0 is an array of integers
.testSqlValidationError(
"ARRAY_EXCEPT(f0, ARRAY['hi', 'there'])",
"Invalid input arguments. Expected signatures are:\n"
+ "ARRAY_EXCEPT(<COMMON>, <COMMON>)")
.testTableApiValidationError(
$("f0").arrayExcept(new String[] {"hi", "there"}),
"Invalid input arguments. Expected signatures are:\n"
+ "ARRAY_EXCEPT(<COMMON>, <COMMON>)"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.functions.scalar;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.SpecializedFunction;
import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
import org.apache.flink.table.runtime.util.ObjectContainer;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.FlinkRuntimeException;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}.
 *
 * <p>Returns the elements of the first array that remain after removing, for every occurrence of
 * an element in the second array, one equal occurrence from the first array. The relative order
 * of the remaining elements is the order they had in the first array.
 */
@Internal
public class ArrayExceptFunction extends BuiltInScalarFunction {
    private final ArrayData.ElementGetter elementGetter;
    private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;

    public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) {
        super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context);
        // The element type is taken from the first argument and is used both for reading
        // elements and for comparing them.
        final DataType dataType =
                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
                        .getElementDataType()
                        .toInternal();
        elementGetter = ArrayData.createElementGetter(dataType.getLogicalType());
        this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType);
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        equalityAndHashcodeProvider.open(context);
    }

    /**
     * Computes {@code arrayOne} minus {@code arrayTwo}.
     *
     * @param arrayOne the array to take elements from, may be {@code null}
     * @param arrayTwo the array whose elements are excluded, may be {@code null}
     * @return the remaining elements of {@code arrayOne} in their original order (possibly
     *     empty), or {@code null} if either argument is {@code null}
     * @throws FlinkRuntimeException wrapping any failure raised during evaluation
     */
    public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
        try {
            if (arrayOne == null || arrayTwo == null) {
                return null;
            }

            // Number of occurrences of each arrayTwo element that may still cancel out an equal
            // element of arrayOne. Map.merge invokes the remapping function with
            // (oldValue, givenValue), so Integer::sum increments the existing count by one.
            final Map<ObjectContainer, Integer> remaining = new HashMap<>();
            for (int pos = 0; pos < arrayTwo.size(); pos++) {
                final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
                remaining.merge(createObjectContainer(element), 1, Integer::sum);
            }

            final List<Object> result = new ArrayList<>();
            for (int pos = 0; pos < arrayOne.size(); pos++) {
                final Object element = elementGetter.getElementOrNull(arrayOne, pos);
                final ObjectContainer key = createObjectContainer(element);
                final Integer count = remaining.get(key);
                if (count == null) {
                    // Nothing left in arrayTwo to cancel this element out.
                    result.add(element);
                } else if (count == 1) {
                    remaining.remove(key);
                } else {
                    remaining.put(key, count - 1);
                }
            }
            return new GenericArrayData(result.toArray());
        } catch (Throwable t) {
            throw new FlinkRuntimeException(t);
        }
    }

    /**
     * Wraps an element so that it can serve as a hash map key using the type-specific equality
     * and hash code. A {@code null} element is represented by a {@code null} key, so all null
     * elements are treated as equal to each other.
     */
    private ObjectContainer createObjectContainer(Object element) {
        if (element == null) {
            return null;
        }
        return new ObjectContainer(
                element,
                equalityAndHashcodeProvider::equals,
                equalityAndHashcodeProvider::hashCode);
    }

    @Override
    public void close() throws Exception {
        equalityAndHashcodeProvider.close();
    }
}
Loading

0 comments on commit 2429c29

Please sign in to comment.