Skip to content

Commit

Permalink
feat: add ARRAY_CONCAT UDF (#7761)
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickstuedi committed Jul 12, 2021
1 parent 2b5520d commit 1de9ef8
Show file tree
Hide file tree
Showing 19 changed files with 1,680 additions and 1 deletion.
18 changes: 18 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,24 @@ ARRAY_UNION(ARRAY[1, 2, 3, 1, 2], [4, 1]) => [1, 2, 3, 4]
ARRAY_UNION(ARRAY['apple', 'apple', NULL, 'cherry'], ARRAY['cherry']) => ['apple', NULL, 'cherry']
```

### `ARRAY_CONCAT`

Since: 0.20.0

```sql
ARRAY_CONCAT(array1, array2)
```

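Returns an array containing all the elements of `array1` followed by all the elements of `array2`. Duplicate elements are preserved, and both arrays must have the same element type.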

Returns NULL if both input arrays are NULL. If only one argument is NULL, the result is the other argument.

Examples:
```sql
ARRAY_CONCAT(ARRAY[1, 2, 3, 1, 2], ARRAY[4, 1]) => [1, 2, 3, 1, 2, 4, 1]
ARRAY_CONCAT(ARRAY['apple', 'apple', NULL, 'cherry'], ARRAY['cherry']) => ['apple', 'apple', NULL, 'cherry', 'cherry']
```

### `AS_MAP`

Since: 0.6.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the
* License.
*/

package io.confluent.ksql.function.udf.array;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.util.ArrayList;
import java.util.List;


/**
 * Scalar UDF that concatenates two arrays of the same element type.
 *
 * <p>The result holds every element of the first array followed by every element of the
 * second array; duplicates and {@code null} elements are preserved.
 */
@UdfDescription(
    name = "array_concat",
    category = FunctionCategory.ARRAY,
    description = "Concatenates two arrays, creating an array that contains all the elements"
        + " in the first array followed by all the elements in the second array."
        + " Returns NULL if both input arrays are NULL."
        + " If only one argument is NULL, the result is the other argument."
        + " The two arrays must be of the same type.")
public class ArrayConcat {

  /**
   * Concatenates {@code left} and {@code right} into a newly allocated list.
   *
   * <p>The inputs are never modified. When exactly one input is {@code null}, the result is a
   * fresh copy of the other input rather than the same instance.
   *
   * @param left the first array of values, may be {@code null}
   * @param right the second array of values, may be {@code null}
   * @param <T> the element type shared by both arrays
   * @return a new list with the elements of {@code left} followed by those of {@code right},
   *     or {@code null} if both inputs are {@code null}
   */
  @Udf
  public <T> List<T> concat(
      @UdfParameter(description = "First array of values") final List<T> left,
      @UdfParameter(description = "Second array of values") final List<T> right) {
    if (left == null && right == null) {
      return null;
    }
    final int leftSize = left != null ? left.size() : 0;
    final int rightSize = right != null ? right.size() : 0;
    // Pre-size the result so that appending both inputs never triggers a resize.
    // The diamond operator keeps the list properly typed (no raw ArrayList).
    final List<T> result = new ArrayList<>(leftSize + rightSize);
    if (left != null) {
      result.addAll(left);
    }
    if (right != null) {
      result.addAll(right);
    }
    return result;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the
* License.
*/

package io.confluent.ksql.function.udf.array;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.junit.Test;

/**
 * Unit tests for {@link ArrayConcat}, covering ordinary concatenation, duplicate and
 * {@code null} elements, and {@code null} array arguments.
 */
public class ArrayConcatTest {

  private final ArrayConcat udf = new ArrayConcat();

  @Test
  public void shouldConcatArraysOfLikeType() {
    final List<String> input1 = Arrays.asList("foo", " ", "bar");
    final List<String> input2 = Arrays.asList("baz");
    final List<String> result = udf.concat(input1, input2);
    assertThat(result, is(Arrays.asList("foo", " ", "bar", "baz")));
  }

  @Test
  public void shouldReturnDuplicateValues() {
    final List<String> input1 = Arrays.asList("foo", "foo", "bar");
    final List<String> input2 = Arrays.asList("baz", "foo");
    final List<String> result = udf.concat(input1, input2);
    assertThat(result, is(Arrays.asList("foo", "foo", "bar", "baz", "foo")));
  }

  @Test
  public void shouldConcatArraysContainingNulls() {
    final List<String> input1 = Arrays.asList(null, "bar");
    final List<String> input2 = Arrays.asList("foo");
    final List<String> result = udf.concat(input1, input2);
    assertThat(result, is(Arrays.asList(null, "bar", "foo")));
  }

  @Test
  public void shouldConcatArraysBothContainingNulls() {
    final List<String> input1 = Arrays.asList(null, "foo", "bar");
    final List<String> input2 = Arrays.asList("foo", null);
    final List<String> result = udf.concat(input1, input2);
    assertThat(result, is(Arrays.asList(null, "foo", "bar", "foo", null)));
  }

  @Test
  public void shouldConcatArraysOfOnlyNulls() {
    final List<String> input1 = Arrays.asList(null, null);
    final List<String> input2 = Arrays.asList(null, null, null);
    final List<String> result = udf.concat(input1, input2);
    assertThat(result, is(Arrays.asList(null, null, null, null, null)));
  }

  @Test
  public void shouldReturnNonNullForNullRightInput() {
    final List<String> input1 = Arrays.asList("foo");
    final List<String> result = udf.concat(input1, null);
    assertThat(result, is(Arrays.asList("foo")));
  }

  // A null left array yields the contents of the right array, not null, so the
  // test name mirrors shouldReturnNonNullForNullRightInput.
  @Test
  public void shouldReturnNonNullForNullLeftInput() {
    final List<String> input2 = Arrays.asList("foo");
    final List<String> result = udf.concat(null, input2);
    assertThat(result, is(Arrays.asList("foo")));
  }

  @Test
  public void shouldReturnNullForAllNullInputs() {
    // The casts pin the type parameter, which cannot be inferred from two bare nulls.
    final List<Long> result = udf.concat((List<Long>) null, (List<Long>) null);
    assertThat(result, is(nullValue()));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ID STRING KEY, ARR1 ARRAY<INTEGER>, ARR2 ARRAY<INTEGER>) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ID` STRING KEY, `ARR1` ARRAY<INTEGER>, `ARR2` ARRAY<INTEGER>",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n ARRAY_CONCAT(INPUT.ARR1, INPUT.ARR2) RESULT\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ID` STRING KEY, `RESULT` ARRAY<INTEGER>",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ID` STRING KEY, `ARR1` ARRAY<INTEGER>, `ARR2` ARRAY<INTEGER>"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "ARRAY_CONCAT(ARR1, ARR2) AS RESULT" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.query.pull.metrics.enabled" : "true",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.lambdas.enabled" : "true",
"ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647",
"ksql.suppress.enabled" : "false",
"ksql.query.push.scalable.enabled" : "false",
"ksql.query.push.scalable.interpreter.enabled" : "true",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.nested.error.set.null" : "true",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}

0 comments on commit 1de9ef8

Please sign in to comment.