Skip to content

Commit

Permalink
feat: Adds udf regexp_split_to_array (#5501)
Browse files Browse the repository at this point in the history
* feat: Adds udf regexp_split
  • Loading branch information
AlanConfluent committed Jun 1, 2020
1 parent 27d8ad5 commit 3766129
Show file tree
Hide file tree
Showing 7 changed files with 442 additions and 1 deletion.
20 changes: 19 additions & 1 deletion docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ the entire substring is returned by default.
For example, `REGEXP_EXTRACT("(.*) (.*)", 'hello there', 2)`
returns "there".

### REGEXP_EXTRACT_ALL
### `REGEXP_EXTRACT_ALL`

```sql
REGEXP_EXTRACT_ALL('.*', col1)
Expand All @@ -481,6 +481,24 @@ the entire substring is returned by default.
For example, `REGEXP_EXTRACT("(\\w+) (\\w+)", 'hello there nice day', 2)`
returns `['there', 'day']`.

### `REGEXP_SPLIT_TO_ARRAY`

```sql
REGEXP_SPLIT_TO_ARRAY(col1, 'a.b+')
```

Splits a string into an array of substrings based
on a regular expression. If there is no match,
the original string is returned as the only
element in the array. If the regular expression is empty,
then all characters in the string are split.
If either the string or the regular expression is `NULL`, a
NULL value is returned.

If the regular expression is found at the beginning or end
of the string, or there are contiguous matches,
then an empty element is added to the array.

### `SPLIT`

```sql
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.string;

import com.google.common.base.Splitter;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

@UdfDescription(name = "regexp_split_to_array",
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Splits a string into an array of substrings based on a regexp. "
+ "If the regexp is found at the beginning of the string, end of the string, or there "
+ "are contiguous matches in the string, then empty strings are added to the array. "
+ "If the regexp is not found, then the original string is returned as the only "
+ "element in the array. If the regexp is empty, then all characters in the string are "
+ "split.")
public class RegexpSplitToArray {

@Udf(description = "Splits a string into an array of substrings based on a regexp.")
public List<String> regexpSplit(
@UdfParameter(
description = "The string to be split. If NULL, then function returns NULL.")
final String string,
@UdfParameter(
description = "The regular expression to split the string by. "
+ "If NULL, then function returns NULL.")
final String regexp) {
if (string == null || regexp == null) {
return null;
}

// Use Guava version to be compatible with other splitting functions.
final Pattern p = getPattern(regexp);
if (regexp.isEmpty() || p.matcher("").matches()) {
return Arrays.asList(p.split(string));
} else {
return Splitter.on(p).splitToList(string);
}
}

private Pattern getPattern(final String regexp) {
try {
return Pattern.compile(regexp);
} catch (PatternSyntaxException e) {
throw new KsqlFunctionException("Invalid regular expression pattern: " + regexp, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.confluent.ksql.function.udf.string;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

import io.confluent.ksql.function.KsqlFunctionException;
import org.junit.Test;

public class RegexpSplitToArrayTest {

private final RegexpSplitToArray udf = new RegexpSplitToArray();

@Test
public void shouldReturnNull() {
assertThat(udf.regexpSplit(null, "a"), is(nullValue()));
assertThat(udf.regexpSplit("string", null), is(nullValue()));
assertThat(udf.regexpSplit(null, null), is(nullValue()));
}

@Test
public void shouldReturnOriginalStringOnNotFoundRegexp() {
assertThat(udf.regexpSplit("", "z"), contains(""));
assertThat(udf.regexpSplit("x-y", "z"), contains("x-y"));
}

@Test
public void shouldSplitAllCharactersByGivenAnEmptyRegexp() {
assertThat(udf.regexpSplit("", ""), contains(""));
assertThat(udf.regexpSplit("x-y", ""), contains("x", "-", "y"));
assertThat(udf.regexpSplit("x", ""), contains("x"));
}

@Test
public void shouldSplitStringByGivenRegexp() {
assertThat(udf.regexpSplit("x-y", "-"), contains("x", "y"));
assertThat(udf.regexpSplit("x-y", "x"), contains("", "-y"));
assertThat(udf.regexpSplit("x-y", "y"), contains("x-", ""));
assertThat(udf.regexpSplit("a-b-c-d", "-"), contains("a", "b", "c", "d"));

assertThat(udf.regexpSplit("x-y", "."), contains("", "", "", ""));
assertThat(udf.regexpSplit("a-b-c-b-d", ".b."), contains("a", "c", "d"));
assertThat(udf.regexpSplit("a-b-c", "^.."), contains("", "b-c"));
assertThat(udf.regexpSplit("a-b-ccatd-ecatf", "(-|cat)"),
contains("a", "b", "c", "d", "e", "f"));
}

@Test
public void shouldSplitAndAddEmptySpacesIfRegexpIsFoundAtTheBeginningOrEnd() {
assertThat(udf.regexpSplit("-A", "-"), contains("", "A"));
assertThat(udf.regexpSplit("-A-B", "-"), contains("", "A", "B"));
assertThat(udf.regexpSplit("A-", "-"), contains("A", ""));
assertThat(udf.regexpSplit("A-B-", "-"), contains("A", "B", ""));
assertThat(udf.regexpSplit("-A-B-", "-"), contains("", "A", "B", ""));

assertThat(udf.regexpSplit("A", "^"), contains("A"));
assertThat(udf.regexpSplit("A", "$"), contains("A"));
assertThat(udf.regexpSplit("AB", "^"), contains("AB"));
assertThat(udf.regexpSplit("AB", "$"), contains("AB"));
}

@Test
public void shouldSplitAndAddEmptySpacesIfRegexIsFoundInContiguousPositions() {
assertThat(udf.regexpSplit("A--A", "-"), contains("A", "", "A"));
assertThat(udf.regexpSplit("z--A--z", "-"), contains("z", "", "A", "", "z"));
assertThat(udf.regexpSplit("--A--A", "-"), contains("", "", "A", "", "A"));
assertThat(udf.regexpSplit("A--A--", "-"), contains("A", "", "A", "", ""));

assertThat(udf.regexpSplit("aababa", "ab"), contains("a", "", "a"));
assertThat(udf.regexpSplit("aababa", "(ab)+"), contains("a", "a"));
assertThat(udf.regexpSplit("aabcda", "(ab|cd)"), contains("a", "", "a"));
}

@Test(expected = KsqlFunctionException.class)
public void shouldThrowOnInvalidPattern() {
udf.regexpSplit("abcd", "(()");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (K STRING KEY, INPUT_STRING STRING, PATTERN STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`K` STRING KEY, `INPUT_STRING` STRING, `PATTERN` STRING",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n REGEXP_SPLIT_TO_ARRAY(TEST.INPUT_STRING, TEST.PATTERN) EXTRACTED\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`K` STRING KEY, `EXTRACTED` ARRAY<STRING>",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`K` STRING KEY, `INPUT_STRING` STRING, `PATTERN` STRING"
},
"keyColumnNames" : [ "K" ],
"selectExpressions" : [ "REGEXP_SPLIT_TO_ARRAY(INPUT_STRING, PATTERN) AS EXTRACTED" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.error.classifier.regex" : ""
}
}

0 comments on commit 3766129

Please sign in to comment.